April 21st, 2010 @ 20:54

After sorting out my clutter issues and finally producing a video of a clutter animation, I thought I’d use it on the initial goal, that animation I had written ages ago. What I had sadly forecast occurred: the video dumping awfully slowed down the animation.

The whole problem is that, for now at least, I don’t think it’s possible to run the animation frame per frame rather than time based. So I thought “let’s just defer the whole video generation to after the end of the animation, and bufferize the frames meanwhile”. Well, this worked… until oomkiller jumped in and killed my process. Urgh.

So, I can’t bufferize the whole video, but I can’t push the frames to gstreamer in real time directly from the animation either. Well, all I need is parallelism then ! Push frames to a queue which is consumed by another execution unit (by pushing the frames to gstreamer). And since threading pretty much sucks in Python (well, it would definitely since we need real parallelism), let’s use the new multiprocessing framework from Python 2.6. Using it is pretty straightforward : create some parallel structures (queues, pipes), spawn a new process with its own main function, push to the structures from one process, read from another, and you’re done. The only thing I’m still wondering is why there is a close() function on Queues when there is no obvious way to detect from the other end that the queue has been closed (which I worked around by pushing a None message).

Well, now I have a smooth animation and a smooth video dump, my two cores being nicely fully used :)

The code is available below, with the interesting parts being StageRecorder.create_pipeline, StageRecorder.dump_frame, StageRecorder.stop_recording and StageRecorder.process_runner.

import clutter
import gst
import gobject
 
from multiprocessing import Process, Queue
 
# Default destination file for the recorded video
DEFAULT_OUTPUT = "stage.ogg"
# Default gst-launch style encoder chain; frames are fed in at its head
# and a filesink is appended at its tail (see StageRecorder.add_sink)
DEFAULT_PIPELINE_DESC = "videorate ! theoraenc ! oggmux"
 
class StageRecorderSrc (gst.Element):
    """Gstreamer element used to push our buffers into the pipeline"""

    # Element metadata tuple expected by pygst:
    # (long name, klass, description, author)
    __gstdetails__ = (
        "Clutter Stage Recorder Source plugin",
        "stagerecorder.py",
        "",
        "")

    # One always-present src pad accepting any caps; the actual caps are
    # set on each buffer before it is pushed (StageRecorder.set_caps_on)
    _src_template = gst.PadTemplate ("src",
                                     gst.PAD_SRC,
                                     gst.PAD_ALWAYS,
                                     gst.caps_new_any ())

    __gsttemplates__ = (_src_template,)

    def __init__ (self, *args, **kwargs):
        gst.Element.__init__ (self, *args, **kwargs)
        # Instantiate the pad from the class template and expose it;
        # StageRecorder pushes the frame buffers on this pad
        self.src_pad = gst.Pad (self._src_template)
        self.src_pad.use_fixed_caps ()
        self.add_pad (self.src_pad)

# Register the element with the gobject type system so gstreamer can
# instantiate it like any other element
gobject.type_register (StageRecorderSrc)
 
class StageRecorder (object):
    """Clutter stage recorder which dumps frames into a gstreamer pipeline

    After every "paint" of the stage, the pixels are read back and pushed
    as a timestamped gst.Buffer into an encoding pipeline built from
    pipeline_desc and terminated by a filesink writing output_filename.
    With parallel = True the frames are handed through a
    multiprocessing.Queue to a child process which performs the gstreamer
    push, so encoding doesn't slow the animation down.
    """

    # Class-level defaults, overridden per instance in __init__
    stage = None            # the recorded clutter.Stage
    output_filename = None  # destination file of the filesink
    pipeline_desc = None    # gst-launch description of the encoder chain
    parallel = False        # push frames from a child process ?

    pipeline = None         # running gst.Pipeline (created lazily)
    src = None              # src pad of our StageRecorderSrc element
    buffer_queue = None     # frame queue to the child (parallel mode)
    process = None          # encoding child process (parallel mode)

    stage_width = 0         # current frame size, kept in sync with the stage
    stage_height = 0

    clock_start = -1        # clutter timestamp of the first dumped frame

    def __init__ (self,
                  stage,
                  pipeline_desc = DEFAULT_PIPELINE_DESC,
                  output_filename = DEFAULT_OUTPUT,
                  parallel = False):
        """Attach the recorder to stage; recording starts on the first paint"""
        self.stage = stage
        stage.connect ("destroy", self.stop_recording)
        stage.connect_after ("paint", self.dump_frame)
        stage.connect ("notify::width", self.update_size)
        stage.connect ("notify::height", self.update_size)
        self.pipeline_desc = pipeline_desc
        self.output_filename = output_filename
        self.dumping = False
        self.pipeline = None
        self.src = None
        self.parallel = parallel
        self.clock_start = -1
        # Pick up the current stage size right away instead of waiting
        # for a notify::width/height, so the first frame isn't read 0x0
        self.update_size ()

    def create_pipeline (self):
        """Create the gstreamer pipeline and run it"""
        self.pipeline = gst.parse_launch (self.pipeline_desc)
        if not self.pipeline:
            raise RuntimeError ("Couldn't create pipeline")
        self.add_source ()
        self.add_sink ()
        self.pipeline.set_state (gst.STATE_PLAYING)
        if self.parallel:
            # The child inherits the pipeline through fork and pushes the
            # frames we queue from dump_frame
            self.buffer_queue = Queue ()
            self.process = Process (target = self.process_runner,
                                    args = (self.buffer_queue,))
            self.process.start ()

    def add_source (self):
        """Add our data source and its filters"""
        sink_pad = self.pipeline.find_unlinked_pad (gst.PAD_SINK)
        if not sink_pad:
            raise RuntimeError ("Pipeline has no unlinked sink pad")

        src_element = StageRecorderSrc ()
        self.pipeline.add (src_element)
        self.src = src_element.get_static_pad ("src")

        # The ffmpegcolorspace element is a generic converter; it will convert
        # our supplied fixed format data into whatever the encoder wants
        ffmpegcolorspace = gst.element_factory_make ("ffmpegcolorspace")
        if not ffmpegcolorspace:
            raise RuntimeError ("Can't create ffmpegcolorspace element")
        self.pipeline.add (ffmpegcolorspace)

        # No need to verticalflip here since clutter_stage_read_pixels did

        src_element.link (ffmpegcolorspace)

        src_pad = ffmpegcolorspace.get_static_pad ("src")
        if not src_pad:
            raise RuntimeError ("Can't get src pad to link into pipeline")

        if src_pad.link (sink_pad) != gst.PAD_LINK_OK:
            raise RuntimeError ("Can't link to sink pad")

    def add_sink (self):
        """Add the final filesink"""
        src_pad = self.pipeline.find_unlinked_pad (gst.PAD_SRC)
        if not src_pad:
            raise RuntimeError ("Pipeline has no unlinked src pad")
        # Quote the location so filenames containing spaces survive the
        # gst-launch parser
        filesink = gst.parse_launch ("filesink location=\"%s\"" \
                                        % self.output_filename)
        if not filesink:
            raise RuntimeError ("Can't create filesink element")
        self.pipeline.add (filesink)

        sink_pad = filesink.get_static_pad ("sink")
        if not sink_pad:
            raise RuntimeError ("Can't get sink pad to link pipeline output")

        if src_pad.link (sink_pad) != gst.PAD_LINK_OK:
            raise RuntimeError ("Can't link to sink pad")

    def dump_frame (self, stage):
        """Dump a frame to the gstreamer pipeline"""
        # Prevent an infinite loop (clutter.Stage.read_pixels
        # performs another paint)
        if self.dumping:
            return
        self.dumping = True
        try:
            if not self.pipeline:
                self.create_pipeline ()
            data = self.stage.read_pixels (0, 0,
                                           self.stage_width,
                                           self.stage_height)
            # Use clutter clock (microseconds) as the time base,
            # converted to gstreamer nanoseconds
            if self.clock_start == -1:
                self.clock_start = clutter.get_timestamp ()
            timestamp = (clutter.get_timestamp () - self.clock_start) * 1000
            if self.parallel:
                # Defer the gstreamer push to the child process
                self.buffer_queue.put ((data, timestamp))
            else:
                self.push_buffer (data, timestamp)
        finally:
            # finally, so an error while dumping doesn't leave the
            # recorder stuck with dumping == True (no more frames ever)
            self.dumping = False

    def push_buffer (self, data, timestamp):
        """Wrap a raw frame into a gst.Buffer and push it on our src pad

        Raises RuntimeError when the pad refuses the buffer.
        """
        buffer = gst.Buffer (data)
        buffer.timestamp = timestamp
        # Don't forget to set the right caps on the buffer
        self.set_caps_on (buffer)
        status = self.src.push (buffer)
        if status != gst.FLOW_OK:
            # status is a gst.FlowReturn, not a string: format it instead
            # of concatenating (which would raise a TypeError)
            raise RuntimeError ("Error while pushing buffer : %s" % status)

    def update_size (self, stage = None, param = None):
        """Update the size of the gstreamer frames based on the stage size"""
        x1, y1, x2, y2 = self.stage.get_allocation_box ()
        # Round the float allocation to the nearest integer pixel
        self.stage_width = int (0.5 + x2 - x1)
        self.stage_height = int (0.5 + y2 - y1)

    def set_caps_on (self, dest):
        """Set the current frame caps on the specified object"""
        # The data is always native-endian xRGB; ffmpegcolorspace
        # doesn't support little-endian xRGB, but does support
        # big-endian BGRx.
        caps = gst.caps_from_string ("video/x-raw-rgb,bpp=32,depth=24,\
                                      red_mask=0xff000000,\
                                      green_mask=0x00ff0000,\
                                      blue_mask=0x0000ff00,\
                                      endianness=4321,\
                                      framerate=15/1,\
                                      width=%d,height=%d" \
                                        % (self.stage_width,
                                           self.stage_height))
        if dest:
            dest.set_caps (caps)

    def stop_recording (self, stage):
        """Stop recording: flush the child (if any) and stop the pipeline"""
        if self.parallel and self.buffer_queue:
            # None is the end-of-stream sentinel for process_runner
            self.buffer_queue.put (None)
            if self.process:
                # Wait until the remaining queued frames have been pushed
                self.process.join ()
        # The pipeline is created lazily; it may not exist if no frame
        # was ever painted
        if self.pipeline:
            self.pipeline.set_state (gst.STATE_NULL)

    def process_runner (self, queue):
        """Child process main loop: pop frames from queue and push them

        A None message is the termination sentinel (there is no obvious
        way for the reader of a Queue to detect close() from the other
        end, hence the explicit sentinel).
        """
        while True:
            message = queue.get ()
            if message is None:
                return
            data, timestamp = message
            self.push_buffer (data, timestamp)