diff options
author | Guillaume Seguin <guillaume@segu.in> | 2009-11-16 00:55:08 +0100 |
---|---|---|
committer | Guillaume Seguin <guillaume@segu.in> | 2009-11-16 00:55:08 +0100 |
commit | 5626e1bdbfccb0671c969eff1d6232d28ce0380b (patch) | |
tree | 90d102439b7432747104a0acdbc1d6a96a9b2e6e | |
download | cloudpy-5626e1bdbfccb0671c969eff1d6232d28ce0380b.tar.gz cloudpy-5626e1bdbfccb0671c969eff1d6232d28ce0380b.tar.bz2 |
Initial import (only heartbeat is functionnal for now)
-rw-r--r-- | .gitignore | 3 | ||||
-rw-r--r-- | cloudpy/__init__.py | 0 | ||||
-rw-r--r-- | cloudpy/config.py | 25 | ||||
-rw-r--r-- | cloudpy/connection.py | 214 | ||||
-rw-r--r-- | cloudpy/data.py | 0 | ||||
-rw-r--r-- | cloudpy/database.py | 0 | ||||
-rw-r--r-- | cloudpy/event.py | 95 | ||||
-rw-r--r-- | cloudpy/event_base.py | 47 | ||||
-rw-r--r-- | cloudpy/heartbeat/__init__.py | 167 | ||||
-rw-r--r-- | cloudpy/heartbeat/events.py | 91 | ||||
-rw-r--r-- | cloudpy/helpers.py | 0 | ||||
-rw-r--r-- | cloudpy/package.py | 62 | ||||
-rw-r--r-- | cloudpy/task.py | 0 | ||||
-rw-r--r-- | cloudpy/uid.py | 4 | ||||
-rw-r--r-- | cloudpy/workqueue.py | 0 | ||||
-rw-r--r-- | deploy.py | 30 | ||||
-rw-r--r-- | heartbeat.py | 14 | ||||
-rw-r--r-- | setup-workers.py | 115 | ||||
-rw-r--r-- | undeploy.py | 17 | ||||
-rw-r--r-- | worker.py | 81 |
20 files changed, 965 insertions, 0 deletions
diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..d2d414b --- /dev/null +++ b/.gitignore @@ -0,0 +1,3 @@ +*.swp +*.swo +*.pyc diff --git a/cloudpy/__init__.py b/cloudpy/__init__.py new file mode 100644 index 0000000..e69de29 --- /dev/null +++ b/cloudpy/__init__.py diff --git a/cloudpy/config.py b/cloudpy/config.py new file mode 100644 index 0000000..1ea3c09 --- /dev/null +++ b/cloudpy/config.py @@ -0,0 +1,25 @@ +import yaml + +config = None +config_file = "config.yml" +worker_config_file = "worker_config.yml" + +def load_config (config_file = config_file): + with open (config_file) as f: + config = yaml.load (f) + return config + +def save_config (config, config_file = config_file): + with open (config_file, "w") as f: + yaml.dump (config, f) + +def load_worker_config (config_file = worker_config_file): + global config + config = load_config (config_file) + +def load_server_config (config_file = config_file): + global config + config = load_config (config_file) + +def save_worker_config (config, config_file = worker_config_file): + save_config (config, config_file) diff --git a/cloudpy/connection.py b/cloudpy/connection.py new file mode 100644 index 0000000..7090b73 --- /dev/null +++ b/cloudpy/connection.py @@ -0,0 +1,214 @@ +import socket +import select +import threading +import logging + +from errno import EWOULDBLOCK + +import event + +class Parser (object): + + _buffer = None + _separator = None + + def __init__ (self, separator = "\n"): + self._buffer = "" + self._separator = separator + + def parse (self, data): + data = self._buffer + data + bits = data.split (self._separator) + self._buffer = bits[-1] + return bits[:-1] + + def prepare (self, data): + return data + self._separator + +class Connection (object): + + _socket = None + + _host = None + _port = None + _buffer_size = None + + def __init__ (self, host, port, buffer_size = 4096): + self._host = host + self._port = int (port) + self._buffer_size = buffer_size + self._socket = socket.socket () + self.prepare () + + def prepare (self): + raise NotImplementedError + + def run_thread (self): + thread = threading.Thread () + thread.run = self.run + thread.start () + + def run (self): + raise NotImplementedError + + def close (self): + logging.log (logging.INFO, "Closing Connection %d" % id (self)) + self._socket.close () + + def on_receive (self, sock, data): + raise NotImplementedError + + def on_connection_lost (self, sock): + raise NotImplementedError + + def __delete__ (self): + self.close () + +class ClientConnection (Connection): + + _parser = None + + def __init__ (self, *args, **kwargs): + super (ClientConnection, self).__init__ (*args, **kwargs) + self._parser = Parser () + + def send (self, data): + self._socket.send (data) + + def send_event (self, ev): + ev_desc = ev.describe () + logging.log (logging.INFO, "Sending event to server : %s" \ + % (ev_desc,)) + self.send (self._parser.prepare (ev_desc)) + + def prepare (self): + self._socket.connect ((self._host, self._port)) + self._socket.settimeout (1) + self.on_connected () + + def run (self): + while True: + try: + data = self._socket.recv (self._buffer_size) + if not data: + break + self.on_receive (self._socket, data) + except socket.error, se: + if se.args[0] == "timed out": + pass + else: + break + self.on_connection_lost (self._socket) + + def on_receive (self, sock, data): + logging.log (logging.DEBUG, "Got data from server : %s" \ + % (data.replace ("\n", "#"),)) + bits = self._parser.parse (data) + for bit in bits: + ev = event.EventSerializer.load (bit) + logging.log (logging.DEBUG, "Processing %s from server" \ + % (ev.__class__.__name__,)) + ev.process (self, sock) + + def on_connection_lost (self, sock): + logging.log (logging.INFO, "Lost connection to server") + + def on_connected (self): + logging.log (logging.INFO, "Connected to server") + +class ServerConnection (Connection): + + _parsers = None + _addresses = None + + _client_sockets = None + _select_timeout = 0.1 + _stop = False + + def __init__ (self, *args, **kwargs): + super (ServerConnection, self).__init__ (*args, **kwargs) + self._parsers = {} + self._addresses = {} + + def close (self): + logging.log (logging.INFO, "Closing ServerConnection %d" % id (self)) + self._stop = True + for sock in self._client_sockets: + sock.close () + self._socket.close () + + def broadcast (self, data): + for sock in self._client_sockets: + sock.send (data) + + def send_event (self, sock, ev): + ev_desc = ev.describe () + logging.log (logging.INFO, "Sending event to %s:%d : %s" \ + % (self._addresses[sock] + (ev_desc,))) + sock.send (self._parsers[sock].prepare (ev_desc)) + + def prepare (self): + self._socket.setsockopt (socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) + self._socket.bind ((self._host, self._port)) + logging.log (logging.INFO, "Server bound to %s:%d" \ + % (self._host, self._port)) + self._stop = False + + def run (self): + self._client_sockets = [] + self._socket.listen (1) + while not self._stop: + readables, _, _ = select.select ([self._socket] + self._client_sockets, [], [], + self._select_timeout) + for sock in readables: + if sock == self._socket: + sock, address = self._socket.accept () + logging.log (logging.INFO, "Accepted client connection from %s" + % (address,)) + self.on_accept (sock) + sock.setblocking (0) + self._client_sockets.append (sock) + else: + self.recv_socket (sock) + + def recv_socket (self, sock): + while True: + try: + data = sock.recv (self._buffer_size) + if not data: + break + self.on_receive (sock, data) + except socket.error, se: + if se.args[0] == EWOULDBLOCK: + return + else: + break + self._client_sockets.remove (sock) + self.on_connection_lost (sock) + + def get_address (self, sock): + return self._addresses[sock] + + def on_receive (self, sock, data): + logging.log (logging.DEBUG, "Got data from %s:%d : %s" \ + % (self._addresses[sock] + \ + (data.replace ("\n", "#"),))) + bits = self._parsers[sock].parse (data) + for bit in bits: + ev = event.EventSerializer.load (bit) + logging.log (logging.DEBUG, "Processing %s from %s:%d" \ + % ((ev.__class__.__name__,) \ + + self._addresses[sock])) + ev.process (self, sock) + + def on_accept (self, sock): + self._parsers[sock] = Parser () + self._addresses[sock] = sock.getpeername () + logging.log (logging.INFO, "New connection from %s:%d" \ + % self._addresses[sock]) + + def on_connection_lost (self, sock): + logging.log (logging.INFO, "Lost connection to %s:%d" \ + % self._addresses[sock]) + del self._addresses[sock] + del self._parsers[sock] diff --git a/cloudpy/data.py b/cloudpy/data.py new file mode 100644 index 0000000..e69de29 --- /dev/null +++ b/cloudpy/data.py diff --git a/cloudpy/database.py b/cloudpy/database.py new file mode 100644 index 0000000..e69de29 --- /dev/null +++ b/cloudpy/database.py diff --git a/cloudpy/event.py b/cloudpy/event.py new file mode 100644 index 0000000..d8cb29b --- /dev/null +++ b/cloudpy/event.py @@ -0,0 +1,95 @@ +import base64 +import yaml + +class ValueCoder (object): + + @staticmethod + def encode (value, type): + try: + if type == "int": + return "%d" % value + elif type == "bool": + return "1" if value else "0" + elif type == "string": + return value + elif type == "yaml": + return yaml.dump (value) + elif type.startswith ("base64"): + type2 = type.split ("-", 1)[1] + return base64.b64encode (ValueCoder.encode (value, type2)) + elif type.startswith ("list"): + type2 = type.split ("_", 1)[1] + return ",".join ([ValueCoder.encode (value2, type2) + for value2 in value]) + elif type.startswith ("dict"): + type2, type3 = type.split ("_", 2)[1:] + return ",".join (["%s:%s" % (ValueCoder.encode (value2, type2), + ValueCoder.encode (value3, type3)) + for (value2, value3) in value.items ()]) + else: + raise ValueError, "Unknown type : " + type + except Exception, e: + raise ValueError, "Couldn't encode value %s of type %s : %s" \ + % (value, type, e) + + @staticmethod + def decode (value, type): + try: + if type == "int": + return int (value) + elif type == "bool": + return value == "1" + elif type == "string": + return value + elif type == "yaml": + return yaml.load (value) + elif type.startswith ("base64"): + type2 = type.split ("-", 1)[1] + return ValueCoder.decode (base64.b64decode (value), type2) + elif type.startswith ("list"): + type2 = type.split ("_", 1)[1] + return [ValueCoder.decode (value2, type2) + for value2 in value.split (",")] + elif type.startswith ("dict"): + type2, type3 = type.split ("_", 2)[1:] + return dict ([(ValueCoder.decode (value2, type2), + ValueCoder.decode (value3, type3)) + for (value2, value3) in + [tmp.split (":") + for tmp in value.split (",")]]) + else: + raise ValueError, "Unknown type : " + type + except Exception, e: + raise ValueError, "Couldn't decode value %s as type %s : %s" \ + % (value, type, e) + +class EventSerializer (object): + + @staticmethod + def load (data): + bits = data.split (" ") + class_name = bits[0] + try: + cls = globals ()[class_name] + assert issubclass (cls, Event) + except (AssertionError, KeyError): + raise ValueError, "Unknown event : " + (data) + args_type = cls.describe_args () + args = bits[1:] + if len (args) != len (args_type): + raise ValueError, "Wrong number of arguments for event : " + data + args = [ValueCoder.decode (args[i], args_type[i]) + for i in range (len (args))] + return cls (*args) + + @staticmethod + def dump (event, *args): + args_type = event.describe_args () + if len (args) != len (args_type): + raise ValueError, "Wrong number of arguments for event : " + data + args = [ValueCoder.encode (args[i], args_type[i]) + for i in range (len (args))] + data = " ".join ([event.__class__.__name__] + args) + return data + +from heartbeat.events import * diff --git a/cloudpy/event_base.py b/cloudpy/event_base.py new file mode 100644 index 0000000..b3d5bac --- /dev/null +++ b/cloudpy/event_base.py @@ -0,0 +1,47 @@ +from event import EventSerializer + +class Event (object): + + def process (self, context): + raise NotImplementedError + + def describe (self): + return EventSerializer.dump (self, *self.get_args ()) + + def get_args (self): + raise NotImplementedError + + @staticmethod + def describe_args (): + raise NotImplementedError + +class EmptyEvent (Event): + + def get_args (self): + return [] + + @staticmethod + def describe_args (): + return [] + +class SingleValueEvent (Event): + + value = None + + def __init__ (self, value): + self.value = value + + def get_args (self): + return [self.value] + +class BoolEvent (SingleValueEvent): + + @staticmethod + def describe_args (): + return ["bool"] + +class StringEvent (SingleValueEvent): + + @staticmethod + def describe_args (): + return ["string"] diff --git a/cloudpy/heartbeat/__init__.py b/cloudpy/heartbeat/__init__.py new file mode 100644 index 0000000..b8dbf77 --- /dev/null +++ b/cloudpy/heartbeat/__init__.py @@ -0,0 +1,167 @@ +import logging +from datetime import datetime + +from sqlalchemy import create_engine, Table, Column, \ + Integer, String, DateTime, MetaData, ForeignKey, \ + or_ +from sqlalchemy.ext.declarative import declarative_base +from sqlalchemy.orm import relation, backref, sessionmaker +from sqlalchemy.sql import func + +TableBase = declarative_base () +Session = sessionmaker () + +import cloudpy.connection +import cloudpy.package +import cloudpy.config +import cloudpy.uid +import events + +class HeartbeatClient (cloudpy.connection.ClientConnection): + + kill = False + package = None + package_id = None + undeploying = False + +class Package (TableBase): + + __tablename__ = "packages" + + id = Column (Integer, primary_key = True) + name = Column (String) + data_pack = Column (String) + entry_point = Column (String) + full_id = Column (String) + requested_workers = Column (Integer) + + def __init__ (self, name, data_pack, entry_point, + full_id, requested_workers): + self.name = name + self.data_pack = data_pack + self.entry_point = entry_point + self.full_id = full_id + self.requested_workers = requested_workers + + def __repr__ (self): + return "<Package %s (%s)>" % (self.name, self.full_id) + +class Worker (TableBase): + + __tablename__ = "workers" + + id = Column (Integer, primary_key = True) + name = Column (String) + package_id = Column (Integer, ForeignKey ('packages.id')) + last_update = Column (DateTime) + package = relation (Package, backref = backref ('workers', order_by = id)) + + def __init__ (self, name): + self.name = name + + def __repr__ (self): + return "<Worker %s>" % self.name + +class HeartbeatServer (cloudpy.connection.ServerConnection): + + db_session = None + + def __init__ (self, *args, **kwargs): + super (HeartbeatServer, self).__init__ (*args, **kwargs) + engine = create_engine ("sqlite:///heartbeat.db") + metadata = TableBase.metadata + metadata.create_all (engine) + Session.configure (bind = engine) + self.db_session = Session () + + def update_running (self, host): + worker = self.db_session.query (Worker).filter_by (name = host).first () + if not worker: + # this IS not right, a running worker should never disappear... + logging.log (logging.ERROR, "Could not find running worker %s, " \ + + "something wrong must have occured" \ + % host) + worker = Worker (host) + self.db_session.add (worker) + worker.last_update = datetime.now () + self.db_session.commit () + return not worker.package + + def update_idle (self, host): + worker = self.db_session.query (Worker).filter_by (name = host).first () + if not worker: # new worker registering + logging.log (logging.INFO, "Registering worker %s" % host) + worker = Worker (host) + self.db_session.add (worker) + worker.last_update = datetime.now () + worker.package = None + self.db_session.commit () + subq = self.db_session.query (Worker.package_id, + func.count ('*').label ('workers_count')) + subq = subq.group_by (Worker.package_id).subquery () + q = self.db_session.query (Package, subq.c.workers_count) + q = q.outerjoin ((subq, Package.id == subq.c.package_id)) + q = q.filter (or_ (subq.c.workers_count == None, + subq.c.workers_count != Package.requested_workers)) + package, count = q.first () + worker.package = package + self.db_session.commit () + return { + "name": package.name, + "data_pack": package.data_pack, + "entry_point": package.entry_point, + } + + def deploy (self, package_dict): + package_id = cloudpy.uid.generate () + logging.log (logging.INFO, "Deploying package %s as %s" \ + % (package_dict["name"], package_id)) + package = Package (package_dict["name"], + package_dict["data_pack"], + package_dict["entry_point"], + package_id, + package_dict["requested_workers"]) + self.db_session.add (package) + self.db_session.commit () + return package_id + + def undeploy (self, package_id): + logging.log (logging.INFO, "Undeploying package %s" % package_id) + query = self.db_session.query (Package) + query = query.filter (Package.full_id == package_id) + package = query.first () + if not package: + return None + else: + self.db_session.delete (package) + self.db_session.commit () + return package.data_pack + +def update (running): + host, port = cloudpy.config.config["heartbeat"].split (":") + client = HeartbeatClient (host, port) + ev = events.HeartbeatUpdateEvent (running) + client.send_event (ev) + client.run () + if running: + return client.kill + else: + return client.package + +def deploy (package, requested_workers): + package = package.to_dict () + package["requested_workers"] = requested_workers + host, port = cloudpy.config.config["heartbeat"].split (":") + client = HeartbeatClient (host, port) + ev = events.HeartbeatDeployEvent (package) + client.send_event (ev) + client.run () + return client.package_id + +def undeploy (package_id): + host, port = cloudpy.config.config["heartbeat"].split (":") + client = HeartbeatClient (host, port) + ev = events.HeartbeatUndeployEvent (package_id) + client.send_event (ev) + client.run () + return client.undeploying diff --git a/cloudpy/heartbeat/events.py b/cloudpy/heartbeat/events.py new file mode 100644 index 0000000..fca067f --- /dev/null +++ b/cloudpy/heartbeat/events.py @@ -0,0 +1,91 @@ +import socket + +from cloudpy.event_base import * + +class HeartbeatUpdateEvent (BoolEvent): + + # value == running + + def process (self, connection, sock): + address = connection.get_address (sock)[0] + remote = socket.getfqdn (address) + if self.value: + kill = connection.update_running (remote) + if kill: + connection.send_event (sock, HeartbeatKillEvent ()) + else: + connection.send_event (sock, HeartbeatOkEvent ()) + else: + package = connection.update_idle (remote) + if package: + connection.send_event (sock, HeartbeatPackageEvent (package)) + else: + connection.send_event (sock, HeartbeatOkEvent ()) + +class HeartbeatKillEvent (EmptyEvent): + + def process (self, connection, sock): + connection.kill = True + connection.close () + +class HeartbeatOkEvent (EmptyEvent): + + def process (self, connection, sock): + connection.kill = False + connection.package = None + connection.close () + +class HeartbeatPackageEvent (Event): + + package = None + + def __init__ (self, package): + self.package = package + + def get_args (self): + return [self.package] + + @staticmethod + def describe_args (): + return ["base64-yaml"] + + def process (self, connection, sock): + connection.package = self.package + connection.close () + +class HeartbeatDeployEvent (HeartbeatPackageEvent): + + def process (self, connection, sock): + package_id = connection.deploy (self.package) + if not package_id: + package_id = "" + connection.send_event (sock, HeartbeatPackageIdEvent (package_id)) + +class HeartbeatPackageIdEvent (StringEvent): + + # value == package_id + + def process (self, connection, sock): + if not self.value: + connection.package_id = None + else: + connection.package_id = self.value + connection.close () + +class HeartbeatUndeployEvent (StringEvent): + + # value == package_id + + def process (self, connection, sock): + data_pack = connection.undeploy (self.value) + if not data_pack: + data_pack = "" + connection.send_event (sock, HeartbeatUndeployDataPackEvent (data_pack)) + +class HeartbeatUndeployDataPackEvent (StringEvent): + + # value == data_pack + + def process (self, connection, sock): + connection.undeploy_data_pack = self.value + connection.close () diff --git a/cloudpy/helpers.py b/cloudpy/helpers.py new file mode 100644 index 0000000..e69de29 --- /dev/null +++ b/cloudpy/helpers.py diff --git a/cloudpy/package.py b/cloudpy/package.py new file mode 100644 index 0000000..de19f22 --- /dev/null +++ b/cloudpy/package.py @@ -0,0 +1,62 @@ +# coding=utf-8 + +''' + cloudpy + Author : Guillaume "iXce" Seguin + Email : guillaume@segu.in (or guillaume.seguin@ens.fr) + + # Simple Cloud Computing system # + + Copyright (C) 2009 Guillaume Seguin + + This program is free software; you can redistribute it and/or + modify it under the terms of the GNU General Public License + as published by the Free Software Foundation; either version 2 + of the License, or (at your option) any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; if not, write to the Free Software + Foundation, Inc., 51 Franklin Street, Fifth Floor, + Boston, MA 02110-1301, USA. +''' + +import signal + +class Package (object): + + name = "" + data_pack = "" + entry_point = "" + + def __init__ (self, name, data_pack, entry_point): + self.name = name + self.data_pack = data_pack + self.entry_point = entry_point + + def deploy (self): + if self.data_pack: + # fetch and unpack data pack + pass + + def run (self): + print "-- test --" + + def undeploy (self): + # FIXME : catch term signal + pass + + def to_dict (self): + return { + "name": self.name, + "data_pack": self.data_pack, + "entry_point": self.entry_point, + } + + @staticmethod + def from_dict (dict): + return Package (dict["name"], dict["data_pack"], dict["entry_point"]) diff --git a/cloudpy/task.py b/cloudpy/task.py new file mode 100644 index 0000000..e69de29 --- /dev/null +++ b/cloudpy/task.py diff --git a/cloudpy/uid.py b/cloudpy/uid.py new file mode 100644 index 0000000..a1b9bd2 --- /dev/null +++ b/cloudpy/uid.py @@ -0,0 +1,4 @@ +import uuid + +def generate (): + return str (uuid.uuid1 ()) diff --git a/cloudpy/workqueue.py b/cloudpy/workqueue.py new file mode 100644 index 0000000..e69de29 --- /dev/null +++ b/cloudpy/workqueue.py diff --git a/deploy.py b/deploy.py new file mode 100644 index 0000000..35c1313 --- /dev/null +++ b/deploy.py @@ -0,0 +1,30 @@ +#!/usr/bin/env python + +import os +import sys +import yaml + +from cloudpy.package import Package +import cloudpy.heartbeat +import cloudpy.config + +if __name__ == "__main__": + if len (sys.argv) != 3: + print "Usage: %s PACKAGE_PATH REQUESTED_WORKERS" % sys.argv[0] + sys.exit (2) + package_path = sys.argv[1] + if not os.path.isdir (package_path): + print "Wrong package path", package_path + sys.exit (1) + requested_workers = int (sys.argv[2]) + cloudpy.config.load_server_config () + with open (os.path.join (package_path, "package.yml")) as f: + config = yaml.load (f) + files = config["files"] + # FIXME : create data_pack + package = Package (config["name"], "", config["entry_point"]) + package_id = cloudpy.heartbeat.deploy (package, requested_workers) + if package_id: + print "Package is being deployed as", package_id + else: + print "Could not deploy package" diff --git a/heartbeat.py b/heartbeat.py new file mode 100644 index 0000000..0b5ad9a --- /dev/null +++ b/heartbeat.py @@ -0,0 +1,14 @@ +#!/usr/bin/env python + +import cloudpy.config +import cloudpy.heartbeat + +if __name__ == "__main__": + cloudpy.config.load_server_config () + host, port = cloudpy.config.config["heartbeat"].split (":") + print "Running Cloudpy Heartbeat server on %s:%s" % (host, port) + server = cloudpy.heartbeat.HeartbeatServer (host, port) + try: + server.run () + except KeyboardInterrupt: + print "Shutting down Heartbeat server" diff --git a/setup-workers.py b/setup-workers.py new file mode 100644 index 0000000..a7fcb36 --- /dev/null +++ b/setup-workers.py @@ -0,0 +1,115 @@ +# coding=utf-8 + +''' + cloudpy + Author : Guillaume "iXce" Seguin + Email : guillaume@segu.in (or guillaume.seguin@ens.fr) + + # Simple Cloud Computing system # + + Copyright (C) 2009 Guillaume Seguin + + This program is free software; you can redistribute it and/or + modify it under the terms of the GNU General Public License + as published by the Free Software Foundation; either version 2 + of the License, or (at your option) any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; if not, write to the Free Software + Foundation, Inc., 51 Franklin Street, Fifth Floor, + Boston, MA 02110-1301, USA. +''' + +import os +import glob + +import paramiko + +import cloudpy.config + +CORE_FILES = [ + "worker.py", + "worker_config.yml", + ] \ + + glob.glob ("cloudpy/*.py") \ + + glob.glob ("cloudpy/heartbeat/*.py") +# FIXME : remove this glob + +def upload_files (ssh, path): + def makedirs (sftp, path): + sftp.chdir (".") + old_cwd = sftp.getcwd () + try: + sftp.chdir (path) + except IOError, e: + if e.errno != 2: + raise e + else: + sftp.chdir (old_cwd) + return + for bit in path.split (os.path.sep): + try: + sftp.chdir (bit) + except IOError, e: + if e.errno == 2: + sftp.mkdir (bit, mode = 0700) + sftp.chdir (bit) + else: + raise e + sftp.chdir (old_cwd) + sftp = ssh.open_sftp () + makedirs (sftp, path) + sftp.chdir (path) + for file in CORE_FILES: + makedirs (sftp, os.path.dirname (file)) + sftp.put (file, file) + sftp.close () + +def setup_cron (ssh, path): + stdin, stdout, stderr = ssh.exec_command ("pwd") + stdin.close () + path = os.path.join (stdout.read ().strip (), path) + stdin, stdout, stderr = ssh.exec_command ("crontab -l") + stdin.close () + current = stdout.read () + new = [line + for line in current.splitlines () + if "cloudpy" not in line] + new.append ("* * * * * python " + os.path.join (path, "worker.py")) + stdin, stdout, stderr = ssh.exec_command ("crontab -") + stdin.write ("\n".join (new)) + stdin.close () + +def setup_worker (worker, username, path): + ssh = paramiko.SSHClient () + ssh.set_missing_host_key_policy (paramiko.AutoAddPolicy ()) + ssh.connect (worker, username = username, allow_agent = True, + look_for_keys = False) + # upload files + upload_files (ssh, path) + # setup cron + setup_cron (ssh, path) + ssh.close () + +def setup_workers (config): + workers = config["workers"] + del config["workers"] + for worker in workers: + username = os.getlogin () + path = "" + if "@" in worker: + username, worker = worker.split ("@", 1) + if ":" in worker: + worker, path = worker.split (":", 1) + config["worker"] = "%s@%s:%s" % (username, worker, path) + cloudpy.config.save_worker_config (config) + setup_worker (worker, username, path) + +if __name__ == "__main__": + config = cloudpy.config.load_config () + setup_workers (config) diff --git a/undeploy.py b/undeploy.py new file mode 100644 index 0000000..609e5b3 --- /dev/null +++ b/undeploy.py @@ -0,0 +1,17 @@ +#!/usr/bin/env python + +import sys + +import cloudpy.config +import cloudpy.heartbeat + +if __name__ == "__main__": + if len (sys.argv) != 2: + print "Usage: %s PACKAGE_ID" % sys.argv[0] + sys.exit (2) + package_id = sys.argv[1] + cloudpy.config.load_server_config () + if cloudpy.heartbeat.undeploy (package_id): + print "Package is being undeployed." + else: + print "Could not undeploy package (it probably wasn't deployed)." diff --git a/worker.py b/worker.py new file mode 100644 index 0000000..c471deb --- /dev/null +++ b/worker.py @@ -0,0 +1,81 @@ +#!/usr/bin/env python +# coding=utf-8 + +''' + cloudpy + Author : Guillaume "iXce" Seguin + Email : guillaume@segu.in (or guillaume.seguin@ens.fr) + + # Simple Cloud Computing system # + + Copyright (C) 2009 Guillaume Seguin + + This program is free software; you can redistribute it and/or + modify it under the terms of the GNU General Public License + as published by the Free Software Foundation; either version 2 + of the License, or (at your option) any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; if not, write to the Free Software + Foundation, Inc., 51 Franklin Street, Fifth Floor, + Boston, MA 02110-1301, USA. +''' + +import os +import fcntl +import signal + +import yaml + +import cloudpy.config +import cloudpy.heartbeat +import cloudpy.package + +if os.path.dirname (__file__): + os.chdir (os.path.dirname (__file__)) + +lock_f = open ("lock", "w") +fcntl.lockf (lock_f, fcntl.LOCK_EX) + +cloudpy.config.load_worker_config () + +running = False + +if os.path.exists ("pid"): + with open ("pid") as pid_f: + pid = pid_f.read () + if pid and os.path.exists (os.path.join ("/proc", pid)): + with open (os.path.join ("/proc", pid, "cmdline")) as f: + running = "worker.py" in f.read () + +package = None + +if running: + kill = cloudpy.heartbeat.update (running = True) + if kill: # kill the worker + os.kill (int (pid), signal.SIGKILL) + running = False + fcntl.lockf (lock_f, fcntl.LOCK_UN) + pid_f.close () +if not running: + package = cloudpy.heartbeat.update (running = False) + if package: # we are going to run a package : set pid file ... + with open ("pid", "w") as pid_f: + pid_f.write (str (os.getpid ())) + # ... and build Package object + package = cloudpy.package.Package.from_dict (package) + fcntl.lockf (lock_f, fcntl.LOCK_UN) + pid_f.close () + +if not package: + raise SystemExit + +# package is set here : deploy it +package.deploy () +# run package +package.run () |