1# Test when client does a TCP RST on an open connection
2
3import struct, time, socket, select
4
5PORT = 8000
6
7
8def convert_poll_list(l):
9    # To be compatible across all ports/targets
10    return [ev for _, ev in l]
11
12
13# Server
14def instance0():
15    multitest.globals(IP=multitest.get_network_ip())
16    s = socket.socket()
17    s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
18    s.bind(socket.getaddrinfo("0.0.0.0", PORT)[0][-1])
19    s.listen(1)
20    multitest.next()
21    s2, _ = s.accept()
22    s2.setblocking(False)
23    poll = select.poll()
24    poll.register(s2, select.POLLIN)
25    time.sleep(0.4)
26    print(convert_poll_list(poll.poll(1000)))
27    # TODO: the following recv don't work with lwip, it abandons data upon TCP RST
28    try:
29        print(s2.recv(10))
30        print(convert_poll_list(poll.poll(1000)))
31        print(s2.recv(10))
32        print(convert_poll_list(poll.poll(1000)))
33        print(s2.recv(10))
34        print(convert_poll_list(poll.poll(1000)))
35    except OSError as er:
36        print(er.args[0])
37    print(convert_poll_list(poll.poll(1000)))
38    # TODO lwip raises here but apparently it shouldn't
39    print(s2.recv(10))
40    print(convert_poll_list(poll.poll(1000)))
41    s.close()
42
43
44# Client
45def instance1():
46    if not hasattr(socket, "SO_LINGER"):
47        multitest.skip()
48    multitest.next()
49    s = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0)
50    s.connect(socket.getaddrinfo(IP, PORT)[0][-1])
51    lgr_onoff = 1
52    lgr_linger = 0
53    s.setsockopt(socket.SOL_SOCKET, socket.SO_LINGER, struct.pack("ii", lgr_onoff, lgr_linger))
54    s.send(b"GET / HTTP/1.0\r\n\r\n")
55    time.sleep(0.2)
56    s.close()  # This issues a TCP RST since we've set the linger option
57