From a2ad9c62ba7e02a2f40030d7e338054cad3ad0de Mon Sep 17 00:00:00 2001 From: Robsdedude Date: Fri, 9 Feb 2024 10:44:29 +0100 Subject: [PATCH 1/3] Remove legacy Python 2 code --- boltstub/__init__.py | 4 ++-- boltstub/parsing.py | 10 +++++----- boltstub/watcher.py | 6 +++--- boltstub/wiring.py | 2 +- 4 files changed, 11 insertions(+), 11 deletions(-) diff --git a/boltstub/__init__.py b/boltstub/__init__.py index 0223f39f..3d1a0d9b 100644 --- a/boltstub/__init__.py +++ b/boltstub/__init__.py @@ -57,13 +57,13 @@ class BoltStubServer(TCPServer): timed_out = False def __init__(self, *args, **kwargs): - super(BoltStubServer, self).__init__(*args, **kwargs) + super().__init__(*args, **kwargs) def handle_timeout(self): self.timed_out = True def server_activate(self): - super(BoltStubServer, self).server_activate() + super().server_activate() # Must be here, testkit waits for something to be written on stdout to # know when the server is listening. print("Listening") diff --git a/boltstub/parsing.py b/boltstub/parsing.py index fa6230fd..85295244 100644 --- a/boltstub/parsing.py +++ b/boltstub/parsing.py @@ -100,20 +100,20 @@ def __repr__(self): class Line(str, abc.ABC): def __new__(cls, line_number: int, raw_line, content: str): - obj = super(Line, cls).__new__(cls, raw_line) + obj = super().__new__(cls, raw_line) obj.line_number = line_number obj.content = content return obj def __str__(self): return "({:3}) {}".format(self.line_number, - super(Line, self).__str__()) + super().__str__()) def __repr__(self): return "<{}>{}".format(self.__class__.__name__, self.__str__()) def __getnewargs__(self): - return self.line_number, super(Line, self).__str__(), self.content + return self.line_number, super().__str__(), self.content @abc.abstractmethod def canonical(self): @@ -390,7 +390,7 @@ class ServerLine(MessageLine): always_parse = False def __new__(cls, *args, **kwargs): - obj = super(ServerLine, cls).__new__(cls, *args, **kwargs) + obj = super().__new__(cls, *args, **kwargs) obj.command_match = re.match(r"^<(.+?)>(.*)$", obj.content) obj.is_command = bool(obj.command_match) if not obj.is_command: @@ -623,7 +623,7 @@ def __init__(self, line: AutoLine, line_number: int): # A: RESET # This is to avoid ambiguity when it comes to `?:`, `*:`, and `+:` # macros. - super(AutoBlock, self).__init__([line], line_number) + super().__init__([line], line_number) def _consume(self, channel): msg = channel.consume(self.lines[self.index].line_number) diff --git a/boltstub/watcher.py b/boltstub/watcher.py index 1475c47d..160aa1cd 100644 --- a/boltstub/watcher.py +++ b/boltstub/watcher.py @@ -97,7 +97,7 @@ def bright_white(s): class ColourFormatter(Formatter): def format(self, record): - s = super(ColourFormatter, self).format(record) + s = super().format(record) bits = s.split(" ", maxsplit=1) bits[0] = bright_black(bits[0]) if record.levelno == CRITICAL: @@ -121,13 +121,13 @@ def formatTime(self, record, datefmt=None): # noqa: N802 return f"{t}.{ms:03d}" -class Watcher(object): +class Watcher: """Log watcher for monitoring driver and protocol activity.""" handlers = {} def __init__(self, logger_name): - super(Watcher, self).__init__() + super().__init__() self.logger_name = logger_name self.logger = getLogger(self.logger_name) self.formatter = ColourFormatter("%(asctime)s %(message)s") diff --git a/boltstub/wiring.py b/boltstub/wiring.py index 723f44cb..10c6ca9c 100644 --- a/boltstub/wiring.py +++ b/boltstub/wiring.py @@ -229,7 +229,7 @@ def send(self, payload) -> int: sendall = send -class Wire(object): +class Wire: """Buffered socket wrapper for reading and writing bytes.""" _closed = False From 449fdfdfec68f83851b8937a00f9d23b724f0c4a Mon Sep 17 00:00:00 2001 From: Robsdedude Date: Fri, 9 Feb 2024 14:16:13 +0100 Subject: [PATCH 2/3] IPv6 test --- boltstub/__init__.py | 27 ++++++--- boltstub/__main__.py | 6 +- boltstub/addressing.py | 4 +- main.py | 2 +- tests/shared.py | 25 ++++++--- tests/stub/routing/_routing.py | 44 +++++++++------ tests/stub/routing/test_routing_v3.py | 6 +- tests/stub/routing/test_routing_v4x1.py | 6 +- tests/stub/routing/test_routing_v4x3.py | 3 + tests/stub/routing/test_routing_v4x4.py | 3 + tests/stub/routing/test_routing_v5x0.py | 73 ++++++++++++++++++++++--- tests/stub/shared.py | 17 ++++-- 12 files changed, 164 insertions(+), 52 deletions(-) diff --git a/boltstub/__init__.py b/boltstub/__init__.py index 3d1a0d9b..82ce64af 100644 --- a/boltstub/__init__.py +++ b/boltstub/__init__.py @@ -16,6 +16,7 @@ # limitations under the License. +import socket import time import traceback from copy import deepcopy @@ -25,7 +26,6 @@ TCPServer, ThreadingMixIn, ) -from sys import stdout from threading import ( Lock, Thread, @@ -56,18 +56,27 @@ class BoltStubServer(TCPServer): timed_out = False - def __init__(self, *args, **kwargs): + _ipv6: bool + + def __init__(self, *args, ipv6=False, **kwargs) -> None: + self._ipv6 = ipv6 + if self._ipv6: + self.address_family = socket.AF_INET6 super().__init__(*args, **kwargs) def handle_timeout(self): self.timed_out = True - def server_activate(self): + def server_bind(self) -> None: + if self._ipv6: + self.socket.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_V6ONLY, 1) + super().server_bind() + + def server_activate(self) -> None: super().server_activate() # Must be here, testkit waits for something to be written on stdout to # know when the server is listening. - print("Listening") - stdout.flush() + print("Listening", flush=True) class ThreadedBoltStubServer(ThreadingMixIn, BoltStubServer): @@ -86,13 +95,14 @@ class BoltStubService: def load(cls, *script_filenames, **kwargs): return cls(*map(parse_file, script_filenames), **kwargs) - def __init__(self, script: Script, listen_addr=None, timeout=None): + def __init__(self, script: Script, listen_addr=None, timeout=None, + ipv6=False): if listen_addr: listen_addr = Address.parse(listen_addr) else: listen_addr = Address(("localhost", self.default_base_port)) self.host = listen_addr.host - self.address = Address((listen_addr.host, listen_addr.port_number)) + self.address = Address(listen_addr) self.script = script self.exceptions = [] self.actors = [] @@ -153,7 +163,8 @@ def finish(self): server_cls = ThreadedBoltStubServer else: server_cls = BoltStubServer - self.server = server_cls(self.address, BoltStubRequestHandler) + self.server = server_cls(self.address, BoltStubRequestHandler, + ipv6=ipv6) self.server.timeout = timeout or self.default_timeout def start(self): diff --git a/boltstub/__main__.py b/boltstub/__main__.py index 2ba4a7de..796c3221 100644 --- a/boltstub/__main__.py +++ b/boltstub/__main__.py @@ -95,6 +95,10 @@ def _main(): "-v", "--verbose", action="store_true", help="Show more detail about the client-server exchange." ) + parser.add_argument( + "-6", "--ipv6", action="store_true", + help="Force the server to only listen on IPv6 interfaces." + ) parser.add_argument("script", nargs="+") parsed = parser.parse_args() @@ -103,7 +107,7 @@ def _main(): scripts = map(parse_file, parsed.script) service = BoltStubService(*scripts, listen_addr=parsed.listen_addr, - timeout=parsed.timeout) + timeout=parsed.timeout, ipv6=parsed.ipv6) try: service.start() diff --git a/boltstub/addressing.py b/boltstub/addressing.py index 178fa8bb..cbb6a1ce 100644 --- a/boltstub/addressing.py +++ b/boltstub/addressing.py @@ -34,13 +34,13 @@ def parse(cls, s, default_host=None, default_port=None): # IPv6 host, _, port = s[1:].rpartition("]") return cls((host or default_host or "localhost", - port.lstrip(":") or default_port or 0, + int(port.lstrip(":")) or default_port or 0, 0, 0)) else: # IPv4 host, _, port = s.partition(":") return cls((host or default_host or "localhost", - port or default_port or 0)) + int(port) or default_port or 0)) else: raise TypeError("Address.parse requires a string argument") diff --git a/main.py b/main.py index de8edf07..49e2917e 100644 --- a/main.py +++ b/main.py @@ -322,7 +322,7 @@ def _exit(): # address to be able to start services on the network that the driver # connects to (stub server and TLS server). for network in networks: - cmd = ["docker", "network", "create", network] + cmd = ["docker", "network", "create", "--ipv6", network] print(cmd) subprocess.run(cmd) diff --git a/tests/shared.py b/tests/shared.py index 9adf831c..bcb5da9b 100644 --- a/tests/shared.py +++ b/tests/shared.py @@ -61,21 +61,32 @@ def pick_address(adapter_): return ips -def dns_resolve(host_name): - _, _, ip_addresses = socket.gethostbyname_ex(host_name) - return ip_addresses +def dns_resolve(host_name, ipv4=True, ipv6=False): + return list({ + address[0] + for family, _type, _proto, _canonname, address in socket.getaddrinfo( + host_name, None + ) + if ( + (family == socket.AF_INET and ipv4) + or (family == socket.AF_INET6 and ipv6) + ) + }) -def dns_resolve_single(host_name): - ips = dns_resolve(host_name) +def dns_resolve_single(host_name, ipv4=True, ipv6=False): + ips = dns_resolve(host_name, ipv4=ipv4, ipv6=ipv6) if len(ips) != 1: raise ValueError("%s resolved to %i instead of 1 IP address" % (host_name, len(ips))) return ips[0] -def get_dns_resolved_server_address(server): - return "%s:%i" % (dns_resolve_single(server.host), server.port) +def get_dns_resolved_server_address(server, ipv4=True, ipv6=False): + host = dns_resolve_single(server.host, ipv4=ipv4, ipv6=ipv6) + if ":" in host: + return f"[{host}]:{server.port}" + return f"{host}:{server.port}" def driver_feature(*features): diff --git a/tests/stub/routing/_routing.py b/tests/stub/routing/_routing.py index fdefa744..c4f125cb 100644 --- a/tests/stub/routing/_routing.py +++ b/tests/stub/routing/_routing.py @@ -13,21 +13,7 @@ class RoutingBase(TestkitTestCase): def setUp(self): super().setUp() - self._routingServer1 = StubServer(9000) - self._routingServer2 = StubServer(9001) - self._routingServer3 = StubServer(9002) - self._readServer1 = StubServer(9010) - self._readServer2 = StubServer(9011) - self._readServer3 = StubServer(9012) - self._writeServer1 = StubServer(9020) - self._writeServer2 = StubServer(9021) - self._writeServer3 = StubServer(9022) - self._uri_template = "neo4j://%s:%d" - self._uri_template_with_context = \ - self._uri_template + "?region=china&policy=my_policy" - self._uri_with_context = self._uri_template_with_context % ( - self._routingServer1.host, self._routingServer1.port - ) + self.set_up_servers() self._auth = types.AuthorizationToken( "basic", principal="p", credentials="c" ) @@ -45,6 +31,26 @@ def tearDown(self): self._writeServer3.reset() super().tearDown() + def set_up_servers(self, ipv6=False): + self._routingServer1 = StubServer(9000, ipv6=ipv6) + self._routingServer2 = StubServer(9001, ipv6=ipv6) + self._routingServer3 = StubServer(9002, ipv6=ipv6) + self._readServer1 = StubServer(9010, ipv6=ipv6) + self._readServer2 = StubServer(9011, ipv6=ipv6) + self._readServer3 = StubServer(9012, ipv6=ipv6) + self._writeServer1 = StubServer(9020, ipv6=ipv6) + self._writeServer2 = StubServer(9021, ipv6=ipv6) + self._writeServer3 = StubServer(9022, ipv6=ipv6) + self._set_up_uris() + + def _set_up_uris(self): + self._uri_template = "neo4j://%s" + self._uri_template_with_context = \ + self._uri_template + "?region=china&policy=my_policy" + self._uri_with_context = self._uri_template_with_context % ( + self._routingServer1.address + ) + @property @abstractmethod def bolt_version(self): @@ -60,9 +66,15 @@ def server_agent(self): def adb(self): pass - def get_vars(self, host=None): + def host_in_address(self, host=None): if host is None: host = self._routingServer1.host + if ":" in host: + host = f"[{host}]" + return host + + def get_vars(self, host=None): + host = self.host_in_address(host) v = { "#VERSION#": self.bolt_version, "#HOST#": host, diff --git a/tests/stub/routing/test_routing_v3.py b/tests/stub/routing/test_routing_v3.py index 06209c17..b56c97c6 100644 --- a/tests/stub/routing/test_routing_v3.py +++ b/tests/stub/routing/test_routing_v3.py @@ -11,8 +11,7 @@ class RoutingV3(RoutingV4x4): server_agent = "Neo4j/3.5.0" def get_vars(self, host=None): - if host is None: - host = self._routingServer1.host + host = self.host_in_address(host) v = { "#VERSION#": self.bolt_version, "#HOST#": host, @@ -82,3 +81,6 @@ def test_should_fail_on_empty_routing_response(self): def test_should_drop_connections_failing_liveness_check(self): super().test_should_drop_connections_failing_liveness_check() + + def test_ipv6_read(self): + super().test_ipv6_read() diff --git a/tests/stub/routing/test_routing_v4x1.py b/tests/stub/routing/test_routing_v4x1.py index 83a3e577..ef4711c6 100644 --- a/tests/stub/routing/test_routing_v4x1.py +++ b/tests/stub/routing/test_routing_v4x1.py @@ -12,8 +12,7 @@ class RoutingV4x1(RoutingV4x4): server_agent = "Neo4j/4.1.0" def get_vars(self, host=None): - if host is None: - host = self._routingServer1.host + host = self.host_in_address(host) v = { "#VERSION#": self.bolt_version, "#HOST#": host, @@ -65,3 +64,6 @@ def test_should_pass_system_bookmark_when_getting_rt_for_multi_db(self): def test_should_ignore_system_bookmark_when_getting_rt_for_multi_db(self): pass + + def test_ipv6_read(self): + super().test_ipv6_read() diff --git a/tests/stub/routing/test_routing_v4x3.py b/tests/stub/routing/test_routing_v4x3.py index cdf30459..f40a0cff 100644 --- a/tests/stub/routing/test_routing_v4x3.py +++ b/tests/stub/routing/test_routing_v4x3.py @@ -250,3 +250,6 @@ def test_should_successfully_get_server_agent(self): def test_should_fail_when_driver_closed_using_session_run(self): super().test_should_fail_when_driver_closed_using_session_run() + + def test_ipv6_read(self): + super().test_ipv6_read() diff --git a/tests/stub/routing/test_routing_v4x4.py b/tests/stub/routing/test_routing_v4x4.py index fb2ac325..8c78ba38 100644 --- a/tests/stub/routing/test_routing_v4x4.py +++ b/tests/stub/routing/test_routing_v4x4.py @@ -247,3 +247,6 @@ def test_should_successfully_get_server_agent(self): def test_should_fail_when_driver_closed_using_session_run(self): super().test_should_fail_when_driver_closed_using_session_run() + + def test_ipv6_read(self): + super().test_ipv6_read() diff --git a/tests/stub/routing/test_routing_v5x0.py b/tests/stub/routing/test_routing_v5x0.py index b628a7e5..ffc4bdc8 100644 --- a/tests/stub/routing/test_routing_v5x0.py +++ b/tests/stub/routing/test_routing_v5x0.py @@ -50,13 +50,12 @@ def test_should_successfully_get_routing_table(self): self._routingServer1.done() rt = driver.get_routing_table(self.adb) driver.close() + host = self.host_in_address(vars_["#HOST#"]) assert rt.database == self.adb assert rt.ttl == 1000 - assert rt.routers == [vars_["#HOST#"] + ":9000"] - assert sorted(rt.readers) == [vars_["#HOST#"] + ":9010", - vars_["#HOST#"] + ":9011"] - assert sorted(rt.writers) == [vars_["#HOST#"] + ":9020", - vars_["#HOST#"] + ":9021"] + assert rt.routers == [f"{host}:9000"] + assert sorted(rt.readers) == [f"{host}:9010", f"{host}:9011"] + assert sorted(rt.writers) == [f"{host}:9020", f"{host}:9021"] # Checks that routing is used to connect to correct server and that # parameters for session run is passed on to the target server @@ -1667,10 +1666,10 @@ def test_should_successfully_read_from_readable_router_using_tx_function( # routing table. Since this test is not for testing DNS resolution, # it has been switched to IP-based address model. ip_address = get_ip_addresses()[0] + address = f"{ip_address:s}:{self._routingServer1.port:d}" driver = Driver( self._backend, - self._uri_template_with_context % (ip_address, - self._routingServer1.port), + self._uri_template_with_context % address, self._auth, self._userAgent ) @@ -1705,9 +1704,10 @@ def test_should_send_empty_hello(self): # routing table. Since this test is not for testing DNS resolution, # it has been switched to IP-based address model. ip_address = get_ip_addresses()[0] + address = f"{ip_address:s}:{self._routingServer1.port:d}" driver = Driver( self._backend, - self._uri_template % (ip_address, self._routingServer1.port), + self._uri_template % address, self._auth, self._userAgent ) @@ -3054,3 +3054,60 @@ def write(tx): self.assertNotEqual(read_summary.server_info.address, write_summary.server_info.address) + + def test_ipv6_read(self): + # *IPv6* + # So... *clears throat* + # You'd think after over a decade, IPv6 would be nice and smooth to + # use. Na-ah! Docker's IPv6 support is still experimental and only + # available on Linux hosts (see also + # https://github.com/docker/roadmap/issues/282). + # To enable it, have a look at + # https://docs.docker.com/config/daemon/ipv6/ + # + # *TL;DR:* + # you need to edit the docker daemon config and restart the docker + # service. Sample config (read the docs, and adjust as needed) + # `/etc/docker/daemon.json`: + # ```json + # { + # "ipv6": true, + # "fixed-cidr-v6": "fd12:0ac8:1::/64", + # "experimental": true, + # "ip6tables": true, + # "default-address-pools": [ + # { "base": "172.17.0.0/16", "size": 16 }, + # { "base": "172.18.0.0/16", "size": 16 }, + # { "base": "172.19.0.0/16", "size": 16 }, + # { "base": "172.20.0.0/14", "size": 16 }, + # { "base": "172.24.0.0/14", "size": 16 }, + # { "base": "172.28.0.0/14", "size": 16 }, + # { "base": "192.168.0.0/16", "size": 20 }, + # { "base": "fd12:0ac8::/104", "size": 112 } + # ] + # } + # ``` + # ```bash + # sudo systemctl restart docker + # ``` + self.set_up_servers(ipv6=True) + driver = Driver(self._backend, self._uri_with_context, self._auth, + self._userAgent) + self.start_server(self._routingServer1, "router_adb.script") + self.start_server(self._readServer1, "reader.script") + + session = driver.session("r", database=self.adb) + result = session.run("RETURN 1 as n") + sequence = self.collect_records(result) + summary = result.consume() + session.close() + driver.close() + + target_address = get_dns_resolved_server_address( + self._readServer1, ipv4=False, ipv6=True + ) + self._routingServer1.done() + self._readServer1.done() + self.assertTrue(summary.server_info.address in + [target_address, self._readServer1.address]) + self.assertEqual([1], sequence) diff --git a/tests/stub/shared.py b/tests/stub/shared.py index 9e0ad824..844e625a 100644 --- a/tests/stub/shared.py +++ b/tests/stub/shared.py @@ -48,10 +48,15 @@ def _poll_pipe(pipe, queue): class StubServer: - def __init__(self, port): - self.host = os.environ.get("TEST_STUB_HOST", "127.0.0.1") - self.address = "%s:%d" % (self.host, port) + def __init__(self, port, ipv6=False): + self.host = os.environ.get("TEST_STUB_HOST", + "::1" if ipv6 else "127.0.0.1") + if ":" in self.host: + self.address = f"[{self.host}]:{port}" + else: + self.address = f"{self.host}:{port}" self.port = port + self.ipv6 = ipv6 self._process = None self._stdout_buffer = Queue() self._stdout_lines = [] @@ -93,10 +98,12 @@ def start(self, path=None, script=None, vars_=None): os.fsync(f) self._script_path = path + listen_addr = f"{'[::]' if self.ipv6 else '0.0.0.0'}:{self.port}" + extra_options = ["-6"] if self.ipv6 else [] self._process = subprocess.Popen( [ - sys.executable, "-m", "boltstub", "-l", - "0.0.0.0:%d" % self.port, "-v", path + sys.executable, "-m", "boltstub", "-l", listen_addr, "-v", + *extra_options, path ], **POPEN_EXTRA_KWARGS, stdout=subprocess.PIPE, stderr=subprocess.PIPE, close_fds=True, From c17d2f1f65848ee5d1254a1e7e1c01cc61e9049f Mon Sep 17 00:00:00 2001 From: Robsdedude Date: Wed, 14 Feb 2024 15:26:55 +0100 Subject: [PATCH 3/3] Enable IPv6 in integration tests --- tests/neo4j/shared.py | 7 ++++--- tests/neo4j/test_summary.py | 17 ++++++++++++----- tests/shared.py | 10 +++++++--- 3 files changed, 23 insertions(+), 11 deletions(-) diff --git a/tests/neo4j/shared.py b/tests/neo4j/shared.py index be4d7a91..83960fbe 100644 --- a/tests/neo4j/shared.py +++ b/tests/neo4j/shared.py @@ -25,7 +25,7 @@ from nutkit.frontend import Driver from nutkit.protocol import AuthorizationToken from tests.shared import ( - dns_resolve_single, + dns_resolve, Potential, TestkitTestCase, ) @@ -56,9 +56,10 @@ def get_neo4j_host_and_port(): return host, port -def get_neo4j_resolved_host_and_port(): +def get_neo4j_host_and_port_resolutions(): host, port = get_neo4j_host_and_port() - return dns_resolve_single(host), port + return [(resolved, port) + for resolved in dns_resolve(host, ipv4=True, ipv6=True)] def get_neo4j_host_and_http_port(): diff --git a/tests/neo4j/test_summary.py b/tests/neo4j/test_summary.py index 640b5026..aa4d52dd 100644 --- a/tests/neo4j/test_summary.py +++ b/tests/neo4j/test_summary.py @@ -5,12 +5,15 @@ cluster_unsafe_test, get_driver, get_neo4j_host_and_port, - get_neo4j_resolved_host_and_port, + get_neo4j_host_and_port_resolutions, get_server_info, QueryBuilder, requires_multi_db_support, ) -from tests.shared import TestkitTestCase +from tests.shared import ( + format_address, + TestkitTestCase, +) class TestSummary(TestkitTestCase): @@ -120,9 +123,13 @@ def test_agent_string(self): @cluster_unsafe_test # routing can lead us to another server (address) def test_address(self): summary = self.get_summary("RETURN 1 AS number") - self.assertTrue(summary.server_info.address in - ["%s:%s" % get_neo4j_resolved_host_and_port(), - "%s:%s" % get_neo4j_host_and_port()]) + + expected = [ + format_address(host, port) + for (host, port) in + (*get_neo4j_host_and_port_resolutions(), get_neo4j_host_and_port()) + ] + self.assertTrue(summary.server_info.address in expected) def _assert_counters(self, summary, nodes_created=0, nodes_deleted=0, relationships_created=0, relationships_deleted=0, diff --git a/tests/shared.py b/tests/shared.py index bcb5da9b..fdcfc6f4 100644 --- a/tests/shared.py +++ b/tests/shared.py @@ -82,11 +82,15 @@ def dns_resolve_single(host_name, ipv4=True, ipv6=False): return ips[0] +def format_address(host, port): + if ":" in host: + return f"[{host}]:{port}" + return f"{host}:{port}" + + def get_dns_resolved_server_address(server, ipv4=True, ipv6=False): host = dns_resolve_single(server.host, ipv4=ipv4, ipv6=ipv6) - if ":" in host: - return f"[{host}]:{server.port}" - return f"{host}:{server.port}" + return format_address(host, server.port) def driver_feature(*features):