1try:
2    from pyb import CAN
3except ImportError:
4    print("SKIP")
5    raise SystemExit
6
7from array import array
8import micropython
9import pyb
10
11# test we can correctly create by id (2 handled in can2.py test)
12for bus in (-1, 0, 1, 3):
13    try:
14        CAN(bus, CAN.LOOPBACK)
15        print("CAN", bus)
16    except ValueError:
17        print("ValueError", bus)
18CAN(1).deinit()
19
20CAN.initfilterbanks(14)
21can = CAN(1)
22print(can)
23
24# Test state when de-init'd
25print(can.state() == can.STOPPED)
26
27can.init(CAN.LOOPBACK)
28print(can)
29print(can.any(0))
30
31# Test state when freshly created
32print(can.state() == can.ERROR_ACTIVE)
33
34# Test that restart can be called
35can.restart()
36
37# Test info returns a sensible value
38print(can.info())
39
40# Catch all filter
41can.setfilter(0, CAN.MASK16, 0, (0, 0, 0, 0))
42
43can.send("abcd", 123, timeout=5000)
44print(can.any(0), can.info())
45print(can.recv(0))
46
47can.send("abcd", -1, timeout=5000)
48print(can.recv(0))
49
50can.send("abcd", 0x7FF + 1, timeout=5000)
51print(can.recv(0))
52
53# Test too long message
54try:
55    can.send("abcdefghi", 0x7FF, timeout=5000)
56except ValueError:
57    print("passed")
58else:
59    print("failed")
60
61# Test that recv can work without allocating memory on the heap
62
63buf = bytearray(10)
64l = [0, 0, 0, memoryview(buf)]
65l2 = None
66
67micropython.heap_lock()
68
69can.send("", 42)
70l2 = can.recv(0, l)
71assert l is l2
72print(l, len(l[3]), buf)
73
74can.send("1234", 42)
75l2 = can.recv(0, l)
76assert l is l2
77print(l, len(l[3]), buf)
78
79can.send("01234567", 42)
80l2 = can.recv(0, l)
81assert l is l2
82print(l, len(l[3]), buf)
83
84can.send("abc", 42)
85l2 = can.recv(0, l)
86assert l is l2
87print(l, len(l[3]), buf)
88
89micropython.heap_unlock()
90
91# Test that recv can work with different arrays behind the memoryview
92can.send("abc", 1)
93print(bytes(can.recv(0, [0, 0, 0, memoryview(array("B", range(8)))])[3]))
94can.send("def", 1)
95print(bytes(can.recv(0, [0, 0, 0, memoryview(array("b", range(8)))])[3]))
96
97# Test for non-list passed as second arg to recv
98can.send("abc", 1)
99try:
100    can.recv(0, 1)
101except TypeError:
102    print("TypeError")
103
104# Test for too-short-list passed as second arg to recv
105can.send("abc", 1)
106try:
107    can.recv(0, [0, 0, 0])
108except ValueError:
109    print("ValueError")
110
111# Test for non-memoryview passed as 4th element to recv
112can.send("abc", 1)
113try:
114    can.recv(0, [0, 0, 0, 0])
115except TypeError:
116    print("TypeError")
117
118# Test for read-only-memoryview passed as 4th element to recv
119can.send("abc", 1)
120try:
121    can.recv(0, [0, 0, 0, memoryview(bytes(8))])
122except ValueError:
123    print("ValueError")
124
125# Test for bad-typecode-memoryview passed as 4th element to recv
126can.send("abc", 1)
127try:
128    can.recv(0, [0, 0, 0, memoryview(array("i", range(8)))])
129except ValueError:
130    print("ValueError")
131
132del can
133
134# Testing extended IDs
135can = CAN(1, CAN.LOOPBACK, extframe=True)
136# Catch all filter
137can.setfilter(0, CAN.MASK32, 0, (0, 0))
138
139print(can)
140
141try:
142    can.send("abcde", 0x7FF + 1, timeout=5000)
143except ValueError:
144    print("failed")
145else:
146    r = can.recv(0)
147    if r[0] == 0x7FF + 1 and r[3] == b"abcde":
148        print("passed")
149    else:
150        print("failed, wrong data received")
151
152# Test filters
153for n in [0, 8, 16, 24]:
154    filter_id = 0b00001000 << n
155    filter_mask = 0b00011100 << n
156    id_ok = 0b00001010 << n
157    id_fail = 0b00011010 << n
158
159    can.clearfilter(0)
160    can.setfilter(0, pyb.CAN.MASK32, 0, (filter_id, filter_mask))
161
162    can.send("ok", id_ok, timeout=3)
163    if can.any(0):
164        msg = can.recv(0)
165        print((hex(filter_id), hex(filter_mask), hex(msg[0]), msg[3]))
166
167    can.send("fail", id_fail, timeout=3)
168    if can.any(0):
169        msg = can.recv(0)
170        print((hex(filter_id), hex(filter_mask), hex(msg[0]), msg[3]))
171
172del can
173
174# Test RxCallbacks
175can = CAN(1, CAN.LOOPBACK)
176can.setfilter(0, CAN.LIST16, 0, (1, 2, 3, 4))
177can.setfilter(1, CAN.LIST16, 1, (5, 6, 7, 8))
178
179
180def cb0(bus, reason):
181    print("cb0")
182    if reason == 0:
183        print("pending")
184    if reason == 1:
185        print("full")
186    if reason == 2:
187        print("overflow")
188
189
190def cb1(bus, reason):
191    print("cb1")
192    if reason == 0:
193        print("pending")
194    if reason == 1:
195        print("full")
196    if reason == 2:
197        print("overflow")
198
199
200def cb0a(bus, reason):
201    print("cb0a")
202    if reason == 0:
203        print("pending")
204    if reason == 1:
205        print("full")
206    if reason == 2:
207        print("overflow")
208
209
210def cb1a(bus, reason):
211    print("cb1a")
212    if reason == 0:
213        print("pending")
214    if reason == 1:
215        print("full")
216    if reason == 2:
217        print("overflow")
218
219
220can.rxcallback(0, cb0)
221can.rxcallback(1, cb1)
222
223can.send("11111111", 1, timeout=5000)
224can.send("22222222", 2, timeout=5000)
225can.send("33333333", 3, timeout=5000)
226can.rxcallback(0, cb0a)
227can.send("44444444", 4, timeout=5000)
228
229can.send("55555555", 5, timeout=5000)
230can.send("66666666", 6, timeout=5000)
231can.send("77777777", 7, timeout=5000)
232can.rxcallback(1, cb1a)
233can.send("88888888", 8, timeout=5000)
234
235print(can.recv(0))
236print(can.recv(0))
237print(can.recv(0))
238print(can.recv(1))
239print(can.recv(1))
240print(can.recv(1))
241
242can.send("11111111", 1, timeout=5000)
243can.send("55555555", 5, timeout=5000)
244
245print(can.recv(0))
246print(can.recv(1))
247
248del can
249
250# Testing asynchronous send
251can = CAN(1, CAN.LOOPBACK)
252can.setfilter(0, CAN.MASK16, 0, (0, 0, 0, 0))
253
254while can.any(0):
255    can.recv(0)
256
257can.send("abcde", 1, timeout=0)
258print(can.any(0))
259while not can.any(0):
260    pass
261
262print(can.recv(0))
263
264try:
265    can.send("abcde", 2, timeout=0)
266    can.send("abcde", 3, timeout=0)
267    can.send("abcde", 4, timeout=0)
268    can.send("abcde", 5, timeout=0)
269except OSError as e:
270    if str(e) == "16":
271        print("passed")
272    else:
273        print("failed")
274
275pyb.delay(500)
276while can.any(0):
277    print(can.recv(0))
278
279# Testing rtr messages
280bus1 = CAN(1, CAN.LOOPBACK)
281while bus1.any(0):
282    bus1.recv(0)
283bus1.setfilter(0, CAN.LIST16, 0, (1, 2, 3, 4))
284bus1.setfilter(1, CAN.LIST16, 0, (5, 6, 7, 8), rtr=(True, True, True, True))
285bus1.setfilter(2, CAN.MASK16, 0, (64, 64, 32, 32), rtr=(False, True))
286
287bus1.send("", 1, rtr=True)
288print(bus1.any(0))
289bus1.send("", 5, rtr=True)
290print(bus1.recv(0))
291bus1.send("", 6, rtr=True)
292print(bus1.recv(0))
293bus1.send("", 7, rtr=True)
294print(bus1.recv(0))
295bus1.send("", 16, rtr=True)
296print(bus1.any(0))
297bus1.send("", 32, rtr=True)
298print(bus1.recv(0))
299
300# test HAL error, timeout
301can = pyb.CAN(1, pyb.CAN.NORMAL)
302try:
303    can.send("1", 1, timeout=50)
304except OSError as e:
305    print(repr(e))
306