2 #include <VP_Com/vp_com_socket.h>
3 #include <VP_Com/vp_com_error.h>
5 #include <VP_Os/vp_os_malloc.h>
6 #include <VP_Os/vp_os_print.h>
7 #include <VP_Os/vp_os_signal.h>
15 typedef int socklen_t;
16 #define MSG_NOSIGNAL 0
// Creates the OS socket described by sck (stream or datagram, chosen from
// sck->protocol), binds and/or connects it, stores the handle in sck->priv and,
// for each of the read / write out-pointers that is non-NULL, publishes the
// matching I/O callback.
// NOTE(review): this listing has gaps (the embedded line numbers skip), so the
// declarations of s and err, the case labels, the braces and the return
// statement are not visible here. Comments below describe the visible lines only.
21 C_RESULT vp_com_open_socket(vp_com_socket_t* sck, Read* read, Write* write)
23 C_RESULT res = VP_COM_OK;
// Option values handed to setsockopt(). reuseaddroption is only referenced by
// the commented-out SO_REUSEADDR call further down.
25 BOOL reuseaddroption = TRUE;
26 BOOL exclusiveaddroption = FALSE;
// name is filled with family and port below; no other use of it is visible in this listing.
30 struct sockaddr_in name = { 0 };
31 struct sockaddr_in local_address = { 0 };
32 struct sockaddr_in remote_address = { 0 };
34 int res_setsockopt=0,res_connect=0,res_bind=0;
36 switch( sck->protocol )
// Stream (TCP) socket; an invalid handle is mapped to VP_COM_ERROR.
39 s = socket( AF_INET, SOCK_STREAM, IPPROTO_TCP );
40 res = ( s == INVALID_SOCKET ) ? VP_COM_ERROR : VP_COM_OK;
// Datagram (UDP) socket; the destination address is cached in sck->scn.
44 s = socket( AF_INET, SOCK_DGRAM, IPPROTO_UDP );
45 sck->scn = inet_addr(sck->serverHost); // Cache destination in int format
46 res = ( s == INVALID_SOCKET ) ? VP_COM_ERROR : VP_COM_OK;
// Presumably the default case (label not visible): the descriptor is forced to
// client type and a parameter error is reported.
50 sck->type = VP_COM_CLIENT;
51 res = VP_COM_PARAMERROR;
57 PRINT("\nSocket opening failed\n");
62 // res_setsockopt = setsockopt(s,SOL_SOCKET,SO_REUSEADDR,(char*)&reuseaddroption,sizeof(reuseaddroption));
// Sets SO_EXCLUSIVEADDRUSE to FALSE. NOTE(review): res_setsockopt is not tested
// anywhere in the visible lines.
63 res_setsockopt = setsockopt(s,SOL_SOCKET,SO_EXCLUSIVEADDRUSE,(char*)&exclusiveaddroption,sizeof(exclusiveaddroption));
65 name.sin_family = AF_INET;
66 name.sin_port = htons( sck->port );
// Remote endpoint: sck->serverHost on sck->port. Presumably the client branch
// of a test on sck->type (the selecting statement is not visible).
70 remote_address.sin_family = AF_INET;
71 remote_address.sin_port = htons( sck->port );
72 remote_address.sin_addr.s_addr = inet_addr(sck->serverHost);
// UDP sockets are bound locally before the connect below.
74 if ( sck->protocol ==VP_COM_UDP)
76 local_address.sin_addr.s_addr= INADDR_ANY;
77 local_address.sin_family = AF_INET;
// NOTE(review): the trailing comment says "any available port", but the code
// binds to sck->port, the same port used for the remote endpoint. Confirm intent.
78 local_address.sin_port = htons( sck->port ); /* Bind to any available port */
79 res_bind = bind(s,(const struct sockaddr*)&local_address,sizeof(local_address));
// err is captured right after bind(), whether or not bind() failed.
80 err = WSAGetLastError();
81 res = (res_bind==0)? VP_COM_OK : VP_COM_ERROR; /* Convert from Win32 error code to VP SDK error code */
// connect() is attempted for every protocol that reached this point with
// success; the UDP exclusion is commented out.
84 if (VP_SUCCEEDED(res))// && (sck->protocol !=VP_COM_UDP))
86 res_connect = connect( s, (struct sockaddr*)&remote_address, sizeof( remote_address ) );
87 if( res_connect == -1 ){ res = VP_COM_ERROR; err = WSAGetLastError(); }
93 /* Local TCP/UDP address on which we wait for connections */
94 local_address.sin_family = AF_INET;
95 local_address.sin_port = htons( sck->port );
96 local_address.sin_addr.s_addr = INADDR_ANY; /* Accept connections on any network interface */
97 res_bind = bind( s, (const struct sockaddr*)&local_address, sizeof(local_address) );
98 res = (res_bind==0)? VP_COM_OK : VP_COM_ERROR ; /* Convert from Win32 error code to VP SDK error code */
102 res = VP_COM_PARAMERROR;
// The handle is stored in the opaque priv pointer by value (cast, not allocation).
// NOTE(review): no closesocket() on the failure paths is visible in this
// listing; confirm the handle is not leaked when bind / connect fails.
108 sck->priv = (void*) s;
// Publish the I/O callbacks matching the protocol; each out-pointer is optional.
110 switch( sck->protocol )
113 if(read) *read = (Read) vp_com_read_socket;
114 if(write) *write = (Write) vp_com_write_socket;
118 if(read) *read = (Read) vp_com_read_udp_socket;
119 if(write) *write = (Write) vp_com_write_udp_socket;
123 if(read) *read = NULL;
124 if(write) *write = NULL;
// Any blocking mode other than the three known values is normalised to VP_COM_DEFAULT.
133 if (sck->block != VP_COM_DEFAULT &&
134 sck->block != VP_COM_WAITALL &&
135 sck->block != VP_COM_DONTWAIT)
137 sck->block = VP_COM_DEFAULT;
// Closes the OS socket whose handle is stored in socket->priv.
// NOTE(review): the guard condition, braces and final return are not visible in
// this listing (the embedded line numbers skip).
143 C_RESULT vp_com_close_socket(vp_com_socket_t* socket)
// Guard path; its condition is not visible (presumably a NULL check on socket, TODO confirm).
146 return VP_COM_PARAMERROR;
148 // shutdown( (int) socket->priv, SHUT_RDWR );
// NOTE(review): the handle is cast to int here while the TCP read / write
// functions cast priv to SOCKET. The closesocket() result is not used on this line.
149 closesocket( (int) socket->priv );
// Puts the server socket in listening state, accepts one connection and, on
// success, initialises client as a copy of server whose priv holds the accepted handle.
// NOTE(review): the declarations of s and c, the guard condition, the check of
// the accept() result and the return statement are not visible in this listing.
156 C_RESULT vp_com_wait_socket(vp_com_socket_t* server, vp_com_socket_t* client, int32_t queue_length)
// l is the address-length argument handed to accept() by pointer.
160 int l = sizeof(struct sockaddr_in);
161 struct sockaddr_in raddr = { 0 }; // remote address
163 C_RESULT res = VP_COM_OK;
// Guard path; its condition is not visible in this listing.
166 return VP_COM_PARAMERROR;
168 s = (int) server->priv;
// The requested backlog is recorded on the server descriptor.
172 server->queue_length = queue_length;
// NOTE(review): the return value of listen() is not used.
174 listen(s, queue_length);
175 c = accept( s, (struct sockaddr*)&raddr, &l );
179 if(VP_SUCCEEDED( res ))
// The client starts as a byte copy of the server descriptor; only priv is replaced.
181 vp_os_memcpy( client, server, sizeof(vp_com_socket_t) );
182 client->priv = (void*) c;
// Applies the options requested in the options bit mask to the socket whose
// handle is stored in socket->priv. The visible branches handle
// VP_COM_NON_BLOCKING and VP_COM_NO_DELAY. The vp_com parameter is not
// referenced in the visible lines.
// NOTE(review): a block comment opens on the "#ifndef USE_MINGW32" line below
// and its closing marker is not in this listing, so it cannot be told from here
// which of the following statements are actually compiled. Several of them
// (ioctl, fcntl, errno) are POSIX-style calls in a Winsock file; confirm.
190 C_RESULT vp_com_sockopt_ip(vp_com_t* vp_com, vp_com_socket_t* socket, VP_COM_SOCKET_OPTIONS options)
// res starts as an error and is overwritten by the option calls below.
192 C_RESULT res = VP_COM_ERROR;
193 int s = (int) socket->priv;
195 if( options & VP_COM_NON_BLOCKING )
197 /*#ifndef USE_MINGW32
200 PRINT("Setting socket %d to non blocking\n", s);
// FIONBIO request; the declaration and value of arg are not visible.
201 res = ioctl( s, FIONBIO, &arg ) < 0 ? C_FAIL : C_OK;
205 PRINT("error setting non blocking\n");
208 if( options & VP_COM_NO_DELAY )
212 PRINT("Disabling the Nagle (TCP No Delay) algorithm for socket %d\n", s);
// TCP_NODELAY request; the declaration and value of flag are not visible.
214 res = setsockopt( s, IPPROTO_TCP, TCP_NODELAY, (char *)&flag, sizeof(flag) ) < 0 ? C_FAIL : C_OK;
217 PRINT("error disabling the Nagle algorithm\n");
// Reads the descriptor flags and writes them back; the statement that modifies
// flags in between is not visible.
222 flags = fcntl(s, F_GETFL, 0);
228 flags = fcntl(s, F_SETFL, flags );
234 DEBUG_PRINT_SDK("Get Socket Options failed because of %d\n", errno);
241 /////////////////////////////////////////////////////////////////////////////////////////////////////////////
// Receives data on a UDP socket with recvfrom().
// On entry *size is the capacity of buffer; it is overwritten with the value
// returned by recvfrom().
// NOTE(review): the declarations of flags, iMode, err and res, the error test,
// the case labels and the return statement are not visible in this listing.
245 C_RESULT vp_com_read_udp_socket(vp_com_socket_t* sck, int8_t* buffer, int32_t* size)
248 int s = (int) sck->priv;
// Receives the sender address; it is not copied back to sck (see the
// commented-out lines below).
249 struct sockaddr_in from;
252 socklen_t from_len = sizeof(from);
// Blocking policy: WAITALL adds MSG_WAITALL to the flags; DONTWAIT issues a
// FIONBIO request first (iMode value not visible, presumably enabling
// non-blocking mode, TODO confirm).
255 if (VP_COM_WAITALL == sck->block)
256 flags |= MSG_WAITALL;
257 else if (VP_COM_DONTWAIT == sck->block)
260 ioctlsocket (s, FIONBIO, &iMode);
267 *size = recvfrom(s, (char*)buffer, *size, flags, (struct sockaddr*)&from, &from_len );
// The Winsock error code is fetched and dispatched on; the condition guarding
// this switch and its case labels are not visible.
273 switch( err=WSAGetLastError() )
276 PRINT("MSG_NOSIGNAL is not supported on this platform\n");
286 PRINT("Connection timed out\n");
290 PRINT("Connection with peer is not enabled\n");
298 //sck->scn = from.sin_addr.s_addr;
299 //sck->port = ntohs(from.sin_port);
// In DONTWAIT mode a second FIONBIO request follows the reception (presumably
// restoring blocking mode; iMode value not visible).
307 if (VP_COM_DONTWAIT == sck->block)
310 ioctlsocket (s, FIONBIO, &iMode);
// Caches the numeric form of sck->serverHost in sck->scn for UDP sockets.
// NOTE(review): the statement that updates res on success and the return
// statement are not visible in this listing.
317 C_RESULT vp_com_make_udp_target( vp_com_socket_t* sck )
// res starts as a failure; nothing in the visible lines changes it.
319 C_RESULT res = C_FAIL;
321 if( sck->protocol == VP_COM_UDP )
323 sck->scn = inet_addr(sck->serverHost); // We use scn field to store ip in order to avoid a call to inet_addr each time we call write
// Sends buffer on a UDP socket with send().
// On entry *size is the number of bytes to send; it is overwritten with the
// value returned by send().
// NOTE(review): the declarations of flags, iMode and res, the error test, the
// case labels and the return statement are not visible in this listing.
330 C_RESULT vp_com_write_udp_socket(vp_com_socket_t* sck, const int8_t* buffer, int32_t* size)
333 int s = (int) sck->priv;
334 struct sockaddr_in to;
// NOTE(review): MSG_WAITALL is documented as a receive flag; confirm it is
// meaningful (or harmless) when passed to send() on Winsock.
337 if (VP_COM_WAITALL == sck->block)
338 flags |= MSG_WAITALL;
// DONTWAIT issues a FIONBIO request before sending (iMode value not visible).
339 else if (VP_COM_DONTWAIT == sck->block)
342 ioctlsocket (s, FIONBIO, &iMode);
// The destination is built from the cached sck->scn and sck->port.
// NOTE(review): with the sendto() call commented out, "to" is filled but not
// used by any visible statement; send() relies on the socket's connected peer.
349 vp_os_memset( (char*)&to, 0, sizeof(to) );
350 to.sin_family = AF_INET;
351 to.sin_addr.s_addr = sck->scn;
352 to.sin_port = htons(sck->port);
354 //*size = sendto( s, (char*)buffer, *size, 0, (struct sockaddr*)&to, sizeof(to) );
355 *size = send( s, (char*)buffer, *size, flags);
// Dispatch on the Winsock error code; the guarding condition and case labels
// are not visible.
361 switch( WSAGetLastError() )
364 PRINT("MSG_NOSIGNAL is not supported on this platform\n");
375 PRINT("Connection with peer is not enabled\n");
// In DONTWAIT mode a second FIONBIO request follows the send (presumably
// restoring blocking mode; iMode value not visible).
387 if (VP_COM_DONTWAIT == sck->block)
390 ioctlsocket (s, FIONBIO, &iMode);
// Receives data on a stream socket with recv().
// On entry *size is the capacity of buffer; it is overwritten with the value
// returned by recv().
// NOTE(review): the declarations of flags, iMode and res, the error test, the
// body of the error branch and the return statement are not visible in this listing.
396 C_RESULT vp_com_read_socket(vp_com_socket_t* socket, int8_t* buffer, int32_t* size)
399 SOCKET s = (SOCKET) socket->priv;
// Blocking policy: WAITALL adds MSG_WAITALL; DONTWAIT issues a FIONBIO request
// first (iMode value not visible).
402 if (VP_COM_WAITALL == socket->block)
403 flags |= MSG_WAITALL;
404 else if (VP_COM_DONTWAIT == socket->block)
407 ioctlsocket (s, FIONBIO, &iMode);
// buffer is passed without the char* cast used by the UDP read function.
413 *size = /*read*/recv(s, buffer, *size, flags);
// NOTE(review): Winsock calls report failures through WSAGetLastError(), which
// is what the UDP read / write functions in this file use. Testing errno against
// EAGAIN here probably never matches a would-block condition
// (WSAEWOULDBLOCK); confirm and align with the UDP functions.
416 if( errno == EAGAIN )
// In DONTWAIT mode a second FIONBIO request follows the reception (presumably
// restoring blocking mode; iMode value not visible).
431 if (VP_COM_DONTWAIT == socket->block)
434 ioctlsocket (s, FIONBIO, &iMode);
// Sends buffer on a stream socket with send().
// On entry *size is the number of bytes to send; it is overwritten with the
// value returned by send().
// NOTE(review): the declarations of flags, iMode and res, the error test, the
// body of the error branch and the return statement are not visible in this listing.
440 C_RESULT vp_com_write_socket(vp_com_socket_t* socket, const int8_t* buffer, int32_t* size)
443 SOCKET s = (SOCKET) socket->priv;
// NOTE(review): MSG_WAITALL is documented as a receive flag; confirm it is
// meaningful (or harmless) when passed to send() on Winsock.
446 if (VP_COM_WAITALL == socket->block)
447 flags |= MSG_WAITALL;
// DONTWAIT issues a FIONBIO request before sending (iMode value not visible).
448 else if (VP_COM_DONTWAIT == socket->block)
451 ioctlsocket (s, FIONBIO, &iMode);
457 *size = send(s, buffer, *size, flags);
// NOTE(review): Winsock calls report failures through WSAGetLastError(), which
// is what the UDP read / write functions in this file use. Testing errno against
// EAGAIN here probably never matches a would-block condition
// (WSAEWOULDBLOCK); confirm and align with the UDP functions.
460 if( errno == EAGAIN )
// In DONTWAIT mode a second FIONBIO request follows the send (presumably
// restoring blocking mode; iMode value not visible).
475 if (VP_COM_DONTWAIT == socket->block)
478 ioctlsocket (s, FIONBIO, &iMode);
486 /////////////////////////////////////////////////////////////////////////////////////////////////////////////
// File-local state used to let other threads wait until the server thread has
// finished its setup: a mutex, the condition variable initialised against it in
// vp_com_init_server, and a flag that is TRUE while setup is still pending.
488 static vp_os_mutex_t server_initialisation_mutex;
489 static vp_os_cond_t server_initialisation_wait;
// NOTE(review): this flag is read and written without holding the mutex in the
// functions below; confirm that is acceptable on the target platforms.
490 static bool_t server_init_not_finished = FALSE;
// Prepares the server-initialisation handshake: marks setup as pending, then
// initialises the mutex and the condition variable associated with it.
// NOTE(review): the return statement is not visible in this listing.
492 C_RESULT vp_com_init_server(void)
// The flag is raised before the synchronisation objects exist.
494 server_init_not_finished = TRUE;
495 vp_os_mutex_init(&server_initialisation_mutex);
496 vp_os_cond_init(&server_initialisation_wait, &server_initialisation_mutex);
// Waits on the server-initialisation condition variable if the pending flag is
// still set when the function is entered.
// NOTE(review): the return statement is not visible in this listing.
// NOTE(review): the flag is tested before the mutex is taken and the wait is
// not wrapped in a re-check loop. The server thread clears the flag after
// signalling and outside the lock, so a caller that sees the flag set just
// after the signal was sent may wait with nobody left to wake it. Confirm, and
// consider testing the flag under the mutex in a loop.
501 C_RESULT vp_com_wait_for_server_up(void)
503 if( server_init_not_finished )
505 vp_os_mutex_lock(&server_initialisation_mutex);
506 vp_os_cond_wait(&server_initialisation_wait);
507 vp_os_mutex_unlock(&server_initialisation_mutex);
// Same as vp_com_wait_for_server_up but with a bounded wait: if the pending
// flag is set on entry, waits on the condition variable for at most ms (passed
// unchanged to vp_os_cond_timed_wait) and keeps that call's result in res.
// NOTE(review): the declaration of res and the return statement are not visible
// in this listing.
// NOTE(review): the flag is tested before the mutex is taken, so a signal sent
// between the test and the lock is missed; here the timeout bounds the damage.
513 C_RESULT vp_com_timed_wait_for_server_up(uint32_t ms)
517 if( server_init_not_finished )
519 vp_os_mutex_lock(&server_initialisation_mutex);
520 res = vp_os_cond_timed_wait(&server_initialisation_wait, ms);
521 vp_os_mutex_unlock(&server_initialisation_mutex);
// Helpers defined outside this listing and used by the server thread routine below.
// Adds the given sockets to read_fs; its return value is used by the caller as
// the running maximum passed to select().
527 extern int32_t vp_com_fill_read_fs(vp_com_socket_t* sockets, int32_t num_sockets, int32_t max, fd_set* read_fs );
// Called on select timeout to release every client socket in the table.
528 extern void vp_com_close_client_sockets(vp_com_socket_t* client_sockets, int32_t num_client_sockets);
// Called when a server socket is readable, to fill a client slot.
529 extern C_RESULT vp_com_client_open_socket(vp_com_socket_t* server_socket, vp_com_socket_t* client_socket);
// Called when a client socket is readable.
530 extern void vp_com_client_receive( vp_com_socket_t *client_socket );
// Server thread routine: brings the communication layer up, opens the server
// sockets listed in the thread parameters, then loops on select() to fill
// client slots and dispatch incoming data until params->run is no longer TRUE.
// On exit it closes the server sockets and tears the communication layer down.
// NOTE(review): this listing has gaps (the embedded line numbers skip). The
// declaration of read_fs, several branch bodies, the braces and the thread
// return are not visible, so the comments below describe the visible lines only.
532 DEFINE_THREAD_ROUTINE_STACK( vp_com_server, thread_params, VP_COM_THREAD_SERVER_STACK_SIZE )
// Fixed-size table of client descriptors; a slot whose priv is NULL is treated as free.
535 vp_com_socket_t client_sockets[VP_COM_THREAD_NUM_MAX_CLIENTS];
// tv holds the select() timeout; ptv is what is actually passed to select()
// (its assignments are not visible).
536 struct timeval tv, *ptv;
538 // This thread setup connection then loop & wait for a socket event
539 vp_com_server_thread_param_t* params = (vp_com_server_thread_param_t*) thread_params;
541 int32_t i, rc, ncs, s, max = 0, num_server_sockets = params->num_servers, num_client_sockets = 0;
542 vp_com_socket_t* server_sockets = params->servers;
545 vp_os_memset( client_sockets, 0, sizeof( client_sockets ));
// Setup chain: init, local configuration, connect. Each failure is logged and
// followed by a shutdown of the communication object.
547 if(VP_FAILED(vp_com_init(params->com)))
549 DEBUG_PRINT_SDK("[VP_COM_SERVER] Failed to init com\n");
550 vp_com_shutdown(params->com);
552 else if(VP_FAILED(vp_com_local_config(params->com, params->config)))
554 DEBUG_PRINT_SDK("[VP_COM_SERVER] Failed to configure com\n");
555 vp_com_shutdown(params->com);
557 else if(VP_FAILED(vp_com_connect(params->com, params->connection, 1)))
559 DEBUG_PRINT_SDK("[VP_COM_SERVER] Failed to connect\n");
560 vp_com_shutdown(params->com);
// Wake threads blocked in the wait-for-server-up functions.
564 vp_os_mutex_lock(&server_initialisation_mutex);
565 vp_os_cond_signal(&server_initialisation_wait);
566 vp_os_mutex_unlock(&server_initialisation_mutex);
// NOTE(review): the flag is cleared after the signal and outside the lock, so a
// waiter that tests the flag in that window can block after the only signal has
// already been sent. Confirm, and consider clearing it under the mutex before
// signalling.
568 server_init_not_finished = FALSE;
// Open every server socket; one that fails to open is flagged disabled.
570 for( i = 0; i < num_server_sockets; i++ )
572 if(VP_FAILED( vp_com_open_socket(&server_sockets[i], NULL, NULL) ))
574 DEBUG_PRINT_SDK("[VP_COM_SERVER] Unable to open server socket\n");
575 server_sockets[i].is_disable = TRUE;
// Presumably the else branch of the test above (not visible): start listening
// with the backlog stored on the descriptor. The listen() result is not used.
579 listen((int32_t)server_sockets[i].priv, server_sockets[i].queue_length);
585 while( params->run == TRUE )
// Timeout selection: when the timer is disabled or both delays are zero the
// branch body is not visible (presumably no timeout); otherwise tv is loaded
// from the thread parameters.
587 if( params->timer_enable == FALSE || ( params->wait_sec == 0 && params->wait_usec == 0 ) )
593 tv.tv_sec = params->wait_sec;
594 tv.tv_usec = params->wait_usec;
// Rebuild the read set from server sockets then client sockets; max is
// threaded through both calls and max + 1 is given to select().
599 max = vp_com_fill_read_fs( &server_sockets[0], num_server_sockets, 0, &read_fs );
600 max = vp_com_fill_read_fs( &client_sockets[0], num_client_sockets, max, &read_fs );
602 rc = select( max + 1, &read_fs, NULL, NULL, ptv );
// NOTE(review): errno is tested after a Winsock select(); other functions in
// this file use WSAGetLastError() for socket errors. Confirm this test can match.
603 if( rc == -1 && ( errno == EINTR || errno == EAGAIN ) )
// Timeout path (its condition is not visible, presumably rc == 0): every client
// is closed, the count and the table are reset and the timer is disabled.
608 DEBUG_PRINT_SDK("[VP_COM_SERVER] select timeout\n");
610 vp_com_close_client_sockets(&client_sockets[0], num_client_sockets);
611 num_client_sockets = 0;
613 params->timer_enable = FALSE;
614 vp_os_memset( client_sockets, 0, sizeof( client_sockets ));
// Server sockets: each enabled, readable server socket gets a client slot.
617 for( i = 0; i < num_server_sockets && rc != 0; i++ )
619 s = (int32_t) server_sockets[i].priv;
621 if( ( !server_sockets[i].is_disable ) && FD_ISSET( s, &read_fs) )
625 // Recycle previously released sockets
// Empty-body loop: stops on the first slot whose priv is NULL, or at
// num_client_sockets when every used slot is busy.
626 for( ncs = 0; ncs < num_client_sockets && client_sockets[ncs].priv != NULL; ncs++ );
628 if( ncs < VP_COM_THREAD_NUM_MAX_CLIENTS)
// The client count only grows when the slot was appended at the end; a
// recycled slot leaves it unchanged.
630 if( VP_SUCCEEDED(vp_com_client_open_socket(&server_sockets[i], &client_sockets[ncs])) && ( ncs == num_client_sockets ) )
631 num_client_sockets ++;
// Client sockets: each enabled, readable client is handed to vp_com_client_receive.
636 for( i = 0; i < num_client_sockets && rc != 0; i++ )
638 s = (int32_t) client_sockets[i].priv;
639 if( ( !client_sockets[i].is_disable ) && FD_ISSET( s, &read_fs) )
643 vp_com_client_receive( &client_sockets[i] );
// Teardown: close every server socket, then disconnect and shut down the
// communication object.
648 for( i = 0; i < num_server_sockets; i++ )
650 vp_com_close_socket(&server_sockets[i]);
654 vp_com_disconnect(params->com);
655 vp_com_shutdown(params->com);