-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathexample.py
More file actions
57 lines (42 loc) · 1.49 KB
/
example.py
File metadata and controls
57 lines (42 loc) · 1.49 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
"""示例:异步数据管道 — 拉取、变换、存储"""
import asyncio
from dataclasses import dataclass, field
from datetime import datetime, timedelta
from typing import AsyncIterator
@dataclass
class Event:
ts: datetime
kind: str
payload: dict = field(default_factory=dict)
async def fetch_events(since: datetime) -> AsyncIterator[Event]:
"""模拟拉取事件流(替换为真实 API 调用)"""
for i in range(5):
await asyncio.sleep(0.1)
yield Event(
ts=since + timedelta(minutes=i * 10),
kind="metric",
payload={"cpu": 42 + i * 3, "mem": 0.7 - i * 0.05},
)
async def transform(source: AsyncIterator[Event]) -> AsyncIterator[dict]:
async for ev in source:
yield {
"time": ev.ts.isoformat(),
"type": ev.kind,
"cpu_percent": ev.payload["cpu"],
"mem_fraction": ev.payload["mem"],
}
async def store(rows: AsyncIterator[dict]) -> list[dict]:
results = []
async for row in rows:
results.append(row)
print(f" stored: {row['time']} cpu={row['cpu_percent']}% mem={row['mem_fraction']:.2f}")
return results
async def main():
since = datetime(2026, 5, 23, 9, 0)
print(f"Pipeline start, since={since.isoformat()}\n")
events = fetch_events(since)
transformed = transform(events)
results = await store(transformed)
print(f"\nDone. {len(results)} rows stored.")
if __name__ == "__main__":
asyncio.run(main())