-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathdata_streeam.py
More file actions
26 lines (19 loc) · 791 Bytes
/
data_streeam.py
File metadata and controls
26 lines (19 loc) · 791 Bytes
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
# =========================================
# PYTHON SNIPPETS
# CREATING YOUR OWN DATA STREAM IN PYTHON
# =========================================
from __future__ import annotations
from typing import Callable, TextIO
def processor(reader: TextIO, converter: Callable[[str], str], writer: TextIO) -> int:
"""Stream lines from reader → converter → writer. Returns number of lines processed."""
if not callable(converter):
raise TypeError("converter must be callable")
count = 0
for line in reader:
try:
writer.write(converter(line))
except Exception as e:
raise RuntimeError(f"processor failed on line {count + 1}") from e
count += 1
writer.flush()
return count