1 /*
2 * Copyright (C) 2015-2019 Alibaba Group Holding Limited
3 */
4
5 #include <stdio.h>
6 #include <stdlib.h>
7 #include <string.h>
8 #include <stdarg.h>
9
10 #include "amp_config.h"
11 #include "amp_platform.h"
12 #include "aos_system.h"
13 #include "aos_network.h"
14 #include "amp_memory.h"
15 #include "amp_defines.h"
16 #include "module_mqtt.h"
17 #include "aiot_mqtt_api.h"
18 #include "amp_task.h"
19 #include "amp_list.h"
20 #include "quickjs_addon_common.h"
21
22 #define MOD_STR "MQTT"
23 #define MQTT_TASK_YIELD_TIMEOUT 200
24 static char g_mqtt_close_flag = 0;
25 static char g_mqtt_conn_flag = 0;
26 static aos_sem_t g_mqtt_close_sem = NULL;
27
28 #ifndef countof
29 #define countof(x) (sizeof(x) / sizeof((x)[0]))
30 #endif
31
32 static JSClassID js_mqtt_class_id;
33 static JSClassDef js_mqtt_class = {
34 "MQTT",
35 };
36
37 typedef struct {
38 aos_mqtt_handle_t *aos_mqtt_handle;
39 char *topic;
40 char *payload;
41 char *service_id;
42 char *params;
43 JSValue js_cb_ref;
44 int ret_code;
45 int topic_len;
46 int payload_len;
47 int params_len;
48 uint64_t msg_id;
49 aiot_mqtt_option_t option;
50 aiot_mqtt_event_type_t event_type;
51 aiot_mqtt_recv_type_t recv_type;
52 } aos_mqtt_notify_param_t;
53
__amp_strdup(char * src)54 static char *__amp_strdup(char *src)
55 {
56 char *dst;
57 size_t len = 0;
58
59 if (src == NULL) {
60 return NULL;
61 }
62
63 len = strlen(src);
64
65 dst = amp_malloc(len + 1);
66 if (dst == NULL) {
67 return NULL;
68 }
69
70 memcpy(dst, src, len);
71 dst[len] = '\0';
72 return dst;
73 }
74
aos_mqtt_notify(void * pdata)75 static void aos_mqtt_notify(void *pdata)
76 {
77 aos_mqtt_notify_param_t *param = (aos_mqtt_notify_param_t *)pdata;
78
79 JSContext *ctx = js_get_context();
80
81 if (param->option == AIOT_MQTTOPT_EVENT_HANDLER) {
82 switch (param->event_type) {
83 case AIOT_MQTTEVT_CONNECT:
84 case AIOT_MQTTEVT_RECONNECT:
85 case AIOT_MQTTEVT_DISCONNECT:
86 {
87 JSValue obj = JS_NewObject(ctx);
88 JS_SetPropertyStr(ctx, obj, "code", JS_NewInt32(ctx, param->ret_code));
89 JSValue mqtt = JS_NewObjectClass(ctx, js_mqtt_class_id);
90 JS_SetOpaque(mqtt, param->aos_mqtt_handle);
91 JS_SetPropertyStr(ctx, obj, "handle", mqtt);
92
93 JSValue val = JS_Call(ctx, param->js_cb_ref, JS_UNDEFINED, 1, &obj);
94 JS_FreeValue(ctx, val);
95 JS_FreeValue(ctx, obj);
96 }
97 break;
98 default:
99 amp_free(param);
100 return;
101 }
102 } else if (param->option == AIOT_MQTTOPT_RECV_HANDLER) {
103 switch (param->recv_type) {
104 case AIOT_MQTTRECV_PUB:
105 {
106 JSValue obj = JS_NewObject(ctx);
107 JS_SetPropertyStr(ctx, obj, "code", JS_NewInt32(ctx, param->ret_code));
108 JSValue mqtt = JS_NewObjectClass(ctx, js_mqtt_class_id);
109 JS_SetOpaque(mqtt, param->aos_mqtt_handle);
110 JS_SetPropertyStr(ctx, obj, "handle", mqtt);
111 JS_SetPropertyStr(ctx, obj, "topic", JS_NewString(ctx, param->topic));
112 JS_SetPropertyStr(ctx, obj, "payload", JS_NewString(ctx, param->payload));
113
114 JSValue val = JS_Call(ctx, param->js_cb_ref, JS_UNDEFINED, 1, &obj);
115 JS_FreeValue(ctx, val);
116 JS_FreeValue(ctx, obj);
117
118 amp_free(param->topic);
119 amp_free(param->payload);
120 }
121 break;
122 default:
123 amp_free(param);
124 return;
125 }
126 }
127
128 amp_free(param);
129 }
130
aos_mqtt_message_cb(aos_mqtt_message_t * message,void * userdata)131 static void aos_mqtt_message_cb(aos_mqtt_message_t *message, void *userdata)
132 {
133 aos_mqtt_userdata_t *udata = (aos_mqtt_userdata_t *)userdata;
134 aos_mqtt_handle_t *aos_mqtt_handle;
135 aos_mqtt_notify_param_t *param;
136
137 if (!message || !udata)
138 return;
139
140 param = amp_malloc(sizeof(aos_mqtt_notify_param_t));
141 if (!param) {
142 amp_error(MOD_STR, "alloc device notify param fail");
143 return;
144 }
145 memset(param, 0, sizeof(aos_mqtt_notify_param_t));
146 param->aos_mqtt_handle = (aos_mqtt_handle_t *)udata->handle;
147 param->option = message->option;
148 param->js_cb_ref = param->aos_mqtt_handle->js_cb_ref[MQTT_JSCALLBACK_START_CLIENT_REF];
149
150 if (message->option == AIOT_MQTTOPT_EVENT_HANDLER) {
151 switch (message->event.type) {
152 case AIOT_MQTTEVT_CONNECT:
153 case AIOT_MQTTEVT_RECONNECT:
154 case AIOT_MQTTEVT_DISCONNECT:
155 param->ret_code = message->event.code;
156 param->event_type = message->event.type;
157 break;
158 default:
159 amp_free(param);
160 return;
161 }
162 } else if (message->option == AIOT_MQTTOPT_RECV_HANDLER) {
163 switch (message->recv.type) {
164 case AIOT_MQTTRECV_PUB:
165 param->ret_code = message->recv.code;
166 param->topic_len = message->recv.topic_len;
167 param->payload_len = message->recv.payload_len;
168 param->topic = __amp_strdup(message->recv.topic);
169 param->payload = __amp_strdup(message->recv.payload);
170 param->recv_type = message->recv.type;
171 break;
172 default:
173 amp_free(param);
174 return;
175 }
176 } else {
177 amp_free(param);
178 return;
179 }
180
181 amp_task_schedule_call(aos_mqtt_notify, param);
182 }
183
mqtt_connect_task(void * pdata)184 static void mqtt_connect_task(void *pdata)
185 {
186 int ret;
187 JSContext *ctx = js_get_context();
188 aos_mqtt_handle_t *aos_mqtt_handle = (aos_mqtt_handle_t *)pdata;
189 aos_mqtt_userdata_t *userdata = amp_malloc(sizeof(aos_mqtt_userdata_t));
190 if (userdata == NULL) {
191 amp_error(MOD_STR, "amp mqtt handle malloc failed");
192 amp_free(aos_mqtt_handle);
193 return;
194 }
195
196 userdata->callback = aos_mqtt_message_cb;
197 userdata->handle = aos_mqtt_handle;
198
199 ret = mqtt_client_start(&aos_mqtt_handle->mqtt_handle, userdata);
200 if (ret < 0) {
201 amp_error(MOD_STR, "mqtt client init failed");
202 goto out;
203 }
204
205 g_mqtt_conn_flag = 1;
206 aos_mqtt_handle->res = ret;
207
208 while (!g_mqtt_close_flag) {
209 aos_msleep(1000);
210 }
211
212 out:
213 JS_FreeValue(ctx, aos_mqtt_handle->js_cb_ref[MQTT_JSCALLBACK_START_CLIENT_REF]);
214 JS_FreeCString(ctx, aos_mqtt_handle->host);
215 JS_FreeCString(ctx, aos_mqtt_handle->clientid);
216 JS_FreeCString(ctx, aos_mqtt_handle->username);
217 JS_FreeCString(ctx, aos_mqtt_handle->password);
218
219 aos_free(userdata);
220 aos_free(aos_mqtt_handle);
221 aos_sem_signal(&g_mqtt_close_sem);
222 aos_task_exit(0);
223 }
224
native_mqtt_start(JSContext * ctx,JSValueConst this_val,int argc,JSValueConst * argv)225 static JSValue native_mqtt_start(JSContext *ctx, JSValueConst this_val, int argc, JSValueConst *argv)
226 {
227 int ret;
228 uint32_t val = 0;
229 aos_mqtt_handle_t *aos_mqtt_handle = NULL;
230 aos_task_t mqtt_task;
231 JSValue js_cb_ref;
232
233 /* check paramters */
234 JSValue options = argv[0];
235 JSValue cb = argv[1];
236
237 if (!JS_IsObject(options) || !JS_IsFunction(ctx, cb)) {
238 amp_warn(MOD_STR, "parameter must be object and function\n");
239 ret = -1;
240 goto out;
241 }
242
243 /* get device certificate */
244 JSValue j_host = JS_GetPropertyStr(ctx, argv[0], "host");
245 JSValue j_port = JS_GetPropertyStr(ctx, argv[0], "port");
246 JSValue j_client_id = JS_GetPropertyStr(ctx, argv[0], "client_id");
247 JSValue j_username = JS_GetPropertyStr(ctx, argv[0], "username");
248 JSValue j_password = JS_GetPropertyStr(ctx, argv[0], "password");
249 JSValue j_keepaliveSec = JS_GetPropertyStr(ctx, argv[0], "keepalive_interval");
250
251 if (!JS_IsString(j_host) || !JS_IsNumber(j_port) || !JS_IsString(j_client_id) ||
252 !JS_IsString(j_username) || !JS_IsString(j_password) || !JS_IsNumber(j_keepaliveSec))
253 {
254 amp_warn(MOD_STR,
255 "Parameter 1 must be an object like {host: string, "
256 "port: uint, client_id: string, username: string, "
257 "password: string, keepalive_interval: uint}\n");
258 ret = -2;
259 goto out;
260 }
261
262 aos_mqtt_handle = (aos_mqtt_handle_t *)amp_malloc(sizeof(aos_mqtt_handle_t));
263 if (!aos_mqtt_handle) {
264 amp_error(MOD_STR, "allocate memory failed\n");
265 goto out;
266 }
267
268 aos_mqtt_handle->host = JS_ToCString(ctx, j_host);
269 JS_ToUint32(ctx, &val, j_port);
270 aos_mqtt_handle->port = (uint16_t)val;
271
272 aos_mqtt_handle->clientid = JS_ToCString(ctx, j_client_id);
273 aos_mqtt_handle->username = JS_ToCString(ctx, j_username);
274 aos_mqtt_handle->password = JS_ToCString(ctx, j_password);
275
276 JS_ToUint32(ctx, &val, j_keepaliveSec);
277 aos_mqtt_handle->keepaliveSec = (uint16_t)val;
278
279 amp_debug(MOD_STR, "host: %s, port: %d\n", aos_mqtt_handle->host, aos_mqtt_handle->port);
280 amp_debug(MOD_STR, "client_id: %s, username: %s, password: %s\n", aos_mqtt_handle->clientid, aos_mqtt_handle->username, aos_mqtt_handle->password);
281
282 js_cb_ref = JS_DupValue(ctx, cb);
283 aos_mqtt_handle->js_cb_ref[MQTT_JSCALLBACK_START_CLIENT_REF] = js_cb_ref;
284
285 /* create task to IOT_MQTT_Yield() */
286 ret = aos_task_new_ext(&mqtt_task, "amp mqtt task", mqtt_connect_task, aos_mqtt_handle, 1024 * 4, MQTT_TSK_PRIORITY);
287 if (ret < 0) {
288 amp_warn(MOD_STR, "jse_osal_create_task failed\n");
289 JS_FreeValue(ctx, js_cb_ref);
290 aos_free(aos_mqtt_handle);
291 ret = -4;
292 }
293
294 out:
295 return JS_NewInt32(ctx, ret);
296 }
297
298 /* subscribe */
native_mqtt_subscribe(JSContext * ctx,JSValueConst this_val,int argc,JSValueConst * argv)299 static JSValue native_mqtt_subscribe(JSContext *ctx, JSValueConst this_val, int argc, JSValueConst *argv)
300 {
301 int res = -1;
302 aos_mqtt_handle_t *amp_mqtt_handle = NULL;
303 const char *topic = NULL;
304 uint8_t qos = 0;
305 int js_cb_ref = 0;
306
307 amp_mqtt_handle = JS_GetOpaque2(ctx, argv[0], js_mqtt_class_id);
308 if (amp_mqtt_handle == NULL) {
309 amp_warn(MOD_STR, "mqtt handle is null");
310 goto out;
311 }
312
313 topic = JS_ToCString(ctx, argv[1]);
314 JS_ToInt32(ctx, &qos, argv[2]);
315
316 res = aiot_mqtt_sub(amp_mqtt_handle->mqtt_handle, topic, NULL, qos, NULL);
317 if (res < 0) {
318 amp_error(MOD_STR, "aiot app mqtt subscribe failed, ret = -0x%04X", res);
319 }
320
321 JS_FreeCString(ctx, topic);
322
323 out:
324 return JS_NewInt32(ctx, res);
325 }
326
327 /* unsubscribe */
native_mqtt_unsubscribe(JSContext * ctx,JSValueConst this_val,int argc,JSValueConst * argv)328 static JSValue native_mqtt_unsubscribe(JSContext *ctx, JSValueConst this_val, int argc, JSValueConst *argv)
329 {
330 int res = -1;
331 aos_mqtt_handle_t *amp_mqtt_handle = NULL;
332 const char *topic;
333 uint8_t qos = 0;
334 int js_cb_ref = 0;
335
336 amp_mqtt_handle = JS_GetOpaque2(ctx, argv[0], js_mqtt_class_id);
337 if (amp_mqtt_handle == NULL) {
338 amp_warn(MOD_STR, "mqtt handle is null");
339 goto out;
340 }
341
342 topic = JS_ToCString(ctx, argv[1]);
343
344 amp_debug(MOD_STR, "unsubscribe topic: %s", topic);
345
346 res = aiot_mqtt_unsub(amp_mqtt_handle->mqtt_handle, topic);
347 if (res < 0) {
348 amp_error(MOD_STR, "aiot app mqtt unsubscribe failed\n");
349 }
350 JS_FreeCString(ctx, topic);
351
352 out:
353 return JS_NewInt32(ctx, res);
354 }
355
356 /* publish */
native_mqtt_publish(JSContext * ctx,JSValueConst this_val,int argc,JSValueConst * argv)357 static JSValue native_mqtt_publish(JSContext *ctx, JSValueConst this_val, int argc, JSValueConst *argv)
358 {
359 int res = -1;
360 aos_mqtt_handle_t *amp_mqtt_handle = NULL;
361 const char *topic;
362 const char *payload;
363 uint16_t payload_len = 0;
364 uint8_t qos = 0;
365 int js_cb_ref = 0;
366
367 amp_mqtt_handle = JS_GetOpaque2(ctx, argv[0], js_mqtt_class_id);
368 if (amp_mqtt_handle == NULL) {
369 amp_warn(MOD_STR, "mqtt handle is null");
370 goto out;
371 }
372
373 topic = JS_ToCString(ctx, argv[1]);
374 payload = JS_ToCString(ctx, argv[2]);
375 JS_ToInt32(ctx, &qos, argv[1]);
376 payload_len = strlen(payload);
377
378 amp_debug(MOD_STR, "publish topic: %s, payload: %s, qos is: %d", topic, payload, qos);
379
380 res = aiot_mqtt_pub(amp_mqtt_handle->mqtt_handle, topic, payload, payload_len, qos);
381 if (res < 0) {
382 amp_error(MOD_STR, "aiot app mqtt publish failed");
383 }
384 JS_FreeCString(ctx, topic);
385 JS_FreeCString(ctx, payload);
386
387 out:
388 return JS_NewInt32(ctx, res);
389 }
390
native_mqtt_close(JSContext * ctx,JSValueConst this_val,int argc,JSValueConst * argv)391 static JSValue native_mqtt_close(JSContext *ctx, JSValueConst this_val, int argc, JSValueConst *argv)
392 {
393 int res = -1;
394 int js_cb_ref = 0;
395 aos_mqtt_handle_t *amp_mqtt_handle = NULL;
396
397 amp_mqtt_handle = JS_GetOpaque2(ctx, argv[0], js_mqtt_class_id);
398 if (amp_mqtt_handle == NULL) {
399 amp_warn(MOD_STR, "mqtt handle is null");
400 goto out;
401 }
402
403 res = mqtt_client_stop(&_mqtt_handle->mqtt_handle);
404 if (res < 0) {
405 amp_debug(MOD_STR, "mqtt client stop failed");
406 }
407
408 out:
409 /* release mqtt in mqtt_yield_task() */
410 g_mqtt_close_flag = 1;
411 aos_sem_wait(&g_mqtt_close_sem, MQTT_TASK_YIELD_TIMEOUT + 50);
412 g_mqtt_close_flag = 0;
413 return JS_NewInt32(ctx, 1);
414 }
415
module_mqtt_source_clean(void)416 static void module_mqtt_source_clean(void)
417 {
418 if (g_mqtt_close_flag) {
419 aos_sem_wait(&g_mqtt_close_sem, MQTT_TASK_YIELD_TIMEOUT + 50);
420 g_mqtt_close_flag = 0;
421 }
422 }
423
424 static const JSCFunctionListEntry js_mqtt_funcs[] = {
425 JS_CFUNC_DEF("start", 2, native_mqtt_start),
426 JS_CFUNC_DEF("subscribe", 3, native_mqtt_subscribe),
427 JS_CFUNC_DEF("unsubscribe", 3, native_mqtt_unsubscribe),
428 JS_CFUNC_DEF("publish", 5, native_mqtt_publish),
429 JS_CFUNC_DEF("close", 2, native_mqtt_close),
430 };
431
js_mqtt_init(JSContext * ctx,JSModuleDef * m)432 static int js_mqtt_init(JSContext *ctx, JSModuleDef *m)
433 {
434 JSValue proto;
435
436 JS_NewClassID(&js_mqtt_class_id);
437
438 JS_NewClass(JS_GetRuntime(ctx), js_mqtt_class_id, &js_mqtt_class);
439 proto = JS_NewObject(ctx);
440 JS_SetPropertyFunctionList(ctx, proto, js_mqtt_funcs, countof(js_mqtt_funcs));
441 JS_SetClassProto(ctx, js_mqtt_class_id, proto);
442 return JS_SetModuleExportList(ctx, m, js_mqtt_funcs, countof(js_mqtt_funcs));
443 }
444
js_init_module_mqtt(JSContext * ctx,const char * module_name)445 JSModuleDef *js_init_module_mqtt(JSContext *ctx, const char *module_name)
446 {
447 JSModuleDef *m;
448 m = JS_NewCModule(ctx, module_name, js_mqtt_init);
449 if (!m)
450 return NULL;
451 JS_AddModuleExportList(ctx, m, js_mqtt_funcs, countof(js_mqtt_funcs));
452 return m;
453 }
454
module_mqtt_register(void)455 void module_mqtt_register(void)
456 {
457 amp_debug(MOD_STR, "module_mqtt_register");
458 JSContext *ctx = js_get_context();
459 js_init_module_mqtt(ctx, "MQTT");
460 }
461