-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathproblemManager.py
More file actions
99 lines (79 loc) · 3.28 KB
/
problemManager.py
File metadata and controls
99 lines (79 loc) · 3.28 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
import subprocess
import sys
import multiprocessing
import time
import os
import json
class ProblemManager:
def __init__(self, problemNumber, studentCode = "", username="jb2396"):
self.q = multiprocessing.Queue()
self.queue = []
self.file = "./files/swapFiles/temp_problem_" + str(problemNumber) + "_" + username
self.problemNumber = str(problemNumber)
self.studentCode = studentCode
def correctSyntax(self):
if subprocess.run(["pyflakes", self.file], capture_output=True).returncode != 0:
return False
return True
def testSyntax(self):
with open(self.file, "w") as writer:
writer.write("import sys\nimport json\n")
writer.write(self.studentCode)
writer.write("\nprint(check(*json.loads(sys.argv[1])))")
if self.correctSyntax() == True:
memory, cpu = self.runMultiThread()
os.remove(self.file)
if len(self.queue):
return memory, cpu, sorted(self.queue, key = lambda x: x[0])
return False
def getInfo(self):
first2Lines = open("/proc/meminfo", "r")
return int(first2Lines.readline().split()[1]) - int(first2Lines.readline().split()[1])
def testCode(self, idx, data, testCase):
### data -> answer, memoryLimit
### testCase -> input: list always
data = data.split()
_ans = data[0]
_timeout = int(data[1])
try:
response = subprocess.run(["python3", self.file, json.dumps(testCase)], capture_output=True, timeout=_timeout).stdout.decode().strip()
print(response, _ans)
except:
response = "Timed Out"
if response == "Timed Out":
self.q.put([idx, response])
else:
self.q.put([idx, response == _ans])
def runMultiThread(self):
baseline = self.getInfo()
mem = []
startTime = time.time()
with open("./files/problems/problem_" + self.problemNumber) as f:
answers = f.read().strip().split("\n")
with open("./files/testCases/testCase_" + self.problemNumber) as f:
testCases = json.loads(f.read())
for idx in range(len(answers)):
exec('p{a} = multiprocessing.Process(target = {b}, args = {c})'.format(a = str(idx), b='self.testCode', c = (idx, answers[idx], testCases[idx],)))
exec('p{a}.start()'.format(a=idx))
for idx in range(len(answers)):
exec('p{a}.join()'.format(a=idx))
mem.append(abs(self.getInfo()-baseline))
while self.q.empty() is False:
self.queue.append(self.q.get())
return sum(mem)/len(mem), time.time() - startTime
def isCorrect(self):
result = self.testSyntax()
if result == False:
return False
for caseIdx in range(len(result[-1])):
if result[-1][caseIdx][1] == False:
# Returns testCase that fails
return caseIdx
# Returns (memoryLoad, cpuLoad)
return result[:-1]
#test = ProblemManager(sys.argv[1], sys.argv[2])
with open(sys.argv[2]) as f:
code = f.read()
test = ProblemManager(sys.argv[1], code)
print(test.testSyntax())
print(test.isCorrect())