-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathbuttons.py
More file actions
171 lines (142 loc) · 5.35 KB
/
buttons.py
File metadata and controls
171 lines (142 loc) · 5.35 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
#!/usr/bin/env python3
"""
Simplified Raspberry Pi Button Handler with WebSocket
Serves static files and broadcasts GPIO events via WebSocket
Pin Configuration:
- Button 1: GPIO18 (Physical pin 12)
- Button 2: GPIO19 (Physical pin 35)
Requirements:
- pip3 install websockets --break-system-packages
Usage:
sudo python3 buttons.py
"""
import RPi.GPIO as GPIO
import asyncio
import websockets
import json
import time
from pathlib import Path
from http.server import SimpleHTTPRequestHandler
from socketserver import TCPServer
from threading import Thread
# Configuration
# Button input pins, in BCM numbering (setup_gpio selects GPIO.BCM).
BUTTON_1_PIN = 18
BUTTON_2_PIN = 19
# TCP port the WebSocket server listens on.
WS_PORT = 8765
# TCP port the static-file HTTP server listens on.
HTTP_PORT = 8000
# Connected WebSocket clients
# websocket_handler adds and removes entries; broadcast sends to every member.
clients = set()
def setup_gpio():
    """Configure both button pins as inputs and register edge callbacks.

    Uses BCM numbering, enables the internal pull-down on each pin, and
    attaches a both-edge detector with a 50 ms debounce that forwards to
    handle_button_event with the logical button number.
    """
    GPIO.setmode(GPIO.BCM)
    GPIO.setwarnings(False)

    # (logical button number, BCM pin) for every button we watch.
    buttons = ((1, BUTTON_1_PIN), (2, BUTTON_2_PIN))

    # Configure every input before attaching any edge detector.
    for _, pin in buttons:
        GPIO.setup(pin, GPIO.IN, pull_up_down=GPIO.PUD_DOWN)

    # Detect both rising and falling edges so press and release are reported.
    for number, pin in buttons:
        GPIO.add_event_detect(
            pin,
            GPIO.BOTH,
            # Bind the button number now; a plain closure would see the
            # loop variable's final value.
            callback=lambda ch, number=number: handle_button_event(number, ch),
            bouncetime=50,
        )

    print("✅ GPIO setup complete")
    for number, pin in buttons:
        print(f" Button {number}: GPIO{pin}")
# Event loop that runs the WebSocket server. start_websocket_server() sets it
# so that GPIO callbacks, which run outside that loop, can hand work to it.
_server_loop = None


def handle_button_event(button_num, channel):
    """Report a button edge and schedule a broadcast to all clients.

    This is registered as a GPIO edge callback, so it does not run on the
    asyncio event loop that owns the WebSocket connections. The message is
    therefore handed to that loop with asyncio.run_coroutine_threadsafe
    instead of being sent from a fresh loop on the calling thread.

    Args:
        button_num: Logical button number included in the message.
        channel: GPIO channel that fired; its level is read to tell a
            press (HIGH) from a release (LOW).
    """
    state = GPIO.input(channel)
    event_type = "PRESSED" if state == GPIO.HIGH else "RELEASED"
    message = {
        "event": event_type,
        "button": button_num,
        "state": event_type,
        "timestamp": time.time()
    }
    print(f"[{time.strftime('%H:%M:%S')}] Button {button_num}: {event_type}")

    loop = _server_loop
    if loop is None or loop.is_closed():
        # The WebSocket server is not running (yet, or any more), so there
        # is nobody to notify.
        return

    # Broadcast to all connected WebSocket clients
    print("Broadcasting")
    pending = broadcast(json.dumps(message))
    try:
        asyncio.run_coroutine_threadsafe(pending, loop)
    except RuntimeError:
        # The loop was closed between the check above and the hand-off;
        # close the coroutine so it is not reported as never awaited.
        pending.close()


async def broadcast(message):
    """Send message to all connected clients.

    Must run on the event loop that owns the client connections. Sends are
    issued concurrently; a failure for one client does not stop delivery to
    the others.

    Args:
        message: Text payload passed to each client's send().
    """
    # Snapshot the set: clients may connect or disconnect while we await.
    recipients = list(clients)
    if not recipients:
        return

    results = await asyncio.gather(
        *(client.send(message) for client in recipients),
        return_exceptions=True
    )
    for result in results:
        if isinstance(result, Exception):
            print(f"⚠️ Broadcast to a client failed: {result!r}")


async def websocket_handler(websocket):
    """Handle one WebSocket connection for its whole lifetime.

    Registers the connection in clients, sends a CONNECTED confirmation,
    then drains incoming messages until the peer disconnects.

    Args:
        websocket: The connection object supplied by websockets.serve.
    """
    clients.add(websocket)
    print(f"✅ Client connected (total: {len(clients)})")
    try:
        # Send initial connection confirmation
        await websocket.send(json.dumps({
            "event": "CONNECTED",
            "message": "Button server ready",
            "timestamp": time.time()
        }))
        # Keep connection alive; incoming client messages are ignored.
        async for _ in websocket:
            pass
    except websockets.exceptions.ConnectionClosed:
        pass
    finally:
        # discard() cannot raise if the entry is already gone.
        clients.discard(websocket)
        print(f"❌ Client disconnected (total: {len(clients)})")


async def start_websocket_server():
    """Run the WebSocket server on all interfaces until cancelled.

    Records the running event loop so handle_button_event can schedule
    broadcasts on it, and clears that reference again on exit.
    """
    global _server_loop
    _server_loop = asyncio.get_running_loop()
    try:
        # WebSocket server with permissive settings - no origin checking
        async with websockets.serve(
            websocket_handler,
            "0.0.0.0",
            WS_PORT,
            # Allow all origins, no restrictions
            origins=None,
            compression=None
        ):
            print(f"🌐 WebSocket server running on ws://0.0.0.0:{WS_PORT}")
            print(f" CORS: Fully open, all origins allowed")
            await asyncio.Future()  # Run forever
    finally:
        # Stop GPIO callbacks from scheduling work on a finished loop.
        _server_loop = None
def start_http_server():
    """Start a simple HTTP server for static files and serve forever.

    Listens on all interfaces at HTTP_PORT. Every response carries fully
    open CORS headers and disables caching. This call blocks, so it is
    meant to run in its own thread.
    """

    class Handler(SimpleHTTPRequestHandler):
        def log_message(self, format, *args):
            pass  # Suppress per-request log messages

        def end_headers(self):
            # Add CORS headers to all responses
            self.send_header('Access-Control-Allow-Origin', '*')
            self.send_header('Access-Control-Allow-Methods', '*')
            self.send_header('Access-Control-Allow-Headers', '*')
            self.send_header('Cache-Control', 'no-store, no-cache, must-revalidate')
            super().end_headers()

        def do_OPTIONS(self):
            # Handle preflight requests
            self.send_response(200)
            self.end_headers()

    class ReusableTCPServer(TCPServer):
        # Plain TCPServer leaves this off, so restarting soon after a
        # shutdown can fail with "Address already in use".
        allow_reuse_address = True

    with ReusableTCPServer(("0.0.0.0", HTTP_PORT), Handler) as httpd:
        print(f"📁 HTTP server running on http://0.0.0.0:{HTTP_PORT}")
        httpd.serve_forever()
def main():
    """Set up GPIO, start the HTTP thread, and run the WebSocket server.

    Returns early if GPIO setup fails. Otherwise blocks in the WebSocket
    server until it stops; GPIO is always cleaned up on the way out,
    whether the server ended by Ctrl+C or by an error.
    """
    print("=== Simplified Button WebSocket Server ===\n")

    # Setup GPIO
    try:
        setup_gpio()
    except Exception as e:
        print(f"❌ GPIO setup failed: {e}")
        print(" Make sure to run with: sudo python3 buttons.py")
        return

    # Start HTTP server in background thread; daemon so it never blocks exit.
    http_thread = Thread(target=start_http_server, daemon=True)
    http_thread.start()

    print(f"\n✨ Server ready!")
    print(f" HTTP: http://0.0.0.0:{HTTP_PORT} (CORS: fully open)")
    print(f" WebSocket: ws://0.0.0.0:{WS_PORT} (CORS: fully open)")
    print(f"\nPress Ctrl+C to exit\n")

    # Start WebSocket server (blocks)
    try:
        asyncio.run(start_websocket_server())
    except KeyboardInterrupt:
        print("\n\n👋 Shutting down...")
    finally:
        # Release the pins on every exit path, not only on Ctrl+C, so a
        # server start-up failure does not leave them configured.
        GPIO.cleanup()
        print("Goodbye!")
# Run the server only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()