1 /*
2  * Copyright (C) 2015-2019 Alibaba Group Holding Limited
3  */
4 
5 #include "amp_platform.h"
6 #include "aos_system.h"
7 #include "amp_defines.h"
8 #include "amp_task.h"
9 #include "aos/kv.h"
10 #include "aiot_state_api.h"
11 #include "aiot_sysdep_api.h"
12 #include "aiot_mqtt_api.h"
13 #include "aiot_dm_api.h"
14 #include "aiot_subdev_api.h"
15 #include "be_inl.h"
16 #include "module_aiot.h"
17 #ifdef AOS_COMP_UAGENT
18 #include "uagent.h"
19 #endif
20 #define MOD_STR "AIOT_MQTT"
21 
/* System-dependency (sysdep) adaptation function set, located under the portfiles/aiot_port folder */
23 extern aiot_sysdep_portfile_t g_aiot_sysdep_portfile;
24 
/* Server CA certificate, located in external/ali_ca_cert.c */
26 extern const char *ali_ca_cert;
27 
/*
 * Run flags for the two MQTT worker loops in this file. Each loop keeps
 * iterating only while its own flag is non-zero; both flags start cleared.
 */
uint8_t g_app_mqtt_process_thread_running = 0; /* polled by the aiot_mqtt_process() loop */
uint8_t g_app_mqtt_recv_thread_running = 0;    /* polled by the aiot_mqtt_recv() loop */
30 
/**
 * Copy the first `len` bytes of `src` into a newly allocated buffer and
 * append a NUL terminator.
 *
 * The source does not need to be NUL-terminated: exactly `len` bytes are
 * copied regardless of their content.
 *
 * @param src  Bytes to copy; may be NULL.
 * @param len  Number of bytes to copy from `src`; must not be negative.
 * @return Buffer obtained from aos_malloc() holding the copy, or NULL when
 *         `src` is NULL, `len` is negative, or the allocation fails. The
 *         caller owns the buffer.
 */
static char *__amp_strdup(char *src, int len)
{
    char   *dst;
    size_t  count;

    /* A negative length would become a huge size_t once handed to memcpy(). */
    if (src == NULL || len < 0) {
        return NULL;
    }

    /* Do the "+1" in size_t so len == INT_MAX cannot overflow a signed int. */
    count = (size_t)len;

    dst = aos_malloc(count + 1);
    if (dst == NULL) {
        return NULL;
    }

    memcpy(dst, src, count);
    dst[count] = '\0';
    return dst;
}
48 
49 /* 执行aiot_mqtt_process的线程, 包含心跳发送和QoS1消息重发 */
aiot_app_mqtt_process_thread(void * args)50 void *aiot_app_mqtt_process_thread(void *args)
51 {
52     int32_t res = STATE_SUCCESS;
53 
54     while (g_app_mqtt_process_thread_running) {
55         res = aiot_mqtt_process(args);
56         if (res == STATE_USER_INPUT_EXEC_DISABLED) {
57             break;
58         }
59         aos_msleep(1000);
60     }
61     aos_task_exit(0);
62     return;
63 }
64 
65 /* 执行aiot_mqtt_recv的线程, 包含网络自动重连和从服务器收取MQTT消息 */
aiot_app_mqtt_recv_thread(void * args)66 void *aiot_app_mqtt_recv_thread(void *args)
67 {
68     int32_t res = STATE_SUCCESS;
69 
70     while (g_app_mqtt_recv_thread_running) {
71         res = aiot_mqtt_recv(args);
72         if (res < STATE_SUCCESS) {
73             if (res == STATE_USER_INPUT_EXEC_DISABLED) {
74                 break;
75             }
76             aos_msleep(1000);
77         }
78     }
79     aos_task_exit(0);
80     return;
81 }
82 
83 /* MQTT默认消息处理回调, 当SDK从服务器收到MQTT消息时, 且无对应用户回调处理时被调用 */
aiot_app_mqtt_recv_handler(void * handle,const aiot_mqtt_recv_t * packet,void * userdata)84 void aiot_app_mqtt_recv_handler(void *handle, const aiot_mqtt_recv_t *packet, void *userdata)
85 {
86     void (*callback)(void *userdata) = (void (*)(void *))userdata;
87 
88     switch (packet->type) {
89         case AIOT_MQTTRECV_HEARTBEAT_RESPONSE: {
90             // amp_debug(MOD_STR, "heartbeat response");
91             /* TODO: 处理服务器对心跳的回应, 一般不处理 */
92         }
93         break;
94 
95         case AIOT_MQTTRECV_SUB_ACK: {
96             amp_debug(MOD_STR, "suback, res: -0x%04X, packet id: %d, max qos: %d",
97                    -packet->data.sub_ack.res, packet->data.sub_ack.packet_id, packet->data.sub_ack.max_qos);
98             /* TODO: 处理服务器对订阅请求的回应, 一般不处理 */
99         }
100         break;
101 
102         case AIOT_MQTTRECV_PUB: {
103             //amp_debug(MOD_STR, "pub, qos: %d, topic: %.*s", packet->data.pub.qos, packet->data.pub.topic_len, packet->data.pub.topic);
104             //amp_debug(MOD_STR, "pub, payload: %.*s", packet->data.pub.payload_len, packet->data.pub.payload);
105             /* TODO: 处理服务器下发的业务报文 */
106             iot_mqtt_userdata_t *udata = (iot_mqtt_userdata_t *)userdata;
107             iot_mqtt_message_t message;
108             memset(&message, 0, sizeof(iot_mqtt_message_t));
109             if (udata && udata->callback) {
110                 message.option = AIOT_MQTTOPT_RECV_HANDLER;
111                 message.recv.type = packet->type;
112                 message.recv.code = AIOT_MQTT_MESSAGE;
113                 message.recv.topic = __amp_strdup(packet->data.pub.topic, packet->data.pub.topic_len);
114                 message.recv.payload = __amp_strdup(packet->data.pub.payload, packet->data.pub.payload_len);
115                 message.recv.topic_len = packet->data.pub.topic_len;
116                 message.recv.payload_len = packet->data.pub.payload_len;
117                 udata->callback(&message, udata);
118                 aos_free(message.recv.topic);
119                 aos_free(message.recv.payload);
120             }
121         }
122         break;
123 
124         case AIOT_MQTTRECV_PUB_ACK: {
125             amp_debug(MOD_STR, "puback, packet id: %d", packet->data.pub_ack.packet_id);
126             /* TODO: 处理服务器对QoS1上报消息的回应, 一般不处理 */
127         }
128         break;
129 
130         default: {
131 
132         }
133     }
134 }
135 
136 /* MQTT事件回调函数, 当网络连接/重连/断开时被触发, 事件定义见core/aiot_mqtt_api.h */
aiot_app_mqtt_event_handler(void * handle,const aiot_mqtt_event_t * event,void * userdata)137 void aiot_app_mqtt_event_handler(void *handle, const aiot_mqtt_event_t *event, void *userdata)
138 {
139     iot_mqtt_userdata_t *udata = (iot_mqtt_userdata_t *)userdata;
140     iot_mqtt_message_t message;
141 
142     memset(&message, 0, sizeof(iot_mqtt_message_t));
143     message.option = AIOT_MQTTOPT_EVENT_HANDLER;
144     message.event.type = event->type;
145 
146     switch (event->type) {
147         /* SDK因为用户调用了aiot_mqtt_connect()接口, 与mqtt服务器建立连接已成功 */
148         case AIOT_MQTTEVT_CONNECT: {
149             amp_debug(MOD_STR, "AIOT_MQTTEVT_CONNECT");
150             /* TODO: 处理SDK建连成功, 不可以在这里调用耗时较长的阻塞函数 */
151             message.event.code = AIOT_MQTT_CONNECT;
152         }
153         break;
154 
155         /* SDK因为网络状况被动断连后, 自动发起重连已成功 */
156         case AIOT_MQTTEVT_RECONNECT: {
157             amp_debug(MOD_STR, "AIOT_MQTTEVT_RECONNECT");
158             /* TODO: 处理SDK重连成功, 不可以在这里调用耗时较长的阻塞函数 */
159             message.event.code = AIOT_MQTT_RECONNECT;
160         }
161         break;
162 
163         /* SDK因为网络的状况而被动断开了连接, network是底层读写失败, heartbeat是没有按预期得到服务端心跳应答 */
164         case AIOT_MQTTEVT_DISCONNECT: {
165             char *cause = (event->data.disconnect == AIOT_MQTTDISCONNEVT_NETWORK_DISCONNECT) ? ("network disconnect") :
166                           ("heartbeat disconnect");
167             amp_debug(MOD_STR, "AIOT_MQTTEVT_DISCONNECT: %s", cause);
168             /* TODO: 处理SDK被动断连, 不可以在这里调用耗时较长的阻塞函数 */
169             message.event.code = AIOT_MQTT_DISCONNECT;
170         }
171         break;
172 
173         default: {
174         return;
175         }
176     }
177 
178     if (udata && udata->callback)
179         udata->callback(&message, udata);
180 }
181 
182 /* 属性上报函数演示 */
aiot_app_send_property_post(void * dm_handle,char * params)183 int32_t aiot_app_send_property_post(void *dm_handle, char *params)
184 {
185     aiot_dm_msg_t msg;
186 
187     memset(&msg, 0, sizeof(aiot_dm_msg_t));
188     msg.type = AIOT_DMMSG_PROPERTY_POST;
189     msg.data.property_post.params = params;
190 
191     return aiot_dm_send(dm_handle, &msg);
192 }
193 
194 /* 事件上报函数演示 */
aiot_app_send_event_post(void * dm_handle,char * event_id,char * params)195 int32_t aiot_app_send_event_post(void *dm_handle, char *event_id, char *params)
196 {
197     aiot_dm_msg_t msg;
198 
199     memset(&msg, 0, sizeof(aiot_dm_msg_t));
200     msg.type = AIOT_DMMSG_EVENT_POST;
201     msg.data.event_post.event_id = event_id;
202     msg.data.event_post.params = params;
203 
204     return aiot_dm_send(dm_handle, &msg);
205 }
206 
/**
 * Create an MQTT client, connect it to the cloud and start its worker tasks.
 *
 * Steps: read the device triple (product key, device name, device secret)
 * from KV storage, create and configure an MQTT instance, connect, then
 * spawn one task running aiot_app_mqtt_process_thread() and one running
 * aiot_app_mqtt_recv_thread().
 *
 * On every failure path the MQTT instance (if already created) is destroyed
 * and -1 is returned to the caller; the calling task is NOT terminated here,
 * so the caller can release its own resources.
 *
 * @param handle        Out: receives the MQTT instance handle on success.
 *                      Must not be NULL.
 * @param keepaliveSec  MQTT keep-alive interval in seconds. Values outside
 *                      0..65535 are clamped before being given to the SDK.
 * @param userdata      Context registered as AIOT_MQTTOPT_USERDATA; it is
 *                      what the receive/event handlers get as `userdata`.
 * @return 0 on success, -1 on any failure.
 */
int32_t aiot_mqtt_client_start(void **handle, int keepaliveSec, iot_mqtt_userdata_t *userdata)
{
    int32_t     res = STATE_SUCCESS;
    void       *mqtt_handle = NULL;
    const char *url = "iot-as-mqtt.cn-shanghai.aliyuncs.com"; /* domain suffix of the Alibaba Cloud Shanghai site */
    char        host[100] = {0}; /* full endpoint: ${productKey}.iot-as-mqtt.cn-shanghai.aliyuncs.com */
    int         host_len = 0;
    uint16_t    port = 443;      /* destination port is 443 whether or not TLS is used */
    uint16_t    keepalive = 0;
    aiot_sysdep_network_cred_t cred; /* network credential; carries the CA certificate when TLS is used */

    /* device triple read from KV storage */
    char product_key[IOTX_PRODUCT_KEY_LEN] = {0};
    char device_name[IOTX_DEVICE_NAME_LEN] = {0};
    char device_secret[IOTX_DEVICE_SECRET_LEN] = {0};

    int productkey_len = IOTX_PRODUCT_KEY_LEN;
    int devicename_len = IOTX_DEVICE_NAME_LEN;
    int devicesecret_len = IOTX_DEVICE_SECRET_LEN;

    aos_task_t mqtt_process_task;
    aos_task_t mqtt_rec_task;

    if (handle == NULL) {
        amp_debug(MOD_STR, "aiot_mqtt_client_start: handle is NULL");
        return -1;
    }

    /* Hook the SDK up to the platform adaptation layer. */
    aiot_sysdep_set_portfile(&g_aiot_sysdep_portfile);

    /* Without the full triple neither the host name nor the login can be built. */
    if (aos_kv_get(AMP_CUSTOMER_PRODUCTKEY, product_key, &productkey_len) != 0 ||
        aos_kv_get(AMP_CUSTOMER_DEVICENAME, device_name, &devicename_len) != 0 ||
        aos_kv_get(AMP_CUSTOMER_DEVICESECRET, device_secret, &devicesecret_len) != 0) {
        amp_debug(MOD_STR, "failed to read device triple from kv");
        return -1;
    }

    /* Prepare the TLS credential: verify the server against the CA certificate. */
    memset(&cred, 0, sizeof(aiot_sysdep_network_cred_t));
    cred.option = AIOT_SYSDEP_NETWORK_CRED_SVRCERT_CA;
    cred.max_tls_fragment = 16384; /* 16K fragments; 4K, 2K, 1K and 0.5K are the other options */
    cred.sni_enabled = 1;          /* send Server Name Indication during the TLS handshake */
    cred.x509_server_cert = ali_ca_cert;
    cred.x509_server_cert_len = strlen(ali_ca_cert);

    /* Create one MQTT client instance with default parameters. */
    mqtt_handle = aiot_mqtt_init();
    if (mqtt_handle == NULL) {
        amp_debug(MOD_STR, "aiot_mqtt_init failed");
        return -1;
    }

    /*
     * NOTE(review): this block discards the TLS credential prepared above, so
     * the client connects over plain TCP. It is kept because the original
     * code does so deliberately; remove it to re-enable TLS.
     */
    {
        memset(&cred, 0, sizeof(aiot_sysdep_network_cred_t));
        cred.option = AIOT_SYSDEP_NETWORK_CRED_NONE;
    }

    host_len = snprintf(host, sizeof(host), "%s.%s", product_key, url);
    if (host_len < 0 || (size_t)host_len >= sizeof(host)) {
        amp_debug(MOD_STR, "mqtt host name does not fit the buffer");
        aiot_mqtt_deinit(&mqtt_handle);
        return -1;
    }

    /*
     * The SDK header documents AIOT_MQTTOPT_KEEPALIVE_SEC as (uint16_t *), so
     * passing the address of an int only works by accident on little-endian
     * targets. Convert explicitly. (TODO confirm against aiot_mqtt_api.h.)
     */
    if (keepaliveSec < 0) {
        keepalive = 0;
    } else if (keepaliveSec > 0xFFFF) {
        keepalive = 0xFFFF;
    } else {
        keepalive = (uint16_t)keepaliveSec;
    }

    /* MQTT server address */
    aiot_mqtt_setopt(mqtt_handle, AIOT_MQTTOPT_HOST, (void *)host);
    /* MQTT server port */
    aiot_mqtt_setopt(mqtt_handle, AIOT_MQTTOPT_PORT, (void *)&port);
    /* device productKey */
    aiot_mqtt_setopt(mqtt_handle, AIOT_MQTTOPT_PRODUCT_KEY, (void *)product_key);
    /* device deviceName */
    aiot_mqtt_setopt(mqtt_handle, AIOT_MQTTOPT_DEVICE_NAME, (void *)device_name);
    /* device deviceSecret */
    aiot_mqtt_setopt(mqtt_handle, AIOT_MQTTOPT_DEVICE_SECRET, (void *)device_secret);
    /* network credential prepared above */
    aiot_mqtt_setopt(mqtt_handle, AIOT_MQTTOPT_NETWORK_CRED, (void *)&cred);
    /* MQTT keep-alive interval */
    aiot_mqtt_setopt(mqtt_handle, AIOT_MQTTOPT_KEEPALIVE_SEC, (void *)&keepalive);
    /* context handed back to the callbacks */
    aiot_mqtt_setopt(mqtt_handle, AIOT_MQTTOPT_USERDATA, userdata);
    /* default message receive callback */
    aiot_mqtt_setopt(mqtt_handle, AIOT_MQTTOPT_RECV_HANDLER, (void *)aiot_app_mqtt_recv_handler);
    /* connection event callback */
    aiot_mqtt_setopt(mqtt_handle, AIOT_MQTTOPT_EVENT_HANDLER, (void *)aiot_app_mqtt_event_handler);

    /* Establish the MQTT connection. */
    res = aiot_mqtt_connect(mqtt_handle);
    if (res < STATE_SUCCESS) {
        /* Connection attempt failed: destroy the instance and report the error. */
        aiot_mqtt_deinit(&mqtt_handle);
        amp_debug(MOD_STR, "aiot_mqtt_connect failed: -0x%04X", -res);
        return -1;
    }

    /* Dedicated task for aiot_mqtt_process (keep-alive heartbeats, QoS1 retransmission). */
    g_app_mqtt_process_thread_running = 1;

    if (aos_task_new_ext(&mqtt_process_task, "mqtt_process", aiot_app_mqtt_process_thread, mqtt_handle, 1024 * 4, AOS_DEFAULT_APP_PRI) != 0) {
        amp_debug(MOD_STR, "management mqtt process task create failed!");
        g_app_mqtt_process_thread_running = 0;
        aiot_mqtt_deinit(&mqtt_handle);
        return -1;
    }
    amp_debug(MOD_STR, "app mqtt process start");

    /* Dedicated task for aiot_mqtt_recv (receives server messages, reconnects on link loss). */
    g_app_mqtt_recv_thread_running = 1;

    if (aos_task_new_ext(&mqtt_rec_task, "mqtt_rec", aiot_app_mqtt_recv_thread, mqtt_handle, 1024 * 4, AOS_DEFAULT_APP_PRI) != 0) {
        amp_debug(MOD_STR, "management mqtt rec task create failed!");
        g_app_mqtt_recv_thread_running = 0;
        /*
         * The process task is already running with this handle: ask it to
         * stop before the instance is destroyed (best effort, no join here).
         */
        g_app_mqtt_process_thread_running = 0;
        aiot_mqtt_deinit(&mqtt_handle);
        return -1;
    }
    amp_debug(MOD_STR, "app mqtt rec start");

    *handle = mqtt_handle;
#ifdef AOS_COMP_UAGENT
    res = uagent_mqtt_client_set(mqtt_handle);
    if (res != 0) {
        amp_debug(MOD_STR, "uAgent mqtt handle set failed ret = %d\n", res);
    }

    res = uagent_ext_comm_start(product_key, device_name);
    if (res != 0) {
        amp_debug(MOD_STR, "uAgent ext comm  start failed ret = %d\n", res);
    }
#endif

    return 0;
}
332 
333 /* mqtt stop */
aiot_mqtt_client_stop(void ** handle)334 int32_t aiot_mqtt_client_stop(void **handle)
335 {
336     int32_t res = STATE_SUCCESS;
337     void *mqtt_handle = NULL;
338 
339     mqtt_handle = *handle;
340 
341     g_app_mqtt_process_thread_running = 0;
342     g_app_mqtt_recv_thread_running = 0;
343 
344     /* 断开MQTT连接 */
345     res = aiot_mqtt_disconnect(mqtt_handle);
346     if (res < STATE_SUCCESS) {
347         aiot_mqtt_deinit(&mqtt_handle);
348         amp_debug(MOD_STR, "aiot_mqtt_disconnect failed: -0x%04X", -res);
349         return -1;
350     }
351 
352     /* 销毁MQTT实例 */
353     res = aiot_mqtt_deinit(&mqtt_handle);
354     if (res < STATE_SUCCESS) {
355         amp_debug(MOD_STR, "aiot_mqtt_deinit failed: -0x%04X", -res);
356         return -1;
357     }
358 
359     return res;
360 }
361