diff --git a/.github/workflows/main.yml b/.github/workflows/main.yml index cf782c4..233b9cb 100644 --- a/.github/workflows/main.yml +++ b/.github/workflows/main.yml @@ -12,8 +12,6 @@ jobs: format: # check formatting with ruff runs-on: ubuntu-latest - # temporarily disable this job - if: false steps: - uses: actions/checkout@v5 diff --git a/dtrx/dtrx.py b/dtrx/dtrx.py index 9bb11e3..00d73b2 100755 --- a/dtrx/dtrx.py +++ b/dtrx/dtrx.py @@ -34,7 +34,6 @@ import shutil import signal import stat -import string import struct import sys import tempfile @@ -225,9 +224,7 @@ def __init__(self, filename, encoding): # option based on what's supported, since this behavior changed if encoding in ("lrzip", "lrz"): # need to check if this version of lrzip supports the -Q option - output = subprocess.check_output( - "lrzip --help", stderr=subprocess.STDOUT, shell=True - ) + output = subprocess.check_output("lrzip --help", stderr=subprocess.STDOUT, shell=True) if b"-Q" in output: decoder = ["lrzcat", "-Q"] else: @@ -266,9 +263,7 @@ def add_process(self, processes, command, stdin, stdout): try: logger.debug("running command: {}".format(command)) processes.append( - subprocess.Popen( - command, stdin=stdin, stdout=stdout, stderr=subprocess.PIPE - ) + subprocess.Popen(command, stdin=stdin, stdout=stdout, stderr=subprocess.PIPE) ) except OSError as error: if error.errno == errno.ENOENT: @@ -302,9 +297,7 @@ def send_stdout_to_dev_null(self): return True def run_pipes(self, final_stdout=None): - has_output_target = ( - True if final_stdout or self.send_stdout_to_dev_null() else False - ) + has_output_target = True if final_stdout or self.send_stdout_to_dev_null() else False if not self.pipes: return elif final_stdout is None: @@ -345,9 +338,9 @@ def check_included_archives(self): self.file_count += len(filenames) path = path[start_index:] for filename in filenames: - if ExtractorBuilder.try_by_mimetype( + if ExtractorBuilder.try_by_mimetype(filename) or ExtractorBuilder.try_by_extension( filename - ) or ExtractorBuilder.try_by_extension(filename): + ): self.included_archives.append(os.path.join(path, filename)) def check_contents(self): @@ -403,12 +396,8 @@ def first_bad_exit_code(self): def check_success(self, got_files): error_index, error_code = self.first_bad_exit_code() - logger.debug( - "success results: %s %s %s" % (got_files, error_index, self.exit_codes) - ) - if self.is_fatal_error(error_code) or ( - (not got_files) and (error_code is not None) - ): + logger.debug("success results: %s %s %s" % (got_files, error_index, self.exit_codes)) + if self.is_fatal_error(error_code) or ((not got_files) and (error_code is not None)): command = " ".join(self.pipes[error_index][0]) self.pw_prompt = False # Don't silently fail with wrong password raise ExtractorError( @@ -486,9 +475,7 @@ def get_filenames(self): # compression extensions, even if those files shouldn't actually be # handled this way. So, we call out to the file command to do a quick # check and make sure this actually looks like a compressed file. - if "compress" not in [ - match[0] for match in ExtractorBuilder.try_by_magic(self.filename) - ]: + if "compress" not in [match[0] for match in ExtractorBuilder.try_by_magic(self.filename)]: raise ExtractorError("doesn't look like a compressed file") yield self.basename() @@ -573,9 +560,7 @@ def prepare(self): encoding = mimetypes.guess_type(data_filename)[1] if not encoding: raise ExtractorError("data.tar file has unrecognized encoding") - self.pipe( - ["ar", "p", self.filename, data_filename], "extracting data.tar from .deb" - ) + self.pipe(["ar", "p", self.filename, data_filename], "extracting data.tar from .deb") self.pipe(self.decoders[encoding], "decoding data.tar") def basename(self): @@ -594,9 +579,7 @@ def check_contents(self): class DebMetadataExtractor(DebExtractor): def prepare(self): - self.pipe( - ["ar", "p", self.filename, "control.tar.gz"], "control.tar.gz extraction" - ) + self.pipe(["ar", "p", self.filename, "control.tar.gz"], "control.tar.gz extraction") self.pipe(["zcat"], "control.tar.gz decompression") @@ -646,9 +629,7 @@ def extract_archive(self): extract_fmt_args = { "OUTPUT_FILE": os.path.splitext(os.path.basename(self.filename))[0], } - formatted_extract_commands = [ - x.format(**extract_fmt_args) for x in self.extract_command - ] + formatted_extract_commands = [x.format(**extract_fmt_args) for x in self.extract_command] self.extract_pipe = formatted_extract_commands + [self.filename] BaseExtractor.extract_archive(self) @@ -722,7 +703,6 @@ class SevenExtractor(NoPipeExtractor): file_type = "7z file" list_command = ["7z", "l", "-ba"] border_re = re.compile("^[- ]+$") - extract_command = ["7z", "x"] space_re = re.compile(" ") @property @@ -927,19 +907,17 @@ def __init__(self, extractor, options): def handle(self): command = "find" - status = subprocess.call( - [ - "find", - self.extractor.target, - "-type", - "d", - "-exec", - "chmod", - "u+rwx", - "{}", - ";", - ] - ) + status = subprocess.call([ + "find", + self.extractor.target, + "-type", + "d", + "-exec", + "chmod", + "u+rwx", + "{}", + ";", + ]) if status == 0: command = "chmod" status = subprocess.call(["chmod", "-R", "u+rwX", self.extractor.target]) @@ -950,9 +928,7 @@ def handle(self): def set_target(self, target, checker): self.target = checker(target).check() if self.target != target: - logger.warning( - "extracting %s to %s" % (self.extractor.filename, self.target) - ) + logger.warning("extracting %s to %s" % (self.extractor.filename, self.target)) # The "where to extract" table, with options and archive types. @@ -983,9 +959,7 @@ def organize(self): if not os.path.isdir(newdir): os.makedirs(newdir) for filename in filenames: - os.rename( - os.path.join(curdir, filename), os.path.join(newdir, filename) - ) + os.rename(os.path.join(curdir, filename), os.path.join(newdir, filename)) os.rmdir(curdir) @@ -1011,9 +985,7 @@ def can_handle(contents, options): ) def organize(self): - source = os.path.join( - self.extractor.target, os.listdir(self.extractor.target)[0] - ) + source = os.path.join(self.extractor.target, os.listdir(self.extractor.target)[0]) if os.path.isdir(source): checker = DirectoryChecker else: @@ -1059,9 +1031,7 @@ def organize(self): @total_ordering class BasePolicy(object): try: - size = fcntl.ioctl( - sys.stdout.fileno(), termios.TIOCGWINSZ, struct.pack("HHHH", 0, 0, 0, 0) - ) + size = fcntl.ioctl(sys.stdout.fileno(), termios.TIOCGWINSZ, struct.pack("HHHH", 0, 0, 0, 0)) width = struct.unpack("HHHH", size)[1] except IOError: width = 80 @@ -1155,9 +1125,7 @@ def prep(self, archive_filename, extractor): question.append(" Expected: " + extractor.basename()) question.append(" Actual: " + extractor.content_name) choice_vars = (extractor.content_type, extractor.basename()) - self.choices = [ - text % choice_vars[: text.count("%s")] for text in self.choice_template - ] + self.choices = [text % choice_vars[: text.count("%s")] for text in self.choice_template] self.current_policy = self.permanent_policy or self.ask_question(question) def ok_for_match(self): @@ -1191,9 +1159,7 @@ def __init__(self, options): def prep(self, current_filename, target, extractor): archive_count = len(extractor.included_archives) - if (self.permanent_policy is not None) or ( - (archive_count * 10) <= extractor.file_count - ): + if (self.permanent_policy is not None) or ((archive_count * 10) <= extractor.file_count): self.current_policy = self.permanent_policy or RECURSE_NOT_NOW return question = self.wrap( @@ -1213,12 +1179,10 @@ def prep(self, current_filename, target, extractor): break print( "\n%s\n" - % "\n".join( - [ - os.path.join(target, included_root, filename) - for filename in extractor.included_archives - ] - ) + % "\n".join([ + os.path.join(target, included_root, filename) + for filename in extractor.included_archives + ]) ) if self.current_policy in (RECURSE_ALWAYS, RECURSE_NEVER): self.permanent_policy = self.current_policy @@ -1427,9 +1391,7 @@ def magic_map_matches(self, output, magic_map): def try_by_magic(self, filename): try: - process = subprocess.Popen( - ["file", "-zL", filename], stdout=subprocess.PIPE - ) + process = subprocess.Popen(["file", "-zL", filename], stdout=subprocess.PIPE) status = process.wait() if status != 0: return [] @@ -1524,9 +1486,9 @@ def reverser(x, y): print("%s/" % (filename,)) new_filenames = os.listdir(filename) new_filenames = sorted(new_filenames, key=cmp_to_key(reverser)) - filenames.extend( - [pathjoin(filename, new_filename) for new_filename in new_filenames] - ) + filenames.extend([ + pathjoin(filename, new_filename) for new_filename in new_filenames + ]) else: print(filename) @@ -1610,13 +1572,11 @@ def get_supported_extensions(): # get the lists of built-in extensions and combine them ext_map_base = set(ExtractorBuilder.extension_map.keys()) ext_map = set( - itertools.chain( - *[ - x["extensions"] - for x in ExtractorBuilder.extractor_map.values() - if "extensions" in x - ] - ) + itertools.chain(*[ + x["extensions"] + for x in ExtractorBuilder.extractor_map.values() + if "extensions" in x + ]) ) ext_map = ext_map_base.union(ext_map) @@ -1670,10 +1630,7 @@ def parse_options(self, arguments): "--one-entry", dest="one_entry_default", default=None, - help=( - "specify extraction policy for one-entry " - + "archives: inside/rename/here" - ), + help=("specify extraction policy for one-entry " + "archives: inside/rename/here"), ) parser.add_option( "-n", @@ -1795,9 +1752,7 @@ def try_extractors(self, filename, builder): self.current_extractor = extractor # For the abort() method. error = self.action.run(filename, extractor) if error: - errors.append( - (extractor.file_type, extractor.encoding, error, extractor.stderr) - ) + errors.append((extractor.file_type, extractor.encoding, error, extractor.stderr)) if extractor.target is not None: self.clean_destination(extractor.target) else: diff --git a/tests/compare.py b/tests/compare.py index 0bee33e..f5f2b3a 100755 --- a/tests/compare.py +++ b/tests/compare.py @@ -114,9 +114,7 @@ def __init__(self, **kwargs): self.input = self.input + "\n" def start_proc(self, command, stdin=None, output=None): - process = subprocess.Popen( - command, stdin=subprocess.PIPE, stdout=output, stderr=output - ) + process = subprocess.Popen(command, stdin=subprocess.PIPE, stdout=output, stderr=output) if stdin: process.stdin.write(bytes(str(stdin).encode("utf-8"))) process.stdin.close() @@ -194,9 +192,7 @@ def clean(self): raise ExtractorTestError("cleanup exited with status code %s" % (status,)) def show_pass(self): - self.status_writer.show( - "Passed %i/%i: %s" % (self.test_num, NUM_TESTS, self.name) - ) + self.status_writer.show("Passed %i/%i: %s" % (self.test_num, NUM_TESTS, self.name)) return "passed" def show_report(self, status, message=None): @@ -295,9 +291,7 @@ def __init__(self): Loader=yaml.FullLoader, ) self.name_regexps = [re.compile(s) for s in sys.argv[1:]] - self.tests = [ - ExtractorTest(**data) for data in self.test_data if self.wanted_test(data) - ] + self.tests = [ExtractorTest(**data) for data in self.test_data if self.wanted_test(data)] self.add_subdir_tests() def wanted_test(self, data): @@ -307,18 +301,14 @@ def wanted_test(self, data): def add_subdir_tests(self): for odata in self.test_data: - if ( - (not self.wanted_test(odata)) - or "directory" in odata - or ("baseline" not in odata) - ): + if (not self.wanted_test(odata)) or "directory" in odata or ("baseline" not in odata): continue data = odata.copy() data["name"] += " in .." data["directory"] = "inside-dir" - data["filenames"] = " ".join( - ["../%s" % filename for filename in data.get("filenames", "").split()] - ) + data["filenames"] = " ".join([ + "../%s" % filename for filename in data.get("filenames", "").split() + ]) self.tests.append(ExtractorTest(**data)) def run(self):