summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorGuillaume Seguin <guillaume@segu.in>2009-11-17 02:15:39 +0100
committerGuillaume Seguin <guillaume@segu.in>2009-11-17 02:15:39 +0100
commit975acee881c9f6c3d88dd146741e4bd462896617 (patch)
treee1f5a539ab2bb0ae195e9cafd7e53ed1d2cae760
parent5626e1bdbfccb0671c969eff1d6232d28ce0380b (diff)
downloadcloudpy-975acee881c9f6c3d88dd146741e4bd462896617.tar.gz
cloudpy-975acee881c9f6c3d88dd146741e4bd462896617.tar.bz2
Add Data server, refactor events handling, various changes
-rw-r--r--cloudpy/connection.py19
-rw-r--r--cloudpy/data/__init__.py (renamed from cloudpy/data.py)0
-rw-r--r--cloudpy/data/client.py48
-rw-r--r--cloudpy/data/events.py196
-rw-r--r--cloudpy/data/server.py112
-rw-r--r--cloudpy/event.py54
-rw-r--r--cloudpy/event_base.py47
-rw-r--r--cloudpy/heartbeat/__init__.py167
-rw-r--r--cloudpy/heartbeat/client.py37
-rw-r--r--cloudpy/heartbeat/events.py10
-rw-r--r--cloudpy/heartbeat/server.py132
-rw-r--r--cloudpy/item.py27
-rw-r--r--cloudpy/package.py43
-rw-r--r--data.py14
-rw-r--r--deploy.py20
-rw-r--r--heartbeat.py4
-rw-r--r--setup-workers.py3
-rw-r--r--undeploy.py4
-rw-r--r--worker.py10
19 files changed, 701 insertions, 246 deletions
diff --git a/cloudpy/connection.py b/cloudpy/connection.py
index 7090b73..3a84993 100644
--- a/cloudpy/connection.py
+++ b/cloudpy/connection.py
@@ -40,6 +40,9 @@ class Connection (object):
self._socket = socket.socket ()
self.prepare ()
+ def send (self, sock, data):
+ sock.sendall (data)
+
def prepare (self):
raise NotImplementedError
@@ -72,14 +75,11 @@ class ClientConnection (Connection):
super (ClientConnection, self).__init__ (*args, **kwargs)
self._parser = Parser ()
- def send (self, data):
- self._socket.send (data)
-
def send_event (self, ev):
ev_desc = ev.describe ()
logging.log (logging.INFO, "Sending event to server : %s" \
% (ev_desc,))
- self.send (self._parser.prepare (ev_desc))
+ self.send (self._socket, self._parser.prepare (ev_desc))
def prepare (self):
self._socket.connect ((self._host, self._port))
@@ -139,13 +139,13 @@ class ServerConnection (Connection):
def broadcast (self, data):
for sock in self._client_sockets:
- sock.send (data)
+ self.send (sock, data)
def send_event (self, sock, ev):
ev_desc = ev.describe ()
logging.log (logging.INFO, "Sending event to %s:%d : %s" \
% (self._addresses[sock] + (ev_desc,)))
- sock.send (self._parsers[sock].prepare (ev_desc))
+ self.send (sock, self._parsers[sock].prepare (ev_desc))
def prepare (self):
self._socket.setsockopt (socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
@@ -166,7 +166,7 @@ class ServerConnection (Connection):
logging.log (logging.INFO, "Accepted client connection from %s"
% (address,))
self.on_accept (sock)
- sock.setblocking (0)
+ sock.settimeout (0.01)
self._client_sockets.append (sock)
else:
self.recv_socket (sock)
@@ -179,10 +179,7 @@ class ServerConnection (Connection):
break
self.on_receive (sock, data)
except socket.error, se:
- if se.args[0] == EWOULDBLOCK:
- return
- else:
- break
+ break
self._client_sockets.remove (sock)
self.on_connection_lost (sock)
diff --git a/cloudpy/data.py b/cloudpy/data/__init__.py
index e69de29..e69de29 100644
--- a/cloudpy/data.py
+++ b/cloudpy/data/__init__.py
diff --git a/cloudpy/data/client.py b/cloudpy/data/client.py
new file mode 100644
index 0000000..bdd80b9
--- /dev/null
+++ b/cloudpy/data/client.py
@@ -0,0 +1,48 @@
+import cloudpy.connection
+import cloudpy.config
+import cloudpy.item
+
+import events
+
+class DataClient (cloudpy.connection.ClientConnection):
+
+ items_list = None
+ items = None
+ item = None
+ pack = None
+
+def send_event (ev):
+ host, port = cloudpy.config.config["data"].split (":")
+ client = DataClient (host, port)
+ client.send_event (ev)
+ client.run ()
+ return client
+
+def create_pack ():
+ ev = events.DataCreatePackEvent ()
+ client = send_event (ev)
+ return client.pack
+
+def list_pack (pack):
+ ev = events.DataListPackEvent (pack)
+ client = send_event (ev)
+ return client.items_list
+
+def get_pack (pack):
+ ev = events.DataGetPackEvent (pack)
+ client = send_event (ev)
+ return cloudpy.item.from_dict (pack, client.items)
+
+def put_pack (pack, items):
+ items = cloudpy.item.to_dict (items)
+ ev = events.DataPutPackEvent (pack, items)
+ client = send_event (ev)
+
+def get_item (pack, item):
+ ev = events.DataGetItemEvent (pack, item)
+ client = send_event (ev)
+ return client.item
+
+def put_item (item):
+ ev = events.DataPutItemEvent (item.pack, item.name, item.serialize ())
+ client = send_event (ev)
diff --git a/cloudpy/data/events.py b/cloudpy/data/events.py
new file mode 100644
index 0000000..748016c
--- /dev/null
+++ b/cloudpy/data/events.py
@@ -0,0 +1,196 @@
+from cloudpy.event import *
+import cloudpy.uid
+
+class DataCreatePackEvent (EmptyEvent):
+
+ def process (self, connection, sock):
+ pack = connection.create_pack ()
+ connection.send_event (sock, DataPackCreatedEvent (pack))
+register_event (DataCreatePackEvent)
+
+class DataPackCreatedEvent (StringEvent):
+
+ # value == data_pack
+
+ def process (self, connection, sock):
+ connection.pack = self.value
+ connection.close ()
+register_event (DataPackCreatedEvent)
+
+class DataListPackEvent (StringEvent):
+
+ # value == data_pack
+
+ def process (self, connection, sock):
+ items_list = connection.list_pack (self.value)
+ connection.send_event (sock, DataPackListEvent (items_list))
+register_event (DataListPackEvent)
+
+class DataPackListEvent (Event):
+
+ items_list = None
+
+ def __init__ (self, items_list):
+ self.items_list = items_list
+
+ def get_args (self):
+ return [self.items_list]
+
+ @staticmethod
+ def describe_args ():
+ return ["base64-yaml"]
+
+ def process (self, connection, sock):
+ connection.items_list = self.items_list
+ connection.close ()
+register_event (DataPackListEvent)
+
+class DataGetPackEvent (StringEvent):
+
+ # value == data_pack
+
+ def process (self, connection, sock):
+ items = connection.get_pack (self.value)
+ connection.send_event (sock, DataPackItemsEvent (items))
+register_event (DataGetPackEvent)
+
+class DataPackItemsEvent (Event):
+
+ items = None
+
+ def __init__ (self, items):
+ self.items = items
+
+ def get_args (self):
+ return [self.items]
+
+ @staticmethod
+ def describe_args ():
+ return ["base64-yaml"]
+
+ def process (self, connection, sock):
+ connection.items = self.items
+ connection.close ()
+register_event (DataPackItemsEvent)
+
+class DataPutPackEvent (Event):
+
+ pack = None
+ items = None
+
+ def __init__ (self, pack, items):
+ self.pack = pack
+ self.items = items
+
+ def get_args (self):
+ return [self.pack, self.items]
+
+ @staticmethod
+ def describe_args ():
+ return ["string", "base64-yaml"]
+
+ def process (self, connection, sock):
+ connection.put_pack (self.pack, self.items)
+ connection.send_event (sock, DataPackPutSuccessEvent ())
+register_event (DataPutPackEvent)
+
+class DataPackPutSuccessEvent (EmptyEvent):
+
+ def process (self, connection, sock):
+ connection.close ()
+register_event (DataPackPutSuccessEvent)
+
+class DataGetItemEvent (Event):
+
+ pack = None
+ item = None
+
+ def __init__ (self, pack, item):
+ self.pack = pack
+ self.item = item
+
+ def get_args (self):
+ return [self.pack, self.item]
+
+ @staticmethod
+ def describe_args ():
+ return ["string", "string"]
+
+ def process (self, connection, sock):
+ item = connection.get_item (self.pack, self.item)
+ connection.send_event (sock, DataItemEvent (item))
+register_event (DataGetItemEvent)
+
+class DataItemEvent (Event):
+
+ item = None
+
+ def __init__ (self, item):
+ self.item = item
+
+ def get_args (self):
+ return [self.item]
+
+ @staticmethod
+ def describe_args ():
+ return ["base64-yaml"]
+
+ def process (self, connection, sock):
+ connection.item = self.item
+ connection.close ()
+register_event (DataItemEvent)
+
+class DataPutItemEvent (Event):
+
+ pack = None
+ item = None
+ data = None
+
+ def __init__ (self, pack, item, data):
+ self.pack = pack
+ self.item = item
+ self.data = data
+
+ def get_args (self):
+ return [self.pack, self.item, self.data]
+
+ @staticmethod
+ def describe_args ():
+ return ["string", "string", "base64-yaml"]
+
+ def process (self, connection, sock):
+ connection.put_item (self.pack, self.item, self.data)
+ connection.send_event (sock, DataItemPutSuccessEvent ())
+register_event (DataPutItemEvent)
+
+class DataDeletePackEvent (DataGetPackEvent):
+
+ def process (self, connection, sock):
+ connection.delete_pack (self.value)
+ connection.send_event (sock, DataPackDeleteSuccessEvent ())
+register_event (DataDeletePackEvent)
+
+class DataDeleteItemEvent (DataGetItemEvent):
+
+ def process (self, connection, sock):
+ connection.delete_item (self.pack, self.item)
+ connection.send_event (sock, DataItemDeleteSuccessEvent ())
+register_event (DataDeleteItemEvent)
+
+class DataPackDeleteSuccessEvent (EmptyEvent):
+
+ def process (self, connection, sock):
+ connection.close ()
+register_event (DataPackDeleteSuccessEvent)
+
+class DataItemDeleteSuccessEvent (EmptyEvent):
+
+ def process (self, connection, sock):
+ connection.close ()
+register_event (DataItemDeleteSuccessEvent)
+
+class DataItemPutSuccessEvent (EmptyEvent):
+
+ def process (self, connection, sock):
+ connection.close ()
+register_event (DataItemPutSuccessEvent)
diff --git a/cloudpy/data/server.py b/cloudpy/data/server.py
new file mode 100644
index 0000000..eb8adda
--- /dev/null
+++ b/cloudpy/data/server.py
@@ -0,0 +1,112 @@
+import logging
+
+from sqlalchemy import create_engine, MetaData, Column, \
+ Integer, String, Text, DateTime, \
+ PrimaryKeyConstraint, ForeignKey, \
+ and_
+from sqlalchemy.ext.declarative import declarative_base
+from sqlalchemy.orm import relation, backref, sessionmaker
+from sqlalchemy.sql import func
+
+TableBase = declarative_base ()
+Session = sessionmaker ()
+
+import cloudpy.connection
+import cloudpy.uid
+import cloudpy.item
+
+import cloudpy.data.events
+
+class Item (TableBase):
+
+ __tablename__ = "items"
+ __table_args__ = (PrimaryKeyConstraint ("pack", "name", name = "pk"), {})
+
+ pack = Column (String (36))
+ name = Column (String (36))
+ meta = Column (String)
+ data = Column (Text)
+
+ def __init__ (self, pack, name, meta, data):
+ self.pack = pack
+ self.name = name
+ self.meta = meta
+ self.data = data
+ # FIXME : this storage sucks, change it asap !
+
+ def __repr__ (self):
+ return "<Item %s in pack %s>" % (self.name, self.pack)
+
+class DataServer (cloudpy.connection.ServerConnection):
+
+ db_session = None
+
+ def __init__ (self, *args, **kwargs):
+ super (DataServer, self).__init__ (*args, **kwargs)
+ engine = create_engine ("sqlite:///data.db")
+ metadata = TableBase.metadata
+ metadata.create_all (engine)
+ Session.configure (bind = engine)
+ self.db_session = Session ()
+
+ def list_pack (self, pack):
+ query = self.db_session.query (Item.name).filter (Item.pack == pack)
+ items = query.all ()
+ return items
+
+ def create_pack (self):
+ return cloudpy.uid.generate ()
+
+ def get_pack (self, pack):
+ query = self.db_session.query (Item).filter (Item.pack == pack)
+ raw_items = query.all ()
+ items = {}
+ for item in raw_items:
+ items[item.name] = [item.meta, item.data]
+ return items
+
+ def put_pack (self, pack, items):
+ self.delete_pack (pack)
+ items = cloudpy.item.from_dict (pack, items)
+ for item in items:
+ self.put_item (pack, item, commit = False)
+ self.db_session.commit ()
+
+ def delete_pack (self, pack):
+ query = self.db_session.query (Item).filter (Item.pack == pack)
+ query.delete ()
+ self.db_session.commit ()
+ # FIXME : maybe could we return the number of deleted items ?
+
+ def get_item (self, pack, item):
+ query = self.db_session.query (Item).filter (and_ (Item.pack == pack,
+ Item.name == item))
+ item = query.first ()
+ if not item:
+ return None
+ else:
+ return [item.meta, item.data]
+
+ def put_item (self, pack, item, raw_item = None, commit = True):
+ if raw_item:
+ item = cloudpy.item.Item.unserialize (pack, item, raw_item)
+ query = self.db_session.query (Item)
+ query = query.filter (and_ (Item.pack == pack, Item.name == item.name))
+ db_item = query.first ()
+ if not db_item:
+ db_item = Item (pack, item.name, item.metadata, item.data)
+ self.db_session.add (db_item)
+ else:
+ db_item.meta = item.metadata
+ db_item.data = item.data
+ if commit:
+ self.db_session.commit ()
+
+ def delete_item (self, pack, item):
+ query = self.db_session.query (Item).filter (and_ (Item.pack == pack,
+ Item.name == item))
+ item = query.first ()
+ # FIXME : add an error if item is not found
+ if item:
+ self.db_session.delete (item)
+ self.db_session.commit ()
diff --git a/cloudpy/event.py b/cloudpy/event.py
index d8cb29b..59402b0 100644
--- a/cloudpy/event.py
+++ b/cloudpy/event.py
@@ -1,6 +1,8 @@
import base64
import yaml
+Events = {}
+
class ValueCoder (object):
@staticmethod
@@ -70,7 +72,7 @@ class EventSerializer (object):
bits = data.split (" ")
class_name = bits[0]
try:
- cls = globals ()[class_name]
+ cls = Events[class_name]
assert issubclass (cls, Event)
except (AssertionError, KeyError):
raise ValueError, "Unknown event : " + (data)
@@ -92,4 +94,52 @@ class EventSerializer (object):
data = " ".join ([event.__class__.__name__] + args)
return data
-from heartbeat.events import *
+class Event (object):
+
+ def process (self, context):
+ raise NotImplementedError
+
+ def describe (self):
+ return EventSerializer.dump (self, *self.get_args ())
+
+ def get_args (self):
+ raise NotImplementedError
+
+ @staticmethod
+ def describe_args ():
+ raise NotImplementedError
+
+class EmptyEvent (Event):
+
+ def get_args (self):
+ return []
+
+ @staticmethod
+ def describe_args ():
+ return []
+
+class SingleValueEvent (Event):
+
+ value = None
+
+ def __init__ (self, value):
+ self.value = value
+
+ def get_args (self):
+ return [self.value]
+
+class BoolEvent (SingleValueEvent):
+
+ @staticmethod
+ def describe_args ():
+ return ["bool"]
+
+class StringEvent (SingleValueEvent):
+
+ @staticmethod
+ def describe_args ():
+ return ["string"]
+
+def register_event (event):
+ Events[event.__name__] = event
+ return event
diff --git a/cloudpy/event_base.py b/cloudpy/event_base.py
deleted file mode 100644
index b3d5bac..0000000
--- a/cloudpy/event_base.py
+++ /dev/null
@@ -1,47 +0,0 @@
-from event import EventSerializer
-
-class Event (object):
-
- def process (self, context):
- raise NotImplementedError
-
- def describe (self):
- return EventSerializer.dump (self, *self.get_args ())
-
- def get_args (self):
- raise NotImplementedError
-
- @staticmethod
- def describe_args ():
- raise NotImplementedError
-
-class EmptyEvent (Event):
-
- def get_args (self):
- return []
-
- @staticmethod
- def describe_args ():
- return []
-
-class SingleValueEvent (Event):
-
- value = None
-
- def __init__ (self, value):
- self.value = value
-
- def get_args (self):
- return [self.value]
-
-class BoolEvent (SingleValueEvent):
-
- @staticmethod
- def describe_args ():
- return ["bool"]
-
-class StringEvent (SingleValueEvent):
-
- @staticmethod
- def describe_args ():
- return ["string"]
diff --git a/cloudpy/heartbeat/__init__.py b/cloudpy/heartbeat/__init__.py
index b8dbf77..e69de29 100644
--- a/cloudpy/heartbeat/__init__.py
+++ b/cloudpy/heartbeat/__init__.py
@@ -1,167 +0,0 @@
-import logging
-from datetime import datetime
-
-from sqlalchemy import create_engine, Table, Column, \
- Integer, String, DateTime, MetaData, ForeignKey, \
- or_
-from sqlalchemy.ext.declarative import declarative_base
-from sqlalchemy.orm import relation, backref, sessionmaker
-from sqlalchemy.sql import func
-
-TableBase = declarative_base ()
-Session = sessionmaker ()
-
-import cloudpy.connection
-import cloudpy.package
-import cloudpy.config
-import cloudpy.uid
-import events
-
-class HeartbeatClient (cloudpy.connection.ClientConnection):
-
- kill = False
- package = None
- package_id = None
- undeploying = False
-
-class Package (TableBase):
-
- __tablename__ = "packages"
-
- id = Column (Integer, primary_key = True)
- name = Column (String)
- data_pack = Column (String)
- entry_point = Column (String)
- full_id = Column (String)
- requested_workers = Column (Integer)
-
- def __init__ (self, name, data_pack, entry_point,
- full_id, requested_workers):
- self.name = name
- self.data_pack = data_pack
- self.entry_point = entry_point
- self.full_id = full_id
- self.requested_workers = requested_workers
-
- def __repr__ (self):
- return "<Package %s (%s)>" % (self.name, self.full_id)
-
-class Worker (TableBase):
-
- __tablename__ = "workers"
-
- id = Column (Integer, primary_key = True)
- name = Column (String)
- package_id = Column (Integer, ForeignKey ('packages.id'))
- last_update = Column (DateTime)
- package = relation (Package, backref = backref ('workers', order_by = id))
-
- def __init__ (self, name):
- self.name = name
-
- def __repr__ (self):
- return "<Worker %s>" % self.name
-
-class HeartbeatServer (cloudpy.connection.ServerConnection):
-
- db_session = None
-
- def __init__ (self, *args, **kwargs):
- super (HeartbeatServer, self).__init__ (*args, **kwargs)
- engine = create_engine ("sqlite:///heartbeat.db")
- metadata = TableBase.metadata
- metadata.create_all (engine)
- Session.configure (bind = engine)
- self.db_session = Session ()
-
- def update_running (self, host):
- worker = self.db_session.query (Worker).filter_by (name = host).first ()
- if not worker:
- # this IS not right, a running worker should never disappear...
- logging.log (logging.ERROR, "Could not find running worker %s, " \
- + "something wrong must have occured" \
- % host)
- worker = Worker (host)
- self.db_session.add (worker)
- worker.last_update = datetime.now ()
- self.db_session.commit ()
- return not worker.package
-
- def update_idle (self, host):
- worker = self.db_session.query (Worker).filter_by (name = host).first ()
- if not worker: # new worker registering
- logging.log (logging.INFO, "Registering worker %s" % host)
- worker = Worker (host)
- self.db_session.add (worker)
- worker.last_update = datetime.now ()
- worker.package = None
- self.db_session.commit ()
- subq = self.db_session.query (Worker.package_id,
- func.count ('*').label ('workers_count'))
- subq = subq.group_by (Worker.package_id).subquery ()
- q = self.db_session.query (Package, subq.c.workers_count)
- q = q.outerjoin ((subq, Package.id == subq.c.package_id))
- q = q.filter (or_ (subq.c.workers_count == None,
- subq.c.workers_count != Package.requested_workers))
- package, count = q.first ()
- worker.package = package
- self.db_session.commit ()
- return {
- "name": package.name,
- "data_pack": package.data_pack,
- "entry_point": package.entry_point,
- }
-
- def deploy (self, package_dict):
- package_id = cloudpy.uid.generate ()
- logging.log (logging.INFO, "Deploying package %s as %s" \
- % (package_dict["name"], package_id))
- package = Package (package_dict["name"],
- package_dict["data_pack"],
- package_dict["entry_point"],
- package_id,
- package_dict["requested_workers"])
- self.db_session.add (package)
- self.db_session.commit ()
- return package_id
-
- def undeploy (self, package_id):
- logging.log (logging.INFO, "Undeploying package %s" % package_id)
- query = self.db_session.query (Package)
- query = query.filter (Package.full_id == package_id)
- package = query.first ()
- if not package:
- return None
- else:
- self.db_session.delete (package)
- self.db_session.commit ()
- return package.data_pack
-
-def update (running):
- host, port = cloudpy.config.config["heartbeat"].split (":")
- client = HeartbeatClient (host, port)
- ev = events.HeartbeatUpdateEvent (running)
- client.send_event (ev)
- client.run ()
- if running:
- return client.kill
- else:
- return client.package
-
-def deploy (package, requested_workers):
- package = package.to_dict ()
- package["requested_workers"] = requested_workers
- host, port = cloudpy.config.config["heartbeat"].split (":")
- client = HeartbeatClient (host, port)
- ev = events.HeartbeatDeployEvent (package)
- client.send_event (ev)
- client.run ()
- return client.package_id
-
-def undeploy (package_id):
- host, port = cloudpy.config.config["heartbeat"].split (":")
- client = HeartbeatClient (host, port)
- ev = events.HeartbeatUndeployEvent (package_id)
- client.send_event (ev)
- client.run ()
- return client.undeploying
diff --git a/cloudpy/heartbeat/client.py b/cloudpy/heartbeat/client.py
new file mode 100644
index 0000000..110aafb
--- /dev/null
+++ b/cloudpy/heartbeat/client.py
@@ -0,0 +1,37 @@
+import cloudpy.connection
+import cloudpy.config
+import events
+
+class HeartbeatClient (cloudpy.connection.ClientConnection):
+
+ kill = False
+ package = None
+ package_id = None
+ undeploying = False
+
+def send_event (ev):
+ host, port = cloudpy.config.config["heartbeat"].split (":")
+ client = HeartbeatClient (host, port)
+ client.send_event (ev)
+ client.run ()
+ return client
+
+def update (running):
+ ev = events.HeartbeatUpdateEvent (running)
+ client = send_event (ev)
+ if running:
+ return client.kill
+ else:
+ return client.package
+
+def deploy (package, requested_workers):
+ package = package.to_dict ()
+ package["requested_workers"] = requested_workers
+ ev = events.HeartbeatDeployEvent (package)
+ client = send_event (ev)
+ return client.package_id
+
+def undeploy (package_id):
+ ev = events.HeartbeatUndeployEvent (package_id)
+ client = send_event (ev)
+ return client.undeploying
diff --git a/cloudpy/heartbeat/events.py b/cloudpy/heartbeat/events.py
index fca067f..bb7bfca 100644
--- a/cloudpy/heartbeat/events.py
+++ b/cloudpy/heartbeat/events.py
@@ -1,6 +1,6 @@
import socket
-from cloudpy.event_base import *
+from cloudpy.event import *
class HeartbeatUpdateEvent (BoolEvent):
@@ -21,12 +21,14 @@ class HeartbeatUpdateEvent (BoolEvent):
connection.send_event (sock, HeartbeatPackageEvent (package))
else:
connection.send_event (sock, HeartbeatOkEvent ())
+register_event (HeartbeatUpdateEvent)
class HeartbeatKillEvent (EmptyEvent):
def process (self, connection, sock):
connection.kill = True
connection.close ()
+register_event (HeartbeatKillEvent)
class HeartbeatOkEvent (EmptyEvent):
@@ -34,6 +36,7 @@ class HeartbeatOkEvent (EmptyEvent):
connection.kill = False
connection.package = None
connection.close ()
+register_event (HeartbeatOkEvent)
class HeartbeatPackageEvent (Event):
@@ -52,6 +55,7 @@ class HeartbeatPackageEvent (Event):
def process (self, connection, sock):
connection.package = self.package
connection.close ()
+register_event (HeartbeatPackageEvent)
class HeartbeatDeployEvent (HeartbeatPackageEvent):
@@ -60,6 +64,7 @@ class HeartbeatDeployEvent (HeartbeatPackageEvent):
if not package_id:
package_id = ""
connection.send_event (sock, HeartbeatPackageIdEvent (package_id))
+register_event (HeartbeatDeployEvent)
class HeartbeatPackageIdEvent (StringEvent):
@@ -71,6 +76,7 @@ class HeartbeatPackageIdEvent (StringEvent):
else:
connection.package_id = self.value
connection.close ()
+register_event (HeartbeatPackageIdEvent)
class HeartbeatUndeployEvent (StringEvent):
@@ -81,6 +87,7 @@ class HeartbeatUndeployEvent (StringEvent):
if not data_pack:
data_pack = ""
connection.send_event (sock, HeartbeatUndeployDataPackEvent (data_pack))
+register_event (HeartbeatUndeployEvent)
class HeartbeatUndeployDataPackEvent (StringEvent):
@@ -89,3 +96,4 @@ class HeartbeatUndeployDataPackEvent (StringEvent):
def process (self, connection, sock):
connection.undeploy_data_pack = self.value
connection.close ()
+register_event (HeartbeatUndeployDataPackEvent)
diff --git a/cloudpy/heartbeat/server.py b/cloudpy/heartbeat/server.py
new file mode 100644
index 0000000..c59eb13
--- /dev/null
+++ b/cloudpy/heartbeat/server.py
@@ -0,0 +1,132 @@
+import logging
+from datetime import datetime
+
+from sqlalchemy import create_engine, Table, Column, \
+ Integer, String, DateTime, MetaData, ForeignKey, \
+ or_
+from sqlalchemy.ext.declarative import declarative_base
+from sqlalchemy.orm import relation, backref, sessionmaker
+from sqlalchemy.sql import func
+
+TableBase = declarative_base ()
+Session = sessionmaker ()
+
+import cloudpy.connection
+import cloudpy.package
+import cloudpy.config
+import cloudpy.uid
+
+import cloudpy.heartbeat.events
+
+class Package (TableBase):
+
+ __tablename__ = "packages"
+
+ id = Column (Integer, primary_key = True)
+ name = Column (String)
+ data_pack = Column (String)
+ entry_point = Column (String)
+ full_id = Column (String)
+ requested_workers = Column (Integer)
+
+ def __init__ (self, name, data_pack, entry_point,
+ full_id, requested_workers):
+ self.name = name
+ self.data_pack = data_pack
+ self.entry_point = entry_point
+ self.full_id = full_id
+ self.requested_workers = requested_workers
+
+ def __repr__ (self):
+ return "<Package %s (%s)>" % (self.name, self.full_id)
+
+class Worker (TableBase):
+
+ __tablename__ = "workers"
+
+ id = Column (Integer, primary_key = True)
+ name = Column (String)
+ package_id = Column (Integer, ForeignKey ('packages.id'))
+ last_update = Column (DateTime)
+ package = relation (Package, backref = backref ('workers', order_by = id))
+
+ def __init__ (self, name):
+ self.name = name
+
+ def __repr__ (self):
+ return "<Worker %s>" % self.name
+
+class HeartbeatServer (cloudpy.connection.ServerConnection):
+
+ db_session = None
+
+ def __init__ (self, *args, **kwargs):
+ super (HeartbeatServer, self).__init__ (*args, **kwargs)
+ engine = create_engine ("sqlite:///heartbeat.db")
+ metadata = TableBase.metadata
+ metadata.create_all (engine)
+ Session.configure (bind = engine)
+ self.db_session = Session ()
+
+ def update_running (self, host):
+ worker = self.db_session.query (Worker).filter_by (name = host).first ()
+ if not worker:
+ # this IS not right, a running worker should never disappear...
+ logging.log (logging.ERROR, ("Could not find running worker %s, "
+ + "something wrong must have occurred")
+ % host)
+ worker = Worker (host)
+ self.db_session.add (worker)
+ worker.last_update = datetime.now ()
+ self.db_session.commit ()
+ return not worker.package
+
+ def update_idle (self, host):
+ worker = self.db_session.query (Worker).filter_by (name = host).first ()
+ if not worker: # new worker registering
+ logging.log (logging.INFO, "Registering worker %s" % host)
+ worker = Worker (host)
+ self.db_session.add (worker)
+ worker.last_update = datetime.now ()
+ worker.package = None
+ self.db_session.commit ()
+ subq = self.db_session.query (Worker.package_id,
+ func.count ('*').label ('workers_count'))
+ subq = subq.group_by (Worker.package_id).subquery ()
+ q = self.db_session.query (Package, subq.c.workers_count)
+ q = q.outerjoin ((subq, Package.id == subq.c.package_id))
+ q = q.filter (or_ (subq.c.workers_count == None,
+ subq.c.workers_count != Package.requested_workers))
+ package, count = q.first ()
+ worker.package = package
+ self.db_session.commit ()
+ return {
+ "name": package.name,
+ "data_pack": package.data_pack,
+ "entry_point": package.entry_point,
+ }
+
+ def deploy (self, package_dict):
+ package_id = cloudpy.uid.generate ()
+ logging.log (logging.INFO, "Deploying package %s as %s" \
+ % (package_dict["name"], package_id))
+ package = Package (package_dict["name"],
+ package_dict["data_pack"],
+ package_dict["entry_point"],
+ package_id,
+ package_dict["requested_workers"])
+ self.db_session.add (package)
+ self.db_session.commit ()
+ return package_id
+
+ def undeploy (self, package_id):
+ logging.log (logging.INFO, "Undeploying package %s" % package_id)
+ query = self.db_session.query (Package)
+ query = query.filter (Package.full_id == package_id)
+ package = query.first ()
+ if not package:
+ return None
+ else:
+ self.db_session.delete (package)
+ self.db_session.commit ()
+ return package.data_pack
diff --git a/cloudpy/item.py b/cloudpy/item.py
new file mode 100644
index 0000000..7c5fef4
--- /dev/null
+++ b/cloudpy/item.py
@@ -0,0 +1,27 @@
+class Item (object):
+
+ pack = None
+ name = None
+ metadata = None
+ data = None
+
+ def __init__ (self, pack, name, metadata, data):
+ self.pack = pack
+ self.name = name
+ self.metadata = metadata
+ self.data = data
+
+ def serialize (self):
+ return [self.metadata, self.data]
+
+ @staticmethod
+ def unserialize (pack, item, raw_item):
+ return Item (pack, item, raw_item[0], raw_item[1])
+
+def to_dict (items):
+ return dict ([(item.name, item.serialize ())
+ for item in items])
+
+def from_dict (pack, items):
+ return [Item.unserialize (pack, item, items[item])
+ for item in items]
diff --git a/cloudpy/package.py b/cloudpy/package.py
index de19f22..179a47a 100644
--- a/cloudpy/package.py
+++ b/cloudpy/package.py
@@ -26,6 +26,18 @@
'''
import signal
+import os
+import sys
+import base64
+
+import cloudpy.data.client
+
+def recursive_unlink (base):
+ for path, dirs, files in os.walk (base, False):
+ for file in files:
+ os.unlink (os.path.join (path, file))
+ for directory in dirs:
+ os.rmdir (os.path.join (path, directory))
class Package (object):
@@ -33,22 +45,43 @@ class Package (object):
data_pack = ""
entry_point = ""
+ package_dir = ""
+
def __init__ (self, name, data_pack, entry_point):
self.name = name
self.data_pack = data_pack
self.entry_point = entry_point
+ # setup signal handler
+ signal.signal (signal.SIGTERM, self.undeploy)
def deploy (self):
+ if not os.path.isdir (self.name):
+ os.mkdir (self.name)
+ os.chdir (self.name)
+ self.package_dir = os.getcwd ()
+ sys.path = [self.package_dir] + sys.path
if self.data_pack:
# fetch and unpack data pack
- pass
+ items = cloudpy.data.client.get_pack (self.data_pack)
+ for item in items:
+ dirname = os.path.dirname (item.metadata)
+ if dirname and not os.path.isdir (dirname):
+ os.makedirs (dirname)
+ with open (item.metadata, "w") as f:
+ f.write (base64.b64decode (item.data))
def run (self):
- print "-- test --"
+ entry_module, entry_function = self.entry_point.rsplit (".", 1)
+ try:
+ entry_module = __import__ (entry_module)
+ entry_function = getattr (entry_module, entry_function)
+ except Exception, e:
+ print "Entry point %s does not exist : %s" % (self.entry_point, e)
+ raise SystemExit
+ entry_function ()
- def undeploy (self):
- # FIXME : catch term signal
- pass
+ def undeploy (self, sig = None, stack = None):
+ recursive_unlink (self.package_dir)
def to_dict (self):
return {
diff --git a/data.py b/data.py
new file mode 100644
index 0000000..a8e86cd
--- /dev/null
+++ b/data.py
@@ -0,0 +1,14 @@
+#!/usr/bin/env python
+
+import cloudpy.config
+import cloudpy.data.server
+
+if __name__ == "__main__":
+ cloudpy.config.load_server_config ()
+ host, port = cloudpy.config.config["data"].split (":")
+ print "Running Cloudpy Data server on %s:%s" % (host, port)
+ server = cloudpy.data.server.DataServer (host, port)
+ try:
+ server.run ()
+ except KeyboardInterrupt:
+ print "Shutting down Data server"
diff --git a/deploy.py b/deploy.py
index 35c1313..bf89afb 100644
--- a/deploy.py
+++ b/deploy.py
@@ -3,10 +3,14 @@
import os
import sys
import yaml
+import base64
from cloudpy.package import Package
-import cloudpy.heartbeat
+import cloudpy.heartbeat.client
+import cloudpy.data.client
import cloudpy.config
+import cloudpy.uid
+import cloudpy.item
if __name__ == "__main__":
if len (sys.argv) != 3:
@@ -21,9 +25,17 @@ if __name__ == "__main__":
with open (os.path.join (package_path, "package.yml")) as f:
config = yaml.load (f)
files = config["files"]
- # FIXME : create data_pack
- package = Package (config["name"], "", config["entry_point"])
- package_id = cloudpy.heartbeat.deploy (package, requested_workers)
+ items = []
+ for file in files:
+ path = os.path.join (package_path, file)
+ data = base64.b64encode (open (path).read ())
+ uid = cloudpy.uid.generate ()
+ item = cloudpy.item.Item ("", uid, file, data)
+ items.append (item)
+ pack = cloudpy.data.client.create_pack ()
+ cloudpy.data.client.put_pack (pack, items)
+ package = Package (config["name"], pack, config["entry_point"])
+ package_id = cloudpy.heartbeat.client.deploy (package, requested_workers)
if package_id:
print "Package is being deployed as", package_id
else:
diff --git a/heartbeat.py b/heartbeat.py
index 0b5ad9a..5e4a318 100644
--- a/heartbeat.py
+++ b/heartbeat.py
@@ -1,13 +1,13 @@
#!/usr/bin/env python
import cloudpy.config
-import cloudpy.heartbeat
+import cloudpy.heartbeat.server
if __name__ == "__main__":
cloudpy.config.load_server_config ()
host, port = cloudpy.config.config["heartbeat"].split (":")
print "Running Cloudpy Heartbeat server on %s:%s" % (host, port)
- server = cloudpy.heartbeat.HeartbeatServer (host, port)
+ server = cloudpy.heartbeat.server.HeartbeatServer (host, port)
try:
server.run ()
except KeyboardInterrupt:
diff --git a/setup-workers.py b/setup-workers.py
index a7fcb36..c6d35c2 100644
--- a/setup-workers.py
+++ b/setup-workers.py
@@ -37,7 +37,8 @@ CORE_FILES = [
"worker_config.yml",
] \
+ glob.glob ("cloudpy/*.py") \
- + glob.glob ("cloudpy/heartbeat/*.py")
+ + glob.glob ("cloudpy/heartbeat/*.py") \
+ + glob.glob ("cloudpy/data/*.py")
# FIXME : remove this glob
def upload_files (ssh, path):
diff --git a/undeploy.py b/undeploy.py
index 609e5b3..bc10420 100644
--- a/undeploy.py
+++ b/undeploy.py
@@ -3,7 +3,7 @@
import sys
import cloudpy.config
-import cloudpy.heartbeat
+import cloudpy.heartbeat.client
if __name__ == "__main__":
if len (sys.argv) != 2:
@@ -11,7 +11,7 @@ if __name__ == "__main__":
sys.exit (2)
package_id = sys.argv[1]
cloudpy.config.load_server_config ()
- if cloudpy.heartbeat.undeploy (package_id):
+ if cloudpy.heartbeat.client.undeploy (package_id):
print "Package is being undeployed."
else:
print "Could not undeploy package (it probably wasn't deployed)."
diff --git a/worker.py b/worker.py
index c471deb..6ef23e1 100644
--- a/worker.py
+++ b/worker.py
@@ -33,7 +33,7 @@ import signal
import yaml
import cloudpy.config
-import cloudpy.heartbeat
+import cloudpy.heartbeat.client
import cloudpy.package
if os.path.dirname (__file__):
@@ -56,14 +56,14 @@ if os.path.exists ("pid"):
package = None
if running:
- kill = cloudpy.heartbeat.update (running = True)
+ kill = cloudpy.heartbeat.client.update (running = True)
if kill: # kill the worker
- os.kill (int (pid), signal.SIGKILL)
+ os.kill (int (pid), signal.SIGTERM)
running = False
fcntl.lockf (lock_f, fcntl.LOCK_UN)
pid_f.close ()
if not running:
- package = cloudpy.heartbeat.update (running = False)
+ package = cloudpy.heartbeat.client.update (running = False)
if package: # we are going to run a package : set pid file ...
with open ("pid", "w") as pid_f:
pid_f.write (str (os.getpid ()))
@@ -79,3 +79,5 @@ if not package:
package.deploy ()
# run package
package.run ()
+# undeploy package
+package.undeploy ()