I noticed some latency in a unit test on a single machine. It turned into a bit of a rabbit hole that took some time to get to the bottom of. I think I at least understand some libraries and tools better as a result though!
The weirdness in the unit test was obvious because the usual feel of the tests is this:
$ ./venv/bin/python -m unittest -q test/test_websocket.py
----------------------------------------------------------------------
Ran 1 test in 0.025s
And I was instead facing this:
$ ./venv/bin/python -m unittest test/test_websocket.py
.
----------------------------------------------------------------------
Ran 1 test in 11.930s
My real test actually does some amount of work but I kept paring it back further and further to try to narrow down where the problem lay. The most minimal case I've come up with looks like this:
import unittest

import fastapi.testclient

class TestWebsocket(unittest.TestCase):
    def test_two_clients(self):
        # "app" is the pared-back application shown below
        client = fastapi.testclient.TestClient(app)
        with (client.websocket_connect("/"),
              client.websocket_connect("/")):
            pass
The application code was similarly pared back until it was a nearly empty websocket endpoint that did no useful work:
import fastapi
import fastapi.testclient
import websockets.exceptions

app = fastapi.FastAPI()

@app.websocket("/")
async def websocket(websocket: fastapi.WebSocket):
    await websocket.accept()
    try:
        while True:
            message = await websocket.receive_json()
    except (fastapi.WebSocketDisconnect, websockets.exceptions.ConnectionClosed) as e:
        pass
The tests are executed on the same version of Python (3.12.2), on the same operating system (Fedora 39), and on the same architecture (x86_64). The problem machine has 8 cores and is running kernel 6.7.10-200.fc39.x86_64; the non-problem machine has 16 cores and is running kernel 6.6.8-200.fc39.x86_64. In an effort to narrow down what was happening during all those slow runs I ran the program under strace to capture the system calls being made. strace is notorious for slowing down programs; even the man page says so:
BUGS
       Programs that use the setuid bit do not have effective user ID privileges while being traced.

       A traced process runs slowly (but check out the --seccomp-bpf option).

       Unless --kill-on-exit option is used (or --seccomp-bpf option is used in a way that implies --kill-on-exit), traced processes which are descended from command may be left running after an interrupt signal (CTRL-C).
It is a fun quirk then that under strace my test sometimes runs faster!
$ strace -c ./venv/bin/python -m unittest test/test_websocket.py
.
----------------------------------------------------------------------
Ran 1 test in 0.852s
OK
% time seconds usecs/call calls errors syscall
------ ----------- ----------- --------- --------- ----------------
97.06 0.161901 179 903 223 futex
1.10 0.001838 0 3737 231 newfstatat
0.46 0.000768 1 749 54 openat
0.39 0.000656 0 1195 read
...
There isn't always such a discrepancy under strace, though, and I was able to capture several runs that are representatively slow. Interestingly, even in the fast case the biggest cause of slowness remains the same. Here is a full strace output for a slow run:
$ strace -c ./venv/bin/python -m unittest test/test_websocket.py
.
----------------------------------------------------------------------
Ran 1 test in 3.853s
OK
% time seconds usecs/call calls errors syscall
------ ----------- ----------- --------- --------- ----------------
99.52 0.741645 1093 678 128 futex
0.19 0.001453 0 3737 231 newfstatat
0.07 0.000518 0 749 54 openat
0.06 0.000469 0 1195 read
0.03 0.000240 0 1096 3 lseek
0.03 0.000207 0 702 close
0.02 0.000149 0 242 mmap
0.02 0.000143 0 158 getdents64
0.02 0.000131 0 582 558 ioctl
0.01 0.000090 1 67 rt_sigaction
0.01 0.000042 0 151 brk
0.01 0.000040 0 52 mprotect
0.00 0.000019 9 2 clone3
0.00 0.000015 1 10 getrandom
0.00 0.000012 2 6 write
0.00 0.000012 0 13 getpid
0.00 0.000012 1 10 epoll_wait
0.00 0.000006 1 5 rt_sigprocmask
0.00 0.000005 1 3 dup
0.00 0.000003 1 2 epoll_ctl
0.00 0.000002 2 1 fcntl
0.00 0.000001 0 2 getsockname
0.00 0.000001 1 1 socketpair
0.00 0.000000 0 13 munmap
0.00 0.000000 0 2 pread64
0.00 0.000000 0 1 1 access
0.00 0.000000 0 6 sendto
0.00 0.000000 0 1 execve
0.00 0.000000 0 1 uname
0.00 0.000000 0 3 getcwd
0.00 0.000000 0 3 1 readlink
0.00 0.000000 0 1 sysinfo
0.00 0.000000 0 2 1 arch_prctl
0.00 0.000000 0 1 gettid
0.00 0.000000 0 1 set_tid_address
0.00 0.000000 0 1 set_robust_list
0.00 0.000000 0 2 epoll_create1
0.00 0.000000 0 2 prlimit64
0.00 0.000000 0 1 rseq
------ ----------- ----------- --------- --------- ----------------
100.00 0.745215 78 9505 977 total
futex is, of course:

futex(2)                        System Calls Manual                        futex(2)

NAME
       futex - fast user-space locking

DESCRIPTION
       The futex() system call provides a method for waiting until a certain condition becomes true. It is typically used as a blocking construct in the context of shared-memory synchronization. When using futexes, the majority of the synchronization operations are performed in user space.
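As a sanity check on that description, here is a tiny standalone script (my own, unrelated to the test) in which two threads contend on a lock. Running it under strace -f -c -e trace=futex should show plenty of futex activity, from both the shared lock and the GIL handoffs between the threads — the same shape as the counts above.

import threading

lock = threading.Lock()

def spin(iterations: int) -> None:
    # repeatedly take and release the shared lock so the two threads contend
    for _ in range(iterations):
        with lock:
            pass

threads = [threading.Thread(target=spin, args=(100_000,)) for _ in range(2)]
for thread in threads:
    thread.start()
for thread in threads:
    thread.join()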
During the test run the CPU usage for the program hits over 100% in top. Starting to suspect some scheduling problem or partial deadlock of some sort, I tried pinning the process to a single CPU with taskset:
$ taskset -c 0 ./venv/bin/python -m unittest test/test_websocket.py
.
----------------------------------------------------------------------
Ran 1 test in 0.006s
And immediately the problem cleared up. Very interesting.
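For what it's worth, the same pinning can be done from inside the process rather than from the outside with taskset. This is just an illustration (the test doesn't need it) using os.sched_setaffinity, the same kernel interface taskset drives:

import os

# pid 0 means "the calling process"; restrict it to CPU 0 only
os.sched_setaffinity(0, {0})
print(os.sched_getaffinity(0))  # {0}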
Thinking back to that time I was able to debug emacs by setting a breakpoint in some C code to gather a traceback into elisp code, I went rummaging around the standard library to see if I might find something similar. What I have turned up so far is the faulthandler module, specifically the dump_traceback_later function, which dumps a traceback for every thread after a configurable timeout. My test now looks like this:
import faulthandler
import unittest

import fastapi.testclient

class TestWebsocket(unittest.TestCase):
    def test_two_clients(self):
        client = fastapi.testclient.TestClient(app)
        faulthandler.dump_traceback_later(2)
        with (client.websocket_connect("/"),
              client.websocket_connect("/")):
            pass
And executing it produces the following after the 2 second timeout:
$ ./venv/bin/python -m unittest test/test_websocket.py
Timeout (0:00:02)!
Thread 0x00007f77e26006c0 (most recent call first):
File "/usr/lib64/python3.12/selectors.py", line 468 in select
File "/usr/lib64/python3.12/asyncio/base_events.py", line 1947 in _run_once
File "/usr/lib64/python3.12/asyncio/base_events.py", line 639 in run_forever
File "/usr/lib64/python3.12/asyncio/base_events.py", line 672 in run_until_complete
File "/usr/lib64/python3.12/asyncio/runners.py", line 118 in run
File "/home/nprescott/project/venv/lib64/python3.12/site-packages/anyio/_backends/_asyncio.py", line 2001 in run
File "/home/nprescott/project/venv/lib64/python3.12/site-packages/anyio/_core/_eventloop.py", line 73 in run
File "/usr/lib64/python3.12/concurrent/futures/thread.py", line 58 in run
File "/usr/lib64/python3.12/concurrent/futures/thread.py", line 92 in _worker
File "/usr/lib64/python3.12/threading.py", line 1010 in run
File "/usr/lib64/python3.12/threading.py", line 1073 in _bootstrap_inner
File "/usr/lib64/python3.12/threading.py", line 1030 in _bootstrap
Thread 0x00007f77e30006c0 (most recent call first):
File "/home/nprescott/project/venv/lib64/python3.12/site-packages/anyio/_backends/_asyncio.py", line 2045 in sleep
Thread 0x00007f77f3af2740 (most recent call first):
File "/usr/lib64/python3.12/asyncio/selector_events.py", line 152 in _write_to_self
File "/usr/lib64/python3.12/asyncio/base_events.py", line 844 in call_soon_threadsafe
File "/home/nprescott/project/venv/lib64/python3.12/site-packages/anyio/_backends/_asyncio.py", line 2205 in run_sync_from_thread
File "/home/nprescott/project/venv/lib64/python3.12/site-packages/anyio/_backends/_asyncio.py", line 896 in _spawn_task_from_thread
File "/home/nprescott/project/venv/lib64/python3.12/site-packages/anyio/from_thread.py", line 334 in start_task_soon
File "/home/nprescott/project/venv/lib64/python3.12/site-packages/starlette/testclient.py", line 101 in __enter__
File "/home/nprescott/project/test/test_websocket.py", line 37 in test_two_clients
File "/usr/lib64/python3.12/unittest/case.py", line 589 in _callTestMethod
File "/usr/lib64/python3.12/unittest/case.py", line 634 in run
File "/usr/lib64/python3.12/unittest/case.py", line 690 in __call__
File "/usr/lib64/python3.12/unittest/suite.py", line 122 in run
File "/usr/lib64/python3.12/unittest/suite.py", line 84 in __call__
File "/usr/lib64/python3.12/unittest/suite.py", line 122 in run
File "/usr/lib64/python3.12/unittest/suite.py", line 84 in __call__
File "/usr/lib64/python3.12/unittest/suite.py", line 122 in run
File "/usr/lib64/python3.12/unittest/suite.py", line 84 in __call__
File "/usr/lib64/python3.12/unittest/runner.py", line 240 in run
File "/usr/lib64/python3.12/unittest/main.py", line 281 in runTests
File "/usr/lib64/python3.12/unittest/main.py", line 105 in __init__
File "/usr/lib64/python3.12/unittest/__main__.py", line 18 in <module>
File "<frozen runpy>", line 88 in _run_code
File "<frozen runpy>", line 198 in _run_module_as_main
.
----------------------------------------------------------------------
Ran 1 test in 8.716s
What appears to be happening is user error on my part. The FastAPI testclient is really a thin wrapper over the starlette library's testclient, which uses a context manager to launch a thread running an event loop; in this test that loop is calling select to service events. I think the reason I'm seeing it only on one computer has to do with the difference in processor speed between the two machines. The fact that the test client spawns a thread to emulate a server isn't too surprising (I was immediately reminded of a very similar construct I wrote about for an HTTP server). What has caught me off guard is the interaction between the two asyncio event loops in this case. I'm at least pretty sure the event loops are to blame; I've been wending my way through all the layers of indirection with fastapi (which is really starlette) calling anyio calling asyncio and am a little dizzy.
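That pattern is easy to reproduce outside of any of these libraries. Here is a rough sketch (my own illustration, not starlette's actual code) of the shape the traceback suggests: an event loop running in a worker thread while the main thread pokes it with call_soon_threadsafe, the same call visible in the frames above.

import asyncio
import threading

loop = asyncio.new_event_loop()

def run_loop() -> None:
    # the worker thread spends its life inside run_forever(), blocking in
    # selectors.select() whenever there is nothing to do
    asyncio.set_event_loop(loop)
    loop.run_forever()

worker = threading.Thread(target=run_loop, daemon=True)
worker.start()

# the main thread (the unittest thread in the real test) schedules a callback
# onto the other thread's loop; call_soon_threadsafe wakes the selector by
# writing a byte to a self-pipe, which is the _write_to_self frame above
done = threading.Event()
loop.call_soon_threadsafe(done.set)
done.wait()

loop.call_soon_threadsafe(loop.stop)
worker.join()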
In an effort to better confirm some of my ideas I figured I should keep digging for better tools. While the output of faulthandler isn't useless, it lacks the detail needed to wholly explain what is going on when the problem occurs. I dug around some more to find information on integrating gdb with my Python program. A bit of a missing piece, though, was how I was supposed to capture the moment when the problem was occurring without an error happening. Rather than spend too long trying to think of something smart I added a third test client to the test so that runs take 30+ seconds to complete. While the test was running, and I was pretty confident nothing terribly useful was happening, I sent a SIGQUIT using ctrl-\ which causes a core dump.
With a core dump available it isn't too hard to then pass it along to gdb with:
coredumpctl debug
What to say about gdb? It is hugely featureful and oftentimes cryptic to a layman such as myself. I mostly limited myself to:
bt
info threads
thread n
py-bt
But there is a lot available and it is pretty cool:
/usr/src/debug/python3.12-3.12.2-2.fc39.x86_64/Python/condvar.h

   59  #define PyMUTEX_UNLOCK(mut)       pthread_mutex_unlock(mut)
   60
   61  #define PyCOND_INIT(cond)         _PyThread_cond_init(cond)
   62  #define PyCOND_FINI(cond)         pthread_cond_destroy(cond)
   63  #define PyCOND_SIGNAL(cond)       pthread_cond_signal(cond)
   64  #define PyCOND_BROADCAST(cond)    pthread_cond_broadcast(cond)
   65  #define PyCOND_WAIT(cond, mut)    pthread_cond_wait((cond), (mut))
   66
   67  /* return 0 for success, 1 on timeout, -1 on error */
   68  Py_LOCAL_INLINE(int)
   69  PyCOND_TIMEDWAIT(PyCOND_T *cond, PyMUTEX_T *mut, long long us)
   70  {
   71      struct timespec abs_timeout;
   72      _PyThread_cond_after(us, &abs_timeout);
 > 73      int ret = pthread_cond_timedwait(cond, mut, &abs_timeout);
   74      if (ret == ETIMEDOUT) {
   75          return 1;
   76      }
   77      if (ret) {
   78          return -1;
   79      }
   80      return 0;
   81  }
   82
   83  #elif defined(NT_THREADS)
   84  /*
   85   * Windows (XP, 2003 server and later, as well as (hopefully) CE) support
   86   *
   87   * Emulated condition variables ones that work with XP and later, plus
   88   * example native support on VISTA and onwards.

multi-thre Thread 0x7f74894b37   In: take_gil                L73   PC: 0x7f7488fe44c4
#6  Waiting for the GIL
#14 Frame 0x7f74894c1da0, for file /usr/lib64/python3.12/asyncio/selector_events.py, line 152,
    in _write_to_self (self=<_UnixSelectorEventLoop(...)...(truncated)
        csock.send(b'\0')
#14 Frame 0x7f74894c1d08, for file /usr/lib64/python3.12/asyncio/base_events.py, line 844,
    in call_soon_threadsafe (self=<_UnixSelectorEventLoop(...)...(truncated)
        self._write_to_self()
#14 Frame 0x7f74894c1c60, for file /home/nprescott/project/new-venv/lib64/python3.12/site-packages/anyio/_backends/_asyncio.py, line 2205,
    in run_sync_from_thread (cls=<ABCMeta(...)...(truncated)
I don't know that I've entirely satisfied my curiosity with this though. I can see that the different threads are invoking things like call_soon_threadsafe, but I don't understand why, if they are not truly deadlocked, it takes so long to make progress. I have a hunch the different multitasking patterns in use are fighting with each other behind the scenes: asyncio is supposed to achieve cooperative multitasking, while threads are independently managed by the scheduler. Digging around through all the code in play I came up with a bit of stream of consciousness:
- TestClientTransport invokes a portal factory which starts a thread running an asyncio event loop (a sketch of this portal pattern follows the list)
- Upgrade with a session linked to the portal factory
- start_task_soon with self._run, which runs the app in a task group
- _run calls a nested function run_app which invokes the starlette (FastAPI) app, passing it the method _asgi_receive as the parameter "receive", part of the interface or type of an ASGI app
- _asgi_receive is defined like this:

    async def _asgi_receive(self) -> Message:
        while self._receive_queue.empty():
            await anyio.sleep(0)
        return self._receive_queue.get()

- anyio is an abstraction over asyncio to make it pluggable/replaceable with something like trio, not relevant here
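The portal mechanism itself is public anyio API and easy to play with on its own. A small sketch (my own, using only anyio's documented from_thread interface) of the pattern: a worker thread runs an event loop and the calling thread hands coroutines across to it through a blocking portal.

import anyio
from anyio.from_thread import start_blocking_portal

async def greet(name: str) -> str:
    # a token await so the coroutine actually passes through the event loop
    await anyio.sleep(0)
    return f"hello {name}"

# start_blocking_portal spawns a thread running an event loop; the portal is
# how code outside that thread submits work to it
with start_blocking_portal() as portal:
    # portal.call blocks the calling thread until the coroutine finishes;
    # portal.start_task_soon (what the test client uses) returns a future instead
    print(portal.call(greet, "world"))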
Suddenly things feel like they're clicking into place for me. A call like asyncio.sleep(0) is a very typical way to defer execution back to the event loop so as to service any other pending coroutines — you don't have to actually sleep, you just have to signal that you have finished some logical piece of work (cooperating with other tasks explicitly). I suspect, though, that the limited number of other tasks on this threaded event loop is leading to a funky too-fast cycle through the loop, maybe causing it to spin not-quite-endlessly on the check of self._receive_queue.empty(), which takes a lock. I'm not exactly an expert in asyncio but I do happen to know that asyncio.sleep(0) is special-cased, handled distinctly from an actual sleep call. Based on some of what I saw in gdb I think one of these operations is inappropriately holding onto the GIL for too long as a result of over-eagerly looping for events.
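For reference, this is what that sleep(0) polling pattern does in isolation (my own sketch, nothing from the project): a coroutine that polls a condition with sleep(0) makes a complete trip through the event loop for every single check, so the loop never actually blocks while it waits.

import asyncio
import itertools

async def poll_until(predicate) -> None:
    # this mirrors the shape of _asgi_receive: check a condition, yield with
    # sleep(0), and check again on the very next pass through the event loop
    while not predicate():
        await asyncio.sleep(0)

async def main() -> None:
    counter = itertools.count()
    # the predicate only becomes true after 100,000 checks; every check is a
    # full trip through the event loop rather than a blocking wait
    await poll_until(lambda: next(counter) > 100_000)

asyncio.run(main())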
I can't think of an elegant way of quickly verifying this idea so we'll have to settle for a hack. If it is the case that yielding to the event loop is happening too fast, we can simply introduce a little latency to stall it ourselves. The first thing that comes to mind is to override the _asgi_receive method on the WebSocketTestSession class.
import asyncio
import unittest

import fastapi.testclient
import starlette.testclient

async def _asgi_receive(self):
    # stall briefly instead of immediately re-checking the queue
    while self._receive_queue.empty():
        await asyncio.sleep(0.001)
    return self._receive_queue.get()

starlette.testclient.WebSocketTestSession._asgi_receive = _asgi_receive

class TestWebsocket(unittest.TestCase):
    def test_two_clients(self):
        client = fastapi.testclient.TestClient(app)
        with (client.websocket_connect('/'),
              client.websocket_connect('/')):
            pass
In principle this means the test should be slower because we've introduced an artificial sleep. What about in practice?
$ ./venv/bin/python -m unittest test/test_websocket.py
.
----------------------------------------------------------------------
Ran 1 test in 0.007s
Boom.
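Module-level monkeypatching like this leaks into every other test in the run, so if the hack were to stick around, something like unittest.mock.patch.object could scope it to the one test. A sketch (standard library only, reusing the _asgi_receive coroutine defined above):

import unittest
import unittest.mock

import fastapi.testclient
import starlette.testclient

class TestWebsocket(unittest.TestCase):
    # patch.object swaps in the stalled coroutine for the duration of this
    # test only and restores the original afterwards
    @unittest.mock.patch.object(
        starlette.testclient.WebSocketTestSession, "_asgi_receive", _asgi_receive
    )
    def test_two_clients(self):
        client = fastapi.testclient.TestClient(app)
        with (client.websocket_connect("/"),
              client.websocket_connect("/")):
            pass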
All that really remains are some questions that I don't think I can fully answer, mostly having to do with the intent of some of these design choices. Obviously I don't think the websocket test client is really intended to be repeatedly invoked like this, despite the fact that it usually seems to work. I'm not sure whether that usage simply isn't designed into the test client or I just haven't found the right approach. I opened a discussion on the starlette repository, figuring if nothing else I might save someone some time if they go searching for the problem in the future.