1 /* Copyright 2019 The TensorFlow Authors. All Rights Reserved.
2 
3 Licensed under the Apache License, Version 2.0 (the "License");
4 you may not use this file except in compliance with the License.
5 You may obtain a copy of the License at
6 
7     http://www.apache.org/licenses/LICENSE-2.0
8 
9 Unless required by applicable law or agreed to in writing, software
10 distributed under the License is distributed on an "AS IS" BASIS,
11 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 See the License for the specific language governing permissions and
13 limitations under the License.
14 ==============================================================================*/
15 
16 #include "ringbuf.h"
17 #include <stdbool.h>
18 #include <stdint.h>
19 #include <stdio.h>
20 #include <stdlib.h>
21 #include <string.h>
22 #include <assert.h>
23 
24 
25 #define RB_TAG "RINGBUF"
26 
rb_init(const char * name,uint32_t size)27 ringbuf_t* rb_init(const char* name, uint32_t size) {
28   ringbuf_t* r;
29   unsigned char* buf;
30 
31   if (size < 2 || !name) {
32     return NULL;
33   }
34 
35   r = malloc(sizeof(ringbuf_t));
36   assert(r);
37 #if (CONFIG_SPIRAM_SUPPORT && \
38      (CONFIG_SPIRAM_USE_CAPS_ALLOC || CONFIG_SPIRAM_USE_MALLOC))
39   buf = heap_caps_calloc(1, size, MALLOC_CAP_SPIRAM | MALLOC_CAP_8BIT);
40 #else
41   buf = calloc(1, size);
42 #endif
43   assert(buf);
44 
45   r->name = (char*)name;
46   r->base = r->readptr = r->writeptr = buf;
47   r->fill_cnt = 0;
48   r->size = size;
49 
50   //vSemaphoreCreateBinary(r->can_read);
51   aos_sem_new(&r->can_read, 0);
52   assert(r->can_read);
53   // vSemaphoreCreateBinary(r->can_write);
54   aos_sem_new(&r->can_write, 0);
55   assert(r->can_write);
56   // r->lock = xSemaphoreCreateMutex();
57   aos_mutex_new(&r->lock);
58   assert(r->lock);
59 
60   r->abort_read = 0;
61   r->abort_write = 0;
62   r->writer_finished = 0;
63   r->reader_unblock = 0;
64 
65   return r;
66 }
67 
rb_cleanup(ringbuf_t * rb)68 void rb_cleanup(ringbuf_t* rb) {
69   free(rb->base);
70   rb->base = NULL;
71   // vSemaphoreDelete(rb->can_read);
72   aos_sem_free(&rb->can_read);
73   rb->can_read = NULL;
74   // vSemaphoreDelete(rb->can_write);
75   aos_sem_free(&rb->can_write);
76   rb->can_write = NULL;
77   // vSemaphoreDelete(rb->lock);
78   aos_mutex_free(&rb->lock);
79   rb->lock = NULL;
80   free(rb);
81 }
82 
83 /*
84  * @brief: get the number of filled bytes in the buffer
85  */
rb_filled(ringbuf_t * rb)86 ssize_t rb_filled(ringbuf_t* rb) { return rb->fill_cnt; }
87 
88 /*
89  * @brief: get the number of empty bytes available in the buffer
90  */
rb_available(ringbuf_t * rb)91 ssize_t rb_available(ringbuf_t* rb) {
92   LOGD(RB_TAG, "rb leftover %d bytes", rb->size - rb->fill_cnt);
93   return (rb->size - rb->fill_cnt);
94 }
95 
rb_read(ringbuf_t * rb,uint8_t * buf,int buf_len,uint32_t ticks_to_wait)96 int rb_read(ringbuf_t* rb, uint8_t* buf, int buf_len, uint32_t ticks_to_wait) {
97   int read_size;
98   int total_read_size = 0;
99 
100   /**
101    * In case where we are able to read buf_len in one go,
102    * we are not able to check for abort and keep returning buf_len as bytes
103    * read. Check for argument validity check and abort case before entering
104    * memcpy loop.
105    */
106 
107   if (rb == NULL || rb->abort_read == 1) {
108     return RB_FAIL;
109   }
110 
111   //xSemaphoreTake(rb->lock, portMAX_DELAY);
112   aos_mutex_lock(&rb->lock, AOS_WAIT_FOREVER);
113 
114   while (buf_len) {
115     if (rb->fill_cnt < buf_len) {
116       read_size = rb->fill_cnt;
117     } else {
118       read_size = buf_len;
119     }
120     if ((rb->readptr + read_size) > (rb->base + rb->size)) {
121       int rlen1 = rb->base + rb->size - rb->readptr;
122       int rlen2 = read_size - rlen1;
123       if (buf) {
124         memcpy(buf, rb->readptr, rlen1);
125         memcpy(buf + rlen1, rb->base, rlen2);
126       }
127       rb->readptr = rb->base + rlen2;
128     } else {
129       if (buf) {
130         memcpy(buf, rb->readptr, read_size);
131       }
132       rb->readptr = rb->readptr + read_size;
133     }
134 
135     buf_len -= read_size;
136     rb->fill_cnt -= read_size;
137     total_read_size += read_size;
138     if (buf) {
139       buf += read_size;
140     }
141 
142     // xSemaphoreGive(rb->can_write);
143     aos_sem_signal(&rb->can_write);
144     if (buf_len == 0) {
145       break;
146     }
147 
148     // xSemaphoreGive(rb->lock);
149     aos_mutex_unlock(&rb->lock);
150     if (!rb->writer_finished && !rb->abort_read && !rb->reader_unblock) {
151       // if (xSemaphoreTake(rb->can_read, ticks_to_wait) != pdTRUE) {
152       if (aos_sem_wait(&rb->can_read, ticks_to_wait) != pdTRUE) {
153         goto out;
154       }
155     }
156     if (rb->abort_read == 1) {
157       total_read_size = RB_ABORT;
158       goto out;
159     }
160     if (rb->writer_finished == 1) {
161       goto out;
162     }
163     if (rb->reader_unblock == 1) {
164       if (total_read_size == 0) {
165         total_read_size = RB_READER_UNBLOCK;
166       }
167       goto out;
168     }
169 
170     aos_mutex_lock(&rb->lock, AOS_WAIT_FOREVER);
171     //xSemaphoreTake(rb->lock, portMAX_DELAY);
172   }
173 
174   // xSemaphoreGive(rb->lock);
175   aos_mutex_unlock(&rb->lock);
176 out:
177   if (rb->writer_finished == 1 && total_read_size == 0) {
178     total_read_size = RB_WRITER_FINISHED;
179   }
180   rb->reader_unblock = 0; /* We are anyway unblocking reader */
181   return total_read_size;
182 }
183 
rb_write(ringbuf_t * rb,const uint8_t * buf,int buf_len,uint32_t ticks_to_wait)184 int rb_write(ringbuf_t* rb, const uint8_t* buf, int buf_len,
185              uint32_t ticks_to_wait) {
186   int write_size;
187   int total_write_size = 0;
188 
189   /**
190    * In case where we are able to write buf_len in one go,
191    * we are not able to check for abort and keep returning buf_len as bytes
192    * written. Check for arguments' validity and abort case before entering
193    * memcpy loop.
194    */
195 
196   if (rb == NULL || buf == NULL || rb->abort_write == 1) {
197     return RB_FAIL;
198   }
199 
200   //xSemaphoreTake(rb->lock, portMAX_DELAY);
201   aos_mutex_lock(&rb->lock, AOS_WAIT_FOREVER);
202 
203   while (buf_len) {
204     if ((rb->size - rb->fill_cnt) < buf_len) {
205       write_size = rb->size - rb->fill_cnt;
206     } else {
207       write_size = buf_len;
208     }
209     if ((rb->writeptr + write_size) > (rb->base + rb->size)) {
210       int wlen1 = rb->base + rb->size - rb->writeptr;
211       int wlen2 = write_size - wlen1;
212       memcpy(rb->writeptr, buf, wlen1);
213       memcpy(rb->base, buf + wlen1, wlen2);
214       rb->writeptr = rb->base + wlen2;
215     } else {
216       memcpy(rb->writeptr, buf, write_size);
217       rb->writeptr = rb->writeptr + write_size;
218     }
219 
220     buf_len -= write_size;
221     rb->fill_cnt += write_size;
222     total_write_size += write_size;
223     buf += write_size;
224 
225     // xSemaphoreGive(rb->can_read);
226     aos_sem_signal(&rb->can_read);
227     if (buf_len == 0) {
228       break;
229     }
230 
231     // xSemaphoreGive(rb->lock);
232     aos_mutex_unlock(&rb->lock);
233     if (rb->writer_finished) {
234       return write_size > 0 ? write_size : RB_WRITER_FINISHED;
235     }
236     // if (xSemaphoreTake(rb->can_write, ticks_to_wait) != pdTRUE) {
237     if (aos_sem_wait(&rb->can_write, ticks_to_wait) != pdTRUE) {
238       goto out;
239     }
240     if (rb->abort_write == 1) {
241       goto out;
242     }
243     // xSemaphoreTake(rb->lock, portMAX_DELAY);
244     aos_mutex_lock(&rb->lock, AOS_WAIT_FOREVER);
245   }
246 
247   // xSemaphoreGive(rb->lock);
248   aos_mutex_unlock(&rb->lock);
249 out:
250   return total_write_size;
251 }
252 
253 /**
254  * abort and set abort_read and abort_write to asked values.
255  */
_rb_reset(ringbuf_t * rb,int abort_read,int abort_write)256 static void _rb_reset(ringbuf_t* rb, int abort_read, int abort_write) {
257   if (rb == NULL) {
258     return;
259   }
260   // xSemaphoreTake(rb->lock, portMAX_DELAY);
261   aos_mutex_lock(&rb->lock, AOS_WAIT_FOREVER);
262   rb->readptr = rb->writeptr = rb->base;
263   rb->fill_cnt = 0;
264   rb->writer_finished = 0;
265   rb->reader_unblock = 0;
266   rb->abort_read = abort_read;
267   rb->abort_write = abort_write;
268   // xSemaphoreGive(rb->lock);
269   aos_mutex_unlock(&rb->lock);
270 }
271 
rb_reset(ringbuf_t * rb)272 void rb_reset(ringbuf_t* rb) { _rb_reset(rb, 0, 0); }
273 
rb_abort_read(ringbuf_t * rb)274 void rb_abort_read(ringbuf_t* rb) {
275   if (rb == NULL) {
276     return;
277   }
278   rb->abort_read = 1;
279   // xSemaphoreGive(rb->can_read);
280   // xSemaphoreGive(rb->lock);
281   aos_sem_signal(&rb->can_read);
282   aos_mutex_unlock(&rb->lock);
283 }
284 
rb_abort_write(ringbuf_t * rb)285 void rb_abort_write(ringbuf_t* rb) {
286   if (rb == NULL) {
287     return;
288   }
289   rb->abort_write = 1;
290   // xSemaphoreGive(rb->can_write);
291   // xSemaphoreGive(rb->lock);
292   aos_sem_signal(&rb->can_write);
293   aos_mutex_unlock(&rb->lock);
294 }
295 
rb_abort(ringbuf_t * rb)296 void rb_abort(ringbuf_t* rb) {
297   if (rb == NULL) {
298     return;
299   }
300   rb->abort_read = 1;
301   rb->abort_write = 1;
302   // xSemaphoreGive(rb->can_read);
303   // xSemaphoreGive(rb->can_write);
304   // xSemaphoreGive(rb->lock);
305   aos_sem_signal(&rb->can_read);
306   aos_sem_signal(&rb->can_write);
307   aos_mutex_unlock(&rb->lock);
308 }
309 
310 /**
311  * Reset the ringbuffer and keep rb_write aborted.
312  * Note that we are taking lock before even toggling `abort_write` variable.
313  * This serves a special purpose to not allow this abort to be mixed with
314  * rb_write.
315  */
rb_reset_and_abort_write(ringbuf_t * rb)316 void rb_reset_and_abort_write(ringbuf_t* rb) {
317   _rb_reset(rb, 0, 1);
318   // xSemaphoreGive(rb->can_write);
319   aos_sem_signal(&rb->can_write);
320 }
321 
rb_signal_writer_finished(ringbuf_t * rb)322 void rb_signal_writer_finished(ringbuf_t* rb) {
323   if (rb == NULL) {
324     return;
325   }
326   rb->writer_finished = 1;
327   // xSemaphoreGive(rb->can_read);
328   aos_sem_signal(&rb->can_read);
329 }
330 
rb_is_writer_finished(ringbuf_t * rb)331 int rb_is_writer_finished(ringbuf_t* rb) {
332   if (rb == NULL) {
333     return RB_FAIL;
334   }
335   return (rb->writer_finished);
336 }
337 
rb_wakeup_reader(ringbuf_t * rb)338 void rb_wakeup_reader(ringbuf_t* rb) {
339   if (rb == NULL) {
340     return;
341   }
342   rb->reader_unblock = 1;
343   // xSemaphoreGive(rb->can_read);
344   aos_sem_signal(&rb->can_read);
345 }
346 
rb_stat(ringbuf_t * rb)347 void rb_stat(ringbuf_t* rb) {
348   aos_mutex_lock(&rb->lock, AOS_WAIT_FOREVER);
349   //xSemaphoreTake(rb->lock, portMAX_DELAY);
350   LOGI(RB_TAG,
351            "filled: %d, base: %p, read_ptr: %p, write_ptr: %p, size: %d\n",
352            rb->fill_cnt, rb->base, rb->readptr, rb->writeptr, rb->size);
353   // xSemaphoreGive(rb->lock);
354   aos_mutex_unlock(&rb->lock);
355 }
356