Added gst-plugins-base-subtitles0.10-0.10.34 for Meego Harmattan 1.2
[mafwsubrenderer] / gst-plugins-base-subtitles0.10 / ext / ogg / gstoggmux.c
1 /* OGG muxer plugin for GStreamer
2  * Copyright (C) 2004 Wim Taymans <wim@fluendo.com>
3  * Copyright (C) 2006 Thomas Vander Stichele <thomas at apestaart dot org>
4  *
5  * This library is free software; you can redistribute it and/or
6  * modify it under the terms of the GNU Library General Public
7  * License as published by the Free Software Foundation; either
8  * version 2 of the License, or (at your option) any later version.
9  *
10  * This library is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * Library General Public License for more details.
14  *
15  * You should have received a copy of the GNU Library General Public
16  * License along with this library; if not, write to the
17  * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18  * Boston, MA 02111-1307, USA.
19  */
20
21 /**
22  * SECTION:element-oggmux
23  * @see_also: <link linkend="gst-plugins-base-plugins-oggdemux">oggdemux</link>
24  *
25  * This element merges streams (audio and video) into ogg files.
26  *
27  * <refsect2>
28  * <title>Example pipelines</title>
29  * |[
30  * gst-launch v4l2src num-buffers=500 ! video/x-raw-yuv,width=320,height=240 ! ffmpegcolorspace ! theoraenc ! oggmux ! filesink location=video.ogg
31  * ]| Encodes a video stream captured from a v4l2-compatible camera to Ogg/Theora
32  * (the encoding will stop automatically after 500 frames)
33  * </refsect2>
34  *
35  * Last reviewed on 2008-02-06 (0.10.17)
36  */
37
38 #ifdef HAVE_CONFIG_H
39 #include "config.h"
40 #endif
41
42 #include <gst/gst.h>
43 #include <gst/base/gstcollectpads.h>
44 #include <gst/tag/tag.h>
45
46 #include "gstoggmux.h"
47
48 /* memcpy - if someone knows a way to get rid of it, please speak up
49  * note: the ogg docs even say you need this... */
50 #include <string.h>
51 #include <time.h>
52 #include <stdlib.h>             /* rand, srand, atoi */
53
54 GST_DEBUG_CATEGORY_STATIC (gst_ogg_mux_debug);
55 #define GST_CAT_DEFAULT gst_ogg_mux_debug
56
57 /* This isn't generally what you'd want with an end-time macro, because
58    technically the end time of a buffer with invalid duration is invalid. But
59    for sorting ogg pages this is what we want. */
60 #define GST_BUFFER_END_TIME(buf) \
61     (GST_BUFFER_DURATION_IS_VALID (buf) \
62     ? GST_BUFFER_TIMESTAMP (buf) + GST_BUFFER_DURATION (buf) \
63     : GST_BUFFER_TIMESTAMP (buf))
64
65 #define GST_BUFFER_RUNNING_TIME(buf, oggpad) \
66     (GST_BUFFER_DURATION_IS_VALID (buf) \
67     ? gst_segment_to_running_time (&(oggpad)->segment, GST_FORMAT_TIME, \
68     GST_BUFFER_TIMESTAMP (buf)) : 0)
69
70 #define GST_GP_FORMAT "[gp %8" G_GINT64_FORMAT "]"
71 #define GST_GP_CAST(_gp) ((gint64) _gp)
72
73 typedef enum
74 {
75   GST_OGG_FLAG_BOS = GST_ELEMENT_FLAG_LAST,
76   GST_OGG_FLAG_EOS
77 }
78 GstOggFlag;
79
80 /* OggMux signals and args */
81 enum
82 {
83   /* FILL ME */
84   LAST_SIGNAL
85 };
86
87 /* set to 0.5 seconds by default */
88 #define DEFAULT_MAX_DELAY       G_GINT64_CONSTANT(500000000)
89 #define DEFAULT_MAX_PAGE_DELAY  G_GINT64_CONSTANT(500000000)
90 enum
91 {
92   ARG_0,
93   ARG_MAX_DELAY,
94   ARG_MAX_PAGE_DELAY,
95 };
96
97 static GstStaticPadTemplate src_factory = GST_STATIC_PAD_TEMPLATE ("src",
98     GST_PAD_SRC,
99     GST_PAD_ALWAYS,
100     GST_STATIC_CAPS ("application/ogg")
101     );
102
103 static GstStaticPadTemplate sink_factory = GST_STATIC_PAD_TEMPLATE ("sink_%d",
104     GST_PAD_SINK,
105     GST_PAD_REQUEST,
106     GST_STATIC_CAPS ("video/x-theora; "
107         "audio/x-vorbis; audio/x-flac; audio/x-speex; audio/x-celt; "
108         "application/x-ogm-video; application/x-ogm-audio; video/x-dirac; "
109         "video/x-smoke; video/x-vp8; text/x-cmml, encoded = (boolean) TRUE; "
110         "subtitle/x-kate; application/x-kate")
111     );
112
113 static void gst_ogg_mux_base_init (gpointer g_class);
114 static void gst_ogg_mux_class_init (GstOggMuxClass * klass);
115 static void gst_ogg_mux_init (GstOggMux * ogg_mux);
116 static void gst_ogg_mux_finalize (GObject * object);
117
118 static GstFlowReturn
119 gst_ogg_mux_collected (GstCollectPads * pads, GstOggMux * ogg_mux);
120 static gboolean gst_ogg_mux_handle_src_event (GstPad * pad, GstEvent * event);
121 static GstPad *gst_ogg_mux_request_new_pad (GstElement * element,
122     GstPadTemplate * templ, const gchar * name);
123 static void gst_ogg_mux_release_pad (GstElement * element, GstPad * pad);
124
125 static void gst_ogg_mux_set_property (GObject * object,
126     guint prop_id, const GValue * value, GParamSpec * pspec);
127 static void gst_ogg_mux_get_property (GObject * object,
128     guint prop_id, GValue * value, GParamSpec * pspec);
129 static GstStateChangeReturn gst_ogg_mux_change_state (GstElement * element,
130     GstStateChange transition);
131
132 static GstElementClass *parent_class = NULL;
133
134 /*static guint gst_ogg_mux_signals[LAST_SIGNAL] = { 0 }; */
135
136 GType
137 gst_ogg_mux_get_type (void)
138 {
139   static GType ogg_mux_type = 0;
140
141   if (G_UNLIKELY (ogg_mux_type == 0)) {
142     static const GTypeInfo ogg_mux_info = {
143       sizeof (GstOggMuxClass),
144       gst_ogg_mux_base_init,
145       NULL,
146       (GClassInitFunc) gst_ogg_mux_class_init,
147       NULL,
148       NULL,
149       sizeof (GstOggMux),
150       0,
151       (GInstanceInitFunc) gst_ogg_mux_init,
152     };
153     static const GInterfaceInfo preset_info = {
154       NULL,
155       NULL,
156       NULL
157     };
158
159     ogg_mux_type =
160         g_type_register_static (GST_TYPE_ELEMENT, "GstOggMux", &ogg_mux_info,
161         0);
162
163     g_type_add_interface_static (ogg_mux_type, GST_TYPE_PRESET, &preset_info);
164   }
165   return ogg_mux_type;
166 }
167
168 static void
169 gst_ogg_mux_base_init (gpointer g_class)
170 {
171   GstElementClass *element_class = GST_ELEMENT_CLASS (g_class);
172
173   gst_element_class_add_pad_template (element_class,
174       gst_static_pad_template_get (&src_factory));
175   gst_element_class_add_pad_template (element_class,
176       gst_static_pad_template_get (&sink_factory));
177
178   gst_element_class_set_details_simple (element_class,
179       "Ogg muxer", "Codec/Muxer",
180       "mux ogg streams (info about ogg: http://xiph.org)",
181       "Wim Taymans <wim@fluendo.com>");
182 }
183
184 static void
185 gst_ogg_mux_class_init (GstOggMuxClass * klass)
186 {
187   GObjectClass *gobject_class;
188   GstElementClass *gstelement_class;
189
190   gobject_class = (GObjectClass *) klass;
191   gstelement_class = (GstElementClass *) klass;
192
193   parent_class = g_type_class_peek_parent (klass);
194
195   gobject_class->finalize = gst_ogg_mux_finalize;
196   gobject_class->get_property = gst_ogg_mux_get_property;
197   gobject_class->set_property = gst_ogg_mux_set_property;
198
199   gstelement_class->request_new_pad = gst_ogg_mux_request_new_pad;
200   gstelement_class->release_pad = gst_ogg_mux_release_pad;
201
202   g_object_class_install_property (gobject_class, ARG_MAX_DELAY,
203       g_param_spec_uint64 ("max-delay", "Max delay",
204           "Maximum delay in multiplexing streams", 0, G_MAXUINT64,
205           DEFAULT_MAX_DELAY,
206           (GParamFlags) G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
207   g_object_class_install_property (gobject_class, ARG_MAX_PAGE_DELAY,
208       g_param_spec_uint64 ("max-page-delay", "Max page delay",
209           "Maximum delay for sending out a page", 0, G_MAXUINT64,
210           DEFAULT_MAX_PAGE_DELAY,
211           (GParamFlags) G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
212
213   gstelement_class->change_state = gst_ogg_mux_change_state;
214
215 }
216
217 #if 0
218 static const GstEventMask *
219 gst_ogg_mux_get_sink_event_masks (GstPad * pad)
220 {
221   static const GstEventMask gst_ogg_mux_sink_event_masks[] = {
222     {GST_EVENT_EOS, 0},
223     {GST_EVENT_DISCONTINUOUS, 0},
224     {0,}
225   };
226
227   return gst_ogg_mux_sink_event_masks;
228 }
229 #endif
230
231 static void
232 gst_ogg_mux_clear (GstOggMux * ogg_mux)
233 {
234   ogg_mux->pulling = NULL;
235   ogg_mux->need_headers = TRUE;
236   ogg_mux->delta_pad = NULL;
237   ogg_mux->offset = 0;
238   ogg_mux->next_ts = 0;
239   ogg_mux->last_ts = GST_CLOCK_TIME_NONE;
240 }
241
242 static void
243 gst_ogg_mux_init (GstOggMux * ogg_mux)
244 {
245   GstElementClass *klass = GST_ELEMENT_GET_CLASS (ogg_mux);
246
247   ogg_mux->srcpad =
248       gst_pad_new_from_template (gst_element_class_get_pad_template (klass,
249           "src"), "src");
250   gst_pad_set_event_function (ogg_mux->srcpad, gst_ogg_mux_handle_src_event);
251   gst_element_add_pad (GST_ELEMENT (ogg_mux), ogg_mux->srcpad);
252
253   GST_OBJECT_FLAG_SET (GST_ELEMENT (ogg_mux), GST_OGG_FLAG_BOS);
254
255   /* seed random number generator for creation of serial numbers */
256   srand (time (NULL));
257
258   ogg_mux->collect = gst_collect_pads_new ();
259   gst_collect_pads_set_function (ogg_mux->collect,
260       (GstCollectPadsFunction) GST_DEBUG_FUNCPTR (gst_ogg_mux_collected),
261       ogg_mux);
262
263   ogg_mux->max_delay = DEFAULT_MAX_DELAY;
264   ogg_mux->max_page_delay = DEFAULT_MAX_PAGE_DELAY;
265
266   gst_ogg_mux_clear (ogg_mux);
267 }
268
269 static void
270 gst_ogg_mux_finalize (GObject * object)
271 {
272   GstOggMux *ogg_mux;
273
274   ogg_mux = GST_OGG_MUX (object);
275
276   if (ogg_mux->collect) {
277     gst_object_unref (ogg_mux->collect);
278     ogg_mux->collect = NULL;
279   }
280
281   G_OBJECT_CLASS (parent_class)->finalize (object);
282 }
283
284 static void
285 gst_ogg_mux_ogg_pad_destroy_notify (GstCollectData * data)
286 {
287   GstOggPadData *oggpad = (GstOggPadData *) data;
288   GstBuffer *buf;
289
290   ogg_stream_clear (&oggpad->map.stream);
291   gst_caps_replace (&oggpad->map.caps, NULL);
292
293   if (oggpad->pagebuffers) {
294     while ((buf = g_queue_pop_head (oggpad->pagebuffers)) != NULL) {
295       gst_buffer_unref (buf);
296     }
297     g_queue_free (oggpad->pagebuffers);
298     oggpad->pagebuffers = NULL;
299   }
300 }
301
302 static GstPadLinkReturn
303 gst_ogg_mux_sinkconnect (GstPad * pad, GstPad * peer)
304 {
305   GstOggMux *ogg_mux;
306
307   ogg_mux = GST_OGG_MUX (gst_pad_get_parent (pad));
308
309   GST_DEBUG_OBJECT (ogg_mux, "sinkconnect triggered on %s", GST_PAD_NAME (pad));
310
311   gst_object_unref (ogg_mux);
312
313   return GST_PAD_LINK_OK;
314 }
315
316 static gboolean
317 gst_ogg_mux_sink_event (GstPad * pad, GstEvent * event)
318 {
319   GstOggMux *ogg_mux = GST_OGG_MUX (gst_pad_get_parent (pad));
320   GstOggPadData *ogg_pad = (GstOggPadData *) gst_pad_get_element_private (pad);
321   gboolean ret = FALSE;
322
323   GST_DEBUG_OBJECT (pad, "Got %s event", GST_EVENT_TYPE_NAME (event));
324
325   switch (GST_EVENT_TYPE (event)) {
326     case GST_EVENT_NEWSEGMENT:{
327       gboolean update;
328       gdouble rate;
329       gdouble applied_rate;
330       GstFormat format;
331       gint64 start, stop, position;
332
333       gst_event_parse_new_segment_full (event, &update, &rate,
334           &applied_rate, &format, &start, &stop, &position);
335
336       /* We don't support non time NEWSEGMENT events */
337       if (format != GST_FORMAT_TIME) {
338         gst_event_unref (event);
339         event = NULL;
340         break;
341       }
342
343       gst_segment_set_newsegment_full (&ogg_pad->segment, update, rate,
344           applied_rate, format, start, stop, position);
345
346       break;
347     }
348     case GST_EVENT_FLUSH_STOP:{
349       gst_segment_init (&ogg_pad->segment, GST_FORMAT_TIME);
350       break;
351     }
352     default:
353       break;
354   }
355
356   /* now GstCollectPads can take care of the rest, e.g. EOS */
357   if (event != NULL)
358     ret = ogg_pad->collect_event (pad, event);
359
360   gst_object_unref (ogg_mux);
361   return ret;
362 }
363
364 static gboolean
365 gst_ogg_mux_is_serialno_present (GstOggMux * ogg_mux, guint32 serialno)
366 {
367   GSList *walk;
368
369   walk = ogg_mux->collect->data;
370   while (walk) {
371     GstOggPadData *pad = (GstOggPadData *) walk->data;
372     if (pad->map.serialno == serialno)
373       return TRUE;
374     walk = walk->next;
375   }
376
377   return FALSE;
378 }
379
380 static guint32
381 gst_ogg_mux_generate_serialno (GstOggMux * ogg_mux)
382 {
383   guint32 serialno;
384
385   do {
386     serialno = g_random_int_range (0, G_MAXINT32);
387   } while (gst_ogg_mux_is_serialno_present (ogg_mux, serialno));
388
389   return serialno;
390 }
391
392 static GstPad *
393 gst_ogg_mux_request_new_pad (GstElement * element,
394     GstPadTemplate * templ, const gchar * req_name)
395 {
396   GstOggMux *ogg_mux;
397   GstPad *newpad;
398   GstElementClass *klass;
399
400   g_return_val_if_fail (templ != NULL, NULL);
401
402   if (templ->direction != GST_PAD_SINK)
403     goto wrong_direction;
404
405   g_return_val_if_fail (GST_IS_OGG_MUX (element), NULL);
406   ogg_mux = GST_OGG_MUX (element);
407
408   klass = GST_ELEMENT_GET_CLASS (element);
409
410   if (templ != gst_element_class_get_pad_template (klass, "sink_%d"))
411     goto wrong_template;
412
413   {
414     gint serial;
415     gchar *name;
416
417     if (req_name == NULL || strlen (req_name) < 6) {
418       /* no name given when requesting the pad, use random serial number */
419       serial = gst_ogg_mux_generate_serialno (ogg_mux);
420     } else {
421       /* parse serial number from requested padname */
422       serial = atoi (&req_name[5]);
423     }
424     /* create new pad with the name */
425     GST_DEBUG_OBJECT (ogg_mux, "Creating new pad for serial %d", serial);
426     name = g_strdup_printf ("sink_%d", serial);
427     newpad = gst_pad_new_from_template (templ, name);
428     g_free (name);
429
430     /* construct our own wrapper data structure for the pad to
431      * keep track of its status */
432     {
433       GstOggPadData *oggpad;
434
435       oggpad = (GstOggPadData *)
436           gst_collect_pads_add_pad_full (ogg_mux->collect, newpad,
437           sizeof (GstOggPadData), gst_ogg_mux_ogg_pad_destroy_notify);
438       ogg_mux->active_pads++;
439
440       oggpad->map.serialno = serial;
441       ogg_stream_init (&oggpad->map.stream, oggpad->map.serialno);
442       oggpad->packetno = 0;
443       oggpad->pageno = 0;
444       oggpad->eos = FALSE;
445       /* we assume there will be some control data first for this pad */
446       oggpad->state = GST_OGG_PAD_STATE_CONTROL;
447       oggpad->new_page = TRUE;
448       oggpad->first_delta = FALSE;
449       oggpad->prev_delta = FALSE;
450       oggpad->data_pushed = FALSE;
451       oggpad->pagebuffers = g_queue_new ();
452       oggpad->map.headers = NULL;
453       oggpad->map.queued = NULL;
454
455       gst_segment_init (&oggpad->segment, GST_FORMAT_TIME);
456
457       oggpad->collect_event = (GstPadEventFunction) GST_PAD_EVENTFUNC (newpad);
458       gst_pad_set_event_function (newpad,
459           GST_DEBUG_FUNCPTR (gst_ogg_mux_sink_event));
460     }
461   }
462
463   /* setup some pad functions */
464   gst_pad_set_link_function (newpad, gst_ogg_mux_sinkconnect);
465
466   /* dd the pad to the element */
467   gst_element_add_pad (element, newpad);
468
469   return newpad;
470
471   /* ERRORS */
472 wrong_direction:
473   {
474     g_warning ("ogg_mux: request pad that is not a SINK pad\n");
475     return NULL;
476   }
477 wrong_template:
478   {
479     g_warning ("ogg_mux: this is not our template!\n");
480     return NULL;
481   }
482 }
483
484 static void
485 gst_ogg_mux_release_pad (GstElement * element, GstPad * pad)
486 {
487   GstOggMux *ogg_mux;
488
489   ogg_mux = GST_OGG_MUX (gst_pad_get_parent (pad));
490
491   gst_collect_pads_remove_pad (ogg_mux->collect, pad);
492   gst_element_remove_pad (element, pad);
493
494   gst_object_unref (ogg_mux);
495 }
496
497 /* handle events */
498 static gboolean
499 gst_ogg_mux_handle_src_event (GstPad * pad, GstEvent * event)
500 {
501   GstEventType type;
502
503   type = event ? GST_EVENT_TYPE (event) : GST_EVENT_UNKNOWN;
504
505   switch (type) {
506     case GST_EVENT_SEEK:
507       /* disable seeking for now */
508       return FALSE;
509     default:
510       break;
511   }
512
513   return gst_pad_event_default (pad, event);
514 }
515
516 static GstBuffer *
517 gst_ogg_mux_buffer_from_page (GstOggMux * mux, ogg_page * page, gboolean delta)
518 {
519   GstBuffer *buffer;
520
521   /* allocate space for header and body */
522   buffer = gst_buffer_new_and_alloc (page->header_len + page->body_len);
523   memcpy (GST_BUFFER_DATA (buffer), page->header, page->header_len);
524   memcpy (GST_BUFFER_DATA (buffer) + page->header_len,
525       page->body, page->body_len);
526
527   /* Here we set granulepos as our OFFSET_END to give easy direct access to
528    * this value later. Before we push it, we reset this to OFFSET + SIZE
529    * (see gst_ogg_mux_push_buffer). */
530   GST_BUFFER_OFFSET_END (buffer) = ogg_page_granulepos (page);
531   if (delta)
532     GST_BUFFER_FLAG_SET (buffer, GST_BUFFER_FLAG_DELTA_UNIT);
533
534   GST_LOG_OBJECT (mux, GST_GP_FORMAT
535       " created buffer %p from ogg page",
536       GST_GP_CAST (ogg_page_granulepos (page)), buffer);
537
538   return buffer;
539 }
540
541 static GstFlowReturn
542 gst_ogg_mux_push_buffer (GstOggMux * mux, GstBuffer * buffer,
543     GstOggPadData * oggpad)
544 {
545   GstCaps *caps;
546
547   /* fix up OFFSET and OFFSET_END again */
548   GST_BUFFER_OFFSET (buffer) = mux->offset;
549   mux->offset += GST_BUFFER_SIZE (buffer);
550   GST_BUFFER_OFFSET_END (buffer) = mux->offset;
551
552   /* Ensure we have monotonically increasing timestamps in the output. */
553   if (GST_BUFFER_TIMESTAMP_IS_VALID (buffer)) {
554     gint64 run_time = GST_BUFFER_RUNNING_TIME (buffer, oggpad);
555     if (mux->last_ts != GST_CLOCK_TIME_NONE && run_time < mux->last_ts)
556       GST_BUFFER_TIMESTAMP (buffer) = mux->last_ts;
557     else
558       mux->last_ts = run_time;
559   }
560
561   caps = gst_pad_get_negotiated_caps (mux->srcpad);
562   gst_buffer_set_caps (buffer, caps);
563   if (caps)
564     gst_caps_unref (caps);
565
566   return gst_pad_push (mux->srcpad, buffer);
567 }
568
569 /* if all queues have at least one page, dequeue the page with the lowest
570  * timestamp */
571 static gboolean
572 gst_ogg_mux_dequeue_page (GstOggMux * mux, GstFlowReturn * flowret)
573 {
574   GSList *walk;
575   GstOggPadData *opad = NULL;   /* "oldest" pad */
576   GstClockTime oldest = GST_CLOCK_TIME_NONE;
577   GstBuffer *buf = NULL;
578   gboolean ret = FALSE;
579
580   *flowret = GST_FLOW_OK;
581
582   walk = mux->collect->data;
583   while (walk) {
584     GstOggPadData *pad = (GstOggPadData *) walk->data;
585
586     /* We need each queue to either be at EOS, or have one or more pages
587      * available with a set granulepos (i.e. not -1), otherwise we don't have
588      * enough data yet to determine which stream needs to go next for correct
589      * time ordering. */
590     if (pad->pagebuffers->length == 0) {
591       if (pad->eos) {
592         GST_LOG_OBJECT (pad->collect.pad,
593             "pad is EOS, skipping for dequeue decision");
594       } else {
595         GST_LOG_OBJECT (pad->collect.pad,
596             "no pages in this queue, can't dequeue");
597         return FALSE;
598       }
599     } else {
600       /* We then need to check for a non-negative granulepos */
601       int i;
602       gboolean valid = FALSE;
603
604       for (i = 0; i < pad->pagebuffers->length; i++) {
605         buf = g_queue_peek_nth (pad->pagebuffers, i);
606         /* Here we check the OFFSET_END, which is actually temporarily the
607          * granulepos value for this buffer */
608         if (GST_BUFFER_OFFSET_END (buf) != -1) {
609           valid = TRUE;
610           break;
611         }
612       }
613       if (!valid) {
614         GST_LOG_OBJECT (pad->collect.pad,
615             "No page timestamps in queue, can't dequeue");
616         return FALSE;
617       }
618     }
619
620     walk = g_slist_next (walk);
621   }
622
623   walk = mux->collect->data;
624   while (walk) {
625     GstOggPadData *pad = (GstOggPadData *) walk->data;
626
627     /* any page with a granulepos of -1 can be pushed immediately.
628      * TODO: it CAN be, but it seems silly to do so? */
629     buf = g_queue_peek_head (pad->pagebuffers);
630     while (buf && GST_BUFFER_OFFSET_END (buf) == -1) {
631       GST_LOG_OBJECT (pad->collect.pad, "[gp        -1] pushing page");
632       g_queue_pop_head (pad->pagebuffers);
633       *flowret = gst_ogg_mux_push_buffer (mux, buf, pad);
634       buf = g_queue_peek_head (pad->pagebuffers);
635       ret = TRUE;
636     }
637
638     if (buf) {
639       /* if no oldest buffer yet, take this one */
640       if (oldest == GST_CLOCK_TIME_NONE) {
641         GST_LOG_OBJECT (mux, "no oldest yet, taking buffer %p from pad %"
642             GST_PTR_FORMAT " with gp time %" GST_TIME_FORMAT,
643             buf, pad->collect.pad, GST_TIME_ARGS (GST_BUFFER_OFFSET (buf)));
644         oldest = GST_BUFFER_OFFSET (buf);
645         opad = pad;
646       } else {
647         /* if we have an oldest, compare with this one */
648         if (GST_BUFFER_OFFSET (buf) < oldest) {
649           GST_LOG_OBJECT (mux, "older buffer %p, taking from pad %"
650               GST_PTR_FORMAT " with gp time %" GST_TIME_FORMAT,
651               buf, pad->collect.pad, GST_TIME_ARGS (GST_BUFFER_OFFSET (buf)));
652           oldest = GST_BUFFER_OFFSET (buf);
653           opad = pad;
654         }
655       }
656     }
657     walk = g_slist_next (walk);
658   }
659
660   if (oldest != GST_CLOCK_TIME_NONE) {
661     g_assert (opad);
662     buf = g_queue_pop_head (opad->pagebuffers);
663     GST_LOG_OBJECT (opad->collect.pad,
664         GST_GP_FORMAT " pushing oldest page buffer %p (granulepos time %"
665         GST_TIME_FORMAT ")", GST_BUFFER_OFFSET_END (buf), buf,
666         GST_TIME_ARGS (GST_BUFFER_OFFSET (buf)));
667     *flowret = gst_ogg_mux_push_buffer (mux, buf, opad);
668     ret = TRUE;
669   }
670
671   return ret;
672 }
673
674 /* put the given ogg page on a per-pad queue, timestamping it correctly.
675  * after that, dequeue and push as many pages as possible.
676  * Caller should make sure:
677  * pad->timestamp     was set with the timestamp of the first packet put
678  *                    on the page
679  * pad->timestamp_end was set with the timestamp + duration of the last packet
680  *                    put on the page
681  * pad->gp_time       was set with the time matching the gp of the last
682  *                    packet put on the page
683  *
684  * will also reset timestamp and timestamp_end, so caller func can restart
685  * counting.
686  */
687 static GstFlowReturn
688 gst_ogg_mux_pad_queue_page (GstOggMux * mux, GstOggPadData * pad,
689     ogg_page * page, gboolean delta)
690 {
691   GstFlowReturn ret;
692   GstBuffer *buffer = gst_ogg_mux_buffer_from_page (mux, page, delta);
693
694   /* take the timestamp of the first packet on this page */
695   GST_BUFFER_TIMESTAMP (buffer) = pad->timestamp;
696   GST_BUFFER_DURATION (buffer) = pad->timestamp_end - pad->timestamp;
697   /* take the gp time of the last completed packet on this page */
698   GST_BUFFER_OFFSET (buffer) = pad->gp_time;
699
700   /* the next page will start where the current page's end time leaves off */
701   pad->timestamp = pad->timestamp_end;
702
703   g_queue_push_tail (pad->pagebuffers, buffer);
704   GST_LOG_OBJECT (pad->collect.pad, GST_GP_FORMAT
705       " queued buffer page %p (gp time %"
706       GST_TIME_FORMAT ", timestamp %" GST_TIME_FORMAT
707       "), %d page buffers queued", GST_GP_CAST (ogg_page_granulepos (page)),
708       buffer, GST_TIME_ARGS (GST_BUFFER_OFFSET (buffer)),
709       GST_TIME_ARGS (GST_BUFFER_TIMESTAMP (buffer)),
710       g_queue_get_length (pad->pagebuffers));
711
712   while (gst_ogg_mux_dequeue_page (mux, &ret)) {
713     if (ret != GST_FLOW_OK)
714       break;
715   }
716
717   return ret;
718 }
719
720 /*
721  * Given two pads, compare the buffers queued on it.
722  * Returns:
723  *  0 if they have an equal priority
724  * -1 if the first is better
725  *  1 if the second is better
726  * Priority decided by: a) validity, b) older timestamp, c) smaller number
727  * of muxed pages
728  */
729 static gint
730 gst_ogg_mux_compare_pads (GstOggMux * ogg_mux, GstOggPadData * first,
731     GstOggPadData * second)
732 {
733   guint64 firsttime, secondtime;
734
735   /* if the first pad doesn't contain anything or is even NULL, return
736    * the second pad as best candidate and vice versa */
737   if (first == NULL || (first->buffer == NULL && first->next_buffer == NULL))
738     return 1;
739   if (second == NULL || (second->buffer == NULL && second->next_buffer == NULL))
740     return -1;
741
742   /* no timestamp on first buffer, it must go first */
743   if (first->buffer)
744     firsttime = GST_BUFFER_TIMESTAMP (first->buffer);
745   else
746     firsttime = GST_BUFFER_TIMESTAMP (first->next_buffer);
747   if (firsttime == GST_CLOCK_TIME_NONE)
748     return -1;
749
750   /* no timestamp on second buffer, it must go first */
751   if (second->buffer)
752     secondtime = GST_BUFFER_TIMESTAMP (second->buffer);
753   else
754     secondtime = GST_BUFFER_TIMESTAMP (second->next_buffer);
755   if (secondtime == GST_CLOCK_TIME_NONE)
756     return 1;
757
758   firsttime = gst_segment_to_running_time (&first->segment, GST_FORMAT_TIME,
759       firsttime);
760   secondtime = gst_segment_to_running_time (&second->segment, GST_FORMAT_TIME,
761       secondtime);
762
763   /* first buffer has higher timestamp, second one should go first */
764   if (secondtime < firsttime)
765     return 1;
766   /* second buffer has higher timestamp, first one should go first */
767   else if (secondtime > firsttime)
768     return -1;
769   else {
770     /* buffers with equal timestamps, prefer the pad that has the
771      * least number of pages muxed */
772     if (second->pageno < first->pageno)
773       return 1;
774     else if (second->pageno > first->pageno)
775       return -1;
776   }
777
778   /* same priority if all of the above failed */
779   return 0;
780 }
781
782 /* make sure at least one buffer is queued on all pads, two if possible
783  * 
784  * if pad->buffer == NULL, pad->next_buffer !=  NULL, then
785  *   we do not know if the buffer is the last or not
786  * if pad->buffer != NULL, pad->next_buffer != NULL, then
787  *   pad->buffer is not the last buffer for the pad
788  * if pad->buffer != NULL, pad->next_buffer == NULL, then
789  *   pad->buffer if the last buffer for the pad
790  * 
791  * returns a pointer to an oggpad that holds the best buffer, or
792  * NULL when no pad was usable. "best" means the buffer marked
793  * with the lowest timestamp. If best->buffer == NULL then nothing
794  * should be done until more data arrives */
795 static GstOggPadData *
796 gst_ogg_mux_queue_pads (GstOggMux * ogg_mux)
797 {
798   GstOggPadData *bestpad = NULL, *still_hungry = NULL;
799   GSList *walk;
800
801   /* try to make sure we have a buffer from each usable pad first */
802   walk = ogg_mux->collect->data;
803   while (walk) {
804     GstOggPadData *pad;
805     GstCollectData *data;
806
807     data = (GstCollectData *) walk->data;
808     pad = (GstOggPadData *) data;
809
810     walk = g_slist_next (walk);
811
812     GST_LOG_OBJECT (data->pad, "looking at pad for buffer");
813
814     /* try to get a new buffer for this pad if needed and possible */
815     if (pad->buffer == NULL) {
816       GstBuffer *buf;
817
818       /* shift the buffer along if needed (it's okay if next_buffer is NULL) */
819       if (pad->buffer == NULL) {
820         GST_LOG_OBJECT (data->pad, "shifting buffer %" GST_PTR_FORMAT,
821             pad->next_buffer);
822         pad->buffer = pad->next_buffer;
823         pad->next_buffer = NULL;
824       }
825
826       buf = gst_collect_pads_pop (ogg_mux->collect, data);
827       GST_LOG_OBJECT (data->pad, "popped buffer %" GST_PTR_FORMAT, buf);
828
829       /* On EOS we get a NULL buffer */
830       if (buf != NULL) {
831         if (ogg_mux->delta_pad == NULL &&
832             GST_BUFFER_FLAG_IS_SET (buf, GST_BUFFER_FLAG_DELTA_UNIT))
833           ogg_mux->delta_pad = pad;
834
835         /* if we need headers */
836         if (pad->state == GST_OGG_PAD_STATE_CONTROL) {
837           /* and we have one */
838           ogg_packet packet;
839           gboolean is_header;
840
841           packet.packet = GST_BUFFER_DATA (buf);
842           packet.bytes = GST_BUFFER_SIZE (buf);
843
844           if (GST_BUFFER_OFFSET_END_IS_VALID (buf))
845             packet.granulepos = GST_BUFFER_OFFSET_END (buf);
846           else
847             packet.granulepos = 0;
848
849           /* if we're not yet in data mode, ensure we're setup on the first packet */
850           if (!pad->have_type) {
851             /* Use headers in caps, if any; this will allow us to be resilient
852              * to starting streams on the fly, and some streams (like VP8
853              * at least) do not send headers packets, as other muxers don't
854              * expect/need them. */
855             pad->have_type =
856                 gst_ogg_stream_setup_map_from_caps_headers (&pad->map,
857                 GST_BUFFER_CAPS (buf));
858
859             if (!pad->have_type) {
860               /* fallback on the packet */
861               pad->have_type = gst_ogg_stream_setup_map (&pad->map, &packet);
862             }
863             if (!pad->have_type) {
864               GST_ERROR_OBJECT (pad, "mapper didn't recognise input stream "
865                   "(pad caps: %" GST_PTR_FORMAT ")", GST_PAD_CAPS (pad));
866             } else {
867               GST_DEBUG_OBJECT (pad, "caps detected: %" GST_PTR_FORMAT,
868                   pad->map.caps);
869             }
870           }
871
872           if (pad->have_type)
873             is_header = gst_ogg_stream_packet_is_header (&pad->map, &packet);
874           else                  /* fallback (FIXME 0.11: remove IN_CAPS hack) */
875             is_header = GST_BUFFER_FLAG_IS_SET (buf, GST_BUFFER_FLAG_IN_CAPS);
876
877           if (is_header) {
878             GST_DEBUG_OBJECT (ogg_mux,
879                 "got header buffer in control state, ignoring");
880             /* just ignore */
881             pad->map.n_header_packets_seen++;
882             gst_buffer_unref (buf);
883             buf = NULL;
884           } else {
885             GST_DEBUG_OBJECT (ogg_mux,
886                 "got data buffer in control state, switching to data mode");
887             /* this is a data buffer so switch to data state */
888             pad->state = GST_OGG_PAD_STATE_DATA;
889           }
890         }
891       } else {
892         GST_DEBUG_OBJECT (data->pad, "EOS on pad");
893         if (!pad->eos) {
894           ogg_page page;
895
896           /* it's no longer active */
897           ogg_mux->active_pads--;
898
899           /* Just gone to EOS. Flush existing page(s) */
900           pad->eos = TRUE;
901
902           while (ogg_stream_flush (&pad->map.stream, &page)) {
903             /* Place page into the per-pad queue */
904             gst_ogg_mux_pad_queue_page (ogg_mux, pad, &page, pad->first_delta);
905             /* increment the page number counter */
906             pad->pageno++;
907             /* mark other pages as delta */
908             pad->first_delta = TRUE;
909           }
910         }
911       }
912
913       pad->next_buffer = buf;
914     }
915
916     /* we should have a buffer now, see if it is the best pad to
917      * pull on */
918     if (pad->buffer || pad->next_buffer) {
919       if (gst_ogg_mux_compare_pads (ogg_mux, bestpad, pad) > 0) {
920         GST_LOG_OBJECT (data->pad,
921             "new best pad, with buffers %" GST_PTR_FORMAT
922             " and %" GST_PTR_FORMAT, pad->buffer, pad->next_buffer);
923
924         bestpad = pad;
925       }
926     } else if (!pad->eos) {
927       GST_LOG_OBJECT (data->pad, "hungry pad");
928       still_hungry = pad;
929     }
930   }
931
932   if (still_hungry)
933     /* drop back into collectpads... */
934     return still_hungry;
935   else
936     return bestpad;
937 }
938
939 static GList *
940 gst_ogg_mux_get_headers (GstOggPadData * pad)
941 {
942   GList *res = NULL;
943   GstStructure *structure;
944   GstCaps *caps;
945   GstPad *thepad;
946
947   thepad = pad->collect.pad;
948
949   GST_LOG_OBJECT (thepad, "getting headers");
950
951   caps = gst_pad_get_negotiated_caps (thepad);
952   if (caps != NULL) {
953     const GValue *streamheader;
954
955     structure = gst_caps_get_structure (caps, 0);
956     streamheader = gst_structure_get_value (structure, "streamheader");
957     if (streamheader != NULL) {
958       GST_LOG_OBJECT (thepad, "got header");
959       if (G_VALUE_TYPE (streamheader) == GST_TYPE_ARRAY) {
960         GArray *bufarr = g_value_peek_pointer (streamheader);
961         gint i;
962
963         GST_LOG_OBJECT (thepad, "got fixed list");
964
965         for (i = 0; i < bufarr->len; i++) {
966           GValue *bufval = &g_array_index (bufarr, GValue, i);
967
968           GST_LOG_OBJECT (thepad, "item %d", i);
969           if (G_VALUE_TYPE (bufval) == GST_TYPE_BUFFER) {
970             GstBuffer *buf = g_value_peek_pointer (bufval);
971
972             GST_LOG_OBJECT (thepad, "adding item %d to header list", i);
973
974             gst_buffer_ref (buf);
975             res = g_list_append (res, buf);
976           }
977         }
978       } else {
979         GST_LOG_OBJECT (thepad, "streamheader is not fixed list");
980       }
981
982       /* Start a new page for every CMML buffer */
983       if (gst_structure_has_name (structure, "text/x-cmml"))
984         pad->always_flush_page = TRUE;
985     } else if (gst_structure_has_name (structure, "video/x-dirac")) {
986       res = g_list_append (res, pad->buffer);
987       pad->buffer = pad->next_buffer;
988       pad->next_buffer = NULL;
989       pad->always_flush_page = TRUE;
990     } else {
991       GST_LOG_OBJECT (thepad, "caps don't have streamheader");
992     }
993     gst_caps_unref (caps);
994   } else {
995     GST_LOG_OBJECT (thepad, "got empty caps as negotiated format");
996   }
997   return res;
998 }
999
1000 static GstCaps *
1001 gst_ogg_mux_set_header_on_caps (GstCaps * caps, GList * buffers)
1002 {
1003   GstStructure *structure;
1004   GValue array = { 0 };
1005   GList *walk = buffers;
1006
1007   caps = gst_caps_make_writable (caps);
1008
1009   structure = gst_caps_get_structure (caps, 0);
1010
1011   /* put buffers in a fixed list */
1012   g_value_init (&array, GST_TYPE_ARRAY);
1013
1014   while (walk) {
1015     GstBuffer *buf = GST_BUFFER (walk->data);
1016     GstBuffer *copy;
1017     GValue value = { 0 };
1018
1019     walk = walk->next;
1020
1021     /* mark buffer */
1022     GST_LOG ("Setting IN_CAPS on buffer of length %d", GST_BUFFER_SIZE (buf));
1023     GST_BUFFER_FLAG_SET (buf, GST_BUFFER_FLAG_IN_CAPS);
1024
1025     g_value_init (&value, GST_TYPE_BUFFER);
1026     copy = gst_buffer_copy (buf);
1027     gst_value_set_buffer (&value, copy);
1028     gst_buffer_unref (copy);
1029     gst_value_array_append_value (&array, &value);
1030     g_value_unset (&value);
1031   }
1032   gst_structure_set_value (structure, "streamheader", &array);
1033   g_value_unset (&array);
1034
1035   return caps;
1036 }
1037
1038 /*
1039  * For each pad we need to write out one (small) header in one
1040  * page that allows decoders to identify the type of the stream.
1041  * After that we need to write out all extra info for the decoders.
1042  * In the case of a codec that also needs data as configuration, we can
1043  * find that info in the streamcaps. 
1044  * After writing the headers we must start a new page for the data.
1045  */
1046 static GstFlowReturn
1047 gst_ogg_mux_send_headers (GstOggMux * mux)
1048 {
1049   GSList *walk;
1050   GList *hbufs, *hwalk;
1051   GstCaps *caps;
1052   GstFlowReturn ret;
1053
1054   hbufs = NULL;
1055   ret = GST_FLOW_OK;
1056
1057   GST_LOG_OBJECT (mux, "collecting headers");
1058
1059   walk = mux->collect->data;
1060   while (walk) {
1061     GstOggPadData *pad;
1062     GstPad *thepad;
1063
1064     pad = (GstOggPadData *) walk->data;
1065     thepad = pad->collect.pad;
1066
1067     walk = g_slist_next (walk);
1068
1069     GST_LOG_OBJECT (mux, "looking at pad %s:%s", GST_DEBUG_PAD_NAME (thepad));
1070
1071     /* if the pad has no buffer, we don't care */
1072     if (pad->buffer == NULL && pad->next_buffer == NULL)
1073       continue;
1074
1075     /* now figure out the headers */
1076     pad->map.headers = gst_ogg_mux_get_headers (pad);
1077   }
1078
1079   GST_LOG_OBJECT (mux, "creating BOS pages");
1080   walk = mux->collect->data;
1081   while (walk) {
1082     GstOggPadData *pad;
1083     GstBuffer *buf;
1084     ogg_packet packet;
1085     ogg_page page;
1086     GstPad *thepad;
1087     GstCaps *caps;
1088     GstStructure *structure;
1089     GstBuffer *hbuf;
1090
1091     pad = (GstOggPadData *) walk->data;
1092     thepad = pad->collect.pad;
1093     caps = gst_pad_get_negotiated_caps (thepad);
1094     structure = gst_caps_get_structure (caps, 0);
1095
1096     walk = walk->next;
1097
1098     pad->packetno = 0;
1099
1100     GST_LOG_OBJECT (thepad, "looping over headers");
1101
1102     if (pad->map.headers) {
1103       buf = GST_BUFFER (pad->map.headers->data);
1104       pad->map.headers = g_list_remove (pad->map.headers, buf);
1105     } else if (pad->buffer) {
1106       buf = pad->buffer;
1107       gst_buffer_ref (buf);
1108     } else if (pad->next_buffer) {
1109       buf = pad->next_buffer;
1110       gst_buffer_ref (buf);
1111     } else {
1112       /* fixme -- should be caught in the previous list traversal. */
1113       GST_OBJECT_LOCK (thepad);
1114       g_critical ("No headers or buffers on pad %s:%s",
1115           GST_DEBUG_PAD_NAME (thepad));
1116       GST_OBJECT_UNLOCK (thepad);
1117       continue;
1118     }
1119
1120     /* create a packet from the buffer */
1121     packet.packet = GST_BUFFER_DATA (buf);
1122     packet.bytes = GST_BUFFER_SIZE (buf);
1123     packet.granulepos = GST_BUFFER_OFFSET_END (buf);
1124     if (packet.granulepos == -1)
1125       packet.granulepos = 0;
1126     /* mark BOS and packet number */
1127     packet.b_o_s = (pad->packetno == 0);
1128     packet.packetno = pad->packetno++;
1129     /* mark EOS */
1130     packet.e_o_s = 0;
1131
1132     /* swap the packet in */
1133     ogg_stream_packetin (&pad->map.stream, &packet);
1134     gst_buffer_unref (buf);
1135
1136     GST_LOG_OBJECT (thepad, "flushing out BOS page");
1137     if (!ogg_stream_flush (&pad->map.stream, &page))
1138       g_critical ("Could not flush BOS page");
1139
1140     hbuf = gst_ogg_mux_buffer_from_page (mux, &page, FALSE);
1141
1142     GST_LOG_OBJECT (mux, "swapped out page with mime type %s",
1143         gst_structure_get_name (structure));
1144
1145     /* quick hack: put Theora, VP8 and Dirac video pages at the front.
1146      * Ideally, we would have a settable enum for which Ogg
1147      * profile we work with, and order based on that.
1148      * (FIXME: if there is more than one video stream, shouldn't we only put
1149      * one's BOS into the first page, followed by an audio stream's BOS, and
1150      * only then followed by the remaining video and audio streams?) */
1151     if (gst_structure_has_name (structure, "video/x-theora")) {
1152       GST_DEBUG_OBJECT (thepad, "putting %s page at the front", "Theora");
1153       hbufs = g_list_prepend (hbufs, hbuf);
1154     } else if (gst_structure_has_name (structure, "video/x-dirac")) {
1155       GST_DEBUG_OBJECT (thepad, "putting %s page at the front", "Dirac");
1156       hbufs = g_list_prepend (hbufs, hbuf);
1157       pad->always_flush_page = TRUE;
1158     } else if (gst_structure_has_name (structure, "video/x-vp8")) {
1159       GST_DEBUG_OBJECT (thepad, "putting %s page at the front", "VP8");
1160       hbufs = g_list_prepend (hbufs, hbuf);
1161     } else {
1162       hbufs = g_list_append (hbufs, hbuf);
1163     }
1164     gst_caps_unref (caps);
1165   }
1166
1167   GST_LOG_OBJECT (mux, "creating next headers");
1168   walk = mux->collect->data;
1169   while (walk) {
1170     GstOggPadData *pad;
1171     GstPad *thepad;
1172
1173     pad = (GstOggPadData *) walk->data;
1174     thepad = pad->collect.pad;
1175
1176     walk = walk->next;
1177
1178     GST_LOG_OBJECT (mux, "looping over headers for pad %s:%s",
1179         GST_DEBUG_PAD_NAME (thepad));
1180
1181     hwalk = pad->map.headers;
1182     while (hwalk) {
1183       GstBuffer *buf = GST_BUFFER (hwalk->data);
1184       ogg_packet packet;
1185       ogg_page page;
1186
1187       hwalk = hwalk->next;
1188
1189       /* create a packet from the buffer */
1190       packet.packet = GST_BUFFER_DATA (buf);
1191       packet.bytes = GST_BUFFER_SIZE (buf);
1192       packet.granulepos = GST_BUFFER_OFFSET_END (buf);
1193       if (packet.granulepos == -1)
1194         packet.granulepos = 0;
1195       /* mark BOS and packet number */
1196       packet.b_o_s = (pad->packetno == 0);
1197       packet.packetno = pad->packetno++;
1198       /* mark EOS */
1199       packet.e_o_s = 0;
1200
1201       /* swap the packet in */
1202       ogg_stream_packetin (&pad->map.stream, &packet);
1203       gst_buffer_unref (buf);
1204
1205       /* if last header, flush page */
1206       if (hwalk == NULL) {
1207         GST_LOG_OBJECT (mux,
1208             "flushing page as packet %" G_GUINT64_FORMAT " is first or "
1209             "last packet", (guint64) packet.packetno);
1210         while (ogg_stream_flush (&pad->map.stream, &page)) {
1211           GstBuffer *hbuf = gst_ogg_mux_buffer_from_page (mux, &page, FALSE);
1212
1213           GST_LOG_OBJECT (mux, "swapped out page");
1214           hbufs = g_list_append (hbufs, hbuf);
1215         }
1216       } else {
1217         GST_LOG_OBJECT (mux, "try to swap out page");
1218         /* just try to swap out a page then */
1219         while (ogg_stream_pageout (&pad->map.stream, &page) > 0) {
1220           GstBuffer *hbuf = gst_ogg_mux_buffer_from_page (mux, &page, FALSE);
1221
1222           GST_LOG_OBJECT (mux, "swapped out page");
1223           hbufs = g_list_append (hbufs, hbuf);
1224         }
1225       }
1226     }
1227     g_list_free (pad->map.headers);
1228     pad->map.headers = NULL;
1229   }
1230   /* hbufs holds all buffers for the headers now */
1231
1232   /* create caps with the buffers */
1233   caps = gst_pad_get_caps (mux->srcpad);
1234   if (caps) {
1235     caps = gst_ogg_mux_set_header_on_caps (caps, hbufs);
1236     gst_pad_set_caps (mux->srcpad, caps);
1237     gst_caps_unref (caps);
1238   }
1239   /* and send the buffers */
1240   while (hbufs != NULL) {
1241     GstBuffer *buf = GST_BUFFER (hbufs->data);
1242
1243     hbufs = g_list_delete_link (hbufs, hbufs);
1244
1245     if ((ret = gst_ogg_mux_push_buffer (mux, buf, NULL)) != GST_FLOW_OK)
1246       break;
1247   }
1248   /* free any remaining nodes/buffers in case we couldn't push them */
1249   g_list_foreach (hbufs, (GFunc) gst_mini_object_unref, NULL);
1250   g_list_free (hbufs);
1251
1252   return ret;
1253 }
1254
1255 /* this function is called to process data on the best pending pad.
1256  *
1257  * basic idea:
1258  *
1259  * 1) store the selected pad and keep on pulling until we fill a
1260  *    complete ogg page or the ogg page is filled above the max-delay
1261  *    threshold. This is needed because the ogg spec says that
1262  *    you should fill a complete page with data from the same logical
1263  *    stream. When the page is filled, go back to 1).
1264  * 2) before filling a page, read ahead one more buffer to see if this
1265  *    packet is the last of the stream. We need to do this because the ogg
1266  *    spec mandates that the last packet should have the EOS flag set before
1267  *    sending it to ogg. if pad->buffer is NULL we need to wait to find out
1268  *    whether there are any more buffers.
1269  * 3) pages get queued on a per-pad queue. Every time a page is queued, a
1270  *    dequeue is called, which will dequeue the oldest page on any pad, provided
1271  *    that ALL pads have at least one marked page in the queue (or remaining
1272  *    pads are at EOS)
1273  */
1274 static GstFlowReturn
1275 gst_ogg_mux_process_best_pad (GstOggMux * ogg_mux, GstOggPadData * best)
1276 {
1277   GstFlowReturn ret = GST_FLOW_OK;
1278   gboolean delta_unit;
1279   gint64 granulepos = 0;
1280   GstClockTime timestamp, gp_time;
1281
1282   GST_LOG_OBJECT (ogg_mux, "best pad %" GST_PTR_FORMAT
1283       ", currently pulling from %" GST_PTR_FORMAT, best->collect.pad,
1284       ogg_mux->pulling);
1285
1286   /* best->buffer is non-NULL, either the pad is EOS's or there is a next 
1287    * buffer */
1288   if (best->next_buffer == NULL && !best->eos) {
1289     GST_WARNING_OBJECT (ogg_mux, "no subsequent buffer and EOS not reached");
1290     return GST_FLOW_WRONG_STATE;
1291   }
1292
1293   /* if we were already pulling from one pad, but the new "best" buffer is
1294    * from another pad, we need to check if we have reason to flush a page
1295    * for the pad we were pulling from before */
1296   if (ogg_mux->pulling && best &&
1297       ogg_mux->pulling != best && ogg_mux->pulling->buffer) {
1298     GstOggPadData *pad = ogg_mux->pulling;
1299     GstClockTime last_ts = GST_BUFFER_END_TIME (pad->buffer);
1300
1301     /* if the next packet in the current page is going to make the page
1302      * too long, we need to flush */
1303     if (last_ts > ogg_mux->next_ts + ogg_mux->max_delay) {
1304       ogg_page page;
1305
1306       GST_LOG_OBJECT (pad->collect.pad,
1307           GST_GP_FORMAT " stored packet %" G_GINT64_FORMAT
1308           " will make page too long, flushing",
1309           GST_BUFFER_OFFSET_END (pad->buffer),
1310           (gint64) pad->map.stream.packetno);
1311
1312       while (ogg_stream_flush (&pad->map.stream, &page)) {
1313         /* end time of this page is the timestamp of the next buffer */
1314         ogg_mux->pulling->timestamp_end = GST_BUFFER_TIMESTAMP (pad->buffer);
1315         /* Place page into the per-pad queue */
1316         ret = gst_ogg_mux_pad_queue_page (ogg_mux, pad, &page,
1317             pad->first_delta);
1318         /* increment the page number counter */
1319         pad->pageno++;
1320         /* mark other pages as delta */
1321         pad->first_delta = TRUE;
1322       }
1323       pad->new_page = TRUE;
1324       ogg_mux->pulling = NULL;
1325     }
1326   }
1327
1328   /* if we don't know which pad to pull on, use the best one */
1329   if (ogg_mux->pulling == NULL) {
1330     ogg_mux->pulling = best;
1331     GST_LOG_OBJECT (ogg_mux->pulling->collect.pad, "pulling from best pad");
1332
1333     /* remember timestamp and gp time of first buffer for this new pad */
1334     if (ogg_mux->pulling != NULL) {
1335       ogg_mux->next_ts = GST_BUFFER_TIMESTAMP (ogg_mux->pulling->buffer);
1336       GST_LOG_OBJECT (ogg_mux->pulling->collect.pad, "updated times, next ts %"
1337           GST_TIME_FORMAT, GST_TIME_ARGS (ogg_mux->next_ts));
1338     } else {
1339       /* no pad to pull on, send EOS */
1340       gst_pad_push_event (ogg_mux->srcpad, gst_event_new_eos ());
1341       return GST_FLOW_WRONG_STATE;
1342     }
1343   }
1344
1345   if (ogg_mux->need_headers) {
1346     ret = gst_ogg_mux_send_headers (ogg_mux);
1347     ogg_mux->need_headers = FALSE;
1348   }
1349
1350   /* we are pulling from a pad, continue to do so until a page
1351    * has been filled and queued */
1352   if (ogg_mux->pulling != NULL) {
1353     ogg_packet packet;
1354     ogg_page page;
1355     GstBuffer *buf, *tmpbuf;
1356     GstOggPadData *pad = ogg_mux->pulling;
1357     gint64 duration;
1358     gboolean force_flush;
1359
1360     GST_LOG_OBJECT (ogg_mux->pulling->collect.pad, "pulling from pad");
1361
1362     /* now see if we have a buffer */
1363     buf = pad->buffer;
1364     if (buf == NULL) {
1365       GST_DEBUG_OBJECT (ogg_mux, "pad was EOS");
1366       ogg_mux->pulling = NULL;
1367       return GST_FLOW_OK;
1368     }
1369
1370     delta_unit = GST_BUFFER_FLAG_IS_SET (buf, GST_BUFFER_FLAG_DELTA_UNIT);
1371     duration = GST_BUFFER_DURATION (buf);
1372
1373     /* if the current "next timestamp" on the pad is unset, then this is the
1374      * first packet on the new page.  Update our pad's page timestamp */
1375     if (ogg_mux->pulling->timestamp == GST_CLOCK_TIME_NONE) {
1376       ogg_mux->pulling->timestamp = GST_BUFFER_TIMESTAMP (buf);
1377       GST_LOG_OBJECT (ogg_mux->pulling->collect.pad,
1378           "updated pad timestamp to %" GST_TIME_FORMAT,
1379           GST_TIME_ARGS (GST_BUFFER_TIMESTAMP (buf)));
1380     }
1381     /* create a packet from the buffer */
1382     packet.packet = GST_BUFFER_DATA (buf);
1383     packet.bytes = GST_BUFFER_SIZE (buf);
1384     packet.granulepos = GST_BUFFER_OFFSET_END (buf);
1385     if (packet.granulepos == -1)
1386       packet.granulepos = 0;
1387     /* mark BOS and packet number */
1388     packet.b_o_s = (pad->packetno == 0);
1389     packet.packetno = pad->packetno++;
1390     GST_LOG_OBJECT (pad->collect.pad, GST_GP_FORMAT
1391         " packet %" G_GINT64_FORMAT " (%ld bytes) created from buffer",
1392         GST_GP_CAST (packet.granulepos), (gint64) packet.packetno,
1393         packet.bytes);
1394
1395     packet.e_o_s = (pad->eos ? 1 : 0);
1396     tmpbuf = NULL;
1397
1398     /* we flush when we see a new keyframe */
1399     force_flush = (pad->prev_delta && !delta_unit) || pad->always_flush_page;
1400     if (duration != -1) {
1401       pad->duration += duration;
1402       /* if page duration exceeds max, flush page */
1403       if (pad->duration > ogg_mux->max_page_delay) {
1404         force_flush = TRUE;
1405         pad->duration = 0;
1406       }
1407     }
1408
1409     if (GST_BUFFER_IS_DISCONT (buf)) {
1410       if (pad->data_pushed) {
1411         GST_LOG_OBJECT (pad->collect.pad, "got discont");
1412         packet.packetno++;
1413         /* No public API for this; hack things in */
1414         pad->map.stream.pageno++;
1415         force_flush = TRUE;
1416       } else {
1417         GST_LOG_OBJECT (pad->collect.pad, "discont at stream start");
1418       }
1419     }
1420
1421     /* flush the currently built page if necessary */
1422     if (force_flush) {
1423       GST_LOG_OBJECT (pad->collect.pad,
1424           GST_GP_FORMAT " forced flush of page before this packet",
1425           GST_BUFFER_OFFSET_END (pad->buffer));
1426       while (ogg_stream_flush (&pad->map.stream, &page)) {
1427         /* end time of this page is the timestamp of the next buffer */
1428         ogg_mux->pulling->timestamp_end = GST_BUFFER_TIMESTAMP (pad->buffer);
1429         ret = gst_ogg_mux_pad_queue_page (ogg_mux, pad, &page,
1430             pad->first_delta);
1431
1432         /* increment the page number counter */
1433         pad->pageno++;
1434         /* mark other pages as delta */
1435         pad->first_delta = TRUE;
1436       }
1437       pad->new_page = TRUE;
1438     }
1439
1440     /* if this is the first packet of a new page figure out the delta flag */
1441     if (pad->new_page) {
1442       if (delta_unit) {
1443         /* mark the page as delta */
1444         pad->first_delta = TRUE;
1445       } else {
1446         /* got a keyframe */
1447         if (ogg_mux->delta_pad == pad) {
1448           /* if we get it on the pad with deltaunits,
1449            * we mark the page as non delta */
1450           pad->first_delta = FALSE;
1451         } else if (ogg_mux->delta_pad != NULL) {
1452           /* if there are pads with delta frames, we
1453            * must mark this one as delta */
1454           pad->first_delta = TRUE;
1455         } else {
1456           pad->first_delta = FALSE;
1457         }
1458       }
1459       pad->new_page = FALSE;
1460     }
1461
1462     /* save key unit to track delta->key unit transitions */
1463     pad->prev_delta = delta_unit;
1464
1465     /* swap the packet in */
1466     if (packet.e_o_s == 1)
1467       GST_DEBUG_OBJECT (pad->collect.pad, "swapping in EOS packet");
1468     if (packet.b_o_s == 1)
1469       GST_DEBUG_OBJECT (pad->collect.pad, "swapping in BOS packet");
1470
1471     ogg_stream_packetin (&pad->map.stream, &packet);
1472     pad->data_pushed = TRUE;
1473
1474     gp_time = GST_BUFFER_OFFSET (pad->buffer);
1475     granulepos = GST_BUFFER_OFFSET_END (pad->buffer);
1476     timestamp = GST_BUFFER_TIMESTAMP (pad->buffer);
1477
1478     GST_LOG_OBJECT (pad->collect.pad,
1479         GST_GP_FORMAT " packet %" G_GINT64_FORMAT ", gp time %"
1480         GST_TIME_FORMAT ", timestamp %" GST_TIME_FORMAT " packetin'd",
1481         granulepos, (gint64) packet.packetno, GST_TIME_ARGS (gp_time),
1482         GST_TIME_ARGS (timestamp));
1483     /* don't need the old buffer anymore */
1484     gst_buffer_unref (pad->buffer);
1485     /* store new readahead buffer */
1486     pad->buffer = tmpbuf;
1487
1488     /* let ogg write out the pages now. The packet we got could end
1489      * up in more than one page so we need to write them all */
1490     if (ogg_stream_pageout (&pad->map.stream, &page) > 0) {
1491       /* we have a new page, so we need to timestamp it correctly.
1492        * if this fresh packet ends on this page, then the page's granulepos
1493        * comes from that packet, and we should set this buffer's timestamp */
1494
1495       GST_LOG_OBJECT (pad->collect.pad,
1496           GST_GP_FORMAT " packet %" G_GINT64_FORMAT ", time %"
1497           GST_TIME_FORMAT ") caused new page",
1498           granulepos, (gint64) packet.packetno, GST_TIME_ARGS (timestamp));
1499       GST_LOG_OBJECT (pad->collect.pad,
1500           GST_GP_FORMAT " new page %ld",
1501           GST_GP_CAST (ogg_page_granulepos (&page)), pad->map.stream.pageno);
1502
1503       if (ogg_page_granulepos (&page) == granulepos) {
1504         /* the packet we streamed in finishes on the current page,
1505          * because the page's granulepos is the granulepos of the last
1506          * packet completed on that page,
1507          * so update the timestamp that we will give to the page */
1508         GST_LOG_OBJECT (pad->collect.pad,
1509             GST_GP_FORMAT
1510             " packet finishes on current page, updating gp time to %"
1511             GST_TIME_FORMAT, granulepos, GST_TIME_ARGS (gp_time));
1512         pad->gp_time = gp_time;
1513       } else {
1514         GST_LOG_OBJECT (pad->collect.pad,
1515             GST_GP_FORMAT
1516             " packet spans beyond current page, keeping old gp time %"
1517             GST_TIME_FORMAT, granulepos, GST_TIME_ARGS (pad->gp_time));
1518       }
1519
1520       /* push the page */
1521       /* end time of this page is the timestamp of the next buffer */
1522       pad->timestamp_end = timestamp;
1523       ret = gst_ogg_mux_pad_queue_page (ogg_mux, pad, &page, pad->first_delta);
1524       pad->pageno++;
1525       /* mark next pages as delta */
1526       pad->first_delta = TRUE;
1527
1528       /* use an inner loop here to flush the remaining pages and
1529        * mark them as delta frames as well */
1530       while (ogg_stream_pageout (&pad->map.stream, &page) > 0) {
1531         if (ogg_page_granulepos (&page) == granulepos) {
1532           /* the page has taken up the new packet completely, which means
1533            * the packet ends the page and we can update the gp time
1534            * before pushing out */
1535           pad->gp_time = gp_time;
1536         }
1537
1538         /* we have a complete page now, we can push the page
1539          * and make sure to pull on a new pad the next time around */
1540         ret = gst_ogg_mux_pad_queue_page (ogg_mux, pad, &page,
1541             pad->first_delta);
1542         /* increment the page number counter */
1543         pad->pageno++;
1544       }
1545       /* need a new page as well */
1546       pad->new_page = TRUE;
1547       pad->duration = 0;
1548       /* we're done pulling on this pad, make sure to choose a new
1549        * pad for pulling in the next iteration */
1550       ogg_mux->pulling = NULL;
1551     }
1552
1553     /* Update the gp time, if necessary, since any future page will have at
1554      * least this gp time.
1555      */
1556     if (pad->gp_time < gp_time) {
1557       pad->gp_time = gp_time;
1558       GST_LOG_OBJECT (pad->collect.pad,
1559           "Updated running gp time of pad %" GST_PTR_FORMAT
1560           " to %" GST_TIME_FORMAT, pad->collect.pad, GST_TIME_ARGS (gp_time));
1561     }
1562   }
1563
1564   return ret;
1565 }
1566
1567 /* all_pads_eos:
1568  *
1569  * Checks if all pads are EOS'd by peeking.
1570  *
1571  * Returns TRUE if all pads are EOS.
1572  */
1573 static gboolean
1574 all_pads_eos (GstCollectPads * pads)
1575 {
1576   GSList *walk;
1577   gboolean alleos = TRUE;
1578
1579   walk = pads->data;
1580   while (walk) {
1581     GstBuffer *buf;
1582     GstCollectData *data = (GstCollectData *) walk->data;
1583
1584     buf = gst_collect_pads_peek (pads, data);
1585     if (buf) {
1586       alleos = FALSE;
1587       gst_buffer_unref (buf);
1588       goto beach;
1589     }
1590     walk = walk->next;
1591   }
1592 beach:
1593   return alleos;
1594 }
1595
1596 /* This function is called when there is data on all pads.
1597  * 
1598  * It finds a pad to pull on, this is done by looking at the buffers
1599  * to decide which one to use, and using the 'oldest' one first. It then calls
1600  * gst_ogg_mux_process_best_pad() to process as much data as possible.
1601  * 
1602  * If all the pads have received EOS, it flushes out all data by continually
1603  * getting the best pad and calling gst_ogg_mux_process_best_pad() until they
1604  * are all empty, and then sends EOS.
1605  */
1606 static GstFlowReturn
1607 gst_ogg_mux_collected (GstCollectPads * pads, GstOggMux * ogg_mux)
1608 {
1609   GstOggPadData *best;
1610   GstFlowReturn ret;
1611   gint activebefore;
1612
1613   GST_LOG_OBJECT (ogg_mux, "collected");
1614
1615   activebefore = ogg_mux->active_pads;
1616
1617   /* queue buffers on all pads; find a buffer with the lowest timestamp */
1618   best = gst_ogg_mux_queue_pads (ogg_mux);
1619   if (best && !best->buffer) {
1620     GST_DEBUG_OBJECT (ogg_mux, "No buffer available on best pad");
1621     return GST_FLOW_OK;
1622   }
1623
1624   if (!best) {
1625     return GST_FLOW_WRONG_STATE;
1626   }
1627
1628   ret = gst_ogg_mux_process_best_pad (ogg_mux, best);
1629
1630   if (ogg_mux->active_pads < activebefore) {
1631     /* If the active pad count went down, this mean at least one pad has gone
1632      * EOS. Since CollectPads only calls _collected() once when all pads are
1633      * EOS, and our code doesn't _pop() from all pads we need to check that by
1634      * peeking on all pads, else we won't be called again and the muxing will
1635      * not terminate (push out EOS). */
1636
1637     /* if all the pads have been removed, flush all pending data */
1638     if ((ret == GST_FLOW_OK) && all_pads_eos (pads)) {
1639       GST_LOG_OBJECT (ogg_mux, "no pads remaining, flushing data");
1640
1641       do {
1642         best = gst_ogg_mux_queue_pads (ogg_mux);
1643         if (best)
1644           ret = gst_ogg_mux_process_best_pad (ogg_mux, best);
1645       } while ((ret == GST_FLOW_OK) && (best != NULL));
1646
1647       GST_DEBUG_OBJECT (ogg_mux, "Pushing EOS");
1648       gst_pad_push_event (ogg_mux->srcpad, gst_event_new_eos ());
1649     }
1650   }
1651
1652   return ret;
1653 }
1654
1655 static void
1656 gst_ogg_mux_get_property (GObject * object,
1657     guint prop_id, GValue * value, GParamSpec * pspec)
1658 {
1659   GstOggMux *ogg_mux;
1660
1661   ogg_mux = GST_OGG_MUX (object);
1662
1663   switch (prop_id) {
1664     case ARG_MAX_DELAY:
1665       g_value_set_uint64 (value, ogg_mux->max_delay);
1666       break;
1667     case ARG_MAX_PAGE_DELAY:
1668       g_value_set_uint64 (value, ogg_mux->max_page_delay);
1669       break;
1670     default:
1671       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
1672       break;
1673   }
1674 }
1675
1676 static void
1677 gst_ogg_mux_set_property (GObject * object,
1678     guint prop_id, const GValue * value, GParamSpec * pspec)
1679 {
1680   GstOggMux *ogg_mux;
1681
1682   ogg_mux = GST_OGG_MUX (object);
1683
1684   switch (prop_id) {
1685     case ARG_MAX_DELAY:
1686       ogg_mux->max_delay = g_value_get_uint64 (value);
1687       break;
1688     case ARG_MAX_PAGE_DELAY:
1689       ogg_mux->max_page_delay = g_value_get_uint64 (value);
1690       break;
1691     default:
1692       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
1693       break;
1694   }
1695 }
1696
1697 /* reset all variables in the ogg pads. */
1698 static void
1699 gst_ogg_mux_init_collectpads (GstCollectPads * collect)
1700 {
1701   GSList *walk;
1702
1703   walk = collect->data;
1704   while (walk) {
1705     GstOggPadData *oggpad = (GstOggPadData *) walk->data;
1706
1707     ogg_stream_init (&oggpad->map.stream, oggpad->map.serialno);
1708     oggpad->packetno = 0;
1709     oggpad->pageno = 0;
1710     oggpad->eos = FALSE;
1711     /* we assume there will be some control data first for this pad */
1712     oggpad->state = GST_OGG_PAD_STATE_CONTROL;
1713     oggpad->new_page = TRUE;
1714     oggpad->first_delta = FALSE;
1715     oggpad->prev_delta = FALSE;
1716     oggpad->data_pushed = FALSE;
1717     oggpad->pagebuffers = g_queue_new ();
1718
1719     gst_segment_init (&oggpad->segment, GST_FORMAT_TIME);
1720
1721     walk = g_slist_next (walk);
1722   }
1723 }
1724
1725 /* Clear all buffers from the collectpads object */
1726 static void
1727 gst_ogg_mux_clear_collectpads (GstCollectPads * collect)
1728 {
1729   GSList *walk;
1730
1731   for (walk = collect->data; walk; walk = g_slist_next (walk)) {
1732     GstOggPadData *oggpad = (GstOggPadData *) walk->data;
1733     GstBuffer *buf;
1734
1735     ogg_stream_clear (&oggpad->map.stream);
1736
1737     while ((buf = g_queue_pop_head (oggpad->pagebuffers)) != NULL) {
1738       gst_buffer_unref (buf);
1739     }
1740     g_queue_free (oggpad->pagebuffers);
1741     oggpad->pagebuffers = NULL;
1742
1743     if (oggpad->buffer) {
1744       gst_buffer_unref (oggpad->buffer);
1745       oggpad->buffer = NULL;
1746     }
1747     if (oggpad->next_buffer) {
1748       gst_buffer_unref (oggpad->next_buffer);
1749       oggpad->next_buffer = NULL;
1750     }
1751
1752     gst_segment_init (&oggpad->segment, GST_FORMAT_TIME);
1753   }
1754 }
1755
1756 static GstStateChangeReturn
1757 gst_ogg_mux_change_state (GstElement * element, GstStateChange transition)
1758 {
1759   GstOggMux *ogg_mux;
1760   GstStateChangeReturn ret;
1761
1762   ogg_mux = GST_OGG_MUX (element);
1763
1764   switch (transition) {
1765     case GST_STATE_CHANGE_NULL_TO_READY:
1766       break;
1767     case GST_STATE_CHANGE_READY_TO_PAUSED:
1768       gst_ogg_mux_clear (ogg_mux);
1769       gst_ogg_mux_init_collectpads (ogg_mux->collect);
1770       gst_collect_pads_start (ogg_mux->collect);
1771       break;
1772     case GST_STATE_CHANGE_PAUSED_TO_PLAYING:
1773       break;
1774     case GST_STATE_CHANGE_PAUSED_TO_READY:
1775       gst_collect_pads_stop (ogg_mux->collect);
1776       break;
1777     default:
1778       break;
1779   }
1780
1781   ret = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition);
1782
1783   switch (transition) {
1784     case GST_STATE_CHANGE_PLAYING_TO_PAUSED:
1785       break;
1786     case GST_STATE_CHANGE_PAUSED_TO_READY:
1787       gst_ogg_mux_clear_collectpads (ogg_mux->collect);
1788       break;
1789     case GST_STATE_CHANGE_READY_TO_NULL:
1790       break;
1791     default:
1792       break;
1793   }
1794
1795   return ret;
1796 }
1797
1798 gboolean
1799 gst_ogg_mux_plugin_init (GstPlugin * plugin)
1800 {
1801   GST_DEBUG_CATEGORY_INIT (gst_ogg_mux_debug, "oggmux", 0, "ogg muxer");
1802
1803   return gst_element_register (plugin, "oggmux", GST_RANK_PRIMARY,
1804       GST_TYPE_OGG_MUX);
1805 }