本專案實現了完整的即時振動數據分析與監控系統,能夠在 25.6 kHz 高採樣率下實時處理感測器數據,進行特徵提取、異常檢測並通過 WebSocket 推送結果給前端。系統採用非同步架構,結合 PostgreSQL、Redis 和 WebSocket 技術,實現高效能的即時數據處理。
┌─────────────┐
│ 前端應用 │
│ (Vue 3) │
└──────┬──────┘
│ WebSocket
▼
┌─────────────────────────────────────────────┐
│ FastAPI 後端 │
│ ┌──────────────┐ ┌─────────────┐ │
│ │ WebSocket │ │ RealTime │ │
│ │ Manager │◄─┤ Analyzer │ │
│ └──────┬───────┘ └──────┬──────┘ │
│ │ │ │
│ │ ┌─────▼──────┐ │
│ │ │ Buffer │ │
│ │ │ Manager │ │
│ │ └─────┬──────┘ │
│ │ │ │
│ ┌──────▼──────────┐ ┌───▼──────────┐ │
│ │ Redis Client │ │ Async DB │ │
│ └──────┬──────────┘ └───┬──────────┘ │
└─────────┼────────────────┼────────────────┘
│ │
┌─────▼─────┐ ┌────▼─────────┐
│ Redis │ │ PostgreSQL │
│ (快取/流) │ │ (持久化) │
└───────────┘ └─────────────┘
檔案位置: backend/buffer_manager.py (第 1-356 行)
程式碼來源:
SensorBuffer類: 第 20-159 行BufferManager類: 第 161-352 行- 批量寫入優化: 第 224-238 行
- 使用循環緩衝區 (Circular Buffer) 管理高頻率感測器數據
- 預設緩衝區大小:25,600 個樣本點 (1 秒 @ 25.6 kHz)
- 提供時間窗口數據訪問接口
- 同步數據到 Redis Streams (批量寫入優化)
- 支持多感測器並發緩衝
單一感測器的循環緩衝區實作。
初始化參數:
buffer = SensorBuffer(
sensor_id=1, # 感測器 ID
buffer_size=25600 # 緩衝區大小 (預設 1 秒 @ 25.6 kHz)
)主要方法:
# 添加單一樣本
add_sample(timestamp, h_acc, v_acc)
# 批量添加樣本
add_batch(samples)
# 獲取時間窗口數據
get_window(window_seconds=1.0) -> Dict
# 檢查是否就緒
is_ready(min_samples=10000) -> bool
# 獲取統計信息
get_stats() -> Dict返回的窗口數據結構:
{
'sensor_id': int,
'window_start': datetime,
'window_end': datetime,
'h_data': np.ndarray, # 水平加速度數組
'v_data': np.ndarray, # 垂直加速度數組
'sample_count': int
}多感測器緩衝區管理器。
關鍵功能:
- 線程安全的緩衝區訪問 (
asyncio.Lock) - 自動數據持久化到 Redis (批量寫入優化)
- 支持清理舊緩衝區 (預設 60 分鐘)
主要方法:
# 獲取或創建緩衝區
async get_buffer(sensor_id) -> SensorBuffer
# 添加數據到緩衝區
async add_data(sensor_id, data)
# 優化:使用批量寫入 Redis (add_sensor_data_batch)
# 獲取窗口數據
async get_window(sensor_id, window_seconds) -> Dict
# 保存窗口數據到數據庫
async save_to_database(sensor_id, window_data)
# 清理舊緩衝區
async cleanup_old_buffers(max_age_minutes=60)性能優化:
# 原程式碼:循環逐個寫入 Redis (性能差)
# for sample in data:
# await redis_client.add_sensor_data(sensor_id, sample)
# 優化後:批量寫入 Redis
redis_data = [
{
'timestamp': sample['timestamp'].isoformat(),
'h_acc': str(sample['h_acc']),
'v_acc': str(sample['v_acc'])
}
for sample in data
]
await redis_client.add_sensor_data_batch(sensor_id, redis_data)檔案位置: backend/realtime_analyzer.py (第 1-416 行)
程式碼來源:
RealTimeAnalyzer類: 第 34-411 行- 分析循環
_analysis_loop: 第 101-173 行 - 特徵提取
_extract_features: 第 175-250 行 - 警報檢查
_check_alerts: 第 329-363 行 - 時間戳處理: 第 193-241 行
- 連續的特徵提取循環 (10 Hz 更新頻率)
- 時域、頻域特徵計算
- 閾值監控與警報生成
- 實時特徵推送
1. 啟動分析 (start_analysis)
└─ 創建異步任務 _analysis_loop
2. 分析循環 (_analysis_loop)
├─ 獲取窗口數據 (1 秒窗口)
├─ 檢查樣本數量 (>= 10000)
├─ 提取特徵 (_extract_features)
│ ├─ 時域特徵 (RMS, Peak, Kurtosis, Crest Factor)
│ └─ 頻域特徵 (Dominant Frequency)
├─ 保存到數據庫 (PostgreSQL)
├─ 廣播到 WebSocket
├─ 快取到 Redis
├─ 檢查閾值 (_check_alerts)
└─ 等待 0.1 秒 (10 Hz)
3. 警報檢測 (_check_alerts)
└─ 對比數據庫中的閾值配置
├─ 超過上限 → 生成警報
└─ 低於下限 → 生成警報
# 啟動分析
async start_analysis(sensor_id)
# 停止分析
async stop_analysis(sensor_id)
# 提取特徵
async _extract_features(window_data) -> Dict
# 保存特徵
async _save_features(sensor_id, features)
# 檢查警報
async _check_alerts(sensor_id, features)
# 獲取狀態
get_status() -> Dict時域特徵:
- RMS (Root Mean Square): 均方根
rms = sqrt(mean(x^2))
- Peak: 最大絕對值
peak = max(|x|)
- Kurtosis: 峰度
kurtosis = mean(((x - mean) / std)^4)
- Crest Factor: 波峰因數
crest_factor = peak / rms
頻域特徵:
- Dominant Frequency: 使用 FFT 計算主頻
fft_result = fft(data) peak_idx = argmax(|fft_result|) dominant_freq = freqs[peak_idx]
數據格式處理:
# 時間戳處理:轉換為 ISO 字串供 WebSocket 使用
features = {
'window_start': window_data['window_start'].isoformat(),
'window_end': window_data['window_end'].isoformat(),
'timestamp': window_data['window_end'].isoformat(), # 前端使用的時間戳
# ... 其他特徵
}
# 保存到數據庫前轉回 datetime 對象
features_for_db = features.copy()
for key in ['window_start', 'window_end']:
if isinstance(features_for_db.get(key), str):
features_for_db[key] = datetime.fromisoformat(
features_for_db[key].replace('Z', '+00:00')
)檔案位置: backend/websocket_manager.py (第 1-252 行)
程式碼來源:
ConnectionManager類: 第 16-247 行- 連接方法
connect: 第 30-60 行 - 斷開方法
disconnect: 第 62-101 行 - 廣播方法
broadcast_feature_update: 第 186-207 行 - 警報廣播
broadcast_alert: 第 171-184 行
- 管理多個 WebSocket 連線
- 按感測器 ID 組織連線
- 廣播消息到訂閱客戶端
- 自動清理斷開的連線
- 連線狀態追蹤
sensor_id=1: [WS1, WS2, WS3]
sensor_id=2: [WS4]
sensor_id=0: [WS5, WS6] # 全局訂閱 (警報)
# 連接客戶端
async connect(websocket, sensor_id)
# 斷開連接
async disconnect(websocket)
# 發送個人消息
async send_personal_message(message, websocket)
# 廣播到特定感測器
async broadcast_to_sensor(sensor_id, message)
# 廣播到所有連線
async broadcast_to_all(message)
# 廣播警報
async broadcast_alert(alert)
# 廣播特徵更新
async broadcast_feature_update(sensor_id, features)
# 獲取連線數量
get_connection_count(sensor_id=None) -> int
# 獲取活躍感測器
get_active_sensors() -> list
# 獲取連線信息
get_connection_info() -> dict特徵更新消息:
{
"type": "feature_update",
"sensor_id": 1,
"data": {
"timestamp": "2026-01-20T10:30:01.123456",
"rms_h": 0.1234,
"rms_v": 0.0987,
"window_start": "2026-01-20T10:30:00",
"window_end": "2026-01-20T10:30:01",
"peak_h": 0.8901,
"peak_v": 0.7654,
"kurtosis_h": 3.4567,
"kurtosis_v": 3.2345,
"crest_factor_h": 7.2123,
"crest_factor_v": 7.7543,
"dominant_freq_h": 123.45,
"dominant_freq_v": 123.45
}
}警報消息:
{
"type": "alert",
"data": {
"sensor_id": 1,
"alert_type": "threshold",
"severity": "critical",
"message": "rms_h is above threshold (0.5000 above 0.3000)",
"feature_name": "rms_h",
"current_value": 0.5000,
"threshold_value": 0.3000
}
}檔案位置: backend/redis_client.py (第 1-498 行)
程式碼來源:
RedisClient類: 第 28-497 行- Streams 操作: 第 65-167 行 (含批量寫入
add_sensor_data_batch: 第 94-127 行) - 快取操作: 第 168-228 行
- Pub/Sub 操作: 第 230-269 行
- 連線管理: 第 271-335 行
- 感測器狀態: 第 336-380 行
- 警報隊列: 第 382-431 行
- Streams: 時間序列數據存儲 (批量寫入優化)
- Hash: 特徵快取
- Pub/Sub: 消息發布訂閱
- Set: 連線追蹤
- List: 警報隊列
Streams 操作:
# 添加單個數據點
async add_sensor_data(sensor_id, data)
# 批量添加數據點 (性能優化)
async add_sensor_data_batch(sensor_id, data_list)
# 讀取最近的數據
async get_sensor_stream(sensor_id, count=100)
# 截斷流
async trim_sensor_stream(sensor_id, max_length=10000)快取操作:
# 快取特徵
async cache_features(sensor_id, features, ttl=300)
# 獲取快取特徵
async get_cached_features(sensor_id) -> DictPub/Sub 操作:
# 發布消息
async publish(channel, message)
# 訂閱頻道
async subscribe(channel) -> PubSub連線管理:
# 添加活躍連線
async add_active_connection(connection_id)
# 移除連線
async remove_active_connection(connection_id)
# 獲取活躍連線數
async get_active_connection_count() -> int感測器狀態:
# 更新狀態
async update_sensor_status(sensor_id, status)
# 獲取狀態
async get_sensor_status(sensor_id) -> Dict警報隊列:
# 推入警報
async push_alert(alert)
# 彈出警報
async pop_alert() -> Dict
# 獲取隊列長度
async get_alert_queue_length() -> int檔案位置: backend/database_async.py (第 1-200+ 行)
程式碼來源:
AsyncDatabase類: 第 25-267+ 行- 連線池初始化
init_pool: 第 37-61 行 - 批量插入
insert_sensor_data: 第 143-166 行 - 特徵插入
insert_features: 第 168-227+ 行 - 警報操作: 第 400-467+ 行
- 使用 asyncpg 實現非同步 PostgreSQL 訪問
- 連線池管理 (最小 10,最大 50 連線)
- 批量數據插入優化
- 視圖查詢優化
await asyncpg.create_pool(
DATABASE_URL,
min_size=10, # 最小連線數
max_size=50, # 最大連線數
max_queries=50000, # 每連線最大查詢數
max_inactive_connection_lifetime=300.0, # 5 分鐘
command_timeout=60 # 查詢超時
)# 批量插入感測器數據
async insert_sensor_data(sensor_id, data)
# 插入特徵
async insert_features(sensor_id, features)
# 獲取最新特徵
async get_latest_features(sensor_id) -> Dict
# 獲取活躍警報
async get_active_alerts(limit=100) -> List[Dict]
# 創建警報
async create_alert(alert) -> int
# 確認警報
async acknowledge_alert(alert_id, acknowledged_by) -> bool
# 獲取警報配置
async get_alert_configurations(sensor_id) -> List[Dict]
# 註冊感測器
async register_sensor(sensor_id, sensor_name, sensor_type, sampling_rate)
# 獲取感測器狀態
async get_sensor_status(sensor_id) -> Dict檔案位置: scripts/init_postgres.sql (第 1-264 行)
程式碼來源:
- 感測器表
sensors: 第 12-21 行 - 數據分區表
sensor_data: 第 27-53 行 - 即時特徵表
realtime_features: 第 58-98 行 - 警報表
alerts: 第 102-125 行 - 警報配置
alert_configurations: 第 149-166 行 - 流會話
stream_sessions: 第 129-144 行 - 視圖定義: 第 198-240 行
CREATE TABLE sensors (
sensor_id SERIAL PRIMARY KEY,
sensor_name VARCHAR(100) UNIQUE NOT NULL,
sensor_type VARCHAR(50) NOT NULL, -- 'accelerometer', 'temperature'
sampling_rate DECIMAL(10,2) DEFAULT 25600.00,
location VARCHAR(100),
is_active BOOLEAN DEFAULT true,
created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(),
updated_at TIMESTAMP WITH TIME ZONE DEFAULT NOW()
);-- 按時間分區 (Partitioned Table)
CREATE TABLE sensor_data (
data_id BIGSERIAL,
sensor_id INTEGER REFERENCES sensors(sensor_id),
timestamp TIMESTAMP WITH TIME ZONE NOT NULL,
horizontal_acceleration DECIMAL(12,6),
vertical_acceleration DECIMAL(12,6),
temperature DECIMAL(10,4),
created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(),
PRIMARY KEY (data_id, timestamp)
) PARTITION BY RANGE (timestamp);
-- 按月分區 (例如:2026-01, 2026-02...)
CREATE TABLE sensor_data_2026_01 PARTITION OF sensor_data
FOR VALUES FROM ('2026-01-01') TO ('2026-02-01');索引:
CREATE INDEX idx_sensor_data_sensor_ts
ON sensor_data(sensor_id, timestamp DESC);
CREATE INDEX idx_sensor_data_timestamp
ON sensor_data(timestamp DESC);CREATE TABLE realtime_features (
feature_id BIGSERIAL PRIMARY KEY,
sensor_id INTEGER REFERENCES sensors(sensor_id),
window_start TIMESTAMP WITH TIME ZONE NOT NULL,
window_end TIMESTAMP WITH TIME ZONE NOT NULL,
-- 時域特徵
rms_h DECIMAL(10,6),
rms_v DECIMAL(10,6),
peak_h DECIMAL(10,6),
peak_v DECIMAL(10,6),
kurtosis_h DECIMAL(10,6),
kurtosis_v DECIMAL(10,6),
crest_factor_h DECIMAL(10,6),
crest_factor_v DECIMAL(10,6),
-- 頻域特徵
fm0_h DECIMAL(10,6),
fm0_v DECIMAL(10,6),
dominant_freq_h DECIMAL(10,2),
dominant_freq_v DECIMAL(10,2),
-- 包絡特徵
nb4_h DECIMAL(10,6),
nb4_v DECIMAL(10,6),
-- 高階統計特徵
na4_h DECIMAL(10,6),
na4_v DECIMAL(10,6),
fm4_h DECIMAL(10,6),
fm4_v DECIMAL(10,6),
computed_at TIMESTAMP WITH TIME ZONE DEFAULT NOW()
);索引:
CREATE INDEX idx_realtime_features_sensor_time
ON realtime_features(sensor_id, window_start DESC);
CREATE INDEX idx_realtime_features_computed_at
ON realtime_features(computed_at DESC);CREATE TABLE alerts (
alert_id BIGSERIAL PRIMARY KEY,
sensor_id INTEGER REFERENCES sensors(sensor_id),
alert_type VARCHAR(50) NOT NULL, -- 'threshold', 'trend', 'anomaly'
severity VARCHAR(20) NOT NULL, -- 'info', 'warning', 'critical'
message TEXT NOT NULL,
feature_name VARCHAR(100),
current_value DECIMAL(12,6),
threshold_value DECIMAL(12,6),
is_acknowledged BOOLEAN DEFAULT false,
acknowledged_by VARCHAR(100),
acknowledged_at TIMESTAMP WITH TIME ZONE,
created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW()
);索引:
CREATE INDEX idx_alerts_sensor_created
ON alerts(sensor_id, created_at DESC);
CREATE INDEX idx_alerts_acknowledged
ON alerts(is_acknowledged, created_at DESC);
CREATE INDEX idx_alerts_severity
ON alerts(severity, created_at DESC);CREATE TABLE alert_configurations (
config_id SERIAL PRIMARY KEY,
sensor_id INTEGER REFERENCES sensors(sensor_id),
feature_name VARCHAR(100) NOT NULL,
threshold_min DECIMAL(12,6),
threshold_max DECIMAL(12,6),
severity VARCHAR(20) DEFAULT 'warning',
enabled BOOLEAN DEFAULT true,
created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(),
updated_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(),
UNIQUE(sensor_id, feature_name)
);CREATE TABLE stream_sessions (
session_id UUID PRIMARY KEY DEFAULT uuid_generate_v4(),
sensor_id INTEGER REFERENCES sensors(sensor_id),
client_id VARCHAR(255),
connected_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(),
disconnected_at TIMESTAMP WITH TIME ZONE,
status VARCHAR(20) DEFAULT 'active', -- 'active', 'closed', 'error'
bytes_received BIGINT DEFAULT 0,
data_points_received BIGINT DEFAULT 0
);CREATE VIEW v_latest_features AS
SELECT DISTINCT ON (sensor_id)
sensor_id,
window_start,
window_end,
rms_h, rms_v,
peak_h, peak_v,
kurtosis_h, kurtosis_v,
crest_factor_h, crest_factor_v,
fm0_h, fm0_v,
dominant_freq_h, dominant_freq_v,
computed_at
FROM realtime_features
ORDER BY sensor_id, computed_at DESC;CREATE VIEW v_active_alerts AS
SELECT
a.*,
s.sensor_name
FROM alerts a
JOIN sensors s ON a.sensor_id = s.sensor_id
WHERE a.is_acknowledged = false
ORDER BY a.created_at DESC;CREATE VIEW v_sensor_status AS
SELECT
s.sensor_id,
s.sensor_name,
s.sensor_type,
s.sampling_rate,
s.is_active,
s.location,
COUNT(DISTINCT sess.session_id) FILTER (WHERE sess.status = 'active') as active_connections,
MAX(sess.connected_at) as last_connection,
COUNT(a.alert_id) FILTER (WHERE a.is_acknowledged = false) as active_alerts
FROM sensors s
LEFT JOIN stream_sessions sess ON s.sensor_id = sess.sensor_id
LEFT JOIN alerts a ON s.sensor_id = a.sensor_id
GROUP BY s.sensor_id, s.sensor_name, s.sensor_type, s.sampling_rate, s.is_active, s.location
ORDER BY s.sensor_id;後端檔案: backend/main.py
程式碼來源:
- POST
/api/sensor/data: 第 1577-1634 行 - POST
/api/sensor/data/stream: 第 1635-1707 行
用途: 接收批量感測器數據並添加到 Buffer Manager
請求格式:
{
"sensor_id": 1,
"data": [
{
"timestamp": "2026-01-20T10:30:00.123",
"h_acc": 0.1234,
"v_acc": 0.0987
},
{
"timestamp": "2026-01-20T10:30:00.125",
"h_acc": 0.1256,
"v_acc": 0.1001
}
]
}欄位說明:
sensor_id: 感測器 ID (integer)data: 數據點陣列timestamp: ISO 8601 格式時間戳 (datetime)h_acc: 水平加速度值 (float)v_acc: 垂直加速度值 (float)
響應:
{
"status": "success",
"sensor_id": 1,
"processed": 2,
"message": "Successfully processed 2 data points"
}數據處理流程:
機台數據
│
▼
POST /api/sensor/data
│
├─► BufferManager.add_data()
│ ├─► 記憶體循環緩衝區
│ └─► Redis Streams (批量寫入)
│
└─► RealTimeAnalyzer._analysis_loop()
└─► 特徵提取與分析
用途: 流式接收高頻感測器數據(陣列形式)
適用於需要高效傳輸大量數據的場景,如每秒 25600 樣本的高頻採集。
請求格式 (Query Parameters):
sensor_id=1
h_acc=[0.1234, 0.1256, 0.1278, ...]
v_acc=[0.0987, 0.1001, 0.1015, ...]
timestamp_start=2026-01-20T10:30:00.123
sampling_rate=25600.0
參數說明:
sensor_id: 感測器 ID (integer)h_acc: 水平加速度陣列 (array of floats)v_acc: 垂直加速度陣列 (array of floats)timestamp_start: 起始時間戳 (ISO 8601 datetime)sampling_rate: 採樣率 Hz,預設 25600 (float)
響應:
{
"status": "success",
"sensor_id": 1,
"processed": 25600,
"time_range": [
"2026-01-20T10:30:00.123000",
"2026-01-20T10:31:00.123000"
]
}計算邏輯:
- 根據起始時間和採樣率自動計算每個樣本的時間戳
- 確保 h_acc 和 v_acc 陣列長度一致
程式碼來源:
/ws/realtime/{sensor_id}: 第 1712-1748 行/ws/alerts: 第 1749-1768 行
用途: 特定感測器的即時數據流
功能:
- 接收特徵更新
- 接收警報通知
- Ping/Pong 保活
範例:
ws://localhost:8081/ws/realtime/1
實作:
@app.websocket("/ws/realtime/{sensor_id}")
async def websocket_realtime_sensor(websocket: WebSocket, sensor_id: int):
await manager.connect(websocket, sensor_id)
try:
# Start analysis if not already running
await analyzer.start_analysis(sensor_id)
# Keep connection alive
while True:
data = await websocket.receive_text()
if data == "ping":
await websocket.send_json({
"type": "pong",
"timestamp": datetime.now().isoformat()
})
except WebSocketDisconnect:
await manager.disconnect(websocket)
if manager.get_connection_count(sensor_id) == 0:
await analyzer.stop_analysis(sensor_id)用途: 全局警報流
功能:
- 接收所有感測器的警報
範例:
ws://localhost:8081/ws/alerts
實作:
@app.websocket("/ws/alerts")
async def websocket_alerts(websocket: WebSocket):
await websocket.accept()
await manager.connect(websocket, sensor_id=0) # 0 = global
try:
while True:
await websocket.receive_text()
except WebSocketDisconnect:
await manager.disconnect(websocket)程式碼來源:
- POST
/api/stream/start: 第 1774-1799 行 - POST
/api/stream/stop: 第 1801-1820+ 行 - GET
/api/stream/status: 第 1822+ 行 - GET
/api/realtime/features/{sensor_id}: 第 1890+ 行 - GET
/api/alerts/active: 第 1915+ 行 - POST
/api/alerts/acknowledge/{alert_id}: 第 1938+ 行
用途: 開始即時流傳輸
請求體:
{
"sensor_id": 1,
"sampling_rate": 25600
}響應:
{
"status": "started",
"sensor_id": 1,
"message": "Real-time analysis started",
"sampling_rate": 25600
}用途: 停止即時流傳輸
參數: sensor_id (query parameter)
響應:
{
"status": "stopped",
"sensor_id": 1,
"message": "Real-time analysis stopped"
}用途: 獲取流傳輸狀態
響應:
{
"active_streams": 2,
"active_connections": 5,
"active_sensors": [1, 2],
"sensor_connections": {
"1": 3,
"2": 2
}
}用途: 獲取最新特徵
響應:
{
"sensor_id": 1,
"window_start": "2026-01-20T10:30:00",
"window_end": "2026-01-20T10:30:01",
"rms_h": 0.1234,
"rms_v": 0.0987,
"kurtosis_h": 3.4567,
"kurtosis_v": 3.2345,
"peak_h": 0.8901,
"peak_v": 0.7654,
"crest_factor_h": 7.2123,
"crest_factor_v": 7.7543,
"dominant_freq_h": 123.45,
"dominant_freq_v": 123.45
}用途: 獲取活躍警報
參數: limit (預設 100)
響應:
{
"alerts": [
{
"alert_id": 1,
"sensor_id": 1,
"alert_type": "threshold",
"severity": "critical",
"message": "rms_h is above threshold",
"feature_name": "rms_h",
"current_value": 0.5000,
"threshold_value": 0.3000,
"created_at": "2026-01-20T10:30:00",
"is_acknowledged": false
}
],
"count": 1
}用途: 確認警報
請求體:
{
"acknowledged_by": "user"
}響應:
{
"status": "acknowledged",
"alert_id": 1
}檔案位置: scripts/continuous_machine_simulator.py (第 1-354 行)
程式碼來源:
ContinuousDataStreamer類: 第 32-241 行- 信號生成
_generate_vibration_signal: 第 65-94 行 - 推送循環
_stream_loop: 第 96-171 行 - 啟動/停止方法: 第 172-227 行
- 主程序入口
main: 第 269-350 行
- 持續推送模擬振動數據 (25.6 kHz 採樣率)
- 支持多線程異步運行
- 自動生成正弦波 + 噪音的模擬信號
- 統計推送速率和數據量
- 優雅的中斷處理
class ContinuousDataStreamer:
def __init__(self, sensor_id: int = 1, api_url: str = "http://localhost:8081"):
self.sensor_id = sensor_id
self.api_url = api_url
self.sampling_rate = 25600 # 25.6 kHz
self.batch_size = 25600 # 每次推送 1 秒數據def _generate_vibration_signal(self, num_samples: int) -> tuple:
"""
生成模擬振動信號 (正弦波 + 高斯噪音)
Returns:
(h_acc, v_acc) 水平和垂直加速度陣列
"""
t = np.linspace(0, num_samples / self.sampling_rate, num_samples)
# 模擬振動訊號: 多個頻率的正弦波疊加 + 噪音
h_acc = (
0.05 * np.sin(2 * np.pi * 10 * t) + # 基頻 10 Hz
0.02 * np.sin(2 * np.pi * 50 * t) + # 諧波 1 (50 Hz)
0.01 * np.sin(2 * np.pi * 100 * t) + # 諧波 2 (100 Hz)
np.random.normal(0, 0.1, num_samples) # 高斯噪音
)
v_acc = (
0.04 * np.cos(2 * np.pi * 10 * t) + # 基頻 (相位差)
0.015 * np.cos(2 * np.pi * 50 * t) + # 諧波 1
0.008 * np.cos(2 * np.pi * 100 * t) + # 諧波 2
np.random.normal(0, 0.08, num_samples) # 高斯噪音
)
return h_acc.tolist(), v_acc.tolist()def _stream_loop(self):
"""持續推送數據的循環 (在獨立線程中運行)"""
while self.running:
# 生成 1 秒的模擬數據
start_time = datetime.now()
h_acc, v_acc = self._generate_vibration_signal(self.batch_size)
# 準備數據點
data_points = []
for i in range(self.batch_size):
timestamp = start_time + timedelta(
microseconds=i * (1000000 / self.sampling_rate)
)
data_points.append({
"timestamp": timestamp.isoformat(),
"h_acc": round(float(h_acc[i]), 6),
"v_acc": round(float(v_acc[i]), 6)
})
# 推送到後端
payload = {
"sensor_id": self.sensor_id,
"data": data_points
}
response = requests.post(
f"{self.api_url}/api/sensor/data",
json=payload,
timeout=60 # 大批次數據的超時時間
)
# 控制推送頻率 (大約實時)
time.sleep(1.0 - elapsed)# 使用預設配置啟動 (sensor_id=1)
python scripts/continuous_machine_simulator.py
# 指定感測器 ID
python scripts/continuous_machine_simulator.py --sensor-id 2
# 指定後端 URL
python scripts/continuous_machine_simulator.py --url http://192.168.1.100:8081
# 指定運行時長 (秒)
python scripts/continuous_machine_simulator.py --duration 60==================================================
持續數據推送已啟動
感測器 ID: 1
採樣率: 25600 Hz
批次大小: 25600 樣本 (1 秒)
API 端點: http://localhost:8081/api/sensor/data
按 Ctrl+C 停止
==================================================
已推送 10 批次 (256000 點) - 平均速率: 1.00 批次/秒
已推送 20 批次 (512000 點) - 平均速率: 1.00 批次/秒
...
{
'total_batches': 120, # 總推送批次
'total_points': 3072000, # 總數據點數
'start_time': ..., # 開始時間
'elapsed_seconds': 120.0, # 運行時間
'average_rate': 1.0, # 平均速率 (批次/秒)
'success_count': 120, # 成功次數
'error_count': 0, # 錯誤次數
'success_rate': 100.0 # 成功率 (%)
}檔案位置: frontend/src/services/websocket.js (第 1-192 行)
程式碼來源:
RealtimeService類: 第 8-186 行- 連接方法
connect: 第 23-90 行 - 斷開方法
disconnect: 第 95-106 行 - 事件監聽
on/off: 第 133-153 行 - 重連邏輯: 第 72-89 行
- 自動重連機制 (最多 10 次,指數退避)
- 事件驅動架構
- Ping/Pong 保活機制
- 錯誤處理
// 連接到感測器
connect(sensorId)
// 斷開連接
disconnect()
// 發送消息
send(message)
// Ping
ping()
// 事件監聽
on(event, callback)
// 移除監聽器
off(event, callback)
// 獲取連線狀態
getConnectionStatus() -> boolean// 連接建立
'connected'
// 連接斷開
'disconnected'
// 特徵更新
'feature_update'
// 警報
'alert'
// Pong 響應
'pong'
// 錯誤
'error'
// 重連失敗
'reconnect_failed'reconnectAttempts < maxReconnectAttempts (10)
└─ 指數退避: min(1000 * 2^(n-1), 30000ms)
└─ 1s → 2s → 4s → 8s → ... → 30s檔案位置: frontend/src/stores/realtime.js (第 1-334 行)
程式碼來源:
- Store 定義: 第 10-333 行
- 狀態定義: 第 12-46 行 (含
MAX_BUFFER_POINTS: 第 22 行,windowSize: 第 45 行) - 連接方法
connect: 第 97-139 行 - 更新特徵
updateFeatures: 第 156-193 行 - 滾動窗口
scrollWindow: 第 86-91 行 - 清空緩衝區
clearBuffers: 第 239-264 行
{
isConnected: boolean, // 連線狀態
currentSensor: number, // 當前感測器 ID
latestFeatures: {}, // 最新特徵
alertHistory: [], // 警報歷史
isStreaming: boolean, // 是否在流傳輸
connectionStatus: string, // 連線狀態字串
signalBuffer: {}, // 信號緩衝區
featureBuffer: {}, // 特徵緩衝區
MAX_BUFFER_POINTS: 1000, // 緩衝區最大點數 (已優化)
windowSize: 1000 // 視窗顯示範圍 (已優化)
}// 是否有警報
hasAlerts
// 最新警報
latestAlert
// 特徵數量
featureCount
// 視窗結束位置
windowEnd
// 當前視窗數據
currentWindow// 連接到感測器
connect(sensorId)
// 斷開連接
disconnect()
// 更新特徵
updateFeatures(data)
// 添加警報
addAlert(alert)
// 清空緩衝區
clearBuffers()
// 確認警報
acknowledgeAlert(alertId)
// 格式化特徵值
formatFeature(key) -> string
// 滾動窗口 (新增)
scrollWindow()signalBuffer:
{
timestamps: [],
horizontal: [],
vertical: []
}featureBuffer:
{
timestamps: [],
rms_h: [],
rms_v: [],
kurtosis_h: [],
kurtosis_v: [],
peak_h: [],
peak_v: [],
crest_factor_h: [],
crest_factor_v: []
}// 視窗配置
const MAX_BUFFER_POINTS = 1000 // 緩衝區大小
const windowSize = ref(1000) // 視窗大小
// 滾動窗口方法
function scrollWindow() {
if (featureCount.value > windowSize.value) {
// 窗口始終顯示最新的 windowSize 個數據點
windowStart.value = featureCount.value - windowSize.value
}
}
// 在 updateFeatures 中自動調用
function updateFeatures(data) {
// ... 更新特徵
trimBuffers()
scrollWindow() // 自動滾動
}檔案位置: frontend/src/views/RealtimeAnalysis.vue (第 1-854 行)
程式碼來源:
- Template 結構: 第 1-126 行 (控制面板、警報面板、特徵卡片、圖表)
- Script Setup: 第 128-552 行
- 開始/停止監控: 第 196-229 行
- 圖表初始化
initCharts: 第 250-452 行 - 圖表更新
updateCharts: 第 454-520 行 - 深色主題樣式: 第 554-853 行
- 圖表配置優化: 第 250-452 行 (網格線、軸標籤、自動間隔)
-
控制面板
- 感測器 ID 輸入 (
el-input-number) - 開始/停止監控按鈕
- 連線狀態標籤
- 感測器 ID 輸入 (
-
警報面板
- 顯示最近 5 條警報
- 嚴重程度標籤
- 時間戳顯示
-
特徵卡片 (8 個)
- RMS (水平/垂直)
- Kurtosis (水平/垂直)
- Peak (水平/垂直)
- Crest Factor (水平/垂直)
-
即時圖表 (4 個 ECharts)
- RMS 趨勢
- Kurtosis 趨勢
- Peak 趨勢
- Crest Factor 趨勢
// 開始流傳輸
async startStreaming()
// 停止流傳輸
stopStreaming()
// 格式化特徵值
formatFeatureValue(key) -> string
// 格式化時間
formatTime(timestamp) -> string
// 獲取警報類型
getAlertType(severity) -> string
// 初始化圖表
initCharts()
// 更新圖表
updateCharts()使用 ECharts 實現,包含以下配置:
- 深色主題背景
- 白色文字與軸線
- 平滑曲線 (
smooth: true) - 響應式調整
- Tooltip 顯示
圖表初始化:
function initCharts() {
// Common chart options
const commonOption = {
animation: false,
backgroundColor: 'transparent',
grid: {
top: 30,
right: 20,
bottom: 50,
left: 60,
borderColor: 'rgba(255, 255, 255, 0.1)'
},
xAxis: {
type: 'category',
data: [],
axisLabel: {
rotate: 0,
interval: 'auto', // 自動計算間隔
color: '#ffffff',
fontSize: 13
},
axisLine: { lineStyle: { color: '#ffffff' } },
splitLine: { lineStyle: { color: 'rgba(255, 255, 255, 0.1)' } }
},
yAxis: {
type: 'value',
axisLabel: {
color: '#ffffff',
fontSize: 14
},
axisLine: { lineStyle: { color: '#ffffff' } },
splitLine: { lineStyle: { color: 'rgba(255, 255, 255, 0.1)' } }
},
tooltip: {
trigger: 'axis',
backgroundColor: 'rgba(30, 30, 30, 0.9)',
borderColor: 'var(--accent-primary)',
textStyle: { color: '#ffffff' }
},
legend: {
textStyle: {
color: '#ffffff',
fontSize: 15
}
}
}
// Initialize charts
rmsChart = echarts.init(rmsChartRef.value)
rmsChart.setOption({
...commonOption,
legend: { data: ['水平', '垂直'] },
series: [
{ name: '水平', type: 'line', data: [], smooth: true, ... },
{ name: '垂直', type: 'line', data: [], smooth: true, ... }
]
})
// ... 其他圖表
}圖表更新:
function updateCharts() {
const timestamps = currentWindow.value.timestamps.map(t => {
const date = new Date(t)
return date.toLocaleTimeString('zh-TW', {
hour12: false,
hour: '2-digit',
minute: '2-digit',
second: '2-digit'
})
})
// Update RMS Chart
if (currentWindow.value.rms_h.length > 0) {
rmsChart.setOption({
xAxis: { data: timestamps },
series: [
{ data: currentWindow.value.rms_h },
{ data: currentWindow.value.rms_v }
]
})
}
// ... 更新其他圖表
}
// Watch for feature updates
watch(featureCount, (newCount) => {
if (newCount > 0) {
updateCharts()
}
})onMounted()
├─ initCharts()
└─ window resize 監聽
onUnmounted()
├─ dispose charts
└─ disconnect WebSocket- 深色主題 (Apple Keynote 風格)
- 卡片發光陰影效果
- 響應式文字大小
- 動畫過渡效果
.realtime-analysis {
background: var(--bg-primary);
min-height: 100vh;
}
.feature-card {
background: var(--bg-card);
box-shadow: 0 2px 12px var(--shadow-glow);
transition: transform 0.3s, box-shadow 0.3s;
}
.feature-card:hover {
transform: translateY(-2px);
box-shadow: 0 4px 16px rgba(102, 126, 234, 0.4);
}感測器數據
│
▼
┌──────────────────┐
│ BufferManager │
│ .add_data() │
└────────┬─────────┘
│
├─► 記憶體循環緩衝區 (25,600 樣本)
│
└─► Redis Streams (批量寫入)
(臨時持久化, 24h TTL)
分析任務 (10 Hz)
│
▼
┌─────────────────────┐
│ 獲取窗口數據 │
│ (1秒, >=10000樣本)│
└────────┬──────────┘
│
▼
┌─────────────────────┐
│ 特徵提取 │
│ - 時域特徵 │
│ - 頻域特徵 │
└────────┬──────────┘
│
├─► PostgreSQL (持久化)
│
├─► Redis Cache (快速查詢)
│
├─► WebSocket (推送到前端)
│
└─► 閾值檢查
│
▼
警報生成
│
├─► PostgreSQL (警報記錄)
│
└─► WebSocket (推送到前端)
前端 ←─→ 後端
連接建立:
前端: connect(sensorId)
└─► ws://localhost:8081/ws/realtime/{sensorId}
└─► manager.connect()
└─► analyzer.start_analysis()
數據推送:
後端: analyzer._analysis_loop()
└─► manager.broadcast_feature_update()
└─► 所有訂閱該感測器的客戶端
警報推送:
後端: analyzer._create_alert()
└─► manager.broadcast_alert()
└─► 所有客戶端 (sensor_id=0)
連接斷開:
前端: disconnect()
└─► manager.disconnect()
└─► analyzer.stop_analysis() (如果是最後一個連線)
1. 用戶操作
前端輸入感測器 ID
│
▼
點擊 "開始監控"
│
▼
websocketService.connect(sensorId)
2. WebSocket 連接建立
ws://localhost:8081/ws/realtime/{sensorId}
│
▼
FastAPI 接收連接
│
├─► manager.connect(websocket, sensorId)
│ └─► 追蹤連線 (Redis)
│
└─► analyzer.start_analysis(sensorId)
└─► 創建異步分析任務
3. 數據採集 (模擬機台)
模擬器 → BufferManager.add_data()
│
├─► 記憶體循環緩衝區
└─► Redis Streams (批量寫入)
4. 分析循環 (10 Hz)
analyzer._analysis_loop()
│
├─► buffer_manager.get_window(1.0s)
│ └─► 檢查樣本數 >= 10000
│
├─► _extract_features()
│ ├─► 時域: RMS, Peak, Kurtosis, Crest
│ └─► 頻域: Dominant Freq
│
├─► db.insert_features()
│ └─► PostgreSQL
│
├─► redis_client.cache_features()
│ └─► Redis Hash (TTL 300s)
│
├─► manager.broadcast_feature_update()
│ └─► WebSocket 客戶端
│
└─► _check_alerts()
└─► 對比閾值
├─► 超過/低於 → db.create_alert()
└─► manager.broadcast_alert()
└─► 所有客戶端
5. 前端處理
websocketService.on('feature_update')
│
├─► realtimeStore.updateFeatures(data)
│ ├─► 更新 latestFeatures
│ ├─► 更新 featureBuffer
│ ├─► trimBuffers() (保持 1000 點)
│ └─► scrollWindow() (自動滾動)
│
└─► Vue watch → updateCharts()
└─► ECharts 更新
websocketService.on('alert')
│
└─► realtimeStore.addAlert(alert)
├─► 更新 alertHistory
└─► Vue 重新渲染警報列表
6. 用戶斷開
點擊 "停止監控"
│
▼
websocketService.disconnect()
│
▼
FastAPI manager.disconnect(websocket)
│
├─► 檢查連線數
└─► 如果為 0 → analyzer.stop_analysis()
└─► 取消異步任務
T=0s: 用戶連接
T=0.1s: WebSocket 建立
T=0.1s: 分析任務啟動
T=0.2s: 緩衝區累積數據
T=1.0s: 第一次分析 (10000 樣本)
T=1.1s: 特徵推送到前端
T=1.2s: 前端圖表更新
T=2.0s: 第二次分析
T=2.1s: 特徵推送
...
T=10s: 檢測到異常
T=10.1s: 生成警報
T=10.1s: 警報推送
T=10.2s: 前端警報面板更新
...
T=Ns: 用戶斷開
警報配置存儲在 alert_configurations 表中:
CREATE TABLE alert_configurations (
config_id SERIAL PRIMARY KEY,
sensor_id INTEGER,
feature_name VARCHAR(50), -- 特徵名稱 (rms_h, kurtosis_v, etc.)
threshold_min NUMERIC, -- 下限閾值
threshold_max NUMERIC, -- 上限閾值
severity VARCHAR(20), -- 嚴重程度: critical, warning, info
enabled BOOLEAN DEFAULT true
);1. 特徵計算完成
└─► _check_alerts(sensor_id, features)
2. 獲取配置
db.get_alert_configurations(sensor_id)
└─► 返回該感測器的所有啟用配置
3. 逐個檢查
對每個配置:
├─► 獲取特徵值: features[feature_name]
├─► 檢查上限: value > threshold_max?
│ └─► 是 → _create_alert(direction='above')
└─► 檢查下限: value < threshold_min?
└─► 是 → _create_alert(direction='below')
4. 創建警報
db.create_alert(alert)
└─► 插入 alerts 表
5. 廣播警報
manager.broadcast_alert(alert)
└─► 推送到所有 WebSocket 客戶端
{
sensor_id: 1,
alert_type: "threshold",
severity: "critical",
message: "rms_h is above threshold (0.5000 above 0.3000)",
feature_name: "rms_h",
current_value: 0.5000,
threshold_value: 0.3000
}用戶可以確認警報以標記為已處理:
async function acknowledgeAlert(alertId) {
await fetch(`/api/alerts/acknowledge/${alertId}`, {
method: 'POST',
body: JSON.stringify({ acknowledged_by: 'user' })
})
// 從本地歷史中移除
alertHistory.value = alertHistory.value.filter(a => a.alert_id !== alertId)
}循環緩衝區:
- 使用
collections.deque(maxlen=25600) - 自動覆蓋舊數據,無需手動清理
- 固定記憶體佔用
數據類型優化:
- 使用 NumPy 數組 (連續記憶體)
- 批量操作減少循環開銷
異步 I/O:
- 所有數據庫操作使用 asyncpg
- Redis 使用 redis.asyncio
- WebSocket 使用 FastAPI 異步處理
連線池:
- PostgreSQL 連線池 (10-50 連線)
- 自動連線復用
- 連線超時管理
Redis 多層快取:
L1: 記憶體緩衝區 (1 秒)
└─► 最快,容量小
L2: Redis Hash (5 分鐘 TTL)
└─► 跨進程共享,中等容量
L3: PostgreSQL (持久化)
└─► 最慢,無限容量
查詢優化:
# 視圖查詢 (預計算)
SELECT * FROM v_latest_features WHERE sensor_id = $1
# 索引優化
CREATE INDEX idx_sensor_data_timestamp ON sensor_data(sensor_id, timestamp);Redis 批量寫入:
# 使用 pipeline 減少網路往返
pipeline = self.redis.pipeline()
for data in data_list:
pipeline.xadd(key, data)
await pipeline.execute()WebSocket:
- 二進制數據壓縮 (可選)
- 批量消息合併
- 心跳檢測 (ping/pong)
前端優化:
- 緩衝區大小優化:1000 點 (原 100)
- 滾動窗口:自動顯示最新數據
- 虛擬滾動:只渲染可見範圍
系統指標:
- 活躍連線數
- 分析任務數
- 緩衝區使用率
- 警報隊列長度
性能指標:
- 分析延遲 (目標 < 100ms)
- 端到端延遲 (目標 < 500ms)
- 特徵提取吞吐量 (10 Hz)
- WebSocket 消息延遲
WebSocket 斷開:
# 自動重連 (最多 10 次)
reconnectAttempts < maxReconnectAttempts
└─ 指數退避重試Redis 故障:
# 優雅降級
if not redis_client._is_connected:
logger.warning("Redis not connected, skipping cache")
# 繼續處理,只跳過 Redis 操作PostgreSQL 故障:
# 重試機制
try:
await db.insert_features(sensor_id, features)
except Exception as e:
logger.error(f"Error saving to database: {e}")
# 記錄錯誤,不中斷分析循環樣本不足:
if sample_count < min_samples:
# 等待更多數據
continueNaN/Inf 處理:
# 特徵計算中處理異常值
rms = np.sqrt(np.mean(h_data ** 2))
if np.isnan(rms) or np.isinf(rms):
rms = 0.0任務清理:
async def stop_analysis(sensor_id):
task = self.analysis_tasks[sensor_id]
task.cancel() # 取消異步任務
del self.analysis_tasks[sensor_id] # 清除引用緩衝區清理:
async def cleanup_old_buffers(max_age_minutes=60):
# 自動清理 60 分鐘未更新的緩衝區
...services:
backend:
depends_on:
- postgres
- redis
environment:
- DATABASE_URL_POSTGRESQL=postgresql://...
- REDIS_URL=redis://...
postgres:
image: postgres:15
environment:
- POSTGRES_DB=vibration_analysis
- POSTGRES_USER=vibration
volumes:
- postgres_data:/var/lib/postgresql/data
redis:
image: redis:7-alpine
command: redis-server --requirepass redis_pass
volumes:
- redis_data:/data# 資料庫
DATABASE_URL_POSTGRESQL=postgresql://user:pass@host:5432/db
# Redis
REDIS_URL=redis://:pass@host:6379/0
# 應用
FASTAPI_PORT=8081
LOG_LEVEL=INFO結構化日誌:
import logging
logger = logging.getLogger(__name__)
logger.info(
"Feature extracted",
extra={
"sensor_id": sensor_id,
"rms_h": rms_h,
"window_start": window_start
}
)本專案的即時分析與監控系統採用了現代化的異步架構,具備以下特點:
✅ 高效能: 25.6 kHz 採樣率,10 Hz 分析頻率 ✅ 可擴展: 支持多感測器並發,連線池優化 ✅ 可靠性: 自動重連,優雅降級,故障恢復 ✅ 實時性: WebSocket 推送,Redis 快取,低延遲 ✅ 監控性: 完整的警報機制,狀態追蹤 ✅ 可維護: 模塊化設計,清晰的資料流 ✅ 優化: 批量寫入、滾動窗口、深色主題
系統透過精心的架構設計和效能優化,能夠穩定地處理高頻振動數據,並提供即時的監控與警報功能。