After sorting out my clutter issues and finally producing a video of a clutter animation, I thought I’d use it on the initial goal, that animation I had written ages ago. What I had sadly forecast occurred, the video dumping awfully slowed down the animation.
The whole problem is that, for now at least, I don’t think it’s possible to run the animation frame per frame rather than time based. So I thought “let’s just defer the whole video generation to after the end of the animation, and bufferize the frames meanwhile”. Well, this worked… until oomkiller jumped in and killed my process. Urgh.
So, I can’t bufferize the whole video, but I can’t push the frames to gstreamer in real time directly from the animation either. Well, all I need is parallelism then ! Push frames to a queue which is consumed by another execution unit (by pushing the frames to gstreamer). And since threading pretty much sucks in Python (well, it would definitely since we need real parallelism), let’s use the new multiprocessing framework from Python 2.6. Using it is pretty straightforward : create some parallel structures (queues, pipes), spawn a new process with it’s own main function, push to the structures from one process, read from another, and you’re done. The only thing I’m still wondering is why there is a close() function on Queues when there is no obvious way to detect from the other end that the queue has been closed (which I worked around by pushing a None message).
Well, now I have a smooth animation and a smooth video dump, my two cores being nicely fully used
The code is available below, with the interesting parts being StageRecorder.create_pipeline, StageRecorder.dump_frame, StageRecorder.stop_recording and StageRecorder.process_runner.
import clutter import gst import gobject from multiprocessing import Process, Queue DEFAULT_OUTPUT = "stage.ogg" DEFAULT_PIPELINE_DESC = "videorate ! theoraenc ! oggmux" class StageRecorderSrc (gst.Element): """Gstreamer element used to push our buffers into the pipeline""" __gstdetails__ = ( "Clutter Stage Recorder Source plugin", "stagerecorder.py", "", "") _src_template = gst.PadTemplate ("src", gst.PAD_SRC, gst.PAD_ALWAYS, gst.caps_new_any ()) __gsttemplates__ = (_src_template,) def __init__ (self, *args, **kwargs): gst.Element.__init__ (self, *args, **kwargs) self.src_pad = gst.Pad (self._src_template) self.src_pad.use_fixed_caps () self.add_pad (self.src_pad) gobject.type_register (StageRecorderSrc) class StageRecorder (object): """Clutter stage recorder which dumps frames into a gstreamer pipeline""" stage = None output_filename = None pipeline_desc = None parallel = False pipeline = None src = None buffer_queue = None process = None stage_width = 0 stage_height = 0 clock_start = -1 def __init__ (self, stage, pipeline_desc = DEFAULT_PIPELINE_DESC, output_filename = DEFAULT_OUTPUT, parallel = False): self.stage = stage stage.connect ("destroy", self.stop_recording) stage.connect_after ("paint", self.dump_frame) stage.connect ("notify::width", self.update_size) stage.connect ("notify::height", self.update_size) self.pipeline_desc = pipeline_desc self.output_filename = output_filename self.dumping = False self.pipeline = None self.src = None self.parallel = parallel self.clock_start = -1 def create_pipeline (self): """Create the gstreamer pipeline and run it""" self.pipeline = gst.parse_launch (self.pipeline_desc) if not self.pipeline: raise RuntimeError, "Couldn't create pipeline" self.add_source () self.add_sink () self.pipeline.set_state (gst.STATE_PLAYING) if self.parallel: self.buffer_queue = Queue () self.process = Process (target = self.process_runner, args = (self.buffer_queue,)) self.process.start () def add_source (self): """Add our data source and its filters""" sink_pad = self.pipeline.find_unlinked_pad (gst.PAD_SINK) if not sink_pad: raise RuntimeError, "Pipeline has no unlinked sink pad" src_element = StageRecorderSrc () self.pipeline.add (src_element) self.src = src_element.get_static_pad ("src") # The ffmpegcolorspace element is a generic converter; it will convert # our supplied fixed format data into whatever the encoder wants ffmpegcolorspace = gst.element_factory_make ("ffmpegcolorspace") if not ffmpegcolorspace: raise RuntimeError, "Can't create ffmpegcolorspace element" self.pipeline.add (ffmpegcolorspace) # No need to verticalflip here since clutter_stage_read_pixels did src_element.link (ffmpegcolorspace) src_pad = ffmpegcolorspace.get_static_pad ("src") if not src_pad: raise RuntimeError, "Can't get src pad to link into pipeline" if src_pad.link (sink_pad) != gst.PAD_LINK_OK: raise RuntimeError, "Can't link to sink pad" def add_sink (self): """Add the final filesink""" src_pad = self.pipeline.find_unlinked_pad (gst.PAD_SRC) if not src_pad: raise RuntimeError, "Pipeline has no unlinked src pad" filesink = gst.parse_launch ("filesink location=%s" \ % self.output_filename) if not filesink: raise RuntimeError, "Can't create filesink element" self.pipeline.add (filesink) sink_pad = filesink.get_static_pad ("sink") if not sink_pad: raise RuntimeError, "Can't get sink pad to link pipeline output" if src_pad.link (sink_pad) != gst.PAD_LINK_OK: raise RuntimeError, "Can't link to sink pad" def dump_frame (self, stage): """Dump a frame to the gstreamer pipeline""" # Prevent an infinite loop (clutter.Stage.read_pixels # performs another paint) if not self.dumping: self.dumping = True if not self.pipeline: self.create_pipeline () data = self.stage.read_pixels (0, 0, self.stage_width, self.stage_height) # Use clutter clock if self.clock_start == -1: self.clock_start = clutter.get_timestamp () timestamp = clutter.get_timestamp () - self.clock_start timestamp *= 1000 if self.parallel: self.buffer_queue.put ((data, timestamp)) else: buffer = gst.Buffer (data) buffer.timestamp = timestamp # Don't forget to set the right caps on the buffer self.set_caps_on (buffer) status = self.src.push (buffer) if status != gst.FLOW_OK: raise RuntimeError, "Error while pushing buffer : " + status self.dumping = False def update_size (self, stage = None, param = None): """Update the size of the gstreamer frames based on the stage size""" x1, y1, x2, y2 = self.stage.get_allocation_box () self.stage_width = int (0.5 + x2 - x1); self.stage_height = int (0.5 + y2 - y1); def set_caps_on (self, dest): """Set the current frame caps on the specified object""" # The data is always native-endian xRGB; ffmpegcolorspace # doesn't support little-endian xRGB, but does support # big-endian BGRx. caps = gst.caps_from_string ("video/x-raw-rgb,bpp=32,depth=24,\ red_mask=0xff000000,\ green_mask=0x00ff0000,\ blue_mask=0x0000ff00,\ endianness=4321,\ framerate=15/1,\ width=%d,height=%d" \ % (self.stage_width, self.stage_height)) if dest: dest.set_caps (caps) def stop_recording (self, stage): if self.parallel: self.buffer_queue.put (None) self.pipeline.set_state (gst.STATE_NULL) def process_runner (self, queue): while True: message = queue.get () if not message: return else: data, timestamp = message buffer = gst.Buffer (data) buffer.timestamp = timestamp self.set_caps_on (buffer) status = self.src.push (buffer) if status != gst.FLOW_OK: raise RuntimeError, "Error while pushing buffer : " + status
April 23rd, 2010 at 3:43 am
[...] This post was mentioned on Twitter by ixce. ixce said: Useful parallelism with Python using multiprocessing : http://is.gd/bCuaD [...]
January 26th, 2011 at 1:58 pm
That’s exactly my problem right now. Thank you, I hope your solution will work also for me!!!