diff --git a/.gitmodules b/.gitmodules new file mode 100644 index 0000000..90030fb --- /dev/null +++ b/.gitmodules @@ -0,0 +1,3 @@ +[submodule "rmltk-templates"] + path = rmltk-templates + url = https://github.com/SimonBin/kgc-challenge-tool-template.git diff --git a/bench_executor/collector.py b/bench_executor/collector.py index da101d2..a295bf7 100644 --- a/bench_executor/collector.py +++ b/bench_executor/collector.py @@ -324,7 +324,10 @@ def __init__(self, case_name: str, results_run_path: str, system_os_version = 'UNKNOWN' try: system_os_name = platform.freedesktop_os_release()['NAME'] - system_os_version = platform.freedesktop_os_release()['VERSION'] + try: + system_os_version = platform.freedesktop_os_release()['VERSION_ID'] + except KeyError: + system_os_version = platform.freedesktop_os_release()['VERSION'] except (OSError, KeyError): self._logger.warning('Cannot extract Freedesktop OS release data') system_hostname = platform.node() diff --git a/bench_executor/container.py b/bench_executor/container.py index d947a51..57c28fd 100644 --- a/bench_executor/container.py +++ b/bench_executor/container.py @@ -96,7 +96,7 @@ def name(self) -> str: """The pretty name of the container""" return self._name - def run(self, command: str = '', detach=True) -> bool: + def run(self, command: str = '', *, working_dir=None, detach=True, environment=None) -> bool: """Run the container. This is used for containers which are long running to provide services @@ -107,6 +107,8 @@ def run(self, command: str = '', detach=True) -> bool: command : str The command to execute in the container, optionally and defaults to no command. + working_dir : str + Set a working directory in the container (optional) detach : bool If the container may run in the background, default True. @@ -115,11 +117,33 @@ def run(self, command: str = '', detach=True) -> bool: success : bool Whether running the container was successfull or not. 
""" - e = self._environment + if environment is None: + environment = {} + + def merge_env(e1, e2): + r = {} + for key in set(e1.keys()).union(e2.keys()): + if key in e2: + in_e1 = key in e1 + is_arr = isinstance(e2[key], list) or (in_e1 and isinstance(e1[key], list)) + if in_e1 and (is_arr or key == "JDK_JAVA_OPTIONS"): + if is_arr: + r[key] = [*e1[key], *e2[key]] + else: + r[key] = f'{e1[key]} {e2[key]}' + else: + r[key] = e2[key] + else: + r[key] = e1[key] + if isinstance(r[key], list): + r[key] = ' '.join(r[key]) + return r + + e = merge_env(self._environment, environment) v = self._volumes self._started, self._container_id = \ self._docker.run(self._container_name, command, self._name, detach, - self._ports, NETWORK_NAME, e, v) + self._ports, NETWORK_NAME, e, v, working_dir) if not self._started: self._logger.error(f'Starting container "{self._name}" failed!') @@ -155,7 +179,7 @@ def exec(self, command: str) -> Tuple[bool, List[str]]: return False, logs - def run_and_wait_for_log(self, log_line: str, command: str = '') -> bool: + def run_and_wait_for_log(self, log_line: str, command: str = '', *, working_dir=None) -> bool: """Run the container and wait for a log line to appear. This blocks until the container's log contains the `log_line`. @@ -167,13 +191,15 @@ def run_and_wait_for_log(self, log_line: str, command: str = '') -> bool: command : str The command to execute in the container, optionally and defaults to no command. + working_dir : str + Set a working directory in the container (optional) Returns ------- success : bool Whether the container exited with status code 0 or not. 
""" - if not self.run(command): + if not self.run(command, working_dir=working_dir): self._logger.error(f'Command "{command}" failed') return False @@ -212,7 +238,7 @@ def run_and_wait_for_log(self, log_line: str, command: str = '') -> bool: self._logger.error(line) return False - def run_and_wait_for_exit(self, command: str = '') -> bool: + def run_and_wait_for_exit(self, command: str = '', *, working_dir=None, environment=None) -> bool: """Run the container and wait for exit This blocks until the container exit and gives a status code. @@ -222,13 +248,15 @@ def run_and_wait_for_exit(self, command: str = '') -> bool: command : str The command to execute in the container, optionally and defaults to no command. + working_dir : str + Set a working directory in the container (optional) Returns ------- success : bool Whether the container exited with status code 0 or not. """ - if not self.run(command): + if not self.run(command, working_dir=working_dir, environment=environment): return False if self._container_id is None: diff --git a/bench_executor/data/metadata.schema b/bench_executor/data/metadata.schema index 7f9de04..04dc2df 100644 --- a/bench_executor/data/metadata.schema +++ b/bench_executor/data/metadata.schema @@ -16,6 +16,10 @@ "description": "Short description of the case", "type": "string" }, + "global_environment": { + "description": "Variables to set in the environment to pass to the containers", + "type": "object" + }, "steps": { "description": "Short description of the case", "type": "array", diff --git a/bench_executor/docker.py b/bench_executor/docker.py index 5843ce7..3390b3d 100644 --- a/bench_executor/docker.py +++ b/bench_executor/docker.py @@ -9,7 +9,7 @@ import json import subprocess from time import sleep -from typing import List, Tuple +from typing import List, Tuple, Optional from bench_executor.logger import Logger @@ -145,7 +145,7 @@ def pull(self, image: str) -> bool: def run(self, image: str, command: str, name: str, detach: bool, ports: 
dict, network: str, environment: dict, - volumes: List[str], must_pull: bool = True) -> Tuple[bool, str]: + volumes: List[str], workdir: Optional[str], must_pull: bool = True) -> Tuple[bool, str]: """Start a Docker container. Parameters @@ -166,6 +166,8 @@ def run(self, image: str, command: str, name: str, detach: bool, Environment variables to set. volumes : List[str] Volumes to mount on the container from the host. + workdir : str + Working directory for the container. must_pull: bool Whether the image should be pulled first, default is True. @@ -206,6 +208,8 @@ def run(self, image: str, command: str, name: str, detach: bool, for volume in volumes: cmd += f' -v "{volume}"' cmd += f' --network "{network}"' + if workdir is not None: + cmd += f' --workdir "{workdir}"' cmd += f' {image} {command}' self._logger.debug(f'Starting Docker container: {cmd}') status_code, container_id = subprocess.getstatusoutput(cmd) diff --git a/bench_executor/executor.py b/bench_executor/executor.py index 009207e..e287895 100644 --- a/bench_executor/executor.py +++ b/bench_executor/executor.py @@ -39,7 +39,7 @@ class Executor: """ def __init__(self, main_directory: str, verbose: bool = False, - progress_cb=_progress_cb): + progress_cb=_progress_cb, metadata_filename=METADATA_FILE): """Create an instance of the Executor class. Parameters @@ -51,6 +51,8 @@ def __init__(self, main_directory: str, verbose: bool = False, process_cb : function Callback to call when a step is completed of the case. By default, a dummy callback is provided if the argument is missing. + metadata_filename : str + File name to look for step definitions. 
By default, metadata.json """ self._main_directory = os.path.abspath(main_directory) self._schema = {} @@ -58,6 +60,7 @@ def __init__(self, main_directory: str, verbose: bool = False, self._class_module_mapping: Dict[str, Any] = {} self._verbose = verbose self._progress_cb = progress_cb + self._metadata_filename = metadata_filename self._logger = Logger(__name__, self._main_directory, self._verbose) self._init_resources() @@ -393,7 +396,8 @@ def run(self, case: dict, interval: float, module = self._class_module_mapping[step['resource']] resource = getattr(module, step['resource'])(data_path, CONFIG_DIR, directory, - self._verbose, False) + self._verbose, False, + environment=data.get('global_environment')) if hasattr(resource, 'initialization'): if not resource.initialization(): self._logger.error('Failed to initialize resource ' @@ -416,7 +420,8 @@ def run(self, case: dict, interval: float, resource = getattr(module, step['resource'])(data_path, CONFIG_DIR, directory, self._verbose, - expect_failure) + expect_failure, + environment=data.get('global_environment')) active_resources.append(resource) # Containers may need to start up first before executing a command @@ -553,7 +558,7 @@ def list(self) -> list: for directory in glob(self._main_directory): for root, dirs, files in os.walk(directory): for file in files: - if os.path.basename(file) == METADATA_FILE: + if os.path.basename(file) == self._metadata_filename: path = os.path.join(root, file) with open(path, 'r') as f: data = json.load(f) diff --git a/bench_executor/mysql.py b/bench_executor/mysql.py index faf5f16..1cf3d3e 100644 --- a/bench_executor/mysql.py +++ b/bench_executor/mysql.py @@ -29,7 +29,7 @@ class MySQL(Container): """MySQL container for executing SQL queries.""" def __init__(self, data_path: str, config_path: str, directory: str, - verbose: bool, expect_failure: bool = False): + verbose: bool, expect_failure: bool = False, environment=None): """Creates an instance of the MySQL class. 
Parameters @@ -44,6 +44,8 @@ def __init__(self, data_path: str, config_path: str, directory: str, Enable verbose logs. expect_failure : bool If we expect a failure or not. + environment : dict + Additional environment variables to use in the container. """ self._data_path = os.path.abspath(data_path) self._config_path = os.path.abspath(config_path) @@ -54,11 +56,14 @@ def __init__(self, data_path: str, config_path: str, directory: str, os.makedirs(tmp_dir, exist_ok=True) os.makedirs(os.path.join(self._data_path, 'mysql'), exist_ok=True) + if environment is None: + environment = {} super().__init__(f'kgconstruct/mysql:v{VERSION}', 'MySQL', self._logger, expect_failure=expect_failure, ports={PORT: PORT}, - environment={'MYSQL_ROOT_PASSWORD': 'root', + environment={**environment, + 'MYSQL_ROOT_PASSWORD': 'root', 'MYSQL_DATABASE': 'db'}, volumes=[f'{self._data_path}/shared/:/data/shared', f'{self._config_path}/mysql/' diff --git a/bench_executor/postgresql.py b/bench_executor/postgresql.py index 5710212..8b6c6d5 100644 --- a/bench_executor/postgresql.py +++ b/bench_executor/postgresql.py @@ -31,7 +31,7 @@ class PostgreSQL(Container): """PostgreSQL container for executing SQL queries""" def __init__(self, data_path: str, config_path: str, directory: str, - verbose: bool, expect_failure: bool = False): + verbose: bool, expect_failure: bool = False, environment=None): """Creates an instance of the PostgreSQL class. Parameters @@ -46,6 +46,8 @@ def __init__(self, data_path: str, config_path: str, directory: str, Enable verbose logs. expect_failure : bool If a failure is expected. + environment : dict + Additional environment variables to use in the container. 
""" self._data_path = os.path.abspath(data_path) self._config_path = os.path.abspath(config_path) @@ -57,10 +59,13 @@ def __init__(self, data_path: str, config_path: str, directory: str, os.makedirs(os.path.join(self._data_path, 'postgresql'), exist_ok=True) self._tables: List[str] = [] + if environment is None: + environment = {} super().__init__(f'blindreviewing/postgresql:v{VERSION}', 'PostgreSQL', self._logger, ports={PORT: PORT}, - environment={'POSTGRES_PASSWORD': PASSWORD, + environment={**environment, + 'POSTGRES_PASSWORD': PASSWORD, 'POSTGRES_USER': USER, 'POSTGRES_DB': DB, 'PGPASSWORD': PASSWORD, diff --git a/bench_executor/query.py b/bench_executor/query.py index 70ba141..f1310fd 100644 --- a/bench_executor/query.py +++ b/bench_executor/query.py @@ -18,7 +18,7 @@ class Query(): """Execute a query on a SPARQL endpoint.""" def __init__(self, data_path: str, config_path: str, directory: str, - verbose: bool, expect_failure: bool = False): + verbose: bool, expect_failure: bool = False, environment=None): """Creates an instance of the Query class. Parameters diff --git a/bench_executor/rmlmapper.py b/bench_executor/rmlmapper.py index 8367485..9e862ec 100644 --- a/bench_executor/rmlmapper.py +++ b/bench_executor/rmlmapper.py @@ -23,7 +23,7 @@ class RMLMapper(Container): """RMLMapper container for executing R2RML and RML mappings.""" def __init__(self, data_path: str, config_path: str, directory: str, - verbose: bool, expect_failure: bool = False): + verbose: bool, expect_failure: bool = False, environment=None): """Creates an instance of the RMLMapper class. Parameters @@ -38,6 +38,8 @@ def __init__(self, data_path: str, config_path: str, directory: str, Enable verbose logs. expect_failure : bool If a failure is expected, default False. + environment : dict + Additional environment variables to use in the container. 
""" self._data_path = os.path.abspath(data_path) self._config_path = os.path.abspath(config_path) @@ -47,6 +49,7 @@ def __init__(self, data_path: str, config_path: str, directory: str, os.makedirs(os.path.join(self._data_path, 'rmlmapper'), exist_ok=True) super().__init__(f'kgconstruct/rmlmapper:v{VERSION}', 'RMLMapper', self._logger, expect_failure=expect_failure, + environment=environment, volumes=[f'{self._data_path}/rmlmapper:/data', f'{self._data_path}/shared:/data/shared']) diff --git a/bench_executor/rpt.py b/bench_executor/rpt.py new file mode 100644 index 0000000..68eecec --- /dev/null +++ b/bench_executor/rpt.py @@ -0,0 +1,84 @@ +""" +RPT is a general purpose RDF tool + +**Website**: https://github.com/SmartDataAnalytics/RdfProcessingToolkit + +""" + +VERSION='1.9.8-SNAPSHOT' +TIMEOUT = 6 * 3600 # 6 hours + +import os +import shlex +from timeout_decorator import timeout, TimeoutError # type: ignore +from bench_executor.container import Container +from bench_executor.logger import Logger + + +class Rpt(Container): + """RPT container for executing rmltk, sansa etc.""" + + _INSTANCES = 0 + + def __init__(self, data_path: str, config_path: str, directory: str, + verbose: bool, expect_failure: bool = False, environment=None): + self._instance = Rpt._INSTANCES + Rpt._INSTANCES = Rpt._INSTANCES + 1 + + if environment is None: + environment = {} + self._data_path = os.path.abspath(data_path) + self._config_path = os.path.abspath(config_path) + self._logger = Logger(__name__ + '.' 
+ str(self._instance), directory, verbose) + self._verbose = verbose + + os.makedirs(os.path.join(self._data_path, 'rpt'), exist_ok=True) + super().__init__(f'aksw/rpt:{VERSION}', 'rpt-kgcc', + self._logger, expect_failure=expect_failure, + environment=environment, + volumes=[f'{self._data_path}/rpt:/data', + f'{self._data_path}/shared:/data/shared', + f'{self._data_path}/tmp:/tmp']) + + @timeout(TIMEOUT) + def _execute_with_timeout(self, arguments: list, *, working_dir=None, environment=None) -> bool: + """Execute a command with a provided timeout. + + Returns + ------- + success : bool + Whether the execution was successful or not. + """ + return self.run_and_wait_for_exit(' '.join(map(shlex.quote, arguments)), + working_dir=working_dir, + environment=environment) + + def execute(self, command, arguments=None, environment=None, working_dir='/data/shared') -> bool: + """Execute rpt with given arguments. + + Parameters + ---------- + command : str + Command to run + arguments : list + Arguments to supply to rpt. + + Returns + ------- + success : bool + Whether the execution succeeded or not.
+ """ + if arguments is None: + arguments = [] + self._logger.debug(f'{self._instance}: Calling rpt {command} with {arguments!r} and {environment!r}') + try: + result = self._execute_with_timeout([*command.split(' '), *arguments], + working_dir=working_dir, + environment=environment) + self.stop() + return result + except TimeoutError: + msg = f'{self._instance}: Timeout ({TIMEOUT}s) reached for rpt' + self._logger.warning(msg) + + return False diff --git a/bench_executor/validate.py b/bench_executor/validate.py index 52ae702..1f91f36 100644 --- a/bench_executor/validate.py +++ b/bench_executor/validate.py @@ -13,7 +13,7 @@ class Validate(): """Validate the RDF graph by comparing it with an expected graph""" def __init__(self, data_path: str, config_path: str, directory: str, - verbose: bool, expect_failure: bool): + verbose: bool, expect_failure: bool, environment=None): """Creates an instance of the Validate class. Parameters diff --git a/bench_executor/virtuoso.py b/bench_executor/virtuoso.py index 8db5f7e..7e8f21f 100644 --- a/bench_executor/virtuoso.py +++ b/bench_executor/virtuoso.py @@ -43,7 +43,7 @@ class Virtuoso(Container): """Virtuoso container to execute SPARQL queries""" def __init__(self, data_path: str, config_path: str, directory: str, - verbose: bool, expect_failure: bool = False): + verbose: bool, expect_failure: bool = False, environment=None): """Creates an instance of the Virtuoso class. Parameters @@ -58,6 +58,8 @@ def __init__(self, data_path: str, config_path: str, directory: str, Enable verbose logs. expect_failure : bool If a failure is expected, default False. + environment : dict + Additional environment variables to use in the container. 
""" self._data_path = os.path.abspath(data_path) self._config_path = os.path.abspath(config_path) @@ -71,7 +73,10 @@ def __init__(self, data_path: str, config_path: str, directory: str, * NUMBER_OF_BUFFERS_PER_GB) max_dirty_buffers = int(psutil.virtual_memory().total / (10**9) * MAX_DIRTY_BUFFERS_PER_GB) - environment = {'DBA_PASSWORD': PASSWORD, + if environment is None: + environment = {} + environment = {**environment, + 'DBA_PASSWORD': PASSWORD, 'VIRT_SPARQL_ResultSetMaxRows': MAX_ROWS, 'VIRT_SPARQL_MaxQueryExecutionTime': QUERY_TIMEOUT, 'VIRT_SPARQL_ExecutionTimeout': QUERY_TIMEOUT, diff --git a/exectool b/exectool index 76012d3..6e1f4fe 100755 --- a/exectool +++ b/exectool @@ -23,7 +23,7 @@ EXIT_CODE_INTERRUPTED = -7 EXIT_CODE_DOWNLOAD_FAILURE = -8 KGC_CHALLENGE_2023_URL = 'https://zenodo.org/record/7689310/files/challenge.tar.gz?download=1' # noqa: E501 KGC_CHALLENGE_2023_FILE_NAME = 'eswc-kgc-challenge-2023.tar.gz' -KGC_CHALLENGE_2024_URL = 'https://zenodo.org/record/10721875/files/challenge.tar.gz?download=1' # noqa: E501 +KGC_CHALLENGE_2024_URL = 'https://zenodo.org/record/10973433/files/challenge.tar.gz?download=1' # noqa: E501 KGC_CHALLENGE_2024_FILE_NAME = 'eswc-kgc-challenge-2024.tar.gz' DOWNLOAD_DIR = 'downloads' CHUNK_SIZE = 8192 @@ -416,6 +416,10 @@ if __name__ == '__main__': parser.add_argument('--wait-for-user', dest='wait_for_user', help='Show a prompt when a step is executed before ' 'going to the next one', action='store_true') + parser.add_argument('--metadata', dest='metadata_filename', default='metadata.json', + help='File name with pipeline steps, ' + 'defaults to metadata.json (RMLMapper sample pipeline)', + type=str) args = parser.parse_args() # Resolve path @@ -433,6 +437,7 @@ if __name__ == '__main__': print(f'{parser.prog} {VERSION}') print(f'Command: {args.command}') print(f'Root directory: {main_directory}') + print(f'Metadata filename: {args.metadata_filename}') print(f'Verbose enabled: {args.verbose}') print(f'Number of runs: 
{args.number_of_runs}') print(f'Measurement sample interval: {args.interval}s') @@ -459,7 +464,7 @@ if __name__ == '__main__': except FileNotFoundError: pass - e = Executor(main_directory, verbose=args.verbose, progress_cb=progress_cb) + e = Executor(main_directory, verbose=args.verbose, progress_cb=progress_cb, metadata_filename=args.metadata_filename) if args.command == 'list': print_cases(e) diff --git a/requirements.txt b/requirements.txt index 0e05c9e..fe439b6 100644 --- a/requirements.txt +++ b/requirements.txt @@ -5,3 +5,5 @@ psutil requests rdflib timeout-decorator +cryptography +pyyaml diff --git a/rmltk-templates b/rmltk-templates new file mode 160000 index 0000000..c1158c2 --- /dev/null +++ b/rmltk-templates @@ -0,0 +1 @@ +Subproject commit c1158c23e475bb9e6a453588b732a93a58bae896