1 /*
2  * Copyright (C) 2015-2019 Alibaba Group Holding Limited
3  */
4 
5 #include <stdio.h>
6 #include <stdlib.h>
7 #include <string.h>
8 #include <stdarg.h>
9 
10 #include "amp_config.h"
11 #include "amp_platform.h"
12 #include "aos_system.h"
13 #include "aos_network.h"
14 #include "amp_memory.h"
15 #include "amp_defines.h"
16 #include "module_mqtt.h"
17 #include "aiot_mqtt_api.h"
18 #include "amp_task.h"
19 #include "amp_list.h"
20 #include "quickjs_addon_common.h"
21 
/* Tag passed as the first argument to the amp_debug/amp_warn/amp_error calls in this file. */
#define MOD_STR "MQTT"
/*
 * Base value of the timeout given to aos_sem_wait() when waiting for the MQTT
 * task to exit (callers add 50 to it). Presumably milliseconds -- confirm
 * against the aos_sem_wait() contract.
 */
#define MQTT_TASK_YIELD_TIMEOUT 200
/* Set to 1 by native_mqtt_close() to ask mqtt_connect_task() to leave its wait loop. */
static char g_mqtt_close_flag = 0;
/* Set to 1 by mqtt_connect_task() once mqtt_client_start() has succeeded. */
static char g_mqtt_conn_flag = 0;
/* Signalled by mqtt_connect_task() when it exits; waited on by the close path. */
static aos_sem_t g_mqtt_close_sem = NULL;

/* Number of elements in a true array (not valid for pointers). */
#ifndef countof
#define countof(x) (sizeof(x) / sizeof((x)[0]))
#endif

/* QuickJS class used to wrap an aos_mqtt_handle_t pointer as an opaque JS object. */
static JSClassID js_mqtt_class_id;
static JSClassDef js_mqtt_class = {
    "MQTT", /* class name; no other JSClassDef members are set */
};
36 
/*
 * Notification record built by aos_mqtt_message_cb() and consumed (then
 * freed) by aos_mqtt_notify().
 */
typedef struct {
    aos_mqtt_handle_t *aos_mqtt_handle; /* handle taken from the callback userdata */
    char *topic;                        /* heap copy of the received topic (publish messages only) */
    char *payload;                      /* heap copy of the received payload (publish messages only) */
    char *service_id;                   /* not referenced elsewhere in this file */
    char *params;                       /* not referenced elsewhere in this file */
    JSValue js_cb_ref;                  /* JS callback invoked by aos_mqtt_notify() */
    int ret_code;                       /* event/receive code, exposed to JS as "code" */
    int topic_len;                      /* copied from message->recv.topic_len */
    int payload_len;                    /* copied from message->recv.payload_len */
    int params_len;                     /* not referenced elsewhere in this file */
    uint64_t msg_id;                    /* not referenced elsewhere in this file */
    aiot_mqtt_option_t option;          /* selects the event or the receive branch */
    aiot_mqtt_event_type_t event_type;  /* valid when option is AIOT_MQTTOPT_EVENT_HANDLER */
    aiot_mqtt_recv_type_t recv_type;    /* valid when option is AIOT_MQTTOPT_RECV_HANDLER */
} aos_mqtt_notify_param_t;
53 
/*
 * Duplicate a NUL-terminated string into memory obtained from amp_malloc().
 *
 * Returns NULL when src is NULL or when the allocation fails; otherwise the
 * caller owns the returned copy.
 */
static char *__amp_strdup(char *src)
{
    char   *copy;
    size_t  size;

    if (src == NULL) {
        return NULL;
    }

    /* Size includes the terminator so a single memcpy carries it over. */
    size = strlen(src) + 1;
    copy = amp_malloc(size);
    if (copy != NULL) {
        memcpy(copy, src, size);
    }

    return copy;
}
74 
aos_mqtt_notify(void * pdata)75 static void aos_mqtt_notify(void *pdata)
76 {
77     aos_mqtt_notify_param_t *param = (aos_mqtt_notify_param_t *)pdata;
78 
79     JSContext *ctx = js_get_context();
80 
81     if (param->option == AIOT_MQTTOPT_EVENT_HANDLER) {
82         switch (param->event_type) {
83         case AIOT_MQTTEVT_CONNECT:
84         case AIOT_MQTTEVT_RECONNECT:
85         case AIOT_MQTTEVT_DISCONNECT:
86         {
87             JSValue obj = JS_NewObject(ctx);
88             JS_SetPropertyStr(ctx, obj, "code", JS_NewInt32(ctx, param->ret_code));
89             JSValue mqtt = JS_NewObjectClass(ctx, js_mqtt_class_id);
90             JS_SetOpaque(mqtt, param->aos_mqtt_handle);
91             JS_SetPropertyStr(ctx, obj, "handle", mqtt);
92 
93             JSValue val = JS_Call(ctx, param->js_cb_ref, JS_UNDEFINED, 1, &obj);
94             JS_FreeValue(ctx, val);
95             JS_FreeValue(ctx, obj);
96         }
97             break;
98         default:
99             amp_free(param);
100             return;
101         }
102     } else if (param->option == AIOT_MQTTOPT_RECV_HANDLER) {
103         switch (param->recv_type) {
104         case AIOT_MQTTRECV_PUB:
105         {
106             JSValue obj = JS_NewObject(ctx);
107             JS_SetPropertyStr(ctx, obj, "code", JS_NewInt32(ctx, param->ret_code));
108             JSValue mqtt = JS_NewObjectClass(ctx, js_mqtt_class_id);
109             JS_SetOpaque(mqtt, param->aos_mqtt_handle);
110             JS_SetPropertyStr(ctx, obj, "handle", mqtt);
111             JS_SetPropertyStr(ctx, obj, "topic", JS_NewString(ctx, param->topic));
112             JS_SetPropertyStr(ctx, obj, "payload", JS_NewString(ctx, param->payload));
113 
114             JSValue val = JS_Call(ctx, param->js_cb_ref, JS_UNDEFINED, 1, &obj);
115             JS_FreeValue(ctx, val);
116             JS_FreeValue(ctx, obj);
117 
118             amp_free(param->topic);
119             amp_free(param->payload);
120         }
121             break;
122         default:
123             amp_free(param);
124             return;
125         }
126     }
127 
128     amp_free(param);
129 }
130 
/*
 * MQTT client callback: packs an event or a received publish message into an
 * aos_mqtt_notify_param_t and queues aos_mqtt_notify() through
 * amp_task_schedule_call().
 *
 * message  - event/receive descriptor supplied by the MQTT client.
 * userdata - the aos_mqtt_userdata_t installed by mqtt_connect_task().
 *
 * Only connect, reconnect and disconnect events and received publish
 * messages are forwarded; everything else is dropped. Once queued, the record
 * and its string copies belong to aos_mqtt_notify().
 */
static void aos_mqtt_message_cb(aos_mqtt_message_t *message, void *userdata)
{
    aos_mqtt_userdata_t *udata = (aos_mqtt_userdata_t *)userdata;
    aos_mqtt_notify_param_t *param;

    /* The handle is dereferenced below to fetch the JS callback, so it must be present. */
    if (!message || !udata || !udata->handle) {
        return;
    }

    param = amp_malloc(sizeof(aos_mqtt_notify_param_t));
    if (!param) {
        amp_error(MOD_STR, "alloc device notify param fail");
        return;
    }
    memset(param, 0, sizeof(aos_mqtt_notify_param_t));
    param->aos_mqtt_handle = (aos_mqtt_handle_t *)udata->handle;
    param->option = message->option;
    param->js_cb_ref = param->aos_mqtt_handle->js_cb_ref[MQTT_JSCALLBACK_START_CLIENT_REF];

    if (message->option == AIOT_MQTTOPT_EVENT_HANDLER) {
        switch (message->event.type) {
        case AIOT_MQTTEVT_CONNECT:
        case AIOT_MQTTEVT_RECONNECT:
        case AIOT_MQTTEVT_DISCONNECT:
            param->ret_code = message->event.code;
            param->event_type = message->event.type;
            break;
        default:
            amp_free(param);
            return;
        }
    } else if (message->option == AIOT_MQTTOPT_RECV_HANDLER) {
        switch (message->recv.type) {
        case AIOT_MQTTRECV_PUB:
            param->ret_code = message->recv.code;
            param->topic_len = message->recv.topic_len;
            param->payload_len = message->recv.payload_len;
            param->topic = __amp_strdup(message->recv.topic);
            param->payload = __amp_strdup(message->recv.payload);
            param->recv_type = message->recv.type;

            /* Never queue a half-built record: the consumer builds JS strings from both copies. */
            if (param->topic == NULL || param->payload == NULL) {
                amp_error(MOD_STR, "alloc mqtt message copy fail");
                if (param->topic != NULL) {
                    amp_free(param->topic);
                }
                if (param->payload != NULL) {
                    amp_free(param->payload);
                }
                amp_free(param);
                return;
            }
            break;
        default:
            amp_free(param);
            return;
        }
    } else {
        amp_free(param);
        return;
    }

    /*
     * NOTE(review): if amp_task_schedule_call() can fail, the record and its
     * strings leak here -- confirm its return contract.
     */
    amp_task_schedule_call(aos_mqtt_notify, param);
}
183 
mqtt_connect_task(void * pdata)184 static void mqtt_connect_task(void *pdata)
185 {
186     int ret;
187     JSContext *ctx = js_get_context();
188     aos_mqtt_handle_t *aos_mqtt_handle = (aos_mqtt_handle_t *)pdata;
189     aos_mqtt_userdata_t *userdata = amp_malloc(sizeof(aos_mqtt_userdata_t));
190     if (userdata == NULL) {
191         amp_error(MOD_STR, "amp mqtt handle malloc failed");
192         amp_free(aos_mqtt_handle);
193         return;
194     }
195 
196     userdata->callback = aos_mqtt_message_cb;
197     userdata->handle = aos_mqtt_handle;
198 
199     ret = mqtt_client_start(&aos_mqtt_handle->mqtt_handle, userdata);
200     if (ret < 0) {
201         amp_error(MOD_STR, "mqtt client init failed");
202         goto out;
203     }
204 
205     g_mqtt_conn_flag = 1;
206     aos_mqtt_handle->res = ret;
207 
208     while (!g_mqtt_close_flag) {
209         aos_msleep(1000);
210     }
211 
212 out:
213     JS_FreeValue(ctx, aos_mqtt_handle->js_cb_ref[MQTT_JSCALLBACK_START_CLIENT_REF]);
214     JS_FreeCString(ctx, aos_mqtt_handle->host);
215     JS_FreeCString(ctx, aos_mqtt_handle->clientid);
216     JS_FreeCString(ctx, aos_mqtt_handle->username);
217     JS_FreeCString(ctx, aos_mqtt_handle->password);
218 
219     aos_free(userdata);
220     aos_free(aos_mqtt_handle);
221     aos_sem_signal(&g_mqtt_close_sem);
222     aos_task_exit(0);
223 }
224 
native_mqtt_start(JSContext * ctx,JSValueConst this_val,int argc,JSValueConst * argv)225 static JSValue native_mqtt_start(JSContext *ctx, JSValueConst this_val, int argc, JSValueConst *argv)
226 {
227     int ret;
228     uint32_t val = 0;
229     aos_mqtt_handle_t *aos_mqtt_handle = NULL;
230     aos_task_t mqtt_task;
231     JSValue js_cb_ref;
232 
233     /* check paramters */
234     JSValue options = argv[0];
235     JSValue cb = argv[1];
236 
237     if (!JS_IsObject(options) || !JS_IsFunction(ctx, cb)) {
238         amp_warn(MOD_STR, "parameter must be object and function\n");
239         ret = -1;
240         goto out;
241     }
242 
243     /* get device certificate */
244     JSValue j_host = JS_GetPropertyStr(ctx, argv[0], "host");
245     JSValue j_port = JS_GetPropertyStr(ctx, argv[0], "port");
246     JSValue j_client_id = JS_GetPropertyStr(ctx, argv[0], "client_id");
247     JSValue j_username = JS_GetPropertyStr(ctx, argv[0], "username");
248     JSValue j_password = JS_GetPropertyStr(ctx, argv[0], "password");
249     JSValue j_keepaliveSec = JS_GetPropertyStr(ctx, argv[0], "keepalive_interval");
250 
251     if (!JS_IsString(j_host) || !JS_IsNumber(j_port) || !JS_IsString(j_client_id) ||
252         !JS_IsString(j_username) || !JS_IsString(j_password) || !JS_IsNumber(j_keepaliveSec))
253     {
254         amp_warn(MOD_STR,
255                  "Parameter 1 must be an object like {host: string, "
256                  "port: uint, client_id: string, username: string, "
257                  "password: string, keepalive_interval: uint}\n");
258         ret = -2;
259         goto out;
260     }
261 
262     aos_mqtt_handle = (aos_mqtt_handle_t *)amp_malloc(sizeof(aos_mqtt_handle_t));
263     if (!aos_mqtt_handle) {
264         amp_error(MOD_STR, "allocate memory failed\n");
265         goto out;
266     }
267 
268     aos_mqtt_handle->host = JS_ToCString(ctx, j_host);
269     JS_ToUint32(ctx, &val, j_port);
270     aos_mqtt_handle->port = (uint16_t)val;
271 
272     aos_mqtt_handle->clientid = JS_ToCString(ctx, j_client_id);
273     aos_mqtt_handle->username = JS_ToCString(ctx, j_username);
274     aos_mqtt_handle->password = JS_ToCString(ctx, j_password);
275 
276     JS_ToUint32(ctx, &val, j_keepaliveSec);
277     aos_mqtt_handle->keepaliveSec = (uint16_t)val;
278 
279     amp_debug(MOD_STR, "host: %s, port: %d\n", aos_mqtt_handle->host, aos_mqtt_handle->port);
280     amp_debug(MOD_STR, "client_id: %s, username: %s, password: %s\n", aos_mqtt_handle->clientid, aos_mqtt_handle->username, aos_mqtt_handle->password);
281 
282     js_cb_ref = JS_DupValue(ctx, cb);
283     aos_mqtt_handle->js_cb_ref[MQTT_JSCALLBACK_START_CLIENT_REF] = js_cb_ref;
284 
285     /* create task to IOT_MQTT_Yield() */
286     ret = aos_task_new_ext(&mqtt_task, "amp mqtt task", mqtt_connect_task, aos_mqtt_handle, 1024 * 4, MQTT_TSK_PRIORITY);
287     if (ret < 0) {
288         amp_warn(MOD_STR, "jse_osal_create_task failed\n");
289         JS_FreeValue(ctx, js_cb_ref);
290         aos_free(aos_mqtt_handle);
291         ret = -4;
292     }
293 
294 out:
295     return JS_NewInt32(ctx, ret);
296 }
297 
298 /* subscribe */
native_mqtt_subscribe(JSContext * ctx,JSValueConst this_val,int argc,JSValueConst * argv)299 static JSValue native_mqtt_subscribe(JSContext *ctx, JSValueConst this_val, int argc, JSValueConst *argv)
300 {
301     int res = -1;
302     aos_mqtt_handle_t *amp_mqtt_handle = NULL;
303     const char *topic = NULL;
304     uint8_t qos = 0;
305     int js_cb_ref = 0;
306 
307     amp_mqtt_handle = JS_GetOpaque2(ctx, argv[0], js_mqtt_class_id);
308     if (amp_mqtt_handle == NULL) {
309         amp_warn(MOD_STR, "mqtt handle is null");
310         goto out;
311     }
312 
313     topic = JS_ToCString(ctx, argv[1]);
314     JS_ToInt32(ctx, &qos, argv[2]);
315 
316     res = aiot_mqtt_sub(amp_mqtt_handle->mqtt_handle, topic, NULL, qos, NULL);
317     if (res < 0) {
318         amp_error(MOD_STR, "aiot app mqtt subscribe failed, ret = -0x%04X", res);
319     }
320 
321     JS_FreeCString(ctx, topic);
322 
323 out:
324     return JS_NewInt32(ctx, res);
325 }
326 
327 /* unsubscribe */
native_mqtt_unsubscribe(JSContext * ctx,JSValueConst this_val,int argc,JSValueConst * argv)328 static JSValue native_mqtt_unsubscribe(JSContext *ctx, JSValueConst this_val, int argc, JSValueConst *argv)
329 {
330     int res = -1;
331     aos_mqtt_handle_t *amp_mqtt_handle = NULL;
332     const char *topic;
333     uint8_t qos = 0;
334     int js_cb_ref = 0;
335 
336     amp_mqtt_handle = JS_GetOpaque2(ctx, argv[0], js_mqtt_class_id);
337     if (amp_mqtt_handle == NULL) {
338         amp_warn(MOD_STR, "mqtt handle is null");
339         goto out;
340     }
341 
342     topic = JS_ToCString(ctx, argv[1]);
343 
344     amp_debug(MOD_STR, "unsubscribe topic: %s", topic);
345 
346     res = aiot_mqtt_unsub(amp_mqtt_handle->mqtt_handle, topic);
347     if (res < 0) {
348         amp_error(MOD_STR, "aiot app mqtt unsubscribe failed\n");
349     }
350     JS_FreeCString(ctx, topic);
351 
352 out:
353     return JS_NewInt32(ctx, res);
354 }
355 
356 /* publish */
native_mqtt_publish(JSContext * ctx,JSValueConst this_val,int argc,JSValueConst * argv)357 static JSValue native_mqtt_publish(JSContext *ctx, JSValueConst this_val, int argc, JSValueConst *argv)
358 {
359     int res = -1;
360     aos_mqtt_handle_t *amp_mqtt_handle = NULL;
361     const char *topic;
362     const char *payload;
363     uint16_t payload_len = 0;
364     uint8_t qos = 0;
365     int js_cb_ref = 0;
366 
367     amp_mqtt_handle = JS_GetOpaque2(ctx, argv[0], js_mqtt_class_id);
368     if (amp_mqtt_handle == NULL) {
369         amp_warn(MOD_STR, "mqtt handle is null");
370         goto out;
371     }
372 
373     topic = JS_ToCString(ctx, argv[1]);
374     payload = JS_ToCString(ctx, argv[2]);
375     JS_ToInt32(ctx, &qos, argv[1]);
376     payload_len = strlen(payload);
377 
378     amp_debug(MOD_STR, "publish topic: %s, payload: %s, qos is: %d", topic, payload, qos);
379 
380     res = aiot_mqtt_pub(amp_mqtt_handle->mqtt_handle, topic, payload, payload_len, qos);
381     if (res < 0) {
382         amp_error(MOD_STR, "aiot app mqtt publish failed");
383     }
384     JS_FreeCString(ctx, topic);
385     JS_FreeCString(ctx, payload);
386 
387 out:
388     return JS_NewInt32(ctx, res);
389 }
390 
native_mqtt_close(JSContext * ctx,JSValueConst this_val,int argc,JSValueConst * argv)391 static JSValue native_mqtt_close(JSContext *ctx, JSValueConst this_val, int argc, JSValueConst *argv)
392 {
393     int res = -1;
394     int js_cb_ref = 0;
395     aos_mqtt_handle_t *amp_mqtt_handle = NULL;
396 
397     amp_mqtt_handle = JS_GetOpaque2(ctx, argv[0], js_mqtt_class_id);
398     if (amp_mqtt_handle == NULL) {
399         amp_warn(MOD_STR, "mqtt handle is null");
400         goto out;
401     }
402 
403     res = mqtt_client_stop(&amp_mqtt_handle->mqtt_handle);
404     if (res < 0) {
405         amp_debug(MOD_STR, "mqtt client stop failed");
406     }
407 
408 out:
409     /* release mqtt in mqtt_yield_task() */
410     g_mqtt_close_flag = 1;
411     aos_sem_wait(&g_mqtt_close_sem, MQTT_TASK_YIELD_TIMEOUT + 50);
412     g_mqtt_close_flag = 0;
413     return JS_NewInt32(ctx, 1);
414 }
415 
module_mqtt_source_clean(void)416 static void module_mqtt_source_clean(void)
417 {
418     if (g_mqtt_close_flag) {
419         aos_sem_wait(&g_mqtt_close_sem, MQTT_TASK_YIELD_TIMEOUT + 50);
420         g_mqtt_close_flag = 0;
421     }
422 }
423 
/*
 * Native functions exposed to JavaScript. The same table is installed on the
 * MQTT class prototype and used as the module export list. The number in each
 * entry is the length argument of JS_CFUNC_DEF.
 */
static const JSCFunctionListEntry js_mqtt_funcs[] = {
    JS_CFUNC_DEF("start", 2, native_mqtt_start),             /* reads argv[0..1] */
    JS_CFUNC_DEF("subscribe", 3, native_mqtt_subscribe),     /* reads argv[0..2] */
    JS_CFUNC_DEF("unsubscribe", 3, native_mqtt_unsubscribe), /* reads argv[0..1] */
    JS_CFUNC_DEF("publish", 5, native_mqtt_publish),         /* handle, topic, payload, ... */
    JS_CFUNC_DEF("close", 2, native_mqtt_close),             /* reads argv[0] */
};
431 
/*
 * Module initialiser passed to JS_NewCModule(): registers the MQTT class,
 * gives it a prototype carrying the native functions, and fills in the
 * module's exports.
 *
 * Returns the result of JS_SetModuleExportList().
 */
static int js_mqtt_init(JSContext *ctx, JSModuleDef *m)
{
    JSValue mqtt_proto;

    /* Allocate the class id, then register the class with the runtime. */
    JS_NewClassID(&js_mqtt_class_id);
    JS_NewClass(JS_GetRuntime(ctx), js_mqtt_class_id, &js_mqtt_class);

    mqtt_proto = JS_NewObject(ctx);
    JS_SetPropertyFunctionList(ctx, mqtt_proto, js_mqtt_funcs, countof(js_mqtt_funcs));
    JS_SetClassProto(ctx, js_mqtt_class_id, mqtt_proto);

    return JS_SetModuleExportList(ctx, m, js_mqtt_funcs, countof(js_mqtt_funcs));
}
444 
js_init_module_mqtt(JSContext * ctx,const char * module_name)445 JSModuleDef *js_init_module_mqtt(JSContext *ctx, const char *module_name)
446 {
447     JSModuleDef *m;
448     m = JS_NewCModule(ctx, module_name, js_mqtt_init);
449     if (!m)
450         return NULL;
451     JS_AddModuleExportList(ctx, m, js_mqtt_funcs, countof(js_mqtt_funcs));
452     return m;
453 }
454 
module_mqtt_register(void)455 void module_mqtt_register(void)
456 {
457     amp_debug(MOD_STR, "module_mqtt_register");
458     JSContext *ctx = js_get_context();
459     js_init_module_mqtt(ctx, "MQTT");
460 }
461