summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorGuillaume Seguin <guillaume@segu.in>2009-11-16 00:55:08 +0100
committerGuillaume Seguin <guillaume@segu.in>2009-11-16 00:55:08 +0100
commit5626e1bdbfccb0671c969eff1d6232d28ce0380b (patch)
tree90d102439b7432747104a0acdbc1d6a96a9b2e6e
downloadcloudpy-5626e1bdbfccb0671c969eff1d6232d28ce0380b.tar.gz
cloudpy-5626e1bdbfccb0671c969eff1d6232d28ce0380b.tar.bz2
Initial import (only heartbeat is functionnal for now)
-rw-r--r--.gitignore3
-rw-r--r--cloudpy/__init__.py0
-rw-r--r--cloudpy/config.py25
-rw-r--r--cloudpy/connection.py214
-rw-r--r--cloudpy/data.py0
-rw-r--r--cloudpy/database.py0
-rw-r--r--cloudpy/event.py95
-rw-r--r--cloudpy/event_base.py47
-rw-r--r--cloudpy/heartbeat/__init__.py167
-rw-r--r--cloudpy/heartbeat/events.py91
-rw-r--r--cloudpy/helpers.py0
-rw-r--r--cloudpy/package.py62
-rw-r--r--cloudpy/task.py0
-rw-r--r--cloudpy/uid.py4
-rw-r--r--cloudpy/workqueue.py0
-rw-r--r--deploy.py30
-rw-r--r--heartbeat.py14
-rw-r--r--setup-workers.py115
-rw-r--r--undeploy.py17
-rw-r--r--worker.py81
20 files changed, 965 insertions, 0 deletions
diff --git a/.gitignore b/.gitignore
new file mode 100644
index 0000000..d2d414b
--- /dev/null
+++ b/.gitignore
@@ -0,0 +1,3 @@
+*.swp
+*.swo
+*.pyc
diff --git a/cloudpy/__init__.py b/cloudpy/__init__.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/cloudpy/__init__.py
diff --git a/cloudpy/config.py b/cloudpy/config.py
new file mode 100644
index 0000000..1ea3c09
--- /dev/null
+++ b/cloudpy/config.py
@@ -0,0 +1,25 @@
+import yaml
+
+config = None
+config_file = "config.yml"
+worker_config_file = "worker_config.yml"
+
+def load_config (config_file = config_file):
+ with open (config_file) as f:
+ config = yaml.load (f)
+ return config
+
+def save_config (config, config_file = config_file):
+ with open (config_file, "w") as f:
+ yaml.dump (config, f)
+
+def load_worker_config (config_file = worker_config_file):
+ global config
+ config = load_config (config_file)
+
+def load_server_config (config_file = config_file):
+ global config
+ config = load_config (config_file)
+
+def save_worker_config (config, config_file = worker_config_file):
+ save_config (config, config_file)
diff --git a/cloudpy/connection.py b/cloudpy/connection.py
new file mode 100644
index 0000000..7090b73
--- /dev/null
+++ b/cloudpy/connection.py
@@ -0,0 +1,214 @@
+import socket
+import select
+import threading
+import logging
+
+from errno import EWOULDBLOCK
+
+import event
+
+class Parser (object):
+
+ _buffer = None
+ _separator = None
+
+ def __init__ (self, separator = "\n"):
+ self._buffer = ""
+ self._separator = separator
+
+ def parse (self, data):
+ data = self._buffer + data
+ bits = data.split (self._separator)
+ self._buffer = bits[-1]
+ return bits[:-1]
+
+ def prepare (self, data):
+ return data + self._separator
+
+class Connection (object):
+
+ _socket = None
+
+ _host = None
+ _port = None
+ _buffer_size = None
+
+ def __init__ (self, host, port, buffer_size = 4096):
+ self._host = host
+ self._port = int (port)
+ self._buffer_size = buffer_size
+ self._socket = socket.socket ()
+ self.prepare ()
+
+ def prepare (self):
+ raise NotImplementedError
+
+ def run_thread (self):
+ thread = threading.Thread ()
+ thread.run = self.run
+ thread.start ()
+
+ def run (self):
+ raise NotImplementedError
+
+ def close (self):
+ logging.log (logging.INFO, "Closing Connection %d" % id (self))
+ self._socket.close ()
+
+ def on_receive (self, sock, data):
+ raise NotImplementedError
+
+ def on_connection_lost (self, sock):
+ raise NotImplementedError
+
+ def __delete__ (self):
+ self.close ()
+
+class ClientConnection (Connection):
+
+ _parser = None
+
+ def __init__ (self, *args, **kwargs):
+ super (ClientConnection, self).__init__ (*args, **kwargs)
+ self._parser = Parser ()
+
+ def send (self, data):
+ self._socket.send (data)
+
+ def send_event (self, ev):
+ ev_desc = ev.describe ()
+ logging.log (logging.INFO, "Sending event to server : %s" \
+ % (ev_desc,))
+ self.send (self._parser.prepare (ev_desc))
+
+ def prepare (self):
+ self._socket.connect ((self._host, self._port))
+ self._socket.settimeout (1)
+ self.on_connected ()
+
+ def run (self):
+ while True:
+ try:
+ data = self._socket.recv (self._buffer_size)
+ if not data:
+ break
+ self.on_receive (self._socket, data)
+ except socket.error, se:
+ if se.args[0] == "timed out":
+ pass
+ else:
+ break
+ self.on_connection_lost (self._socket)
+
+ def on_receive (self, sock, data):
+ logging.log (logging.DEBUG, "Got data from server : %s" \
+ % (data.replace ("\n", "#"),))
+ bits = self._parser.parse (data)
+ for bit in bits:
+ ev = event.EventSerializer.load (bit)
+ logging.log (logging.DEBUG, "Processing %s from server" \
+ % (ev.__class__.__name__,))
+ ev.process (self, sock)
+
+ def on_connection_lost (self, sock):
+ logging.log (logging.INFO, "Lost connection to server")
+
+ def on_connected (self):
+ logging.log (logging.INFO, "Connected to server")
+
+class ServerConnection (Connection):
+
+ _parsers = None
+ _addresses = None
+
+ _client_sockets = None
+ _select_timeout = 0.1
+ _stop = False
+
+ def __init__ (self, *args, **kwargs):
+ super (ServerConnection, self).__init__ (*args, **kwargs)
+ self._parsers = {}
+ self._addresses = {}
+
+ def close (self):
+ logging.log (logging.INFO, "Closing ServerConnection %d" % id (self))
+ self._stop = True
+ for sock in self._client_sockets:
+ sock.close ()
+ self._socket.close ()
+
+ def broadcast (self, data):
+ for sock in self._client_sockets:
+ sock.send (data)
+
+ def send_event (self, sock, ev):
+ ev_desc = ev.describe ()
+ logging.log (logging.INFO, "Sending event to %s:%d : %s" \
+ % (self._addresses[sock] + (ev_desc,)))
+ sock.send (self._parsers[sock].prepare (ev_desc))
+
+ def prepare (self):
+ self._socket.setsockopt (socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
+ self._socket.bind ((self._host, self._port))
+ logging.log (logging.INFO, "Server bound to %s:%d" \
+ % (self._host, self._port))
+ self._stop = False
+
+ def run (self):
+ self._client_sockets = []
+ self._socket.listen (1)
+ while not self._stop:
+ readables, _, _ = select.select ([self._socket] + self._client_sockets, [], [],
+ self._select_timeout)
+ for sock in readables:
+ if sock == self._socket:
+ sock, address = self._socket.accept ()
+ logging.log (logging.INFO, "Accepted client connection from %s"
+ % (address,))
+ self.on_accept (sock)
+ sock.setblocking (0)
+ self._client_sockets.append (sock)
+ else:
+ self.recv_socket (sock)
+
+ def recv_socket (self, sock):
+ while True:
+ try:
+ data = sock.recv (self._buffer_size)
+ if not data:
+ break
+ self.on_receive (sock, data)
+ except socket.error, se:
+ if se.args[0] == EWOULDBLOCK:
+ return
+ else:
+ break
+ self._client_sockets.remove (sock)
+ self.on_connection_lost (sock)
+
+ def get_address (self, sock):
+ return self._addresses[sock]
+
+ def on_receive (self, sock, data):
+ logging.log (logging.DEBUG, "Got data from %s:%d : %s" \
+ % (self._addresses[sock] + \
+ (data.replace ("\n", "#"),)))
+ bits = self._parsers[sock].parse (data)
+ for bit in bits:
+ ev = event.EventSerializer.load (bit)
+ logging.log (logging.DEBUG, "Processing %s from %s:%d" \
+ % ((ev.__class__.__name__,) \
+ + self._addresses[sock]))
+ ev.process (self, sock)
+
+ def on_accept (self, sock):
+ self._parsers[sock] = Parser ()
+ self._addresses[sock] = sock.getpeername ()
+ logging.log (logging.INFO, "New connection from %s:%d" \
+ % self._addresses[sock])
+
+ def on_connection_lost (self, sock):
+ logging.log (logging.INFO, "Lost connection to %s:%d" \
+ % self._addresses[sock])
+ del self._addresses[sock]
+ del self._parsers[sock]
diff --git a/cloudpy/data.py b/cloudpy/data.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/cloudpy/data.py
diff --git a/cloudpy/database.py b/cloudpy/database.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/cloudpy/database.py
diff --git a/cloudpy/event.py b/cloudpy/event.py
new file mode 100644
index 0000000..d8cb29b
--- /dev/null
+++ b/cloudpy/event.py
@@ -0,0 +1,95 @@
+import base64
+import yaml
+
+class ValueCoder (object):
+
+ @staticmethod
+ def encode (value, type):
+ try:
+ if type == "int":
+ return "%d" % value
+ elif type == "bool":
+ return "1" if value else "0"
+ elif type == "string":
+ return value
+ elif type == "yaml":
+ return yaml.dump (value)
+ elif type.startswith ("base64"):
+ type2 = type.split ("-", 1)[1]
+ return base64.b64encode (ValueCoder.encode (value, type2))
+ elif type.startswith ("list"):
+ type2 = type.split ("_", 1)[1]
+ return ",".join ([ValueCoder.encode (value2, type2)
+ for value2 in value])
+ elif type.startswith ("dict"):
+ type2, type3 = type.split ("_", 2)[1:]
+ return ",".join (["%s:%s" % (ValueCoder.encode (value2, type2),
+ ValueCoder.encode (value3, type3))
+ for (value2, value3) in value.items ()])
+ else:
+ raise ValueError, "Unknown type : " + type
+ except Exception, e:
+ raise ValueError, "Couldn't encode value %s of type %s : %s" \
+ % (value, type, e)
+
+ @staticmethod
+ def decode (value, type):
+ try:
+ if type == "int":
+ return int (value)
+ elif type == "bool":
+ return value == "1"
+ elif type == "string":
+ return value
+ elif type == "yaml":
+ return yaml.load (value)
+ elif type.startswith ("base64"):
+ type2 = type.split ("-", 1)[1]
+ return ValueCoder.decode (base64.b64decode (value), type2)
+ elif type.startswith ("list"):
+ type2 = type.split ("_", 1)[1]
+ return [ValueCoder.decode (value2, type2)
+ for value2 in value.split (",")]
+ elif type.startswith ("dict"):
+ type2, type3 = type.split ("_", 2)[1:]
+ return dict ([(ValueCoder.decode (value2, type2),
+ ValueCoder.decode (value3, type3))
+ for (value2, value3) in
+ [tmp.split (":")
+ for tmp in value.split (",")]])
+ else:
+ raise ValueError, "Unknown type : " + type
+ except Exception, e:
+ raise ValueError, "Couldn't decode value %s as type %s : %s" \
+ % (value, type, e)
+
+class EventSerializer (object):
+
+ @staticmethod
+ def load (data):
+ bits = data.split (" ")
+ class_name = bits[0]
+ try:
+ cls = globals ()[class_name]
+ assert issubclass (cls, Event)
+ except (AssertionError, KeyError):
+ raise ValueError, "Unknown event : " + (data)
+ args_type = cls.describe_args ()
+ args = bits[1:]
+ if len (args) != len (args_type):
+ raise ValueError, "Wrong number of arguments for event : " + data
+ args = [ValueCoder.decode (args[i], args_type[i])
+ for i in range (len (args))]
+ return cls (*args)
+
+ @staticmethod
+ def dump (event, *args):
+ args_type = event.describe_args ()
+ if len (args) != len (args_type):
+ raise ValueError, "Wrong number of arguments for event : " + data
+ args = [ValueCoder.encode (args[i], args_type[i])
+ for i in range (len (args))]
+ data = " ".join ([event.__class__.__name__] + args)
+ return data
+
+from heartbeat.events import *
diff --git a/cloudpy/event_base.py b/cloudpy/event_base.py
new file mode 100644
index 0000000..b3d5bac
--- /dev/null
+++ b/cloudpy/event_base.py
@@ -0,0 +1,47 @@
+from event import EventSerializer
+
+class Event (object):
+
+ def process (self, context):
+ raise NotImplementedError
+
+ def describe (self):
+ return EventSerializer.dump (self, *self.get_args ())
+
+ def get_args (self):
+ raise NotImplementedError
+
+ @staticmethod
+ def describe_args ():
+ raise NotImplementedError
+
+class EmptyEvent (Event):
+
+ def get_args (self):
+ return []
+
+ @staticmethod
+ def describe_args ():
+ return []
+
+class SingleValueEvent (Event):
+
+ value = None
+
+ def __init__ (self, value):
+ self.value = value
+
+ def get_args (self):
+ return [self.value]
+
+class BoolEvent (SingleValueEvent):
+
+ @staticmethod
+ def describe_args ():
+ return ["bool"]
+
+class StringEvent (SingleValueEvent):
+
+ @staticmethod
+ def describe_args ():
+ return ["string"]
diff --git a/cloudpy/heartbeat/__init__.py b/cloudpy/heartbeat/__init__.py
new file mode 100644
index 0000000..b8dbf77
--- /dev/null
+++ b/cloudpy/heartbeat/__init__.py
@@ -0,0 +1,167 @@
+import logging
+from datetime import datetime
+
+from sqlalchemy import create_engine, Table, Column, \
+ Integer, String, DateTime, MetaData, ForeignKey, \
+ or_
+from sqlalchemy.ext.declarative import declarative_base
+from sqlalchemy.orm import relation, backref, sessionmaker
+from sqlalchemy.sql import func
+
+TableBase = declarative_base ()
+Session = sessionmaker ()
+
+import cloudpy.connection
+import cloudpy.package
+import cloudpy.config
+import cloudpy.uid
+import events
+
+class HeartbeatClient (cloudpy.connection.ClientConnection):
+
+ kill = False
+ package = None
+ package_id = None
+ undeploying = False
+
+class Package (TableBase):
+
+ __tablename__ = "packages"
+
+ id = Column (Integer, primary_key = True)
+ name = Column (String)
+ data_pack = Column (String)
+ entry_point = Column (String)
+ full_id = Column (String)
+ requested_workers = Column (Integer)
+
+ def __init__ (self, name, data_pack, entry_point,
+ full_id, requested_workers):
+ self.name = name
+ self.data_pack = data_pack
+ self.entry_point = entry_point
+ self.full_id = full_id
+ self.requested_workers = requested_workers
+
+ def __repr__ (self):
+ return "<Package %s (%s)>" % (self.name, self.full_id)
+
+class Worker (TableBase):
+
+ __tablename__ = "workers"
+
+ id = Column (Integer, primary_key = True)
+ name = Column (String)
+ package_id = Column (Integer, ForeignKey ('packages.id'))
+ last_update = Column (DateTime)
+ package = relation (Package, backref = backref ('workers', order_by = id))
+
+ def __init__ (self, name):
+ self.name = name
+
+ def __repr__ (self):
+ return "<Worker %s>" % self.name
+
+class HeartbeatServer (cloudpy.connection.ServerConnection):
+
+ db_session = None
+
+ def __init__ (self, *args, **kwargs):
+ super (HeartbeatServer, self).__init__ (*args, **kwargs)
+ engine = create_engine ("sqlite:///heartbeat.db")
+ metadata = TableBase.metadata
+ metadata.create_all (engine)
+ Session.configure (bind = engine)
+ self.db_session = Session ()
+
+ def update_running (self, host):
+ worker = self.db_session.query (Worker).filter_by (name = host).first ()
+ if not worker:
+ # this IS not right, a running worker should never disappear...
+ logging.log (logging.ERROR, "Could not find running worker %s, " \
+ + "something wrong must have occured" \
+ % host)
+ worker = Worker (host)
+ self.db_session.add (worker)
+ worker.last_update = datetime.now ()
+ self.db_session.commit ()
+ return not worker.package
+
+ def update_idle (self, host):
+ worker = self.db_session.query (Worker).filter_by (name = host).first ()
+ if not worker: # new worker registering
+ logging.log (logging.INFO, "Registering worker %s" % host)
+ worker = Worker (host)
+ self.db_session.add (worker)
+ worker.last_update = datetime.now ()
+ worker.package = None
+ self.db_session.commit ()
+ subq = self.db_session.query (Worker.package_id,
+ func.count ('*').label ('workers_count'))
+ subq = subq.group_by (Worker.package_id).subquery ()
+ q = self.db_session.query (Package, subq.c.workers_count)
+ q = q.outerjoin ((subq, Package.id == subq.c.package_id))
+ q = q.filter (or_ (subq.c.workers_count == None,
+ subq.c.workers_count != Package.requested_workers))
+ package, count = q.first ()
+ worker.package = package
+ self.db_session.commit ()
+ return {
+ "name": package.name,
+ "data_pack": package.data_pack,
+ "entry_point": package.entry_point,
+ }
+
+ def deploy (self, package_dict):
+ package_id = cloudpy.uid.generate ()
+ logging.log (logging.INFO, "Deploying package %s as %s" \
+ % (package_dict["name"], package_id))
+ package = Package (package_dict["name"],
+ package_dict["data_pack"],
+ package_dict["entry_point"],
+ package_id,
+ package_dict["requested_workers"])
+ self.db_session.add (package)
+ self.db_session.commit ()
+ return package_id
+
+ def undeploy (self, package_id):
+ logging.log (logging.INFO, "Undeploying package %s" % package_id)
+ query = self.db_session.query (Package)
+ query = query.filter (Package.full_id == package_id)
+ package = query.first ()
+ if not package:
+ return None
+ else:
+ self.db_session.delete (package)
+ self.db_session.commit ()
+ return package.data_pack
+
+def update (running):
+ host, port = cloudpy.config.config["heartbeat"].split (":")
+ client = HeartbeatClient (host, port)
+ ev = events.HeartbeatUpdateEvent (running)
+ client.send_event (ev)
+ client.run ()
+ if running:
+ return client.kill
+ else:
+ return client.package
+
+def deploy (package, requested_workers):
+ package = package.to_dict ()
+ package["requested_workers"] = requested_workers
+ host, port = cloudpy.config.config["heartbeat"].split (":")
+ client = HeartbeatClient (host, port)
+ ev = events.HeartbeatDeployEvent (package)
+ client.send_event (ev)
+ client.run ()
+ return client.package_id
+
+def undeploy (package_id):
+ host, port = cloudpy.config.config["heartbeat"].split (":")
+ client = HeartbeatClient (host, port)
+ ev = events.HeartbeatUndeployEvent (package_id)
+ client.send_event (ev)
+ client.run ()
+ return client.undeploying
diff --git a/cloudpy/heartbeat/events.py b/cloudpy/heartbeat/events.py
new file mode 100644
index 0000000..fca067f
--- /dev/null
+++ b/cloudpy/heartbeat/events.py
@@ -0,0 +1,91 @@
+import socket
+
+from cloudpy.event_base import *
+
+class HeartbeatUpdateEvent (BoolEvent):
+
+ # value == running
+
+ def process (self, connection, sock):
+ address = connection.get_address (sock)[0]
+ remote = socket.getfqdn (address)
+ if self.value:
+ kill = connection.update_running (remote)
+ if kill:
+ connection.send_event (sock, HeartbeatKillEvent ())
+ else:
+ connection.send_event (sock, HeartbeatOkEvent ())
+ else:
+ package = connection.update_idle (remote)
+ if package:
+ connection.send_event (sock, HeartbeatPackageEvent (package))
+ else:
+ connection.send_event (sock, HeartbeatOkEvent ())
+
+class HeartbeatKillEvent (EmptyEvent):
+
+ def process (self, connection, sock):
+ connection.kill = True
+ connection.close ()
+
+class HeartbeatOkEvent (EmptyEvent):
+
+ def process (self, connection, sock):
+ connection.kill = False
+ connection.package = None
+ connection.close ()
+
+class HeartbeatPackageEvent (Event):
+
+ package = None
+
+ def __init__ (self, package):
+ self.package = package
+
+ def get_args (self):
+ return [self.package]
+
+ @staticmethod
+ def describe_args ():
+ return ["base64-yaml"]
+
+ def process (self, connection, sock):
+ connection.package = self.package
+ connection.close ()
+
+class HeartbeatDeployEvent (HeartbeatPackageEvent):
+
+ def process (self, connection, sock):
+ package_id = connection.deploy (self.package)
+ if not package_id:
+ package_id = ""
+ connection.send_event (sock, HeartbeatPackageIdEvent (package_id))
+
+class HeartbeatPackageIdEvent (StringEvent):
+
+ # value == package_id
+
+ def process (self, connection, sock):
+ if not self.value:
+ connection.package_id = None
+ else:
+ connection.package_id = self.value
+ connection.close ()
+
+class HeartbeatUndeployEvent (StringEvent):
+
+ # value == package_id
+
+ def process (self, connection, sock):
+ data_pack = connection.undeploy (self.value)
+ if not data_pack:
+ data_pack = ""
+ connection.send_event (sock, HeartbeatUndeployDataPackEvent (data_pack))
+
+class HeartbeatUndeployDataPackEvent (StringEvent):
+
+ # value == data_pack
+
+ def process (self, connection, sock):
+ connection.undeploy_data_pack = self.value
+ connection.close ()
diff --git a/cloudpy/helpers.py b/cloudpy/helpers.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/cloudpy/helpers.py
diff --git a/cloudpy/package.py b/cloudpy/package.py
new file mode 100644
index 0000000..de19f22
--- /dev/null
+++ b/cloudpy/package.py
@@ -0,0 +1,62 @@
+# coding=utf-8
+
+'''
+ cloudpy
+ Author : Guillaume "iXce" Seguin
+ Email : guillaume@segu.in (or guillaume.seguin@ens.fr)
+
+ # Simple Cloud Computing system #
+
+ Copyright (C) 2009 Guillaume Seguin
+
+ This program is free software; you can redistribute it and/or
+ modify it under the terms of the GNU General Public License
+ as published by the Free Software Foundation; either version 2
+ of the License, or (at your option) any later version.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program; if not, write to the Free Software
+ Foundation, Inc., 51 Franklin Street, Fifth Floor,
+ Boston, MA 02110-1301, USA.
+'''
+
+import signal
+
+class Package (object):
+
+ name = ""
+ data_pack = ""
+ entry_point = ""
+
+ def __init__ (self, name, data_pack, entry_point):
+ self.name = name
+ self.data_pack = data_pack
+ self.entry_point = entry_point
+
+ def deploy (self):
+ if self.data_pack:
+ # fetch and unpack data pack
+ pass
+
+ def run (self):
+ print "-- test --"
+
+ def undeploy (self):
+ # FIXME : catch term signal
+ pass
+
+ def to_dict (self):
+ return {
+ "name": self.name,
+ "data_pack": self.data_pack,
+ "entry_point": self.entry_point,
+ }
+
+ @staticmethod
+ def from_dict (dict):
+ return Package (dict["name"], dict["data_pack"], dict["entry_point"])
diff --git a/cloudpy/task.py b/cloudpy/task.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/cloudpy/task.py
diff --git a/cloudpy/uid.py b/cloudpy/uid.py
new file mode 100644
index 0000000..a1b9bd2
--- /dev/null
+++ b/cloudpy/uid.py
@@ -0,0 +1,4 @@
+import uuid
+
+def generate ():
+ return str (uuid.uuid1 ())
diff --git a/cloudpy/workqueue.py b/cloudpy/workqueue.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/cloudpy/workqueue.py
diff --git a/deploy.py b/deploy.py
new file mode 100644
index 0000000..35c1313
--- /dev/null
+++ b/deploy.py
@@ -0,0 +1,30 @@
+#!/usr/bin/env python
+
+import os
+import sys
+import yaml
+
+from cloudpy.package import Package
+import cloudpy.heartbeat
+import cloudpy.config
+
+if __name__ == "__main__":
+ if len (sys.argv) != 3:
+ print "Usage: %s PACKAGE_PATH REQUESTED_WORKERS" % sys.argv[0]
+ sys.exit (2)
+ package_path = sys.argv[1]
+ if not os.path.isdir (package_path):
+ print "Wrong package path", package_path
+ sys.exit (1)
+ requested_workers = int (sys.argv[2])
+ cloudpy.config.load_server_config ()
+ with open (os.path.join (package_path, "package.yml")) as f:
+ config = yaml.load (f)
+ files = config["files"]
+ # FIXME : create data_pack
+ package = Package (config["name"], "", config["entry_point"])
+ package_id = cloudpy.heartbeat.deploy (package, requested_workers)
+ if package_id:
+ print "Package is being deployed as", package_id
+ else:
+ print "Could not deploy package"
diff --git a/heartbeat.py b/heartbeat.py
new file mode 100644
index 0000000..0b5ad9a
--- /dev/null
+++ b/heartbeat.py
@@ -0,0 +1,14 @@
+#!/usr/bin/env python
+
+import cloudpy.config
+import cloudpy.heartbeat
+
+if __name__ == "__main__":
+ cloudpy.config.load_server_config ()
+ host, port = cloudpy.config.config["heartbeat"].split (":")
+ print "Running Cloudpy Heartbeat server on %s:%s" % (host, port)
+ server = cloudpy.heartbeat.HeartbeatServer (host, port)
+ try:
+ server.run ()
+ except KeyboardInterrupt:
+ print "Shutting down Heartbeat server"
diff --git a/setup-workers.py b/setup-workers.py
new file mode 100644
index 0000000..a7fcb36
--- /dev/null
+++ b/setup-workers.py
@@ -0,0 +1,115 @@
+# coding=utf-8
+
+'''
+ cloudpy
+ Author : Guillaume "iXce" Seguin
+ Email : guillaume@segu.in (or guillaume.seguin@ens.fr)
+
+ # Simple Cloud Computing system #
+
+ Copyright (C) 2009 Guillaume Seguin
+
+ This program is free software; you can redistribute it and/or
+ modify it under the terms of the GNU General Public License
+ as published by the Free Software Foundation; either version 2
+ of the License, or (at your option) any later version.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program; if not, write to the Free Software
+ Foundation, Inc., 51 Franklin Street, Fifth Floor,
+ Boston, MA 02110-1301, USA.
+'''
+
+import os
+import glob
+
+import paramiko
+
+import cloudpy.config
+
+CORE_FILES = [
+ "worker.py",
+ "worker_config.yml",
+ ] \
+ + glob.glob ("cloudpy/*.py") \
+ + glob.glob ("cloudpy/heartbeat/*.py")
+# FIXME : remove this glob
+
+def upload_files (ssh, path):
+ def makedirs (sftp, path):
+ sftp.chdir (".")
+ old_cwd = sftp.getcwd ()
+ try:
+ sftp.chdir (path)
+ except IOError, e:
+ if e.errno != 2:
+ raise e
+ else:
+ sftp.chdir (old_cwd)
+ return
+ for bit in path.split (os.path.sep):
+ try:
+ sftp.chdir (bit)
+ except IOError, e:
+ if e.errno == 2:
+ sftp.mkdir (bit, mode = 0700)
+ sftp.chdir (bit)
+ else:
+ raise e
+ sftp.chdir (old_cwd)
+ sftp = ssh.open_sftp ()
+ makedirs (sftp, path)
+ sftp.chdir (path)
+ for file in CORE_FILES:
+ makedirs (sftp, os.path.dirname (file))
+ sftp.put (file, file)
+ sftp.close ()
+
+def setup_cron (ssh, path):
+ stdin, stdout, stderr = ssh.exec_command ("pwd")
+ stdin.close ()
+ path = os.path.join (stdout.read ().strip (), path)
+ stdin, stdout, stderr = ssh.exec_command ("crontab -l")
+ stdin.close ()
+ current = stdout.read ()
+ new = [line
+ for line in current.splitlines ()
+ if "cloudpy" not in line]
+ new.append ("* * * * * python " + os.path.join (path, "worker.py"))
+ stdin, stdout, stderr = ssh.exec_command ("crontab -")
+ stdin.write ("\n".join (new))
+ stdin.close ()
+
+def setup_worker (worker, username, path):
+ ssh = paramiko.SSHClient ()
+ ssh.set_missing_host_key_policy (paramiko.AutoAddPolicy ())
+ ssh.connect (worker, username = username, allow_agent = True,
+ look_for_keys = False)
+ # upload files
+ upload_files (ssh, path)
+ # setup cron
+ setup_cron (ssh, path)
+ ssh.close ()
+
+def setup_workers (config):
+ workers = config["workers"]
+ del config["workers"]
+ for worker in workers:
+ username = os.getlogin ()
+ path = ""
+ if "@" in worker:
+ username, worker = worker.split ("@", 1)
+ if ":" in worker:
+ worker, path = worker.split (":", 1)
+ config["worker"] = "%s@%s:%s" % (username, worker, path)
+ cloudpy.config.save_worker_config (config)
+ setup_worker (worker, username, path)
+
+if __name__ == "__main__":
+ config = cloudpy.config.load_config ()
+ setup_workers (config)
diff --git a/undeploy.py b/undeploy.py
new file mode 100644
index 0000000..609e5b3
--- /dev/null
+++ b/undeploy.py
@@ -0,0 +1,17 @@
+#!/usr/bin/env python
+
+import sys
+
+import cloudpy.config
+import cloudpy.heartbeat
+
+if __name__ == "__main__":
+ if len (sys.argv) != 2:
+ print "Usage: %s PACKAGE_ID" % sys.argv[0]
+ sys.exit (2)
+ package_id = sys.argv[1]
+ cloudpy.config.load_server_config ()
+ if cloudpy.heartbeat.undeploy (package_id):
+ print "Package is being undeployed."
+ else:
+ print "Could not undeploy package (it probably wasn't deployed)."
diff --git a/worker.py b/worker.py
new file mode 100644
index 0000000..c471deb
--- /dev/null
+++ b/worker.py
@@ -0,0 +1,81 @@
+#!/usr/bin/env python
+# coding=utf-8
+
+'''
+ cloudpy
+ Author : Guillaume "iXce" Seguin
+ Email : guillaume@segu.in (or guillaume.seguin@ens.fr)
+
+ # Simple Cloud Computing system #
+
+ Copyright (C) 2009 Guillaume Seguin
+
+ This program is free software; you can redistribute it and/or
+ modify it under the terms of the GNU General Public License
+ as published by the Free Software Foundation; either version 2
+ of the License, or (at your option) any later version.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program; if not, write to the Free Software
+ Foundation, Inc., 51 Franklin Street, Fifth Floor,
+ Boston, MA 02110-1301, USA.
+'''
+
+import os
+import fcntl
+import signal
+
+import yaml
+
+import cloudpy.config
+import cloudpy.heartbeat
+import cloudpy.package
+
+if os.path.dirname (__file__):
+ os.chdir (os.path.dirname (__file__))
+
+lock_f = open ("lock", "w")
+fcntl.lockf (lock_f, fcntl.LOCK_EX)
+
+cloudpy.config.load_worker_config ()
+
+running = False
+
+if os.path.exists ("pid"):
+ with open ("pid") as pid_f:
+ pid = pid_f.read ()
+ if pid and os.path.exists (os.path.join ("/proc", pid)):
+ with open (os.path.join ("/proc", pid, "cmdline")) as f:
+ running = "worker.py" in f.read ()
+
+package = None
+
+if running:
+ kill = cloudpy.heartbeat.update (running = True)
+ if kill: # kill the worker
+ os.kill (int (pid), signal.SIGKILL)
+ running = False
+ fcntl.lockf (lock_f, fcntl.LOCK_UN)
+ pid_f.close ()
+if not running:
+ package = cloudpy.heartbeat.update (running = False)
+ if package: # we are going to run a package : set pid file ...
+ with open ("pid", "w") as pid_f:
+ pid_f.write (str (os.getpid ()))
+ # ... and build Package object
+ package = cloudpy.package.Package.from_dict (package)
+ fcntl.lockf (lock_f, fcntl.LOCK_UN)
+ pid_f.close ()
+
+if not package:
+ raise SystemExit
+
+# package is set here : deploy it
+package.deploy ()
+# run package
+package.run ()