1 /* Copyright 2019 The TensorFlow Authors. All Rights Reserved.
2
3 Licensed under the Apache License, Version 2.0 (the "License");
4 you may not use this file except in compliance with the License.
5 You may obtain a copy of the License at
6
7 http://www.apache.org/licenses/LICENSE-2.0
8
9 Unless required by applicable law or agreed to in writing, software
10 distributed under the License is distributed on an "AS IS" BASIS,
11 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 See the License for the specific language governing permissions and
13 limitations under the License.
14 ==============================================================================*/
15
16 #include "ringbuf.h"
17 #include <stdbool.h>
18 #include <stdint.h>
19 #include <stdio.h>
20 #include <stdlib.h>
21 #include <string.h>
22 #include <assert.h>
23
24
25 #define RB_TAG "RINGBUF"
26
rb_init(const char * name,uint32_t size)27 ringbuf_t* rb_init(const char* name, uint32_t size) {
28 ringbuf_t* r;
29 unsigned char* buf;
30
31 if (size < 2 || !name) {
32 return NULL;
33 }
34
35 r = malloc(sizeof(ringbuf_t));
36 assert(r);
37 #if (CONFIG_SPIRAM_SUPPORT && \
38 (CONFIG_SPIRAM_USE_CAPS_ALLOC || CONFIG_SPIRAM_USE_MALLOC))
39 buf = heap_caps_calloc(1, size, MALLOC_CAP_SPIRAM | MALLOC_CAP_8BIT);
40 #else
41 buf = calloc(1, size);
42 #endif
43 assert(buf);
44
45 r->name = (char*)name;
46 r->base = r->readptr = r->writeptr = buf;
47 r->fill_cnt = 0;
48 r->size = size;
49
50 //vSemaphoreCreateBinary(r->can_read);
51 aos_sem_new(&r->can_read, 0);
52 assert(r->can_read);
53 // vSemaphoreCreateBinary(r->can_write);
54 aos_sem_new(&r->can_write, 0);
55 assert(r->can_write);
56 // r->lock = xSemaphoreCreateMutex();
57 aos_mutex_new(&r->lock);
58 assert(r->lock);
59
60 r->abort_read = 0;
61 r->abort_write = 0;
62 r->writer_finished = 0;
63 r->reader_unblock = 0;
64
65 return r;
66 }
67
/**
 * @brief Destroy a ring buffer created by rb_init().
 *
 * Frees the data area, the two semaphores, the mutex and finally the handle
 * itself. The handle must not be used after this call.
 *
 * @param rb  Ring buffer handle; NULL is accepted and ignored.
 */
void rb_cleanup(ringbuf_t* rb) {
  /* Mirror free(NULL) and the NULL guards used by the other rb_* calls. */
  if (rb == NULL) {
    return;
  }

  free(rb->base);
  rb->base = NULL;

  aos_sem_free(&rb->can_read);
  rb->can_read = NULL;

  aos_sem_free(&rb->can_write);
  rb->can_write = NULL;

  aos_mutex_free(&rb->lock);
  rb->lock = NULL;

  free(rb);
}
82
83 /*
84 * @brief: get the number of filled bytes in the buffer
85 */
rb_filled(ringbuf_t * rb)86 ssize_t rb_filled(ringbuf_t* rb) { return rb->fill_cnt; }
87
88 /*
89 * @brief: get the number of empty bytes available in the buffer
90 */
rb_available(ringbuf_t * rb)91 ssize_t rb_available(ringbuf_t* rb) {
92 LOGD(RB_TAG, "rb leftover %d bytes", rb->size - rb->fill_cnt);
93 return (rb->size - rb->fill_cnt);
94 }
95
/**
 * @brief Read up to buf_len bytes from the ring buffer, waiting for the
 *        writer whenever the buffer runs empty.
 *
 * @param rb             Ring buffer handle.
 * @param buf            Destination buffer. May be NULL, in which case the
 *                       bytes are consumed from the ring buffer and dropped.
 * @param buf_len        Number of bytes requested; must not be negative.
 * @param ticks_to_wait  Timeout passed to aos_sem_wait() each time the
 *                       reader has to wait for more data.
 *
 * @return Number of bytes read (possibly fewer than buf_len when a wait
 *         fails, the writer finishes, or the reader is woken up early), or:
 *         - RB_FAIL            rb is NULL, buf_len is negative, or abort_read
 *                              was already set on entry;
 *         - RB_ABORT           abort_read was observed after waiting;
 *         - RB_WRITER_FINISHED the writer has finished and nothing was read;
 *         - RB_READER_UNBLOCK  rb_wakeup_reader() fired and nothing was read.
 */
int rb_read(ringbuf_t* rb, uint8_t* buf, int buf_len, uint32_t ticks_to_wait) {
  int read_size;
  int total_read_size = 0;

  /*
   * When buf_len can be satisfied in one go the loop below never looks at
   * the abort flag, so validate the arguments and check for abort up front.
   * A negative length is rejected here because it would otherwise be handed
   * to memcpy() as a size.
   */
  if (rb == NULL || buf_len < 0 || rb->abort_read == 1) {
    return RB_FAIL;
  }

  aos_mutex_lock(&rb->lock, AOS_WAIT_FOREVER);

  while (buf_len) {
    /* Take what is available, capped at what is still requested. */
    if (rb->fill_cnt < buf_len) {
      read_size = rb->fill_cnt;
    } else {
      read_size = buf_len;
    }

    if ((rb->readptr + read_size) > (rb->base + rb->size)) {
      /* The chunk wraps: copy the tail of the storage, then its head. */
      int rlen1 = rb->base + rb->size - rb->readptr;
      int rlen2 = read_size - rlen1;
      if (buf) {
        memcpy(buf, rb->readptr, rlen1);
        memcpy(buf + rlen1, rb->base, rlen2);
      }
      rb->readptr = rb->base + rlen2;
    } else {
      if (buf) {
        memcpy(buf, rb->readptr, read_size);
      }
      rb->readptr = rb->readptr + read_size;
    }

    buf_len -= read_size;
    rb->fill_cnt -= read_size;
    total_read_size += read_size;
    if (buf) {
      buf += read_size;
    }

    /* Space was (possibly) freed: let a waiting writer re-check. */
    aos_sem_signal(&rb->can_write);
    if (buf_len == 0) {
      break;
    }

    /* Never hold the lock while waiting for the writer. */
    aos_mutex_unlock(&rb->lock);
    if (!rb->writer_finished && !rb->abort_read && !rb->reader_unblock) {
      /*
       * aos_sem_wait() reports success as 0. The FreeRTOS-style comparison
       * against pdTRUE treated a successful wait as a timeout.
       */
      if (aos_sem_wait(&rb->can_read, ticks_to_wait) != 0) {
        goto out;
      }
    }
    if (rb->abort_read == 1) {
      total_read_size = RB_ABORT;
      goto out;
    }
    if (rb->writer_finished == 1) {
      goto out;
    }
    if (rb->reader_unblock == 1) {
      if (total_read_size == 0) {
        total_read_size = RB_READER_UNBLOCK;
      }
      goto out;
    }

    aos_mutex_lock(&rb->lock, AOS_WAIT_FOREVER);
  }

  aos_mutex_unlock(&rb->lock);
out:
  /* Every jump to this label happens with the lock already released. */
  if (rb->writer_finished == 1 && total_read_size == 0) {
    total_read_size = RB_WRITER_FINISHED;
  }
  rb->reader_unblock = 0; /* We are anyway unblocking reader */
  return total_read_size;
}
183
/**
 * @brief Write up to buf_len bytes into the ring buffer, waiting for the
 *        reader whenever the buffer is full.
 *
 * @param rb             Ring buffer handle.
 * @param buf            Source data; must not be NULL.
 * @param buf_len        Number of bytes to write; must not be negative.
 * @param ticks_to_wait  Timeout passed to aos_sem_wait() each time the
 *                       writer has to wait for free space.
 *
 * @return Number of bytes written (possibly fewer than buf_len when a wait
 *         fails, the write is aborted, or the writer was marked finished),
 *         or:
 *         - RB_FAIL            rb or buf is NULL, buf_len is negative, or
 *                              abort_write was already set on entry;
 *         - RB_WRITER_FINISHED the writer was marked finished before any
 *                              byte could be written.
 */
int rb_write(ringbuf_t* rb, const uint8_t* buf, int buf_len,
             uint32_t ticks_to_wait) {
  int write_size;
  int total_write_size = 0;

  /*
   * When buf_len fits in one go the loop below never looks at the abort
   * flag, so validate the arguments and check for abort up front. A negative
   * length is rejected here because it would otherwise be handed to memcpy()
   * as a size.
   */
  if (rb == NULL || buf == NULL || buf_len < 0 || rb->abort_write == 1) {
    return RB_FAIL;
  }

  aos_mutex_lock(&rb->lock, AOS_WAIT_FOREVER);

  while (buf_len) {
    /* Write what fits, capped at what is still pending. */
    if ((rb->size - rb->fill_cnt) < buf_len) {
      write_size = rb->size - rb->fill_cnt;
    } else {
      write_size = buf_len;
    }

    if ((rb->writeptr + write_size) > (rb->base + rb->size)) {
      /* The chunk wraps: fill the tail of the storage, then its head. */
      int wlen1 = rb->base + rb->size - rb->writeptr;
      int wlen2 = write_size - wlen1;
      memcpy(rb->writeptr, buf, wlen1);
      memcpy(rb->base, buf + wlen1, wlen2);
      rb->writeptr = rb->base + wlen2;
    } else {
      memcpy(rb->writeptr, buf, write_size);
      rb->writeptr = rb->writeptr + write_size;
    }

    buf_len -= write_size;
    rb->fill_cnt += write_size;
    total_write_size += write_size;
    buf += write_size;

    /* Data was (possibly) added: let a waiting reader re-check. */
    aos_sem_signal(&rb->can_read);
    if (buf_len == 0) {
      break;
    }

    /* Never hold the lock while waiting for the reader. */
    aos_mutex_unlock(&rb->lock);
    if (rb->writer_finished) {
      /*
       * Report everything written by this call, not just the last chunk
       * (which may be 0 even though earlier iterations stored data).
       */
      return total_write_size > 0 ? total_write_size : RB_WRITER_FINISHED;
    }
    /*
     * aos_sem_wait() reports success as 0. The FreeRTOS-style comparison
     * against pdTRUE treated a successful wait as a timeout.
     */
    if (aos_sem_wait(&rb->can_write, ticks_to_wait) != 0) {
      goto out;
    }
    if (rb->abort_write == 1) {
      goto out;
    }
    aos_mutex_lock(&rb->lock, AOS_WAIT_FOREVER);
  }

  aos_mutex_unlock(&rb->lock);
out:
  /* Every jump to this label happens with the lock already released. */
  return total_write_size;
}
252
253 /**
254 * abort and set abort_read and abort_write to asked values.
255 */
_rb_reset(ringbuf_t * rb,int abort_read,int abort_write)256 static void _rb_reset(ringbuf_t* rb, int abort_read, int abort_write) {
257 if (rb == NULL) {
258 return;
259 }
260 // xSemaphoreTake(rb->lock, portMAX_DELAY);
261 aos_mutex_lock(&rb->lock, AOS_WAIT_FOREVER);
262 rb->readptr = rb->writeptr = rb->base;
263 rb->fill_cnt = 0;
264 rb->writer_finished = 0;
265 rb->reader_unblock = 0;
266 rb->abort_read = abort_read;
267 rb->abort_write = abort_write;
268 // xSemaphoreGive(rb->lock);
269 aos_mutex_unlock(&rb->lock);
270 }
271
/**
 * @brief Empty the ring buffer and clear both abort flags.
 *
 * @param rb  Ring buffer handle; NULL is ignored by _rb_reset().
 */
void rb_reset(ringbuf_t* rb) {
  const int abort_read = 0;
  const int abort_write = 0;

  _rb_reset(rb, abort_read, abort_write);
}
273
/**
 * @brief Abort reading: set abort_read and wake a reader waiting for data.
 *
 * @param rb  Ring buffer handle; NULL is ignored.
 */
void rb_abort_read(ringbuf_t* rb) {
  if (rb == NULL) {
    return;
  }
  rb->abort_read = 1;
  aos_sem_signal(&rb->can_read);
  /*
   * The lock is deliberately not released here. This function never takes
   * it, and rb_read()/rb_write() always drop it before they wait, so an
   * unlock from this context could only release a mutex currently held by
   * another task.
   */
}
284
/**
 * @brief Abort writing: set abort_write and wake a writer waiting for space.
 *
 * @param rb  Ring buffer handle; NULL is ignored.
 */
void rb_abort_write(ringbuf_t* rb) {
  if (rb == NULL) {
    return;
  }
  rb->abort_write = 1;
  aos_sem_signal(&rb->can_write);
  /*
   * The lock is deliberately not released here. This function never takes
   * it, and rb_read()/rb_write() always drop it before they wait, so an
   * unlock from this context could only release a mutex currently held by
   * another task.
   */
}
295
/**
 * @brief Abort both directions: set abort_read and abort_write, then wake a
 *        waiting reader and a waiting writer.
 *
 * @param rb  Ring buffer handle; NULL is ignored.
 */
void rb_abort(ringbuf_t* rb) {
  if (rb == NULL) {
    return;
  }
  rb->abort_read = 1;
  rb->abort_write = 1;
  aos_sem_signal(&rb->can_read);
  aos_sem_signal(&rb->can_write);
  /*
   * The lock is deliberately not released here. This function never takes
   * it, and rb_read()/rb_write() always drop it before they wait, so an
   * unlock from this context could only release a mutex currently held by
   * another task.
   */
}
309
310 /**
311 * Reset the ringbuffer and keep rb_write aborted.
312 * Note that we are taking lock before even toggling `abort_write` variable.
313 * This serves a special purpose to not allow this abort to be mixed with
314 * rb_write.
315 */
rb_reset_and_abort_write(ringbuf_t * rb)316 void rb_reset_and_abort_write(ringbuf_t* rb) {
317 _rb_reset(rb, 0, 1);
318 // xSemaphoreGive(rb->can_write);
319 aos_sem_signal(&rb->can_write);
320 }
321
/**
 * @brief Mark the writer as finished and wake a reader waiting for data.
 *
 * @param rb  Ring buffer handle; NULL is ignored.
 */
void rb_signal_writer_finished(ringbuf_t* rb) {
  if (rb != NULL) {
    rb->writer_finished = 1;
    aos_sem_signal(&rb->can_read);
  }
}
330
/**
 * @brief Query the writer_finished flag.
 *
 * @param rb  Ring buffer handle.
 *
 * @return The value of writer_finished, or RB_FAIL when rb is NULL.
 */
int rb_is_writer_finished(ringbuf_t* rb) {
  int finished = RB_FAIL;

  if (rb != NULL) {
    finished = rb->writer_finished;
  }
  return finished;
}
337
/**
 * @brief Ask a reader to return early: set reader_unblock and signal the
 *        can_read semaphore.
 *
 * @param rb  Ring buffer handle; NULL is ignored.
 */
void rb_wakeup_reader(ringbuf_t* rb) {
  if (rb != NULL) {
    rb->reader_unblock = 1;
    aos_sem_signal(&rb->can_read);
  }
}
346
/**
 * @brief Log the ring buffer's fill count, base/read/write pointers and size.
 *
 * The fields are read while holding the lock so the logged values belong to
 * one consistent state.
 *
 * @param rb  Ring buffer handle; NULL is ignored, like the other rb_* calls.
 */
void rb_stat(ringbuf_t* rb) {
  if (rb == NULL) {
    return;
  }
  aos_mutex_lock(&rb->lock, AOS_WAIT_FOREVER);
  LOGI(RB_TAG,
       "filled: %d, base: %p, read_ptr: %p, write_ptr: %p, size: %d\n",
       rb->fill_cnt, rb->base, rb->readptr, rb->writeptr, rb->size);
  aos_mutex_unlock(&rb->lock);
}
356