diff options
author | Guillaume Seguin <guillaume@segu.in> | 2009-11-18 11:54:42 +0100 |
---|---|---|
committer | Guillaume Seguin <guillaume@segu.in> | 2009-11-18 11:54:42 +0100 |
commit | d28d7778af1026c15510c4f4fb389bc21398c299 (patch) | |
tree | 96a858199d66154a817542fe7fdf74192729b5c6 | |
parent | 975acee881c9f6c3d88dd146741e4bd462896617 (diff) | |
download | cloudpy-master.tar.gz cloudpy-master.tar.bz2 |
Among other changes, fixed large sends and added a "pause" feature to client
connections.
-rw-r--r-- | cloudpy/connection.py | 19 | ||||
-rw-r--r-- | cloudpy/data/client.py | 15 | ||||
-rw-r--r-- | cloudpy/data/events.py | 57 | ||||
-rw-r--r-- | cloudpy/data/server.py | 14 | ||||
-rw-r--r-- | cloudpy/event.py | 17 | ||||
-rw-r--r-- | cloudpy/heartbeat/client.py | 18 | ||||
-rw-r--r-- | cloudpy/heartbeat/events.py | 58 | ||||
-rw-r--r-- | cloudpy/heartbeat/server.py | 17 | ||||
-rw-r--r-- | cloudpy/package.py | 54 | ||||
-rw-r--r-- | cloudpy/singleton.py | 9 | ||||
-rw-r--r-- | cloudpy/task.py | 52 | ||||
-rw-r--r-- | cloudpy/taskqueue/__init__.py (renamed from cloudpy/workqueue.py) | 0 | ||||
-rw-r--r-- | cloudpy/taskqueue/client.py | 58 | ||||
-rw-r--r-- | cloudpy/taskqueue/events.py | 117 | ||||
-rw-r--r-- | cloudpy/taskqueue/server.py | 115 | ||||
-rw-r--r-- | list-packages.py | 11 | ||||
-rw-r--r-- | list-workers.py | 14 | ||||
-rw-r--r-- | taskqueue.py | 14 | ||||
-rw-r--r-- | test_package/package.yml | 5 | ||||
-rw-r--r-- | test_package/test.py | 13 | ||||
-rw-r--r-- | test_package/test_task.py | 10 | ||||
-rw-r--r-- | undeploy.py | 5 | ||||
-rw-r--r-- | worker.py | 2 |
23 files changed, 613 insertions, 81 deletions
diff --git a/cloudpy/connection.py b/cloudpy/connection.py index 3a84993..01e18d2 100644 --- a/cloudpy/connection.py +++ b/cloudpy/connection.py @@ -69,11 +69,19 @@ class Connection (object): class ClientConnection (Connection): + _pause = None _parser = None def __init__ (self, *args, **kwargs): super (ClientConnection, self).__init__ (*args, **kwargs) self._parser = Parser () + self._pause = False + + def send (self, sock, data): + timeout = sock.gettimeout () + sock.settimeout (None) + sock.sendall (data) + sock.settimeout (timeout) def send_event (self, ev): ev_desc = ev.describe () @@ -86,8 +94,12 @@ class ClientConnection (Connection): self._socket.settimeout (1) self.on_connected () + def pause (self): + self._pause = True + def run (self): - while True: + self._pause = False + while not self._pause: try: data = self._socket.recv (self._buffer_size) if not data: @@ -179,7 +191,10 @@ class ServerConnection (Connection): break self.on_receive (sock, data) except socket.error, se: - break + if se.args[0] == "timed out": + return + else: + break self._client_sockets.remove (sock) self.on_connection_lost (sock) diff --git a/cloudpy/data/client.py b/cloudpy/data/client.py index bdd80b9..1ea7c1e 100644 --- a/cloudpy/data/client.py +++ b/cloudpy/data/client.py @@ -1,15 +1,18 @@ import cloudpy.connection import cloudpy.config import cloudpy.item +from cloudpy.singleton import Singleton import events -class DataClient (cloudpy.connection.ClientConnection): +class DataClient (Singleton, cloudpy.connection.ClientConnection): items_list = None items = None item = None pack = None + success = False + count = None def send_event (ev): host, port = cloudpy.config.config["data"].split (":") @@ -38,6 +41,11 @@ def put_pack (pack, items): ev = events.DataPutPackEvent (pack, items) client = send_event (ev) +def delete_pack (pack): + ev = events.DataDeletePackEvent (pack) + client = send_event (ev) + return client.count + def get_item (pack, item): ev = events.DataGetItemEvent 
(pack, item) client = send_event (ev) @@ -46,3 +54,8 @@ def get_item (pack, item): def put_item (item): ev = events.DataPutItemEvent (item.pack, item.name, item.serialize ()) client = send_event (ev) + +def delete_item (pack, item): + ev = events.DataDeleteItemEvent (pack, item) + client = send_event (ev) + return client.success diff --git a/cloudpy/data/events.py b/cloudpy/data/events.py index 748016c..18f1f2e 100644 --- a/cloudpy/data/events.py +++ b/cloudpy/data/events.py @@ -1,4 +1,5 @@ -from cloudpy.event import * +from cloudpy.event import Event, EmptyEvent, \ + StringEvent, BoolEvent, IntegerEvent import cloudpy.uid class DataCreatePackEvent (EmptyEvent): @@ -6,7 +7,6 @@ class DataCreatePackEvent (EmptyEvent): def process (self, connection, sock): pack = connection.create_pack () connection.send_event (sock, DataPackCreatedEvent (pack)) -register_event (DataCreatePackEvent) class DataPackCreatedEvent (StringEvent): @@ -14,8 +14,7 @@ class DataPackCreatedEvent (StringEvent): def process (self, connection, sock): connection.pack = self.value - connection.close () -register_event (DataPackCreatedEvent) + connection.pause () class DataListPackEvent (StringEvent): @@ -24,7 +23,6 @@ class DataListPackEvent (StringEvent): def process (self, connection, sock): items_list = connection.list_pack (self.value) connection.send_event (sock, DataPackListEvent (items_list)) -register_event (DataListPackEvent) class DataPackListEvent (Event): @@ -42,8 +40,7 @@ class DataPackListEvent (Event): def process (self, connection, sock): connection.items_list = self.items_list - connection.close () -register_event (DataPackListEvent) + connection.pause () class DataGetPackEvent (StringEvent): @@ -52,7 +49,6 @@ class DataGetPackEvent (StringEvent): def process (self, connection, sock): items = connection.get_pack (self.value) connection.send_event (sock, DataPackItemsEvent (items)) -register_event (DataGetPackEvent) class DataPackItemsEvent (Event): @@ -70,8 +66,7 @@ class 
DataPackItemsEvent (Event): def process (self, connection, sock): connection.items = self.items - connection.close () -register_event (DataPackItemsEvent) + connection.pause () class DataPutPackEvent (Event): @@ -92,13 +87,11 @@ class DataPutPackEvent (Event): def process (self, connection, sock): connection.put_pack (self.pack, self.items) connection.send_event (sock, DataPackPutSuccessEvent ()) -register_event (DataPutPackEvent) class DataPackPutSuccessEvent (EmptyEvent): def process (self, connection, sock): - connection.close () -register_event (DataPackPutSuccessEvent) + connection.pause () class DataGetItemEvent (Event): @@ -119,7 +112,6 @@ class DataGetItemEvent (Event): def process (self, connection, sock): item = connection.get_item (self.pack, self.item) connection.send_event (sock, DataItemEvent (item)) -register_event (DataGetItemEvent) class DataItemEvent (Event): @@ -137,8 +129,7 @@ class DataItemEvent (Event): def process (self, connection, sock): connection.item = item - connection.close () -register_event (DataItemEvent) + connection.pause () class DataPutItemEvent (Event): @@ -161,36 +152,38 @@ class DataPutItemEvent (Event): def process (self, connection, sock): connection.put_item (self.pack, self.item, self.data) connection.send_event (sock, DataItemPutSuccessEvent ()) -register_event (DataPutItemEvent) -class DataDeletePackEvent (DataGetPackEvent): +class DataDeletePackEvent (StringEvent): + + # value == pack def process (self, connection, sock): - connection.delete_pack (self.pack) - connection.send_event (sock, DataPackDeleteSuccessEvent ()) -register_event (DataDeletePackEvent) + success = connection.delete_pack (self.value) + connection.send_event (sock, DataPackDeleteSuccessEvent (success)) class DataDeleteItemEvent (DataGetItemEvent): def process (self, connection, sock): - connection.delete_item (self.pack, self.item) - connection.send_event (sock, DataItemDeleteSuccessEvent ()) -register_event (DataDeleteItemEvent) + success = 
connection.delete_item (self.pack, self.item) + connection.send_event (sock, DataItemDeleteSuccessEvent (success)) + +class DataPackDeleteSuccessEvent (IntegerEvent): -class DataPackDeleteSuccessEvent (EmptyEvent): + # value == count def process (self, connection, sock): - connection.close () -register_event (DataPackDeleteSuccessEvent) + connection.count = self.value + connection.pause () + +class DataItemDeleteSuccessEvent (BoolEvent): -class DataItemDeleteSuccessEvent (EmptyEvent): + # value == success def process (self, connection, sock): - connection.close () -register_event (DataItemDeleteSuccessEvent) + connection.success = self.value + connection.pause () class DataItemPutSuccessEvent (EmptyEvent): def process (self, connection, sock): - connection.close () -register_event (DataItemPutSuccessEvent) + connection.pause () diff --git a/cloudpy/data/server.py b/cloudpy/data/server.py index eb8adda..cc768cb 100644 --- a/cloudpy/data/server.py +++ b/cloudpy/data/server.py @@ -50,7 +50,7 @@ class DataServer (cloudpy.connection.ServerConnection): self.db_session = Session () def list_pack (self, pack): - query = self.db_session.query (Item.name).filter_by (Item.pack == pack) + query = self.db_session.query (Item.name).filter (Item.pack == pack) items = query.all () return items @@ -74,9 +74,9 @@ class DataServer (cloudpy.connection.ServerConnection): def delete_pack (self, pack): query = self.db_session.query (Item).filter (Item.pack == pack) - query.delete () + count = query.delete () self.db_session.commit () - # FIXME : maybe could we return the number of deleted items ? 
+ return count def get_item (self, pack, item): query = self.db_session.query (Item).filter (and_ (Item.pack == pack, @@ -105,8 +105,6 @@ class DataServer (cloudpy.connection.ServerConnection): def delete_item (self, item): query = self.db_session.query (Item).filter (and_ (Item.pack == pack, Item.name == item)) - item = query.first () - # FIXME : add an error if item is not found - if item: - self.db_session.delete (item) - self.db_session.commit () + count = query.delete () + self.db_session.commit () + return count == 1 diff --git a/cloudpy/event.py b/cloudpy/event.py index 59402b0..a5c180e 100644 --- a/cloudpy/event.py +++ b/cloudpy/event.py @@ -94,8 +94,17 @@ class EventSerializer (object): data = " ".join ([event.__class__.__name__] + args) return data +class EventRegistrator (type): + + def __new__ (self, name, bases, dict): + event = type.__new__ (self, name, bases, dict) + Events[name] = event + return event + class Event (object): + __metaclass__ = EventRegistrator + def process (self, context): raise NotImplementedError @@ -140,6 +149,8 @@ class StringEvent (SingleValueEvent): def describe_args (): return ["string"] -def register_event (event): - Events[event.__name__] = event - return event +class IntegerEvent (SingleValueEvent): + + @staticmethod + def describe_args (): + return ["int"] diff --git a/cloudpy/heartbeat/client.py b/cloudpy/heartbeat/client.py index 110aafb..e3b6ede 100644 --- a/cloudpy/heartbeat/client.py +++ b/cloudpy/heartbeat/client.py @@ -1,4 +1,5 @@ import cloudpy.connection +from cloudpy.singleton import Singleton import events @@ -7,7 +8,10 @@ class HeartbeatClient (cloudpy.connection.ClientConnection): kill = False package = None package_id = None - undeploying = False + undeploy_data_pack = None + + packages = None + workers = None def send_event (ev): host, port = cloudpy.config.config["heartbeat"].split (":") @@ -16,6 +20,16 @@ def send_event (ev): client.run () return client +def list_packages (): + ev = 
events.HeartbeatListPackagesEvent () + client = send_event (ev) + return client.packages + +def list_workers (): + ev = events.HeartbeatListWorkersEvent () + client = send_event (ev) + return client.workers + def update (running): ev = events.HeartbeatUpdateEvent (running) client = send_event (ev) @@ -34,4 +48,4 @@ def deploy (package, requested_workers): def undeploy (package_id): ev = events.HeartbeatUndeployEvent (package_id) client = send_event (ev) - return client.undeploying + return client.undeploy_data_pack diff --git a/cloudpy/heartbeat/events.py b/cloudpy/heartbeat/events.py index bb7bfca..6a9a0db 100644 --- a/cloudpy/heartbeat/events.py +++ b/cloudpy/heartbeat/events.py @@ -1,6 +1,6 @@ import socket -from cloudpy.event import * +from cloudpy.event import Event, EmptyEvent, StringEvent, BoolEvent class HeartbeatUpdateEvent (BoolEvent): @@ -21,14 +21,60 @@ class HeartbeatUpdateEvent (BoolEvent): connection.send_event (sock, HeartbeatPackageEvent (package)) else: connection.send_event (sock, HeartbeatOkEvent ()) -register_event (HeartbeatUpdateEvent) + +class HeartbeatListPackagesEvent (EmptyEvent): + + def process (self, connection, sock): + packages = connection.list_packages () + connection.send_event (sock, HeartbeatPackagesListEvent (packages)) + +class HeartbeatListWorkersEvent (EmptyEvent): + + def process (self, connection, sock): + workers = connection.list_workers () + connection.send_event (sock, HeartbeatWorkersListEvent (workers)) + +class HeartbeatPackagesListEvent (Event): + + packages = None + + def __init__ (self, packages): + self.packages = packages + + def get_args (self): + return [self.packages] + + @staticmethod + def describe_args (): + return ["base64-yaml"] + + def process (self, connection, sock): + connection.packages = self.packages + connection.close () + +class HeartbeatWorkersListEvent (Event): + + workers = None + + def __init__ (self, workers): + self.workers = workers + + def get_args (self): + return [self.workers] + + 
@staticmethod + def describe_args (): + return ["base64-yaml"] + + def process (self, connection, sock): + connection.workers = self.workers + connection.close () class HeartbeatKillEvent (EmptyEvent): def process (self, connection, sock): connection.kill = True connection.close () -register_event (HeartbeatKillEvent) class HeartbeatOkEvent (EmptyEvent): @@ -36,7 +82,6 @@ class HeartbeatOkEvent (EmptyEvent): connection.kill = False connection.package = None connection.close () -register_event (HeartbeatOkEvent) class HeartbeatPackageEvent (Event): @@ -55,7 +100,6 @@ class HeartbeatPackageEvent (Event): def process (self, connection, sock): connection.package = self.package connection.close () -register_event (HeartbeatPackageEvent) class HeartbeatDeployEvent (HeartbeatPackageEvent): @@ -64,7 +108,6 @@ class HeartbeatDeployEvent (HeartbeatPackageEvent): if not package_id: package_id = "" connection.send_event (sock, HeartbeatPackageIdEvent (package_id)) -register_event (HeartbeatDeployEvent) class HeartbeatPackageIdEvent (StringEvent): @@ -76,7 +119,6 @@ class HeartbeatPackageIdEvent (StringEvent): else: connection.package_id = self.value connection.close () -register_event (HeartbeatPackageIdEvent) class HeartbeatUndeployEvent (StringEvent): @@ -87,7 +129,6 @@ class HeartbeatUndeployEvent (StringEvent): if not data_pack: data_pack = "" connection.send_event (sock, HeartbeatUndeployDataPackEvent (data_pack)) -register_event (HeartbeatUndeployEvent) class HeartbeatUndeployDataPackEvent (StringEvent): @@ -96,4 +137,3 @@ class HeartbeatUndeployDataPackEvent (StringEvent): def process (self, connection, sock): connection.undeploy_data_pack = self.value connection.close () -register_event (HeartbeatUndeployDataPackEvent) diff --git a/cloudpy/heartbeat/server.py b/cloudpy/heartbeat/server.py index c59eb13..a2bff33 100644 --- a/cloudpy/heartbeat/server.py +++ b/cloudpy/heartbeat/server.py @@ -5,7 +5,7 @@ from sqlalchemy import create_engine, Table, Column, \ Integer, 
String, DateTime, MetaData, ForeignKey, \ or_ from sqlalchemy.ext.declarative import declarative_base -from sqlalchemy.orm import relation, backref, sessionmaker +from sqlalchemy.orm import relation, backref, sessionmaker, join from sqlalchemy.sql import func TableBase = declarative_base () @@ -68,6 +68,15 @@ class HeartbeatServer (cloudpy.connection.ServerConnection): Session.configure (bind = engine) self.db_session = Session () + def list_packages (self): + query = self.db_session.query (Package.full_id, Package.name) + return [(id, name) for (id, name) in query.all ()] + + def list_workers (self): + query = self.db_session.query (Worker.name, Package.full_id) + query = query.select_from (join (Package, Worker)) + return [(worker, package) for (worker, package) in query.all ()] + def update_running (self, host): worker = self.db_session.query (Worker).filter_by (name = host).first () if not worker: @@ -97,10 +106,14 @@ class HeartbeatServer (cloudpy.connection.ServerConnection): q = q.outerjoin ((subq, Package.id == subq.c.package_id)) q = q.filter (or_ (subq.c.workers_count == None, subq.c.workers_count != Package.requested_workers)) - package, count = q.first () + result = q.first () + if not result: + return "" + package, count = result worker.package = package self.db_session.commit () return { + "id": package.full_id, "name": package.name, "data_pack": package.data_pack, "entry_point": package.entry_point, diff --git a/cloudpy/package.py b/cloudpy/package.py index 179a47a..7492c67 100644 --- a/cloudpy/package.py +++ b/cloudpy/package.py @@ -30,6 +30,7 @@ import os import sys import base64 +from cloudpy.singleton import Singleton import cloudpy.data.client def recursive_unlink (base): @@ -38,31 +39,35 @@ def recursive_unlink (base): os.unlink (os.path.join (path, file)) for directory in dirs: os.rmdir (os.path.join (path, directory)) + os.rmdir (base) class Package (object): + id = "" + name = "" data_pack = "" entry_point = "" package_dir = "" - def 
__init__ (self, name, data_pack, entry_point): - self.name = name - self.data_pack = data_pack - self.entry_point = entry_point - # setup signal handler - signal.signal (signal.SIGTERM, self.undeploy) + def __init__ (self, name, data_pack, entry_point, id = ""): + Package.name = name + Package.data_pack = data_pack + Package.entry_point = entry_point + Package.id = id def deploy (self): - if not os.path.isdir (self.name): - os.mkdir (self.name) - os.chdir (self.name) - self.package_dir = os.getcwd () - sys.path = [self.package_dir] + sys.path - if self.data_pack: + # setup signal handler + signal.signal (signal.SIGTERM, self.undeploy) + if not os.path.isdir (Package.name): + os.mkdir (Package.name) + os.chdir (Package.name) + Package.package_dir = os.getcwd () + sys.path = [Package.package_dir] + sys.path + if Package.data_pack: # fetch and unpack data pack - items = cloudpy.data.client.get_pack (self.data_pack) + items = cloudpy.data.client.get_pack (Package.data_pack) for item in items: dirname = os.path.dirname (item.metadata) if dirname and not os.path.isdir (dirname): @@ -71,25 +76,32 @@ class Package (object): f.write (base64.b64decode (item.data)) def run (self): - entry_module, entry_function = self.entry_point.rsplit (".", 1) + entry_module, entry_function = Package.entry_point.rsplit (".", 1) try: entry_module = __import__ (entry_module) entry_function = getattr (entry_module, entry_function) - except Exception, e: - print "Entry point %s does not exist : %s" % (self.entry_point, e) + except (ImportError, AttributeError) as e: + print "Entry point %s does not exist : %s : %s" \ + % (Package.entry_point, e.__class__.__name__, e) raise SystemExit entry_function () def undeploy (self, sig = None, stack = None): - recursive_unlink (self.package_dir) + if sig: + print "Received undeploy signal." 
+ recursive_unlink (Package.package_dir) + if sig: + raise SystemExit def to_dict (self): return { - "name": self.name, - "data_pack": self.data_pack, - "entry_point": self.entry_point, + "id": Package.id, + "name": Package.name, + "data_pack": Package.data_pack, + "entry_point": Package.entry_point, } @staticmethod def from_dict (dict): - return Package (dict["name"], dict["data_pack"], dict["entry_point"]) + return Package (dict["name"], dict["data_pack"], + dict["entry_point"], dict["id"]) diff --git a/cloudpy/singleton.py b/cloudpy/singleton.py new file mode 100644 index 0000000..1cfe54b --- /dev/null +++ b/cloudpy/singleton.py @@ -0,0 +1,9 @@ +class Singleton (object): + + instance = None + def __new__ (cls, *args, **kwargs): + if not cls.instance: + cls.instance = object.__new__ (cls) + else: + cls.__init__ = lambda *a, **k: None + return cls.instance diff --git a/cloudpy/task.py b/cloudpy/task.py index e69de29..8876156 100644 --- a/cloudpy/task.py +++ b/cloudpy/task.py @@ -0,0 +1,52 @@ +import cloudpy.taskqueue.client + +DEFAULT_RETRY_RATE = 180 # 180 seconds + +Tasks = {} + +class TaskSerializer (object): + + @staticmethod + def load (id, task_class, args, kwargs): + try: + cls = Tasks[task_class] + assert issubclass (cls, Task) + except (AssertionError, KeyError): + raise ValueError, "Unknown task : " + (data) + task = cls (*args, **kwargs) + task.id = id + return task + + @staticmethod + def dump (task): + task_class = task.__class__.__name__ + return task_class, task.args, task.kwargs + +class TaskRegistrator (type): + + def __new__ (self, name, bases, dict): + task = type.__new__ (self, name, bases, dict) + Tasks[name] = task + return task + +class Task (object): + + __metaclass__ = TaskRegistrator + + id = None + retry_rate = DEFAULT_RETRY_RATE + args = None + kwargs = None + + def __init__ (self, *args, **kwargs): + self.args = args + self.kwargs = kwargs + + def finish (self): + cloudpy.taskqueue.client.finish_task (self.id) + + def process (self): + 
return self.do_process (*self.args, **self.kwargs) + + def do_process (self, *args, **kwargs): + raise NotImplementedError diff --git a/cloudpy/workqueue.py b/cloudpy/taskqueue/__init__.py index e69de29..e69de29 100644 --- a/cloudpy/workqueue.py +++ b/cloudpy/taskqueue/__init__.py diff --git a/cloudpy/taskqueue/client.py b/cloudpy/taskqueue/client.py new file mode 100644 index 0000000..c9365e9 --- /dev/null +++ b/cloudpy/taskqueue/client.py @@ -0,0 +1,58 @@ +import cloudpy.connection +import cloudpy.config +import cloudpy.package +import cloudpy.task +from cloudpy.singleton import Singleton + +import events + +class TaskqueueClient (Singleton, cloudpy.connection.ClientConnection): + + task_id = None + task_class = None + args = None + kwargs = None + queue_empty = None + success = None + + def __init__ (self, package_id, *args, **kwargs): + super (TaskqueueClient, self).__init__ (*args, **kwargs) + self.send_event (events.TaskqueuePackageIdEvent (package_id)) + self.run () + +def send_event (ev): + package_id = cloudpy.package.Package.id + host, port = cloudpy.config.config["taskqueue"].split (":") + client = TaskqueueClient (package_id, host, port) + client.send_event (ev) + client.run () + return client + +def is_empty (): + ev = events.TaskqueueIsEmptyRequestEvent () + client = send_event (ev) + return client.queue_empty + +def add_task (task): + # add_task (task_class, args, kwargs, retry_rate): + task_class, args, kwargs = cloudpy.task.TaskSerializer.dump (task) + retry_rate = task.retry_rate + ev = events.TaskqueueAddTaskEvent (task_class, args, kwargs, retry_rate) + client = send_event (ev) + return client.task_id + +def get_task (): + ev = events.TaskqueueGetTaskEvent () + client = send_event (ev) + return cloudpy.task.TaskSerializer.load (client.task_id, client.task_class, + client.args, client.kwargs) + +def finish_task (task_id): + ev = events.TaskqueueFinishTaskEvent (task_id) + client = send_event (ev) + return client.success + +def delete_task 
(task_id): + ev = events.TaskqueueDeleteTaskEvent (task_id) + client = send_event (ev) + return client.success diff --git a/cloudpy/taskqueue/events.py b/cloudpy/taskqueue/events.py new file mode 100644 index 0000000..7302022 --- /dev/null +++ b/cloudpy/taskqueue/events.py @@ -0,0 +1,117 @@ +from cloudpy.event import Event, EmptyEvent, StringEvent, BoolEvent +import cloudpy.uid + +class TaskqueuePackageIdEvent (StringEvent): + + # value == package_id + + def process (self, connection, sock): + connection.package_id = self.value + connection.send_event (sock, TaskqueueSuccessEvent (True)) + +class TaskqueueIsEmptyRequestEvent (EmptyEvent): + + def process (self, connection, sock): + is_empty = connection.is_empty () + connection.send_event (sock, TaskqueueIsEmptyEvent (is_empty)) + +class TaskqueueIsEmptyEvent (BoolEvent): + + # value == is_empty + + def process (self, connection, sock): + connection.queue_empty = self.value + connection.pause () + +class TaskqueueAddTaskEvent (Event): + + task_class = None + args = None + kwargs = None + retry_rate = None + + def __init__ (self, task_class, args, kwargs, retry_rate): + self.task_class = task_class + self.args = args + self.kwargs = kwargs + self.retry_rate = retry_rate + + def get_args (self): + return [self.task_class, self.args, self.kwargs, self.retry_rate] + + @staticmethod + def describe_args (): + return ["base64-string", "base64-yaml", "base64-yaml", "int"] + + def process (self, connection, sock): + task_id = connection.add_task (self.task_class, + self.args, + self.kwargs, + self.retry_rate) + connection.send_event (sock, TaskqueueTaskIdEvent (task_id)) + +class TaskqueueTaskIdEvent (StringEvent): + + # value == task_id + + def process (self, connection, sock): + connection.task_id = self.value + connection.pause () + +class TaskqueueGetTaskEvent (EmptyEvent): + + def process (self, connection, sock): + task_id, task_class, args, kwargs = connection.get_task () + connection.send_event (sock, 
TaskqueueTaskEvent (task_id, task_class, + args, kwargs)) + +class TaskqueueTaskEvent (Event): + + id = None + task_class = None + args = None + kwargs = None + + def __init__ (self, id, task_class, args, kwargs): + self.id = id + self.task_class = task_class + self.args = args + self.kwargs = kwargs + + def get_args (self): + return [self.id, self.task_class, self.args, self.kwargs] + + @staticmethod + def describe_args (): + return ["string", "base64-string", "base64-yaml", "base64-yaml"] + + def process (self, connection, sock): + connection.task_id = self.id + connection.task_class = self.task_class + connection.args = self.args + connection.kwargs = self.kwargs + connection.pause () + +class TaskqueueFinishTaskEvent (StringEvent): + + # value == task_id + + def process (self, connection, sock): + success = connection.finish_task (self.value) + connection.send_event (sock, TaskqueueSuccessEvent (success)) + +class TaskqueueSuccessEvent (BoolEvent): + + # value == success + + def process (self, connection, sock): + connection.success = self.value + connection.pause () + +class TaskqueueDeleteTaskEvent (StringEvent): + + # value == task_id + + def process (self, connection, sock): + success = connection.delete_task (self.value) + connection.send_event (sock, TaskqueueSuccessEvent (success)) diff --git a/cloudpy/taskqueue/server.py b/cloudpy/taskqueue/server.py new file mode 100644 index 0000000..d826822 --- /dev/null +++ b/cloudpy/taskqueue/server.py @@ -0,0 +1,115 @@ +import logging +import base64 +import yaml +from datetime import datetime, timedelta + +from sqlalchemy import create_engine, MetaData, Column, \ + Integer, String, Text, DateTime, \ + PrimaryKeyConstraint, ForeignKey, \ + and_ +from sqlalchemy.ext.declarative import declarative_base +from sqlalchemy.orm import relation, backref, sessionmaker +from sqlalchemy.sql import func + +TableBase = declarative_base () +Session = sessionmaker () + +import cloudpy.connection +import cloudpy.uid +import 
cloudpy.item + +import cloudpy.taskqueue.events + +RETRY_FACTOR = 1.3 + +class Task (TableBase): + + __tablename__ = "tasks" + __table_args__ = (PrimaryKeyConstraint ("package_id", "task_id", + name = "pk"), {}) + + package_id = Column (String (36)) + task_id = Column (String (36)) + task_class = Column (String) + args = Column (Text) + kwargs = Column (Text) + retry_rate = Column (Integer) + retry_date = Column (DateTime) + + def __init__ (self, package_id, task_id, task_class, + args, kwargs, retry_rate): + self.package_id = package_id + self.task_id = task_id + self.task_class = task_class + self.args = args + self.kwargs = self.kwargs + self.retry_rate = retry_rate + self.retry_date = datetime.now () + + def __repr__ (self): + return "<Task %s of package %s>" % (self.task_id, self.package_id) + +class TaskqueueServer (cloudpy.connection.ServerConnection): + + db_session = None + + package_id = None + + def __init__ (self, *args, **kwargs): + super (TaskqueueServer, self).__init__ (*args, **kwargs) + engine = create_engine ("sqlite:///taskqueue.db") + metadata = TableBase.metadata + metadata.create_all (engine) + Session.configure (bind = engine) + self.db_session = Session () + + def is_empty (self): + query = self.db_session.query (Task.task_id) + count = query.filter (Task.package_id == self.package_id).count () + return count == 0 + + def add_task (self, task_class, args, kwargs, retry_rate): + task_id = cloudpy.uid.generate () + args = base64.b64encode (yaml.dump (args)) + kwargs = base64.b64encode (yaml.dump (kwargs)) + task = Task (self.package_id, task_id, task_class, + args, kwargs, retry_rate) + self.db_session.add (task) + self.db_session.commit () + return task_id + + def get_task (self): + query = self.db_session.query (Task) + query = query.filter (and_ (Task.package_id == self.package_id, + Task.retry_date <= datetime.now ())) + task = query.one () + if not task: + return "", [], {} + else: + task.retry_date += timedelta (0, task.retry_rate) + 
task.retry_rate = task.retry_rate * RETRY_FACTOR + self.db_session.commit () + if task.args: + args = yaml.load (base64.b64decode (task.args)) + else: + args = [] + if task.kwargs: + kwargs = yaml.load (base64.b64decode (task.kwargs)) + else: + kwargs = {} + return task.task_id, task.task_class, args, kwargs + + def delete_task (self, task_id): + query = self.db_session.query (Task) + query = query.filter (and_ (Task.package_id == self.package_id, + Task.task_id == task_id)) + task = query.first () + if task: + self.db_session.delete (task) + self.db_session.commit () + return True + else: + return False + + def finish_task (self, task_id): + self.delete_task (task_id) diff --git a/list-packages.py b/list-packages.py new file mode 100644 index 0000000..dbd9c52 --- /dev/null +++ b/list-packages.py @@ -0,0 +1,11 @@ +#!/usr/bin/env python +# coding=utf-8 + +import cloudpy.config +import cloudpy.heartbeat.client + +if __name__ == "__main__": + cloudpy.config.load_server_config () + packages = cloudpy.heartbeat.client.list_packages () + for id, name in packages: + print "%s - %s" % (id, name) diff --git a/list-workers.py b/list-workers.py new file mode 100644 index 0000000..32341af --- /dev/null +++ b/list-workers.py @@ -0,0 +1,14 @@ +#!/usr/bin/env python +# coding=utf-8 + +import cloudpy.config +import cloudpy.heartbeat.client + +if __name__ == "__main__": + cloudpy.config.load_server_config () + workers = cloudpy.heartbeat.client.list_workers () + for worker, package in workers: + if package: + print "%s running %s" % (worker, package) + else: + print worker diff --git a/taskqueue.py b/taskqueue.py new file mode 100644 index 0000000..01f155b --- /dev/null +++ b/taskqueue.py @@ -0,0 +1,14 @@ +#!/usr/bin/env python + +import cloudpy.config +import cloudpy.taskqueue.server + +if __name__ == "__main__": + cloudpy.config.load_server_config () + host, port = cloudpy.config.config["taskqueue"].split (":") + print "Running Cloudpy Taskqueue server on %s:%s" % (host, port) + 
server = cloudpy.taskqueue.server.TaskqueueServer (host, port) + try: + server.run () + except KeyboardInterrupt: + print "Shutting down Taskqueue server" diff --git a/test_package/package.yml b/test_package/package.yml new file mode 100644 index 0000000..bb7ad29 --- /dev/null +++ b/test_package/package.yml @@ -0,0 +1,5 @@ +name: test +files: + - test.py + - test_task.py +entry_point: test.main diff --git a/test_package/test.py b/test_package/test.py new file mode 100644 index 0000000..5791900 --- /dev/null +++ b/test_package/test.py @@ -0,0 +1,13 @@ +import cloudpy.taskqueue.client as taskqueue +import test_task + +def main (): + print taskqueue.is_empty () + task = test_task.TestTask (0) + taskqueue.add_task (task) + print taskqueue.is_empty () + while True: + task = taskqueue.get_task () + if not task: + break + task.process () diff --git a/test_package/test_task.py b/test_package/test_task.py new file mode 100644 index 0000000..7a048c6 --- /dev/null +++ b/test_package/test_task.py @@ -0,0 +1,10 @@ +import cloudpy.task +import cloudpy.taskqueue.client as taskqueue + +class TestTask (cloudpy.task.Task): + + def do_process (self, i): + task = TestTask (i + 1) + print i + taskqueue.add_task (task) + self.finish () diff --git a/undeploy.py b/undeploy.py index bc10420..09dbc45 100644 --- a/undeploy.py +++ b/undeploy.py @@ -4,6 +4,7 @@ import sys import cloudpy.config import cloudpy.heartbeat.client +import cloudpy.data.client if __name__ == "__main__": if len (sys.argv) != 2: @@ -11,7 +12,9 @@ if __name__ == "__main__": sys.exit (2) package_id = sys.argv[1] cloudpy.config.load_server_config () - if cloudpy.heartbeat.client.undeploy (package_id): + data_pack = cloudpy.heartbeat.client.undeploy (package_id) + if data_pack: + cloudpy.data.client.delete_pack (data_pack) print "Package is being undeployed." else: print "Could not undeploy package (it probably wasn't deployed)." 
@@ -73,6 +73,8 @@ if not running: pid_f.close () if not package: + if not running: + print "No package currently needs more workers" raise SystemExit # package is set here : deploy it |