2 * @file vp_api_supervisor.c
3 * @brief VP Api. Pipeline supervisor
7 ///////////////////////////////////////////////
10 #include <VP_Api/vp_api_supervisor.h>
11 #include <VP_Api/vp_api_config.h>
12 #include <VP_Api/vp_api.h>
13 #include <VP_Os/vp_os_print.h>
14 #include <VP_Os/vp_os_malloc.h>
15 #include <VP_Os/vp_os_assert.h>
18 ///////////////////////////////////////////////
21 /** Global array containing pipeline handles.
/* Table of registered pipelines, indexed by pipeline handle.
   A slot holding (PIPELINE_ADDRESS)NULL is treated as free by vp_api_add_pipeline(). */
23 static PIPELINE_ADDRESS pipelines[VP_API_MAX_NUM_PIPELINES] = {(PIPELINE_ADDRESS)NULL};
27 * @fn vp_api_get_message(vp_api_io_pipeline_t *, DEST_HANDLE *, PIPELINE_MSG *, void **, void **)
28 * @brief Get a message.
30 * Get a message from the pipeline fifo. Internally called by vp_api_handle_messages()
31 * @param pipeline Pipeline definition
32 * @param dest Message destination
33 * @param msg_id Message identifier
34 * @param callback Optional callback function called after processing of the message
35 * @param param Optional message parameters
36 * @return C_RESULT : VP_SUCCESS
37 * @author Julien Floret <julien.floret.ext\@parrot.com>
/* Forward declaration: callback and param are output pointers (void **) filled from the fifo record. */
41 vp_api_get_message(vp_api_io_pipeline_t *pipeline, DEST_HANDLE *dest, PIPELINE_MSG *msg_id, void **callback, void **param);
44 ///////////////////////////////////////////////
/* Pipeline counter, starts at 0; vp_api_add_pipeline() asserts it is below VP_API_MAX_NUM_PIPELINES. */
47 static PIPELINE_HANDLE nb_pipelines = 0;
50 C_RESULT vp_api_add_pipeline(vp_api_io_pipeline_t *pipeline, PIPELINE_HANDLE *handle)
52 C_RESULT res = VP_SUCCESS;
55 VP_OS_ASSERT(nb_pipelines < VP_API_MAX_NUM_PIPELINES);
57 while(pipelines[i] != ((PIPELINE_ADDRESS)NULL))
62 pipelines[i] = (PIPELINE_ADDRESS) pipeline;
70 C_RESULT vp_api_remove_pipeline(vp_api_io_pipeline_t *pipeline, PIPELINE_HANDLE *handle)
72 C_RESULT res = VP_SUCCESS;
74 pipelines[*handle] = (PIPELINE_ADDRESS) NULL;
81 vp_api_io_pipeline_t * vp_api_get_pipeline(PIPELINE_HANDLE handle)
83 vp_api_io_pipeline_t *pipeline = (vp_api_io_pipeline_t *) pipelines[handle];
88 C_RESULT vp_api_post_message(DEST_HANDLE dest, PIPELINE_MSG msg_id, void *callback, void *param)
90 C_RESULT res = VP_SUCCESS;
92 vp_api_io_pipeline_t *pipeline = (vp_api_io_pipeline_t *) pipelines[dest.pipeline];
94 /* Do not send the message if the pipeline does not exist yet
95 This happens when calling the callback function of the 'video_channel' configuration
96 value at drone startup. */
97 if (pipeline==NULL) { return VP_FAILURE; }
99 VP_OS_ASSERT(pipeline->fifo.nb_waiting >= 0);
101 vp_os_mutex_lock(&pipeline->fifo.mutex);
103 if((pipeline->fifo.ppost + sizeof(DEST_HANDLE) + sizeof(PIPELINE_MSG) + sizeof(void *) + sizeof(void *)) >= (pipeline->fifo.pbase + VP_API_PIPELINE_FIFO_SIZE))
104 pipeline->fifo.ppost = pipeline->fifo.pbase;
106 vp_os_memcpy(pipeline->fifo.ppost, &dest, sizeof(DEST_HANDLE));
107 pipeline->fifo.ppost += sizeof(DEST_HANDLE);
109 vp_os_memcpy(pipeline->fifo.ppost, &msg_id, sizeof(PIPELINE_MSG));
110 pipeline->fifo.ppost += sizeof(PIPELINE_MSG);
113 vp_os_memcpy(pipeline->fifo.ppost, &callback, sizeof(void *));
115 vp_os_memset(pipeline->fifo.ppost, 0, sizeof(void *));
116 pipeline->fifo.ppost += sizeof(void *);
119 vp_os_memcpy(pipeline->fifo.ppost, ¶m, sizeof(void *));
122 vp_os_memset(pipeline->fifo.ppost, 0, sizeof(void *));
123 pipeline->fifo.ppost += sizeof(void *);
125 pipeline->fifo.nb_waiting ++;
127 vp_os_mutex_unlock(&pipeline->fifo.mutex);
133 static C_RESULT vp_api_get_message(vp_api_io_pipeline_t *pipeline, DEST_HANDLE *dest, PIPELINE_MSG *msg_id, void **callback, void **param)
135 C_RESULT res = VP_SUCCESS;
137 VP_OS_ASSERT(pipeline->fifo.nb_waiting > 0);
139 vp_os_mutex_lock(&pipeline->fifo.mutex);
141 if((pipeline->fifo.pget + sizeof(PIPELINE_MSG) + sizeof(void *) + sizeof(void *)) >= (pipeline->fifo.pbase + VP_API_PIPELINE_FIFO_SIZE))
142 pipeline->fifo.pget = pipeline->fifo.pbase;
145 vp_os_memcpy(dest, pipeline->fifo.pget, sizeof(DEST_HANDLE));
146 pipeline->fifo.pget += sizeof(DEST_HANDLE);
149 vp_os_memcpy(msg_id, pipeline->fifo.pget, sizeof(PIPELINE_MSG));
150 pipeline->fifo.pget += sizeof(PIPELINE_MSG);
153 vp_os_memcpy(callback, pipeline->fifo.pget, sizeof(void *));
154 pipeline->fifo.pget += sizeof(void *);
157 vp_os_memcpy(param, pipeline->fifo.pget, sizeof(void *));
158 pipeline->fifo.pget += sizeof(void *);
160 pipeline->fifo.nb_waiting --;
162 vp_os_mutex_unlock(&pipeline->fifo.mutex);
/**
 * @brief Dispatch the messages waiting in a pipeline fifo.
 *
 * Loops while pipeline->fifo.nb_waiting is positive. Each message is fetched with
 * vp_api_get_message() and routed according to dest.stage:
 *  - VP_API_DEST_PIPELINE_LEVEL  : pipeline->handle_msg()
 *  - VP_API_DEST_STAGE_BROADCAST : handle_msg() of every stage of the pipeline
 *  - a valid stage index         : handle_msg() of that stage, when it is not NULL
 *
 * @param pipeline Pipeline whose waiting messages are processed
 * @return C_RESULT -- presumably res, which starts as VP_SUCCESS; TODO confirm
 */
168 C_RESULT vp_api_handle_messages(vp_api_io_pipeline_t *pipeline)
170 C_RESULT res = VP_SUCCESS;
174 void *callback = NULL;
/* nb_waiting is read here without taking the fifo mutex. */
178 while(pipeline->fifo.nb_waiting > 0)
/* Fetch the next record; vp_api_get_message() takes the fifo mutex itself. */
180 if(VP_FAILED(vp_api_get_message(pipeline, &dest, &msg_id, &callback, ¶m)))
/* Message addressed to the pipeline itself. */
184 if(dest.stage == VP_API_DEST_PIPELINE_LEVEL)
186 pipeline->handle_msg(pipeline, msg_id, callback, param);
/* Message addressed to all stages. */
188 else if(dest.stage == VP_API_DEST_STAGE_BROADCAST)
190 for(i=0; i < pipeline->nb_stages; i++)
/* NOTE(review): unlike the single-stage branch below, funcs.handle_msg is not checked
   against NULL before the call -- confirm every stage provides a handler. */
192 pipeline->stages[i].funcs.handle_msg(pipeline->stages[i].cfg, msg_id, callback, param);
/* Message addressed to one stage: the index is range-checked against nb_stages. */
195 else if((dest.stage >=0) && (dest.stage < (int16_t)pipeline->nb_stages))
/* Stages without a message handler are skipped. */
197 if (pipeline->stages[dest.stage].funcs.handle_msg != NULL)
198 pipeline->stages[dest.stage].funcs.handle_msg(pipeline->stages[dest.stage].cfg, msg_id, callback, param);