-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathrun_datanode.py
More file actions
101 lines (90 loc) · 3.42 KB
/
Copy pathrun_datanode.py
File metadata and controls
101 lines (90 loc) · 3.42 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
import socket
import json
import logging
from common import load_config, file_default_config
from datanode import DataNode
def recv_exact(sock, length, chunk_size=2048):
    """Read exactly ``length`` bytes from ``sock``.

    Args:
        sock: A connected socket-like object exposing ``recv``.
        length: Number of bytes to read.
        chunk_size: Upper bound on the size requested per ``recv`` call.

    Returns:
        The received bytes, or ``None`` if the peer closed the connection
        before ``length`` bytes arrived. A non-positive ``length`` yields
        ``b""`` without touching the socket.
    """
    chunks = []
    remaining = length
    while remaining > 0:
        chunk = sock.recv(min(chunk_size, remaining))
        if not chunk:
            # An empty read means the peer closed the stream early.
            return None
        chunks.append(chunk)
        remaining -= len(chunk)
    return b"".join(chunks)


def handle_client(client, datanode):
    """Serve a single request on an already-accepted connection.

    Exchange order on the wire:
      1. receive the JSON command, reply ``ack``;
      2. for write commands, receive ``block_info["length"]`` bytes of
         block data and reply ``ack``;
      3. wait for the client's "ready" message, send the JSON result;
      4. wait for the client's confirmation;
      5. for ``read_block``, send the block bytes.

    The caller owns ``client`` and is responsible for closing it.

    Args:
        client: Connected socket for this request.
        datanode: Object providing ``create_and_recv_block``,
            ``recv_block`` and ``read_block``.

    Raises:
        ValueError: The command is not valid UTF-8 JSON, is not a JSON
            object with a ``"command"`` key, or a write command lacks an
            integer ``block_info["length"]``.
        OSError: A socket operation failed.
    """
    raw_command = client.recv(2048)
    if not raw_command:
        # Peer connected and went away without sending anything.
        return
    command_text = raw_command.decode("utf-8")
    # accept command successfully
    client.sendall(b"ack")

    # NOTE: json.loads() no longer accepts an ``encoding`` keyword
    # (removed in Python 3.9); the text is already decoded above.
    request = json.loads(command_text)
    if not isinstance(request, dict) or "command" not in request:
        raise ValueError("request must be a JSON object with a 'command' key")

    command = request["command"]
    block_info = request.get("block_info")
    ret_block = None

    if command in ("write_whole_block", "write_block"):
        if not isinstance(block_info, dict) or not isinstance(
            block_info.get("length"), int
        ):
            raise ValueError(
                "write command requires an integer block_info['length']"
            )
        # recv block and return ack signal
        block = recv_exact(client, block_info["length"])
        if block is None:
            # Client hung up mid-transfer; nothing to store or answer.
            return
        client.sendall(b"ack")
        if command == "write_whole_block":
            result = datanode.create_and_recv_block(block_info, block)
        else:
            result = datanode.recv_block(block_info, block)
    elif command == "read_block":
        result, ret_block = datanode.read_block(block_info)
    else:
        result = {"success": False, "message": "Unknown command"}

    # client ready for result message
    client.recv(2048)
    # send result message
    client.sendall(json.dumps(result).encode("utf-8"))
    # client recv result message successfully
    confirmation = client.recv(2048)
    logging.debug(confirmation)

    # for read block
    if ret_block is not None:
        # sendall() loops until every byte is written or raises OSError.
        client.sendall(ret_block)


def serve(listener, datanode):
    """Accept connections forever, handling one client at a time.

    A failure while serving one client (socket error or malformed
    request) is logged and that connection is dropped; the loop then
    continues with the next client.

    Args:
        listener: A bound, listening socket.
        datanode: Object passed through to ``handle_client``.
    """
    while True:
        client, addr = listener.accept()
        print("Connect from {}".format(addr))
        # The context manager closes the connection on every exit path.
        with client:
            try:
                handle_client(client, datanode)
            except (OSError, ValueError) as exc:
                logging.warning("Dropping connection from %s: %s", addr, exc)


if __name__ == "__main__":
    # NOTE: no socket timeout is configured (socket.setdefaulttimeout(20)
    # was previously tried here), so a stalled client blocks this
    # one-connection-at-a-time loop until it disconnects.
    config = load_config("config/datanode.json")
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as client_datanode_socket:
        host = ""
        port = config["client_comm_port"]
        client_datanode_socket.bind((host, port))
        client_datanode_socket.listen(config["max_client_num"])
        datanode_instance = DataNode(config)
        serve(client_datanode_socket, datanode_instance)