Fix async driver waiting for OS on closing half-open socket #1311

Merged
robsdedude merged 9 commits into neo4j:6.x from robsdedude:fix/async-close-blocking
Jun 5, 2026

@robsdedude robsdedude commented Jun 2, 2026

This change aligns the async driver better with the sync driver. When a socket is closed, we don't want to wait until the OS acknowledges that the channel has been fully torn down. It's sufficient to initiate the closure. The rest we'll leave to the OS's TCP stack (and potentially other intermediates such as TLS wrappers) or the async runtime to figure out in the background.

Closes: #1309
Part of: DRIVERS-404
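
To illustrate the behavior the async driver is being aligned with, here is a minimal sketch using a plain blocking socket. It is not the driver's code: `close_without_waiting` is a hypothetical helper, and the listener that never accepts is only a stand-in for an unresponsive peer.

```python
import socket
import time


def close_without_waiting(sock: socket.socket) -> None:
    """Initiate teardown and return; the OS finishes it in the background."""
    try:
        sock.shutdown(socket.SHUT_RDWR)  # ask the OS to close both directions
    except OSError:
        pass  # e.g. the connection is already gone
    sock.close()  # release the file descriptor without waiting for the peer


# A listener that never accepts or reads stands in for an unresponsive peer.
listener = socket.create_server(("127.0.0.1", 0))
client = socket.create_connection(listener.getsockname())

started = time.monotonic()
close_without_waiting(client)
elapsed = time.monotonic() - started
listener.close()

print(f"close returned after {elapsed:.4f} s")
```

Neither `shutdown()` nor `close()` waits for the peer to acknowledge anything here, which is the property the description asks of the async path as well.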

Reproducer

Assumes Ubuntu/Debian base.

  1. Install dependencies: apt-get install iptables build-essential python3-venv python3-dev libnetfilter-queue-dev
  2. Start Neo4j in Docker with SSL enabled for Bolt and port 7687 exposed.
  3. Create filter.py:
    # requires
    # NetfilterQueue==1.1.0
    # scapy==2.7.0
    
    import signal
    import sys
    from netfilterqueue import NetfilterQueue
    from scapy.layers.inet import IP, TCP
    
    SERVER_IPS = {
        "127.0.0.1",
        "172.24.0.2",  # ADJUST: neo4j docker container IP
    }
    SERVER_PORT = 7687
    
    seen_syn_conns: set = set()  # client ports whose SYN was observed
    
    
    def conn_key(ip_pkt, tcp_pkt):
        # Identify a connection by its client-side TCP port.
        if tcp_pkt.dport == SERVER_PORT:
            return tcp_pkt.sport
        return tcp_pkt.dport
    
    
    def handle(pkt):
        raw = IP(pkt.get_payload())
        if not raw.haslayer(TCP):
            pkt.accept()
            return
    
        ip  = raw[IP]
        tcp = raw[TCP]
        key = conn_key(ip, tcp)
    
        if not (
            (ip.dst in SERVER_IPS or ip.src in SERVER_IPS)
            and (ip.dport == SERVER_PORT or ip.sport == SERVER_PORT)
        ):
            print(f"{raw} (unrelated)")
            pkt.accept()
            return
    
        print(f"{raw}")
    
        # Register the connection on the client's initial SYN.
        if "S" in tcp.flags:
            print(f"    [  syn] {key}")
            seen_syn_conns.add(key)
            pkt.accept()
            return
    
        # Drop any packet whose connection was never opened through us.
        if key not in seen_syn_conns:
            print(f"    [ drop] no SYN seen  {key}")
            pkt.drop()
            return
    
        pkt.accept()
    
    
    def shutdown(sig, _frame):
        print("\nShutting down.")
        sys.exit(0)
    
    
    signal.signal(signal.SIGINT,  shutdown)
    signal.signal(signal.SIGTERM, shutdown)
    
    nfq = NetfilterQueue()
    nfq.bind(1, handle)
    print("Running — Ctrl-C to stop.")
    try:
        nfq.run()
    finally:
        nfq.unbind()
  4. Create a venv for filter.py:
    1. python -m venv .venv
    2. source .venv/bin/activate
    3. pip install NetfilterQueue==1.1.0 scapy==2.7.0
  5. Start the filter: sudo .venv/bin/python filter.py
  6. Inside another python environment with the driver and its dependencies available, run the following reproducer:
    import sys
    import time
    import subprocess
    
    import asyncio
    from neo4j import AsyncGraphDatabase, GraphDatabase
    from neo4j.debug import watch
    
    watch("neo4j")
    
    URI = "bolt+ssc://localhost:7687"
    AUTH = ("neo4j", "pass")
    
    SPORT_FILTER = ("--sport", "7687")
    DPORT_FILTER = ("--dport", "7687")
    
    
    def _iptables_rule(port_filter):
        return (
            "INPUT", "-p", "tcp", *port_filter, "-j", "NFQUEUE", "--queue-num", "1"
        )
    
    
    def _clear_iptables():
        for port_filter in (SPORT_FILTER, DPORT_FILTER):
            subprocess.run(
                ["sudo", "iptables", "-D", *_iptables_rule(port_filter)],
                stderr=sys.stderr,
                stdout=sys.stdout,
            )
    
    
    def _setup_iptables():
        for port_filter in (SPORT_FILTER, DPORT_FILTER):
            subprocess.run(
                ["sudo", "iptables", "-I", *_iptables_rule(port_filter)],
                stderr=sys.stderr,
                stdout=sys.stdout,
                check=True,
            )
    
    
    async def main():
        _clear_iptables()
    
        async with AsyncGraphDatabase.driver(
            URI,
            auth=AUTH,
            max_connection_pool_size=1,
            connection_acquisition_timeout=3,
            liveness_check_timeout=1,
            max_connection_lifetime=5,
        ) as driver:
            t0 = time.time()
    
            try:
                async with driver.session() as session:
                    await (await session.run("RETURN 1 AS n")).to_eager_result()
            finally:
                print(f"\n  >>>>> took: {time.time() - t0:.2f} seconds\n")
    
            _setup_iptables()
            await asyncio.sleep(6)  # exceed connection lifetime
    
            t0 = time.time()
    
            try:
                async with driver.session() as session:
                    await (await session.run("RETURN 1 AS n")).to_eager_result()
            finally:
                print(f"\n  >>>>> took: {time.time() - t0:.2f} seconds\n")
    
    
    def main_sync():
        _clear_iptables()
    
        with GraphDatabase.driver(
            URI,
            auth=AUTH,
            max_connection_pool_size=1,
            connection_acquisition_timeout=3,
            liveness_check_timeout=1,
            max_connection_lifetime=5,
        ) as driver:
            t0 = time.time()
    
            try:
                with driver.session() as session:
                    session.run("RETURN 1 AS n").to_eager_result()
            finally:
                print(f"\n  >>>>> took: {time.time() - t0:.2f} seconds\n")
    
            _setup_iptables()
            time.sleep(1)  # exceed liveness check timeout
    
            t0 = time.time()
    
            try:
                with driver.session() as session:
                    session.run("RETURN 1 AS n").to_eager_result()
            finally:
                print(f"\n  >>>>> took: {time.time() - t0:.2f} seconds\n")
    
    
    if __name__ == "__main__":
        try:
            asyncio.run(main())
            # main_sync()
        finally:
            _clear_iptables()
            print(" ====== DONE ====== ")

This change aligns the async driver better with the sync driver.
When a socket is closed, we don't want to wait until the OS acknowledges that
the channel has been fully torn down. It's sufficient to flush all our data then
tell the OS to close both ends of the channel. The rest we'll leave to the OS's
TCP stack (and potentially other intermediates such as TLS wrappers) to figure
out.

Further, the flushing should respect any write timeouts.
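
The flush-then-close idea in this commit message could be sketched with asyncio streams as follows. This is an illustration under assumptions, not the driver's implementation: `flush_then_close` and its `write_timeout` parameter are hypothetical names, and the local echo-less server exists only to make the example runnable.

```python
import asyncio


async def flush_then_close(writer: asyncio.StreamWriter, write_timeout: float) -> bool:
    """Flush buffered data within write_timeout, then initiate the close.

    Returns True if the flush completed in time.
    """
    flushed = True
    try:
        await asyncio.wait_for(writer.drain(), timeout=write_timeout)
    except (asyncio.TimeoutError, OSError):
        flushed = False  # give up on the flush, but still close
    finally:
        writer.close()  # only initiates teardown; does not wait for it
    return flushed


async def demo() -> bool:
    async def handle(reader, writer):
        await reader.read()  # consume until the client closes
        writer.close()

    server = await asyncio.start_server(handle, "127.0.0.1", 0)
    port = server.sockets[0].getsockname()[1]
    _, writer = await asyncio.open_connection("127.0.0.1", port)
    writer.write(b"GOODBYE")
    flushed = await flush_then_close(writer, write_timeout=1.0)
    server.close()
    return flushed and writer.is_closing()


result = asyncio.run(demo())
print(result)
```

Bounding `drain()` keeps a peer that has stopped reading from stalling the close for longer than the write timeout.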
@robsdedude robsdedude marked this pull request as ready for review June 2, 2026 10:27
@robsdedude robsdedude requested review from MaxAake and Copilot and removed request for MaxAake June 2, 2026 10:27

Swallow timeout or OS errors after we've instructed asyncio to shut down the
channel.
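
One way to read this commit message, sketched with asyncio streams: once `close()` has been called, any later timeout or OS error is ignored. `close_and_swallow` and `grace` are hypothetical names, and the bounded wait is an assumption about this intermediate commit; the later commit message in this PR goes further and asks that closing never block at all.

```python
import asyncio


async def close_and_swallow(writer: asyncio.StreamWriter, grace: float) -> None:
    """Initiate the close, then ignore errors raised by the teardown."""
    writer.close()  # from here on, the channel is treated as gone
    try:
        await asyncio.wait_for(writer.wait_closed(), timeout=grace)
    except (asyncio.TimeoutError, OSError):
        pass  # teardown problems are no longer the caller's concern


async def demo() -> bool:
    async def handle(reader, writer):
        await reader.read()  # wait for the client's EOF
        writer.close()

    server = await asyncio.start_server(handle, "127.0.0.1", 0)
    port = server.sockets[0].getsockname()[1]
    _, writer = await asyncio.open_connection("127.0.0.1", port)
    await close_and_swallow(writer, grace=0.5)  # must not raise
    server.close()
    return True


completed = asyncio.run(demo())
print(completed)
```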
@robsdedude robsdedude marked this pull request as draft June 2, 2026 11:16
@robsdedude robsdedude requested a review from Copilot June 2, 2026 11:16

@robsdedude robsdedude marked this pull request as ready for review June 2, 2026 13:08
@robsdedude robsdedude requested a review from MaxAake June 2, 2026 13:08
Closing the socket should always be on a best-effort basis and never block.
The driver should leave it to the OS or async loop implementation to clean up
the resources in the background as needed.
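
A best-effort, non-blocking close along these lines might look like the sketch below. It assumes asyncio streams and uses a hypothetical helper, `close_best_effort`; it is not taken from the driver. The helper is deliberately a plain function, so there is nothing to await and therefore nothing that can block the caller.

```python
import asyncio


def close_best_effort(writer: asyncio.StreamWriter) -> None:
    """Start closing the stream; never await, never raise."""
    try:
        writer.close()  # schedules teardown on the event loop and returns
    except Exception:
        pass  # best effort: a failed close must not surface to the caller


async def demo() -> bool:
    async def handle(reader, writer):
        await reader.read()  # peer that just waits for EOF
        writer.close()

    server = await asyncio.start_server(handle, "127.0.0.1", 0)
    port = server.sockets[0].getsockname()[1]
    _, writer = await asyncio.open_connection("127.0.0.1", port)
    close_best_effort(writer)
    closing = writer.is_closing()  # teardown has been initiated, not awaited
    server.close()
    return closing


is_closing = asyncio.run(demo())
print(is_closing)
```

The trade-off of this design is that the caller never learns whether the teardown completed cleanly; the event loop and the OS reclaim the resources in the background.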

@MaxAake MaxAake left a comment

@robsdedude robsdedude changed the title from "Fix async driver waiting for OS on closing half-open socket." to "Fix async driver waiting for OS on closing half-open socket" Jun 5, 2026
@robsdedude robsdedude merged commit 8f2b069 into neo4j:6.x Jun 5, 2026
29 checks passed
@robsdedude robsdedude deleted the fix/async-close-blocking branch June 5, 2026 13:53

Development

Successfully merging this pull request may close these issues.

Async pool: acquire blocks far past connection_acquisition_timeout on unbounded teardown of a discarded half-open connection

3 participants