-rw-r--r--  cloudpy/connection.py | 19
-rw-r--r--  cloudpy/data/client.py | 15
-rw-r--r--  cloudpy/data/events.py | 57
-rw-r--r--  cloudpy/data/server.py | 14
-rw-r--r--  cloudpy/event.py | 17
-rw-r--r--  cloudpy/heartbeat/client.py | 18
-rw-r--r--  cloudpy/heartbeat/events.py | 58
-rw-r--r--  cloudpy/heartbeat/server.py | 17
-rw-r--r--  cloudpy/package.py | 54
-rw-r--r--  cloudpy/singleton.py | 9
-rw-r--r--  cloudpy/task.py | 52
-rw-r--r--  cloudpy/taskqueue/__init__.py (renamed from cloudpy/workqueue.py) | 0
-rw-r--r--  cloudpy/taskqueue/client.py | 58
-rw-r--r--  cloudpy/taskqueue/events.py | 117
-rw-r--r--  cloudpy/taskqueue/server.py | 115
-rw-r--r--  list-packages.py | 11
-rw-r--r--  list-workers.py | 14
-rw-r--r--  taskqueue.py | 14
-rw-r--r--  test_package/package.yml | 5
-rw-r--r--  test_package/test.py | 13
-rw-r--r--  test_package/test_task.py | 10
-rw-r--r--  undeploy.py | 5
-rw-r--r--  worker.py | 2
23 files changed, 613 insertions(+), 81 deletions(-)
diff --git a/cloudpy/connection.py b/cloudpy/connection.py
index 3a84993..01e18d2 100644
--- a/cloudpy/connection.py
+++ b/cloudpy/connection.py
@@ -69,11 +69,19 @@ class Connection (object):
class ClientConnection (Connection):
+ _pause = None
_parser = None
def __init__ (self, *args, **kwargs):
super (ClientConnection, self).__init__ (*args, **kwargs)
self._parser = Parser ()
+ self._pause = False
+
+ def send (self, sock, data):
+ timeout = sock.gettimeout ()
+ sock.settimeout (None)
+ sock.sendall (data)
+ sock.settimeout (timeout)
def send_event (self, ev):
ev_desc = ev.describe ()
@@ -86,8 +94,12 @@ class ClientConnection (Connection):
self._socket.settimeout (1)
self.on_connected ()
+ def pause (self):
+ self._pause = True
+
def run (self):
- while True:
+ self._pause = False
+ while not self._pause:
try:
data = self._socket.recv (self._buffer_size)
if not data:
@@ -179,7 +191,10 @@ class ServerConnection (Connection):
break
self.on_receive (sock, data)
except socket.error, se:
- break
+ if se.args[0] == "timed out":
+ return
+ else:
+ break
self._client_sockets.remove (sock)
self.on_connection_lost (sock)
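
The new pause () hook above lets a reply event stop the ClientConnection receive loop without closing the socket, so the same connection can be reused for the next request. A minimal sketch of a reply event written against that pattern (ExampleReplyEvent and the reply attribute are hypothetical, not part of this commit):

from cloudpy.event import StringEvent

class ExampleReplyEvent (StringEvent):

    # value == reply payload
    def process (self, connection, sock):
        # store the result on the client, then pause instead of close:
        # run () returns, but the socket stays open for the next event
        connection.reply = self.value
        connection.pause ()
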
diff --git a/cloudpy/data/client.py b/cloudpy/data/client.py
index bdd80b9..1ea7c1e 100644
--- a/cloudpy/data/client.py
+++ b/cloudpy/data/client.py
@@ -1,15 +1,18 @@
import cloudpy.connection
import cloudpy.config
import cloudpy.item
+from cloudpy.singleton import Singleton
import events
-class DataClient (cloudpy.connection.ClientConnection):
+class DataClient (Singleton, cloudpy.connection.ClientConnection):
items_list = None
items = None
item = None
pack = None
+ success = False
+ count = None
def send_event (ev):
host, port = cloudpy.config.config["data"].split (":")
@@ -38,6 +41,11 @@ def put_pack (pack, items):
ev = events.DataPutPackEvent (pack, items)
client = send_event (ev)
+def delete_pack (pack):
+ ev = events.DataDeletePackEvent (pack)
+ client = send_event (ev)
+ return client.count
+
def get_item (pack, item):
ev = events.DataGetItemEvent (pack, item)
client = send_event (ev)
@@ -46,3 +54,8 @@ def get_item (pack, item):
def put_item (item):
ev = events.DataPutItemEvent (item.pack, item.name, item.serialize ())
client = send_event (ev)
+
+def delete_item (pack, item):
+ ev = events.DataDeleteItemEvent (pack, item)
+ client = send_event (ev)
+ return client.success
diff --git a/cloudpy/data/events.py b/cloudpy/data/events.py
index 748016c..18f1f2e 100644
--- a/cloudpy/data/events.py
+++ b/cloudpy/data/events.py
@@ -1,4 +1,5 @@
-from cloudpy.event import *
+from cloudpy.event import Event, EmptyEvent, \
+ StringEvent, BoolEvent, IntegerEvent
import cloudpy.uid
class DataCreatePackEvent (EmptyEvent):
@@ -6,7 +7,6 @@ class DataCreatePackEvent (EmptyEvent):
def process (self, connection, sock):
pack = connection.create_pack ()
connection.send_event (sock, DataPackCreatedEvent (pack))
-register_event (DataCreatePackEvent)
class DataPackCreatedEvent (StringEvent):
@@ -14,8 +14,7 @@ class DataPackCreatedEvent (StringEvent):
def process (self, connection, sock):
connection.pack = self.value
- connection.close ()
-register_event (DataPackCreatedEvent)
+ connection.pause ()
class DataListPackEvent (StringEvent):
@@ -24,7 +23,6 @@ class DataListPackEvent (StringEvent):
def process (self, connection, sock):
items_list = connection.list_pack (self.value)
connection.send_event (sock, DataPackListEvent (items_list))
-register_event (DataListPackEvent)
class DataPackListEvent (Event):
@@ -42,8 +40,7 @@ class DataPackListEvent (Event):
def process (self, connection, sock):
connection.items_list = self.items_list
- connection.close ()
-register_event (DataPackListEvent)
+ connection.pause ()
class DataGetPackEvent (StringEvent):
@@ -52,7 +49,6 @@ class DataGetPackEvent (StringEvent):
def process (self, connection, sock):
items = connection.get_pack (self.value)
connection.send_event (sock, DataPackItemsEvent (items))
-register_event (DataGetPackEvent)
class DataPackItemsEvent (Event):
@@ -70,8 +66,7 @@ class DataPackItemsEvent (Event):
def process (self, connection, sock):
connection.items = self.items
- connection.close ()
-register_event (DataPackItemsEvent)
+ connection.pause ()
class DataPutPackEvent (Event):
@@ -92,13 +87,11 @@ class DataPutPackEvent (Event):
def process (self, connection, sock):
connection.put_pack (self.pack, self.items)
connection.send_event (sock, DataPackPutSuccessEvent ())
-register_event (DataPutPackEvent)
class DataPackPutSuccessEvent (EmptyEvent):
def process (self, connection, sock):
- connection.close ()
-register_event (DataPackPutSuccessEvent)
+ connection.pause ()
class DataGetItemEvent (Event):
@@ -119,7 +112,6 @@ class DataGetItemEvent (Event):
def process (self, connection, sock):
item = connection.get_item (self.pack, self.item)
connection.send_event (sock, DataItemEvent (item))
-register_event (DataGetItemEvent)
class DataItemEvent (Event):
@@ -137,8 +129,7 @@ class DataItemEvent (Event):
def process (self, connection, sock):
connection.item = item
- connection.close ()
-register_event (DataItemEvent)
+ connection.pause ()
class DataPutItemEvent (Event):
@@ -161,36 +152,38 @@ class DataPutItemEvent (Event):
def process (self, connection, sock):
connection.put_item (self.pack, self.item, self.data)
connection.send_event (sock, DataItemPutSuccessEvent ())
-register_event (DataPutItemEvent)
-class DataDeletePackEvent (DataGetPackEvent):
+class DataDeletePackEvent (StringEvent):
+
+ # value == pack
def process (self, connection, sock):
- connection.delete_pack (self.pack)
- connection.send_event (sock, DataPackDeleteSuccessEvent ())
-register_event (DataDeletePackEvent)
+ success = connection.delete_pack (self.value)
+ connection.send_event (sock, DataPackDeleteSuccessEvent (success))
class DataDeleteItemEvent (DataGetItemEvent):
def process (self, connection, sock):
- connection.delete_item (self.pack, self.item)
- connection.send_event (sock, DataItemDeleteSuccessEvent ())
-register_event (DataDeleteItemEvent)
+ success = connection.delete_item (self.pack, self.item)
+ connection.send_event (sock, DataItemDeleteSuccessEvent (success))
+
+class DataPackDeleteSuccessEvent (IntegerEvent):
-class DataPackDeleteSuccessEvent (EmptyEvent):
+ # value == count
def process (self, connection, sock):
- connection.close ()
-register_event (DataPackDeleteSuccessEvent)
+ connection.count = self.value
+ connection.pause ()
+
+class DataItemDeleteSuccessEvent (BoolEvent):
-class DataItemDeleteSuccessEvent (EmptyEvent):
+ # value == success
def process (self, connection, sock):
- connection.close ()
-register_event (DataItemDeleteSuccessEvent)
+ connection.success = self.value
+ connection.pause ()
class DataItemPutSuccessEvent (EmptyEvent):
def process (self, connection, sock):
- connection.close ()
-register_event (DataItemPutSuccessEvent)
+ connection.pause ()
diff --git a/cloudpy/data/server.py b/cloudpy/data/server.py
index eb8adda..cc768cb 100644
--- a/cloudpy/data/server.py
+++ b/cloudpy/data/server.py
@@ -50,7 +50,7 @@ class DataServer (cloudpy.connection.ServerConnection):
self.db_session = Session ()
def list_pack (self, pack):
- query = self.db_session.query (Item.name).filter_by (Item.pack == pack)
+ query = self.db_session.query (Item.name).filter (Item.pack == pack)
items = query.all ()
return items
@@ -74,9 +74,9 @@ class DataServer (cloudpy.connection.ServerConnection):
def delete_pack (self, pack):
query = self.db_session.query (Item).filter (Item.pack == pack)
- query.delete ()
+ count = query.delete ()
self.db_session.commit ()
- # FIXME : maybe could we return the number of deleted items ?
+ return count
def get_item (self, pack, item):
query = self.db_session.query (Item).filter (and_ (Item.pack == pack,
@@ -105,8 +105,6 @@ class DataServer (cloudpy.connection.ServerConnection):
def delete_item (self, item):
query = self.db_session.query (Item).filter (and_ (Item.pack == pack,
Item.name == item))
- item = query.first ()
- # FIXME : add an error if item is not found
- if item:
- self.db_session.delete (item)
- self.db_session.commit ()
+ count = query.delete ()
+ self.db_session.commit ()
+ return count == 1
diff --git a/cloudpy/event.py b/cloudpy/event.py
index 59402b0..a5c180e 100644
--- a/cloudpy/event.py
+++ b/cloudpy/event.py
@@ -94,8 +94,17 @@ class EventSerializer (object):
data = " ".join ([event.__class__.__name__] + args)
return data
+class EventRegistrator (type):
+
+ def __new__ (self, name, bases, dict):
+ event = type.__new__ (self, name, bases, dict)
+ Events[name] = event
+ return event
+
class Event (object):
+ __metaclass__ = EventRegistrator
+
def process (self, context):
raise NotImplementedError
@@ -140,6 +149,8 @@ class StringEvent (SingleValueEvent):
def describe_args ():
return ["string"]
-def register_event (event):
- Events[event.__name__] = event
- return event
+class IntegerEvent (SingleValueEvent):
+
+ @staticmethod
+ def describe_args ():
+ return ["int"]
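
With the EventRegistrator metaclass, every subclass of Event is recorded in the module-level Events dictionary as soon as its class statement executes, which is what makes the removed register_event () calls redundant. A small sketch (ExamplePingEvent is made up for illustration):

from cloudpy.event import Events, StringEvent

class ExamplePingEvent (StringEvent):
    # no register_event () needed: EventRegistrator.__new__ already
    # stored this class in Events under its class name
    pass

assert Events["ExamplePingEvent"] is ExamplePingEvent
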
diff --git a/cloudpy/heartbeat/client.py b/cloudpy/heartbeat/client.py
index 110aafb..e3b6ede 100644
--- a/cloudpy/heartbeat/client.py
+++ b/cloudpy/heartbeat/client.py
@@ -1,4 +1,5 @@
import cloudpy.connection
+from cloudpy.singleton import Singleton
import events
@@ -7,7 +8,10 @@ class HeartbeatClient (cloudpy.connection.ClientConnection):
kill = False
package = None
package_id = None
- undeploying = False
+ undeploy_data_pack = None
+
+ packages = None
+ workers = None
def send_event (ev):
host, port = cloudpy.config.config["heartbeat"].split (":")
@@ -16,6 +20,16 @@ def send_event (ev):
client.run ()
return client
+def list_packages ():
+ ev = events.HeartbeatListPackagesEvent ()
+ client = send_event (ev)
+ return client.packages
+
+def list_workers ():
+ ev = events.HeartbeatListWorkersEvent ()
+ client = send_event (ev)
+ return client.workers
+
def update (running):
ev = events.HeartbeatUpdateEvent (running)
client = send_event (ev)
@@ -34,4 +48,4 @@ def deploy (package, requested_workers):
def undeploy (package_id):
ev = events.HeartbeatUndeployEvent (package_id)
client = send_event (ev)
- return client.undeploying
+ return client.undeploy_data_pack
diff --git a/cloudpy/heartbeat/events.py b/cloudpy/heartbeat/events.py
index bb7bfca..6a9a0db 100644
--- a/cloudpy/heartbeat/events.py
+++ b/cloudpy/heartbeat/events.py
@@ -1,6 +1,6 @@
import socket
-from cloudpy.event import *
+from cloudpy.event import Event, EmptyEvent, StringEvent, BoolEvent
class HeartbeatUpdateEvent (BoolEvent):
@@ -21,14 +21,60 @@ class HeartbeatUpdateEvent (BoolEvent):
connection.send_event (sock, HeartbeatPackageEvent (package))
else:
connection.send_event (sock, HeartbeatOkEvent ())
-register_event (HeartbeatUpdateEvent)
+
+class HeartbeatListPackagesEvent (EmptyEvent):
+
+ def process (self, connection, sock):
+ packages = connection.list_packages ()
+ connection.send_event (sock, HeartbeatPackagesListEvent (packages))
+
+class HeartbeatListWorkersEvent (EmptyEvent):
+
+ def process (self, connection, sock):
+ workers = connection.list_workers ()
+ connection.send_event (sock, HeartbeatWorkersListEvent (workers))
+
+class HeartbeatPackagesListEvent (Event):
+
+ packages = None
+
+ def __init__ (self, packages):
+ self.packages = packages
+
+ def get_args (self):
+ return [self.packages]
+
+ @staticmethod
+ def describe_args ():
+ return ["base64-yaml"]
+
+ def process (self, connection, sock):
+ connection.packages = self.packages
+ connection.close ()
+
+class HeartbeatWorkersListEvent (Event):
+
+ workers = None
+
+ def __init__ (self, workers):
+ self.workers = workers
+
+ def get_args (self):
+ return [self.workers]
+
+ @staticmethod
+ def describe_args ():
+ return ["base64-yaml"]
+
+ def process (self, connection, sock):
+ connection.workers = self.workers
+ connection.close ()
class HeartbeatKillEvent (EmptyEvent):
def process (self, connection, sock):
connection.kill = True
connection.close ()
-register_event (HeartbeatKillEvent)
class HeartbeatOkEvent (EmptyEvent):
@@ -36,7 +82,6 @@ class HeartbeatOkEvent (EmptyEvent):
connection.kill = False
connection.package = None
connection.close ()
-register_event (HeartbeatOkEvent)
class HeartbeatPackageEvent (Event):
@@ -55,7 +100,6 @@ class HeartbeatPackageEvent (Event):
def process (self, connection, sock):
connection.package = self.package
connection.close ()
-register_event (HeartbeatPackageEvent)
class HeartbeatDeployEvent (HeartbeatPackageEvent):
@@ -64,7 +108,6 @@ class HeartbeatDeployEvent (HeartbeatPackageEvent):
if not package_id:
package_id = ""
connection.send_event (sock, HeartbeatPackageIdEvent (package_id))
-register_event (HeartbeatDeployEvent)
class HeartbeatPackageIdEvent (StringEvent):
@@ -76,7 +119,6 @@ class HeartbeatPackageIdEvent (StringEvent):
else:
connection.package_id = self.value
connection.close ()
-register_event (HeartbeatPackageIdEvent)
class HeartbeatUndeployEvent (StringEvent):
@@ -87,7 +129,6 @@ class HeartbeatUndeployEvent (StringEvent):
if not data_pack:
data_pack = ""
connection.send_event (sock, HeartbeatUndeployDataPackEvent (data_pack))
-register_event (HeartbeatUndeployEvent)
class HeartbeatUndeployDataPackEvent (StringEvent):
@@ -96,4 +137,3 @@ class HeartbeatUndeployDataPackEvent (StringEvent):
def process (self, connection, sock):
connection.undeploy_data_pack = self.value
connection.close ()
-register_event (HeartbeatUndeployDataPackEvent)
diff --git a/cloudpy/heartbeat/server.py b/cloudpy/heartbeat/server.py
index c59eb13..a2bff33 100644
--- a/cloudpy/heartbeat/server.py
+++ b/cloudpy/heartbeat/server.py
@@ -5,7 +5,7 @@ from sqlalchemy import create_engine, Table, Column, \
Integer, String, DateTime, MetaData, ForeignKey, \
or_
from sqlalchemy.ext.declarative import declarative_base
-from sqlalchemy.orm import relation, backref, sessionmaker
+from sqlalchemy.orm import relation, backref, sessionmaker, join
from sqlalchemy.sql import func
TableBase = declarative_base ()
@@ -68,6 +68,15 @@ class HeartbeatServer (cloudpy.connection.ServerConnection):
Session.configure (bind = engine)
self.db_session = Session ()
+ def list_packages (self):
+ query = self.db_session.query (Package.full_id, Package.name)
+ return [(id, name) for (id, name) in query.all ()]
+
+ def list_workers (self):
+ query = self.db_session.query (Worker.name, Package.full_id)
+ query = query.select_from (join (Package, Worker))
+ return [(worker, package) for (worker, package) in query.all ()]
+
def update_running (self, host):
worker = self.db_session.query (Worker).filter_by (name = host).first ()
if not worker:
@@ -97,10 +106,14 @@ class HeartbeatServer (cloudpy.connection.ServerConnection):
q = q.outerjoin ((subq, Package.id == subq.c.package_id))
q = q.filter (or_ (subq.c.workers_count == None,
subq.c.workers_count != Package.requested_workers))
- package, count = q.first ()
+ result = q.first ()
+ if not result:
+ return ""
+ package, count = result
worker.package = package
self.db_session.commit ()
return {
+ "id": package.full_id,
"name": package.name,
"data_pack": package.data_pack,
"entry_point": package.entry_point,
diff --git a/cloudpy/package.py b/cloudpy/package.py
index 179a47a..7492c67 100644
--- a/cloudpy/package.py
+++ b/cloudpy/package.py
@@ -30,6 +30,7 @@ import os
import sys
import base64
+from cloudpy.singleton import Singleton
import cloudpy.data.client
def recursive_unlink (base):
@@ -38,31 +39,35 @@ def recursive_unlink (base):
os.unlink (os.path.join (path, file))
for directory in dirs:
os.rmdir (os.path.join (path, directory))
+ os.rmdir (base)
class Package (object):
+ id = ""
+
name = ""
data_pack = ""
entry_point = ""
package_dir = ""
- def __init__ (self, name, data_pack, entry_point):
- self.name = name
- self.data_pack = data_pack
- self.entry_point = entry_point
- # setup signal handler
- signal.signal (signal.SIGTERM, self.undeploy)
+ def __init__ (self, name, data_pack, entry_point, id = ""):
+ Package.name = name
+ Package.data_pack = data_pack
+ Package.entry_point = entry_point
+ Package.id = id
def deploy (self):
- if not os.path.isdir (self.name):
- os.mkdir (self.name)
- os.chdir (self.name)
- self.package_dir = os.getcwd ()
- sys.path = [self.package_dir] + sys.path
- if self.data_pack:
+ # setup signal handler
+ signal.signal (signal.SIGTERM, self.undeploy)
+ if not os.path.isdir (Package.name):
+ os.mkdir (Package.name)
+ os.chdir (Package.name)
+ Package.package_dir = os.getcwd ()
+ sys.path = [Package.package_dir] + sys.path
+ if Package.data_pack:
# fetch and unpack data pack
- items = cloudpy.data.client.get_pack (self.data_pack)
+ items = cloudpy.data.client.get_pack (Package.data_pack)
for item in items:
dirname = os.path.dirname (item.metadata)
if dirname and not os.path.isdir (dirname):
@@ -71,25 +76,32 @@ class Package (object):
f.write (base64.b64decode (item.data))
def run (self):
- entry_module, entry_function = self.entry_point.rsplit (".", 1)
+ entry_module, entry_function = Package.entry_point.rsplit (".", 1)
try:
entry_module = __import__ (entry_module)
entry_function = getattr (entry_module, entry_function)
- except Exception, e:
- print "Entry point %s does not exist : %s" % (self.entry_point, e)
+ except (ImportError, AttributeError) as e:
+ print "Entry point %s does not exist : %s : %s" \
+ % (Package.entry_point, e.__class__.__name__, e)
raise SystemExit
entry_function ()
def undeploy (self, sig = None, stack = None):
- recursive_unlink (self.package_dir)
+ if sig:
+ print "Received undeploy signal."
+ recursive_unlink (Package.package_dir)
+ if sig:
+ raise SystemExit
def to_dict (self):
return {
- "name": self.name,
- "data_pack": self.data_pack,
- "entry_point": self.entry_point,
+ "id": Package.id,
+ "name": Package.name,
+ "data_pack": Package.data_pack,
+ "entry_point": Package.entry_point,
}
@staticmethod
def from_dict (dict):
- return Package (dict["name"], dict["data_pack"], dict["entry_point"])
+ return Package (dict["name"], dict["data_pack"],
+ dict["entry_point"], dict["id"])
diff --git a/cloudpy/singleton.py b/cloudpy/singleton.py
new file mode 100644
index 0000000..1cfe54b
--- /dev/null
+++ b/cloudpy/singleton.py
@@ -0,0 +1,9 @@
+class Singleton (object):
+
+ instance = None
+ def __new__ (cls, *args, **kwargs):
+ if not cls.instance:
+ cls.instance = object.__new__ (cls)
+ else:
+ cls.__init__ = lambda *a, **k: None
+ return cls.instance
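
Singleton caches the first instance on the class and, on later constructions, replaces __init__ with a no-op before returning the cached object, so clients such as DataClient and TaskqueueClient connect only once. A hedged sketch of the behaviour (ExampleClient is illustrative only):

from cloudpy.singleton import Singleton

class ExampleClient (Singleton):

    def __init__ (self, endpoint):
        # runs only on the first construction; later calls hit the
        # no-op __init__ installed by Singleton.__new__
        self.endpoint = endpoint

a = ExampleClient ("data:4242")
b = ExampleClient ("other:9999")    # ignored, cached instance returned
assert a is b and a.endpoint == "data:4242"
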
diff --git a/cloudpy/task.py b/cloudpy/task.py
index e69de29..8876156 100644
--- a/cloudpy/task.py
+++ b/cloudpy/task.py
@@ -0,0 +1,52 @@
+import cloudpy.taskqueue.client
+
+DEFAULT_RETRY_RATE = 180 # 180 seconds
+
+Tasks = {}
+
+class TaskSerializer (object):
+
+ @staticmethod
+ def load (id, task_class, args, kwargs):
+ try:
+ cls = Tasks[task_class]
+ assert issubclass (cls, Task)
+ except (AssertionError, KeyError):
+ raise ValueError, "Unknown task : " + task_class
+ task = cls (*args, **kwargs)
+ task.id = id
+ return task
+
+ @staticmethod
+ def dump (task):
+ task_class = task.__class__.__name__
+ return task_class, task.args, task.kwargs
+
+class TaskRegistrator (type):
+
+ def __new__ (self, name, bases, dict):
+ task = type.__new__ (self, name, bases, dict)
+ Tasks[name] = task
+ return task
+
+class Task (object):
+
+ __metaclass__ = TaskRegistrator
+
+ id = None
+ retry_rate = DEFAULT_RETRY_RATE
+ args = None
+ kwargs = None
+
+ def __init__ (self, *args, **kwargs):
+ self.args = args
+ self.kwargs = kwargs
+
+ def finish (self):
+ cloudpy.taskqueue.client.finish_task (self.id)
+
+ def process (self):
+ return self.do_process (*self.args, **self.kwargs)
+
+ def do_process (self, *args, **kwargs):
+ raise NotImplementedError
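
A concrete task only has to implement do_process (): the constructor keeps its arguments so TaskSerializer.dump () can ship them to the queue, TaskRegistrator records the class by name so TaskSerializer.load () can rebuild it, and finish () acknowledges the task once the work succeeds. A minimal hedged sketch (ExampleSumTask is illustrative, not part of this commit):

import cloudpy.task

class ExampleSumTask (cloudpy.task.Task):

    retry_rate = 60    # redeliver after 60 s if never finished

    def do_process (self, a, b):
        print a + b
        self.finish ()    # tell the taskqueue the work is done

task = ExampleSumTask (2, 3)    # args are stored for later serialization
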
diff --git a/cloudpy/workqueue.py b/cloudpy/taskqueue/__init__.py
index e69de29..e69de29 100644
--- a/cloudpy/workqueue.py
+++ b/cloudpy/taskqueue/__init__.py
diff --git a/cloudpy/taskqueue/client.py b/cloudpy/taskqueue/client.py
new file mode 100644
index 0000000..c9365e9
--- /dev/null
+++ b/cloudpy/taskqueue/client.py
@@ -0,0 +1,58 @@
+import cloudpy.connection
+import cloudpy.config
+import cloudpy.package
+import cloudpy.task
+from cloudpy.singleton import Singleton
+
+import events
+
+class TaskqueueClient (Singleton, cloudpy.connection.ClientConnection):
+
+ task_id = None
+ task_class = None
+ args = None
+ kwargs = None
+ queue_empty = None
+ success = None
+
+ def __init__ (self, package_id, *args, **kwargs):
+ super (TaskqueueClient, self).__init__ (*args, **kwargs)
+ self.send_event (events.TaskqueuePackageIdEvent (package_id))
+ self.run ()
+
+def send_event (ev):
+ package_id = cloudpy.package.Package.id
+ host, port = cloudpy.config.config["taskqueue"].split (":")
+ client = TaskqueueClient (package_id, host, port)
+ client.send_event (ev)
+ client.run ()
+ return client
+
+def is_empty ():
+ ev = events.TaskqueueIsEmptyRequestEvent ()
+ client = send_event (ev)
+ return client.queue_empty
+
+def add_task (task):
+ # add_task (task_class, args, kwargs, retry_rate):
+ task_class, args, kwargs = cloudpy.task.TaskSerializer.dump (task)
+ retry_rate = task.retry_rate
+ ev = events.TaskqueueAddTaskEvent (task_class, args, kwargs, retry_rate)
+ client = send_event (ev)
+ return client.task_id
+
+def get_task ():
+ ev = events.TaskqueueGetTaskEvent ()
+ client = send_event (ev)
+ return cloudpy.task.TaskSerializer.load (client.task_id, client.task_class,
+ client.args, client.kwargs)
+
+def finish_task (task_id):
+ ev = events.TaskqueueFinishTaskEvent (task_id)
+ client = send_event (ev)
+ return client.success
+
+def delete_task (task_id):
+ ev = events.TaskqueueDeleteTaskEvent (task_id)
+ client = send_event (ev)
+ return client.success
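
Together these helpers form a small producer/consumer API: add_task () serializes a Task and enqueues it, get_task () rebuilds the next due task, and finish_task () (normally reached through Task.finish ()) acknowledges it. A hedged usage sketch, assuming it runs inside a deployed package so cloudpy.config and cloudpy.package.Package.id are already populated (ExamplePrintTask is illustrative):

import cloudpy.task
import cloudpy.taskqueue.client as taskqueue

class ExamplePrintTask (cloudpy.task.Task):

    def do_process (self, message):
        print message
        self.finish ()

taskqueue.add_task (ExamplePrintTask ("hello"))    # producer side

while not taskqueue.is_empty ():                   # consumer side
    task = taskqueue.get_task ()
    task.process ()    # runs do_process, which calls finish ()
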
diff --git a/cloudpy/taskqueue/events.py b/cloudpy/taskqueue/events.py
new file mode 100644
index 0000000..7302022
--- /dev/null
+++ b/cloudpy/taskqueue/events.py
@@ -0,0 +1,117 @@
+from cloudpy.event import Event, EmptyEvent, StringEvent, BoolEvent
+import cloudpy.uid
+
+class TaskqueuePackageIdEvent (StringEvent):
+
+ # value == package_id
+
+ def process (self, connection, sock):
+ connection.package_id = self.value
+ connection.send_event (sock, TaskqueueSuccessEvent (True))
+
+class TaskqueueIsEmptyRequestEvent (EmptyEvent):
+
+ def process (self, connection, sock):
+ is_empty = connection.is_empty ()
+ connection.send_event (sock, TaskqueueIsEmptyEvent (is_empty))
+
+class TaskqueueIsEmptyEvent (BoolEvent):
+
+ # value == is_empty
+
+ def process (self, connection, sock):
+ connection.queue_empty = self.value
+ connection.pause ()
+
+class TaskqueueAddTaskEvent (Event):
+
+ task_class = None
+ args = None
+ kwargs = None
+ retry_rate = None
+
+ def __init__ (self, task_class, args, kwargs, retry_rate):
+ self.task_class = task_class
+ self.args = args
+ self.kwargs = kwargs
+ self.retry_rate = retry_rate
+
+ def get_args (self):
+ return [self.task_class, self.args, self.kwargs, self.retry_rate]
+
+ @staticmethod
+ def describe_args ():
+ return ["base64-string", "base64-yaml", "base64-yaml", "int"]
+
+ def process (self, connection, sock):
+ task_id = connection.add_task (self.task_class,
+ self.args,
+ self.kwargs,
+ self.retry_rate)
+ connection.send_event (sock, TaskqueueTaskIdEvent (task_id))
+
+class TaskqueueTaskIdEvent (StringEvent):
+
+ # value == task_id
+
+ def process (self, connection, sock):
+ connection.task_id = self.value
+ connection.pause ()
+
+class TaskqueueGetTaskEvent (EmptyEvent):
+
+ def process (self, connection, sock):
+ task_id, task_class, args, kwargs = connection.get_task ()
+ connection.send_event (sock, TaskqueueTaskEvent (task_id, task_class,
+ args, kwargs))
+
+class TaskqueueTaskEvent (Event):
+
+ id = None
+ task_class = None
+ args = None
+ kwargs = None
+
+ def __init__ (self, id, task_class, args, kwargs):
+ self.id = id
+ self.task_class = task_class
+ self.args = args
+ self.kwargs = kwargs
+
+ def get_args (self):
+ return [self.id, self.task_class, self.args, self.kwargs]
+
+ @staticmethod
+ def describe_args ():
+ return ["string", "base64-string", "base64-yaml", "base64-yaml"]
+
+ def process (self, connection, sock):
+ connection.task_id = self.id
+ connection.task_class = self.task_class
+ connection.args = self.args
+ connection.kwargs = self.kwargs
+ connection.pause ()
+
+class TaskqueueFinishTaskEvent (StringEvent):
+
+ # value == task_id
+
+ def process (self, connection, sock):
+ success = connection.finish_task (self.value)
+ connection.send_event (sock, TaskqueueSuccessEvent (success))
+
+class TaskqueueSuccessEvent (BoolEvent):
+
+ # value == success
+
+ def process (self, connection, sock):
+ connection.success = self.value
+ connection.pause ()
+
+class TaskqueueDeleteTaskEvent (StringEvent):
+
+ # value == task_id
+
+ def process (self, connection, sock):
+ success = connection.delete_task (self.value)
+ connection.send_event (sock, TaskqueueSuccessEvent (success))
diff --git a/cloudpy/taskqueue/server.py b/cloudpy/taskqueue/server.py
new file mode 100644
index 0000000..d826822
--- /dev/null
+++ b/cloudpy/taskqueue/server.py
@@ -0,0 +1,115 @@
+import logging
+import base64
+import yaml
+from datetime import datetime, timedelta
+
+from sqlalchemy import create_engine, MetaData, Column, \
+ Integer, String, Text, DateTime, \
+ PrimaryKeyConstraint, ForeignKey, \
+ and_
+from sqlalchemy.ext.declarative import declarative_base
+from sqlalchemy.orm import relation, backref, sessionmaker
+from sqlalchemy.sql import func
+
+TableBase = declarative_base ()
+Session = sessionmaker ()
+
+import cloudpy.connection
+import cloudpy.uid
+import cloudpy.item
+
+import cloudpy.taskqueue.events
+
+RETRY_FACTOR = 1.3
+
+class Task (TableBase):
+
+ __tablename__ = "tasks"
+ __table_args__ = (PrimaryKeyConstraint ("package_id", "task_id",
+ name = "pk"), {})
+
+ package_id = Column (String (36))
+ task_id = Column (String (36))
+ task_class = Column (String)
+ args = Column (Text)
+ kwargs = Column (Text)
+ retry_rate = Column (Integer)
+ retry_date = Column (DateTime)
+
+ def __init__ (self, package_id, task_id, task_class,
+ args, kwargs, retry_rate):
+ self.package_id = package_id
+ self.task_id = task_id
+ self.task_class = task_class
+ self.args = args
+ self.kwargs = kwargs
+ self.retry_rate = retry_rate
+ self.retry_date = datetime.now ()
+
+ def __repr__ (self):
+ return "<Task %s of package %s>" % (self.task_id, self.package_id)
+
+class TaskqueueServer (cloudpy.connection.ServerConnection):
+
+ db_session = None
+
+ package_id = None
+
+ def __init__ (self, *args, **kwargs):
+ super (TaskqueueServer, self).__init__ (*args, **kwargs)
+ engine = create_engine ("sqlite:///taskqueue.db")
+ metadata = TableBase.metadata
+ metadata.create_all (engine)
+ Session.configure (bind = engine)
+ self.db_session = Session ()
+
+ def is_empty (self):
+ query = self.db_session.query (Task.task_id)
+ count = query.filter (Task.package_id == self.package_id).count ()
+ return count == 0
+
+ def add_task (self, task_class, args, kwargs, retry_rate):
+ task_id = cloudpy.uid.generate ()
+ args = base64.b64encode (yaml.dump (args))
+ kwargs = base64.b64encode (yaml.dump (kwargs))
+ task = Task (self.package_id, task_id, task_class,
+ args, kwargs, retry_rate)
+ self.db_session.add (task)
+ self.db_session.commit ()
+ return task_id
+
+ def get_task (self):
+ query = self.db_session.query (Task)
+ query = query.filter (and_ (Task.package_id == self.package_id,
+ Task.retry_date <= datetime.now ()))
+ task = query.first ()
+ if not task:
+ return "", [], {}
+ else:
+ task.retry_date += timedelta (0, task.retry_rate)
+ task.retry_rate = task.retry_rate * RETRY_FACTOR
+ self.db_session.commit ()
+ if task.args:
+ args = yaml.load (base64.b64decode (task.args))
+ else:
+ args = []
+ if task.kwargs:
+ kwargs = yaml.load (base64.b64decode (task.kwargs))
+ else:
+ kwargs = {}
+ return task.task_id, task.task_class, args, kwargs
+
+ def delete_task (self, task_id):
+ query = self.db_session.query (Task)
+ query = query.filter (and_ (Task.package_id == self.package_id,
+ Task.task_id == task_id))
+ task = query.first ()
+ if task:
+ self.db_session.delete (task)
+ self.db_session.commit ()
+ return True
+ else:
+ return False
+
+ def finish_task (self, task_id):
+ return self.delete_task (task_id)
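
get_task () gives at-least-once delivery with a geometric back-off: every hand-out pushes retry_date forward by retry_rate seconds and multiplies retry_rate by RETRY_FACTOR, so an unacknowledged task is redelivered at roughly 180 s, 414 s, 718 s and 1113 s after the first delivery with the default rate. A small sketch of that schedule, assuming the task is fetched as soon as it becomes due:

rate, elapsed = 180.0, 0.0
for redelivery in range (1, 5):
    elapsed += rate    # retry_date += timedelta (0, retry_rate)
    rate *= 1.3        # retry_rate *= RETRY_FACTOR
    print "redelivery %d after ~%d s" % (redelivery, elapsed)
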
diff --git a/list-packages.py b/list-packages.py
new file mode 100644
index 0000000..dbd9c52
--- /dev/null
+++ b/list-packages.py
@@ -0,0 +1,11 @@
+#!/usr/bin/env python
+# coding=utf-8
+
+import cloudpy.config
+import cloudpy.heartbeat.client
+
+if __name__ == "__main__":
+ cloudpy.config.load_server_config ()
+ packages = cloudpy.heartbeat.client.list_packages ()
+ for id, name in packages:
+ print "%s - %s" % (id, name)
diff --git a/list-workers.py b/list-workers.py
new file mode 100644
index 0000000..32341af
--- /dev/null
+++ b/list-workers.py
@@ -0,0 +1,14 @@
+#!/usr/bin/env python
+# coding=utf-8
+
+import cloudpy.config
+import cloudpy.heartbeat.client
+
+if __name__ == "__main__":
+ cloudpy.config.load_server_config ()
+ workers = cloudpy.heartbeat.client.list_workers ()
+ for worker, package in workers:
+ if package:
+ print "%s running %s" % (worker, package)
+ else:
+ print worker
diff --git a/taskqueue.py b/taskqueue.py
new file mode 100644
index 0000000..01f155b
--- /dev/null
+++ b/taskqueue.py
@@ -0,0 +1,14 @@
+#!/usr/bin/env python
+
+import cloudpy.config
+import cloudpy.taskqueue.server
+
+if __name__ == "__main__":
+ cloudpy.config.load_server_config ()
+ host, port = cloudpy.config.config["taskqueue"].split (":")
+ print "Running Cloudpy Taskqueue server on %s:%s" % (host, port)
+ server = cloudpy.taskqueue.server.TaskqueueServer (host, port)
+ try:
+ server.run ()
+ except KeyboardInterrupt:
+ print "Shutting down Taskqueue server"
diff --git a/test_package/package.yml b/test_package/package.yml
new file mode 100644
index 0000000..bb7ad29
--- /dev/null
+++ b/test_package/package.yml
@@ -0,0 +1,5 @@
+name: test
+files:
+ - test.py
+ - test_task.py
+entry_point: test.main
diff --git a/test_package/test.py b/test_package/test.py
new file mode 100644
index 0000000..5791900
--- /dev/null
+++ b/test_package/test.py
@@ -0,0 +1,13 @@
+import cloudpy.taskqueue.client as taskqueue
+import test_task
+
+def main ():
+ print taskqueue.is_empty ()
+ task = test_task.TestTask (0)
+ taskqueue.add_task (task)
+ print taskqueue.is_empty ()
+ while True:
+ task = taskqueue.get_task ()
+ if not task:
+ break
+ task.process ()
diff --git a/test_package/test_task.py b/test_package/test_task.py
new file mode 100644
index 0000000..7a048c6
--- /dev/null
+++ b/test_package/test_task.py
@@ -0,0 +1,10 @@
+import cloudpy.task
+import cloudpy.taskqueue.client as taskqueue
+
+class TestTask (cloudpy.task.Task):
+
+ def do_process (self, i):
+ task = TestTask (i + 1)
+ print i
+ taskqueue.add_task (task)
+ self.finish ()
diff --git a/undeploy.py b/undeploy.py
index bc10420..09dbc45 100644
--- a/undeploy.py
+++ b/undeploy.py
@@ -4,6 +4,7 @@ import sys
import cloudpy.config
import cloudpy.heartbeat.client
+import cloudpy.data.client
if __name__ == "__main__":
if len (sys.argv) != 2:
@@ -11,7 +12,9 @@ if __name__ == "__main__":
sys.exit (2)
package_id = sys.argv[1]
cloudpy.config.load_server_config ()
- if cloudpy.heartbeat.client.undeploy (package_id):
+ data_pack = cloudpy.heartbeat.client.undeploy (package_id)
+ if data_pack:
+ cloudpy.data.client.delete_pack (data_pack)
print "Package is being undeployed."
else:
print "Could not undeploy package (it probably wasn't deployed)."
diff --git a/worker.py b/worker.py
index 6ef23e1..73ad551 100644
--- a/worker.py
+++ b/worker.py
@@ -73,6 +73,8 @@ if not running:
pid_f.close ()
if not package:
+ if not running:
+ print "No package currently needs more workers"
raise SystemExit
# package is set here : deploy it