forked from achow101/forkmon
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathnode_updates.py
More file actions
317 lines (271 loc) · 12.9 KB
/
node_updates.py
File metadata and controls
317 lines (271 loc) · 12.9 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
import logging
from django.utils import timezone
from django.utils.crypto import get_random_string
import requests
import os
import datetime
from .models import *
logger = logging.getLogger("forkmon.task")
class _RpcError(Exception):
    """Raised when a node's JSON-RPC endpoint returns a non-200 response."""


def _rpc(url, method, params=""):
    """Call ``method`` on the node at ``url`` and return the JSON-RPC 'result'.

    ``params`` is the raw text placed inside the JSON "params" array (e.g. a
    quoted block hash), matching the hand-built request bodies used throughout
    this module. Raises _RpcError on any non-200 HTTP response so callers can
    treat the node as down instead of retrying the same request forever (the
    previous inline ``continue`` inside the reorg while-loops looped forever
    on a persistent HTTP error).
    """
    r = requests.post(
        url,
        data='{"method": "%s", "params": [%s] }' % (method, params),
        auth=(os.environ['RPC_USER'], os.environ['RPC_PASSWORD']),
    )
    if r.status_code != 200:
        raise _RpcError("%s returned HTTP %d for %s" % (url, r.status_code, method))
    return r.json()['result']


def _reorg(node, url, blocks, block_hash, height, prev):
    """Reconcile the stored chain with a node tip that is not a simple extension.

    Walks the node's chain backwards (via getblockheader) and the stored chain
    downwards until the common ancestor is found, deactivates the orphaned
    stored blocks, stores the new branch, and updates the node's tip fields.
    ``blocks`` is the stored active chain ordered tip-first. Flags the node as
    reorged only when more than two blocks were replaced.
    """
    blocks_to_add = [block_hash]
    i = 0
    # Node's tip is ahead of the stored tip: walk the node's headers back
    # until the heights match, collecting the new hashes along the way.
    while height > blocks[i].height:
        header = _rpc(url, "getblockheader", '"%s"' % prev)
        prev = header['previousblockhash']
        block_hash = header['hash']
        height = header['height']
        if height > blocks[i].height:
            blocks_to_add.append(block_hash)
    # Stored tip is ahead of the node: deactivate stored blocks down to the
    # node's height.
    deactivated = 0
    while blocks[i].height > height:
        stale = blocks[i]
        stale.active = False
        stale.save()
        deactivated += 1
        i += 1
    # Both sides now at the same height: walk both down in lockstep until the
    # hashes agree (the common ancestor).
    while blocks[i].hash != block_hash:
        stale = blocks[i]
        stale.active = False
        stale.save()
        deactivated += 1
        i += 1
        blocks_to_add.append(block_hash)
        header = _rpc(url, "getblockheader", '"%s"' % prev)
        prev = header['previousblockhash']
        block_hash = header['hash']
    # blocks[i] is the common ancestor; rebuild the new branch on top of it,
    # keeping node tip fields in sync as each block is stored.
    prev_block = blocks[i]
    for new_hash in reversed(blocks_to_add):
        block = Block(hash=new_hash, height=prev_block.height + 1, node=node,
                      active=True, prev=prev_block)
        block.save()
        node.best_block_hash = block.hash
        node.best_block_height = block.height
        node.prev_block_hash = prev_block.hash
        prev_block = block
    # Only count as a reorg when the reorg was deeper than 2 blocks.
    node.has_reorged = node.has_reorged or deactivated > 2


def _update_bip9_forks(node, url):
    """Refresh BIP9Fork rows from the stats node's getblockchaininfo output."""
    result = _rpc(url, "getblockchaininfo")
    forks = result['bip9_softforks']
    current = result['blocks']
    for name, info in forks.items():
        state = info['status']
        db_forks = BIP9Fork.objects.all().filter(name=name)
        # Forks already active/defined that we never tracked: nothing to record.
        if (state == "active" or state == 'defined') and not db_forks:
            continue
        if state == 'started':
            # Signalling statistics are only available while started.
            stats = info['statistics']
            if not db_forks:
                # First sighting: create the row with the full statistics.
                BIP9Fork(name=name, state=state, period=stats['period'],
                         threshold=stats['threshold'], elapsed=stats['elapsed'],
                         count=stats['count']).save()
            else:
                fork = db_forks[0]
                fork.elapsed = stats['elapsed']
                fork.count = stats['count']
                fork.state = state
                fork.current = current
                fork.since = info['since']
                fork.save()
        else:
            # Known fork changed state (e.g. locked_in/failed): update it.
            fork = db_forks[0]
            fork.state = state
            fork.since = info['since']
            fork.current = current
            fork.save()


def _refresh_node(node):
    """Sync the in-database chain and stats for one node.

    Updates the node's tip metadata (hash/height/prev, MTP, block time,
    difficulty, chainwork), extends or reorganizes the stored Block chain to
    match the node's reported tip, and refreshes BIP9 statistics when this is
    the stats node. Raises on any RPC or parsing failure; the caller handles
    marking the node down.
    """
    url = node.url
    best_block = _rpc(url, "getbestblockhash")
    header = _rpc(url, "getblockheader", '"%s"' % best_block)
    prev = header['previousblockhash']
    height = header['height']
    block_hash = header['hash']
    # Tip timestamps are unix seconds; store as timezone-aware UTC datetimes.
    node.mtp = datetime.datetime.fromtimestamp(header['mediantime'], timezone.utc)
    node.best_block_time = datetime.datetime.fromtimestamp(header['time'], timezone.utc)
    node.difficulty = header['difficulty']
    node.chainwork = header['chainwork']
    # Stored active chain for this node, tip first.
    blocks = Block.objects.all().filter(node=node, active=True).order_by("-height")
    if not blocks or blocks[0].hash == block_hash or prev == blocks[0].hash:
        if not blocks:
            # No stored chain yet: seed it with the current tip.
            Block(hash=block_hash, height=height, node=node).save()
        elif blocks[0].hash != block_hash:
            # Node advanced by exactly one block: append it.
            Block(hash=block_hash, height=height, prev=blocks[0], node=node).save()
        node.best_block_hash = block_hash
        node.best_block_height = height
        node.prev_block_hash = prev
    else:
        # Tip neither matches nor extends the stored chain: reorganize.
        _reorg(node, url, blocks, block_hash, height, prev)
    # Gather BIP9 soft-fork stats from the designated stats node.
    if node.stats_node:
        _update_bip9_forks(node, url)


def _detect_splits(nodes):
    """Compare every pair of up nodes for chain divergence.

    Returns ``(has_split, no_split)``: ``has_split`` is True when at least one
    divergence deeper than one block was found that is not explained by a
    scheduled MTP fork on either side; ``no_split`` is True when no comparable
    pair of differing nodes was examined at all. Updates each node's
    divergence-tracking fields and ``is_behind`` flag as a side effect.
    """
    has_split = False
    no_split = True
    for node in nodes:
        blockchain = Block.objects.all().filter(node=node, active=True).order_by("-height")
        # Nothing to compare if the node is down or has no stored chain.
        if not blockchain or not node.is_up:
            continue
        for cmp_node in nodes:
            # Don't compare a node with itself.
            if node == cmp_node:
                continue
            # Identical tips: trivially in sync.
            if node.best_block_hash == cmp_node.best_block_hash:
                node.is_behind = False
                continue
            cmp_it = 0
            it = 0
            diverged = 0
            cmp_blockchain = Block.objects.all().filter(node=cmp_node, active=True).order_by("-height")
            if not cmp_blockchain or not cmp_node.is_up:
                continue
            # Both nodes past their scheduled MTP fork activation: the
            # divergence is expected, skip the comparison entirely.
            if (cmp_node.mtp_fork and node.mtp_fork
                    and cmp_node.mtp > cmp_node.mtp_fork.activation_time
                    and node.mtp > node.mtp_fork.activation_time):
                continue
            no_split = False
            # Bring both cursors to the same height (bounded to ~100 steps).
            while cmp_blockchain[cmp_it].height > blockchain[it].height and diverged <= 100:
                cmp_it += 1
                diverged += 1
            while blockchain[it].height > cmp_blockchain[cmp_it].height and diverged <= 100:
                it += 1
                diverged += 1
            # Walk both chains down until the common ancestor is found.
            while blockchain[it].hash != cmp_blockchain[cmp_it].hash and diverged <= 100:
                cmp_it += 1
                it += 1
                diverged += 1
            # Record the divergence point if it is the highest seen so far;
            # it is kept until a new divergence supersedes it.
            if (it > 0 and cmp_it > 0
                    and blockchain[it].hash == cmp_blockchain[cmp_it].hash
                    and blockchain[it - 1].hash != cmp_blockchain[cmp_it - 1].hash):
                if blockchain[it - 1].height > node.highest_divergence and diverged > 1:
                    node.highest_divergence = blockchain[it - 1].height
                    node.highest_diverged_hash = blockchain[it - 1].hash
                    node.common_ancestor_hash = blockchain[it].hash
                    node.common_ancestor_height = blockchain[it].height
            # Classify a divergence deeper than one block.
            if diverged > 1:
                if node.mtp_fork and node.mtp > node.mtp_fork.activation_time:
                    # Explained by this node's scheduled MTP fork.
                    node.sched_forked = True
                elif cmp_node.mtp_fork and cmp_node.mtp > cmp_node.mtp_fork.activation_time:
                    # Explained by the other node's scheduled MTP fork; ignore.
                    pass
                else:
                    # Genuine chain split.
                    has_split = True
            # If our cursor never advanced, this node is simply behind.
            node.is_behind = it - 1 < 0
            node.save()
    return has_split, no_split


def update_nodes():
    """Poll every node, sync the stored chains, and update chain-split state.

    Guarded by an optimistic UpdateLock row (version bump via a filtered
    ``update()``) so only one updater runs at a time. For each node the stored
    chain is extended or reorganized to match the node's reported tip and BIP9
    statistics are refreshed from the stats node; any per-node failure marks
    that node down without aborting the whole pass. Finally all chains are
    compared pairwise to detect splits and the global ForkState is updated.
    """
    update_id = get_random_string(length=10)
    logger.info("Update ID: %s - Beginning update at %s", update_id, datetime.datetime.now())
    # Retrieve the database lock and check it is not currently in use.
    lock = UpdateLock.objects.all().first()
    lock_version = lock.version
    if lock.in_use:
        logger.info("Update ID: %s - Database in use, exiting at %s", update_id, datetime.datetime.now())
        return
    # Take the lock; the version filter makes this a compare-and-swap, so a
    # concurrent taker causes our update() to match 0 rows.
    locked = UpdateLock.objects.filter(version=lock_version).update(in_use=True, version=lock_version + 1)
    if locked == 0:
        logger.info("Update ID: %s - Database lock version was updated by another process, exiting at %s",
                    update_id, datetime.datetime.now())
        return
    # Update the in-db chain for each node.
    nodes = Node.objects.all()
    for node in nodes:
        try:
            _refresh_node(node)
            # Everything succeeded: mark as up and persist.
            node.is_up = True
            node.save()
        except Exception:
            # Any RPC/parse error (including non-200 responses) marks the node
            # down; log with traceback instead of the old bare print().
            logger.exception("Update ID: %s - Failed to update node %s", update_id, node.url)
            node.is_up = False
            node.save()
    # Now that nodes are updated, check for chain splits.
    nodes = Node.objects.all()
    has_split, no_split = _detect_splits(nodes)
    # Ensure a ForkState row exists, then record the outcome.
    if not ForkState.objects.all():
        ForkState().save()
    if has_split:
        state = ForkState.objects.all()[0]
        state.has_forked = True
        state.is_currently_forked = True
        state.save()
    if no_split:
        state = ForkState.objects.all()[0]
        state.is_currently_forked = False
        state.save()
    # Reset per-pass divergence tracking.
    for node in nodes:
        node.highest_divergence = 0
        node.save()
    # Release the lock; the version was bumped once when acquiring.
    UpdateLock.objects.filter(version=lock_version + 1).update(in_use=False, version=lock_version + 2)
    logger.info("Update ID: %s - Completed at %s", update_id, datetime.datetime.now())