1 
2 /*
3  * Copyright (C) 2015-2020 Alibaba Group Holding Limited
4  *
5  */
6 
7 #include <stdio.h>
8 #include <stdlib.h>
9 #include <stdint.h>
10 #include <string.h>
11 #include <signal.h>
12 #include <stdlib.h>
13 
14 #include "uvoice_os.h"
15 #include "uvoice_ws.h"
16 
17 #include "nopoll.h"
18 #include "lwip/netif.h"
19 
20 
/* Stack size handed to os_task_create() for the client task; larger on BK7251 builds. */
#ifdef UVOICE_ON_BK7251
#define NOPOLL_CLIENT_TASK_STACK_SIZE	8192
#else
#define NOPOLL_CLIENT_TASK_STACK_SIZE	4096
#endif
/* Priority handed to os_task_create() for the client task. */
#define NOPOLL_CLIENT_TASK_PRIORITY		UVOICE_TASK_PRI_LOWER

/* Bounds of the client task's idle sleep; the sleep doubles up to MAX while nothing is received. */
#define WAIT_MS_MAX						200
#define WAIT_MS_MIN						20
/* Number of idle loop iterations between two ping attempts. */
#define PING_INTERVAL_COUNT				100 /* max: WAIT_MS_MAX * PING_INTERVAL_COUNT < 45 seconds */

/* A ping is only sent when the connection has been idle for longer than this many seconds. */
#define NOPOLL_PING_PERIOD_SEC			20
/* Accumulated wait (in the task's idle-sleep accounting) after which a missing pong closes the connection. */
#define NOPOLL_PONG_TIMEOUT_MSEC		10000 /* keep NOPOLL_PONG_TIMEOUT_MSEC below NOPOLL_PING_PERIOD_SEC expressed in milliseconds */

/* Non-zero: fragmented text frames are merged and delivered to recv_text_cb as one buffer. */
#define MERGE_TEXT_FRAME_SUPPORT		1
36 
/* Commands posted to the client task through the handler's message queue. */
enum {
	NOPOLL_CMD_EXIT,		/* leave the task loop and signal exit_sem */
	NOPOLL_CMD_OPEN,		/* open a connection from handler->conn_info */
	NOPOLL_CMD_SEND_TEXT,	/* send the text carried by a nopoll_xfer_t */
	NOPOLL_CMD_SEND_BINARY,	/* send the binary fragment carried by a nopoll_xfer_t */
	NOPOLL_CMD_CLOSE		/* close the current connection */
};
44 
/* One queue entry exchanged between the API functions and the client task. */
typedef struct {
	int type;		/* one of the NOPOLL_CMD_* values */
	void *data;		/* nopoll_xfer_t pointer for send commands, NULL otherwise */
} nopoll_msg_t;
49 
/*
 * Payload of a send command. Both the descriptor and buff are heap
 * allocated by the sender and freed by the client task after sending.
 */
typedef struct {
	uint8_t *buff;			/* private copy of the caller's data */
	uint32_t size;			/* number of bytes in buff to send */
	ws_bin_type_t type;		/* fragment position; only set for binary transfers */
} nopoll_xfer_t;
55 
/* Connection parameters copied from the caller's ws_conn_info_t. */
typedef struct {
	char server[128];	/* server name, NUL terminated */
	char port[8];		/* port rendered as a decimal string */
	char schema[8];		/* schema string copied from the caller */
	char *cacert;		/* heap copy of the CA certificate, NULL when not supplied */
	char *path;			/* heap copy of the request path, NULL when not supplied */
} nopoll_conn_info_t;
63 
/* State of the single WebSocket client instance. */
typedef struct {
	noPollCtx *ctx;					/* nopoll context, NULL while no client exists */
	noPollConn *conn;				/* current connection, NULL when closed */
	noPollConnOpts *conn_opts;		/* options created while opening a connection */
	nopoll_conn_info_t conn_info;	/* parameters used by the next/current open */
	char *msg_buffer;				/* backing storage handed to os_queue_new() */
	int msg_count;					/* messages posted minus messages consumed, guarded by lock */
	char *prev_text;				/* text fragments accumulated so far (merge mode) */
	int prev_text_size;				/* number of bytes held in prev_text */
	uint8_t recv_disable:1;			/* 1: the task does not poll for incoming messages */
	uint8_t pong_recv:1;			/* set when a pong frame arrives, cleared when a ping is sent */
	int heartbeat_type;				/* 0 off, 1 ping only, 2 ping and check pong */
	int prev_recv_type;				/* opcode of the last text/binary frame, used for continuation frames */
	os_mutex_t lock;				/* protects msg_count */
	os_sem_t exit_sem;				/* signalled by the task when it leaves its loop */
	os_task_t task;					/* client task handle */
	os_queue_t queue;				/* command queue of nopoll_msg_t entries */
	ws_cb_ops_t callback;			/* user callbacks copied at open time */
	ws_conn_state_t ws_conn_state;	/* connection state reported to the API user */
	long long conn_active_time;		/* os_current_time() value at the last activity */
} ws_handler_t;
85 
/* The one client instance; allocated by uvoice_ws_init(), freed by uvoice_ws_deinit(). */
static ws_handler_t *g_ws_handler;

/* Forward declaration: nopoll_client_open() calls this before its definition. */
static int nopoll_client_close(ws_handler_t *handler);
89 
/*
 * Record the current os_current_time() value as the moment of the
 * connection's last activity.
 */
static void active_time_update(ws_handler_t *handler)
{
	long long now = os_current_time();

	handler->conn_active_time = now;
}
94 
/*
 * Return the last-activity timestamp divided by 1000, truncated to int
 * (seconds, assuming os_current_time() counts milliseconds - TODO confirm).
 */
static int active_time_get(ws_handler_t *handler)
{
	long long stamp = handler->conn_active_time;

	return stamp / 1000;
}
99 
/* Bump the pending-message counter while holding the handler lock. */
static inline void msg_count_add(ws_handler_t *handler)
{
	os_mutex_lock(handler->lock, OS_WAIT_FOREVER);
	handler->msg_count += 1;
	os_mutex_unlock(handler->lock);
}
106 
/* Drop the pending-message counter by one while holding the handler lock. */
static inline void msg_count_dec(ws_handler_t *handler)
{
	os_mutex_lock(handler->lock, OS_WAIT_FOREVER);
	handler->msg_count -= 1;
	os_mutex_unlock(handler->lock);
}
113 
/*
 * Dispatch one received nopoll message to the user callbacks.
 *
 * With MERGE_TEXT_FRAME_SUPPORT enabled, a non-final text frame starts an
 * accumulation buffer (handler->prev_text); following text frames are
 * appended and the merged text is delivered through recv_text_cb once a
 * final frame arrives. If a non-text frame shows up while text is pending,
 * the pending text is delivered first and the new frame is then handled
 * normally.
 *
 * Text, binary and continuation frames go to recv_text_cb/recv_binary_cb;
 * a pong frame only sets handler->pong_recv. Other opcodes are ignored.
 *
 * The message is not released here; the caller keeps ownership of msg.
 */
static void nopoll_message_handle(ws_handler_t *handler, noPollMsg *msg)
{
	char *content;
	noPollOpCode type;
	int size;
	bool final;

	size = nopoll_msg_get_payload_size(msg);
	type = nopoll_msg_opcode(msg);
	final = nopoll_msg_is_final(msg);
	content = (char *)nopoll_msg_get_payload(msg);

#if MERGE_TEXT_FRAME_SUPPORT
	if (handler->prev_text_size > 0 && handler->prev_text) {
		if (size > 0 && type == NOPOLL_TEXT_FRAME) {
			char *merged;

			/*
			 * Grow through a temporary so the accumulated buffer is not
			 * lost (leaked) when the reallocation fails; this assumes
			 * snd_realloc leaves the old block valid on failure, as
			 * realloc does.
			 */
			merged = snd_realloc(handler->prev_text,
				handler->prev_text_size + size, AFM_EXTN);
			if (!merged) {
				M_LOGE("realloc prev text failed !\n");
				snd_free(handler->prev_text);
				handler->prev_text = NULL;
				handler->prev_text_size = 0;
				return;
			}
			handler->prev_text = merged;
			memcpy(handler->prev_text + handler->prev_text_size,
				content, size);
			handler->prev_text_size += size;
			if (final) {
				handler->callback.recv_text_cb(handler->prev_text,
					handler->prev_text_size);
				snd_free(handler->prev_text);
				handler->prev_text = NULL;
				handler->prev_text_size = 0;
			}
			return;
		} else {
			/* flush what was collected, then handle this frame below */
			M_LOGD("expect text frame, receive type %d\n", type);
			handler->callback.recv_text_cb(handler->prev_text,
				handler->prev_text_size);
			snd_free(handler->prev_text);
			handler->prev_text = NULL;
			handler->prev_text_size = 0;
		}
	} else if (size > 0 && type == NOPOLL_TEXT_FRAME && !final) {
		/* first fragment of a text message: start accumulating */
		if (handler->prev_text) {
			snd_free(handler->prev_text);
			handler->prev_text = NULL;
			handler->prev_text_size = 0;
		}

		handler->prev_text = snd_zalloc(size, AFM_EXTN);
		if (!handler->prev_text) {
			M_LOGE("alloc prev text buffer failed !\n");
			return;
		}
		handler->prev_text_size = size;
		memcpy(handler->prev_text, content, size);
		return;
	}
#endif

	switch (type) {
	case NOPOLL_TEXT_FRAME:
		handler->prev_recv_type = NOPOLL_TEXT_FRAME;
		handler->callback.recv_text_cb(content, size);
		break;
	case NOPOLL_BINARY_FRAME:
		handler->prev_recv_type = NOPOLL_BINARY_FRAME;
		if (final) {
			handler->callback.recv_binary_cb(content,
				size, WS_BIN_DATA_FINISH);
		} else {
			handler->callback.recv_binary_cb(content,
				size, WS_BIN_DATA_START);
		}
		break;
	case NOPOLL_CONTINUATION_FRAME:
		/* a continuation inherits the kind of the frame that started it */
		if (handler->prev_recv_type == NOPOLL_TEXT_FRAME) {
			handler->callback.recv_text_cb(content, size);
		} else {
			if (final)
				handler->callback.recv_binary_cb(content,
					size, WS_BIN_DATA_FINISH);
			else
				handler->callback.recv_binary_cb(content,
					size, WS_BIN_DATA_CONTINUE);
		}
		break;
	case NOPOLL_PONG_FRAME:
		handler->pong_recv = 1;
		break;
	default:
		break;
	}
}
206 
/*
 * Open a connection described by handler->conn_info (runs on the client
 * task in response to NOPOLL_CMD_OPEN).
 *
 * - without a CA certificate a plain connection is created, using the
 *   request path when one is configured;
 * - with a CA certificate a TLS connection is created.
 *
 * On success the state becomes WS_CONN_STAT_CONNECTED and connect_cb is
 * invoked. On any failure the connection reference and any options still
 * owned here are dropped and the state becomes WS_CONN_STAT_DISCONNECTED.
 *
 * Ownership of the connection options: per the nopoll API documentation,
 * options handed to a nopoll_conn_*_new call are released by nopoll itself
 * (no reuse flag is set here). handler->conn_opts is therefore cleared as
 * soon as the options are handed over, so the error path never frees them
 * a second time, and they are freed explicitly on the one branch that
 * never hands them over.
 *
 * @param handler  client state; NULL is rejected with an error log
 * @param data     unused
 */
static void __nopoll_conn_open(ws_handler_t *handler,
	nopoll_xfer_t *data)
{
	nopoll_conn_info_t *conn_info;

	(void)data;

	if (!handler) {
		M_LOGE("ws handler null !\n");
		return;
	}

	handler->conn_opts = nopoll_conn_opts_new();
	if (!handler->conn_opts) {
		M_LOGE("create connect opts failed !\n");
		goto __err_exit;
	}

	conn_info = &handler->conn_info;

	if (!conn_info->cacert) {
		if (conn_info->path) {
			handler->conn = nopoll_conn_new_opts(handler->ctx,
				handler->conn_opts,
				conn_info->server,
				conn_info->port,
				NULL,
				conn_info->path,
				NULL, NULL);
			/* options now belong to nopoll */
			handler->conn_opts = NULL;
		} else {
			/* nopoll_conn_new() takes no options: release ours now */
			nopoll_conn_opts_free(handler->conn_opts);
			handler->conn_opts = NULL;
			handler->conn = nopoll_conn_new(handler->ctx,
				conn_info->server,
				conn_info->port,
				conn_info->server,
				NULL, NULL, NULL);
		}
	} else {
		if (!nopoll_conn_opts_set_ssl_certs(handler->conn_opts,
			NULL, 0,
			NULL, 0,
			NULL, 0,
			conn_info->cacert,
			strlen(conn_info->cacert) + 1)) {
			M_LOGE("set ssl certs failed !\n");
			/* options were never handed over: freed at __err_exit */
			goto __err_exit;
		}

		/*
		 * NOTE(review): peer verification is switched off although a CA
		 * certificate is configured, so the server certificate is not
		 * validated. Left unchanged here; confirm this is intended.
		 */
		nopoll_conn_opts_ssl_peer_verify(handler->conn_opts,
			nopoll_false);
		handler->conn = nopoll_conn_tls_new(handler->ctx,
			handler->conn_opts, conn_info->server,
			conn_info->port,
			NULL,
			conn_info->path,
			NULL, NULL);
		/* options now belong to nopoll */
		handler->conn_opts = NULL;
	}

	if (!handler->conn) {
		M_LOGE("create nopoll connection failed !\n");
		goto __err_exit;
	}

	if (!nopoll_conn_wait_until_connection_ready(
		handler->conn, 3)) {
		M_LOGE("connection timeout !\n");
		goto __err_exit;
	}

	handler->ws_conn_state = WS_CONN_STAT_CONNECTED;
	handler->callback.connect_cb();
	return;

__err_exit:
	/*
	 * NOTE(review): only one reference is dropped here; check against the
	 * nopoll version in use whether nopoll_conn_close() is required to
	 * fully release a connection that never became ready.
	 */
	if (handler->conn)
		nopoll_conn_unref(handler->conn);
	/* non-NULL only when the options were never handed to nopoll */
	if (handler->conn_opts)
		nopoll_conn_opts_free(handler->conn_opts);
	handler->conn = NULL;
	handler->conn_opts = NULL;
	handler->ws_conn_state = WS_CONN_STAT_DISCONNECTED;
}
284 
/*
 * Close the current connection, mark the client as disconnected and
 * notify the user through disconnect_cb.
 *
 * @param handler  client state; NULL is rejected with an error log
 * @param data     unused
 */
static void __nopoll_conn_close(ws_handler_t *handler, void *data)
{
	(void)data;

	if (!handler) {
		M_LOGE("ws handler null !\n");
		/* void function: a plain return (the original returned -1) */
		return;
	}

	nopoll_conn_close(handler->conn);
	handler->conn = NULL;
	handler->conn_opts = NULL;

	handler->ws_conn_state = WS_CONN_STAT_DISCONNECTED;
	handler->callback.disconnect_cb();
}
299 
/*
 * Send content as a continuation frame with the nopoll_true flag, i.e. the
 * variant used for the closing fragment of a binary transfer.
 * Returns whatever __nopoll_conn_send_common() returns.
 */
static int __nopoll_conn_send_continue(noPollConn * conn,
	const char *content, long length)
{
	int sent;

	sent = __nopoll_conn_send_common(conn, content, length,
		nopoll_true, 0, NOPOLL_CONTINUATION_FRAME);
	return sent;
}
310 
/*
 * Send content as a continuation frame with the nopoll_false flag, i.e. the
 * variant used for a middle fragment of a binary transfer.
 * Returns whatever __nopoll_conn_send_common() returns.
 */
static int __nopoll_conn_send_continue_fragment(noPollConn * conn,
	const char *content, long length)
{
	int sent;

	sent = __nopoll_conn_send_common(conn, content, length,
		nopoll_false, 0, NOPOLL_CONTINUATION_FRAME);
	return sent;
}
321 
/*
 * Try to flush a partially written frame.
 *
 * Up to five attempts are made, each preceded by a 50 ms sleep, as long as
 * errno reports NOPOLL_EWOULDBLOCK and nopoll still has pending bytes.
 *
 * Returns 0 once nopoll_conn_complete_pending_write() reports 0,
 * 1 otherwise (including when no attempt was made at all).
 */
static int __nopoll_conn_complete_pending_write(noPollConn *conn)
{
	int attempt;

	for (attempt = 0; attempt < 5; attempt++) {
		if (errno != NOPOLL_EWOULDBLOCK)
			break;
		if (!(nopoll_conn_pending_write_bytes(conn) > 0))
			break;

		os_msleep(50);
		if (nopoll_conn_complete_pending_write(conn) == 0)
			return 0;
	}

	return 1;
}
334 
/*
 * Send a queued text transfer and release it.
 *
 * A short write is retried through __nopoll_conn_complete_pending_write();
 * if that fails too, the sizes are logged. The transfer descriptor and its
 * buffer are always freed.
 */
static void __nopoll_conn_send_text(ws_handler_t *handler,
	nopoll_xfer_t *data)
{
	int sent = nopoll_conn_send_text(handler->conn,
		(char *)data->buff, data->size);

	if (sent != data->size &&
		__nopoll_conn_complete_pending_write(handler->conn))
		M_LOGE("size %u ret %d\n", data->size, sent);

	snd_free(data->buff);
	snd_free(data);
}
350 
/*
 * Send a queued binary transfer and release it.
 *
 * The fragment type selects the nopoll call: START opens a fragmented
 * binary message, CONTINUE sends a middle fragment, FINISH sends the
 * closing fragment. An unknown type is logged and nothing is sent.
 * A short write is retried through __nopoll_conn_complete_pending_write();
 * if that fails too, the sizes are logged. The transfer descriptor and its
 * buffer are always freed.
 */
static void __nopoll_conn_send_binary(ws_handler_t *handler,
	nopoll_xfer_t *data)
{
	char *payload = (char *)data->buff;
	int sent = 0;

	if (data->type == WS_BIN_DATA_START) {
		sent = nopoll_conn_send_binary_fragment(handler->conn,
			payload, data->size);
	} else if (data->type == WS_BIN_DATA_CONTINUE) {
		sent = __nopoll_conn_send_continue_fragment(handler->conn,
			payload, data->size);
	} else if (data->type == WS_BIN_DATA_FINISH) {
		sent = __nopoll_conn_send_continue(handler->conn,
			payload, data->size);
	} else {
		M_LOGE("unknown data type\n");
	}

	if (sent != data->size &&
		__nopoll_conn_complete_pending_write(handler->conn))
		M_LOGE("size %u, ret %d\n", data->size, sent);

	snd_free(data->buff);
	snd_free(data);
}
382 
/*
 * Poll the connection once for an incoming message.
 *
 * Returns 0 when a message was received and dispatched, -1 when receiving
 * is disabled, the connection is not usable, or nothing was available.
 * A connection found broken while the state is still CONNECTED is closed.
 */
static int __nopoll_conn_try_get_msg(ws_handler_t *handler)
{
	noPollMsg *frame;

	if (handler->recv_disable)
		return -1;

	if (nopoll_conn_is_ok(handler->conn)) {
		frame = nopoll_conn_get_msg(handler->conn);
		if (frame == NULL)
			return -1;

		active_time_update(handler);
		nopoll_message_handle(handler, frame);
		nopoll_msg_unref(frame);
		return 0;
	}

	if (handler->ws_conn_state == WS_CONN_STAT_CONNECTED) {
		M_LOGE("connect down, close nopoll client\n");
		__nopoll_conn_close(handler, NULL);
	}
	return -1;
}
409 
/*
 * Send a ping frame if the connection has been idle for more than
 * NOPOLL_PING_PERIOD_SEC.
 *
 * Returns 0 when a ping was sent (the activity timestamp is refreshed),
 * -1 when the connection is not usable or the idle period has not elapsed.
 * A connection found broken while the state is still CONNECTED is closed.
 */
static int __nopoll_conn_ping_test(ws_handler_t *handler)
{
	long long now_sec;
	long long last_sec;
	long long deadline;

	if (!nopoll_conn_is_ok(handler->conn)) {
		if (handler->ws_conn_state == WS_CONN_STAT_CONNECTED) {
			M_LOGE("connection is closed\n");
			__nopoll_conn_close(handler, NULL);
		}
		return -1;
	}

	now_sec = os_current_time();
	now_sec /= 1000;

	last_sec = active_time_get(handler);
	deadline = last_sec + NOPOLL_PING_PERIOD_SEC;

	/* still within the idle allowance: nothing to do yet */
	if (now_sec <= deadline)
		return -1;

	nopoll_conn_send_ping(handler->conn);
	active_time_update(handler);
	return 0;
}
435 
/*
 * Called when the pong wait has expired: close the connection unless a
 * pong frame was recorded in the meantime.
 */
static void __nopoll_conn_pong_check(ws_handler_t *handler)
{
	if (!handler->pong_recv) {
		M_LOGE("recv pong frame timeout !\n");
		__nopoll_conn_close(handler, NULL);
	}
}
443 
/*
 * Body of the nopoll client task.
 *
 * The task alternates between two jobs until it receives NOPOLL_CMD_EXIT:
 *  - executing commands taken from handler->queue (open, send, close);
 *  - when no command is available, watching the network interface, polling
 *    the connection for incoming messages and driving the ping/pong
 *    heartbeat, then sleeping for an adaptive wait_time.
 *
 * exit_sem is signalled when the loop ends.
 *
 * @param arg  the ws_handler_t this task serves
 */
static void nopoll_client_task(void *arg)
{
	ws_handler_t *handler = (ws_handler_t *)arg;
	struct netif *netif;
	int wait_time = WAIT_MS_MIN;	/* current idle sleep, WAIT_MS_MIN..WAIT_MS_MAX */
	int ping_count = 0;				/* idle iterations since the last ping attempt */
	bool ping_sent = false;			/* a ping is outstanding and its pong is awaited */
	int ping_time = 0;				/* accumulated wait_time since that ping */
	nopoll_msg_t msg;
	unsigned int msg_size = 0;
	bool task_exit = false;
	int ret;

	/*
	 * NOTE(review): the interface name is hard coded, and this early return
	 * does not signal exit_sem, so nopoll_client_release() would run into
	 * its wait timeout if the interface is missing.
	 */
	netif = netif_find("en1");
	if (!netif) {
		M_LOGE("find netif failed !\n");
		return;
	}

	while (!task_exit) {
		/* wait briefly for a command (timeout value 30, presumably ms) */
		ret = os_queue_recv(&handler->queue, 30, &msg, &msg_size);
		if (ret || msg_size != sizeof(nopoll_msg_t)) {
			/* no valid command: do the idle work */
			if (!netif_is_up(netif) || !netif_is_link_up(netif)) {
				if (handler->ws_conn_state == WS_CONN_STAT_CONNECTED) {
					M_LOGE("netif down ! close connection\n");
					__nopoll_conn_close(handler, NULL);
				}
			}

			/* back off exponentially while nothing arrives, reset on traffic */
			if (__nopoll_conn_try_get_msg(handler)) {
				wait_time <<= 1;
				if (wait_time > WAIT_MS_MAX)
					wait_time = WAIT_MS_MAX;
			} else {
				wait_time = WAIT_MS_MIN;
			}

			if (handler->heartbeat_type != 0) {
				/* every PING_INTERVAL_COUNT idle rounds, consider sending a ping */
				if (ping_count++ > PING_INTERVAL_COUNT) {
					ping_count = 0;
					if (!__nopoll_conn_ping_test(handler)) {
						/* only heartbeat type 2 waits for the pong */
						if (handler->heartbeat_type == 2)
							ping_sent = true;

						handler->pong_recv = 0;
						ping_time = 0;
					}
				}

				if (ping_sent) {
					if (!handler->recv_disable) {
						if (ping_time > NOPOLL_PONG_TIMEOUT_MSEC) {
							/* closes the connection if no pong was seen */
							__nopoll_conn_pong_check(handler);
							ping_sent = false;
							ping_time = 0;
						} else {
							ping_time += wait_time;
						}
					} else {
						/* pongs cannot be seen while receiving is disabled */
						ping_sent = false;
						ping_time = 0;
					}
				}
			}

			os_msleep(wait_time);
			continue;
		}

		/* a command was taken off the queue */
		msg_count_dec(handler);

		switch (msg.type) {
		case NOPOLL_CMD_EXIT:
			task_exit = true;
			break;
		case NOPOLL_CMD_OPEN:
			__nopoll_conn_open(handler, msg.data);
			break;
		case NOPOLL_CMD_SEND_TEXT:
			active_time_update(handler);
			__nopoll_conn_send_text(handler, msg.data);
			break;
		case NOPOLL_CMD_SEND_BINARY:
			active_time_update(handler);
			__nopoll_conn_send_binary(handler, msg.data);
			break;
		case NOPOLL_CMD_CLOSE:
			__nopoll_conn_close(handler, msg.data);
			break;
		default:
			M_LOGW("unknown msg\n");
			break;
		}
	}
	/* let nopoll_client_release() know the task is done */
	os_sem_signal(handler->exit_sem);
}
540 
/*
 * Copy the caller's connection parameters into the handler and ask the
 * client task to open the connection (NOPOLL_CMD_OPEN).
 *
 * The server and schema strings are validated against the fixed-size
 * fields of nopoll_conn_info_t before anything is modified; a missing
 * schema is stored as an empty string. cacert and path are optional and
 * duplicated on the heap. An already open connection is asked to close
 * first.
 *
 * @param handler  client state with a valid nopoll context
 * @param info     caller's connection description
 * @return 0 when the open request was queued, -1 on invalid arguments,
 *         allocation failure or queue failure
 */
static int nopoll_client_open(ws_handler_t *handler,
	ws_conn_info_t *info)
{
	nopoll_conn_info_t *conn_info;
	nopoll_msg_t msg;
	const char *schema;
	size_t server_len;
	size_t schema_len;
	int ret;

	if (!handler) {
		M_LOGE("ws handler null !\n");
		return -1;
	}

	if (!handler->ctx) {
		M_LOGE("nopoll ctx null !\n");
		return -1;
	}

	if (!info || !info->server) {
		M_LOGE("conn info invalid !\n");
		return -1;
	}

	conn_info = &handler->conn_info;

	/* reject strings that would overflow the fixed-size fields */
	server_len = strlen(info->server);
	if (server_len >= sizeof(conn_info->server)) {
		M_LOGE("server name too long !\n");
		return -1;
	}

	/* the schema is optional for callers */
	schema = info->schema ? info->schema : "";
	schema_len = strlen(schema);
	if (schema_len >= sizeof(conn_info->schema)) {
		M_LOGE("schema too long !\n");
		return -1;
	}

	if (handler->conn) {
		M_LOGW("conn already open, close it\n");
		nopoll_client_close(handler);
	}

	if (conn_info->cacert) {
		snd_free(conn_info->cacert);
		conn_info->cacert = NULL;
	}
	if (conn_info->path) {
		snd_free(conn_info->path);
		conn_info->path = NULL;
	}

	memcpy(conn_info->server, info->server, server_len + 1);
	snprintf(conn_info->port, sizeof(conn_info->port),
		"%u", info->port);
	memcpy(conn_info->schema, schema, schema_len + 1);

	if (info->cacert) {
		conn_info->cacert = snd_zalloc(strlen(info->cacert) + 1,
			AFM_EXTN);
		if (!conn_info->cacert) {
			M_LOGE("alloc cacert failed !\n");
			return -1;
		}
		memcpy(conn_info->cacert, info->cacert,
			strlen(info->cacert) + 1);
	}

	if (info->path) {
		conn_info->path = snd_zalloc(strlen(info->path) + 1,
			AFM_EXTN);
		if (!conn_info->path) {
			M_LOGE("alloc path failed !\n");
			if (conn_info->cacert) {
				snd_free(conn_info->cacert);
				conn_info->cacert = NULL;
			}
			return -1;
		}
		memcpy(conn_info->path, info->path,
			strlen(info->path) + 1);
	}

	memcpy(&handler->callback, &info->callback,
		sizeof(ws_cb_ops_t));

	msg.type = NOPOLL_CMD_OPEN;
	msg.data = NULL;

	ret = os_queue_send(&handler->queue, &msg,
		sizeof(nopoll_msg_t));
	if (ret) {
		M_LOGE("send msg failed %d ! msg count %d\n",
			ret, handler->msg_count);
		return -1;
	}

	msg_count_add(handler);
	return 0;
}
624 
/*
 * Drop the stored certificate and path copies and ask the client task to
 * close the connection (NOPOLL_CMD_CLOSE).
 *
 * @return 0 when the close request was queued, -1 when the handler is
 *         NULL, no connection is open, or the queue rejects the message
 */
static int nopoll_client_close(ws_handler_t *handler)
{
	nopoll_conn_info_t *conn_info;
	nopoll_msg_t close_msg;
	int err;

	if (handler == NULL) {
		M_LOGE("ws handler null !\n");
		return -1;
	}

	conn_info = &handler->conn_info;

	if (conn_info->cacert != NULL) {
		snd_free(conn_info->cacert);
		conn_info->cacert = NULL;
	}

	if (conn_info->path != NULL) {
		snd_free(conn_info->path);
		conn_info->path = NULL;
	}

	if (handler->conn == NULL) {
		M_LOGW("nopoll not open, ignore\n");
		return -1;
	}

	close_msg.type = NOPOLL_CMD_CLOSE;
	close_msg.data = NULL;

	err = os_queue_send(&handler->queue, &close_msg,
		sizeof(nopoll_msg_t));
	if (err != 0) {
		M_LOGE("send msg failed %d ! msg count %d\n",
			err, handler->msg_count);
		return -1;
	}

	msg_count_add(handler);
	return 0;
}
666 
/*
 * Queue a text message for sending (NOPOLL_CMD_SEND_TEXT).
 *
 * The text is copied, so the caller's buffer may be reused as soon as this
 * function returns. The copy is freed by the client task after sending, or
 * here if the message cannot be queued.
 *
 * @param handler  client state
 * @param text     bytes to send; must not be NULL
 * @param len      number of bytes, must be > 0
 * @return 0 when queued, -1 on invalid arguments, allocation failure or
 *         queue failure
 */
static int nopoll_client_send_text(ws_handler_t *handler,
	char *text, int len)
{
	nopoll_xfer_t *xfer;
	nopoll_msg_t msg;
	int ret;

	if (!handler) {
		M_LOGE("ws handler null !\n");
		return -1;
	}

	/* a NULL source would otherwise be dereferenced by memcpy below */
	if (!text) {
		M_LOGE("text null !\n");
		return -1;
	}

	if (len <= 0) {
		M_LOGE("text len %d invalid !\n", len);
		return -1;
	}

	xfer = snd_zalloc(sizeof(nopoll_xfer_t), AFM_EXTN);
	if (!xfer) {
		M_LOGE("alloc nopoll xfer failed !\n");
		return -1;
	}

	xfer->buff = snd_zalloc(len, AFM_MAIN);
	if (!xfer->buff) {
		M_LOGE("alloc buffer failed !\n");
		snd_free(xfer);
		return -1;
	}

	memcpy(xfer->buff, text, len);
	xfer->size = len;

	msg.type = NOPOLL_CMD_SEND_TEXT;
	msg.data = xfer;

	ret = os_queue_send(&handler->queue, &msg,
		sizeof(nopoll_msg_t));
	if (ret) {
		M_LOGE("send msg failed %d ! msg count %d\n",
			ret, handler->msg_count);
		snd_free(xfer->buff);
		snd_free(xfer);
		return -1;
	}

	msg_count_add(handler);
	return 0;
}
716 
/*
 * Queue a binary fragment for sending (NOPOLL_CMD_SEND_BINARY).
 *
 * The data is copied, so the caller's buffer may be reused as soon as this
 * function returns. A zero-length fragment is sent as 32 zero bytes, which
 * keeps the frame size the original code put on the wire; the padding
 * comes from the zeroed allocation, nothing is read from pdata in that
 * case.
 *
 * @param handler  client state
 * @param pdata    bytes to send; may be NULL only when len is 0
 * @param len      number of bytes, must be >= 0
 * @param type     fragment position (start / continue / finish)
 * @return 0 when queued, -1 on invalid arguments, allocation failure or
 *         queue failure
 */
static int nopoll_client_send_binary(ws_handler_t *handler,
	void *pdata, int len, ws_bin_type_t type)
{
	nopoll_xfer_t *xfer;
	nopoll_msg_t msg;
	int xfer_len;
	int ret;

	if (!handler) {
		M_LOGE("ws handler null !\n");
		return -1;
	}

	if (len < 0) {
		M_LOGE("bin len %d invalid !\n", len);
		return -1;
	}

	if (len > 0 && !pdata) {
		M_LOGE("bin data null !\n");
		return -1;
	}

	xfer = snd_zalloc(sizeof(nopoll_xfer_t), AFM_EXTN);
	if (!xfer) {
		M_LOGE("alloc nopoll send data failed !\n");
		return -1;
	}

	/* an empty fragment is padded to 32 bytes */
	xfer_len = (len == 0) ? 32 : len;

	xfer->buff = snd_zalloc(xfer_len, AFM_MAIN);
	if (!xfer->buff) {
		M_LOGE("alloc buffer failed !\n");
		snd_free(xfer);
		return -1;
	}

	/* copy only what the caller really supplied; padding stays zero */
	if (len > 0)
		memcpy(xfer->buff, pdata, len);

	xfer->size = xfer_len;
	xfer->type = type;

	msg.type = NOPOLL_CMD_SEND_BINARY;
	msg.data = xfer;

	ret = os_queue_send(&handler->queue, &msg,
		sizeof(nopoll_msg_t));
	if (ret) {
		M_LOGE("send msg failed %d ! msg count %d\n",
			ret, handler->msg_count);
		snd_free(xfer->buff);
		snd_free(xfer);
		return -1;
	}

	msg_count_add(handler);
	return 0;
}
770 
/*
 * Create the nopoll context and start the client task.
 *
 * If a context already exists the call is a no-op that reports success.
 * If the task cannot be started, the freshly created context is released
 * again so that a later call retries from scratch instead of mistaking the
 * leftover context for a running client.
 *
 * @return 0 on success (or when the client already exists), -1 on failure
 */
static int nopoll_client_create(ws_handler_t *handler)
{
	if (!handler)
		return -1;

	if (handler->ctx) {
		M_LOGW("nopoll client exist, ignore\n");
		return 0;
	}

	handler->ctx = nopoll_ctx_new();
	if (!handler->ctx) {
		M_LOGE("create nopoll ctx failed !\n");
		return -1;
	}

	if (os_task_create(&handler->task,
		"nopoll_client_task",
		nopoll_client_task,
		handler,
		NOPOLL_CLIENT_TASK_STACK_SIZE,
		NOPOLL_CLIENT_TASK_PRIORITY)) {
		M_LOGE("create nopoll client task failed !\n");
		/* undo the context so no half-created client is left behind */
		nopoll_ctx_unref(handler->ctx);
		handler->ctx = NULL;
		return -1;
	}

	M_LOGI("nopoll client create\n");
	return 0;
}
802 
/*
 * Drop the nopoll context reference and stop the client task.
 *
 * NOPOLL_CMD_EXIT is queued and the function then waits up to 5000 (units
 * as defined by os_sem_wait, presumably ms) for the task to signal
 * exit_sem.
 *
 * @return 0 when the task confirmed its exit, -1 when the handler is NULL,
 *         the message could not be queued, or the wait timed out
 */
static int nopoll_client_release(ws_handler_t *handler)
{
	nopoll_msg_t exit_msg;
	int err;

	if (handler == NULL) {
		M_LOGE("ws handler null !\n");
		return -1;
	}

	nopoll_ctx_unref(handler->ctx);
	handler->ctx = NULL;

	exit_msg.type = NOPOLL_CMD_EXIT;
	exit_msg.data = NULL;

	err = os_queue_send(&handler->queue, &exit_msg,
		sizeof(nopoll_msg_t));
	if (err != 0) {
		M_LOGE("send msg failed %d ! msg count %d\n",
			err, handler->msg_count);
		return -1;
	}

	msg_count_add(handler);

	if (os_sem_wait(handler->exit_sem, 5000) != 0) {
		M_LOGE("wait exit sem timeout !\n");
		return -1;
	}

	M_LOGI("nopoll client release\n");
	return 0;
}
837 
/*
 * Start connecting to the server described by info.
 *
 * The call only queues the open request; the connection itself is
 * established by the client task, which reports success through the
 * connect callback. Calling this while already connected or connecting is
 * ignored and reported as success.
 *
 * @return 0 when the request was accepted (or ignored), -1 when the module
 *         is not initialised, info or its server is missing, or the client
 *         could not be created/opened
 */
int uvoice_ws_connect(ws_conn_info_t *info)
{
	ws_handler_t *handler = g_ws_handler;

	if (handler == NULL) {
		M_LOGE("ws handler null !\n");
		return -1;
	}

	if (info == NULL) {
		M_LOGE("info null !\n");
		return -1;
	}

	if (!info->server) {
		M_LOGE("server info null !\n");
		return -1;
	}

	if (handler->ws_conn_state != WS_CONN_STAT_DISCONNECTED) {
		M_LOGW("ws connected or connecting, ignore\n");
		return 0;
	}

	handler->ws_conn_state = WS_CONN_STAT_CONNECTING;

	M_LOGD("server:%s;port:%u;schema:%s;path:%s\n",
		info->server,
		info->port,
		info->schema ? info->schema : "null",
		info->path ? info->path : "null");

	if (nopoll_client_create(handler)) {
		M_LOGE("create nopoll client failed !\n");
		goto __fail;
	}

	if (nopoll_client_open(handler, info)) {
		M_LOGE("open nopoll client failed !\n");
		nopoll_client_release(handler);
		goto __fail;
	}

	return 0;

__fail:
	/* fall back to the idle state so a later connect can retry */
	handler->ws_conn_state = WS_CONN_STAT_DISCONNECTED;
	return -1;
}
886 
uvoice_ws_disconnect(void)887 int uvoice_ws_disconnect(void)
888 {
889 	ws_handler_t *handler = g_ws_handler;
890 
891 	if (!handler) {
892 		M_LOGE("ws handler null !\n");
893 		return -1;
894 	}
895 
896 	if (handler->ws_conn_state != WS_CONN_STAT_CONNECTED) {
897 		M_LOGE("ws not connected !\n");
898 		return -1;
899 	}
900 
901 	M_LOGD("disconnect ws\n");
902 
903 	nopoll_client_close(handler);
904 
905 	nopoll_client_release(handler);
906 
907 	return 0;
908 }
909 
/*
 * Report the current connection state.
 *
 * @param state  out: receives the state; must not be NULL
 * @return 0 on success, -1 when the module is not initialised or state is
 *         NULL
 */
int uvoice_ws_conn_state(ws_conn_state_t *state)
{
	ws_handler_t *handler = g_ws_handler;

	if (handler == NULL) {
		M_LOGE("ws handler null !\n");
		return -1;
	}

	if (state == NULL) {
		M_LOGE("arg null !\n");
		return -1;
	}

	*state = handler->ws_conn_state;
	return 0;
}
927 
uvoice_ws_send_text(char * text,int len)928 int uvoice_ws_send_text(char *text, int len)
929 {
930 	ws_handler_t *handler = g_ws_handler;
931 
932 	if (!handler) {
933 		M_LOGE("ws handler null !\n");
934 		return -1;
935 	}
936 
937 	if (handler->ws_conn_state != WS_CONN_STAT_CONNECTED) {
938 		M_LOGE("ws not connected !\n");
939 		return -1;
940 	}
941 
942 	return nopoll_client_send_text(handler, text, len);
943 }
944 
/*
 * Queue a binary fragment on the established connection.
 *
 * @return result of nopoll_client_send_binary(), or -1 when the module is
 *         not initialised or not connected
 */
int uvoice_ws_send_binary(char *data, int len, ws_bin_type_t type)
{
	ws_handler_t *handler = g_ws_handler;

	if (handler == NULL) {
		M_LOGE("ws handler null !\n");
		return -1;
	}

	if (handler->ws_conn_state == WS_CONN_STAT_CONNECTED)
		return nopoll_client_send_binary(handler, data, len, type);

	M_LOGE("ws not connected !\n");
	return -1;
}
961 
uvoice_ws_recv_disable(int disable)962 int uvoice_ws_recv_disable(int disable)
963 {
964 	ws_handler_t *handler = g_ws_handler;
965 
966 	if (!handler) {
967 		M_LOGE("ws handler null !\n");
968 		return -1;
969 	}
970 
971 	if (disable)
972 		handler->recv_disable = 1;
973 	else
974 		handler->recv_disable = 0;
975 
976 	M_LOGD("set recv_disable %d\n", handler->recv_disable);
977 	return 0;
978 }
979 
980 /*
981  * ping-pong heartbeat type:
982  * 0 - disable
983  * 1 - send ping frame but don't check pong frame
984  * 2 - send ping frame and check pong frame
985  */
uvoice_ws_heartbeat_set(int type)986 int uvoice_ws_heartbeat_set(int type)
987 {
988 	ws_handler_t *handler = g_ws_handler;
989 
990 	if (!handler) {
991 		M_LOGE("ws handler null !\n");
992 		return -1;
993 	}
994 
995 	if (type < 0 || type > 2) {
996 		M_LOGE("type %d invalid !\n", type);
997 		return -1;
998 	}
999 
1000 	if (handler->heartbeat_type != type) {
1001 		handler->heartbeat_type = type;
1002 		M_LOGD("set heartbeat_type %d\n",
1003 			handler->heartbeat_type);
1004 	}
1005 	return 0;
1006 }
1007 
uvoice_ws_init(void)1008 int uvoice_ws_init(void)
1009 {
1010 	ws_handler_t *handler = g_ws_handler;
1011 	int ret;
1012 
1013 	if (handler)
1014 		return 0;
1015 
1016 	handler = snd_zalloc(sizeof(ws_handler_t), AFM_EXTN);
1017 	if (!handler) {
1018 		M_LOGE("alloc nopoll handler failed !\n");
1019 		return -1;
1020 	}
1021 
1022 	handler->msg_buffer = snd_zalloc(sizeof(nopoll_msg_t) * 8,
1023 		AFM_MAIN);
1024 	if (!handler->msg_buffer) {
1025 		M_LOGE("alloc msg buffer failed !\n");
1026 		snd_free(handler);
1027 		return -1;
1028 	}
1029 
1030 	ret = os_queue_new(&handler->queue, handler->msg_buffer,
1031 		sizeof(nopoll_msg_t) * 8,
1032 		sizeof(nopoll_msg_t));
1033 	if (ret) {
1034 		M_LOGE("create event queue failed %d!\n", ret);
1035 		snd_free(handler->msg_buffer);
1036 		snd_free(handler);
1037 		return -1;
1038 	}
1039 
1040 	handler->exit_sem = os_sem_new(0);
1041 	handler->lock = os_mutex_new();
1042 
1043 	handler->ws_conn_state = WS_CONN_STAT_DISCONNECTED;
1044 
1045 	g_ws_handler = handler;
1046 
1047 	M_LOGI("ws init\n");
1048 	return 0;
1049 }
1050 
uvoice_ws_deinit(void)1051 int uvoice_ws_deinit(void)
1052 {
1053 	ws_handler_t *handler = g_ws_handler;
1054 
1055 	if (!handler) {
1056 		M_LOGE("ws handler null !\n");
1057 		return -1;
1058 	}
1059 
1060 	if (handler->ws_conn_state != WS_CONN_STAT_DISCONNECTED)
1061 		uvoice_ws_disconnect();
1062 
1063 	os_mutex_free(handler->lock);
1064 	os_sem_free(handler->exit_sem);
1065 	os_queue_free(&handler->queue);
1066 	snd_free(handler->msg_buffer);
1067 	snd_free(handler);
1068 	g_ws_handler = NULL;
1069 
1070 	M_LOGI("ws free\n");
1071 	return 0;
1072 }
1073 
1074