# Test when client does a TCP RST on an open connection.
#
# Two cooperating instances: instance0 is the server, instance1 is the client.
# NOTE(review): `multitest` and `IP` are not defined in this file; they are
# presumably injected by the multi-instance test runner -- confirm.

import struct
import time
import socket
import select

PORT = 8000


def convert_poll_list(l):
    """Return only the event mask from each ``(obj, event)`` poll result.

    The polled object is dropped so the printed output is compatible across
    all ports/targets.
    """
    return [ev for _, ev in l]


# Server
def instance0():
    """Accept one connection, then print poll/recv results around the reset."""
    multitest.globals(IP=multitest.get_network_ip())
    s = socket.socket()
    try:
        s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        s.bind(socket.getaddrinfo("0.0.0.0", PORT)[0][-1])
        s.listen(1)
        multitest.next()
        s2, _ = s.accept()
        try:
            s2.setblocking(False)
            poll = select.poll()
            poll.register(s2, select.POLLIN)
            # The client sleeps 0.2s between send and close, so after 0.4s
            # here it has already closed its end.
            time.sleep(0.4)
            print(convert_poll_list(poll.poll(1000)))
            # TODO: the following recv don't work with lwip, it abandons data upon TCP RST
            try:
                for _ in range(3):
                    print(s2.recv(10))
                    print(convert_poll_list(poll.poll(1000)))
            except OSError as er:
                # Print only the error number, not the full exception.
                print(er.args[0])
            print(convert_poll_list(poll.poll(1000)))
            # TODO lwip raises here but apparently it shouldn't
            print(s2.recv(10))
            print(convert_poll_list(poll.poll(1000)))
        finally:
            # Release the accepted connection even if a recv above raised.
            s2.close()
    finally:
        s.close()


# Client
def instance1():
    """Connect, send a short request, then close with zero linger time."""
    if not hasattr(socket, "SO_LINGER"):
        multitest.skip()
    multitest.next()
    s = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0)
    try:
        s.connect(socket.getaddrinfo(IP, PORT)[0][-1])
        lgr_onoff = 1
        lgr_linger = 0
        s.setsockopt(
            socket.SOL_SOCKET,
            socket.SO_LINGER,
            struct.pack("ii", lgr_onoff, lgr_linger),
        )
        s.send(b"GET / HTTP/1.0\r\n\r\n")
        time.sleep(0.2)
    finally:
        s.close()  # This issues a TCP RST since we've set the linger option