-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathprogram.py
More file actions
159 lines (132 loc) · 5.36 KB
/
program.py
File metadata and controls
159 lines (132 loc) · 5.36 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
from antlr4 import InputStream, CommonTokenStream
from detectors.language.SolidityParser import SolidityParser
from detectors.language.SolidityLexer import SolidityLexer
from detectors.language.SolidityVisitor import SolidityVisitor
from detectors import reentrancy, statement_checker, uncheckedCall, selfdestruct, versions, integer, timestamp, denialOfService, require, dashboard
from utils import report, front, openAI
import subprocess
import os
import time
import threading
def add_findings(findings, f):
    """Add every finding in ``f`` to ``findings`` and return ``findings``.

    Args:
        findings: List that accumulates findings; it is extended in place.
        f: Iterable of findings to add (``main`` passes a detector module's
            ``findings`` attribute here).

    Returns:
        The same ``findings`` list object, now also holding the items of ``f``.
    """
    # list.extend replaces the manual append loop with the same in-place result.
    findings.extend(f)
    return findings
def get_recommendations(contractPath, findings, codefunction, chatbot):
    """Collect the source snippet (and optional AI advice) for each finding.

    Args:
        contractPath: Path of the contract file the findings refer to.
        findings: Iterable of findings; ``find[3][0]`` and ``find[3][1]`` are
            used as the 1-based first and last line of the affected code.
        codefunction: List that receives one joined snippet per finding; it is
            mutated in place.
        chatbot: When truthy, ``openAI.get_vulnerability_recommendation`` is
            called once per finding.

    Returns:
        A ``(recommendations, codefunction)`` tuple. ``recommendations`` is a
        list with one entry per finding when ``chatbot`` is truthy, otherwise
        ``None``.
    """
    recommendations = []
    # The file is read at most once, and only when the first finding is
    # processed, so an empty ``findings`` never touches the file system.
    contract_lines = None
    for find in findings:
        start, end = find[3][0], find[3][1]
        if contract_lines is None:
            with open(contractPath, "r") as f:
                contract_lines = f.readlines()
        function = contract_lines[start - 1:end]
        codefunction.append(''.join(function))
        if chatbot:
            # The helper receives the list of lines, not the joined text.
            recommendations.append(
                openAI.get_vulnerability_recommendation(function, find))
    if not chatbot:
        recommendations = None
    print(recommendations)
    return recommendations, codefunction
def parse_require(contractPath, require):
    """Return the entries of ``require`` that have three or more items.

    Args:
        contractPath: Path of the contract file the entries refer to.
        require: Mapping whose values are sequences; ``value[0]`` is used as a
            1-based line number in the contract file.
            NOTE(review): presumably each value lists the lines where one
            require condition occurs - confirm against the require detector.

    Returns:
        A list of ``[line_text, value]`` pairs, one per value with at least
        three items, where ``line_text`` is the contract line at ``value[0]``.
    """
    requires = []
    # The file is read at most once, and only if some entry qualifies.
    contract_lines = None
    for value in require.values():
        if len(value) < 3:
            continue
        print("There's a require appearing 3 or more times")
        if contract_lines is None:
            with open(contractPath, "r") as f:
                contract_lines = f.readlines()
        requires.append([contract_lines[value[0] - 1], value])
    return requires
def main():
    """Analyze the selected Solidity contract and open the generated report.

    Steps:
        1. ``front.main()`` supplies the contract path and the chatbot flag;
           a ``None`` path ends the run immediately.
        2. The contract is parsed with the ANTLR Solidity lexer/parser.
        3. Every detector visitor runs over the parse tree in its own thread.
        4. Findings, repeated requires and dashboard data are gathered and
           handed to ``report.generate_report``.
        5. ``report.html`` is opened with the platform's default handler
           (presumably that file is written by ``report.generate_report``).
    """
    contractPath, chatbot = front.main()
    # Debug override:
    #contractPath = "./contracts/all.sol"
    #chatbot = False
    if contractPath is None:
        return

    start_time = time.time()

    with open(contractPath, "r") as file:
        input_code = file.read()

    # Create a lexer and parser for Solidity
    input_stream = InputStream(input_code)
    lexer = SolidityLexer(input_stream)
    token_stream = CommonTokenStream(lexer)
    parser = SolidityParser(token_stream)

    # Generate the parse tree that every detector walks
    tree = parser.sourceUnit()

    def run_detector(module):
        # Each detector module exposes its own SolidityCallVisitor class.
        visitor = module.SolidityCallVisitor()
        visitor.visit(tree)

    # Detectors that publish a module-level ``findings`` list, in report order.
    finding_detectors = (
        reentrancy,
        uncheckedCall,
        selfdestruct,
        versions,
        integer,
        timestamp,
        statement_checker,
        denialOfService,
    )

    # Threading for the detectors (``require`` publishes ``require_dict``
    # instead of ``findings``, so it is only part of the thread list).
    workers = [
        threading.Thread(target=run_detector, args=(module,))
        for module in (*finding_detectors, require)
    ]
    for worker in workers:
        worker.start()

    # Wait for exactly the threads started above. Joining avoids both the
    # busy-wait and a hang if some unrelated thread is still alive.
    for worker in workers:
        worker.join()

    findings = []
    for module in finding_detectors:
        findings = add_findings(findings, module.findings)
    require_dict = require.require_dict

    execution_time = time.time() - start_time
    print("Findings: " + str(findings))
    print("Requires: " + str(require_dict))

    print("Generating dashboard...")
    visitor = dashboard.SolidityCallVisitor()
    visitor.visit(tree)

    requires = parse_require(contractPath, require_dict)
    print(requires)
    print(len(requires))
    print("--- %s seconds ---" % (execution_time))

    # Get dashboard data
    data = dashboard.get_data()
    data.append([len(require_dict), len(requires)])
    data.append(len(findings))
    print("ALL DATA: " + str(data))
    # Data = [lines, contracts, functions, emits, [requires, requires_repeated], findings]

    recommendations, fun = get_recommendations(contractPath, findings, [], chatbot)
    report.generate_report(findings, recommendations, fun, requires, data)
    print(len(findings))

    print("Opening report...")
    url = "report.html"
    try:  # should work on Windows
        os.startfile(url)
    except AttributeError:
        # os.startfile is missing on this platform; fall back to ``open``.
        try:  # should work on MacOS and most linux versions
            subprocess.call(['open', url])
        except Exception:
            # Best effort only: failing to launch a viewer must not crash.
            print('Could not open URL')
# Run the analyzer only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()