Friday, January 13, 2012

gstreamer appsrc in action

Lately I have been exploring gstreamer to play AV from a transport stream demultiplexer that I am developing (mostly for fun, slightly for work). After some research (read: googling for "play video using gstreamer"), I concluded that gstreamer-appsrc is the way to go. Since I had a hard time finding a working example on the Internet that uses this plugin, I thought I would share my test application here. If you are new to gstreamer, keep in mind that I am only a couple of days ahead of you and might have overlooked or missed even basic things in this post; if you do know gstreamer, feel free to point out any such omissions so that I can correct them.

Here is a quick intro to gstreamer for newcomers. It is a media framework based on plugins - you can read all about gstreamer here. That page says prior knowledge of glib and gobject is assumed; while proper knowledge of these two will help you progress faster, I didn't know either and I survived. The point is, you can (and you have to) learn those in parallel. In a nutshell, you work on a gstreamer pipeline, which is basically a series of plugins where each one takes its input from the previous plugin, does something with it and gives the output to the next one. The data is passed around in buffers through the source and sink pads of the plugins. Usually the left end of a pipeline is a pure source (like filesrc, which reads data from a file and feeds the pipeline) and the right end is a pure sink that plays the media. The typical steps in creating a gstreamer based player can be summarized as:
  1. Create your elements and set their properties, signal handlers and callbacks
  2. Add them to a bin (pipeline is a bin)
  3. Get a reference to the bin's bus and add bus callback for messages - at the minimum, an application should handle error messages and end of stream messages so as to stop the main loop on error and eos.
  4. Link their source and sink pads in the correct order
  5. Set the state of bin to playing and start the glib main loop.
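
To make the "each plugin takes a buffer from the previous one and hands its output to the next" idea concrete, here is a toy model in plain C. It does not use gstreamer at all; toy_buffer_t, toy_element_t and the two example elements are names I made up for illustration, and real elements negotiate formats and run in streaming threads, which this sketch ignores.

```c
#include <assert.h>
#include <ctype.h>
#include <stddef.h>
#include <string.h>

#define TOY_BUF_CAP 64

/* A toy "buffer": a small block of bytes plus its length. */
typedef struct {
    char data[TOY_BUF_CAP];
    size_t len;
} toy_buffer_t;

/* A toy "element": takes one buffer from upstream, returns one for downstream. */
typedef toy_buffer_t (*toy_element_t)(toy_buffer_t in);

/* Source helper: wraps a C string in a toy buffer (truncating if too long). */
static toy_buffer_t buffer_from_string(const char *text)
{
    toy_buffer_t buf = { {0}, 0 };
    buf.len = strlen(text);
    if (buf.len > TOY_BUF_CAP)
        buf.len = TOY_BUF_CAP;
    memcpy(buf.data, text, buf.len);
    return buf;
}

/* Middle element: upper-cases every byte. */
static toy_buffer_t upcase_element(toy_buffer_t in)
{
    for (size_t i = 0; i < in.len; i++)
        in.data[i] = (char)toupper((unsigned char)in.data[i]);
    return in;
}

/* Middle element: drops spaces, so the output may be shorter than the input. */
static toy_buffer_t strip_spaces_element(toy_buffer_t in)
{
    toy_buffer_t out = { {0}, 0 };
    for (size_t i = 0; i < in.len; i++)
        if (in.data[i] != ' ')
            out.data[out.len++] = in.data[i];
    return out;
}

/* Pushes one buffer through the elements in order, left to right. */
static toy_buffer_t run_chain(const toy_element_t *elements, size_t count,
                              toy_buffer_t buf)
{
    for (size_t i = 0; i < count; i++)
        buf = elements[i](buf);
    return buf;
}
```

Calling run_chain with an array holding upcase_element followed by strip_spaces_element on a buffer made from "a b c" yields the three bytes "ABC". In real gstreamer the order of that array corresponds to the order in which you link the elements.
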
You can find a gstreamer helloworld application and its explanation here. It uses a playbin, a ready-made pipeline that does most of the work for us. It is good enough as long as you are not producing any media yourself. But if your application produces media data and you need to play it using gstreamer, you need a way to feed this data to the pipeline. That is where appsrc comes into the picture. The appsrc element allows applications to inject buffers into a pipeline.

There is this gstreamer appsrc example code on github, but unfortunately it didn't work for me - and based on the comment on that page, I am not the only one. It uses udpsink to stream data over a network. Since I just wanted to play it on my desktop, I replaced it with an xvimagesink, which is used by most of the desktop based gstreamer video examples. But that didn't make any difference either - whenever I ran the code, it exited with the following error:

ERROR from element mysource: Internal data flow error.
Debugging info: gstbasesrc.c(2582): gst_base_src_loop (): /GstPipeline:pipeline0/GstAppSrc:mysource:
streaming task paused, reason not-negotiated (-4)

As usual I resorted to StackOverflow for an answer, but apparently there aren't many gstreamer enthusiasts roaming in there. After earning my first tumbleweed badge for this question, I decided to do it myself. I started with this simple gstreamer AVI player. It uses the following pipeline:

filesrc -> avidemux -> decodebin -> customfilter -> ffmpegcolorspace -> videoscale -> autovideosink  

This pipeline is more or less self-explanatory. The filesrc reads the avi file and passes the data to the demux, where it gets split into audio and video. To keep things short, let us ignore audio. The video data is decoded in decodebin, and the raw data is processed further and displayed using the last three plugins (gotta admit that I don't know the nitty-gritty of those three; there are detailed websites out there explaining them - help yourself). If you look at the code on that page, you can see that not all the elements are linked together at the beginning. This is because in some plugins the source (output) pads are created dynamically based on the input data. So you have to listen for the appropriate signals, like pad-added, on such elements and do the linking from the callbacks.

Since we are dealing with mpeg video, we don't need avidemux. And we can replace videoscale and autovideosink with an xvimagesink for the sake of brevity. So it comes down to replacing filesrc with an appsrc and feeding the data to the decoder. Let us create the elements:

/* Start by creating a new pipeline and the elements */
app->pipeline = (GstPipeline*)gst_pipeline_new("mypipeline");

/* Add bus callback */
bus = gst_pipeline_get_bus(app->pipeline);
gst_bus_add_watch(bus, (GstBusFunc)bus_callback, app);
gst_object_unref(bus);

/* Create the elements */
app->src = (GstAppSrc*)gst_element_factory_make("appsrc", "mysrc");
app->decoder = gst_element_factory_make("decodebin", "mydecoder");
app->ffmpeg = gst_element_factory_make("ffmpegcolorspace", "myffmpeg");
app->xvimagesink = gst_element_factory_make("xvimagesink", "myvsink");

/* Add signal handlers on appsrc - the element must exist before we connect to it */
g_signal_connect(app->src, "need-data", G_CALLBACK(start_feed), app);
g_signal_connect(app->src, "enough-data", G_CALLBACK(stop_feed), app);

/* Add elements to the pipeline */
gst_bin_add_many(GST_BIN(app->pipeline), (GstElement*)app->src, app->decoder, app->ffmpeg, app->xvimagesink, NULL);

/* Link them together - this should be done after adding to the bin */

if(!gst_element_link((GstElement*)app->src, app->decoder)){
    g_warning("failed to link src and decoder");
}

if(!gst_element_link(app->ffmpeg, app->xvimagesink)){
    g_warning("failed to link ffmpeg and xvsink");
}

/* Note that we haven't linked decoder to color-space element 
- that's done from pad-added signal callback. */
g_signal_connect(app->decoder, "pad-added", G_CALLBACK(on_pad_added), app->decoder);


Applications can feed data to the appsrc plugin using two methods: by calling the gst_app_src_push_buffer function or by emitting the push-buffer signal. I used the first approach since I am not fluent with signals, and the signal approach gave intermittent warning messages while running. We start by writing a read_data function that reads a chunk of data from the file pointer stored in our global app structure, wraps it in a GstBuffer and pushes it to the appsrc.

static gboolean read_data(gst_app_t *app)
{
    GstBuffer *buffer;
    guint8 *ptr;
    gint size;
    GstFlowReturn ret;

    ptr = g_malloc(BUFF_SIZE);
    g_assert(ptr);

    size = fread(ptr, 1, BUFF_SIZE, app->file);
    
    if(size == 0){
        /* nothing was read: free the unused chunk and signal end of stream */
        g_free(ptr);
        ret = gst_app_src_end_of_stream(app->src);
        g_debug("eos returned %d at %d\n", ret, __LINE__);
        app->sourceid = 0; /* returning FALSE removes this idle handler */
        return FALSE;
    }

    /* wrap the chunk in a buffer; MALLOCDATA makes the buffer free it later */
    buffer = gst_buffer_new();
    GST_BUFFER_MALLOCDATA(buffer) = ptr;
    GST_BUFFER_SIZE(buffer) = size;
    GST_BUFFER_DATA(buffer) = GST_BUFFER_MALLOCDATA(buffer);

    /* appsrc takes ownership of the buffer */
    ret = gst_app_src_push_buffer(app->src, buffer);

    if(ret !=  GST_FLOW_OK){
        g_debug("push buffer returned %d for %d bytes \n", ret, size);
        app->sourceid = 0;
        return FALSE;
    }

    if(size != BUFF_SIZE){
        /* short read: this was the last chunk of the file */
        ret = gst_app_src_end_of_stream(app->src);
        g_debug("eos returned %d at %d\n", ret, __LINE__);
        app->sourceid = 0;
        return FALSE;
    }

    return TRUE;
}
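
The end-of-stream logic in read_data is easy to get subtly wrong, so here is the same control flow in plain C without gstreamer, as a sketch you can test on its own. feed_stats_t, feed_one_chunk and feed_whole_file are hypothetical names; the counters merely stand in for gst_app_src_push_buffer and gst_app_src_end_of_stream. Like the original, it treats a short read as end of file and does not check ferror.

```c
#include <assert.h>
#include <stdio.h>

#define CHUNK_SIZE 1024  /* same value as BUFF_SIZE in this post */

/* Counters standing in for the real push and end-of-stream calls. */
typedef struct {
    size_t chunks;   /* number of non-empty chunks "pushed" */
    size_t bytes;    /* total payload bytes "pushed" */
    int eos_sent;    /* how many times end of stream was signalled */
} feed_stats_t;

/* Reads one chunk; returns 1 to be called again, 0 once EOS was signalled. */
static int feed_one_chunk(FILE *file, feed_stats_t *stats)
{
    unsigned char chunk[CHUNK_SIZE];
    size_t size = fread(chunk, 1, sizeof chunk, file);

    if (size == 0) {             /* nothing left: signal EOS, push nothing */
        stats->eos_sent++;
        return 0;
    }

    stats->chunks++;             /* "push" the chunk downstream */
    stats->bytes += size;

    if (size != sizeof chunk) {  /* short read: this was the last chunk */
        stats->eos_sent++;
        return 0;
    }
    return 1;
}

/* Calls feed_one_chunk() repeatedly, the way an idle handler would be called. */
static feed_stats_t feed_whole_file(FILE *file)
{
    feed_stats_t stats = {0, 0, 0};
    while (feed_one_chunk(file, &stats)) {
        /* keep feeding until the handler asks to stop */
    }
    return stats;
}
```

Both exit paths matter: a 2500 byte file ends on a short third chunk, while a file of exactly 2048 bytes ends on an empty third read. In either case end of stream is signalled exactly once.
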

The appsrc element emits two main signals, need-data and enough-data, to tell the application to start and stop feeding data. We need to listen to these signals. In the need-data callback, we add our read_data function as an idle handler to the main loop, so the glib main loop calls it whenever it has nothing else to do. When appsrc emits the enough-data signal, we remove this idle handler so that it is not called anymore. The boolean return value of read_data is used by the main loop to decide whether to call it again - so we should return FALSE if pushing the data fails or when we reach the end of the file.

static void start_feed (GstElement * pipeline, guint size, gst_app_t *app)
{
    if (app->sourceid == 0) {
        /* add idle handle to the main loop */
        app->sourceid = g_idle_add ((GSourceFunc) read_data, app);
    }
}

static void stop_feed (GstElement * pipeline, gst_app_t *app)
{
    if (app->sourceid != 0) {
        GST_DEBUG ("stop feeding");
        g_source_remove (app->sourceid);
        app->sourceid = 0;
    }
}
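
If the idle handler contract is new to you, the following toy model in plain C may help. It is not glib: toy_loop_t, toy_idle_add, toy_idle_remove and toy_loop_iterate are made-up stand-ins for the main loop, g_idle_add and g_source_remove, and the "loop" holds at most one handler. It only shows the two ways a handler goes away: it returns 0 (FALSE), or somebody removes it by id.

```c
#include <assert.h>
#include <stddef.h>

/* Handler type: a nonzero return means "call me again". */
typedef int (*idle_func_t)(void *user_data);

/* Toy stand-in for a main loop that holds at most one idle handler. */
typedef struct {
    idle_func_t func;
    void *user_data;
    unsigned next_id;
    unsigned active_id;   /* 0 means "no handler installed" */
} toy_loop_t;

/* Installs a handler and returns its nonzero id (compare g_idle_add). */
static unsigned toy_idle_add(toy_loop_t *loop, idle_func_t func, void *data)
{
    loop->func = func;
    loop->user_data = data;
    loop->active_id = ++loop->next_id;
    return loop->active_id;
}

/* Removes the handler with the given id, if it is still installed. */
static void toy_idle_remove(toy_loop_t *loop, unsigned id)
{
    if (loop->active_id == id)
        loop->active_id = 0;
}

/* One loop iteration: returns 0 if idle, else runs the handler once. */
static int toy_loop_iterate(toy_loop_t *loop)
{
    if (loop->active_id == 0)
        return 0;
    if (!loop->func(loop->user_data))
        loop->active_id = 0;   /* handler asked not to be called again */
    return 1;
}

/* Example handler: "feeds" until its budget is used up. */
static int feed_until_empty(void *user_data)
{
    int *remaining = user_data;
    (*remaining)--;
    return *remaining > 0;
}
```

With a budget of 3, the handler runs three times and then drops out by itself. Removing it by id after one call stops it early, which is what stop_feed does when appsrc says it has enough data.
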

Now, start the pipeline, create a main loop and run it:

gst_element_set_state((GstElement*)app->pipeline, GST_STATE_PLAYING);
app->loop = g_main_loop_new(NULL, FALSE);
g_main_loop_run(app->loop);

/* the previous call will return when we call quit from bus_callback - 
set the pipeline to null to do the cleanup */
gst_element_set_state((GstElement*)app->pipeline, GST_STATE_NULL);


And that's it, we got appsrc working. The source should be compiled with the appropriate cflags and libs - you can use pkg-config to get them.

gcc -o testapp gst-testapp.c `pkg-config --cflags --libs  gstreamer-0.10 gstreamer-app-0.10`

If you get compiler or linker errors related to gstreamer, you probably don't have the necessary gstreamer development libraries installed. You can apt-get them using:

sudo apt-get install libgstreamer0.10-dev
sudo apt-get install libgstreamer-plugins-base0.10-dev

Here is the full source code.

#include <stdio.h>
#include <gst/gst.h>
#include <gst/app/gstappsrc.h>

typedef struct {
 GstPipeline *pipeline;
 GstAppSrc *src;
 GstElement *sink;
 GstElement *decoder;
 GstElement *ffmpeg;
 GstElement *xvimagesink;
 GMainLoop *loop;
 guint sourceid;
 FILE *file;
}gst_app_t;

static gst_app_t gst_app;

#define BUFF_SIZE (1024)

static gboolean read_data(gst_app_t *app)
{
 GstBuffer *buffer;
 guint8 *ptr;
 gint size;
 GstFlowReturn ret;

 ptr = g_malloc(BUFF_SIZE);
 g_assert(ptr);

 size = fread(ptr, 1, BUFF_SIZE, app->file);
 
 if(size == 0){
  /* nothing was read: free the unused chunk and signal end of stream */
  g_free(ptr);
  ret = gst_app_src_end_of_stream(app->src);
  g_debug("eos returned %d at %d\n", ret, __LINE__);
  app->sourceid = 0; /* returning FALSE removes this idle handler */
  return FALSE;
 }

 /* wrap the chunk in a buffer; MALLOCDATA makes the buffer free it later */
 buffer = gst_buffer_new();
 GST_BUFFER_MALLOCDATA(buffer) = ptr;
 GST_BUFFER_SIZE(buffer) = size;
 GST_BUFFER_DATA(buffer) = GST_BUFFER_MALLOCDATA(buffer);

 /* appsrc takes ownership of the buffer */
 ret = gst_app_src_push_buffer(app->src, buffer);

 if(ret !=  GST_FLOW_OK){
  g_debug("push buffer returned %d for %d bytes \n", ret, size);
  app->sourceid = 0;
  return FALSE;
 }

 if(size != BUFF_SIZE){
  /* short read: this was the last chunk of the file */
  ret = gst_app_src_end_of_stream(app->src);
  g_debug("eos returned %d at %d\n", ret, __LINE__);
  app->sourceid = 0;
  return FALSE;
 }

 return TRUE;
}

static void start_feed (GstElement * pipeline, guint size, gst_app_t *app)
{
 if (app->sourceid == 0) {
  GST_DEBUG ("start feeding");
  app->sourceid = g_idle_add ((GSourceFunc) read_data, app);
 }
}

static void stop_feed (GstElement * pipeline, gst_app_t *app)
{
 if (app->sourceid != 0) {
  GST_DEBUG ("stop feeding");
  g_source_remove (app->sourceid);
  app->sourceid = 0;
 }
}

static void on_pad_added(GstElement *element, GstPad *pad)
{
 GstCaps *caps;
 GstStructure *str;
 gchar *name;
 GstPad *ffmpegsink;
 GstPadLinkReturn ret;

 g_debug("pad added");

 caps = gst_pad_get_caps(pad);
 str = gst_caps_get_structure(caps, 0);

 g_assert(str);

 name = (gchar*)gst_structure_get_name(str);

 g_debug("pad name %s", name);

 if(g_strrstr(name, "video")){

  /* gst_element_get_pad is deprecated; the sink pad here is a static pad */
  ffmpegsink = gst_element_get_static_pad(gst_app.ffmpeg, "sink");
  g_assert(ffmpegsink);
  ret = gst_pad_link(pad, ffmpegsink);
  g_debug("pad_link returned %d\n", ret);
  gst_object_unref(ffmpegsink);
 }
 gst_caps_unref(caps);
}

static gboolean bus_callback(GstBus *bus, GstMessage *message, gpointer ptr)
{
 gst_app_t *app = (gst_app_t*)ptr;

 switch(GST_MESSAGE_TYPE(message)){

 case GST_MESSAGE_ERROR:{
  gchar *debug;
  GError *err;

  gst_message_parse_error(message, &err, &debug);
  g_print("Error %s\n", err->message);
  g_error_free(err);
  g_free(debug);
  g_main_loop_quit(app->loop);
 }
 break;

 case GST_MESSAGE_WARNING:{
  gchar *debug;
  GError *err;
  gchar *name;

  gst_message_parse_warning(message, &err, &debug);
  g_print("Warning %s\nDebug %s\n", err->message, debug);

  name = GST_MESSAGE_SRC_NAME(message);

  g_print("Name of src %s\n", name ? name : "nil");
  g_error_free(err);
  g_free(debug);
 }
 break;

 case GST_MESSAGE_EOS:
  g_print("End of stream\n");
  g_main_loop_quit(app->loop);
  break;

 case GST_MESSAGE_STATE_CHANGED:
  break;

 default:
  g_print("got message %s\n", \
   gst_message_type_get_name (GST_MESSAGE_TYPE (message)));
  break;
 }

 return TRUE;
}

int main(int argc, char *argv[])
{
 gst_app_t *app = &gst_app;
 GstBus *bus;
 GstStateChangeReturn state_ret;

 if(argc != 2){
  printf("File name not specified\n");
  return 1;
 }

 /* open in binary mode - the stream is not text */
 app->file = fopen(argv[1], "rb");

 g_assert(app->file);

 gst_init(NULL, NULL);

 app->pipeline = (GstPipeline*)gst_pipeline_new("mypipeline");
 bus = gst_pipeline_get_bus(app->pipeline);
 gst_bus_add_watch(bus, (GstBusFunc)bus_callback, app);
 gst_object_unref(bus);

 app->src = (GstAppSrc*)gst_element_factory_make("appsrc", "mysrc");
 app->decoder = gst_element_factory_make("decodebin", "mydecoder");
 app->ffmpeg = gst_element_factory_make("ffmpegcolorspace", "myffmpeg");
 app->xvimagesink = gst_element_factory_make("xvimagesink", "myvsink");

 g_assert(app->src);
 g_assert(app->decoder);
 g_assert(app->ffmpeg);
 g_assert(app->xvimagesink);

 g_signal_connect(app->src, "need-data", G_CALLBACK(start_feed), app);
 g_signal_connect(app->src, "enough-data", G_CALLBACK(stop_feed), app);
 g_signal_connect(app->decoder, "pad-added", 
  G_CALLBACK(on_pad_added), app->decoder);

 gst_bin_add_many(GST_BIN(app->pipeline), (GstElement*)app->src, 
  app->decoder, app->ffmpeg, app->xvimagesink, NULL);

 if(!gst_element_link((GstElement*)app->src, app->decoder)){
  g_warning("failed to link src and decoder");
 }

 if(!gst_element_link(app->ffmpeg, app->xvimagesink)){
  g_warning("failed to link ffmpeg and xvsink");
 }

 state_ret = gst_element_set_state((GstElement*)app->pipeline, GST_STATE_PLAYING);
 g_warning("set state returned %d\n", state_ret);

 app->loop = g_main_loop_new(NULL, FALSE);

 g_main_loop_run(app->loop);

 state_ret = gst_element_set_state((GstElement*)app->pipeline, GST_STATE_NULL);
 g_warning("set state null returned %d\n", state_ret);

 return 0;
}

This just shows appsrc in action - the real work is integrating it with your application. Remember that g_main_loop_run will return only when you call g_main_loop_quit - so you might want to call it from its own thread.

If you find it helpful or wrong, please share your feedback.