From cf0224a3cc69e6892c0ac85b30f86546a908e3a8 Mon Sep 17 00:00:00 2001 From: "David M. Rogers" Date: Mon, 2 Sep 2024 22:42:29 -0400 Subject: [PATCH 01/12] Updated for python3.6+ --- .gitignore | 1 + README | 21 - README.md | 26 + setup.py | 36 +- stream.py | 2004 +++++++++++++++++++++++---------------------- test/asyncpool.py | 47 +- test/collector.py | 11 +- test/executor.py | 16 +- test/sorter.py | 12 +- 9 files changed, 1094 insertions(+), 1080 deletions(-) delete mode 100644 README create mode 100644 README.md diff --git a/.gitignore b/.gitignore index ce9b540..ba86012 100644 --- a/.gitignore +++ b/.gitignore @@ -2,3 +2,4 @@ build dist *.swp +stream.egg-info diff --git a/README b/README deleted file mode 100644 index 5a067e0..0000000 --- a/README +++ /dev/null @@ -1,21 +0,0 @@ -ABOUT -===== - -Streams are iterables with a pipelining mechanism to enable data-flow -programming and easy parallelization. - -See the reference documentation at http://www.trinhhaianh.com/stream.py - - -INSTALL -======= - -This module requires Python 2.6. - -To install system-wide: - - $ sudo python ./setup.py install - -or if you just need it in a project: - - $ cp ./stream.py ~/your/project diff --git a/README.md b/README.md new file mode 100644 index 0000000..3c26183 --- /dev/null +++ b/README.md @@ -0,0 +1,26 @@ +ABOUT +===== + +Streams are iterables with a pipelining mechanism to enable data-flow +programming and easy parallelization. + +See the reference documentation in [doc](doc/index.rst). + + +INSTALL +======= + +This module requires Python 3.6. + + $ pip install stream + +or, for development, + + $ pip install -e . 
+ + +TEST +==== + + python3 stream.py # runs doctests + pytest test/*.py # runs test_* functions diff --git a/setup.py b/setup.py index 3eb62a5..210c639 100755 --- a/setup.py +++ b/setup.py @@ -1,24 +1,13 @@ #!/usr/bin/env python -import os -import sys - -from distutils.core import setup - -__dir__ = os.path.realpath(os.path.dirname(__file__)) - -sys.path.insert(0, __dir__) -try: - import stream -finally: - del sys.path[0] +from setuptools import setup, find_packages classifiers = """ Development Status :: 3 - Alpha Intended Audience :: Developers License :: OSI Approved :: MIT License Operating System :: OS Independent -Programming Language :: Python +Programming Language :: Python :: 3 Topic :: Software Development :: Libraries :: Python Modules Topic :: Utilities """ @@ -26,23 +15,24 @@ __doc__ = """Streams are iterables with a pipelining mechanism to enable data-flow programming and easy parallelization. -See the reference documentation at . - -Articles written by the author about the module can be viewed at . +See the reference documentation in the doc/ subdirectory. The code repository is located at . 
""" setup( - name = 'stream', - version = stream.__version__, - description = stream.__doc__.split('\n')[0], + name='stream', + version='0.9.0', + description=__doc__.split('\n', 1)[0], long_description = __doc__, - author = 'Anh Hai Trinh', - author_email = 'moc.liamg@hnirt.iah.hna:otliam'[::-1], + author='Anh Hai Trinh', + author_email='moc.liamg@hnirt.iah.hna:otliam'[::-1], keywords='lazy iterator generator stream pipe parallellization data flow functional list processing', url = 'http://github.com/aht/stream.py', - platforms=['any'], + #packages=find_packages(), # Automatically find packages in the directory + #packages=['your_package'], # List of packages to include + py_modules=['stream'], # single python module to include classifiers=filter(None, classifiers.split("\n")), - py_modules = ['stream'] + platforms=['any'], + python_requires='>=3.6', ) diff --git a/stream.py b/stream.py index 76d04c9..c6ab06a 100644 --- a/stream.py +++ b/stream.py @@ -18,17 +18,17 @@ of one iterable argument. Producers: anything iterable - + from this module: seq, gseq, repeatcall, chaincall + + from this module: seq, gseq, repeatcall, chaincall Filters: - + by index: take, drop, takei, dropi - + by condition: filter, takewhile, dropwhile - + by transformation: apply, map, fold - + by combining streams: prepend, tee - + for special purpose: chop, cut, flatten + + by index: take, drop, takei, dropi + + by condition: filter, takewhile, dropwhile + + by transformation: apply, map, fold + + by combining streams: prepend, tee + + for special purpose: chop, cut, flatten Accumulators: item, maximum, minimum, reduce - + from Python: list, sum, dict, max, min ... + + from Python: list, sum, dict, max, min ... Values are computed only when an accumulator forces some or all evaluation (not when the stream are set up). @@ -70,15 +70,13 @@ . 
""" -from __future__ import with_statement - -import __builtin__ import copy import collections import heapq import itertools +import functools import operator -import Queue +import queue import re import select import sys @@ -87,34 +85,45 @@ from operator import itemgetter, attrgetter -zip = itertools.izip +_filter = filter +_map = map +_reduce = functools.reduce + +try: + zip = itertools.izip +except AttributeError: + pass try: - import multiprocessing - import multiprocessing.queues - _nCPU = multiprocessing.cpu_count() + import multiprocessing + _nCPU = multiprocessing.cpu_count() except ImportError: - _nCPU = 1 + _nCPU = 1 try: - Iterable = collections.Iterable + Iterable = collections.Iterable except AttributeError: - Iterable = object + Iterable = object try: - next + next except NameError: - def next(iterator): - return iterator.next() + def next(iterator): + return iterator.next() try: - from operator import methodcaller + from operator import methodcaller except ImportError: - def methodcaller(methodname, *args, **kwargs): - return lambda o: getattr(o, methodname)(*args, **kwargs) + def methodcaller(methodname, *args, **kwargs): + return lambda o: getattr(o, methodname)(*args, **kwargs) + +import pkg_resources -__version__ = '0.8' +try: + __version__ = pkg_resources.get_distribution('stream').version +except Exception: + __version__ = 'unknown' #_____________________________________________________________________ @@ -122,78 +131,78 @@ def methodcaller(methodname, *args, **kwargs): class BrokenPipe(Exception): - pass + pass class Stream(Iterable): - """A stream is both a lazy list and an iterator-processing function. - - The lazy list is represented by the attribute 'iterator'. - - The iterator-processing function is represented by the method - __call__(iterator), which should return a new iterator - representing the output of the Stream. 
- - By default, __call__(iterator) chains iterator with self.iterator, - appending itself to the input stream in effect. - - __pipe__(inpipe) defines the connection mechanism between Stream objects. - By default, it replaces self.iterator with the iterator returned by - __call__(iter(inpipe)). - - A Stream subclass will usually implement __call__, unless it is an - accumulator and will not return a Stream, in which case it will need to - implement __pipe__. - - The `>>` operator works as follow: the expression `a >> b` means - `b.__pipe__(a) if hasattr(b, '__pipe__') else b(a)`. - - >>> [1, 2, 3] >> Stream([4, 5, 6]) >> list - [1, 2, 3, 4, 5, 6] - """ - def __init__(self, iterable=None): - """Make a Stream object from an iterable.""" - self.iterator = iter(iterable if iterable else []) - - def __iter__(self): - return self.iterator - - def __call__(self, iterator): - """Append to the end of iterator.""" - return itertools.chain(iterator, self.iterator) - - def __pipe__(self, inpipe): - self.iterator = self.__call__(iter(inpipe)) - return self - - @staticmethod - def pipe(inpipe, outpipe): - """Connect inpipe and outpipe. If outpipe is not a Stream instance, - it should be an function callable on an iterable. - """ - if hasattr(outpipe, '__pipe__'): - return outpipe.__pipe__(inpipe) - elif hasattr(outpipe, '__call__'): - return outpipe(inpipe) - else: - raise BrokenPipe('No connection mechanism defined') - - def __rshift__(self, outpipe): - return Stream.pipe(self, outpipe) - - def __rrshift__(self, inpipe): - return Stream.pipe(inpipe, self) - - def extend(self, outpipe): - """Similar to __pipe__, except that outpipe must be a Stream, in - which case self.iterator will be modified in-place by calling - outpipe.__call__ on it. - """ - self.iterator = outpipe.__call__(self.iterator) - return self - - def __repr__(self): - return 'Stream(%s)' % repr(self.iterator) + """A stream is both a lazy list and an iterator-processing function. 
+ + The lazy list is represented by the attribute 'iterator'. + + The iterator-processing function is represented by the method + __call__(iterator), which should return a new iterator + representing the output of the Stream. + + By default, __call__(iterator) chains iterator with self.iterator, + appending itself to the input stream in effect. + + __pipe__(inpipe) defines the connection mechanism between Stream objects. + By default, it replaces self.iterator with the iterator returned by + __call__(iter(inpipe)). + + A Stream subclass will usually implement __call__, unless it is an + accumulator and will not return a Stream, in which case it will need to + implement __pipe__. + + The `>>` operator works as follow: the expression `a >> b` means + `b.__pipe__(a) if hasattr(b, '__pipe__') else b(a)`. + + >>> [1, 2, 3] >> Stream([4, 5, 6]) >> list + [1, 2, 3, 4, 5, 6] + """ + def __init__(self, iterable=None): + """Make a Stream object from an iterable.""" + self.iterator = iter(iterable if iterable else []) + + def __iter__(self): + return self.iterator + + def __call__(self, iterator): + """Append to the end of iterator.""" + return itertools.chain(iterator, self.iterator) + + def __pipe__(self, inpipe): + self.iterator = self.__call__(iter(inpipe)) + return self + + @staticmethod + def pipe(inpipe, outpipe): + """Connect inpipe and outpipe. If outpipe is not a Stream instance, + it should be an function callable on an iterable. + """ + if hasattr(outpipe, '__pipe__'): + return outpipe.__pipe__(inpipe) + elif hasattr(outpipe, '__call__'): + return outpipe(inpipe) + else: + raise BrokenPipe('No connection mechanism defined') + + def __rshift__(self, outpipe): + return Stream.pipe(self, outpipe) + + def __rrshift__(self, inpipe): + return Stream.pipe(inpipe, self) + + def extend(self, outpipe): + """Similar to __pipe__, except that outpipe must be a Stream, in + which case self.iterator will be modified in-place by calling + outpipe.__call__ on it. 
+ """ + self.iterator = outpipe.__call__(self.iterator) + return self + + def __repr__(self): + return 'Stream(%s)' % repr(self.iterator) #_______________________________________________________________________ @@ -201,172 +210,175 @@ def __repr__(self): class take(Stream): - """Take the firts n items of the input stream, return a Stream. - - >>> seq(1, 2) >> take(10) - Stream([1, 3, 5, 7, 9, 11, 13, 15, 17, 19]) - """ - def __init__(self, n): - """n: the number of elements to be taken""" - super(take, self).__init__() - self.n = n - self.items = [] + """Take the firts n items of the input stream, return a Stream. + + >>> seq(1, 2) >> take(10) + Stream([1, 3, 5, 7, 9, 11, 13, 15, 17, 19]) + """ + def __init__(self, n): + """n: the number of elements to be taken""" + super(take, self).__init__() + self.n = n + self.items = [] - def __call__(self, iterator): - self.items = list(itertools.islice(iterator, self.n)) - return iter(self.items) + def __call__(self, iterator): + self.items = list(itertools.islice(iterator, self.n)) + return iter(self.items) - def __repr__(self): - return 'Stream(%s)' % repr(self.items) + def __repr__(self): + return 'Stream(%s)' % repr(self.items) negative = lambda x: x and x < 0 ### since None < 0 == True class itemtaker(Stream): - """Slice the input stream, return a list. 
- - >>> i = itertools.count() - >>> i >> item[:10:2] - [0, 2, 4, 6, 8] - >>> i >> item[:5] - [10, 11, 12, 13, 14] - - >>> xrange(20) >> item[::-2] - [19, 17, 15, 13, 11, 9, 7, 5, 3, 1] - """ - def __init__(self, key=None): - self.key = key - - @staticmethod - def __getitem__(key): - if (type(key) is int) or (type(key) is slice): - return itemtaker(key) - else: - raise TypeError('key must be an integer or a slice') - - def __pipe__(self, inpipe): - i = iter(inpipe) - if type(self.key) is int: - ## just one item is needed - if self.key >= 0: - # throw away self.key items - collections.deque(itertools.islice(i, self.key), maxlen=0) - return next(i) - else: - # keep the last -self.key items - # since we don't know beforehand when the stream stops - n = -self.key if self.key else 1 - items = collections.deque(itertools.islice(i, None), maxlen=n) - if items: - return items[-n] - else: - return [] - else: - ## a list is needed - if negative(self.key.stop) or negative(self.key.start) \ - or not (self.key.start or self.key.stop) \ - or (not self.key.start and negative(self.key.step)) \ - or (not self.key.stop and not negative(self.key.step)): - # force all evaluation - items = [x for x in i] - else: - # force some evaluation - if negative(self.key.step): - stop = self.key.start - else: - stop = self.key.stop - items = list(itertools.islice(i, stop)) - return items[self.key] - - def __repr__(self): - return '' % hex(id(self)) + """Slice the input stream, return a list. 
+ + >>> i = itertools.count() + >>> i >> item[:10:2] + [0, 2, 4, 6, 8] + >>> i >> item[:5] + [10, 11, 12, 13, 14] + + >>> range(20) >> item[::-2] + [19, 17, 15, 13, 11, 9, 7, 5, 3, 1] + """ + def __init__(self, key=None): + self.key = key + + @staticmethod + def __getitem__(key): + if (type(key) is int) or (type(key) is slice): + return itemtaker(key) + else: + raise TypeError('key must be an integer or a slice') + + def __pipe__(self, inpipe): + i = iter(inpipe) + if type(self.key) is int: + ## just one item is needed + if self.key >= 0: + # throw away self.key items + collections.deque(itertools.islice(i, self.key), maxlen=0) + return next(i) + else: + # keep the last -self.key items + # since we don't know beforehand when the stream stops + n = -self.key if self.key else 1 + items = collections.deque(itertools.islice(i, None), maxlen=n) + if items: + return items[-n] + else: + return [] + else: + ## a list is needed + if negative(self.key.stop) or negative(self.key.start) \ + or not (self.key.start or self.key.stop) \ + or (not self.key.start and negative(self.key.step)) \ + or (not self.key.stop and not negative(self.key.step)): + # force all evaluation + items = [x for x in i] + else: + # force some evaluation + if negative(self.key.step): + stop = self.key.start + else: + stop = self.key.stop + items = list(itertools.islice(i, stop)) + return items[self.key] + + def __repr__(self): + return '' % hex(id(self)) item = itemtaker() class takei(Stream): - """Take elements of the input stream by indices. 
- - >>> seq() >> takei(xrange(2, 43, 4)) >> list - [2, 6, 10, 14, 18, 22, 26, 30, 34, 38, 42] - """ - def __init__(self, indices): - """indices: an iterable of indices to be taken, should yield - non-negative integers in monotonically increasing order - """ - super(takei, self).__init__() - self.indexiter = iter(indices) - - def __call__(self, iterator): - def itaker(): - old_idx = -1 - idx = next(self.indexiter) # next value to yield - counter = seq() - while 1: - c = next(counter) - elem = next(iterator) - while idx <= old_idx: # ignore bad values - idx = next(self.indexiter) - if c == idx: - yield elem - old_idx = idx - idx = next(self.indexiter) - return itaker() + """Take elements of the input stream by indices. + + >>> seq() >> takei(range(2, 43, 4)) >> list + [2, 6, 10, 14, 18, 22, 26, 30, 34, 38, 42] + """ + def __init__(self, indices): + """indices: an iterable of indices to be taken, should yield + non-negative integers in monotonically increasing order + """ + super(takei, self).__init__() + self.indexiter = iter(indices) + + def __call__(self, iterator): + def itaker(): + try: + old_idx = -1 + idx = next(self.indexiter) # next value to yield + counter = seq() + while 1: + c = next(counter) + elem = next(iterator) + while idx <= old_idx: # ignore bad values + idx = next(self.indexiter) + if c == idx: + yield elem + old_idx = idx + idx = next(self.indexiter) + except StopIteration: + pass + return itaker() class drop(Stream): - """Drop the first n elements of the input stream. + """Drop the first n elements of the input stream. 
- >>> seq(0, 2) >> drop(1) >> take(5) - Stream([2, 4, 6, 8, 10]) - """ - def __init__(self, n): - """n: the number of elements to be dropped""" - super(drop, self).__init__() - self.n = n + >>> seq(0, 2) >> drop(1) >> take(5) + Stream([2, 4, 6, 8, 10]) + """ + def __init__(self, n): + """n: the number of elements to be dropped""" + super(drop, self).__init__() + self.n = n - def __call__(self, iterator): - collections.deque(itertools.islice(iterator, self.n), maxlen=0) - return iterator + def __call__(self, iterator): + collections.deque(itertools.islice(iterator, self.n), maxlen=0) + return iterator class dropi(Stream): - """Drop elements of the input stream by indices. - - >>> seq() >> dropi(seq(0,3)) >> item[:10] - [1, 2, 4, 5, 7, 8, 10, 11, 13, 14] - """ - def __init__(self, indices): - """indices: an iterable of indices to be dropped, should yield - non-negative integers in monotonically increasing order - """ - super(dropi, self).__init__() - self.indexiter = iter(indices) - - def __call__(self, iterator): - def idropper(): - counter = seq() - def try_next_idx(): - ## so that the stream keeps going - ## after the discard iterator is exhausted - try: - return next(self.indexiter), False - except StopIteration: - return -1, True - old_idx = -1 - idx, exhausted = try_next_idx() # next value to discard - while 1: - c = next(counter) - elem = next(iterator) - while not exhausted and idx <= old_idx: # ignore bad values - idx, exhausted = try_next_idx() - if c != idx: - yield elem - elif not exhausted: - old_idx = idx - idx, exhausted = try_next_idx() - return idropper() + """Drop elements of the input stream by indices. 
+ + >>> seq() >> dropi(seq(0,3)) >> item[:10] + [1, 2, 4, 5, 7, 8, 10, 11, 13, 14] + """ + def __init__(self, indices): + """indices: an iterable of indices to be dropped, should yield + non-negative integers in monotonically increasing order + """ + super(dropi, self).__init__() + self.indexiter = iter(indices) + + def __call__(self, iterator): + def idropper(): + counter = seq() + def try_next_idx(): + ## so that the stream keeps going + ## after the discard iterator is exhausted + try: + return next(self.indexiter), False + except StopIteration: + return -1, True + old_idx = -1 + idx, exhausted = try_next_idx() # next value to discard + while 1: + c = next(counter) + elem = next(iterator) + while not exhausted and idx <= old_idx: # ignore bad values + idx, exhausted = try_next_idx() + if c != idx: + yield elem + elif not exhausted: + old_idx = idx + idx, exhausted = try_next_idx() + return idropper() #_______________________________________________________________________ @@ -374,136 +386,136 @@ def try_next_idx(): class Processor(Stream): - """A decorator to turn an iterator-processing function into - a Stream processor object. - """ - def __init__(self, function): - """function: an iterator-processing function, one that takes an - iterator and return an iterator - """ - super(Processor, self).__init__() - self.function = function - - def __call__(self, iterator): - return self.function(iterator) + """A decorator to turn an iterator-processing function into + a Stream processor object. + """ + def __init__(self, function): + """function: an iterator-processing function, one that takes an + iterator and return an iterator + """ + super(Processor, self).__init__() + self.function = function + + def __call__(self, iterator): + return self.function(iterator) class apply(Stream): - """Invoke a function using each element of the input stream unpacked as - its argument list, a la itertools.starmap. 
+ """Invoke a function using each element of the input stream unpacked as + its argument list, a la itertools.starmap. - >>> vectoradd = lambda u,v: zip(u, v) >> apply(lambda x,y: x+y) >> list - >>> vectoradd([1, 2, 3], [4, 5, 6]) - [5, 7, 9] - """ - def __init__(self, function): - """function: to be called with each stream element unpacked as its - argument list - """ - super(apply, self).__init__() - self.function = function + >>> vectoradd = lambda u,v: zip(u, v) >> apply(lambda x,y: x+y) >> list + >>> vectoradd([1, 2, 3], [4, 5, 6]) + [5, 7, 9] + """ + def __init__(self, function): + """function: to be called with each stream element unpacked as its + argument list + """ + super(apply, self).__init__() + self.function = function - def __call__(self, iterator): - return itertools.starmap(self.function, iterator) + def __call__(self, iterator): + return itertools.starmap(self.function, iterator) class map(Stream): - """Invoke a function using each element of the input stream as its only - argument, a la itertools.imap. 
+ """Invoke a function using each element of the input stream as its only + argument, a la `map` - >>> square = lambda x: x*x - >>> range(10) >> map(square) >> list - [0, 1, 4, 9, 16, 25, 36, 49, 64, 81] - """ - def __init__(self, function): - """function: to be called with each stream element as its - only argument - """ - super(map, self).__init__() - self.function = function + >>> square = lambda x: x*x + >>> range(10) >> map(square) >> list + [0, 1, 4, 9, 16, 25, 36, 49, 64, 81] + """ + def __init__(self, function): + """function: to be called with each stream element as its + only argument + """ + super(map, self).__init__() + self.function = function - def __call__(self, iterator): - return itertools.imap(self.function, iterator) + def __call__(self, iterator): + return _map(self.function, iterator) class filter(Stream): - """Filter the input stream, selecting only values which evaluates to True - by the given function, a la itertools.ifilter. + """Filter the input stream, selecting only values which evaluates to True + by the given function, a la `filter`. - >>> even = lambda x: x%2 == 0 - >>> range(10) >> filter(even) >> list - [0, 2, 4, 6, 8] - """ - def __init__(self, function): - """function: to be called with each stream element as its - only argument - """ - super(filter, self).__init__() - self.function = function + >>> even = lambda x: x%2 == 0 + >>> range(10) >> filter(even) >> list + [0, 2, 4, 6, 8] + """ + def __init__(self, function): + """function: to be called with each stream element as its + only argument + """ + super(filter, self).__init__() + self.function = function - def __call__(self, iterator): - return itertools.ifilter(self.function, iterator) + def __call__(self, iterator): + return _filter(self.function, iterator) class takewhile(Stream): - """Take items from the input stream that come before the first item to - evaluate to False by the given function, a la itertools.takewhile. 
- """ - def __init__(self, function): - """function: to be called with each stream element as its - only argument - """ - super(takewhile, self).__init__() - self.function = function + """Take items from the input stream that come before the first item to + evaluate to False by the given function, a la itertools.takewhile. + """ + def __init__(self, function): + """function: to be called with each stream element as its + only argument + """ + super(takewhile, self).__init__() + self.function = function - def __call__(self, iterator): - return itertools.takewhile(self.function, iterator) + def __call__(self, iterator): + return itertools.takewhile(self.function, iterator) class dropwhile(Stream): - """Drop items from the input stream that come before the first item to - evaluate to False by the given function, a la itertools.dropwhile. - """ - def __init__(self, function): - """function: to be called with each stream element as its - only argument - """ - super(dropwhile, self).__init__() - self.function = function + """Drop items from the input stream that come before the first item to + evaluate to False by the given function, a la itertools.dropwhile. + """ + def __init__(self, function): + """function: to be called with each stream element as its + only argument + """ + super(dropwhile, self).__init__() + self.function = function - def __call__(self, iterator): - return itertools.dropwhile(self.function, iterator) + def __call__(self, iterator): + return itertools.dropwhile(self.function, iterator) class fold(Stream): - """Combines the elements of the input stream by applying a function of two - argument to a value and each element in turn. At each step, the value is - set to the value returned by the function, thus it is, in effect, an - accumulation. - - Intermediate values are yielded (similar to Haskell `scanl`). - - This example calculate partial sums of the series 1 + 1/2 + 1/4 +... 
- - >>> gseq(0.5) >> fold(operator.add) >> item[:5] - [1, 1.5, 1.75, 1.875, 1.9375] - """ - def __init__(self, function, initval=None): - super(fold, self).__init__() - self.function = function - self.initval = initval - - def __call__(self, iterator): - def folder(): - if self.initval: - accumulated = self.initval - else: - accumulated = next(iterator) - while 1: - yield accumulated - val = next(iterator) - accumulated = self.function(accumulated, val) - return folder() + """Combines the elements of the input stream by applying a function of two + argument to a value and each element in turn. At each step, the value is + set to the value returned by the function, thus it is, in effect, an + accumulation. + + Intermediate values are yielded (similar to Haskell `scanl`). + + This example calculate partial sums of the series 1 + 1/2 + 1/4 +... + + >>> gseq(0.5) >> fold(operator.add) >> item[:5] + [1, 1.5, 1.75, 1.875, 1.9375] + """ + def __init__(self, function, initval=None): + super(fold, self).__init__() + self.function = function + self.initval = initval + + def __call__(self, iterator): + def folder(): + if self.initval: + accumulated = self.initval + else: + accumulated = next(iterator) + while 1: + yield accumulated + val = next(iterator) + accumulated = self.function(accumulated, val) + return folder() #_____________________________________________________________________ @@ -511,76 +523,76 @@ def folder(): class chop(Stream): - """Chop the input stream into segments of length n. - - >>> range(10) >> chop(3) >> list - [[0, 1, 2], [3, 4, 5], [6, 7, 8], [9]] - """ - def __init__(self, n): - """n: the length of the segments""" - super(chop, self).__init__() - self.n = n - - def __call__(self, iterator): - def chopper(): - while 1: - s = iterator >> item[:self.n] - if s: - yield s - else: - break - return chopper() + """Chop the input stream into segments of length n. 
+ + >>> range(10) >> chop(3) >> list + [[0, 1, 2], [3, 4, 5], [6, 7, 8], [9]] + """ + def __init__(self, n): + """n: the length of the segments""" + super(chop, self).__init__() + self.n = n + + def __call__(self, iterator): + def chopper(): + while 1: + s = iterator >> item[:self.n] + if s: + yield s + else: + break + return chopper() class itemcutter(map): - """Slice each element of the input stream. + """Slice each element of the input stream. - >>> [range(10), range(10, 20)] >> cut[::2] >> list - [[0, 2, 4, 6, 8], [10, 12, 14, 16, 18]] - """ + >>> [range(10), range(10, 20)] >> cut[::2] >> map(list) >> list + [[0, 2, 4, 6, 8], [10, 12, 14, 16, 18]] + """ - def __init__(self, *args): - super(itemcutter, self).__init__( methodcaller('__getitem__', *args) ) + def __init__(self, *args): + super(itemcutter, self).__init__( methodcaller('__getitem__', *args) ) - @classmethod - def __getitem__(cls, args): - return cls(args) + @classmethod + def __getitem__(cls, args): + return cls(args) - def __repr__(self): - return '' % hex(id(self)) + def __repr__(self): + return '' % hex(id(self)) cut = itemcutter() class flattener(Stream): - """Flatten a nested stream of arbitrary depth. - - >>> (xrange(i) for i in seq(step=3)) >> flatten >> item[:18] - [0, 1, 2, 0, 1, 2, 3, 4, 5, 0, 1, 2, 3, 4, 5, 6, 7, 8] - """ - @staticmethod - def __call__(iterator): - def flatten(): - ## Maintain a LIFO stack of iterators - stack = [] - i = iterator - while True: - try: - e = next(i) - if hasattr(e, "__iter__") and not isinstance(e, basestring): - stack.append(i) - i = iter(e) - else: - yield e - except StopIteration: - try: - i = stack.pop() - except IndexError: - break - return flatten() - - def __repr__(self): - return '' % hex(id(self)) + """Flatten a nested stream of arbitrary depth. 
+ + >>> (range(i) for i in seq(step=3)) >> flatten >> item[:18] + [0, 1, 2, 0, 1, 2, 3, 4, 5, 0, 1, 2, 3, 4, 5, 6, 7, 8] + """ + @staticmethod + def __call__(iterator): + def flatten(): + ## Maintain a LIFO stack of iterators + stack = [] + i = iterator + while True: + try: + e = next(i) + if hasattr(e, "__iter__") and not isinstance(e, str): + stack.append(i) + i = iter(e) + else: + yield e + except StopIteration: + try: + i = stack.pop() + except IndexError: + break + return flatten() + + def __repr__(self): + return '' % hex(id(self)) flatten = flattener() @@ -590,37 +602,37 @@ def __repr__(self): class prepend(Stream): - """Inject values at the beginning of the input stream. + """Inject values at the beginning of the input stream. - >>> seq(7, 7) >> prepend(xrange(0, 10, 2)) >> item[:10] - [0, 2, 4, 6, 8, 7, 14, 21, 28, 35] - """ - def __call__(self, iterator): - return itertools.chain(self.iterator, iterator) + >>> seq(7, 7) >> prepend(range(0, 10, 2)) >> item[:10] + [0, 2, 4, 6, 8, 7, 14, 21, 28, 35] + """ + def __call__(self, iterator): + return itertools.chain(self.iterator, iterator) class tee(Stream): - """Make a T-split of the input stream. - - >>> foo = filter(lambda x: x%3==0) - >>> bar = seq(0, 2) >> tee(foo) - >>> bar >> item[:5] - [0, 2, 4, 6, 8] - >>> foo >> item[:5] - [0, 6, 12, 18, 24] - """ - def __init__(self, named_stream): - """named_stream: a Stream object toward which the split branch - will be piped. - """ - super(tee, self).__init__() - self.named_stream = named_stream - - def __pipe__(self, inpipe): - branch1, branch2 = itertools.tee(iter(inpipe)) - self.iterator = branch1 - Stream.pipe(branch2, self.named_stream) - return self + """Make a T-split of the input stream. 
+ + >>> foo = filter(lambda x: x%3==0) + >>> bar = seq(0, 2) >> tee(foo) + >>> bar >> item[:5] + [0, 2, 4, 6, 8] + >>> foo >> item[:5] + [0, 6, 12, 18, 24] + """ + def __init__(self, named_stream): + """named_stream: a Stream object toward which the split branch + will be piped. + """ + super(tee, self).__init__() + self.named_stream = named_stream + + def __pipe__(self, inpipe): + branch1, branch2 = itertools.tee(iter(inpipe)) + self.iterator = branch1 + Stream.pipe(branch2, self.named_stream) + return self #_____________________________________________________________________ @@ -628,39 +640,39 @@ def __pipe__(self, inpipe): def _iterqueue(queue): - # Turn a either a threading.Queue or a multiprocessing.queues.SimpleQueue - # into an thread-safe iterator which will exhaust when StopIteration is - # put into it. - while 1: - item = queue.get() - if item is StopIteration: - # Re-broadcast, in case there is another listener blocking on - # queue.get(). That listener will receive StopIteration and - # re-broadcast to the next one in line. - try: - queue.put(StopIteration) - except IOError: - # Could happen if the Queue is based on a system pipe, - # and the other end was closed. - pass - break - else: - yield item + # Turn a either a threading.Queue or a multiprocessing.SimpleQueue + # into an thread-safe iterator which will exhaust when StopIteration is + # put into it. + while 1: + item = queue.get() + if item is StopIteration: + # Re-broadcast, in case there is another listener blocking on + # queue.get(). That listener will receive StopIteration and + # re-broadcast to the next one in line. + try: + queue.put(StopIteration) + except IOError: + # Could happen if the Queue is based on a system pipe, + # and the other end was closed. + pass + break + else: + yield item def _iterrecv(pipe): - # Turn a the receiving end of a multiprocessing.Connection object - # into an iterator which will exhaust when StopIteration is - # put into it. 
_iterrecv is NOT safe to use by multiple threads. - while 1: - try: - item = pipe.recv() - except EOFError: - break - else: - if item is StopIteration: - break - else: - yield item + # Turn a the receiving end of a multiprocessing.Connection object + # into an iterator which will exhaust when StopIteration is + # put into it. _iterrecv is NOT safe to use by multiple threads. + while 1: + try: + item = pipe.recv() + except EOFError: + break + else: + if item is StopIteration: + break + else: + yield item #_____________________________________________________________________ @@ -668,70 +680,70 @@ def _iterrecv(pipe): class ThreadedFeeder(Iterable): - def __init__(self, generator, *args, **kwargs): - """Create a feeder that start the given generator with - *args and **kwargs in a separate thread. The feeder will - act as an eagerly evaluating proxy of the generator. - - The feeder can then be iter()'ed over by other threads. - - This should improve performance when the generator often - blocks in system calls. - """ - self.outqueue = Queue.Queue() - def feeder(): - i = generator(*args, **kwargs) - while 1: - try: - self.outqueue.put(next(i)) - except StopIteration: - self.outqueue.put(StopIteration) - break - self.thread = threading.Thread(target=feeder) - self.thread.start() - - def __iter__(self): - return _iterqueue(self.outqueue) - - def join(self): - self.thread.join() - - def __repr__(self): - return '' % hex(id(self)) + def __init__(self, generator, *args, **kwargs): + """Create a feeder that start the given generator with + *args and **kwargs in a separate thread. The feeder will + act as an eagerly evaluating proxy of the generator. + + The feeder can then be iter()'ed over by other threads. + + This should improve performance when the generator often + blocks in system calls. 
+ """ + self.outqueue = queue.Queue() + def feeder(): + i = generator(*args, **kwargs) + while 1: + try: + self.outqueue.put(next(i)) + except StopIteration: + self.outqueue.put(StopIteration) + break + self.thread = threading.Thread(target=feeder) + self.thread.start() + + def __iter__(self): + return _iterqueue(self.outqueue) + + def join(self): + self.thread.join() + + def __repr__(self): + return '' % hex(id(self)) class ForkedFeeder(Iterable): - def __init__(self, generator, *args, **kwargs): - """Create a feeder that start the given generator with - *args and **kwargs in a child process. The feeder will - act as an eagerly evaluating proxy of the generator. - - The feeder can then be iter()'ed over by other processes. - - This should improve performance when the generator often - blocks in system calls. Note that serialization could - be costly. - """ - self.outpipe, inpipe = multiprocessing.Pipe(duplex=False) - def feed(): - i = generator(*args, **kwargs) - while 1: - try: - inpipe.send(next(i)) - except StopIteration: - inpipe.send(StopIteration) - break - self.process = multiprocessing.Process(target=feed) - self.process.start() - - def __iter__(self): - return _iterrecv(self.outpipe) - - def join(self): - self.process.join() - - def __repr__(self): - return '' % hex(id(self)) + def __init__(self, generator, *args, **kwargs): + """Create a feeder that start the given generator with + *args and **kwargs in a child process. The feeder will + act as an eagerly evaluating proxy of the generator. + + The feeder can then be iter()'ed over by other processes. + + This should improve performance when the generator often + blocks in system calls. Note that serialization could + be costly. 
+ """ + self.outpipe, inpipe = multiprocessing.Pipe(duplex=False) + def feed(): + i = generator(*args, **kwargs) + while 1: + try: + inpipe.send(next(i)) + except StopIteration: + inpipe.send(StopIteration) + break + self.process = multiprocessing.Process(target=feed) + self.process.start() + + def __iter__(self): + return _iterrecv(self.outpipe) + + def join(self): + self.process.join() + + def __repr__(self): + return '' % hex(id(self)) #_____________________________________________________________________ @@ -739,326 +751,326 @@ def __repr__(self): class ThreadPool(Stream): - """Work on the input stream asynchronously using a pool of threads. - - >>> range(10) >> ThreadPool(map(lambda x: x*x)) >> sum - 285 - - The pool object is an iterable over the output values. If an - input value causes an Exception to be raised, the tuple (value, - exception) is put into the pool's `failqueue`. The attribute - `failure` is a thead-safe iterator over the `failqueue`. - - See also: Executor - """ - def __init__(self, function, poolsize=_nCPU, args=[], kwargs={}): - """function: an iterator-processing function, one that takes an - iterator and return an iterator - """ - super(ThreadPool, self).__init__() - self.function = function - self.inqueue = Queue.Queue() - self.outqueue = Queue.Queue() - self.failqueue = Queue.Queue() - self.failure = Stream(_iterqueue(self.failqueue)) - self.closed = False - def work(): - input, dupinput = itertools.tee(_iterqueue(self.inqueue)) - output = self.function(input, *args, **kwargs) - while 1: - try: - self.outqueue.put(next(output)) - next(dupinput) - except StopIteration: - break - except Exception, e: - self.failqueue.put((next(dupinput), e)) - self.worker_threads = [] - for _ in range(poolsize): - t = threading.Thread(target=work) - self.worker_threads.append(t) - t.start() - def cleanup(): - # Wait for all workers to finish, - # then signal the end of outqueue and failqueue. 
class ThreadPool(Stream):
    """Work on the input stream asynchronously using a pool of threads.

    >>> range(10) >> ThreadPool(map(lambda x: x*x)) >> sum
    285

    The pool object is an iterable over the output values.  If an
    input value causes an Exception to be raised, the tuple (value,
    exception) is put into the pool's `failqueue`.  The attribute
    `failure` is a thread-safe iterator over the `failqueue`.

    See also: Executor
    """
    def __init__(self, function, poolsize=_nCPU, args=[], kwargs={}):
        """function: an iterator-processing function, one that takes an
        iterator and returns an iterator
        """
        super(ThreadPool, self).__init__()
        self.function = function
        self.poolsize = poolsize   # was never assigned, but __repr__ reads it
        self.inqueue = queue.Queue()
        self.outqueue = queue.Queue()
        self.failqueue = queue.Queue()
        self.failure = Stream(_iterqueue(self.failqueue))
        self.closed = False
        def work():
            # dupinput trails the consumed input by one item so that a
            # failing input value can be reported alongside its exception.
            input, dupinput = itertools.tee(_iterqueue(self.inqueue))
            output = self.function(input, *args, **kwargs)
            while 1:
                try:
                    self.outqueue.put(next(output))
                    next(dupinput)
                except StopIteration:
                    break
                except Exception as e:
                    self.failqueue.put((next(dupinput), e))
        self.worker_threads = []
        for _ in range(poolsize):
            t = threading.Thread(target=work)
            self.worker_threads.append(t)
            t.start()
        def cleanup():
            # Wait for all workers to finish,
            # then signal the end of outqueue and failqueue.
            for t in self.worker_threads:
                t.join()
            self.outqueue.put(StopIteration)
            self.failqueue.put(StopIteration)
            self.closed = True
        self.cleaner_thread = threading.Thread(target=cleanup)
        self.cleaner_thread.start()
        self.iterator = _iterqueue(self.outqueue)

    def __call__(self, inpipe):
        if self.closed:
            raise BrokenPipe('All workers are dead, refusing to submit jobs. '
                             'Use another Pool.')
        def feed():
            for item in inpipe:
                self.inqueue.put(item)
            self.inqueue.put(StopIteration)
        self.feeder_thread = threading.Thread(target=feed)
        self.feeder_thread.start()
        return self.iterator

    def join(self):
        """Block until all worker threads have terminated."""
        self.cleaner_thread.join()

    def __repr__(self):
        # Restore the lost repr format string; also requires
        # self.poolsize, now assigned in __init__.
        return '<ThreadPool(poolsize=%s) at %s>' % (self.poolsize, hex(id(self)))
class ProcessPool(Stream):
    """Work on the input stream asynchronously using a pool of processes.

    >>> range(10) >> ProcessPool(map(lambda x: x*x)) >> sum
    285

    The pool object is an iterable over the output values.  If an
    input value causes an Exception to be raised, the tuple (value,
    exception) is put into the pool's `failqueue`.  The attribute
    `failure` is a thread-safe iterator over the `failqueue`.

    See also: Executor
    """
    def __init__(self, function, poolsize=_nCPU, args=[], kwargs={}):
        """function: an iterator-processing function, one that takes an
        iterator and returns an iterator
        """
        super(ProcessPool, self).__init__()
        self.function = function
        self.poolsize = poolsize
        self.inqueue = multiprocessing.SimpleQueue()
        self.outqueue = multiprocessing.SimpleQueue()
        self.failqueue = multiprocessing.SimpleQueue()
        self.failure = Stream(_iterqueue(self.failqueue))
        self.closed = False
        def work():
            # dupinput trails the consumed input by one item so that a
            # failing input value can be reported alongside its exception.
            input, dupinput = itertools.tee(_iterqueue(self.inqueue))
            output = self.function(input, *args, **kwargs)
            while 1:
                try:
                    self.outqueue.put(next(output))
                    next(dupinput)
                except StopIteration:
                    break
                except Exception as e:
                    self.failqueue.put((next(dupinput), e))
        self.worker_processes = []
        for _ in range(self.poolsize):
            p = multiprocessing.Process(target=work)
            self.worker_processes.append(p)
            p.start()
        def cleanup():
            # Wait for all workers to finish,
            # then signal the end of outqueue and failqueue.
            for p in self.worker_processes:
                p.join()
            self.outqueue.put(StopIteration)
            self.failqueue.put(StopIteration)
            self.closed = True
        self.cleaner_thread = threading.Thread(target=cleanup)
        self.cleaner_thread.start()
        self.iterator = _iterqueue(self.outqueue)

    def __call__(self, inpipe):
        if self.closed:
            raise BrokenPipe('All workers are dead, refusing to submit jobs. '
                             'Use another Pool.')
        def feed():
            for item in inpipe:
                self.inqueue.put(item)
            self.inqueue.put(StopIteration)
        self.feeder_thread = threading.Thread(target=feed)
        self.feeder_thread.start()
        return self.iterator

    def join(self):
        """Block until all worker processes have terminated."""
        self.cleaner_thread.join()

    def __repr__(self):
        # Restore the lost repr format string (`'' % (a, b)` raised
        # TypeError).
        return '<ProcessPool(poolsize=%s) at %s>' % (self.poolsize, hex(id(self)))
class Executor(object):
    """Provide a fine-grained level of control over a ThreadPool or ProcessPool.

    The constructor takes a pool class and arguments to its constructor::

      >>> executor = Executor(ThreadPool, map(lambda x: x*x))

    Job ids are returned when items are submitted::

      >>> executor.submit(*range(10))
      range(0, 10)
      >>> executor.submit('foo')
      range(10, 11)

    A call to close() ends jobs submission. Workers threads/processes
    are now allowed to terminate after all jobs are completed::

      >>> executor.close()

    The `result` and `failure` attributes are Stream instances and
    thus iterable.  The returned iterators behave as follow: their
    next() calls will block until a next output is available, or
    raise StopIteration if there is no more output.  Thus we could use
    the attributes `result` and `failure` like any other iterables::

      >>> set(executor.result) == set([0, 1, 4, 9, 16, 25, 36, 49, 64, 81])
      True
      >>> list(executor.failure)
      [('foo', TypeError("can't multiply sequence by non-int of type 'str'"))]
    """
    def __init__(self, poolclass, function, poolsize=_nCPU, args=[], kwargs={}):
        # Wrap `function` so that (id, value) pairs flow through the
        # pool: ids are stripped before calling `function` and re-paired
        # with the corresponding outputs.
        def process_job_id(input):
            input, dupinput = itertools.tee(input)
            id = iter(dupinput >> cut[0])
            input = iter(input >> cut[1])
            output = function(input, *args, **kwargs)
            for item in output:
                yield next(id), item
        self.pool = poolclass(process_job_id,
                              poolsize=poolsize,
                              args=args,
                              kwargs=kwargs)
        self.jobcount = 0
        self._status = []
        self.waitqueue = queue.Queue()
        # Result/failure queues must be process-safe for a ProcessPool.
        if poolclass is ProcessPool:
            self.resultqueue = multiprocessing.SimpleQueue()
            self.failqueue = multiprocessing.SimpleQueue()
        else:
            self.resultqueue = queue.Queue()
            self.failqueue = queue.Queue()
        self.result = Stream(_iterqueue(self.resultqueue))
        self.failure = Stream(_iterqueue(self.failqueue))
        self.closed = False

        self.lock = threading.Lock()
        ## Acquired to submit and update job statuses.

        self.sema = threading.BoundedSemaphore(poolsize)
        ## Used to throttle transfer from waitqueue to pool.inqueue,
        ## acquired by input_feeder, released by trackers.

        def feed_input():
            for id, item in _iterqueue(self.waitqueue):
                self.sema.acquire()
                with self.lock:
                    if self._status[id] == 'SUBMITTED':
                        self.pool.inqueue.put((id, item))
                        self._status[id] = 'RUNNING'
                    else:
                        # Job was cancelled; free the slot immediately.
                        self.sema.release()
            self.pool.inqueue.put(StopIteration)
        self.inputfeeder_thread = threading.Thread(target=feed_input)
        self.inputfeeder_thread.start()

        def track_result():
            for id, item in self.pool:
                self.sema.release()
                with self.lock:
                    self._status[id] = 'COMPLETED'
                self.resultqueue.put(item)
            self.resultqueue.put(StopIteration)
        self.resulttracker_thread = threading.Thread(target=track_result)
        self.resulttracker_thread.start()

        def track_failure():
            for outval, exception in self.pool.failure:
                self.sema.release()
                id, item = outval
                with self.lock:
                    self._status[id] = 'FAILED'
                self.failqueue.put((item, exception))
            self.failqueue.put(StopIteration)
        self.failuretracker_thread = threading.Thread(target=track_failure)
        self.failuretracker_thread.start()

    def submit(self, *items):
        """Return a range of job ids assigned to the submitted items."""
        with self.lock:
            if self.closed:
                raise BrokenPipe('Job submission has been closed.')
            id = self.jobcount
            self._status += ['SUBMITTED'] * len(items)
            self.jobcount += len(items)
            for item in items:
                self.waitqueue.put((id, item))
                id += 1
            return range(id - len(items), id)

    def cancel(self, *ids):
        """Try to cancel jobs with associated ids.

        Return the actual number of jobs cancelled.
        """
        ncancelled = 0
        with self.lock:
            for id in ids:
                try:
                    if self._status[id] == 'SUBMITTED':
                        self._status[id] = 'CANCELLED'
                        ncancelled += 1
                except IndexError:
                    pass
        return ncancelled

    def status(self, *ids):
        """Return the statuses of jobs with associated ids at the
        time of call: either 'SUBMITTED', 'CANCELLED', 'RUNNING',
        'COMPLETED' or 'FAILED'.
        """
        with self.lock:
            if len(ids) > 1:
                return [self._status[i] for i in ids]
            else:
                return self._status[ids[0]]

    def close(self):
        """Signal that the executor will no longer accept job submission.

        Worker threads/processes are now allowed to terminate after all
        jobs have been completed.  Without a call to close(), they will
        stay around forever waiting for more jobs to come.
        """
        with self.lock:
            if self.closed:
                return
            self.waitqueue.put(StopIteration)
            self.closed = True

    def join(self):
        """Note that the Executor must be close()'d elsewhere,
        or join() will never return.
        """
        self.inputfeeder_thread.join()
        self.pool.join()
        self.resulttracker_thread.join()
        self.failuretracker_thread.join()

    def shutdown(self):
        """Shut down the Executor.  Suspend all waiting jobs.

        Running workers will terminate after finishing their current job items.
        The call will block until all workers are terminated.
        """
        with self.lock:
            self.pool.inqueue.put(StopIteration)  # Stop the pool workers
            self.waitqueue.put(StopIteration)     # Stop the input_feeder
            _iterqueue(self.waitqueue) >> item[-1]  # Exhaust the waitqueue
            self.closed = True
        self.join()

    def __repr__(self):
        # Restore the lost repr format string (`'' % (a, b, c)` raised
        # TypeError).
        return '<Executor(%s(poolsize=%s)) at %s>' % (self.pool.__class__.__name__,
                                                      self.pool.poolsize,
                                                      hex(id(self)))
- """ - def __init__(self): - self.inpipes = [] - def selrecv(): - while self.inpipes: - ready, _, _ = select.select(self.inpipes, [], []) - for inpipe in ready: - item = inpipe.recv() - if item is StopIteration: - del self.inpipes[self.inpipes.index(inpipe)] - else: - yield item - self.iterator = selrecv() - - def __pipe__(self, inpipe): - self.inpipes.append(inpipe.outpipe) - - def __repr__(self): - return '' % hex(id(self)) + """Collect items from many ForkedFeeder's or ProcessPool's. + """ + def __init__(self): + self.inpipes = [] + def selrecv(): + while self.inpipes: + ready, _, _ = select.select(self.inpipes, [], []) + for inpipe in ready: + item = inpipe.recv() + if item is StopIteration: + del self.inpipes[self.inpipes.index(inpipe)] + else: + yield item + self.iterator = selrecv() + + def __pipe__(self, inpipe): + self.inpipes.append(inpipe.outpipe) + + def __repr__(self): + return '' % hex(id(self)) class _PCollector(Stream): - """Collect items from many ForkedFeeder's or ProcessPool's. - - All input pipes are polled individually. When none is ready, the - collector sleeps for a fix duration before polling again. - """ - def __init__(self, waittime=0.1): - """waitime: the duration that the collector sleeps for - when all input pipes are empty - """ - self.inpipes = [] - self.waittime = waittime - def pollrecv(): - while self.inpipes: - ready = [p for p in self.inpipes if p.poll()] - for inpipe in ready: - item = inpipe.recv() - if item is StopIteration: - del self.inpipes[self.inpipes.index(inpipe)] - else: - yield item - self.iterator = pollrecv() - - def __pipe__(self, inpipe): - self.inpipes.append(inpipe.outpipe) - - def __repr__(self): - return '' % hex(id(self)) + """Collect items from many ForkedFeeder's or ProcessPool's. + + All input pipes are polled individually. When none is ready, the + collector sleeps for a fix duration before polling again. 
+ """ + def __init__(self, waittime=0.1): + """waitime: the duration that the collector sleeps for + when all input pipes are empty + """ + self.inpipes = [] + self.waittime = waittime + def pollrecv(): + while self.inpipes: + ready = [p for p in self.inpipes if p.poll()] + for inpipe in ready: + item = inpipe.recv() + if item is StopIteration: + del self.inpipes[self.inpipes.index(inpipe)] + else: + yield item + self.iterator = pollrecv() + + def __pipe__(self, inpipe): + self.inpipes.append(inpipe.outpipe) + + def __repr__(self): + return '' % hex(id(self)) if sys.platform == "win32": - PCollector = _PCollector + PCollector = _PCollector class QCollector(Stream): - """Collect items from many ThreadedFeeder's or ThreadPool's. - - All input queues are polled individually. When none is ready, the - collector sleeps for a fix duration before polling again. - """ - def __init__(self, waittime=0.1): - """waitime: the duration that the collector sleeps for - when all input pipes are empty - """ - self.inqueues = [] - self.waittime = waittime - def nonemptyget(): - while self.inqueues: - ready = [q for q in self.inqueues if not q.empty()] - if not ready: - time.sleep(self.waittime) - for q in ready: - item = q.get() - if item is StopIteration: - del self.inqueues[self.inqueues.index(q)] - else: - yield item - self.iterator = nonemptyget() - - def __pipe__(self, inpipe): - self.inqueues.append(inpipe.outqueue) - - def __repr__(self): - return '' % hex(id(self)) + """Collect items from many ThreadedFeeder's or ThreadPool's. + + All input queues are polled individually. When none is ready, the + collector sleeps for a fix duration before polling again. 
+ """ + def __init__(self, waittime=0.1): + """waitime: the duration that the collector sleeps for + when all input pipes are empty + """ + self.inqueues = [] + self.waittime = waittime + def nonemptyget(): + while self.inqueues: + ready = [q for q in self.inqueues if not q.empty()] + if not ready: + time.sleep(self.waittime) + for q in ready: + item = q.get() + if item is StopIteration: + del self.inqueues[self.inqueues.index(q)] + else: + yield item + self.iterator = nonemptyget() + + def __pipe__(self, inpipe): + self.inqueues.append(inpipe.outqueue) + + def __repr__(self): + return '' % hex(id(self)) class PSorter(Stream): - """Merge sorted input (smallest to largest) coming from many - ForkedFeeder's or ProcessPool's. - """ - def __init__(self): - self.inpipes = [] + """Merge sorted input (smallest to largest) coming from many + ForkedFeeder's or ProcessPool's. + """ + def __init__(self): + self.inpipes = [] - def __iter__(self): - return heapq.merge(*__builtin__.map(_iterrecv, self.inpipes)) - - def __pipe__(self, inpipe): - self.inpipes.append(inpipe.outpipe) + def __iter__(self): + return heapq.merge(*_map(_iterrecv, self.inpipes)) + + def __pipe__(self, inpipe): + self.inpipes.append(inpipe.outpipe) - def __repr__(self): - return '' % hex(id(self)) + def __repr__(self): + return '' % hex(id(self)) class QSorter(Stream): - """Merge sorted input (smallest to largest) coming from many - ThreadFeeder's or ThreadPool's. - """ - def __init__(self): - self.inqueues = [] - - def __iter__(self): - return heapq.merge(*__builtin__.map(_iterqueue, self.inqueues)) - - def __pipe__(self, inpipe): - self.inqueues.append(inpipe.outqueue) - - def __repr__(self): - return '' % hex(id(self)) + """Merge sorted input (smallest to largest) coming from many + ThreadFeeder's or ThreadPool's. 
+ """ + def __init__(self): + self.inqueues = [] + + def __iter__(self): + return heapq.merge(*_map(_iterqueue, self.inqueues)) + + def __pipe__(self, inpipe): + self.inqueues.append(inpipe.outqueue) + + def __repr__(self): + return '' % hex(id(self)) #_____________________________________________________________________ @@ -1192,48 +1204,48 @@ def __repr__(self): def seq(start=0, step=1): - """An arithmetic sequence generator. Works with any type with + defined. + """An arithmetic sequence generator. Works with any type with + defined. - >>> seq(1, 0.25) >> item[:10] - [1, 1.25, 1.5, 1.75, 2.0, 2.25, 2.5, 2.75, 3.0, 3.25] - """ - def seq(a, d): - while 1: - yield a - a += d - return seq(start, step) + >>> seq(1, 0.25) >> item[:10] + [1, 1.25, 1.5, 1.75, 2.0, 2.25, 2.5, 2.75, 3.0, 3.25] + """ + def seq(a, d): + while 1: + yield a + a += d + return seq(start, step) def gseq(ratio, initval=1): - """A geometric sequence generator. Works with any type with * defined. + """A geometric sequence generator. Works with any type with * defined. - >>> from decimal import Decimal - >>> gseq(Decimal('.2')) >> item[:4] - [1, Decimal('0.2'), Decimal('0.04'), Decimal('0.008')] - """ - while 1: - yield initval - initval *= ratio + >>> from decimal import Decimal + >>> gseq(Decimal('.2')) >> item[:4] + [1, Decimal('0.2'), Decimal('0.04'), Decimal('0.008')] + """ + while 1: + yield initval + initval *= ratio def repeatcall(func, *args): - """Repeatedly call func(*args) and yield the result. - - Useful when func(*args) returns different results, esp. randomly. - """ - return itertools.starmap(func, itertools.repeat(args)) + """Repeatedly call func(*args) and yield the result. + + Useful when func(*args) returns different results, esp. randomly. + """ + return itertools.starmap(func, itertools.repeat(args)) def chaincall(func, initval): - """Yield func(initval), func(func(initval)), etc. 
def chaincall(func, initval):
    """Yield func(initval), func(func(initval)), etc.

    >>> chaincall(lambda x: 3*x, 2) >> take(10)
    Stream([2, 6, 18, 54, 162, 486, 1458, 4374, 13122, 39366])
    """
    value = initval
    while True:
        yield value
        value = func(value)


#_____________________________________________________________________


def maximum(key):
    """
    Curried version of the built-in max.

    >>> Stream([3, 5, 28, 42, 7]) >> maximum(lambda x: x%28)
    42
    """
    def select_max(iterable):
        return max(iterable, key=key)
    return select_max


def minimum(key):
    """
    Curried version of the built-in min.

    >>> Stream([[13, 52], [28, 35], [42, 6]]) >> minimum(lambda v: v[0] + v[1])
    [42, 6]
    """
    def select_min(iterable):
        return min(iterable, key=key)
    return select_min
+ + >>> reduce(lambda x,y: x+y)( [1, 2, 3, 4, 5] ) + 15 + """ + if initval is None: + return lambda s: _reduce(function, s) + else: + return lambda s: _reduce(function, s, initval) #_____________________________________________________________________ @@ -1278,7 +1290,7 @@ def reduce(function, initval=None): if __name__ == "__main__": - import doctest - if doctest.testmod()[0]: - import sys - sys.exit(1) + import doctest + if doctest.testmod()[0]: + import sys + sys.exit(1) diff --git a/test/asyncpool.py b/test/asyncpool.py index 84cb010..76afd75 100644 --- a/test/asyncpool.py +++ b/test/asyncpool.py @@ -16,23 +16,24 @@ dataset = [] def alternating(n): - values = [] - for i in range(1, n+1): - values.append(i) - values.append(-i) - return values + values = [] + for i in range(1, n+1): + values.append(i) + values.append(-i) + return values def randomized(n): - values = [] - for _ in range(n): - values.append(randint(-sys.maxint, sys.maxint)) - return values + maxint = (1<<64)-1 + values = [] + for _ in range(n): + values.append(randint(-maxint, maxint)) + return values for v in [10, 100, 1000] >> map(alternating): - dataset.append(v) + dataset.append(v) for v in [10, 100, 1000] >> map(randomized): - dataset.append(v) + dataset.append(v) func = filter(lambda x: x&1) @@ -42,27 +43,27 @@ def randomized(n): ## Test scenario def threadpool(i): - result = dataset[i] >> ThreadPool(func, poolsize=2) >> set - pprint(result) - assert result == resultset[i] + result = dataset[i] >> ThreadPool(func, poolsize=2) >> set + pprint(result) + assert result == resultset[i] def processpool(i): - result = dataset[i] >> ProcessPool(func, poolsize=2) >> set - pprint(result) - assert result == resultset[i] + result = dataset[i] >> ProcessPool(func, poolsize=2) >> set + pprint(result) + assert result == resultset[i] ## Test cases def test_ThreadPool(): - for i in range(len(dataset)): - yield threadpool, i + for i in range(len(dataset)): + threadpool(i) def test_ProcessPool(): - for i in 
range(len(dataset)): - yield processpool, i + for i in range(len(dataset)): + processpool(i) if __name__ == '__main__': - import nose - nose.main() + import nose + nose.main() diff --git a/test/collector.py b/test/collector.py index 7713947..5391044 100644 --- a/test/collector.py +++ b/test/collector.py @@ -1,4 +1,4 @@ -#!/usr/bin/env python2.6 +#!/usr/bin/env python import os, sys @@ -14,7 +14,8 @@ N = 1000 def producer(): - for x in xrange(N): + # TODO: throw an error here to test how robust the collector is + for x in range(N): yield x def collect(feeder_class, collector_class, n): @@ -24,18 +25,18 @@ def collect(feeder_class, collector_class, n): results = consumer >> list pprint(results) assert len(results) == N * n - assert set(results) == set(xrange(N)) + assert set(results) == set(range(N)) ## Test cases def test_PCollector(): for i in [1, 2, 3, 4]: - yield collect, ForkedFeeder, PCollector, i + collect( ForkedFeeder, PCollector, i ) def test_QCollector(): for i in [1, 2, 3, 4]: - yield collect, ThreadedFeeder, QCollector, i + collect( ThreadedFeeder, QCollector, i ) if __name__ == '__main__': diff --git a/test/executor.py b/test/executor.py index 8280d2c..cf71041 100644 --- a/test/executor.py +++ b/test/executor.py @@ -26,11 +26,11 @@ def submit(poolclass, n): def test_ThreadPool_submit(): for n in result.keys(): - yield submit, ThreadPool, n + submit(ThreadPool, n) def test_ProcessPool_submit(): for n in result.keys(): - yield submit, ProcessPool, n + submit(ProcessPool, n) ## Test concurrent submission and cancellation @@ -46,16 +46,16 @@ def cancel(poolclass, n): t2.join() e.close() completed = len(e.result >> list) - print completed, cancelled + print(completed, cancelled) assert completed + cancelled == n def test_ThreadPool_cancel(): for n in result.keys(): - yield cancel, ThreadPool, n + cancel(ThreadPool, n) def test_ProcessPool_cancel(): for n in result.keys(): - yield cancel, ProcessPool, n + cancel(ProcessPool, n) ## Test shutdown @@ -64,18 
+64,18 @@ def shutdown(poolclass, n): e = Executor(poolclass, map(lambda x: x*x), poolsize=2) e.submit(*range(n)) e.shutdown() - print e.result >> list + print(e.result >> list) assert e.inputfeeder_thread.is_alive() == False assert e.resulttracker_thread.is_alive() == False assert e.failuretracker_thread.is_alive() == False def test_ThreadPool_shutdown(): for n in result.keys(): - yield shutdown, ThreadPool, n + shutdown(ThreadPool, n) def test_ProcessPool_shutdown(): for n in result.keys(): - yield shutdown, ProcessPool, n + shutdown(ProcessPool, n) if __name__ == "__main__": diff --git a/test/sorter.py b/test/sorter.py index fbfc2e6..c8d8399 100644 --- a/test/sorter.py +++ b/test/sorter.py @@ -9,14 +9,18 @@ def test_PSorter(): sorter = PSorter() - ForkedFeeder(lambda: iter(xrange(10))) >> sorter - ForkedFeeder(lambda: iter(xrange(0, 20, 2))) >> sorter + # TODO: throw an error in the iter function to test robust PSorter behavior + #ForkedFeeder(lambda: iter(yrange(10))) >> sorter + ForkedFeeder(lambda: iter(range(10))) >> sorter + ForkedFeeder(lambda: iter(range(0, 20, 2))) >> sorter assert sorter >> list == [0, 0, 1, 2, 2, 3, 4, 4, 5, 6, 6, 7, 8, 8, 9, 10, 12, 14, 16, 18] def test_QSorter(): sorter = QSorter() - ThreadedFeeder(lambda: iter(xrange(10))) >> sorter - ThreadedFeeder(lambda: iter(xrange(0, 20, 2))) >> sorter + # TODO: throw an error in the iter function to test robust QSorter behavior + #ThreadedFeeder(lambda: iter(zrange(10))) >> sorter + ThreadedFeeder(lambda: iter(range(10))) >> sorter + ThreadedFeeder(lambda: iter(range(0, 20, 2))) >> sorter assert sorter >> list == [0, 0, 1, 2, 2, 3, 4, 4, 5, 6, 6, 7, 8, 8, 9, 10, 12, 14, 16, 18] From a89c7d156571d9a1c13d97607cf9a0c7a273195f Mon Sep 17 00:00:00 2001 From: "David M. Rogers" Date: Mon, 2 Sep 2024 23:32:28 -0400 Subject: [PATCH 02/12] Removed extra closures. 
--- stream.py | 119 +++++++++++++++++++++++++----------------------------- 1 file changed, 56 insertions(+), 63 deletions(-) diff --git a/stream.py b/stream.py index c6ab06a..ab23b6e 100644 --- a/stream.py +++ b/stream.py @@ -308,8 +308,7 @@ def __init__(self, indices): self.indexiter = iter(indices) def __call__(self, iterator): - def itaker(): - try: + try: old_idx = -1 idx = next(self.indexiter) # next value to yield counter = seq() @@ -322,9 +321,8 @@ def itaker(): yield elem old_idx = idx idx = next(self.indexiter) - except StopIteration: - pass - return itaker() + except StopIteration: + pass class drop(Stream): @@ -348,6 +346,8 @@ class dropi(Stream): >>> seq() >> dropi(seq(0,3)) >> item[:10] [1, 2, 4, 5, 7, 8, 10, 11, 13, 14] + >>> "abcd" >> dropi(range(1,3)) >> reduce(lambda a,b: a+b) + 'ad' """ def __init__(self, indices): """indices: an iterable of indices to be dropped, should yield @@ -357,28 +357,27 @@ def __init__(self, indices): self.indexiter = iter(indices) def __call__(self, iterator): - def idropper(): - counter = seq() - def try_next_idx(): - ## so that the stream keeps going - ## after the discard iterator is exhausted - try: - return next(self.indexiter), False - except StopIteration: - return -1, True - old_idx = -1 - idx, exhausted = try_next_idx() # next value to discard - while 1: - c = next(counter) - elem = next(iterator) - while not exhausted and idx <= old_idx: # ignore bad values - idx, exhausted = try_next_idx() - if c != idx: - yield elem - elif not exhausted: - old_idx = idx - idx, exhausted = try_next_idx() - return idropper() + counter = seq() + def try_next_idx(): + ## so that the stream keeps going + ## after the discard iterator is exhausted + try: + return next(self.indexiter), False + except StopIteration: + return -1, True + old_idx = -1 + idx, exhausted = try_next_idx() # next value to discard + while not exhausted: + c = next(counter) + elem = next(iterator) + while not exhausted and idx <= old_idx: # ignore bad values + 
idx, exhausted = try_next_idx() + if c != idx: + yield elem + elif not exhausted: + old_idx = idx + idx, exhausted = try_next_idx() + yield from iterator #_______________________________________________________________________ @@ -506,16 +505,14 @@ def __init__(self, function, initval=None): self.initval = initval def __call__(self, iterator): - def folder(): - if self.initval: - accumulated = self.initval - else: - accumulated = next(iterator) - while 1: - yield accumulated - val = next(iterator) - accumulated = self.function(accumulated, val) - return folder() + if self.initval: + accumulated = self.initval + else: + accumulated = next(iterator) + yield accumulated + for val in iterator: + accumulated = self.function(accumulated, val) + yield accumulated #_____________________________________________________________________ @@ -534,14 +531,12 @@ def __init__(self, n): self.n = n def __call__(self, iterator): - def chopper(): - while 1: - s = iterator >> item[:self.n] - if s: - yield s - else: - break - return chopper() + while 1: + s = iterator >> item[:self.n] + if s: + yield s + else: + break class itemcutter(map): @@ -572,24 +567,22 @@ class flattener(Stream): """ @staticmethod def __call__(iterator): - def flatten(): - ## Maintain a LIFO stack of iterators - stack = [] - i = iterator - while True: + ## Maintain a LIFO stack of iterators + stack = [] + i = iterator + while True: + try: + e = next(i) + if hasattr(e, "__iter__") and not isinstance(e, str): + stack.append(i) + i = iter(e) + else: + yield e + except StopIteration: try: - e = next(i) - if hasattr(e, "__iter__") and not isinstance(e, str): - stack.append(i) - i = iter(e) - else: - yield e - except StopIteration: - try: - i = stack.pop() - except IndexError: - break - return flatten() + i = stack.pop() + except IndexError: + break def __repr__(self): return '' % hex(id(self)) @@ -804,7 +797,7 @@ def cleanup(): def __call__(self, inpipe): if self.closed: - raise BrokenPipe('All workers are dead, 
refusing to summit jobs. ' + raise BrokenPipe('All workers are dead, refusing to submit jobs. ' 'Use another Pool.') def feed(): for item in inpipe: From 42d2d3f9f6cbc1e380b3770a496e97ba8e0a93fd Mon Sep 17 00:00:00 2001 From: "David M. Rogers" Date: Fri, 25 Oct 2024 07:50:21 -0400 Subject: [PATCH 03/12] Switched to literal version. --- setup.py | 1 + stream.py | 7 +------ 2 files changed, 2 insertions(+), 6 deletions(-) diff --git a/setup.py b/setup.py index 210c639..031a65c 100755 --- a/setup.py +++ b/setup.py @@ -22,6 +22,7 @@ setup( name='stream', + # Remember to also change stream.py:__version__ on update! version='0.9.0', description=__doc__.split('\n', 1)[0], long_description = __doc__, diff --git a/stream.py b/stream.py index ab23b6e..96ba7aa 100644 --- a/stream.py +++ b/stream.py @@ -118,12 +118,7 @@ def methodcaller(methodname, *args, **kwargs): return lambda o: getattr(o, methodname)(*args, **kwargs) -import pkg_resources - -try: - __version__ = pkg_resources.get_distribution('stream').version -except Exception: - __version__ = 'unknown' +__version__ = '0.9.0' #_____________________________________________________________________ From a59ff12451198e79ed45a1fcd8126771eb6bf64c Mon Sep 17 00:00:00 2001 From: "David M. Rogers" Date: Fri, 25 Oct 2024 11:07:51 -0400 Subject: [PATCH 04/12] Added github tests. 
--- .github/workflows/python-package.yml | 46 ++++++++++++++++++++++++++++ 1 file changed, 46 insertions(+) create mode 100644 .github/workflows/python-package.yml diff --git a/.github/workflows/python-package.yml b/.github/workflows/python-package.yml new file mode 100644 index 0000000..2b07d20 --- /dev/null +++ b/.github/workflows/python-package.yml @@ -0,0 +1,46 @@ +name: Python package + +on: + push: + branches: [ "master" ] + pull_request: + branches: [ "master" ] + +jobs: + build: + + runs-on: ${{ matrix.os }} + strategy: + fail-fast: false + matrix: + os: ["ubuntu-latest"] + python-version: ["3.10"] + #os: ["ubuntu-latest", "macos-latest"] + #python-version: ["3.3", "3.8", "3.10", "3.12"] + + steps: + - uses: actions/checkout@v3 + + - name: Set up Python ${{ matrix.python-version }} + uses: actions/setup-python@v3 + with: + python-version: ${{ matrix.python-version }} + + - name: Test doctests + run: | + python stream.py + + - name: Install dependencies + run: | + python -m pip install --upgrade pip + python -m pip install . + + - name: Test with pytest + run: | + pytest test/*.py + +# - name: Upload coverage to Codecov +# if: matrix.python-version == 3.12 && startsWith(matrix.os, 'ubuntu') +# uses: codecov/codecov-action@v3 +# env: +# CODECOV_TOKEN: ${{ secrets.CODECOV_TOKEN }} From c1e970e4921a65184c739dfa8ea6088c466c335b Mon Sep 17 00:00:00 2001 From: "David M. Rogers" Date: Fri, 25 Oct 2024 11:07:51 -0400 Subject: [PATCH 05/12] Added github tests. 
--- .github/workflows/python-package.yml | 47 ++++++++++++++++++++++++++++ 1 file changed, 47 insertions(+) create mode 100644 .github/workflows/python-package.yml diff --git a/.github/workflows/python-package.yml b/.github/workflows/python-package.yml new file mode 100644 index 0000000..70d70fe --- /dev/null +++ b/.github/workflows/python-package.yml @@ -0,0 +1,47 @@ +name: Python package + +on: + push: + branches: [ "master", "updates" ] + pull_request: + branches: [ "master" ] + +jobs: + build: + + runs-on: ${{ matrix.os }} + strategy: + fail-fast: false + matrix: + os: ["ubuntu-latest"] + python-version: ["3.10"] + #os: ["ubuntu-latest", "macos-latest"] + #python-version: ["3.3", "3.8", "3.10", "3.12"] + + steps: + - uses: actions/checkout@v3 + + - name: Set up Python ${{ matrix.python-version }} + uses: actions/setup-python@v3 + with: + python-version: ${{ matrix.python-version }} + + - name: Test doctests + run: | + python stream.py + + - name: Install dependencies + run: | + python -m pip install --upgrade pip + python -m pip install pytest + python -m pip install . + + - name: Test with pytest + run: | + pytest test/*.py + +# - name: Upload coverage to Codecov +# if: matrix.python-version == 3.12 && startsWith(matrix.os, 'ubuntu') +# uses: codecov/codecov-action@v3 +# env: +# CODECOV_TOKEN: ${{ secrets.CODECOV_TOKEN }} From 19992b0a4cf6d5b63bc8f909dbf7ff80075535bd Mon Sep 17 00:00:00 2001 From: "David M. Rogers" Date: Tue, 5 Nov 2024 19:10:17 -0500 Subject: [PATCH 06/12] Additional python versions and OS-es tested. 
--- .github/workflows/python-package.yml | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/.github/workflows/python-package.yml b/.github/workflows/python-package.yml index 2b07d20..43db172 100644 --- a/.github/workflows/python-package.yml +++ b/.github/workflows/python-package.yml @@ -13,10 +13,10 @@ jobs: strategy: fail-fast: false matrix: - os: ["ubuntu-latest"] - python-version: ["3.10"] - #os: ["ubuntu-latest", "macos-latest"] - #python-version: ["3.3", "3.8", "3.10", "3.12"] + #os: ["ubuntu-latest"] + #python-version: ["3.10"] + os: ["ubuntu-latest", "macos-latest"] + python-version: ["3.3", "3.8", "3.10", "3.12"] steps: - uses: actions/checkout@v3 From d256114ddd43d1d6660b7d0434fcdfc887f47c45 Mon Sep 17 00:00:00 2001 From: "David M. Rogers" Date: Tue, 5 Nov 2024 21:06:22 -0500 Subject: [PATCH 07/12] Lifted out ProcessPool.__init__.work --- .github/workflows/python-package.yml | 2 +- stream.py | 31 +++++++++++++++++----------- 2 files changed, 20 insertions(+), 13 deletions(-) diff --git a/.github/workflows/python-package.yml b/.github/workflows/python-package.yml index c7d54c5..879779c 100644 --- a/.github/workflows/python-package.yml +++ b/.github/workflows/python-package.yml @@ -14,7 +14,7 @@ jobs: fail-fast: false matrix: os: ["ubuntu-latest", "macos-latest"] - python-version: ["3.3", "3.8", "3.10", "3.12"] + python-version: ["3.3.7", "3.8", "3.10", "3.12"] steps: - uses: actions/checkout@v3 diff --git a/stream.py b/stream.py index 96ba7aa..12fbfe2 100644 --- a/stream.py +++ b/stream.py @@ -808,6 +808,18 @@ def join(self): def __repr__(self): return '' % (self.poolsize, hex(id(self))) +def process_queue(func, inqueue, outqueue, failqueue, + args, kwargs): + inp, dupinput = itertools.tee(_iterqueue(inqueue)) + output = func(inp, *args, **kwargs) + while 1: + try: + outqueue.put(next(output)) + next(dupinput) + except StopIteration: + break + except Exception as e: + failqueue.put((next(dupinput), e)) class ProcessPool(Stream): 
"""Work on the input stream asynchronously using a pool of processes. @@ -834,20 +846,15 @@ def __init__(self, function, poolsize=_nCPU, args=[], kwargs={}): self.failqueue = multiprocessing.SimpleQueue() self.failure = Stream(_iterqueue(self.failqueue)) self.closed = False - def work(): - input, dupinput = itertools.tee(_iterqueue(self.inqueue)) - output = self.function(input, *args, **kwargs) - while 1: - try: - self.outqueue.put(next(output)) - next(dupinput) - except StopIteration: - break - except Exception as e: - self.failqueue.put((next(dupinput), e)) self.worker_processes = [] for _ in range(self.poolsize): - p = multiprocessing.Process(target=work) + p = multiprocessing.Process(target=process_queue, + args=(self.function, + self.inqueue, + self.outqueue, + self.failqueue, + args, + kwargs)) self.worker_processes.append(p) p.start() def cleanup(): From 125cf96fcec32dbfe4089035984026a9f0c45bbe Mon Sep 17 00:00:00 2001 From: "David M. Rogers" Date: Tue, 5 Nov 2024 21:25:45 -0500 Subject: [PATCH 08/12] Removed lambda from example (OSX can't pickle it) --- stream.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/stream.py b/stream.py index 12fbfe2..5ed28e8 100644 --- a/stream.py +++ b/stream.py @@ -824,8 +824,9 @@ def process_queue(func, inqueue, outqueue, failqueue, class ProcessPool(Stream): """Work on the input stream asynchronously using a pool of processes. - >>> range(10) >> ProcessPool(map(lambda x: x*x)) >> sum - 285 + >>> from math import sqrt + >>> range(10) >> ProcessPool(map(sqrt)) >> sum + 19.30600052603572 The pool object is an iterable over the output values. If an input value causes an Exception to be raised, the tuple (value, From 9f0d4fa5104f0d887e3d6174e994fde4a5275e16 Mon Sep 17 00:00:00 2001 From: "David M. 
Rogers" Date: Tue, 5 Nov 2024 21:39:03 -0500 Subject: [PATCH 09/12] Removed lambda-s from test/sorter.py --- .github/workflows/python-package.yml | 4 ++-- stream.py | 6 +++--- test/sorter.py | 10 ++++++---- 3 files changed, 11 insertions(+), 9 deletions(-) diff --git a/.github/workflows/python-package.yml b/.github/workflows/python-package.yml index 879779c..3ebaef0 100644 --- a/.github/workflows/python-package.yml +++ b/.github/workflows/python-package.yml @@ -13,8 +13,8 @@ jobs: strategy: fail-fast: false matrix: - os: ["ubuntu-latest", "macos-latest"] - python-version: ["3.3.7", "3.8", "3.10", "3.12"] + os: ["ubuntu-20.04", "macos-12", "windows-2019"] + python-version: ["3.5", "3.8", "3.10", "3.12"] steps: - uses: actions/checkout@v3 diff --git a/stream.py b/stream.py index 5ed28e8..e793e0f 100644 --- a/stream.py +++ b/stream.py @@ -824,9 +824,9 @@ def process_queue(func, inqueue, outqueue, failqueue, class ProcessPool(Stream): """Work on the input stream asynchronously using a pool of processes. - >>> from math import sqrt - >>> range(10) >> ProcessPool(map(sqrt)) >> sum - 19.30600052603572 + >>> def square(x): return x*x + >>> range(10) >> ProcessPool(map(square)) >> sum + 285 The pool object is an iterable over the output values. 
If an input value causes an Exception to be raised, the tuple (value, diff --git a/test/sorter.py b/test/sorter.py index c8d8399..62f4e80 100644 --- a/test/sorter.py +++ b/test/sorter.py @@ -6,21 +6,23 @@ from stream import ForkedFeeder, ThreadedFeeder, PSorter, QSorter +def irange(*args): + return iter(range(*args)) def test_PSorter(): sorter = PSorter() # TODO: throw an error in the iter function to test robust PSorter behavior #ForkedFeeder(lambda: iter(yrange(10))) >> sorter - ForkedFeeder(lambda: iter(range(10))) >> sorter - ForkedFeeder(lambda: iter(range(0, 20, 2))) >> sorter + ForkedFeeder(irange, 10) >> sorter + ForkedFeeder(irange, 0, 20, 2) >> sorter assert sorter >> list == [0, 0, 1, 2, 2, 3, 4, 4, 5, 6, 6, 7, 8, 8, 9, 10, 12, 14, 16, 18] def test_QSorter(): sorter = QSorter() # TODO: throw an error in the iter function to test robust QSorter behavior #ThreadedFeeder(lambda: iter(zrange(10))) >> sorter - ThreadedFeeder(lambda: iter(range(10))) >> sorter - ThreadedFeeder(lambda: iter(range(0, 20, 2))) >> sorter + ThreadedFeeder(irange, 10) >> sorter + ThreadedFeeder(irange, 0, 20, 2) >> sorter assert sorter >> list == [0, 0, 1, 2, 2, 3, 4, 4, 5, 6, 6, 7, 8, 8, 9, 10, 12, 14, 16, 18] From 0163b314987a9352bc1509eb5dc34e1107dd876e Mon Sep 17 00:00:00 2001 From: "David M. Rogers" Date: Tue, 5 Nov 2024 21:57:14 -0500 Subject: [PATCH 10/12] Removed function defs from doctests. 
--- .github/workflows/python-package.yml | 2 +- stream.py | 10 ++++------ 2 files changed, 5 insertions(+), 7 deletions(-) diff --git a/.github/workflows/python-package.yml b/.github/workflows/python-package.yml index 3ebaef0..f5232e6 100644 --- a/.github/workflows/python-package.yml +++ b/.github/workflows/python-package.yml @@ -14,7 +14,7 @@ jobs: fail-fast: false matrix: os: ["ubuntu-20.04", "macos-12", "windows-2019"] - python-version: ["3.5", "3.8", "3.10", "3.12"] + python-version: ["3.6", "3.8", "3.12"] steps: - uses: actions/checkout@v3 diff --git a/stream.py b/stream.py index e793e0f..0983730 100644 --- a/stream.py +++ b/stream.py @@ -417,9 +417,8 @@ class map(Stream): """Invoke a function using each element of the input stream as its only argument, a la `map` - >>> square = lambda x: x*x - >>> range(10) >> map(square) >> list - [0, 1, 4, 9, 16, 25, 36, 49, 64, 81] + >>> range(65, 75) >> map(chr) >> ','.join + 'A,B,C,D,E,F,G,H,I,J' """ def __init__(self, function): """function: to be called with each stream element as its @@ -824,9 +823,8 @@ def process_queue(func, inqueue, outqueue, failqueue, class ProcessPool(Stream): """Work on the input stream asynchronously using a pool of processes. - >>> def square(x): return x*x - >>> range(10) >> ProcessPool(map(square)) >> sum - 285 + >>> seq(122,0) >> take(3) >> ProcessPool(map(chr)) >> ''.join + 'zzz' The pool object is an iterable over the output values. If an input value causes an Exception to be raised, the tuple (value, From 06b2ffff15fe43be5e5b013ca28952794a2e7e91 Mon Sep 17 00:00:00 2001 From: "David M. Rogers" Date: Tue, 5 Nov 2024 22:37:25 -0500 Subject: [PATCH 11/12] Why software fails - a lesson in quirks and corner cases. 
--- .github/workflows/python-package.yml | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/.github/workflows/python-package.yml b/.github/workflows/python-package.yml index f5232e6..bc08fec 100644 --- a/.github/workflows/python-package.yml +++ b/.github/workflows/python-package.yml @@ -1,5 +1,9 @@ name: Python package +defaults: + run: + shell: sh + on: push: branches: [ "master", "updates" ] @@ -8,7 +12,6 @@ on: jobs: build: - runs-on: ${{ matrix.os }} strategy: fail-fast: false From 0f3ce1a984793ee2a701ff91e86949c6fe373973 Mon Sep 17 00:00:00 2001 From: "David M. Rogers" Date: Tue, 5 Nov 2024 23:00:21 -0500 Subject: [PATCH 12/12] More pickle terror. --- .github/workflows/python-package.yml | 2 +- stream.py | 22 +++++++++++++--------- 2 files changed, 14 insertions(+), 10 deletions(-) diff --git a/.github/workflows/python-package.yml b/.github/workflows/python-package.yml index bc08fec..ad5548b 100644 --- a/.github/workflows/python-package.yml +++ b/.github/workflows/python-package.yml @@ -17,7 +17,7 @@ jobs: fail-fast: false matrix: os: ["ubuntu-20.04", "macos-12", "windows-2019"] - python-version: ["3.6", "3.8", "3.12"] + python-version: ["3.8", "3.12"] steps: - uses: actions/checkout@v3 diff --git a/stream.py b/stream.py index 0983730..a310b5f 100644 --- a/stream.py +++ b/stream.py @@ -698,6 +698,14 @@ def join(self): def __repr__(self): return '' % hex(id(self)) +def _feed(generator, inpipe, args, kwargs): + i = generator(*args, **kwargs) + while 1: + try: + inpipe.send(next(i)) + except StopIteration: + inpipe.send(StopIteration) + break class ForkedFeeder(Iterable): def __init__(self, generator, *args, **kwargs): @@ -712,15 +720,11 @@ def __init__(self, generator, *args, **kwargs): be costly. 
""" self.outpipe, inpipe = multiprocessing.Pipe(duplex=False) - def feed(): - i = generator(*args, **kwargs) - while 1: - try: - inpipe.send(next(i)) - except StopIteration: - inpipe.send(StopIteration) - break - self.process = multiprocessing.Process(target=feed) + self.process = multiprocessing.Process(target=_feed, + args=(generator, + inpipe, + args, + kwargs)) self.process.start() def __iter__(self):