Added gst-plugins-base-subtitles0.10-0.10.34 for Meego Harmattan 1.2
[mafwsubrenderer] / gst-plugins-base-subtitles0.10 / gst-libs / gst / app / gstappsrc.c
1 /* GStreamer
2  * Copyright (C) 2007 David Schleef <ds@schleef.org>
3  *           (C) 2008 Wim Taymans <wim.taymans@gmail.com>
4  *
5  * This library is free software; you can redistribute it and/or
6  * modify it under the terms of the GNU Library General Public
7  * License as published by the Free Software Foundation; either
8  * version 2 of the License, or (at your option) any later version.
9  *
10  * This library is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * Library General Public License for more details.
14  *
15  * You should have received a copy of the GNU Library General Public
16  * License along with this library; if not, write to the
17  * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18  * Boston, MA 02111-1307, USA.
19  */
20
21 /**
22  * SECTION:element-appsrc
23  *
24  * The appsrc element can be used by applications to insert data into a
25  * GStreamer pipeline. Unlike most GStreamer elements, Appsrc provides
26  * external API functions.
27  *
28  * For the documentation of the API, please see the
29  * <link linkend="gst-plugins-base-libs-appsrc">libgstapp</link> section in the
30  * GStreamer Plugins Base Libraries documentation.
31  * 
32  * Since: 0.10.22
33  */
34
35 /**
36  * SECTION:gstappsrc
37  * @short_description: Easy way for applications to inject buffers into a
38  *     pipeline
39  * @see_also: #GstBaseSrc, appsink
40  *
41  * The appsrc element can be used by applications to insert data into a
42  * GStreamer pipeline. Unlike most GStreamer elements, Appsrc provides
43  * external API functions.
44  *
45  * appsrc can be used by linking with the libgstapp library to access the
46  * methods directly or by using the appsrc action signals.
47  *
48  * Before operating appsrc, the caps property must be set to a fixed caps
49  * describing the format of the data that will be pushed with appsrc. An
50  * exception to this is when pushing buffers with unknown caps, in which case no
51  * caps should be set. This is typically true of file-like sources that push raw
52  * byte buffers.
53  *
54  * The main way of handing data to the appsrc element is by calling the
55  * gst_app_src_push_buffer() method or by emiting the push-buffer action signal.
56  * This will put the buffer onto a queue from which appsrc will read from in its
57  * streaming thread. It is important to note that data transport will not happen
58  * from the thread that performed the push-buffer call.
59  *
60  * The "max-bytes" property controls how much data can be queued in appsrc
61  * before appsrc considers the queue full. A filled internal queue will always
62  * signal the "enough-data" signal, which signals the application that it should
63  * stop pushing data into appsrc. The "block" property will cause appsrc to
64  * block the push-buffer method until free data becomes available again.
65  *
66  * When the internal queue is running out of data, the "need-data" signal is
67  * emited, which signals the application that it should start pushing more data
68  * into appsrc.
69  *
70  * In addition to the "need-data" and "enough-data" signals, appsrc can emit the
71  * "seek-data" signal when the "stream-mode" property is set to "seekable" or
72  * "random-access". The signal argument will contain the new desired position in
73  * the stream expressed in the unit set with the "format" property. After
74  * receiving the seek-data signal, the application should push-buffers from the
75  * new position.
76  *
77  * These signals allow the application to operate the appsrc in two different
78  * ways:
79  *
80  * The push model, in which the application repeadedly calls the push-buffer method
81  * with a new buffer. Optionally, the queue size in the appsrc can be controlled
82  * with the enough-data and need-data signals by respectively stopping/starting
83  * the push-buffer calls. This is a typical mode of operation for the
84  * stream-type "stream" and "seekable". Use this model when implementing various
85  * network protocols or hardware devices.
86  *
87  * The pull model where the need-data signal triggers the next push-buffer call.
88  * This mode is typically used in the "random-access" stream-type. Use this
89  * model for file access or other randomly accessable sources. In this mode, a
90  * buffer of exactly the amount of bytes given by the need-data signal should be
91  * pushed into appsrc.
92  *
93  * In all modes, the size property on appsrc should contain the total stream
94  * size in bytes. Setting this property is mandatory in the random-access mode.
95  * For the stream and seekable modes, setting this property is optional but
96  * recommended.
97  *
98  * When the application is finished pushing data into appsrc, it should call 
99  * gst_app_src_end_of_stream() or emit the end-of-stream action signal. After
100  * this call, no more buffers can be pushed into appsrc until a flushing seek
101  * happened or the state of the appsrc has gone through READY.
102  *
103  * Last reviewed on 2008-12-17 (0.10.10)
104  *
105  * Since: 0.10.22
106  */
107
108 #ifdef HAVE_CONFIG_H
109 #include "config.h"
110 #endif
111
112 #include <gst/gst.h>
113 #include <gst/base/gstbasesrc.h>
114
115 #include <string.h>
116
117 #include "gstapp-marshal.h"
118 #include "gstappsrc.h"
119
120 struct _GstAppSrcPrivate
121 {
122   GCond *cond;
123   GMutex *mutex;
124   GQueue *queue;
125
126   GstCaps *caps;
127   gint64 size;
128   GstAppStreamType stream_type;
129   guint64 max_bytes;
130   GstFormat format;
131   gboolean block;
132
133   gboolean flushing;
134   gboolean started;
135   gboolean is_eos;
136   guint64 queued_bytes;
137   guint64 offset;
138   GstAppStreamType current_type;
139
140   guint64 min_latency;
141   guint64 max_latency;
142   gboolean emit_signals;
143   guint min_percent;
144
145   GstAppSrcCallbacks callbacks;
146   gpointer user_data;
147   GDestroyNotify notify;
148 };
149
150 GST_DEBUG_CATEGORY_STATIC (app_src_debug);
151 #define GST_CAT_DEFAULT app_src_debug
152
153 enum
154 {
155   /* signals */
156   SIGNAL_NEED_DATA,
157   SIGNAL_ENOUGH_DATA,
158   SIGNAL_SEEK_DATA,
159
160   /* actions */
161   SIGNAL_PUSH_BUFFER,
162   SIGNAL_END_OF_STREAM,
163
164   LAST_SIGNAL
165 };
166
167 #define DEFAULT_PROP_SIZE          -1
168 #define DEFAULT_PROP_STREAM_TYPE   GST_APP_STREAM_TYPE_STREAM
169 #define DEFAULT_PROP_MAX_BYTES     200000
170 #define DEFAULT_PROP_FORMAT        GST_FORMAT_BYTES
171 #define DEFAULT_PROP_BLOCK         FALSE
172 #define DEFAULT_PROP_IS_LIVE       FALSE
173 #define DEFAULT_PROP_MIN_LATENCY   -1
174 #define DEFAULT_PROP_MAX_LATENCY   -1
175 #define DEFAULT_PROP_EMIT_SIGNALS  TRUE
176 #define DEFAULT_PROP_MIN_PERCENT   0
177
178 enum
179 {
180   PROP_0,
181   PROP_CAPS,
182   PROP_SIZE,
183   PROP_STREAM_TYPE,
184   PROP_MAX_BYTES,
185   PROP_FORMAT,
186   PROP_BLOCK,
187   PROP_IS_LIVE,
188   PROP_MIN_LATENCY,
189   PROP_MAX_LATENCY,
190   PROP_EMIT_SIGNALS,
191   PROP_MIN_PERCENT,
192   PROP_LAST
193 };
194
195 static GstStaticPadTemplate gst_app_src_template =
196 GST_STATIC_PAD_TEMPLATE ("src",
197     GST_PAD_SRC,
198     GST_PAD_ALWAYS,
199     GST_STATIC_CAPS_ANY);
200
201 GType
202 gst_app_stream_type_get_type (void)
203 {
204   static volatile gsize stream_type_type = 0;
205   static const GEnumValue stream_type[] = {
206     {GST_APP_STREAM_TYPE_STREAM, "GST_APP_STREAM_TYPE_STREAM", "stream"},
207     {GST_APP_STREAM_TYPE_SEEKABLE, "GST_APP_STREAM_TYPE_SEEKABLE", "seekable"},
208     {GST_APP_STREAM_TYPE_RANDOM_ACCESS, "GST_APP_STREAM_TYPE_RANDOM_ACCESS",
209         "random-access"},
210     {0, NULL, NULL}
211   };
212
213   if (g_once_init_enter (&stream_type_type)) {
214     GType tmp = g_enum_register_static ("GstAppStreamType", stream_type);
215     g_once_init_leave (&stream_type_type, tmp);
216   }
217
218   return (GType) stream_type_type;
219 }
220
221 static void gst_app_src_uri_handler_init (gpointer g_iface,
222     gpointer iface_data);
223
224 static void gst_app_src_dispose (GObject * object);
225 static void gst_app_src_finalize (GObject * object);
226
227 static void gst_app_src_set_property (GObject * object, guint prop_id,
228     const GValue * value, GParamSpec * pspec);
229 static void gst_app_src_get_property (GObject * object, guint prop_id,
230     GValue * value, GParamSpec * pspec);
231
232 static void gst_app_src_set_latencies (GstAppSrc * appsrc,
233     gboolean do_min, guint64 min, gboolean do_max, guint64 max);
234
235 static GstFlowReturn gst_app_src_create (GstBaseSrc * bsrc,
236     guint64 offset, guint size, GstBuffer ** buf);
237 static gboolean gst_app_src_start (GstBaseSrc * bsrc);
238 static gboolean gst_app_src_stop (GstBaseSrc * bsrc);
239 static gboolean gst_app_src_unlock (GstBaseSrc * bsrc);
240 static gboolean gst_app_src_unlock_stop (GstBaseSrc * bsrc);
241 static gboolean gst_app_src_do_seek (GstBaseSrc * src, GstSegment * segment);
242 static gboolean gst_app_src_is_seekable (GstBaseSrc * src);
243 static gboolean gst_app_src_check_get_range (GstBaseSrc * src);
244 static gboolean gst_app_src_do_get_size (GstBaseSrc * src, guint64 * size);
245 static gboolean gst_app_src_query (GstBaseSrc * src, GstQuery * query);
246
247 static GstFlowReturn gst_app_src_push_buffer_action (GstAppSrc * appsrc,
248     GstBuffer * buffer);
249
250 static guint gst_app_src_signals[LAST_SIGNAL] = { 0 };
251
252 static void
253 _do_init (GType filesrc_type)
254 {
255   static const GInterfaceInfo urihandler_info = {
256     gst_app_src_uri_handler_init,
257     NULL,
258     NULL
259   };
260   g_type_add_interface_static (filesrc_type, GST_TYPE_URI_HANDLER,
261       &urihandler_info);
262 }
263
264 GST_BOILERPLATE_FULL (GstAppSrc, gst_app_src, GstBaseSrc, GST_TYPE_BASE_SRC,
265     _do_init);
266
267 static void
268 gst_app_src_base_init (gpointer g_class)
269 {
270   GstElementClass *element_class = GST_ELEMENT_CLASS (g_class);
271
272   GST_DEBUG_CATEGORY_INIT (app_src_debug, "appsrc", 0, "appsrc element");
273
274   gst_element_class_set_details_simple (element_class, "AppSrc",
275       "Generic/Source", "Allow the application to feed buffers to a pipeline",
276       "David Schleef <ds@schleef.org>, Wim Taymans <wim.taymans@gmail.com>");
277
278   gst_element_class_add_pad_template (element_class,
279       gst_static_pad_template_get (&gst_app_src_template));
280 }
281
282 static void
283 gst_app_src_class_init (GstAppSrcClass * klass)
284 {
285   GObjectClass *gobject_class = (GObjectClass *) klass;
286   GstBaseSrcClass *basesrc_class = (GstBaseSrcClass *) klass;
287
288   gobject_class->dispose = gst_app_src_dispose;
289   gobject_class->finalize = gst_app_src_finalize;
290
291   gobject_class->set_property = gst_app_src_set_property;
292   gobject_class->get_property = gst_app_src_get_property;
293
294   /**
295    * GstAppSrc::caps
296    *
297    * The GstCaps that will negotiated downstream and will be put
298    * on outgoing buffers.
299    */
300   g_object_class_install_property (gobject_class, PROP_CAPS,
301       g_param_spec_boxed ("caps", "Caps",
302           "The allowed caps for the src pad", GST_TYPE_CAPS,
303           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
304   /**
305    * GstAppSrc::format
306    *
307    * The format to use for segment events. When the source is producing
308    * timestamped buffers this property should be set to GST_FORMAT_TIME.
309    */
310   g_object_class_install_property (gobject_class, PROP_FORMAT,
311       g_param_spec_enum ("format", "Format",
312           "The format of the segment events and seek", GST_TYPE_FORMAT,
313           DEFAULT_PROP_FORMAT, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
314   /**
315    * GstAppSrc::size
316    *
317    * The total size in bytes of the data stream. If the total size is known, it
318    * is recommended to configure it with this property.
319    */
320   g_object_class_install_property (gobject_class, PROP_SIZE,
321       g_param_spec_int64 ("size", "Size",
322           "The size of the data stream in bytes (-1 if unknown)",
323           -1, G_MAXINT64, DEFAULT_PROP_SIZE,
324           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
325   /**
326    * GstAppSrc::stream-type
327    *
328    * The type of stream that this source is producing.  For seekable streams the
329    * application should connect to the seek-data signal.
330    */
331   g_object_class_install_property (gobject_class, PROP_STREAM_TYPE,
332       g_param_spec_enum ("stream-type", "Stream Type",
333           "the type of the stream", GST_TYPE_APP_STREAM_TYPE,
334           DEFAULT_PROP_STREAM_TYPE,
335           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
336   /**
337    * GstAppSrc::max-bytes
338    *
339    * The maximum amount of bytes that can be queued internally.
340    * After the maximum amount of bytes are queued, appsrc will emit the
341    * "enough-data" signal.
342    */
343   g_object_class_install_property (gobject_class, PROP_MAX_BYTES,
344       g_param_spec_uint64 ("max-bytes", "Max bytes",
345           "The maximum number of bytes to queue internally (0 = unlimited)",
346           0, G_MAXUINT64, DEFAULT_PROP_MAX_BYTES,
347           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
348   /**
349    * GstAppSrc::block
350    *
351    * When max-bytes are queued and after the enough-data signal has been emited,
352    * block any further push-buffer calls until the amount of queued bytes drops
353    * below the max-bytes limit.
354    */
355   g_object_class_install_property (gobject_class, PROP_BLOCK,
356       g_param_spec_boolean ("block", "Block",
357           "Block push-buffer when max-bytes are queued",
358           DEFAULT_PROP_BLOCK, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
359
360   /**
361    * GstAppSrc::is-live
362    *
363    * Instruct the source to behave like a live source. This includes that it
364    * will only push out buffers in the PLAYING state.
365    */
366   g_object_class_install_property (gobject_class, PROP_IS_LIVE,
367       g_param_spec_boolean ("is-live", "Is Live",
368           "Whether to act as a live source",
369           DEFAULT_PROP_IS_LIVE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
370   /**
371    * GstAppSrc::min-latency
372    *
373    * The minimum latency of the source. A value of -1 will use the default
374    * latency calculations of #GstBaseSrc.
375    */
376   g_object_class_install_property (gobject_class, PROP_MIN_LATENCY,
377       g_param_spec_int64 ("min-latency", "Min Latency",
378           "The minimum latency (-1 = default)",
379           -1, G_MAXINT64, DEFAULT_PROP_MIN_LATENCY,
380           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
381   /**
382    * GstAppSrc::max-latency
383    *
384    * The maximum latency of the source. A value of -1 means an unlimited amout
385    * of latency.
386    */
387   g_object_class_install_property (gobject_class, PROP_MAX_LATENCY,
388       g_param_spec_int64 ("max-latency", "Max Latency",
389           "The maximum latency (-1 = unlimited)",
390           -1, G_MAXINT64, DEFAULT_PROP_MAX_LATENCY,
391           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
392
393   /**
394    * GstAppSrc::emit-signals
395    *
396    * Make appsrc emit the "need-data", "enough-data" and "seek-data" signals.
397    * This option is by default enabled for backwards compatibility reasons but
398    * can disabled when needed because signal emission is expensive.
399    *
400    * Since: 0.10.23
401    */
402   g_object_class_install_property (gobject_class, PROP_EMIT_SIGNALS,
403       g_param_spec_boolean ("emit-signals", "Emit signals",
404           "Emit need-data, enough-data and seek-data signals",
405           DEFAULT_PROP_EMIT_SIGNALS,
406           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
407
408   /**
409    * GstAppSrc::empty-percent
410    *
411    * Make appsrc emit the "need-data" signal when the amount of bytes in the
412    * queue drops below this percentage of max-bytes.
413    *
414    * Since: 0.10.27
415    */
416   g_object_class_install_property (gobject_class, PROP_MIN_PERCENT,
417       g_param_spec_uint ("min-percent", "Min Percent",
418           "Emit need-data when queued bytes drops below this percent of max-bytes",
419           0, 100, DEFAULT_PROP_MIN_PERCENT,
420           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
421
422   /**
423    * GstAppSrc::need-data:
424    * @appsrc: the appsrc element that emited the signal
425    * @length: the amount of bytes needed.
426    *
427    * Signal that the source needs more data. In the callback or from another
428    * thread you should call push-buffer or end-of-stream.
429    *
430    * @length is just a hint and when it is set to -1, any number of bytes can be
431    * pushed into @appsrc.
432    *
433    * You can call push-buffer multiple times until the enough-data signal is
434    * fired.
435    */
436   gst_app_src_signals[SIGNAL_NEED_DATA] =
437       g_signal_new ("need-data", G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_LAST,
438       G_STRUCT_OFFSET (GstAppSrcClass, need_data),
439       NULL, NULL, __gst_app_marshal_VOID__UINT, G_TYPE_NONE, 1, G_TYPE_UINT);
440
441   /**
442    * GstAppSrc::enough-data:
443    * @appsrc: the appsrc element that emited the signal
444    *
445    * Signal that the source has enough data. It is recommended that the
446    * application stops calling push-buffer until the need-data signal is
447    * emited again to avoid excessive buffer queueing.
448    */
449   gst_app_src_signals[SIGNAL_ENOUGH_DATA] =
450       g_signal_new ("enough-data", G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_LAST,
451       G_STRUCT_OFFSET (GstAppSrcClass, enough_data),
452       NULL, NULL, g_cclosure_marshal_VOID__VOID, G_TYPE_NONE, 0, G_TYPE_NONE);
453
454   /**
455    * GstAppSrc::seek-data:
456    * @appsrc: the appsrc element that emited the signal
457    * @offset: the offset to seek to
458    *
459    * Seek to the given offset. The next push-buffer should produce buffers from
460    * the new @offset.
461    * This callback is only called for seekable stream types.
462    *
463    * Returns: %TRUE if the seek succeeded.
464    */
465   gst_app_src_signals[SIGNAL_SEEK_DATA] =
466       g_signal_new ("seek-data", G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_LAST,
467       G_STRUCT_OFFSET (GstAppSrcClass, seek_data),
468       NULL, NULL, __gst_app_marshal_BOOLEAN__UINT64, G_TYPE_BOOLEAN, 1,
469       G_TYPE_UINT64);
470
471    /**
472     * GstAppSrc::push-buffer:
473     * @appsrc: the appsrc
474     * @buffer: a buffer to push
475     *
476     * Adds a buffer to the queue of buffers that the appsrc element will
477     * push to its source pad. This function does not take ownership of the
478     * buffer so the buffer needs to be unreffed after calling this function.
479     *
480     * When the block property is TRUE, this function can block until free space
481     * becomes available in the queue.
482     */
483   gst_app_src_signals[SIGNAL_PUSH_BUFFER] =
484       g_signal_new ("push-buffer", G_TYPE_FROM_CLASS (klass),
485       G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION, G_STRUCT_OFFSET (GstAppSrcClass,
486           push_buffer), NULL, NULL, __gst_app_marshal_ENUM__OBJECT,
487       GST_TYPE_FLOW_RETURN, 1, GST_TYPE_BUFFER);
488
489    /**
490     * GstAppSrc::end-of-stream:
491     * @appsrc: the appsrc
492     *
493     * Notify @appsrc that no more buffer are available. 
494     */
495   gst_app_src_signals[SIGNAL_END_OF_STREAM] =
496       g_signal_new ("end-of-stream", G_TYPE_FROM_CLASS (klass),
497       G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION, G_STRUCT_OFFSET (GstAppSrcClass,
498           end_of_stream), NULL, NULL, __gst_app_marshal_ENUM__VOID,
499       GST_TYPE_FLOW_RETURN, 0, G_TYPE_NONE);
500
501   basesrc_class->create = gst_app_src_create;
502   basesrc_class->start = gst_app_src_start;
503   basesrc_class->stop = gst_app_src_stop;
504   basesrc_class->unlock = gst_app_src_unlock;
505   basesrc_class->unlock_stop = gst_app_src_unlock_stop;
506   basesrc_class->do_seek = gst_app_src_do_seek;
507   basesrc_class->is_seekable = gst_app_src_is_seekable;
508   basesrc_class->check_get_range = gst_app_src_check_get_range;
509   basesrc_class->get_size = gst_app_src_do_get_size;
510   basesrc_class->get_size = gst_app_src_do_get_size;
511   basesrc_class->query = gst_app_src_query;
512
513   klass->push_buffer = gst_app_src_push_buffer_action;
514   klass->end_of_stream = gst_app_src_end_of_stream;
515
516   g_type_class_add_private (klass, sizeof (GstAppSrcPrivate));
517 }
518
519 static void
520 gst_app_src_init (GstAppSrc * appsrc, GstAppSrcClass * klass)
521 {
522   GstAppSrcPrivate *priv;
523
524   priv = appsrc->priv = G_TYPE_INSTANCE_GET_PRIVATE (appsrc, GST_TYPE_APP_SRC,
525       GstAppSrcPrivate);
526
527   priv->mutex = g_mutex_new ();
528   priv->cond = g_cond_new ();
529   priv->queue = g_queue_new ();
530
531   priv->size = DEFAULT_PROP_SIZE;
532   priv->stream_type = DEFAULT_PROP_STREAM_TYPE;
533   priv->max_bytes = DEFAULT_PROP_MAX_BYTES;
534   priv->format = DEFAULT_PROP_FORMAT;
535   priv->block = DEFAULT_PROP_BLOCK;
536   priv->min_latency = DEFAULT_PROP_MIN_LATENCY;
537   priv->max_latency = DEFAULT_PROP_MAX_LATENCY;
538   priv->emit_signals = DEFAULT_PROP_EMIT_SIGNALS;
539   priv->min_percent = DEFAULT_PROP_MIN_PERCENT;
540
541   gst_base_src_set_live (GST_BASE_SRC (appsrc), DEFAULT_PROP_IS_LIVE);
542 }
543
544 static void
545 gst_app_src_flush_queued (GstAppSrc * src)
546 {
547   GstBuffer *buf;
548   GstAppSrcPrivate *priv = src->priv;
549
550   while ((buf = g_queue_pop_head (priv->queue)))
551     gst_buffer_unref (buf);
552   priv->queued_bytes = 0;
553 }
554
555 static void
556 gst_app_src_dispose (GObject * obj)
557 {
558   GstAppSrc *appsrc = GST_APP_SRC_CAST (obj);
559   GstAppSrcPrivate *priv = appsrc->priv;
560
561   if (priv->caps) {
562     gst_caps_unref (priv->caps);
563     priv->caps = NULL;
564   }
565   gst_app_src_flush_queued (appsrc);
566
567   G_OBJECT_CLASS (parent_class)->dispose (obj);
568 }
569
570 static void
571 gst_app_src_finalize (GObject * obj)
572 {
573   GstAppSrc *appsrc = GST_APP_SRC_CAST (obj);
574   GstAppSrcPrivate *priv = appsrc->priv;
575
576   g_mutex_free (priv->mutex);
577   g_cond_free (priv->cond);
578   g_queue_free (priv->queue);
579
580   G_OBJECT_CLASS (parent_class)->finalize (obj);
581 }
582
583 static void
584 gst_app_src_set_property (GObject * object, guint prop_id,
585     const GValue * value, GParamSpec * pspec)
586 {
587   GstAppSrc *appsrc = GST_APP_SRC_CAST (object);
588   GstAppSrcPrivate *priv = appsrc->priv;
589
590   switch (prop_id) {
591     case PROP_CAPS:
592       gst_app_src_set_caps (appsrc, gst_value_get_caps (value));
593       break;
594     case PROP_SIZE:
595       gst_app_src_set_size (appsrc, g_value_get_int64 (value));
596       break;
597     case PROP_STREAM_TYPE:
598       gst_app_src_set_stream_type (appsrc, g_value_get_enum (value));
599       break;
600     case PROP_MAX_BYTES:
601       gst_app_src_set_max_bytes (appsrc, g_value_get_uint64 (value));
602       break;
603     case PROP_FORMAT:
604       priv->format = g_value_get_enum (value);
605       break;
606     case PROP_BLOCK:
607       priv->block = g_value_get_boolean (value);
608       break;
609     case PROP_IS_LIVE:
610       gst_base_src_set_live (GST_BASE_SRC (appsrc),
611           g_value_get_boolean (value));
612       break;
613     case PROP_MIN_LATENCY:
614       gst_app_src_set_latencies (appsrc, TRUE, g_value_get_int64 (value),
615           FALSE, -1);
616       break;
617     case PROP_MAX_LATENCY:
618       gst_app_src_set_latencies (appsrc, FALSE, -1, TRUE,
619           g_value_get_int64 (value));
620       break;
621     case PROP_EMIT_SIGNALS:
622       gst_app_src_set_emit_signals (appsrc, g_value_get_boolean (value));
623       break;
624     case PROP_MIN_PERCENT:
625       priv->min_percent = g_value_get_uint (value);
626       break;
627     default:
628       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
629       break;
630   }
631 }
632
633 static void
634 gst_app_src_get_property (GObject * object, guint prop_id, GValue * value,
635     GParamSpec * pspec)
636 {
637   GstAppSrc *appsrc = GST_APP_SRC_CAST (object);
638   GstAppSrcPrivate *priv = appsrc->priv;
639
640   switch (prop_id) {
641     case PROP_CAPS:
642     {
643       GstCaps *caps;
644
645       /* we're missing a _take_caps() function to transfer ownership */
646       caps = gst_app_src_get_caps (appsrc);
647       gst_value_set_caps (value, caps);
648       if (caps)
649         gst_caps_unref (caps);
650       break;
651     }
652     case PROP_SIZE:
653       g_value_set_int64 (value, gst_app_src_get_size (appsrc));
654       break;
655     case PROP_STREAM_TYPE:
656       g_value_set_enum (value, gst_app_src_get_stream_type (appsrc));
657       break;
658     case PROP_MAX_BYTES:
659       g_value_set_uint64 (value, gst_app_src_get_max_bytes (appsrc));
660       break;
661     case PROP_FORMAT:
662       g_value_set_enum (value, priv->format);
663       break;
664     case PROP_BLOCK:
665       g_value_set_boolean (value, priv->block);
666       break;
667     case PROP_IS_LIVE:
668       g_value_set_boolean (value, gst_base_src_is_live (GST_BASE_SRC (appsrc)));
669       break;
670     case PROP_MIN_LATENCY:
671     {
672       guint64 min;
673
674       gst_app_src_get_latency (appsrc, &min, NULL);
675       g_value_set_int64 (value, min);
676       break;
677     }
678     case PROP_MAX_LATENCY:
679     {
680       guint64 max;
681
682       gst_app_src_get_latency (appsrc, &max, NULL);
683       g_value_set_int64 (value, max);
684       break;
685     }
686     case PROP_EMIT_SIGNALS:
687       g_value_set_boolean (value, gst_app_src_get_emit_signals (appsrc));
688       break;
689     case PROP_MIN_PERCENT:
690       g_value_set_uint (value, priv->min_percent);
691       break;
692     default:
693       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
694       break;
695   }
696 }
697
698 static gboolean
699 gst_app_src_unlock (GstBaseSrc * bsrc)
700 {
701   GstAppSrc *appsrc = GST_APP_SRC_CAST (bsrc);
702   GstAppSrcPrivate *priv = appsrc->priv;
703
704   g_mutex_lock (priv->mutex);
705   GST_DEBUG_OBJECT (appsrc, "unlock start");
706   priv->flushing = TRUE;
707   g_cond_broadcast (priv->cond);
708   g_mutex_unlock (priv->mutex);
709
710   return TRUE;
711 }
712
713 static gboolean
714 gst_app_src_unlock_stop (GstBaseSrc * bsrc)
715 {
716   GstAppSrc *appsrc = GST_APP_SRC_CAST (bsrc);
717   GstAppSrcPrivate *priv = appsrc->priv;
718
719   g_mutex_lock (priv->mutex);
720   GST_DEBUG_OBJECT (appsrc, "unlock stop");
721   priv->flushing = FALSE;
722   g_cond_broadcast (priv->cond);
723   g_mutex_unlock (priv->mutex);
724
725   return TRUE;
726 }
727
728 static gboolean
729 gst_app_src_start (GstBaseSrc * bsrc)
730 {
731   GstAppSrc *appsrc = GST_APP_SRC_CAST (bsrc);
732   GstAppSrcPrivate *priv = appsrc->priv;
733
734   g_mutex_lock (priv->mutex);
735   GST_DEBUG_OBJECT (appsrc, "starting");
736   priv->started = TRUE;
737   /* set the offset to -1 so that we always do a first seek. This is only used
738    * in random-access mode. */
739   priv->offset = -1;
740   priv->flushing = FALSE;
741   g_mutex_unlock (priv->mutex);
742
743   gst_base_src_set_format (bsrc, priv->format);
744
745   return TRUE;
746 }
747
748 static gboolean
749 gst_app_src_stop (GstBaseSrc * bsrc)
750 {
751   GstAppSrc *appsrc = GST_APP_SRC_CAST (bsrc);
752   GstAppSrcPrivate *priv = appsrc->priv;
753
754   g_mutex_lock (priv->mutex);
755   GST_DEBUG_OBJECT (appsrc, "stopping");
756   priv->is_eos = FALSE;
757   priv->flushing = TRUE;
758   priv->started = FALSE;
759   gst_app_src_flush_queued (appsrc);
760   g_mutex_unlock (priv->mutex);
761
762   return TRUE;
763 }
764
765 static gboolean
766 gst_app_src_is_seekable (GstBaseSrc * src)
767 {
768   GstAppSrc *appsrc = GST_APP_SRC_CAST (src);
769   GstAppSrcPrivate *priv = appsrc->priv;
770   gboolean res = FALSE;
771
772   switch (priv->stream_type) {
773     case GST_APP_STREAM_TYPE_STREAM:
774       break;
775     case GST_APP_STREAM_TYPE_SEEKABLE:
776     case GST_APP_STREAM_TYPE_RANDOM_ACCESS:
777       res = TRUE;
778       break;
779   }
780   return res;
781 }
782
783 static gboolean
784 gst_app_src_check_get_range (GstBaseSrc * src)
785 {
786   GstAppSrc *appsrc = GST_APP_SRC_CAST (src);
787   GstAppSrcPrivate *priv = appsrc->priv;
788   gboolean res = FALSE;
789
790   switch (priv->stream_type) {
791     case GST_APP_STREAM_TYPE_STREAM:
792     case GST_APP_STREAM_TYPE_SEEKABLE:
793       break;
794     case GST_APP_STREAM_TYPE_RANDOM_ACCESS:
795       res = TRUE;
796       break;
797   }
798   return res;
799 }
800
801 static gboolean
802 gst_app_src_do_get_size (GstBaseSrc * src, guint64 * size)
803 {
804   GstAppSrc *appsrc = GST_APP_SRC_CAST (src);
805
806   *size = gst_app_src_get_size (appsrc);
807
808   return TRUE;
809 }
810
811 static gboolean
812 gst_app_src_query (GstBaseSrc * src, GstQuery * query)
813 {
814   GstAppSrc *appsrc = GST_APP_SRC_CAST (src);
815   GstAppSrcPrivate *priv = appsrc->priv;
816   gboolean res;
817
818   switch (GST_QUERY_TYPE (query)) {
819     case GST_QUERY_LATENCY:
820     {
821       GstClockTime min, max;
822       gboolean live;
823
824       /* Query the parent class for the defaults */
825       res = gst_base_src_query_latency (src, &live, &min, &max);
826
827       /* overwrite with our values when we need to */
828       g_mutex_lock (priv->mutex);
829       if (priv->min_latency != -1)
830         min = priv->min_latency;
831       if (priv->max_latency != -1)
832         max = priv->max_latency;
833       g_mutex_unlock (priv->mutex);
834
835       gst_query_set_latency (query, live, min, max);
836       break;
837     }
838     default:
839       res = GST_BASE_SRC_CLASS (parent_class)->query (src, query);
840       break;
841   }
842
843   return res;
844 }
845
846 /* will be called in push mode */
847 static gboolean
848 gst_app_src_do_seek (GstBaseSrc * src, GstSegment * segment)
849 {
850   GstAppSrc *appsrc = GST_APP_SRC_CAST (src);
851   GstAppSrcPrivate *priv = appsrc->priv;
852   gint64 desired_position;
853   gboolean res = FALSE;
854
855   desired_position = segment->last_stop;
856
857   GST_DEBUG_OBJECT (appsrc, "seeking to %" G_GINT64_FORMAT ", format %s",
858       desired_position, gst_format_get_name (segment->format));
859
860   /* no need to try to seek in streaming mode */
861   if (priv->stream_type == GST_APP_STREAM_TYPE_STREAM)
862     return TRUE;
863
864   if (priv->callbacks.seek_data)
865     res = priv->callbacks.seek_data (appsrc, desired_position, priv->user_data);
866   else {
867     gboolean emit;
868
869     g_mutex_lock (priv->mutex);
870     emit = priv->emit_signals;
871     g_mutex_unlock (priv->mutex);
872
873     if (emit)
874       g_signal_emit (appsrc, gst_app_src_signals[SIGNAL_SEEK_DATA], 0,
875           desired_position, &res);
876   }
877
878   if (res) {
879     GST_DEBUG_OBJECT (appsrc, "flushing queue");
880     gst_app_src_flush_queued (appsrc);
881     priv->is_eos = FALSE;
882   } else {
883     GST_WARNING_OBJECT (appsrc, "seek failed");
884   }
885
886   return res;
887 }
888
889 /* must be called with the appsrc mutex */
890 static gboolean
891 gst_app_src_emit_seek (GstAppSrc * appsrc, guint64 offset)
892 {
893   gboolean res = FALSE;
894   gboolean emit;
895   GstAppSrcPrivate *priv = appsrc->priv;
896
897   emit = priv->emit_signals;
898   g_mutex_unlock (priv->mutex);
899
900   GST_DEBUG_OBJECT (appsrc,
901       "we are at %" G_GINT64_FORMAT ", seek to %" G_GINT64_FORMAT,
902       priv->offset, offset);
903
904   if (priv->callbacks.seek_data)
905     res = priv->callbacks.seek_data (appsrc, offset, priv->user_data);
906   else if (emit)
907     g_signal_emit (appsrc, gst_app_src_signals[SIGNAL_SEEK_DATA], 0,
908         offset, &res);
909
910   g_mutex_lock (priv->mutex);
911
912   return res;
913 }
914
915 /* must be called with the appsrc mutex. After this call things can be
916  * flushing */
917 static void
918 gst_app_src_emit_need_data (GstAppSrc * appsrc, guint size)
919 {
920   gboolean emit;
921   GstAppSrcPrivate *priv = appsrc->priv;
922
923   emit = priv->emit_signals;
924   g_mutex_unlock (priv->mutex);
925
926   /* we have no data, we need some. We fire the signal with the size hint. */
927   if (priv->callbacks.need_data)
928     priv->callbacks.need_data (appsrc, size, priv->user_data);
929   else if (emit)
930     g_signal_emit (appsrc, gst_app_src_signals[SIGNAL_NEED_DATA], 0, size,
931         NULL);
932
933   g_mutex_lock (priv->mutex);
934   /* we can be flushing now because we released the lock */
935 }
936
937 static GstFlowReturn
938 gst_app_src_create (GstBaseSrc * bsrc, guint64 offset, guint size,
939     GstBuffer ** buf)
940 {
941   GstAppSrc *appsrc = GST_APP_SRC_CAST (bsrc);
942   GstAppSrcPrivate *priv = appsrc->priv;
943   GstFlowReturn ret;
944   GstCaps *caps;
945
946   GST_OBJECT_LOCK (appsrc);
947   caps = priv->caps ? gst_caps_ref (priv->caps) : NULL;
948   if (G_UNLIKELY (priv->size != bsrc->segment.duration &&
949           bsrc->segment.format == GST_FORMAT_BYTES)) {
950     GST_DEBUG_OBJECT (appsrc,
951         "Size changed from %" G_GINT64_FORMAT " to %" G_GINT64_FORMAT,
952         bsrc->segment.duration, priv->size);
953     gst_segment_set_duration (&bsrc->segment, GST_FORMAT_BYTES, priv->size);
954     GST_OBJECT_UNLOCK (appsrc);
955
956     gst_element_post_message (GST_ELEMENT (appsrc),
957         gst_message_new_duration (GST_OBJECT (appsrc), GST_FORMAT_BYTES,
958             priv->size));
959   } else {
960     GST_OBJECT_UNLOCK (appsrc);
961   }
962
963   g_mutex_lock (priv->mutex);
964   /* check flushing first */
965   if (G_UNLIKELY (priv->flushing))
966     goto flushing;
967
968   if (priv->stream_type == GST_APP_STREAM_TYPE_RANDOM_ACCESS) {
969     /* if we are dealing with a random-access stream, issue a seek if the offset
970      * changed. */
971     if (G_UNLIKELY (priv->offset != offset)) {
972       gboolean res;
973
974       /* do the seek */
975       res = gst_app_src_emit_seek (appsrc, offset);
976
977       if (G_UNLIKELY (!res))
978         /* failing to seek is fatal */
979         goto seek_error;
980
981       priv->offset = offset;
982     }
983   }
984
985   while (TRUE) {
986     /* return data as long as we have some */
987     if (!g_queue_is_empty (priv->queue)) {
988       guint buf_size;
989
990       *buf = g_queue_pop_head (priv->queue);
991       buf_size = GST_BUFFER_SIZE (*buf);
992
993       GST_DEBUG_OBJECT (appsrc, "we have buffer %p of size %u", *buf, buf_size);
994
995       priv->queued_bytes -= buf_size;
996
997       /* only update the offset when in random_access mode */
998       if (priv->stream_type == GST_APP_STREAM_TYPE_RANDOM_ACCESS)
999         priv->offset += buf_size;
1000       if (caps) {
1001         *buf = gst_buffer_make_metadata_writable (*buf);
1002         gst_buffer_set_caps (*buf, caps);
1003       }
1004
1005       /* signal that we removed an item */
1006       g_cond_broadcast (priv->cond);
1007
1008       /* see if we go lower than the empty-percent */
1009       if (priv->min_percent && priv->max_bytes) {
1010         if (priv->queued_bytes * 100 / priv->max_bytes <= priv->min_percent)
1011           /* ignore flushing state, we got a buffer and we will return it now.
1012            * Errors will be handled in the next round */
1013           gst_app_src_emit_need_data (appsrc, size);
1014       }
1015       ret = GST_FLOW_OK;
1016       break;
1017     } else {
1018       gst_app_src_emit_need_data (appsrc, size);
1019
1020       /* we can be flushing now because we released the lock above */
1021       if (G_UNLIKELY (priv->flushing))
1022         goto flushing;
1023
1024       /* if we have a buffer now, continue the loop and try to return it. In
1025        * random-access mode (where a buffer is normally pushed in the above
1026        * signal) we can still be empty because the pushed buffer got flushed or
1027        * when the application pushes the requested buffer later, we support both
1028        * possiblities. */
1029       if (!g_queue_is_empty (priv->queue))
1030         continue;
1031
1032       /* no buffer yet, maybe we are EOS, if not, block for more data. */
1033     }
1034
1035     /* check EOS */
1036     if (G_UNLIKELY (priv->is_eos))
1037       goto eos;
1038
1039     /* nothing to return, wait a while for new data or flushing. */
1040     g_cond_wait (priv->cond, priv->mutex);
1041   }
1042   g_mutex_unlock (priv->mutex);
1043   if (caps)
1044     gst_caps_unref (caps);
1045   return ret;
1046
1047   /* ERRORS */
1048 flushing:
1049   {
1050     GST_DEBUG_OBJECT (appsrc, "we are flushing");
1051     g_mutex_unlock (priv->mutex);
1052     if (caps)
1053       gst_caps_unref (caps);
1054     return GST_FLOW_WRONG_STATE;
1055   }
1056 eos:
1057   {
1058     GST_DEBUG_OBJECT (appsrc, "we are EOS");
1059     g_mutex_unlock (priv->mutex);
1060     if (caps)
1061       gst_caps_unref (caps);
1062     return GST_FLOW_UNEXPECTED;
1063   }
1064 seek_error:
1065   {
1066     g_mutex_unlock (priv->mutex);
1067     if (caps)
1068       gst_caps_unref (caps);
1069     GST_ELEMENT_ERROR (appsrc, RESOURCE, READ, ("failed to seek"),
1070         GST_ERROR_SYSTEM);
1071     return GST_FLOW_ERROR;
1072   }
1073 }
1074
1075 /* external API */
1076
1077 /**
1078  * gst_app_src_set_caps:
1079  * @appsrc: a #GstAppSrc
1080  * @caps: caps to set
1081  *
1082  * Set the capabilities on the appsrc element.  This function takes
1083  * a copy of the caps structure. After calling this method, the source will
1084  * only produce caps that match @caps. @caps must be fixed and the caps on the
1085  * buffers must match the caps or left NULL.
1086  * 
1087  * Since: 0.10.22
1088  */
1089 void
1090 gst_app_src_set_caps (GstAppSrc * appsrc, const GstCaps * caps)
1091 {
1092   GstCaps *old;
1093   GstAppSrcPrivate *priv;
1094
1095   g_return_if_fail (GST_IS_APP_SRC (appsrc));
1096
1097   priv = appsrc->priv;
1098
1099   GST_OBJECT_LOCK (appsrc);
1100   GST_DEBUG_OBJECT (appsrc, "setting caps to %" GST_PTR_FORMAT, caps);
1101   if ((old = priv->caps) != caps) {
1102     if (caps)
1103       priv->caps = gst_caps_copy (caps);
1104     else
1105       priv->caps = NULL;
1106     if (old)
1107       gst_caps_unref (old);
1108   }
1109   GST_OBJECT_UNLOCK (appsrc);
1110 }
1111
1112 /**
1113  * gst_app_src_get_caps:
1114  * @appsrc: a #GstAppSrc
1115  *
1116  * Get the configured caps on @appsrc.
1117  *
1118  * Returns: the #GstCaps produced by the source. gst_caps_unref() after usage.
1119  * 
1120  * Since: 0.10.22
1121  */
1122 GstCaps *
1123 gst_app_src_get_caps (GstAppSrc * appsrc)
1124 {
1125   GstCaps *caps;
1126   GstAppSrcPrivate *priv;
1127
1128   g_return_val_if_fail (GST_IS_APP_SRC (appsrc), NULL);
1129
1130   priv = appsrc->priv;
1131
1132   GST_OBJECT_LOCK (appsrc);
1133   if ((caps = priv->caps))
1134     gst_caps_ref (caps);
1135   GST_DEBUG_OBJECT (appsrc, "getting caps of %" GST_PTR_FORMAT, caps);
1136   GST_OBJECT_UNLOCK (appsrc);
1137
1138   return caps;
1139 }
1140
1141 /**
1142  * gst_app_src_set_size:
1143  * @appsrc: a #GstAppSrc
1144  * @size: the size to set
1145  *
1146  * Set the size of the stream in bytes. A value of -1 means that the size is
1147  * not known. 
1148  * 
1149  * Since: 0.10.22
1150  */
1151 void
1152 gst_app_src_set_size (GstAppSrc * appsrc, gint64 size)
1153 {
1154   GstAppSrcPrivate *priv;
1155
1156   g_return_if_fail (GST_IS_APP_SRC (appsrc));
1157
1158   priv = appsrc->priv;
1159
1160   GST_OBJECT_LOCK (appsrc);
1161   GST_DEBUG_OBJECT (appsrc, "setting size of %" G_GINT64_FORMAT, size);
1162   priv->size = size;
1163   GST_OBJECT_UNLOCK (appsrc);
1164 }
1165
1166 /**
1167  * gst_app_src_get_size:
1168  * @appsrc: a #GstAppSrc
1169  *
1170  * Get the size of the stream in bytes. A value of -1 means that the size is
1171  * not known. 
1172  *
1173  * Returns: the size of the stream previously set with gst_app_src_set_size();
1174  * 
1175  * Since: 0.10.22
1176  */
1177 gint64
1178 gst_app_src_get_size (GstAppSrc * appsrc)
1179 {
1180   gint64 size;
1181   GstAppSrcPrivate *priv;
1182
1183   g_return_val_if_fail (GST_IS_APP_SRC (appsrc), -1);
1184
1185   priv = appsrc->priv;
1186
1187   GST_OBJECT_LOCK (appsrc);
1188   size = priv->size;
1189   GST_DEBUG_OBJECT (appsrc, "getting size of %" G_GINT64_FORMAT, size);
1190   GST_OBJECT_UNLOCK (appsrc);
1191
1192   return size;
1193 }
1194
1195 /**
1196  * gst_app_src_set_stream_type:
1197  * @appsrc: a #GstAppSrc
1198  * @type: the new state
1199  *
1200  * Set the stream type on @appsrc. For seekable streams, the "seek" signal must
1201  * be connected to.
1202  *
1203  * A stream_type stream 
1204  * 
1205  * Since: 0.10.22
1206  */
1207 void
1208 gst_app_src_set_stream_type (GstAppSrc * appsrc, GstAppStreamType type)
1209 {
1210   GstAppSrcPrivate *priv;
1211
1212   g_return_if_fail (GST_IS_APP_SRC (appsrc));
1213
1214   priv = appsrc->priv;
1215
1216   GST_OBJECT_LOCK (appsrc);
1217   GST_DEBUG_OBJECT (appsrc, "setting stream_type of %d", type);
1218   priv->stream_type = type;
1219   GST_OBJECT_UNLOCK (appsrc);
1220 }
1221
1222 /**
1223  * gst_app_src_get_stream_type:
1224  * @appsrc: a #GstAppSrc
1225  *
1226  * Get the stream type. Control the stream type of @appsrc
1227  * with gst_app_src_set_stream_type().
1228  *
1229  * Returns: the stream type.
1230  * 
1231  * Since: 0.10.22
1232  */
1233 GstAppStreamType
1234 gst_app_src_get_stream_type (GstAppSrc * appsrc)
1235 {
1236   gboolean stream_type;
1237   GstAppSrcPrivate *priv;
1238
1239   g_return_val_if_fail (GST_IS_APP_SRC (appsrc), FALSE);
1240
1241   priv = appsrc->priv;
1242
1243   GST_OBJECT_LOCK (appsrc);
1244   stream_type = priv->stream_type;
1245   GST_DEBUG_OBJECT (appsrc, "getting stream_type of %d", stream_type);
1246   GST_OBJECT_UNLOCK (appsrc);
1247
1248   return stream_type;
1249 }
1250
1251 /**
1252  * gst_app_src_set_max_bytes:
1253  * @appsrc: a #GstAppSrc
1254  * @max: the maximum number of bytes to queue
1255  *
1256  * Set the maximum amount of bytes that can be queued in @appsrc.
1257  * After the maximum amount of bytes are queued, @appsrc will emit the
1258  * "enough-data" signal.
1259  * 
1260  * Since: 0.10.22
1261  */
1262 void
1263 gst_app_src_set_max_bytes (GstAppSrc * appsrc, guint64 max)
1264 {
1265   GstAppSrcPrivate *priv;
1266
1267   g_return_if_fail (GST_IS_APP_SRC (appsrc));
1268
1269   priv = appsrc->priv;
1270
1271   g_mutex_lock (priv->mutex);
1272   if (max != priv->max_bytes) {
1273     GST_DEBUG_OBJECT (appsrc, "setting max-bytes to %" G_GUINT64_FORMAT, max);
1274     priv->max_bytes = max;
1275     /* signal the change */
1276     g_cond_broadcast (priv->cond);
1277   }
1278   g_mutex_unlock (priv->mutex);
1279 }
1280
1281 /**
1282  * gst_app_src_get_max_bytes:
1283  * @appsrc: a #GstAppSrc
1284  *
1285  * Get the maximum amount of bytes that can be queued in @appsrc.
1286  *
1287  * Returns: The maximum amount of bytes that can be queued.
1288  * 
1289  * Since: 0.10.22
1290  */
1291 guint64
1292 gst_app_src_get_max_bytes (GstAppSrc * appsrc)
1293 {
1294   guint64 result;
1295   GstAppSrcPrivate *priv;
1296
1297   g_return_val_if_fail (GST_IS_APP_SRC (appsrc), 0);
1298
1299   priv = appsrc->priv;
1300
1301   g_mutex_lock (priv->mutex);
1302   result = priv->max_bytes;
1303   GST_DEBUG_OBJECT (appsrc, "getting max-bytes of %" G_GUINT64_FORMAT, result);
1304   g_mutex_unlock (priv->mutex);
1305
1306   return result;
1307 }
1308
1309 static void
1310 gst_app_src_set_latencies (GstAppSrc * appsrc, gboolean do_min, guint64 min,
1311     gboolean do_max, guint64 max)
1312 {
1313   GstAppSrcPrivate *priv = appsrc->priv;
1314   gboolean changed = FALSE;
1315
1316   g_mutex_lock (priv->mutex);
1317   if (do_min && priv->min_latency != min) {
1318     priv->min_latency = min;
1319     changed = TRUE;
1320   }
1321   if (do_max && priv->max_latency != max) {
1322     priv->max_latency = max;
1323     changed = TRUE;
1324   }
1325   g_mutex_unlock (priv->mutex);
1326
1327   if (changed) {
1328     GST_DEBUG_OBJECT (appsrc, "posting latency changed");
1329     gst_element_post_message (GST_ELEMENT_CAST (appsrc),
1330         gst_message_new_latency (GST_OBJECT_CAST (appsrc)));
1331   }
1332 }
1333
1334 /**
1335  * gst_app_src_set_latency:
1336  * @appsrc: a #GstAppSrc
1337  * @min: the min latency
1338  * @max: the min latency
1339  *
1340  * Configure the @min and @max latency in @src. If @min is set to -1, the
1341  * default latency calculations for pseudo-live sources will be used.
1342  * 
1343  * Since: 0.10.22
1344  */
1345 void
1346 gst_app_src_set_latency (GstAppSrc * appsrc, guint64 min, guint64 max)
1347 {
1348   gst_app_src_set_latencies (appsrc, TRUE, min, TRUE, max);
1349 }
1350
1351 /**
1352  * gst_app_src_get_latency:
1353  * @appsrc: a #GstAppSrc
1354  * @min: the min latency
1355  * @max: the min latency
1356  *
1357  * Retrieve the min and max latencies in @min and @max respectively.
1358  * 
1359  * Since: 0.10.22
1360  */
1361 void
1362 gst_app_src_get_latency (GstAppSrc * appsrc, guint64 * min, guint64 * max)
1363 {
1364   GstAppSrcPrivate *priv;
1365
1366   g_return_if_fail (GST_IS_APP_SRC (appsrc));
1367
1368   priv = appsrc->priv;
1369
1370   g_mutex_lock (priv->mutex);
1371   if (min)
1372     *min = priv->min_latency;
1373   if (max)
1374     *max = priv->max_latency;
1375   g_mutex_unlock (priv->mutex);
1376 }
1377
1378 /**
1379  * gst_app_src_set_emit_signals:
1380  * @appsrc: a #GstAppSrc
1381  * @emit: the new state
1382  *
1383  * Make appsrc emit the "new-preroll" and "new-buffer" signals. This option is
1384  * by default disabled because signal emission is expensive and unneeded when
1385  * the application prefers to operate in pull mode.
1386  *
1387  * Since: 0.10.23
1388  */
1389 void
1390 gst_app_src_set_emit_signals (GstAppSrc * appsrc, gboolean emit)
1391 {
1392   GstAppSrcPrivate *priv;
1393
1394   g_return_if_fail (GST_IS_APP_SRC (appsrc));
1395
1396   priv = appsrc->priv;
1397
1398   g_mutex_lock (priv->mutex);
1399   priv->emit_signals = emit;
1400   g_mutex_unlock (priv->mutex);
1401 }
1402
1403 /**
1404  * gst_app_src_get_emit_signals:
1405  * @appsrc: a #GstAppSrc
1406  *
1407  * Check if appsrc will emit the "new-preroll" and "new-buffer" signals.
1408  *
1409  * Returns: %TRUE if @appsrc is emiting the "new-preroll" and "new-buffer"
1410  * signals.
1411  *
1412  * Since: 0.10.23
1413  */
1414 gboolean
1415 gst_app_src_get_emit_signals (GstAppSrc * appsrc)
1416 {
1417   gboolean result;
1418   GstAppSrcPrivate *priv;
1419
1420   g_return_val_if_fail (GST_IS_APP_SRC (appsrc), FALSE);
1421
1422   priv = appsrc->priv;
1423
1424   g_mutex_lock (priv->mutex);
1425   result = priv->emit_signals;
1426   g_mutex_unlock (priv->mutex);
1427
1428   return result;
1429 }
1430
1431 static GstFlowReturn
1432 gst_app_src_push_buffer_full (GstAppSrc * appsrc, GstBuffer * buffer,
1433     gboolean steal_ref)
1434 {
1435   gboolean first = TRUE;
1436   GstAppSrcPrivate *priv;
1437
1438   g_return_val_if_fail (GST_IS_APP_SRC (appsrc), GST_FLOW_ERROR);
1439   g_return_val_if_fail (GST_IS_BUFFER (buffer), GST_FLOW_ERROR);
1440
1441   priv = appsrc->priv;
1442
1443   g_mutex_lock (priv->mutex);
1444
1445   while (TRUE) {
1446     /* can't accept buffers when we are flushing or EOS */
1447     if (priv->flushing)
1448       goto flushing;
1449
1450     if (priv->is_eos)
1451       goto eos;
1452
1453     if (priv->max_bytes && priv->queued_bytes >= priv->max_bytes) {
1454       GST_DEBUG_OBJECT (appsrc,
1455           "queue filled (%" G_GUINT64_FORMAT " >= %" G_GUINT64_FORMAT ")",
1456           priv->queued_bytes, priv->max_bytes);
1457
1458       if (first) {
1459         gboolean emit;
1460
1461         emit = priv->emit_signals;
1462         /* only signal on the first push */
1463         g_mutex_unlock (priv->mutex);
1464
1465         if (priv->callbacks.enough_data)
1466           priv->callbacks.enough_data (appsrc, priv->user_data);
1467         else if (emit)
1468           g_signal_emit (appsrc, gst_app_src_signals[SIGNAL_ENOUGH_DATA], 0,
1469               NULL);
1470
1471         g_mutex_lock (priv->mutex);
1472         /* continue to check for flushing/eos after releasing the lock */
1473         first = FALSE;
1474         continue;
1475       }
1476       if (priv->block) {
1477         GST_DEBUG_OBJECT (appsrc, "waiting for free space");
1478         /* we are filled, wait until a buffer gets popped or when we
1479          * flush. */
1480         g_cond_wait (priv->cond, priv->mutex);
1481       } else {
1482         /* no need to wait for free space, we just pump more data into the
1483          * queue hoping that the caller reacts to the enough-data signal and
1484          * stops pushing buffers. */
1485         break;
1486       }
1487     } else
1488       break;
1489   }
1490
1491   GST_DEBUG_OBJECT (appsrc, "queueing buffer %p", buffer);
1492   if (!steal_ref)
1493     gst_buffer_ref (buffer);
1494   g_queue_push_tail (priv->queue, buffer);
1495   priv->queued_bytes += GST_BUFFER_SIZE (buffer);
1496   g_cond_broadcast (priv->cond);
1497   g_mutex_unlock (priv->mutex);
1498
1499   return GST_FLOW_OK;
1500
1501   /* ERRORS */
1502 flushing:
1503   {
1504     GST_DEBUG_OBJECT (appsrc, "refuse buffer %p, we are flushing", buffer);
1505     if (steal_ref)
1506       gst_buffer_unref (buffer);
1507     g_mutex_unlock (priv->mutex);
1508     return GST_FLOW_WRONG_STATE;
1509   }
1510 eos:
1511   {
1512     GST_DEBUG_OBJECT (appsrc, "refuse buffer %p, we are EOS", buffer);
1513     if (steal_ref)
1514       gst_buffer_unref (buffer);
1515     g_mutex_unlock (priv->mutex);
1516     return GST_FLOW_UNEXPECTED;
1517   }
1518 }
1519
1520 /**
1521  * gst_app_src_push_buffer:
1522  * @appsrc: a #GstAppSrc
1523  * @buffer: a #GstBuffer to push
1524  *
1525  * Adds a buffer to the queue of buffers that the appsrc element will
1526  * push to its source pad.  This function takes ownership of the buffer.
1527  *
1528  * When the block property is TRUE, this function can block until free
1529  * space becomes available in the queue.
1530  *
1531  * Returns: #GST_FLOW_OK when the buffer was successfuly queued.
1532  * #GST_FLOW_WRONG_STATE when @appsrc is not PAUSED or PLAYING.
1533  * #GST_FLOW_UNEXPECTED when EOS occured.
1534  * 
1535  * Since: 0.10.22
1536  */
1537 GstFlowReturn
1538 gst_app_src_push_buffer (GstAppSrc * appsrc, GstBuffer * buffer)
1539 {
1540   return gst_app_src_push_buffer_full (appsrc, buffer, TRUE);
1541 }
1542
1543 /* push a buffer without stealing the ref of the buffer. This is used for the
1544  * action signal. */
1545 static GstFlowReturn
1546 gst_app_src_push_buffer_action (GstAppSrc * appsrc, GstBuffer * buffer)
1547 {
1548   return gst_app_src_push_buffer_full (appsrc, buffer, FALSE);
1549 }
1550
1551 /**
1552  * gst_app_src_end_of_stream:
1553  * @appsrc: a #GstAppSrc
1554  *
1555  * Indicates to the appsrc element that the last buffer queued in the
1556  * element is the last buffer of the stream.
1557  *
1558  * Returns: #GST_FLOW_OK when the EOS was successfuly queued.
1559  * #GST_FLOW_WRONG_STATE when @appsrc is not PAUSED or PLAYING.
1560  * 
1561  * Since: 0.10.22
1562  */
1563 GstFlowReturn
1564 gst_app_src_end_of_stream (GstAppSrc * appsrc)
1565 {
1566   GstAppSrcPrivate *priv;
1567
1568   g_return_val_if_fail (GST_IS_APP_SRC (appsrc), GST_FLOW_ERROR);
1569
1570   priv = appsrc->priv;
1571
1572   g_mutex_lock (priv->mutex);
1573   /* can't accept buffers when we are flushing. We can accept them when we are 
1574    * EOS although it will not do anything. */
1575   if (priv->flushing)
1576     goto flushing;
1577
1578   GST_DEBUG_OBJECT (appsrc, "sending EOS");
1579   priv->is_eos = TRUE;
1580   g_cond_broadcast (priv->cond);
1581   g_mutex_unlock (priv->mutex);
1582
1583   return GST_FLOW_OK;
1584
1585   /* ERRORS */
1586 flushing:
1587   {
1588     g_mutex_unlock (priv->mutex);
1589     GST_DEBUG_OBJECT (appsrc, "refuse EOS, we are flushing");
1590     return GST_FLOW_WRONG_STATE;
1591   }
1592 }
1593
1594 /**
1595  * gst_app_src_set_callbacks:
1596  * @appsrc: a #GstAppSrc
1597  * @callbacks: the callbacks
1598  * @user_data: a user_data argument for the callbacks
1599  * @notify: a destroy notify function
1600  *
1601  * Set callbacks which will be executed when data is needed, enough data has
1602  * been collected or when a seek should be performed.
1603  * This is an alternative to using the signals, it has lower overhead and is thus
1604  * less expensive, but also less flexible.
1605  *
1606  * If callbacks are installed, no signals will be emited for performance
1607  * reasons.
1608  *
1609  * Since: 0.10.23
1610  */
1611 void
1612 gst_app_src_set_callbacks (GstAppSrc * appsrc,
1613     GstAppSrcCallbacks * callbacks, gpointer user_data, GDestroyNotify notify)
1614 {
1615   GDestroyNotify old_notify;
1616   GstAppSrcPrivate *priv;
1617
1618   g_return_if_fail (GST_IS_APP_SRC (appsrc));
1619   g_return_if_fail (callbacks != NULL);
1620
1621   priv = appsrc->priv;
1622
1623   GST_OBJECT_LOCK (appsrc);
1624   old_notify = priv->notify;
1625
1626   if (old_notify) {
1627     gpointer old_data;
1628
1629     old_data = priv->user_data;
1630
1631     priv->user_data = NULL;
1632     priv->notify = NULL;
1633     GST_OBJECT_UNLOCK (appsrc);
1634
1635     old_notify (old_data);
1636
1637     GST_OBJECT_LOCK (appsrc);
1638   }
1639   priv->callbacks = *callbacks;
1640   priv->user_data = user_data;
1641   priv->notify = notify;
1642   GST_OBJECT_UNLOCK (appsrc);
1643 }
1644
1645 /*** GSTURIHANDLER INTERFACE *************************************************/
1646
1647 static GstURIType
1648 gst_app_src_uri_get_type (void)
1649 {
1650   return GST_URI_SRC;
1651 }
1652
1653 static gchar **
1654 gst_app_src_uri_get_protocols (void)
1655 {
1656   static gchar *protocols[] = { (char *) "appsrc", NULL };
1657
1658   return protocols;
1659 }
1660
1661 static const gchar *
1662 gst_app_src_uri_get_uri (GstURIHandler * handler)
1663 {
1664   return "appsrc";
1665 }
1666
1667 static gboolean
1668 gst_app_src_uri_set_uri (GstURIHandler * handler, const gchar * uri)
1669 {
1670   gchar *protocol;
1671   gboolean ret;
1672
1673   protocol = gst_uri_get_protocol (uri);
1674   ret = !strcmp (protocol, "appsrc");
1675   g_free (protocol);
1676
1677   return ret;
1678 }
1679
1680 static void
1681 gst_app_src_uri_handler_init (gpointer g_iface, gpointer iface_data)
1682 {
1683   GstURIHandlerInterface *iface = (GstURIHandlerInterface *) g_iface;
1684
1685   iface->get_type = gst_app_src_uri_get_type;
1686   iface->get_protocols = gst_app_src_uri_get_protocols;
1687   iface->get_uri = gst_app_src_uri_get_uri;
1688   iface->set_uri = gst_app_src_uri_set_uri;
1689 }