1
2 /*
3 * Copyright (C) 2015-2020 Alibaba Group Holding Limited
4 *
5 */
6
7 #include <stdio.h>
8 #include <stdlib.h>
9 #include <stdint.h>
10 #include <string.h>
11 #include <signal.h>
12 #include <stdlib.h>
13
14 #include "uvoice_os.h"
15 #include "uvoice_ws.h"
16
17 #include "nopoll.h"
18 #include "lwip/netif.h"
19
20
/* Stack size passed to os_task_create() for the client task; the BK7251 build uses a larger one. */
#ifdef UVOICE_ON_BK7251
#define NOPOLL_CLIENT_TASK_STACK_SIZE    8192
#else
#define NOPOLL_CLIENT_TASK_STACK_SIZE    4096
#endif
/* Priority passed to os_task_create() for the client task. */
#define NOPOLL_CLIENT_TASK_PRIORITY        UVOICE_TASK_PRI_LOWER

/* Upper/lower bound (ms) of the idle sleep in the client task loop; the sleep doubles while no frame is received. */
#define WAIT_MS_MAX            200
#define WAIT_MS_MIN            20
/* Number of idle loop iterations between two ping attempts. */
#define PING_INTERVAL_COUNT    100 /* max: WAIT_MS_MAX * PING_INTERVAL_COUNT < 45 seconds */

/* Minimum idle time (seconds) since the last activity before a ping is sent. */
#define NOPOLL_PING_PERIOD_SEC        20
/* Time (ms) to wait for a pong after a ping when heartbeat type 2 is selected. */
#define NOPOLL_PONG_TIMEOUT_MSEC    10000 /* keep NOPOLL_PONG_TIMEOUT_MSEC below NOPOLL_PING_PERIOD_SEC expressed in ms */

/* Non-zero: fragmented text frames are merged before recv_text_cb is invoked. */
#define MERGE_TEXT_FRAME_SUPPORT    1
36
/* Commands posted to the client task through handler->queue (nopoll_msg_t.type). */
enum {
    NOPOLL_CMD_EXIT,        /* leave the task loop */
    NOPOLL_CMD_OPEN,        /* open the connection described by handler->conn_info */
    NOPOLL_CMD_SEND_TEXT,   /* send a text frame; data is a nopoll_xfer_t */
    NOPOLL_CMD_SEND_BINARY, /* send a binary frame/fragment; data is a nopoll_xfer_t */
    NOPOLL_CMD_CLOSE        /* close the current connection */
};
44
/* One queue element exchanged between the API functions and the client task. */
typedef struct {
    int type;   /* one of the NOPOLL_CMD_* values */
    void *data; /* command payload: nopoll_xfer_t for the send commands, NULL otherwise */
} nopoll_msg_t;
49
/* Heap-allocated send request; both the struct and buff are freed by the client task after sending. */
typedef struct {
    uint8_t *buff;      /* private copy of the payload */
    uint32_t size;      /* number of bytes to send from buff */
    ws_bin_type_t type; /* fragment kind (start/continue/finish); only set for binary sends */
} nopoll_xfer_t;
55
/* Connection parameters copied from the caller's ws_conn_info_t by nopoll_client_open(). */
typedef struct {
    char server[128]; /* server host, NUL-terminated */
    char port[8];     /* port rendered as a decimal string */
    char schema[8];   /* schema string copied from the caller; not read elsewhere in this file */
    char *cacert;     /* heap copy of the CA certificate, NULL selects the non-TLS path */
    char *path;       /* heap copy of the request path, may be NULL */
} nopoll_conn_info_t;
63
/* State of the single websocket client instance (see g_ws_handler). */
typedef struct {
    noPollCtx *ctx;                 /* nopoll context; non-NULL while the client exists */
    noPollConn *conn;               /* current connection, NULL when closed */
    noPollConnOpts *conn_opts;      /* options object created for the current open attempt */
    nopoll_conn_info_t conn_info;   /* parameters for the next/current connection */
    char *msg_buffer;               /* backing storage handed to os_queue_new() */
    int msg_count;                  /* queued commands not yet dequeued; guarded by lock */
    char *prev_text;                /* text fragments accumulated so far (MERGE_TEXT_FRAME_SUPPORT) */
    int prev_text_size;             /* number of bytes held in prev_text */
    uint8_t recv_disable:1;         /* when set, the task does not poll for incoming frames */
    uint8_t pong_recv:1;            /* set when a pong frame arrives, cleared after a ping is sent */
    int heartbeat_type;             /* 0: off, 1: ping only, 2: ping and check pong */
    int prev_recv_type;             /* opcode of the last text/binary frame; routes continuation frames */
    os_mutex_t lock;                /* protects msg_count */
    os_sem_t exit_sem;              /* signalled by the client task when it leaves its loop */
    os_task_t task;                 /* client task handle */
    os_queue_t queue;               /* command queue of nopoll_msg_t */
    ws_cb_ops_t callback;           /* user callbacks copied from ws_conn_info_t */
    ws_conn_state_t ws_conn_state;  /* DISCONNECTED / CONNECTING / CONNECTED */
    long long conn_active_time;     /* os_current_time() at the last send/receive; divided by 1000 to get seconds */
} ws_handler_t;
85
/* The only client instance; allocated by uvoice_ws_init() and released by uvoice_ws_deinit(). */
static ws_handler_t *g_ws_handler;

/* Forward declaration: nopoll_client_open() calls it before its definition. */
static int nopoll_client_close(ws_handler_t *handler);
89
/* Record the current os_current_time() value as the moment of last connection activity. */
static void active_time_update(ws_handler_t *handler)
{
    long long now = os_current_time();

    handler->conn_active_time = now;
}
94
/* Return the last recorded activity time divided by 1000 (seconds, given a millisecond clock). */
static int active_time_get(ws_handler_t *handler)
{
    long long stamp = handler->conn_active_time;

    return stamp / 1000;
}
99
/*
 * Increment the count of queued commands under handler->lock.
 * Called by the API side after a command was successfully posted.
 */
static inline void msg_count_add(ws_handler_t *handler)
{
    os_mutex_lock(handler->lock, OS_WAIT_FOREVER);
    handler->msg_count++;
    os_mutex_unlock(handler->lock);
}
106
/*
 * Decrement the count of queued commands under handler->lock.
 * Called by the client task after it dequeued a command.
 */
static inline void msg_count_dec(ws_handler_t *handler)
{
    os_mutex_lock(handler->lock, OS_WAIT_FOREVER);
    handler->msg_count--;
    os_mutex_unlock(handler->lock);
}
113
/*
 * Dispatch one received frame to the user callbacks.
 *
 * With MERGE_TEXT_FRAME_SUPPORT enabled, a non-final text frame starts an
 * accumulation buffer (prev_text); following text frames are appended and
 * the merged text is delivered through recv_text_cb once a final frame
 * arrives. If a non-text frame shows up while text is pending, the pending
 * text is delivered first and the new frame is then handled normally.
 *
 * Binary frames are reported with WS_BIN_DATA_START/CONTINUE/FINISH
 * depending on opcode and the final flag; a pong frame only sets pong_recv.
 *
 * @param handler  client state
 * @param msg      frame returned by nopoll_conn_get_msg(); not released here
 */
static void nopoll_message_handle(ws_handler_t *handler, noPollMsg *msg)
{
    char *content;
    noPollOpCode type;
    int size;
    bool final;

    size = nopoll_msg_get_payload_size(msg);
    type = nopoll_msg_opcode(msg);
    final = nopoll_msg_is_final(msg);
    content = (char *)nopoll_msg_get_payload(msg);

#if MERGE_TEXT_FRAME_SUPPORT
    if (handler->prev_text_size > 0 && handler->prev_text) {
        if (size > 0 && type == NOPOLL_TEXT_FRAME) {
            char *merged;

            /*
             * Grow through a temporary so the pending buffer is not lost
             * when the reallocation fails (assumes snd_realloc, like
             * realloc, leaves the old block untouched on failure).
             */
            merged = snd_realloc(handler->prev_text,
                    handler->prev_text_size + size, AFM_EXTN);
            if (!merged) {
                M_LOGE("realloc prev text failed !\n");
                /* drop the partial message instead of leaking it */
                snd_free(handler->prev_text);
                handler->prev_text = NULL;
                handler->prev_text_size = 0;
                return;
            }
            handler->prev_text = merged;
            memcpy(handler->prev_text + handler->prev_text_size,
                    content, size);
            handler->prev_text_size += size;
            if (final) {
                handler->callback.recv_text_cb(handler->prev_text,
                        handler->prev_text_size);
                snd_free(handler->prev_text);
                handler->prev_text = NULL;
                handler->prev_text_size = 0;
            }
            return;
        } else {
            /* flush what was collected, then fall through to handle this frame */
            M_LOGD("expect text frame, receive type %d\n", type);
            handler->callback.recv_text_cb(handler->prev_text,
                    handler->prev_text_size);
            snd_free(handler->prev_text);
            handler->prev_text = NULL;
            handler->prev_text_size = 0;
        }
    } else if (size > 0 && type == NOPOLL_TEXT_FRAME && !final) {
        /* first fragment of a text message: start accumulating */
        if (handler->prev_text) {
            snd_free(handler->prev_text);
            handler->prev_text_size = 0;
        }

        handler->prev_text = snd_zalloc(size, AFM_EXTN);
        if (!handler->prev_text) {
            M_LOGE("alloc prev text buffer failed !\n");
            return;
        }
        handler->prev_text_size = size;
        memcpy(handler->prev_text, content, size);
        return;
    }
#endif

    switch (type) {
    case NOPOLL_TEXT_FRAME:
        handler->prev_recv_type = NOPOLL_TEXT_FRAME;
        handler->callback.recv_text_cb(content, size);
        break;
    case NOPOLL_BINARY_FRAME:
        handler->prev_recv_type = NOPOLL_BINARY_FRAME;
        if (final) {
            handler->callback.recv_binary_cb(content,
                    size, WS_BIN_DATA_FINISH);
        } else {
            handler->callback.recv_binary_cb(content,
                    size, WS_BIN_DATA_START);
        }
        break;
    case NOPOLL_CONTINUATION_FRAME:
        /* route by the type of the frame that opened the message */
        if (handler->prev_recv_type == NOPOLL_TEXT_FRAME) {
            handler->callback.recv_text_cb(content, size);
        } else {
            if (final)
                handler->callback.recv_binary_cb(content,
                        size, WS_BIN_DATA_FINISH);
            else
                handler->callback.recv_binary_cb(content,
                        size, WS_BIN_DATA_CONTINUE);
        }
        break;
    case NOPOLL_PONG_FRAME:
        handler->pong_recv = 1;
        break;
    default:
        break;
    }
}
206
/*
 * Open the connection described by handler->conn_info (runs in the client
 * task in response to NOPOLL_CMD_OPEN).
 *
 * Without a CA certificate a plain connection is created; with one, the
 * certificate is installed in the options and a TLS connection is created
 * with peer verification switched off. On success the state becomes
 * WS_CONN_STAT_CONNECTED and connect_cb is invoked. On any failure the
 * connection and options are dropped and the state is set to
 * WS_CONN_STAT_DISCONNECTED; no callback is invoked in that case.
 *
 * @param handler  client state
 * @param data     unused
 */
static void __nopoll_conn_open(ws_handler_t *handler,
        nopoll_xfer_t *data)
{
    nopoll_conn_info_t *conn_info;

    if (!handler) {
        M_LOGE("ws handler null !\n");
        return;
    }

    handler->conn_opts = nopoll_conn_opts_new();
    if (!handler->conn_opts) {
        M_LOGE("create connect opts failed !\n");
        goto __err_exit;
    }

    conn_info = &handler->conn_info;

    if (!conn_info->cacert) {
        /* plain (non-TLS) connection */
        if (conn_info->path)
            handler->conn = nopoll_conn_new_opts(handler->ctx,
                    handler->conn_opts,
                    conn_info->server,
                    conn_info->port,
                    NULL,
                    conn_info->path,
                    NULL, NULL);
        else
            /* NOTE(review): conn_opts is not handed to nopoll on this path - confirm who releases it */
            handler->conn = nopoll_conn_new(handler->ctx,
                    conn_info->server,
                    conn_info->port,
                    conn_info->server,
                    NULL, NULL, NULL);
    } else {
        /* TLS connection: only the CA certificate is supplied; length includes the NUL */
        if (!nopoll_conn_opts_set_ssl_certs(handler->conn_opts,
                NULL, 0,
                NULL, 0,
                NULL, 0,
                conn_info->cacert,
                strlen(conn_info->cacert) + 1)) {
            M_LOGE("set ssl certs failed !\n");
            goto __err_exit;
        }

        /* peer certificate verification is explicitly disabled */
        nopoll_conn_opts_ssl_peer_verify(handler->conn_opts,
                nopoll_false);
        handler->conn = nopoll_conn_tls_new(handler->ctx,
                handler->conn_opts, conn_info->server,
                conn_info->port,
                NULL,
                conn_info->path,
                NULL, NULL);
    }

    if (!handler->conn) {
        M_LOGE("create nopoll connection failed !\n");
        /* presumably nopoll already released the options on failure - TODO confirm */
        handler->conn_opts = NULL;
        goto __err_exit;
    }

    /* second argument is the timeout handed to nopoll (3) */
    if (!nopoll_conn_wait_until_connection_ready(
            handler->conn, 3)) {
        M_LOGE("connection timeout !\n");
        /* NOTE(review): conn_opts is freed below although it was passed to nopoll - verify no double free */
        goto __err_exit;
    }

    handler->ws_conn_state = WS_CONN_STAT_CONNECTED;
    handler->callback.connect_cb();
    return;

__err_exit:
    /* conn and/or conn_opts may be NULL here */
    nopoll_conn_unref(handler->conn);
    nopoll_conn_opts_free(handler->conn_opts);
    handler->conn = NULL;
    handler->conn_opts = NULL;
    handler->ws_conn_state = WS_CONN_STAT_DISCONNECTED;
}
284
/*
 * Close the current connection, mark the client disconnected and notify
 * the user through disconnect_cb. Runs in the client task.
 *
 * @param handler  client state
 * @param data     unused
 */
static void __nopoll_conn_close(ws_handler_t *handler, void *data)
{
    if (!handler) {
        M_LOGE("ws handler null !\n");
        /* void function: must not return a value */
        return;
    }

    nopoll_conn_close(handler->conn);
    handler->conn = NULL;
    handler->conn_opts = NULL;

    handler->ws_conn_state = WS_CONN_STAT_DISCONNECTED;
    handler->callback.disconnect_cb();
}
299
/*
 * Send the last fragment of a fragmented message: a continuation frame
 * with the FIN flag set. Returns whatever __nopoll_conn_send_common returns.
 */
static int __nopoll_conn_send_continue(noPollConn * conn,
        const char *content, long length)
{
    int sent;

    sent = __nopoll_conn_send_common(conn, content, length,
            nopoll_true, 0, NOPOLL_CONTINUATION_FRAME);
    return sent;
}
310
/*
 * Send an intermediate fragment of a fragmented message: a continuation
 * frame without the FIN flag. Returns whatever __nopoll_conn_send_common returns.
 */
static int __nopoll_conn_send_continue_fragment(noPollConn * conn,
        const char *content, long length)
{
    int sent;

    sent = __nopoll_conn_send_common(conn, content, length,
            nopoll_false, 0, NOPOLL_CONTINUATION_FRAME);
    return sent;
}
321
/*
 * Retry flushing a partially written frame.
 *
 * Up to five attempts are made, 50 ms apart, for as long as errno reports
 * NOPOLL_EWOULDBLOCK and the connection still has pending bytes.
 *
 * @return 0 once nopoll_conn_complete_pending_write() returns 0,
 *         1 if the retries were not entered or were exhausted.
 */
static int __nopoll_conn_complete_pending_write(noPollConn *conn)
{
    int attempt;

    for (attempt = 0; attempt < 5; attempt++) {
        if (errno != NOPOLL_EWOULDBLOCK)
            break;
        if (nopoll_conn_pending_write_bytes(conn) <= 0)
            break;

        os_msleep(50);
        if (nopoll_conn_complete_pending_write(conn) == 0)
            return 0;
    }

    return 1;
}
334
/*
 * Send one queued text request and release it.
 * A short write is retried through __nopoll_conn_complete_pending_write();
 * if that also fails the sizes are logged. The request and its buffer are
 * always freed.
 */
static void __nopoll_conn_send_text(ws_handler_t *handler,
        nopoll_xfer_t *data)
{
    int sent = nopoll_conn_send_text(handler->conn,
            (char *)data->buff, data->size);

    if (sent != data->size &&
        __nopoll_conn_complete_pending_write(handler->conn)) {
        M_LOGE("size %u ret %d\n", data->size, sent);
    }

    snd_free(data->buff);
    snd_free(data);
}
350
/*
 * Send one queued binary request and release it.
 * The fragment kind selects the frame: START opens a fragmented binary
 * message, CONTINUE sends a non-final continuation frame, FINISH sends the
 * final continuation frame. An unknown kind sends nothing. A short write
 * is retried; the request and its buffer are always freed.
 */
static void __nopoll_conn_send_binary(ws_handler_t *handler,
        nopoll_xfer_t *data)
{
    noPollConn *conn = handler->conn;
    char *payload = (char *)data->buff;
    int sent = 0;

    if (data->type == WS_BIN_DATA_START) {
        sent = nopoll_conn_send_binary_fragment(conn, payload, data->size);
    } else if (data->type == WS_BIN_DATA_CONTINUE) {
        sent = __nopoll_conn_send_continue_fragment(conn, payload, data->size);
    } else if (data->type == WS_BIN_DATA_FINISH) {
        sent = __nopoll_conn_send_continue(conn, payload, data->size);
    } else {
        M_LOGE("unknown data type\n");
    }

    if (sent != data->size &&
        __nopoll_conn_complete_pending_write(handler->conn)) {
        M_LOGE("size %u, ret %d\n", data->size, sent);
    }

    snd_free(data->buff);
    snd_free(data);
}
382
/*
 * Poll the connection once for an incoming frame.
 * If the connection is no longer ok while the client believes it is
 * connected, the connection is closed.
 *
 * @return 0 when a frame was received and handled, -1 otherwise
 *         (receiving disabled, connection down, or nothing available).
 */
static int __nopoll_conn_try_get_msg(ws_handler_t *handler)
{
    noPollMsg *frame;

    if (handler->recv_disable)
        return -1;

    if (nopoll_conn_is_ok(handler->conn)) {
        frame = nopoll_conn_get_msg(handler->conn);
        if (frame) {
            active_time_update(handler);
            nopoll_message_handle(handler, frame);
            nopoll_msg_unref(frame);
            return 0;
        }
        return -1;
    }

    if (handler->ws_conn_state == WS_CONN_STAT_CONNECTED) {
        M_LOGE("connect down, close nopoll client\n");
        __nopoll_conn_close(handler, NULL);
    }
    return -1;
}
409
/*
 * Send a ping if the connection has been idle for more than
 * NOPOLL_PING_PERIOD_SEC seconds; sending counts as activity.
 * A connection that is no longer ok is closed when the client still
 * believes it is connected.
 *
 * @return 0 when a ping was sent, -1 otherwise.
 */
static int __nopoll_conn_ping_test(ws_handler_t *handler)
{
    long long now_sec;
    long long last_sec;

    if (!nopoll_conn_is_ok(handler->conn)) {
        if (handler->ws_conn_state == WS_CONN_STAT_CONNECTED) {
            M_LOGE("connection is closed\n");
            __nopoll_conn_close(handler, NULL);
        }
        return -1;
    }

    now_sec = os_current_time();
    now_sec /= 1000;
    last_sec = active_time_get(handler);

    if (now_sec > (last_sec + NOPOLL_PING_PERIOD_SEC)) {
        nopoll_conn_send_ping(handler->conn);
        active_time_update(handler);
        return 0;
    }

    return -1;
}
435
/* Close the connection if no pong frame has been seen since the last ping. */
static void __nopoll_conn_pong_check(ws_handler_t *handler)
{
    if (!handler->pong_recv) {
        M_LOGE("recv pong frame timeout !\n");
        __nopoll_conn_close(handler, NULL);
    }
}
443
/*
 * Worker task of the websocket client.
 *
 * Each iteration waits briefly on the command queue. When no command is
 * available it checks the network interface, polls the connection for an
 * incoming frame, runs the heartbeat logic and sleeps. When a command is
 * available it is executed. The loop ends on NOPOLL_CMD_EXIT, after which
 * exit_sem is signalled.
 *
 * @param arg  the ws_handler_t of the client
 */
static void nopoll_client_task(void *arg)
{
    ws_handler_t *handler = (ws_handler_t *)arg;
    struct netif *netif;
    int wait_time = WAIT_MS_MIN;   /* current idle sleep, ms */
    int ping_count = 0;            /* idle iterations since the last ping attempt */
    bool ping_sent = false;        /* a ping is awaiting its pong (heartbeat type 2 only) */
    int ping_time = 0;             /* accumulated sleep time since the ping, ms */
    nopoll_msg_t msg;
    unsigned int msg_size = 0;
    bool task_exit = false;
    int ret;

    netif = netif_find("en1");
    if (!netif) {
        /* NOTE(review): returns without signalling exit_sem - a later release will wait for its timeout */
        M_LOGE("find netif failed !\n");
        return;
    }

    while (!task_exit) {
        ret = os_queue_recv(&handler->queue, 30, &msg, &msg_size);
        if (ret || msg_size != sizeof(nopoll_msg_t)) {
            /* no (valid) command: do the idle work */
            if (!netif_is_up(netif) || !netif_is_link_up(netif)) {
                if (handler->ws_conn_state == WS_CONN_STAT_CONNECTED) {
                    M_LOGE("netif down ! close connection\n");
                    __nopoll_conn_close(handler, NULL);
                }
            }

            /* back off (double the sleep, capped) while nothing is received; reset on traffic */
            if (__nopoll_conn_try_get_msg(handler)) {
                wait_time <<= 1;
                if (wait_time > WAIT_MS_MAX)
                    wait_time = WAIT_MS_MAX;
            } else {
                wait_time = WAIT_MS_MIN;
            }

            if (handler->heartbeat_type != 0) {
                /* try a ping once the idle iteration count exceeds PING_INTERVAL_COUNT */
                if (ping_count++ > PING_INTERVAL_COUNT) {
                    ping_count = 0;
                    if (!__nopoll_conn_ping_test(handler)) {
                        /* only type 2 tracks the pong */
                        if (handler->heartbeat_type == 2)
                            ping_sent = true;

                        handler->pong_recv = 0;
                        ping_time = 0;
                    }
                }

                if (ping_sent) {
                    if (!handler->recv_disable) {
                        if (ping_time > NOPOLL_PONG_TIMEOUT_MSEC) {
                            __nopoll_conn_pong_check(handler);
                            ping_sent = false;
                            ping_time = 0;
                        } else {
                            /* counts only the sleep below, not the queue wait above */
                            ping_time += wait_time;
                        }
                    } else {
                        /* receiving is disabled, so a pong cannot be seen: abandon the check */
                        ping_sent = false;
                        ping_time = 0;
                    }
                }
            }

            os_msleep(wait_time);
            continue;
        }

        /* a command was dequeued */
        msg_count_dec(handler);

        switch (msg.type) {
        case NOPOLL_CMD_EXIT:
            task_exit = true;
            break;
        case NOPOLL_CMD_OPEN:
            __nopoll_conn_open(handler, msg.data);
            break;
        case NOPOLL_CMD_SEND_TEXT:
            active_time_update(handler);
            __nopoll_conn_send_text(handler, msg.data);
            break;
        case NOPOLL_CMD_SEND_BINARY:
            active_time_update(handler);
            __nopoll_conn_send_binary(handler, msg.data);
            break;
        case NOPOLL_CMD_CLOSE:
            __nopoll_conn_close(handler, msg.data);
            break;
        default:
            M_LOGW("unknown msg\n");
            break;
        }
    }
    /* wakes nopoll_client_release() */
    os_sem_signal(handler->exit_sem);
}
540
/*
 * Store the connection parameters and callbacks from info in the handler
 * and ask the client task to open the connection (NOPOLL_CMD_OPEN).
 *
 * The server and schema strings are validated against the size of their
 * destination buffers before anything is modified; a missing schema is
 * stored as an empty string. An already open connection is asked to close
 * first.
 *
 * @param handler  client state, must have a nopoll ctx
 * @param info     caller-supplied connection description
 * @return 0 when the open command was queued, -1 on invalid arguments,
 *         allocation failure or queue failure.
 */
static int nopoll_client_open(ws_handler_t *handler,
        ws_conn_info_t *info)
{
    nopoll_conn_info_t *conn_info;
    nopoll_msg_t msg;
    size_t server_len;
    size_t schema_len = 0;
    int ret;

    if (!handler) {
        M_LOGE("ws handler null !\n");
        return -1;
    }

    if (!handler->ctx) {
        M_LOGE("nopoll ctx null !\n");
        return -1;
    }

    if (!info || !info->server) {
        M_LOGE("conn info invalid !\n");
        return -1;
    }

    conn_info = &handler->conn_info;

    /* reject strings that would not fit, terminator included */
    server_len = strlen(info->server);
    if (server_len >= sizeof(conn_info->server)) {
        M_LOGE("server name too long !\n");
        return -1;
    }

    if (info->schema) {
        schema_len = strlen(info->schema);
        if (schema_len >= sizeof(conn_info->schema)) {
            M_LOGE("schema too long !\n");
            return -1;
        }
    }

    if (handler->conn) {
        M_LOGW("conn already open, close it\n");
        nopoll_client_close(handler);
    }

    if (conn_info->cacert) {
        snd_free(conn_info->cacert);
        conn_info->cacert = NULL;
    }
    if (conn_info->path) {
        snd_free(conn_info->path);
        conn_info->path = NULL;
    }

    memcpy(conn_info->server, info->server, server_len + 1);
    snprintf(conn_info->port, sizeof(conn_info->port),
            "%u", info->port);
    if (info->schema)
        memcpy(conn_info->schema, info->schema, schema_len + 1);
    else
        conn_info->schema[0] = '\0';

    if (info->cacert) {
        conn_info->cacert = snd_zalloc(strlen(info->cacert) + 1,
                AFM_EXTN);
        if (!conn_info->cacert) {
            M_LOGE("alloc cacert failed !\n");
            return -1;
        }
        memcpy(conn_info->cacert, info->cacert,
                strlen(info->cacert) + 1);
    }

    if (info->path) {
        conn_info->path = snd_zalloc(strlen(info->path) + 1,
                AFM_EXTN);
        if (!conn_info->path) {
            M_LOGE("alloc path failed !\n");
            if (conn_info->cacert) {
                snd_free(conn_info->cacert);
                conn_info->cacert = NULL;
            }
            return -1;
        }
        memcpy(conn_info->path, info->path,
                strlen(info->path) + 1);
    }

    memcpy(&handler->callback, &info->callback,
            sizeof(ws_cb_ops_t));

    msg.type = NOPOLL_CMD_OPEN;
    msg.data = NULL;

    ret = os_queue_send(&handler->queue, &msg,
            sizeof(nopoll_msg_t));
    if (ret) {
        M_LOGE("send msg failed %d ! msg count %d\n",
                ret, handler->msg_count);
        return -1;
    }

    msg_count_add(handler);
    return 0;
}
624
/*
 * Release the stored certificate/path copies and ask the client task to
 * close the connection (NOPOLL_CMD_CLOSE).
 *
 * @return 0 when the close command was queued, -1 if the handler is NULL,
 *         no connection is open, or the queue rejected the command.
 */
static int nopoll_client_close(ws_handler_t *handler)
{
    nopoll_conn_info_t *ci;
    nopoll_msg_t cmd;
    int rc;

    if (!handler) {
        M_LOGE("ws handler null !\n");
        return -1;
    }

    ci = &handler->conn_info;

    if (ci->cacert) {
        snd_free(ci->cacert);
        ci->cacert = NULL;
    }

    if (ci->path) {
        snd_free(ci->path);
        ci->path = NULL;
    }

    if (!handler->conn) {
        M_LOGW("nopoll not open, ignore\n");
        return -1;
    }

    cmd.type = NOPOLL_CMD_CLOSE;
    cmd.data = NULL;

    rc = os_queue_send(&handler->queue, &cmd, sizeof(cmd));
    if (rc == 0) {
        msg_count_add(handler);
        return 0;
    }

    M_LOGE("send msg failed %d ! msg count %d\n",
            rc, handler->msg_count);
    return -1;
}
666
/*
 * Copy len bytes of text into a send request and queue it for the client
 * task (NOPOLL_CMD_SEND_TEXT). The task frees the request after sending.
 *
 * @param handler  client state
 * @param text     bytes to send, must not be NULL
 * @param len      number of bytes, must be > 0
 * @return 0 when queued, -1 on invalid arguments, allocation failure or
 *         queue failure (the request is freed in that case).
 */
static int nopoll_client_send_text(ws_handler_t *handler,
        char *text, int len)
{
    nopoll_xfer_t *xfer;
    nopoll_msg_t msg;
    int ret;

    if (!handler) {
        M_LOGE("ws handler null !\n");
        return -1;
    }

    if (!text) {
        M_LOGE("text null !\n");
        return -1;
    }

    if (len <= 0) {
        M_LOGE("text len %d invalid !\n", len);
        return -1;
    }

    xfer = snd_zalloc(sizeof(nopoll_xfer_t), AFM_EXTN);
    if (!xfer) {
        M_LOGE("alloc nopoll xfer failed !\n");
        return -1;
    }

    xfer->buff = snd_zalloc(len, AFM_MAIN);
    if (!xfer->buff) {
        M_LOGE("alloc buffer failed !\n");
        snd_free(xfer);
        return -1;
    }

    memcpy(xfer->buff, text, len);
    xfer->size = len;

    msg.type = NOPOLL_CMD_SEND_TEXT;
    msg.data = xfer;

    ret = os_queue_send(&handler->queue, &msg,
            sizeof(nopoll_msg_t));
    if (ret) {
        M_LOGE("send msg failed %d ! msg count %d\n",
                ret, handler->msg_count);
        snd_free(xfer->buff);
        snd_free(xfer);
        return -1;
    }

    msg_count_add(handler);
    return 0;
}
716
/*
 * Copy a binary payload into a send request and queue it for the client
 * task (NOPOLL_CMD_SEND_BINARY). The task frees the request after sending.
 *
 * A zero-length payload is still sent as a 32-byte frame, as before; that
 * frame now consists of the zero bytes provided by snd_zalloc instead of
 * 32 bytes read from beyond the caller's (empty) buffer.
 *
 * @param handler  client state
 * @param pdata    payload, may be NULL only when len is 0
 * @param len      payload length in bytes, must be >= 0
 * @param type     fragment kind (start/continue/finish)
 * @return 0 when queued, -1 on invalid arguments, allocation failure or
 *         queue failure (the request is freed in that case).
 */
static int nopoll_client_send_binary(ws_handler_t *handler,
        void *pdata, int len, ws_bin_type_t type)
{
    nopoll_xfer_t *xfer;
    nopoll_msg_t msg;
    int copy_len;
    int ret;

    if (!handler) {
        M_LOGE("ws handler null !\n");
        return -1;
    }

    if (len < 0) {
        M_LOGE("bin len %d invalid !\n", len);
        return -1;
    }

    if (len > 0 && !pdata) {
        M_LOGE("bin data null !\n");
        return -1;
    }

    xfer = snd_zalloc(sizeof(nopoll_xfer_t), AFM_EXTN);
    if (!xfer) {
        M_LOGE("alloc nopoll send data failed !\n");
        return -1;
    }

    /* only what the caller actually supplied may be read from pdata */
    copy_len = len;
    if (len == 0)
        len = 32;

    xfer->buff = snd_zalloc(len, AFM_MAIN);
    if (!xfer->buff) {
        M_LOGE("alloc buffer failed !\n");
        snd_free(xfer);
        return -1;
    }
    if (copy_len > 0)
        memcpy(xfer->buff, pdata, copy_len);

    xfer->size = len;
    xfer->type = type;

    msg.type = NOPOLL_CMD_SEND_BINARY;
    msg.data = xfer;

    ret = os_queue_send(&handler->queue, &msg,
            sizeof(nopoll_msg_t));
    if (ret) {
        M_LOGE("send msg failed %d ! msg count %d\n",
                ret, handler->msg_count);
        snd_free(xfer->buff);
        snd_free(xfer);
        return -1;
    }

    msg_count_add(handler);
    return 0;
}
770
/*
 * Create the nopoll context and start the client task.
 * Does nothing when a context already exists.
 *
 * If the task cannot be created the context is released again, so that a
 * later call starts from scratch instead of reporting an existing client
 * that has no task.
 *
 * @return 0 on success or if the client already exists, -1 on failure.
 */
static int nopoll_client_create(ws_handler_t *handler)
{
    if (!handler)
        return -1;

    if (handler->ctx) {
        M_LOGW("nopoll client exist, ignore\n");
        goto __exit;
    }

    handler->ctx = nopoll_ctx_new();
    if (!handler->ctx) {
        M_LOGE("create nopoll ctx failed !\n");
        return -1;
    }

    if (os_task_create(&handler->task,
            "nopoll_client_task",
            nopoll_client_task,
            handler,
            NOPOLL_CLIENT_TASK_STACK_SIZE,
            NOPOLL_CLIENT_TASK_PRIORITY)) {
        M_LOGE("create nopoll client task failed !\n");
        nopoll_ctx_unref(handler->ctx);
        handler->ctx = NULL;
        return -1;
    }

    M_LOGI("nopoll client create\n");

__exit:
    return 0;
}
802
/*
 * Drop the nopoll context, ask the client task to exit (NOPOLL_CMD_EXIT)
 * and wait up to 5000 for exit_sem (presumably milliseconds).
 *
 * @return 0 once the task signalled its exit, -1 if the handler is NULL,
 *         the command could not be queued, or the wait timed out.
 */
static int nopoll_client_release(ws_handler_t *handler)
{
    nopoll_msg_t cmd;
    int rc;

    if (!handler) {
        M_LOGE("ws handler null !\n");
        return -1;
    }

    /* the context reference is dropped before the task is told to stop */
    nopoll_ctx_unref(handler->ctx);
    handler->ctx = NULL;

    cmd.type = NOPOLL_CMD_EXIT;
    cmd.data = NULL;

    rc = os_queue_send(&handler->queue, &cmd, sizeof(cmd));
    if (rc != 0) {
        M_LOGE("send msg failed %d ! msg count %d\n",
                rc, handler->msg_count);
        return -1;
    }
    msg_count_add(handler);

    if (os_sem_wait(handler->exit_sem, 5000) != 0) {
        M_LOGE("wait exit sem timeout !\n");
        return -1;
    }

    M_LOGI("nopoll client release\n");
    return 0;
}
837
/*
 * Start connecting to the server described by info.
 * The connection is established asynchronously by the client task;
 * connect_cb is invoked from there once it is ready.
 *
 * @return 0 when the request was accepted or a connection is already
 *         established/in progress, -1 on invalid arguments or when the
 *         client could not be created or opened.
 */
int uvoice_ws_connect(ws_conn_info_t *info)
{
    ws_handler_t *ws = g_ws_handler;

    if (!ws) {
        M_LOGE("ws handler null !\n");
        return -1;
    }

    if (!info) {
        M_LOGE("info null !\n");
        return -1;
    }

    if (!info->server) {
        M_LOGE("server info null !\n");
        return -1;
    }

    if (ws->ws_conn_state != WS_CONN_STAT_DISCONNECTED) {
        M_LOGW("ws connected or connecting, ignore\n");
        return 0;
    }

    ws->ws_conn_state = WS_CONN_STAT_CONNECTING;

    M_LOGD("server:%s;port:%u;schema:%s;path:%s\n",
            info->server,
            info->port,
            info->schema ? info->schema : "null",
            info->path ? info->path : "null");

    if (nopoll_client_create(ws) != 0) {
        M_LOGE("create nopoll client failed !\n");
        ws->ws_conn_state = WS_CONN_STAT_DISCONNECTED;
        return -1;
    }

    if (nopoll_client_open(ws, info) != 0) {
        M_LOGE("open nopoll client failed !\n");
        nopoll_client_release(ws);
        ws->ws_conn_state = WS_CONN_STAT_DISCONNECTED;
        return -1;
    }

    return 0;
}
886
uvoice_ws_disconnect(void)887 int uvoice_ws_disconnect(void)
888 {
889 ws_handler_t *handler = g_ws_handler;
890
891 if (!handler) {
892 M_LOGE("ws handler null !\n");
893 return -1;
894 }
895
896 if (handler->ws_conn_state != WS_CONN_STAT_CONNECTED) {
897 M_LOGE("ws not connected !\n");
898 return -1;
899 }
900
901 M_LOGD("disconnect ws\n");
902
903 nopoll_client_close(handler);
904
905 nopoll_client_release(handler);
906
907 return 0;
908 }
909
/*
 * Report the current connection state.
 *
 * @param state  receives the state, must not be NULL
 * @return 0 on success, -1 if the module is not initialised or state is NULL.
 */
int uvoice_ws_conn_state(ws_conn_state_t *state)
{
    ws_handler_t *ws = g_ws_handler;

    if (!ws) {
        M_LOGE("ws handler null !\n");
        return -1;
    }

    if (state) {
        *state = ws->ws_conn_state;
        return 0;
    }

    M_LOGE("arg null !\n");
    return -1;
}
927
uvoice_ws_send_text(char * text,int len)928 int uvoice_ws_send_text(char *text, int len)
929 {
930 ws_handler_t *handler = g_ws_handler;
931
932 if (!handler) {
933 M_LOGE("ws handler null !\n");
934 return -1;
935 }
936
937 if (handler->ws_conn_state != WS_CONN_STAT_CONNECTED) {
938 M_LOGE("ws not connected !\n");
939 return -1;
940 }
941
942 return nopoll_client_send_text(handler, text, len);
943 }
944
/*
 * Queue a binary fragment for sending; only allowed while connected.
 *
 * @return the result of nopoll_client_send_binary(), or -1 if the module
 *         is not initialised or not connected.
 */
int uvoice_ws_send_binary(char *data, int len, ws_bin_type_t type)
{
    ws_handler_t *ws = g_ws_handler;
    int rc;

    if (!ws) {
        M_LOGE("ws handler null !\n");
        return -1;
    }

    if (ws->ws_conn_state != WS_CONN_STAT_CONNECTED) {
        M_LOGE("ws not connected !\n");
        return -1;
    }

    rc = nopoll_client_send_binary(ws, data, len, type);
    return rc;
}
961
uvoice_ws_recv_disable(int disable)962 int uvoice_ws_recv_disable(int disable)
963 {
964 ws_handler_t *handler = g_ws_handler;
965
966 if (!handler) {
967 M_LOGE("ws handler null !\n");
968 return -1;
969 }
970
971 if (disable)
972 handler->recv_disable = 1;
973 else
974 handler->recv_disable = 0;
975
976 M_LOGD("set recv_disable %d\n", handler->recv_disable);
977 return 0;
978 }
979
980 /*
981 * ping-pong heartbeat type:
982 * 0 - disable
983 * 1 - send ping frame but don't check pong frame
984 * 2 - send ping frame and check pong frame
985 */
uvoice_ws_heartbeat_set(int type)986 int uvoice_ws_heartbeat_set(int type)
987 {
988 ws_handler_t *handler = g_ws_handler;
989
990 if (!handler) {
991 M_LOGE("ws handler null !\n");
992 return -1;
993 }
994
995 if (type < 0 || type > 2) {
996 M_LOGE("type %d invalid !\n", type);
997 return -1;
998 }
999
1000 if (handler->heartbeat_type != type) {
1001 handler->heartbeat_type = type;
1002 M_LOGD("set heartbeat_type %d\n",
1003 handler->heartbeat_type);
1004 }
1005 return 0;
1006 }
1007
uvoice_ws_init(void)1008 int uvoice_ws_init(void)
1009 {
1010 ws_handler_t *handler = g_ws_handler;
1011 int ret;
1012
1013 if (handler)
1014 return 0;
1015
1016 handler = snd_zalloc(sizeof(ws_handler_t), AFM_EXTN);
1017 if (!handler) {
1018 M_LOGE("alloc nopoll handler failed !\n");
1019 return -1;
1020 }
1021
1022 handler->msg_buffer = snd_zalloc(sizeof(nopoll_msg_t) * 8,
1023 AFM_MAIN);
1024 if (!handler->msg_buffer) {
1025 M_LOGE("alloc msg buffer failed !\n");
1026 snd_free(handler);
1027 return -1;
1028 }
1029
1030 ret = os_queue_new(&handler->queue, handler->msg_buffer,
1031 sizeof(nopoll_msg_t) * 8,
1032 sizeof(nopoll_msg_t));
1033 if (ret) {
1034 M_LOGE("create event queue failed %d!\n", ret);
1035 snd_free(handler->msg_buffer);
1036 snd_free(handler);
1037 return -1;
1038 }
1039
1040 handler->exit_sem = os_sem_new(0);
1041 handler->lock = os_mutex_new();
1042
1043 handler->ws_conn_state = WS_CONN_STAT_DISCONNECTED;
1044
1045 g_ws_handler = handler;
1046
1047 M_LOGI("ws init\n");
1048 return 0;
1049 }
1050
uvoice_ws_deinit(void)1051 int uvoice_ws_deinit(void)
1052 {
1053 ws_handler_t *handler = g_ws_handler;
1054
1055 if (!handler) {
1056 M_LOGE("ws handler null !\n");
1057 return -1;
1058 }
1059
1060 if (handler->ws_conn_state != WS_CONN_STAT_DISCONNECTED)
1061 uvoice_ws_disconnect();
1062
1063 os_mutex_free(handler->lock);
1064 os_sem_free(handler->exit_sem);
1065 os_queue_free(&handler->queue);
1066 snd_free(handler->msg_buffer);
1067 snd_free(handler);
1068 g_ws_handler = NULL;
1069
1070 M_LOGI("ws free\n");
1071 return 0;
1072 }
1073
1074