As noted by @wence- in rapidsai/cudf#21382 (comment), cudf-polars' use of asyncio.to_thread ends up offloading blocking function calls to a thread from a new ThreadPoolExecutor created by asyncio.
We'd (maybe?) prefer that those functions run on the py_executor passed into run_streaming_pipeline. I haven't thought through all the consequences of this, but it seems reasonable.
Because rapidsmpf creates and controls the event loop, we should be able to get the running loop in the runner closure called by asyncio.run, around line 254 of rapidsmpf/python/rapidsmpf/rapidsmpf/streaming/core/node.pyx (at 42eca8b), and call loop.set_default_executor. Then calls to asyncio.to_thread will run on py_executor.
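A minimal, standalone sketch of the idea, using only the standard library. It is not the rapidsmpf code: run_on_executor and blocking_work are hypothetical names made up for illustration, and the runner closure here only stands in for the one in node.pyx.

```python
import asyncio
import threading
from concurrent.futures import ThreadPoolExecutor


def blocking_work() -> str:
    # Hypothetical stand-in for a blocking call; reports the thread it ran on.
    return threading.current_thread().name


def run_on_executor(py_executor: ThreadPoolExecutor) -> str:
    # Hypothetical helper mirroring the shape described above: a closure
    # that is handed to asyncio.run and configures the loop it runs on.
    async def runner() -> str:
        loop = asyncio.get_running_loop()
        # Make py_executor the loop's default executor, which is the
        # executor asyncio.to_thread submits to.
        loop.set_default_executor(py_executor)
        return await asyncio.to_thread(blocking_work)

    return asyncio.run(runner())


py_executor = ThreadPoolExecutor(max_workers=2, thread_name_prefix="py_executor")
thread_name = run_on_executor(py_executor)
print(thread_name)  # name of a py_executor worker thread
```

One consequence that probably needs checking: asyncio.run shuts down the loop's default executor when it exits, so an executor installed this way would likely be shut down along with the loop unless that is handled separately.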