196 lines
8.0 KiB
Python
196 lines
8.0 KiB
Python
from os import listdir, mkdir
|
|
from os.path import join, exists
|
|
from subprocess import call, TimeoutExpired
|
|
from tempfile import TemporaryDirectory
|
|
|
|
from Main.models.progress import Progress
|
|
from Sprint.settings import CONSTS
|
|
from SprintLib.queue import notify, send_to_queue
|
|
from SprintLib.utils import Timer
|
|
|
|
|
|
class TestException(Exception):
    """Signal that testing of a solution must stop with a verdict.

    The exception message is the short verdict code (this file raises it
    with "CE", "RE" and "WA"); ``BaseTester.execute`` stores ``str(e)`` as
    the solution result.
    """
|
|
|
|
|
|
class BaseTester:
    """Build a solution, run it against the task's tests in Docker and record the verdict.

    Subclasses customise the run through the ``command`` and
    ``build_command`` properties and the ``before_test`` / ``test`` /
    ``after_test`` hooks; ``execute`` drives the whole pipeline.
    """

    # Passed as ``working_directory`` when building, and used as the mount
    # point of the host temp dir inside the solution container.
    working_directory = "app"
    # Source text of the task's checker script; None means the program
    # output is compared to the expected answer directly.
    checker_code = None
    # Host directory holding the files of the current run; set by execute().
    path = ""

    def __init__(self, solution):
        """Remember the solution object this tester works on."""
        self.solution = solution

    @property
    def command(self):
        """Shell command that runs the built solution inside the container."""
        return "./executable.exe"

    @property
    def build_command(self):
        """Command prefix placed before the source file list in ``before_test``."""
        return ""

    @staticmethod
    def _service_name(file):
        """Return the name suffix derived from a dockerfile's filename.

        Drops the first 11 characters of ``file.filename``.
        NOTE(review): presumably this strips a fixed ``Dockerfile_`` prefix
        (11 characters) - confirm against how dockerfiles are named.
        """
        return file.filename[11:]

    def _read_text(self, name):
        """Read the text file ``name`` from the run directory and close it."""
        with open(join(self.path, name), "r") as fs:
            return fs.read()

    def call(self, command):
        """Run ``command`` through the shell and return its exit code.

        The command runs from ``self.path`` when that path exists, otherwise
        from the current working directory.
        """
        print(f"Executing command: {command}")
        if exists(self.path):
            return call(f'cd {self.path} && {command}', shell=True)
        return call(command, shell=True)

    def save_solution(self):
        """Persist the current state of the solution."""
        self.solution.save()

    def before_test(self):
        """Build the solution from every source file of its language.

        Collects the files in ``self.path`` whose extension matches the
        solution language and passes them to ``build_command``.

        Raises:
            TestException: with code "CE" when the build exits non-zero.
        """
        extension = "." + self.solution.language.file_type
        files = [file for file in listdir(self.path) if file.endswith(extension)]
        code = self.solution.exec_command(
            f'{self.build_command} {" ".join(files)}',
            working_directory=self.working_directory,
        )
        if code != 0:
            raise TestException("CE")

    def test(self, filename):
        """Run the solution on one test input and check its output.

        ``filename`` is fed to the program on stdin and stdout is captured
        in ``output.txt``. The output is then validated either by the task
        checker (when ``checker_code`` is set) or by direct comparison with
        ``self.predicted``.

        Raises:
            TestException: "RE" when the program exits non-zero, "WA" when
                the checker exits non-zero or the output differs.
            TimeoutExpired: propagated from the checker call (1 second
                limit); whether ``exec_command`` raises it too depends on
                its implementation, which is not visible here.
        """
        with Timer(self.solution, filename):
            code = self.solution.exec_command(
                f"< {filename} {self.command} > output.txt",
                # presumably time_limit is in milliseconds - TODO confirm
                timeout=self.solution.task.time_limit / 1000,
            )
        if code != 0:
            raise TestException("RE")
        result = self._read_text("output.txt").strip().replace('\r\n', '\n')
        print("got result", result)
        if self.checker_code is not None:
            print('using checker')
            # The checker container mounts the same directory, so it finds
            # expected.txt, checker.py and output.txt side by side.
            with open(join(self.path, 'expected.txt'), 'w') as fs:
                fs.write(self.predicted)
            with open(join(self.path, 'checker.py'), 'w') as fs:
                fs.write(self.checker_code)
            code = call(
                f'docker exec -i solution_{self.solution.id}_checker '
                f'sh -c "cd app && python checker.py"',
                shell=True,
                timeout=1,
            )
            if code != 0:
                raise TestException("WA")
        else:
            print('using simple check')
            if result != self.predicted:
                print('incorrect')
                raise TestException("WA")
            print('correct')

    def after_test(self):
        """Hook called once after all tests passed; does nothing by default."""
        pass

    def _setup_networking(self):
        """Create the per-solution Docker network and start the task's side containers.

        For every dockerfile of the task an image is built from the run
        directory and a detached container is started on the network, with
        the derived name as its hostname.
        """
        solution_id = self.solution.id
        self.call(f"docker network create solution_network_{solution_id}")
        for file in self.solution.task.dockerfiles:
            add_name = self._service_name(file)
            # Each image is built from a file literally named "Dockerfile",
            # so it is overwritten for every entry.
            with open(join(self.path, 'Dockerfile'), 'w') as fs:
                fs.write(file.text)
            self.call(f"docker build -t solution_image_{solution_id}_{add_name} .")
            run_command = (
                f"docker run "
                f"--hostname {add_name} "
                f"--network solution_network_{solution_id} "
                f"--name solution_container_{solution_id}_{add_name} "
                f"-t -d solution_image_{solution_id}_{add_name}"
            )
            print('run command', run_command)
            self.call(run_command)

    def notify(self):
        """Send the user a "solution_result" notification with result, score and rating."""
        # Reload userinfo so the rating in the message is read fresh from the DB.
        self.solution.user.userinfo.refresh_from_db()
        notify(
            self.solution.user,
            "solution_result",
            f"Задача: {self.solution.task.name}\n"
            f"Результат: {self.solution.result}\n"
            f"Очки решения: {Progress.by_solution(self.solution).score}\n"
            f"Текущий рейтинг: {self.solution.user.userinfo.rating}")

    def cleanup(self):
        """Save the solution and queue removal of every Docker resource created for it."""
        self.solution.save()
        solution_id = self.solution.id
        send_to_queue("cleaner", {"type": "container", "name": f"solution_{solution_id}"})
        if self.checker_code:
            send_to_queue("cleaner", {"type": "container", "name": f"solution_{solution_id}_checker"})
        for file in self.solution.task.dockerfiles:
            add_name = self._service_name(file)
            send_to_queue("cleaner", {"type": "container", "name": f"solution_container_{solution_id}_{add_name}"})
            send_to_queue("cleaner", {"type": "image", "name": f"solution_image_{solution_id}_{add_name}"})
        # The network is queued last, after the containers attached to it.
        send_to_queue("cleaner", {"type": "network", "name": f"solution_network_{solution_id}"})

    def save_progress(self):
        """Mark the user's progress on the task as finished, once.

        Only the first accepted solution (``finished_time`` still None)
        updates the record and increments the rating.
        """
        progress = Progress.objects.get(
            user=self.solution.user, task=self.solution.task
        )
        if progress.finished_time is None:
            progress.finished_time = self.solution.time_sent
            progress.finished = True
            progress.save()
            progress.increment_rating()

    def _copy_files(self):
        """Write the solution files and the task's extra files into ``self.path``."""
        for file in self.solution.solutionfiles:
            # NOTE(review): file.path is joined as-is; if it can be
            # user-controlled, absolute or ".." components would escape the
            # run directory - confirm it is validated upstream.
            dirs = file.path.split("/")
            # Create every missing parent directory, outermost first.
            for depth in range(1, len(dirs)):
                name = join(self.path, "/".join(dirs[:depth]))
                if not exists(name):
                    mkdir(name)
            with open(join(self.path, file.path), "wb") as fs:
                # Normalise Windows line endings in submitted files.
                fs.write(file.bytes.replace(b"\r\n", b"\n"))
        for file in self.solution.task.extrafiles:
            with open(join(self.path, file.filename), 'wb') as fs:
                fs.write(file.bytes)

    def _start_containers(self):
        """Start the solution container and, if the task has a checker, the checker container."""
        docker_command = (
            f"docker run --network solution_network_{self.solution.id} "
            f"--name solution_{self.solution.id} "
            f"--volume={self.path}:/{self.working_directory} "
            f"-t -d {self.solution.language.image}"
        )
        print(docker_command)
        call(docker_command, shell=True)
        checker = self.solution.task.checkerfile
        if checker is not None:
            self.checker_code = checker.text
            call(
                f"docker run --network solution_network_{self.solution.id} "
                f"--name solution_{self.solution.id}_checker "
                f"--volume={self.path}:/app -t -d python:3.6",
                shell=True,
            )

    def _run_single_test(self, test):
        """Run one test, recording the expected and the actual output in ``solution.extras``."""
        self.predicted = self._read_text(test.filename + '.a').strip().replace('\r\n', '\n')
        print('predicted:', self.predicted)
        self.solution.test = int(test.filename)
        self.solution.extras[test.filename] = {'predicted': self.predicted, 'output': ''}
        self.save_solution()
        try:
            self.test(test.filename)
        finally:
            # Keep whatever the program printed, even when the test failed.
            if exists(join(self.path, "output.txt")):
                try:
                    output = self._read_text('output.txt')
                except UnicodeDecodeError:
                    # Undecodable output is stored as an empty string.
                    output = ''
                self.solution.extras[test.filename]['output'] = output
            self.save_solution()

    def execute(self):
        """Run the full testing pipeline for the solution.

        Stages the files in a temporary directory under /tmp, starts the
        containers, builds the solution and runs every test whose filename
        does not end in ".a" (those files hold the expected answers). The
        final result is the "ok" status, the code carried by a
        TestException, "TL" on TimeoutExpired, or "TE" on any other
        exception. Afterwards the solution is saved, cleanup is queued and
        the user is notified.
        """
        self.solution.result = CONSTS["testing_status"]
        self.save_solution()
        with TemporaryDirectory(dir='/tmp') as self.path:
            self._copy_files()
            print("Files copied")
            self._setup_networking()
            self._start_containers()
            print("Container created")
            try:
                self.before_test()
                print("before test finished")
                for test in self.solution.task.tests:
                    if test.filename.endswith(".a"):
                        continue
                    self._run_single_test(test)
                self.after_test()
                self.solution.result = CONSTS["ok_status"]
                self.solution.test = None
                self.save_solution()
                self.save_progress()
            except TestException as e:
                self.solution.result = str(e)
            except TimeoutExpired:
                self.solution.result = "TL"
            except Exception as e:
                self.solution.result = "TE"
                print(e)
            self.save_solution()
            self.cleanup()
            self.notify()
|