try:
    import uio
    import uerrno
    import uwebsocket
except ImportError:
    # One of the required modules is missing: report it and stop the script.
    print("SKIP")
    raise SystemExit


def ws_read(msg, sz):
    """Put the raw bytes `msg` in a stream and do a websocket read of size `sz`."""
    return uwebsocket.websocket(uio.BytesIO(msg)).read(sz)


def ws_write(msg, sz):
    """Do a websocket write of `msg`, then return `sz` bytes of raw stream data."""
    raw = uio.BytesIO()
    sock = uwebsocket.websocket(raw)
    sock.write(msg)
    raw.seek(0)  # rewind so the raw read starts at the first byte written
    return raw.read(sz)


# basic frame: the same 4-byte payload under two different first header bytes
for frame in (
    b"\x81\x04ping",
    b"\x80\x04ping",  # FRAME_CONT
):
    print(ws_read(frame, 4))
print(ws_write(b"pong", 6))

# split frames are not supported, so this case stays disabled:
# print(ws_read(b"\x01\x04ping", 4))

# extended payloads (b"ping" * 32 is a 128-byte payload)
repeated_ping = b"ping" * 32
print(ws_read(b"\x81~\x00\x80" + repeated_ping, 128))
print(ws_write(b"pong" * 32, 132))

# mask (returned data will be 'mask' ^ 'mask')
print(ws_read(b"\x81\x84maskmask", 4))

# close control frame
s = uio.BytesIO(b"\x88\x00")  # FRAME_CLOSE
ws = uwebsocket.websocket(s)
print(ws.read(1))
# Look at what follows the 2-byte frame in the underlying stream
# (presumably data the websocket wrote in response -- confirm against the
# expected output).
s.seek(2)
print(s.read(4))

# misc control frames, each immediately followed by a regular 4-byte frame
for frame in (
    b"\x89\x00\x81\x04ping",  # FRAME_PING
    b"\x8a\x00\x81\x04pong",  # FRAME_PONG
):
    print(ws_read(frame, 4))

# close method
ws = uwebsocket.websocket(uio.BytesIO())
ws.close()

# ioctl
ws = uwebsocket.websocket(uio.BytesIO())
print(ws.ioctl(8))  # GET_DATA_OPTS
print(ws.ioctl(9, 2))  # SET_DATA_OPTS
print(ws.ioctl(9))
try:
    ws.ioctl(-1)
except OSError as exc:
    # Print only whether the error code is EINVAL, not the code itself.
    errno_matches = exc.args[0] == uerrno.EINVAL
    print("ioctl: EINVAL:", errno_matches)