Skip to content

Commit 3494396

Browse files
committed
gh-142837: Use semaphore accounting for multiprocessing.Queue.empty()
1 parent 645f5c4 commit 3494396

File tree

3 files changed

+50
-5
lines changed

3 files changed

+50
-5
lines changed

Doc/library/multiprocessing.rst

Lines changed: 16 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -869,10 +869,13 @@ Note that one can also create a shared queue by using a manager object -- see
869869
bother you then you can instead use a queue created with a
870870
:ref:`manager <multiprocessing-managers>`.
871871

872-
(1) After putting an object on an empty queue there may be an
873-
infinitesimal delay before the queue's :meth:`~Queue.empty`
874-
method returns :const:`False` and :meth:`~Queue.get_nowait` can
875-
return without raising :exc:`queue.Empty`.
872+
(1) After putting an object on an empty queue there may be a delay
873+
before :meth:`~Queue.get_nowait` can return without raising
874+
:exc:`queue.Empty`, because the feeder thread flushes objects to
875+
the underlying pipe asynchronously. On platforms where
876+
``sem_getvalue()`` is not implemented (for example macOS), the
877+
queue's :meth:`~Queue.empty` method may also remain :const:`True`
878+
during this delay.
876879

877880
(2) If multiple processes are enqueuing objects, it is possible for
878881
the objects to be received at the other end out-of-order.
@@ -947,8 +950,17 @@ For an example of the usage of queues for interprocess communication see
947950
Return ``True`` if the queue is empty, ``False`` otherwise. Because of
948951
multithreading/multiprocessing semantics, this is not reliable.
949952

953+
On platforms where ``sem_getvalue()`` is implemented, this method
954+
uses the same approximate size accounting as :meth:`~Queue.qsize`.
955+
Otherwise, it may report ``True`` while items are still buffered and
956+
waiting to be flushed to the underlying pipe.
957+
950958
May raise an :exc:`OSError` on closed queues. (not guaranteed)
951959

960+
.. versionchanged:: 3.15
961+
On platforms where ``sem_getvalue()`` is implemented, this method
962+
now uses semaphore-based queue size accounting.
963+
952964
.. method:: full()
953965

954966
Return ``True`` if the queue is full, ``False`` otherwise. Because of

Lib/multiprocessing/queues.py

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -124,7 +124,17 @@ def qsize(self):
124124
return self._maxsize - self._sem.get_value()
125125

126126
def empty(self):
127-
return not self._poll()
127+
# Preserve the historical "closed queue may raise OSError" behavior.
128+
# q.close() is a no-op for unused queues, so this only raises once the
129+
# reader end has actually been closed.
130+
if self._closed:
131+
self._poll()
132+
133+
try:
134+
return self._sem.get_value() == self._maxsize
135+
except NotImplementedError:
136+
# Fallback for platforms without sem_getvalue() (for example macOS).
137+
return not self._poll()
128138

129139
def full(self):
130140
return self._sem._semlock._is_zero()

Lib/test/_test_multiprocessing.py

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1320,6 +1320,29 @@ def test_qsize(self):
13201320
self.assertEqual(q.qsize(), 0)
13211321
close_queue(q)
13221322

1323+
def test_empty_uses_semaphore_count(self):
1324+
if self.TYPE != 'processes':
1325+
self.skipTest(f'test not appropriate for {self.TYPE}')
1326+
1327+
q = self.Queue()
1328+
try:
1329+
q._sem.get_value()
1330+
except NotImplementedError:
1331+
close_queue(q)
1332+
self.skipTest('sem_getvalue not implemented on this platform')
1333+
1334+
q.put('sentinel')
1335+
original_poll = q._poll
1336+
q._poll = lambda timeout=0.0: False
1337+
try:
1338+
self.assertFalse(q.empty())
1339+
finally:
1340+
q._poll = original_poll
1341+
1342+
self.assertEqual(q.get(timeout=support.SHORT_TIMEOUT), 'sentinel')
1343+
self.assertTrue(q.empty())
1344+
close_queue(q)
1345+
13231346
@classmethod
13241347
def _test_task_done(cls, q):
13251348
for obj in iter(q.get, None):

0 commit comments

Comments
 (0)