1 #include "iotx_cm_internal.h" 2 3 #if defined(MQTT_COMM_ENABLED) || defined(MAL_ENABLED) 4 #include "iotx_cm_mqtt.h" 5 #endif 6 #ifdef COAP_COMM_ENABLED 7 #include "iotx_cm_coap.h" 8 #endif 9 10 static void *fd_lock = NULL; 11 static iotx_cm_connection_t *_cm_fd[CM_MAX_FD_NUM] = { NULL }; 12 static int _get_fd(iotx_cm_connection_t *handle); 13 static int _recycle_fd(int fd); 14 static inline int _fd_is_valid(int fd); 15 static int inited_conn_num = 0; 16 17 #ifdef DEVICE_MODEL_GATEWAY 18 static void *_iotx_cm_yield_thread_func(void *params); 19 static void *yield_thread = NULL; 20 static int yield_task_leave = 1; 21 #endif 22 23 const char ERR_INVALID_PARAMS[] = "invalid parameter"; 24 iotx_cm_open(iotx_cm_init_param_t * params)25 int iotx_cm_open(iotx_cm_init_param_t *params) 26 { 27 int fd; 28 iotx_cm_connection_t *connection = NULL; 29 30 switch (params->protocol_type) { 31 case IOTX_CM_PROTOCOL_TYPE_MQTT: 32 #if defined(MQTT_COMM_ENABLED) || defined(MAL_ENABLED) 33 connection = iotx_cm_open_mqtt(params); 34 #endif 35 break; 36 case IOTX_CM_PROTOCOL_TYPE_COAP: 37 #ifdef COAP_COMM_ENABLED 38 connection = iotx_cm_open_coap(params); 39 #endif 40 break; 41 default: 42 break; 43 } 44 45 if (connection == NULL) { 46 cm_err("cm opon failed"); 47 return -1; 48 } 49 fd = _get_fd(connection); 50 if (fd < 0) { 51 cm_err("get fd failed"); 52 connection->close_func(); 53 return -1; 54 } 55 connection->fd = fd; 56 return fd; 57 } 58 iotx_cm_connect(int fd,uint32_t timeout)59 int iotx_cm_connect(int fd, uint32_t timeout) 60 { 61 iotx_cm_connect_fp connect_func; 62 int ret; 63 64 if (_fd_is_valid(fd) < 0) { 65 cm_err(ERR_INVALID_PARAMS); 66 return -1; 67 } 68 HAL_MutexLock(fd_lock); 69 connect_func = _cm_fd[fd]->connect_func; 70 HAL_MutexUnlock(fd_lock); 71 72 iotx_event_post(IOTX_CONN_CLOUD); 73 74 ret = connect_func(timeout); 75 76 if (ret == 0) { 77 inited_conn_num++; 78 if (inited_conn_num == 1) { 79 #ifdef DEVICE_MODEL_GATEWAY 80 int stack_used; 81 hal_os_thread_param_t task_parms = { 0 }; 82 task_parms.stack_size = 6144; 83 task_parms.name = "cm_yield"; 84 ret = HAL_ThreadCreate(&yield_thread, _iotx_cm_yield_thread_func, 85 NULL, &task_parms, &stack_used); 86 if (ret < 0) { 87 inited_conn_num--; 88 } 89 #endif 90 } 91 iotx_event_post(IOTX_CONN_CLOUD_SUC); 92 } else { 93 iotx_event_post(IOTX_CONN_CLOUD_FAIL); 94 } 95 96 return ret; 97 } 98 _iotx_cm_yield(int fd,unsigned int timeout)99 static int _iotx_cm_yield(int fd, unsigned int timeout) 100 { 101 iotx_cm_yield_fp yield_func; 102 103 if (fd_lock == NULL) { 104 return NULL_VALUE_ERROR; 105 } 106 107 if (fd == -1) { 108 int i; 109 for (i = 0; i < CM_MAX_FD_NUM; i++) { 110 yield_func = NULL; 111 HAL_MutexLock(fd_lock); 112 if (_cm_fd[i] != NULL) { 113 yield_func = _cm_fd[i]->yield_func; 114 } 115 HAL_MutexUnlock(fd_lock); 116 if (yield_func != NULL) { 117 yield_func(timeout); 118 } 119 } 120 return 0; 121 } 122 123 if (_fd_is_valid(fd) < 0) { 124 cm_err(ERR_INVALID_PARAMS); 125 return -1; 126 } 127 128 HAL_MutexLock(fd_lock); 129 yield_func = _cm_fd[fd]->yield_func; 130 HAL_MutexUnlock(fd_lock); 131 return yield_func(timeout); 132 } 133 #ifdef DEVICE_MODEL_GATEWAY _iotx_cm_yield_thread_func(void * params)134 static void *_iotx_cm_yield_thread_func(void *params) 135 { 136 yield_task_leave = 0; 137 while (inited_conn_num > 0) { 138 _iotx_cm_yield(-1, CM_DEFAULT_YIELD_TIMEOUT); 139 } 140 yield_task_leave = 1; 141 return NULL; 142 } 143 #endif 144 iotx_cm_yield(int fd,unsigned int timeout)145 int iotx_cm_yield(int fd, unsigned int timeout) 146 { 147 #ifdef DEVICE_MODEL_GATEWAY 148 return 0; 149 #else 150 return _iotx_cm_yield(fd, timeout); 151 #endif 152 } 153 iotx_cm_sub(int fd,iotx_cm_ext_params_t * ext,const char * topic,iotx_cm_data_handle_cb topic_handle_func,void * pcontext)154 int iotx_cm_sub(int fd, iotx_cm_ext_params_t *ext, const char *topic, 155 iotx_cm_data_handle_cb topic_handle_func, void *pcontext) 156 { 157 iotx_cm_sub_fp sub_func; 158 159 if (_fd_is_valid(fd) < 0) { 160 cm_err(ERR_INVALID_PARAMS); 161 return -1; 162 } 163 164 HAL_MutexLock(fd_lock); 165 sub_func = _cm_fd[fd]->sub_func; 166 HAL_MutexUnlock(fd_lock); 167 return sub_func(ext, topic, topic_handle_func, pcontext); 168 } 169 iotx_cm_unsub(int fd,const char * topic)170 int iotx_cm_unsub(int fd, const char *topic) 171 { 172 iotx_cm_unsub_fp unsub_func; 173 174 if (_fd_is_valid(fd) < 0) { 175 cm_err(ERR_INVALID_PARAMS); 176 return -1; 177 } 178 179 HAL_MutexLock(fd_lock); 180 unsub_func = _cm_fd[fd]->unsub_func; 181 HAL_MutexUnlock(fd_lock); 182 return unsub_func(topic); 183 } 184 iotx_cm_pub(int fd,iotx_cm_ext_params_t * ext,const char * topic,const char * payload,unsigned int payload_len)185 int iotx_cm_pub(int fd, iotx_cm_ext_params_t *ext, const char *topic, 186 const char *payload, unsigned int payload_len) 187 { 188 iotx_cm_pub_fp pub_func; 189 190 if (_fd_is_valid(fd) < 0) { 191 cm_err(ERR_INVALID_PARAMS); 192 return -1; 193 } 194 195 HAL_MutexLock(fd_lock); 196 pub_func = _cm_fd[fd]->pub_func; 197 HAL_MutexUnlock(fd_lock); 198 return pub_func(ext, topic, payload, payload_len); 199 } 200 iotx_cm_close(int fd)201 int iotx_cm_close(int fd) 202 { 203 iotx_cm_close_fp close_func; 204 205 if (_fd_is_valid(fd) < 0) { 206 cm_err(ERR_INVALID_PARAMS); 207 return -1; 208 } 209 210 if (inited_conn_num > 0) { 211 inited_conn_num--; 212 } 213 214 if (inited_conn_num == 0) { 215 #ifdef DEVICE_MODEL_GATEWAY 216 while (!yield_task_leave) { 217 HAL_SleepMs(10); 218 } 219 yield_thread = NULL; 220 #endif 221 } 222 223 HAL_MutexLock(fd_lock); 224 close_func = _cm_fd[fd]->close_func; 225 HAL_MutexUnlock(fd_lock); 226 if (close_func() != 0) { 227 return -1; 228 } 229 if (_recycle_fd(fd) != 0) { 230 return -1; 231 } 232 233 if (inited_conn_num == 0) { 234 if (fd_lock != NULL) { 235 HAL_MutexDestroy(fd_lock); 236 fd_lock = NULL; 237 } 238 } 239 240 return 0; 241 } 242 _fd_is_valid(int fd)243 static inline int _fd_is_valid(int fd) 244 { 245 int ret; 246 247 if (fd_lock == NULL) { 248 return NULL_VALUE_ERROR; 249 } 250 251 HAL_MutexLock(fd_lock); 252 ret = (fd >= 0 && fd < CM_MAX_FD_NUM && _cm_fd[fd] != NULL) ? 0 : -1; 253 HAL_MutexUnlock(fd_lock); 254 return ret; 255 } 256 _recycle_fd(int fd)257 static int _recycle_fd(int fd) 258 { 259 if (fd_lock == NULL) { 260 fd_lock = HAL_MutexCreate(); 261 if (fd_lock == NULL) { 262 return -1; 263 } 264 } 265 266 if (fd < 0 || fd > CM_MAX_FD_NUM - 1) { 267 return -1; 268 } 269 270 HAL_MutexLock(fd_lock); 271 _cm_fd[fd] = NULL; 272 HAL_MutexUnlock(fd_lock); 273 274 return 0; 275 } 276 _get_fd(iotx_cm_connection_t * handle)277 static int _get_fd(iotx_cm_connection_t *handle) 278 { 279 int i; 280 if (handle == NULL) { 281 return NULL_VALUE_ERROR; 282 } 283 284 if (fd_lock == NULL) { 285 fd_lock = HAL_MutexCreate(); 286 if (fd_lock == NULL) { 287 return -1; 288 } 289 } 290 291 HAL_MutexLock(fd_lock); 292 for (i = 0; i < CM_MAX_FD_NUM; i++) { 293 if (_cm_fd[i] == NULL) { 294 _cm_fd[i] = handle; 295 HAL_MutexUnlock(fd_lock); 296 return i; 297 } 298 } 299 HAL_MutexUnlock(fd_lock); 300 cm_err("cm fd reached the limit"); 301 return -1; 302 } 303