Complete API reference for PyFerris - a high-performance parallel processing library for Python.
Apply a function to every item in an iterable in parallel.
Parameters:
- func (callable): Function to apply to each item
- iterable (iterable): Input data to process
- chunk_size (int, optional): Size of chunks for parallel processing
- progress (ProgressTracker, optional): Progress tracking object
Returns:
- Iterator of results
Example:
from pyferris import parallel_map
results = parallel_map(lambda x: x**2, range(1000))
Filter elements in parallel based on a predicate function.
Parameters:
- predicate (callable): Function that returns True/False
- iterable (iterable): Input data to filter
- chunk_size (int, optional): Size of chunks for parallel processing
- progress (ProgressTracker, optional): Progress tracking object
Returns:
- Iterator of filtered results
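For reference, the filtered output matches the built-in filter applied sequentially; a minimal sketch of the equivalent semantics (the `is_even` predicate is a made-up example):

```python
# Sequential equivalent of parallel_filter(is_even, range(10)):
# keeps only the items for which the predicate returns True.
def is_even(x):
    return x % 2 == 0

evens = list(filter(is_even, range(10)))  # [0, 2, 4, 6, 8]
```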
Apply a reduction function to an iterable in parallel.
Parameters:
- function (callable): Reduction function taking two arguments
- iterable (iterable): Input data to reduce
- initial (any, optional): Initial value for reduction
- chunk_size (int, optional): Size of chunks for parallel processing
Returns:
- Single reduced result
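The semantics mirror functools.reduce; a sequential sketch of what the reduction computes. Note that for a parallel reduction to give a deterministic result, the function should be associative (e.g. +, *, min, max):

```python
# Sequential equivalent of reducing range(1, 6) with addition and initial=0.
from functools import reduce
from operator import add

total = reduce(add, range(1, 6), 0)  # 15
```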
Apply a function to arguments unpacked from tuples in parallel.
Parameters:
- function (callable): Function to apply
- iterable (iterable): Iterable of tuples containing function arguments
- chunk_size (int, optional): Size of chunks for parallel processing
- progress (ProgressTracker, optional): Progress tracking object
Returns:
- Iterator of results
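The unpacking behavior matches itertools.starmap; a sequential sketch of the equivalent semantics:

```python
# Each tuple is unpacked into the function's positional arguments,
# so (2, 3) becomes pow(2, 3).
from itertools import starmap

pairs = [(2, 3), (3, 2), (10, 1)]
powers = list(starmap(pow, pairs))  # [8, 9, 10]
```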
Sort a large dataset in parallel using merge sort.
Parameters:
- iterable (iterable): Data to sort
- key (callable, optional): Key function for sorting
- reverse (bool): Sort in descending order if True
- chunk_size (int, optional): Size of chunks for parallel processing
Returns:
- List of sorted items
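The key and reverse parameters behave like those of the built-in sorted(); a sequential sketch of the equivalent result:

```python
# Sorting words by length, longest first, with the built-in sorted().
words = ["fig", "banana", "apple"]
by_len_desc = sorted(words, key=len, reverse=True)  # ['banana', 'apple', 'fig']
```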
Group elements by a key function in parallel.
Parameters:
- iterable (iterable): Data to group
- key (callable): Key function for grouping
- chunk_size (int, optional): Size of chunks for parallel processing
Returns:
- Dictionary mapping keys to lists of values
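A sequential sketch of the returned mapping, grouping by first letter (the example data is made up):

```python
# Builds a dict mapping each key to the list of items that produced it.
from collections import defaultdict

words = ["apple", "avocado", "banana"]
groups = defaultdict(list)
for w in words:
    groups[w[0]].append(w)
# {'a': ['apple', 'avocado'], 'b': ['banana']}
```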
Remove duplicates from a dataset in parallel.
Parameters:
- iterable (iterable): Input data
- key (callable, optional): Key function for uniqueness comparison
- chunk_size (int, optional): Size of chunks for parallel processing
Returns:
- List of unique items
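A sequential sketch of key-based deduplication, assuming the first item seen for each key is the one kept:

```python
# Deduplicate case-insensitively: the key function decides equality.
names = ["Alice", "ALICE", "Bob"]
seen = set()
unique = []
for n in names:
    k = n.lower()
    if k not in seen:
        seen.add(k)
        unique.append(n)
# ['Alice', 'Bob']
```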
Partition data into two groups based on a predicate.
Parameters:
- iterable (iterable): Data to partition
- predicate (callable): Function returning True/False
- chunk_size (int, optional): Size of chunks for parallel processing
Returns:
- Tuple of (true_items, false_items)
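A sequential sketch of the (true_items, false_items) split, using an even/odd predicate as an illustration:

```python
# One pass over the data, routing each item by the predicate's result.
true_items, false_items = [], []
for x in range(6):
    (true_items if x % 2 == 0 else false_items).append(x)
# ([0, 2, 4], [1, 3, 5])
```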
Split data into chunks for batch processing.
Parameters:
- iterable (iterable): Data to chunk
- chunk_size (int): Size of each chunk
Returns:
- List of chunks
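A sequential sketch of the chunking semantics; the final chunk may be shorter than chunk_size:

```python
# Split 7 items into chunks of 3: [[0, 1, 2], [3, 4, 5], [6]].
data = list(range(7))
chunks = [data[i:i + 3] for i in range(0, len(data), 3)]
```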
Process large datasets in configurable batches.
class BatchProcessor:
    def __init__(self, batch_size=1000, max_memory_mb=100, progress=True)
    def process(self, iterable, process_function)
Methods:
process(iterable, process_function): Process data in batches
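The batching pattern itself can be sketched in pure Python; this is only an illustration of what processing in batch_size-sized groups means, not PyFerris's implementation:

```python
# Split the input into fixed-size batches and apply a function to each.
def process_in_batches(iterable, process_function, batch_size=1000):
    data = list(iterable)
    results = []
    for i in range(0, len(data), batch_size):
        results.append(process_function(data[i:i + batch_size]))
    return results

batch_sums = process_in_batches(range(10), sum, batch_size=4)  # [6, 22, 17]
```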
Track progress of parallel operations.
class ProgressTracker:
    def __init__(self, total, desc="Processing", update_frequency=1, show_eta=True, show_speed=True)
Parameters:
- total (int): Total number of items to process
- desc (str): Description for progress bar
- update_frequency (int): Update frequency for progress display
- show_eta (bool): Show estimated time to completion
- show_speed (bool): Show processing speed
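Speed and ETA displays of this kind are conventionally items-per-second figures; a sketch of the arithmetic (an assumption about the display, not PyFerris's internal code):

```python
# items/sec and estimated seconds remaining, given elapsed wall time.
def progress_stats(done, total, elapsed_seconds):
    speed = done / elapsed_seconds   # items per second
    eta = (total - done) / speed     # seconds remaining
    return speed, eta

speed, eta = progress_stats(done=250, total=1000, elapsed_seconds=5.0)
# speed = 50.0 items/sec, eta = 15.0 seconds
```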
Collect and manage results from parallel operations.
class ResultCollector:
    def __init__(self, max_size=10000, auto_save=False, save_path=None)
    def add(self, result)
    def get_results(self)
    def filter(self, predicate)
    def save_to_file(self, path)
Get the current default chunk size for parallel operations.
Set the default chunk size for parallel operations.
Parameters:
- size (int): New chunk size
Get the current number of worker threads.
Set the number of worker threads.
Parameters:
- count (int): Number of worker threads
Configuration class for PyFerris settings.
class Config:
    @staticmethod
    def get_optimal_chunk_size(iterable_size, operation_type="default")
    @staticmethod
    def auto_configure(workload_type="balanced")
Advanced task execution and thread pool management.
class Executor:
    def __init__(self, max_workers=None, queue_size=1000, thread_name_prefix="PyFerris-Worker")
    def submit(self, fn, *args, **kwargs)
    def map(self, fn, iterable, chunksize=1)
    def shutdown(self, wait=True)
Methods:
submit(fn, *args, **kwargs): Submit a single task
map(fn, iterable, chunksize=1): Map a function over an iterable
shutdown(wait=True): Shut down the executor
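The interface mirrors the standard library's concurrent.futures executors; a stdlib sketch of the same submit/map/shutdown usage pattern:

```python
# The same pattern with the stdlib's ThreadPoolExecutor:
# submit returns a Future, map applies a function across an iterable.
from concurrent.futures import ThreadPoolExecutor

pool = ThreadPoolExecutor(max_workers=4)
future = pool.submit(pow, 2, 10)                  # single task -> Future
squares = list(pool.map(lambda x: x * x, range(5), chunksize=1))
pool.shutdown(wait=True)
# future.result() == 1024, squares == [0, 1, 4, 9, 16]
```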
Read a single file.
Write content to a file.
Read multiple files in parallel.
Write multiple files in parallel.
Parameters:
- file_data (List[Tuple[str, str]]): List of (path, content) tuples
Read CSV file into list of dictionaries.
Write data to CSV file.
Read large CSV file in chunks.
Read JSON file.
Write data to JSON file.
Read multiple JSON files in parallel.
Write multiple JSON files in parallel.
Read JSON Lines format file.
Write records in JSON Lines format.
Read and process multiple files in parallel.
Process files in batches.
Read large file as stream of chunks.
Asynchronous task executor.
class AsyncExecutor:
    def __init__(self, max_workers=None)
    async def submit(self, coro)
    async def map(self, coro_func, iterable)
    async def shutdown(self)
Asynchronous parallel map operation.
Asynchronous parallel filter operation.
Wrapper for asynchronous tasks.
class AsyncTask:
    def __init__(self, coro)
    async def result(self)
    def done(self)
    def cancel(self)
Base class for shared arrays.
class SharedArray:
    def __init__(self, data=None, size=None)
    def __getitem__(self, index)
    def __setitem__(self, index, value)
    def __len__(self)
    def to_list(self)
Typed shared arrays for integers, strings, and objects respectively.
Create a shared array from data.
Thread-safe shared dictionary.
class SharedDict:
    def __init__(self, initial_data=None)
    def get(self, key, default=None)
    def put(self, key, value)
    def keys(self)
    def values(self)
    def items(self)
Thread-safe shared queue.
class SharedQueue:
    def __init__(self, maxsize=0)
    def put(self, item, block=True, timeout=None)
    def get(self, block=True, timeout=None)
    def empty(self)
    def full(self)
    def qsize(self)
Thread-safe shared counter.
class SharedCounter:
    def __init__(self, initial_value=0)
    def increment(self, amount=1)
    def decrement(self, amount=1)
    def value(self)
    def reset(self)
Work-stealing task scheduler.
class WorkStealingScheduler:
    def __init__(self, num_workers=None)
    def submit(self, task, priority=TaskPriority.NORMAL)
    def shutdown(self)
Round-robin task scheduler.
Adaptive task scheduler that adjusts based on workload.
Priority-based task scheduler.
Enumeration for task priorities.
class TaskPriority(Enum):
    LOW = 1
    NORMAL = 2
    HIGH = 3
    URGENT = 4
Execute function with specified priority.
Create a priority task.
Thread-safe hash map.
class ConcurrentHashMap:
    def __init__(self, initial_capacity=16)
    def put(self, key, value)
    def get(self, key, default=None)
    def remove(self, key)
    def size(self)
Lock-free queue implementation.
class LockFreeQueue:
    def __init__(self)
    def enqueue(self, item)
    def dequeue(self)
    def is_empty(self)
Atomic counter for thread-safe counting.
class AtomicCounter:
    def __init__(self, initial_value=0)
    def increment(self)
    def decrement(self)
    def get(self)
    def set(self, value)
Reader-writer lock protected dictionary.
class RwLockDict:
    def __init__(self)
    def read(self, key, default=None)
    def write(self, key, value)
    def read_all(self)
Memory pool for efficient allocation.
class MemoryPool:
    def __init__(self, block_size=1024, initial_blocks=10)
    def allocate(self)
    def deallocate(self, block)
    def get_stats(self)
Create memory-mapped array.
Create 2D memory-mapped array.
Get information about memory-mapped array.
Create temporary memory-mapped array.
Intelligent cache with configurable eviction policies.
class SmartCache:
    def __init__(self, max_size=1000, policy=EvictionPolicy.LRU, ttl=None)
    def get(self, key, default=None)
    def put(self, key, value)
    def evict(self, key)
    def clear(self)
    def size(self)
    def hit_rate(self)
Cache eviction policies.
class EvictionPolicy(Enum):
    LRU = "lru"  # Least Recently Used
    LFU = "lfu"  # Least Frequently Used
    FIFO = "fifo"  # First In, First Out
    RANDOM = "random"  # Random eviction
    ADAPTIVE = "adaptive"  # Adaptive policy
Decorator for function caching.
Example:
@cached(max_size=100, policy=EvictionPolicy.LRU)
def expensive_function(x):
    return x ** 2
Distributed computing cluster.
class DistributedCluster:
    def __init__(self, nodes=None, coordinator_host='localhost', coordinator_port=8080)
    def add_node(self, host, port)
    def remove_node(self, node_id)
    def get_status(self)
    def shutdown(self)
Create a distributed cluster.
Distributed map operation.
Distributed filter operation.
Distributed reduce operation.
Asynchronous distributed map operation.
Manage distributed cluster operations.
Load balancing for distributed tasks.
Executor for distributed task execution.
Batch processing across distributed nodes.
Chainable data processing pipeline.
class Pipeline:
    def __init__(self, initial_data=None)
    def map(self, func, parallel=True)
    def filter(self, predicate, parallel=True)
    def reduce(self, function, initial=None)
    def sort(self, key=None, reverse=False)
    def group_by(self, key)
    def collect(self)
    def to_list(self)
Chain multiple operations together.
class Chain:
    def __init__(self, operations=None)
    def add(self, operation)
    def execute(self, data)
Apply pipeline to iterable.
Base exception class for PyFerris.
Exceptions related to executor operations.
Exceptions related to memory operations.
Exceptions related to I/O operations.
Exceptions related to distributed operations.
PyFerris includes comprehensive type hints. Import them as:
from pyferris.types import (
    ParallelFunction,
    ReduceFunction,
    PredicateFunction,
    KeyFunction,
    ProcessFunction,
)
DEFAULT_CHUNK_SIZE = 1000
DEFAULT_WORKER_COUNT = os.cpu_count()
DEFAULT_QUEUE_SIZE = 1000
DEFAULT_CACHE_SIZE = 1000
MAX_WORKERS = 128
MIN_CHUNK_SIZE = 1
MAX_CHUNK_SIZE = 100000
DEFAULT_TIMEOUT = 300  # seconds
This API reference provides a comprehensive overview of all PyFerris functionality. For detailed examples and usage patterns, see the specific module documentation and the Examples section.