-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathserver.py
More file actions
79 lines (68 loc) · 2.8 KB
/
server.py
File metadata and controls
79 lines (68 loc) · 2.8 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
import asyncio
import asyncore
from bidict import bidict
import requests
import socket
import struct
import sys
from threading import Thread
import packets
from legend import Legend
from packets import Packet, PingPacket, PongPacket, LoginPacket, LoginResultPacket
class ClientHandler(asyncore.dispatcher_with_send):
    """Per-connection handler that decodes length-prefixed packets and replies.

    Wire format, as read by this class: a 4-byte big-endian unsigned length
    (``">I"``), followed by that many bytes made of a 2-byte big-endian signed
    packet id (``">h"``) and the packet payload.
    """

    # Sizes fixed by the struct formats used for framing.
    _LENGTH_PREFIX_SIZE = 4
    _PACKET_ID_SIZE = 2
    # Upper bound on bytes pulled from the socket per read event.
    _RECV_CHUNK_SIZE = 4096
    # Seconds to wait for the login HTTP request before treating it as failed.
    _LOGIN_TIMEOUT_SECONDS = 10

    def __init__(self, sock=None, map=None, legend=None):
        """Wrap an accepted socket.

        Args:
            sock: Connected socket passed through to the asyncore dispatcher.
            map: Optional asyncore socket map passed through to the dispatcher.
            legend: Application object stored on the handler as ``self.legend``.
        """
        super().__init__(sock, map)
        self.legend: Legend = legend
        # Bytes received but not yet consumed as complete frames. A stream
        # socket may deliver a frame in several pieces or several frames at
        # once, so data is accumulated here between read events.
        self._recv_buffer = bytearray()

    def send_packet(self, packet: Packet):
        """Encode ``packet`` and queue it for sending to the client."""
        packet_bytes = packets.create_packet(packet.id, packet.encode())
        self.send(packet_bytes)

    def handle_read(self):
        """Read available bytes and process every complete frame buffered.

        Incomplete frames stay in the buffer until a later read event
        supplies the remaining bytes.
        """
        chunk = self.recv(self._RECV_CHUNK_SIZE)
        if not chunk:
            return
        self._recv_buffer += chunk

        # NOTE(review): the declared frame length is not capped, so a client
        # can make this buffer grow up to the 32-bit length it announces.
        # Consider enforcing a protocol-level maximum.
        while True:
            frame = self._next_frame()
            if frame is None:
                break
            self._dispatch_frame(frame)

    def _next_frame(self):
        """Pop one complete frame body from the buffer, or return None."""
        buffered = self._recv_buffer
        if len(buffered) < self._LENGTH_PREFIX_SIZE:
            return None
        (body_len,) = struct.unpack_from(">I", buffered, 0)
        frame_end = self._LENGTH_PREFIX_SIZE + body_len
        if len(buffered) < frame_end:
            return None
        frame = bytes(buffered[self._LENGTH_PREFIX_SIZE:frame_end])
        del buffered[:frame_end]
        return frame

    def _dispatch_frame(self, frame):
        """Decode one frame body and respond according to the packet type."""
        if len(frame) < self._PACKET_ID_SIZE:
            # Too short to carry a packet id; the length prefix was honoured,
            # so the stream is still in sync and the frame can be dropped.
            return
        packet_id: int = struct.unpack(">h", frame[:self._PACKET_ID_SIZE])[0]
        packet_data: bytes = frame[self._PACKET_ID_SIZE:]
        packet = packets.decode(packet_id, packet_data)

        if isinstance(packet, PingPacket):
            self.send_packet(PongPacket(packet.msg))
        elif isinstance(packet, LoginPacket):
            self._handle_login(packet)

    def _handle_login(self, packet):
        """Validate the packet's access token and send a LoginResultPacket.

        Sends ``LoginResultPacket(1, <id>)`` when the users/@me response
        contains an ``"id"`` and no ``"message"``; otherwise, including on
        network or JSON errors, sends ``LoginResultPacket(0)``.
        """
        headers = {
            "User-Agent": "Legend Plus login bot v0.1",
            "Authorization": "Bearer " + packet.access_token,
        }
        try:
            # The request runs synchronously inside the read handler; the
            # timeout bounds how long a stalled HTTP call can hold it up.
            r = requests.get(
                "https://discordapp.com/api/users/@me",
                headers=headers,
                timeout=self._LOGIN_TIMEOUT_SECONDS,
            )
            result = r.json()
        except (requests.RequestException, ValueError):
            # Connection failure, timeout, or a body that is not valid JSON:
            # treat as a failed login instead of raising out of handle_read.
            result = None

        if isinstance(result, dict) and "id" in result and "message" not in result:
            # Successful login
            response = LoginResultPacket(1, result["id"])
        else:
            # Failed
            response = LoginResultPacket(0)
        self.send_packet(response)
class Server(asyncore.dispatcher):
    """Listening dispatcher that creates a ClientHandler per accepted connection."""

    def __init__(self, config, loop, legend: Legend):
        """Create the listening socket and bind it to the configured address.

        Args:
            config: Mapping read for the ``"ip"`` and ``"port"`` keys.
            loop: Accepted for the caller's benefit; not used by this class.
            legend: Application object handed to every ClientHandler.
        """
        print("Starting Server")
        super().__init__()
        print("Init")
        self.config = config
        self.legend: Legend = legend

        self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
        print("Socket creating, setting reuse")
        self.set_reuse_addr()

        host = self.config["ip"]
        port = self.config["port"]
        print("Binding server to " + host + ":" + str(port))
        self.bind((host, port))
        self.listen(10)

    def handle_accept(self):
        """Accept a pending connection and attach a ClientHandler to it."""
        print("Handle Accept")
        accepted = self.accept()
        if accepted is None:
            # accept() produced no connection; nothing to do.
            return
        client_sock, client_addr = accepted
        print("Connection from " + str(client_addr))
        # The handler is not stored here; presumably the asyncore socket map
        # keeps it alive once constructed — confirm against asyncore docs.
        ClientHandler(sock=client_sock, legend=self.legend)