1 /*
2 * Copyright (C) 2015-2019 Alibaba Group Holding Limited
3 */
4
5 #include "amp_platform.h"
6 #include "aos_system.h"
7 #include "amp_defines.h"
8 #include "amp_task.h"
9 #include "aos/kv.h"
10 #include "aiot_state_api.h"
11 #include "aiot_sysdep_api.h"
12 #include "aiot_mqtt_api.h"
13 #include "aiot_dm_api.h"
14 #include "aiot_subdev_api.h"
15 #include "be_inl.h"
16 #include "module_aiot.h"
17 #ifdef AOS_COMP_UAGENT
18 #include "uagent.h"
19 #endif
20 #define MOD_STR "AIOT_MQTT"
21
22 /* 位于portfiles/aiot_port文件夹下的系统适配函数集合 */
23 extern aiot_sysdep_portfile_t g_aiot_sysdep_portfile;
24
25 /* 位于external/ali_ca_cert.c中的服务器证书 */
26 extern const char *ali_ca_cert;
27
28 uint8_t g_app_mqtt_process_thread_running = 0;
29 uint8_t g_app_mqtt_recv_thread_running = 0;
30
/*
 * Copy the first `len` bytes of `src` into a newly allocated buffer and
 * append a terminating NUL.
 *
 * src: bytes to copy; does not need to be NUL-terminated.
 * len: number of bytes to copy from src.
 *
 * Returns the new buffer (the caller releases it with aos_free()), or NULL
 * when src is NULL, len is negative, or aos_malloc() fails.
 */
static char *__amp_strdup(const char *src, int len)
{
    char *dst;

    /* A negative length would turn into a huge size_t inside memcpy(). */
    if (src == NULL || len < 0) {
        return NULL;
    }

    /* Do the +1 in size_t so len == INT_MAX cannot overflow a signed int. */
    dst = aos_malloc((size_t)len + 1);
    if (dst == NULL) {
        return NULL;
    }

    memcpy(dst, src, (size_t)len);
    dst[len] = '\0';
    return dst;
}
48
49 /* 执行aiot_mqtt_process的线程, 包含心跳发送和QoS1消息重发 */
aiot_app_mqtt_process_thread(void * args)50 void *aiot_app_mqtt_process_thread(void *args)
51 {
52 int32_t res = STATE_SUCCESS;
53
54 while (g_app_mqtt_process_thread_running) {
55 res = aiot_mqtt_process(args);
56 if (res == STATE_USER_INPUT_EXEC_DISABLED) {
57 break;
58 }
59 aos_msleep(1000);
60 }
61 aos_task_exit(0);
62 return;
63 }
64
65 /* 执行aiot_mqtt_recv的线程, 包含网络自动重连和从服务器收取MQTT消息 */
aiot_app_mqtt_recv_thread(void * args)66 void *aiot_app_mqtt_recv_thread(void *args)
67 {
68 int32_t res = STATE_SUCCESS;
69
70 while (g_app_mqtt_recv_thread_running) {
71 res = aiot_mqtt_recv(args);
72 if (res < STATE_SUCCESS) {
73 if (res == STATE_USER_INPUT_EXEC_DISABLED) {
74 break;
75 }
76 aos_msleep(1000);
77 }
78 }
79 aos_task_exit(0);
80 return;
81 }
82
83 /* MQTT默认消息处理回调, 当SDK从服务器收到MQTT消息时, 且无对应用户回调处理时被调用 */
aiot_app_mqtt_recv_handler(void * handle,const aiot_mqtt_recv_t * packet,void * userdata)84 void aiot_app_mqtt_recv_handler(void *handle, const aiot_mqtt_recv_t *packet, void *userdata)
85 {
86 void (*callback)(void *userdata) = (void (*)(void *))userdata;
87
88 switch (packet->type) {
89 case AIOT_MQTTRECV_HEARTBEAT_RESPONSE: {
90 // amp_debug(MOD_STR, "heartbeat response");
91 /* TODO: 处理服务器对心跳的回应, 一般不处理 */
92 }
93 break;
94
95 case AIOT_MQTTRECV_SUB_ACK: {
96 amp_debug(MOD_STR, "suback, res: -0x%04X, packet id: %d, max qos: %d",
97 -packet->data.sub_ack.res, packet->data.sub_ack.packet_id, packet->data.sub_ack.max_qos);
98 /* TODO: 处理服务器对订阅请求的回应, 一般不处理 */
99 }
100 break;
101
102 case AIOT_MQTTRECV_PUB: {
103 //amp_debug(MOD_STR, "pub, qos: %d, topic: %.*s", packet->data.pub.qos, packet->data.pub.topic_len, packet->data.pub.topic);
104 //amp_debug(MOD_STR, "pub, payload: %.*s", packet->data.pub.payload_len, packet->data.pub.payload);
105 /* TODO: 处理服务器下发的业务报文 */
106 iot_mqtt_userdata_t *udata = (iot_mqtt_userdata_t *)userdata;
107 iot_mqtt_message_t message;
108 memset(&message, 0, sizeof(iot_mqtt_message_t));
109 if (udata && udata->callback) {
110 message.option = AIOT_MQTTOPT_RECV_HANDLER;
111 message.recv.type = packet->type;
112 message.recv.code = AIOT_MQTT_MESSAGE;
113 message.recv.topic = __amp_strdup(packet->data.pub.topic, packet->data.pub.topic_len);
114 message.recv.payload = __amp_strdup(packet->data.pub.payload, packet->data.pub.payload_len);
115 message.recv.topic_len = packet->data.pub.topic_len;
116 message.recv.payload_len = packet->data.pub.payload_len;
117 udata->callback(&message, udata);
118 aos_free(message.recv.topic);
119 aos_free(message.recv.payload);
120 }
121 }
122 break;
123
124 case AIOT_MQTTRECV_PUB_ACK: {
125 amp_debug(MOD_STR, "puback, packet id: %d", packet->data.pub_ack.packet_id);
126 /* TODO: 处理服务器对QoS1上报消息的回应, 一般不处理 */
127 }
128 break;
129
130 default: {
131
132 }
133 }
134 }
135
136 /* MQTT事件回调函数, 当网络连接/重连/断开时被触发, 事件定义见core/aiot_mqtt_api.h */
aiot_app_mqtt_event_handler(void * handle,const aiot_mqtt_event_t * event,void * userdata)137 void aiot_app_mqtt_event_handler(void *handle, const aiot_mqtt_event_t *event, void *userdata)
138 {
139 iot_mqtt_userdata_t *udata = (iot_mqtt_userdata_t *)userdata;
140 iot_mqtt_message_t message;
141
142 memset(&message, 0, sizeof(iot_mqtt_message_t));
143 message.option = AIOT_MQTTOPT_EVENT_HANDLER;
144 message.event.type = event->type;
145
146 switch (event->type) {
147 /* SDK因为用户调用了aiot_mqtt_connect()接口, 与mqtt服务器建立连接已成功 */
148 case AIOT_MQTTEVT_CONNECT: {
149 amp_debug(MOD_STR, "AIOT_MQTTEVT_CONNECT");
150 /* TODO: 处理SDK建连成功, 不可以在这里调用耗时较长的阻塞函数 */
151 message.event.code = AIOT_MQTT_CONNECT;
152 }
153 break;
154
155 /* SDK因为网络状况被动断连后, 自动发起重连已成功 */
156 case AIOT_MQTTEVT_RECONNECT: {
157 amp_debug(MOD_STR, "AIOT_MQTTEVT_RECONNECT");
158 /* TODO: 处理SDK重连成功, 不可以在这里调用耗时较长的阻塞函数 */
159 message.event.code = AIOT_MQTT_RECONNECT;
160 }
161 break;
162
163 /* SDK因为网络的状况而被动断开了连接, network是底层读写失败, heartbeat是没有按预期得到服务端心跳应答 */
164 case AIOT_MQTTEVT_DISCONNECT: {
165 char *cause = (event->data.disconnect == AIOT_MQTTDISCONNEVT_NETWORK_DISCONNECT) ? ("network disconnect") :
166 ("heartbeat disconnect");
167 amp_debug(MOD_STR, "AIOT_MQTTEVT_DISCONNECT: %s", cause);
168 /* TODO: 处理SDK被动断连, 不可以在这里调用耗时较长的阻塞函数 */
169 message.event.code = AIOT_MQTT_DISCONNECT;
170 }
171 break;
172
173 default: {
174 return;
175 }
176 }
177
178 if (udata && udata->callback)
179 udata->callback(&message, udata);
180 }
181
182 /* 属性上报函数演示 */
aiot_app_send_property_post(void * dm_handle,char * params)183 int32_t aiot_app_send_property_post(void *dm_handle, char *params)
184 {
185 aiot_dm_msg_t msg;
186
187 memset(&msg, 0, sizeof(aiot_dm_msg_t));
188 msg.type = AIOT_DMMSG_PROPERTY_POST;
189 msg.data.property_post.params = params;
190
191 return aiot_dm_send(dm_handle, &msg);
192 }
193
194 /* 事件上报函数演示 */
aiot_app_send_event_post(void * dm_handle,char * event_id,char * params)195 int32_t aiot_app_send_event_post(void *dm_handle, char *event_id, char *params)
196 {
197 aiot_dm_msg_t msg;
198
199 memset(&msg, 0, sizeof(aiot_dm_msg_t));
200 msg.type = AIOT_DMMSG_EVENT_POST;
201 msg.data.event_post.event_id = event_id;
202 msg.data.event_post.params = params;
203
204 return aiot_dm_send(dm_handle, &msg);
205 }
206
/*
 * Create an MQTT client instance, connect it to the cloud endpoint and start
 * the two worker tasks (aiot_app_mqtt_process_thread and
 * aiot_app_mqtt_recv_thread).
 *
 * handle:       out-parameter; receives the MQTT handle on success and is left
 *               untouched on failure. Must not be NULL.
 * keepaliveSec: heartbeat interval in seconds, passed to
 *               AIOT_MQTTOPT_KEEPALIVE_SEC.
 * userdata:     context registered through AIOT_MQTTOPT_USERDATA; the receive
 *               and event handlers in this file invoke its callback.
 *
 * The device triple (product key, device name, device secret) is read from
 * KV storage under the AMP_CUSTOMER_* keys.
 *
 * Returns 0 on success and -1 on failure. Failures are reported through the
 * return value only; this function never terminates the calling task.
 */
int32_t aiot_mqtt_client_start(void **handle, int keepaliveSec, iot_mqtt_userdata_t *userdata)
{
    int32_t res = STATE_SUCCESS;
    void *mqtt_handle = NULL;
    char *url = "iot-as-mqtt.cn-shanghai.aliyuncs.com"; /* domain suffix of the Shanghai site */
    char host[100] = {0}; /* full endpoint: ${productKey}.iot-as-mqtt.cn-shanghai.aliyuncs.com */
    int host_len;
    uint16_t port = 443; /* per the original note, the port is 443 with or without TLS */
    /* Copy into a uint16_t so the address handed to the SDK has a fixed width.
     * NOTE(review): assumes AIOT_MQTTOPT_KEEPALIVE_SEC takes a uint16_t * - confirm against aiot_mqtt_api.h. */
    uint16_t keepalive_sec = (uint16_t)keepaliveSec;
    aiot_sysdep_network_cred_t cred; /* security credentials (CA certificate etc. when TLS is used) */
    aos_task_t mqtt_process_task;
    aos_task_t mqtt_rec_task;

    /* Device triple buffers and their capacities for aos_kv_get(). */
    char product_key[IOTX_PRODUCT_KEY_LEN] = {0};
    char device_name[IOTX_DEVICE_NAME_LEN] = {0};
    char device_secret[IOTX_DEVICE_SECRET_LEN] = {0};

    int productkey_len = IOTX_PRODUCT_KEY_LEN;
    int devicename_len = IOTX_DEVICE_NAME_LEN;
    int devicesecret_len = IOTX_DEVICE_SECRET_LEN;

    if (handle == NULL) {
        amp_debug(MOD_STR, "aiot_mqtt_client_start: handle is NULL");
        return -1;
    }

    /* Without the full triple the connection cannot succeed, so fail early.
     * NOTE(review): assumes aos_kv_get() returns 0 on success - confirm. */
    if (aos_kv_get(AMP_CUSTOMER_PRODUCTKEY, product_key, &productkey_len) != 0 ||
        aos_kv_get(AMP_CUSTOMER_DEVICENAME, device_name, &devicename_len) != 0 ||
        aos_kv_get(AMP_CUSTOMER_DEVICESECRET, device_secret, &devicesecret_len) != 0) {
        amp_debug(MOD_STR, "read device triple from kv failed");
        return -1;
    }
    /* The stored values may fill the buffers completely; force termination. */
    product_key[sizeof(product_key) - 1] = '\0';
    device_name[sizeof(device_name) - 1] = '\0';
    device_secret[sizeof(device_secret) - 1] = '\0';

    /* Build the endpoint host name and refuse a truncated result. */
    host_len = snprintf(host, sizeof(host), "%s.%s", product_key, url);
    if (host_len < 0 || (size_t)host_len >= sizeof(host)) {
        amp_debug(MOD_STR, "mqtt host name too long");
        return -1;
    }

    /* Register the system adaptation layer used by the SDK. */
    aiot_sysdep_set_portfile(&g_aiot_sysdep_portfile);
    /* SDK log output is left unconfigured: */
    // aiot_state_set_logcb(demo_state_logcb);

    /* Credentials for a TLS connection verified against the CA certificate. */
    memset(&cred, 0, sizeof(aiot_sysdep_network_cred_t));
    cred.option = AIOT_SYSDEP_NETWORK_CRED_SVRCERT_CA; /* verify the server with the RSA certificate */
    cred.max_tls_fragment = 16384; /* 16K fragments; 4K, 2K, 1K and 0.5K are the other options */
    cred.sni_enabled = 1; /* send Server Name Indication during the TLS handshake */
    cred.x509_server_cert = ali_ca_cert; /* RSA root certificate */
    cred.x509_server_cert_len = strlen(ali_ca_cert); /* its length */

    /* Create one MQTT client instance with default parameters. */
    mqtt_handle = aiot_mqtt_init();
    if (mqtt_handle == NULL) {
        amp_debug(MOD_STR, "aiot_mqtt_init failed");
        return -1;
    }

    /*
     * NOTE(review): this block discards the TLS credentials prepared above,
     * so the client connects over plain TCP. Remove it to enable TLS.
     */
    {
        memset(&cred, 0, sizeof(aiot_sysdep_network_cred_t));
        cred.option = AIOT_SYSDEP_NETWORK_CRED_NONE;
    }

    /* Server address and port. */
    aiot_mqtt_setopt(mqtt_handle, AIOT_MQTTOPT_HOST, (void *)host);
    aiot_mqtt_setopt(mqtt_handle, AIOT_MQTTOPT_PORT, (void *)&port);
    /* Device triple. */
    aiot_mqtt_setopt(mqtt_handle, AIOT_MQTTOPT_PRODUCT_KEY, (void *)product_key);
    aiot_mqtt_setopt(mqtt_handle, AIOT_MQTTOPT_DEVICE_NAME, (void *)device_name);
    aiot_mqtt_setopt(mqtt_handle, AIOT_MQTTOPT_DEVICE_SECRET, (void *)device_secret);
    /* Network credentials prepared above. */
    aiot_mqtt_setopt(mqtt_handle, AIOT_MQTTOPT_NETWORK_CRED, (void *)&cred);
    /* Heartbeat interval. */
    aiot_mqtt_setopt(mqtt_handle, AIOT_MQTTOPT_KEEPALIVE_SEC, (void *)&keepalive_sec);
    /* Callback context plus the default receive and event handlers. */
    aiot_mqtt_setopt(mqtt_handle, AIOT_MQTTOPT_USERDATA, userdata);
    aiot_mqtt_setopt(mqtt_handle, AIOT_MQTTOPT_RECV_HANDLER, (void *)aiot_app_mqtt_recv_handler);
    aiot_mqtt_setopt(mqtt_handle, AIOT_MQTTOPT_EVENT_HANDLER, (void *)aiot_app_mqtt_event_handler);

    /* Establish the MQTT connection. */
    res = aiot_mqtt_connect(mqtt_handle);
    if (res < STATE_SUCCESS) {
        /* Connect failed: destroy the instance and report the error. */
        aiot_mqtt_deinit(&mqtt_handle);
        amp_debug(MOD_STR, "aiot_mqtt_connect failed: -0x%04X", -res);
        return -1;
    }

    /* Dedicated task for aiot_mqtt_process (heartbeat and QoS1 resend). */
    g_app_mqtt_process_thread_running = 1;
    if (aos_task_new_ext(&mqtt_process_task, "mqtt_process", aiot_app_mqtt_process_thread,
                         mqtt_handle, 1024 * 4, AOS_DEFAULT_APP_PRI) != 0) {
        amp_debug(MOD_STR, "management mqtt process task create failed!");
        g_app_mqtt_process_thread_running = 0;
        aiot_mqtt_deinit(&mqtt_handle);
        return -1;
    }
    amp_debug(MOD_STR, "app mqtt process start");

    /* Dedicated task for aiot_mqtt_recv (message reception and reconnect). */
    g_app_mqtt_recv_thread_running = 1;
    if (aos_task_new_ext(&mqtt_rec_task, "mqtt_rec", aiot_app_mqtt_recv_thread,
                         mqtt_handle, 1024 * 4, AOS_DEFAULT_APP_PRI) != 0) {
        amp_debug(MOD_STR, "management mqtt rec task create failed!");
        /*
         * The process task is already running on this handle: ask it to stop
         * before the handle is destroyed, mirroring aiot_mqtt_client_stop().
         */
        g_app_mqtt_recv_thread_running = 0;
        g_app_mqtt_process_thread_running = 0;
        aiot_mqtt_deinit(&mqtt_handle);
        return -1;
    }
    amp_debug(MOD_STR, "app mqtt rec start");

    *handle = mqtt_handle;
#ifdef AOS_COMP_UAGENT
    /* uAgent failures are logged only; they do not fail the start. */
    res = uagent_mqtt_client_set(mqtt_handle);
    if (res != 0) {
        amp_debug(MOD_STR, "uAgent mqtt handle set failed ret = %d\n", res);
    }

    res = uagent_ext_comm_start(product_key, device_name);
    if (res != 0) {
        amp_debug(MOD_STR, "uAgent ext comm start failed ret = %d\n", res);
    }
#endif

    return 0;
}
332
333 /* mqtt stop */
aiot_mqtt_client_stop(void ** handle)334 int32_t aiot_mqtt_client_stop(void **handle)
335 {
336 int32_t res = STATE_SUCCESS;
337 void *mqtt_handle = NULL;
338
339 mqtt_handle = *handle;
340
341 g_app_mqtt_process_thread_running = 0;
342 g_app_mqtt_recv_thread_running = 0;
343
344 /* 断开MQTT连接 */
345 res = aiot_mqtt_disconnect(mqtt_handle);
346 if (res < STATE_SUCCESS) {
347 aiot_mqtt_deinit(&mqtt_handle);
348 amp_debug(MOD_STR, "aiot_mqtt_disconnect failed: -0x%04X", -res);
349 return -1;
350 }
351
352 /* 销毁MQTT实例 */
353 res = aiot_mqtt_deinit(&mqtt_handle);
354 if (res < STATE_SUCCESS) {
355 amp_debug(MOD_STR, "aiot_mqtt_deinit failed: -0x%04X", -res);
356 return -1;
357 }
358
359 return res;
360 }
361