diff --git a/.gitignore b/.gitignore index b11ca80..2cfee8f 100644 --- a/.gitignore +++ b/.gitignore @@ -1,3 +1,8 @@ __pycache__ *.pyc sources.list +_version.py* + +/.idea/ +/.venv/ +/dist/ diff --git a/MANIFEST.in b/MANIFEST.in new file mode 100644 index 0000000..9f7c803 --- /dev/null +++ b/MANIFEST.in @@ -0,0 +1,4 @@ +include *.rst *.txt *.md +recursive-include docs *.rst *.txt *.md +recursive-include src/apt_select * +recursive-include typeshed/pyi *.py *.pyi diff --git a/README.rst b/README.rst index 7dc716e..cd1d6ad 100644 --- a/README.rst +++ b/README.rst @@ -7,7 +7,7 @@ Features -------- * Tests latency to mirrors in a given country's mirror list at `mirrors.ubuntu.com `_. - - 3 requests are sent to each mirror, minumum round trip time being used for rank. + - 3 requests are sent to each mirror, minimum round trip time being used for rank. * Reports latency, status, and bandwidth capacity of the fastest mirrors in a ranked list. - Status and bandwidth are scraped from `launchpad `_. 
diff --git a/apt_select/__init__.py b/apt_select/__init__.py deleted file mode 100644 index 36a511e..0000000 --- a/apt_select/__init__.py +++ /dev/null @@ -1 +0,0 @@ -__version__ = '2.2.1' diff --git a/apt_select/__main__.py b/apt_select/__main__.py deleted file mode 100644 index 200a579..0000000 --- a/apt_select/__main__.py +++ /dev/null @@ -1,250 +0,0 @@ -#!/usr/bin/env python -"""Main apt-select script""" - -import requests -import re - -from sys import exit, stderr, version_info -from os import getcwd -from apt_select.arguments import get_args, DEFAULT_COUNTRY, SKIPPED_FILE_GENERATION -from apt_select.mirrors import Mirrors -from apt_select.apt import System, Sources, SourcesFileError -from apt_select.utils import DEFAULT_REQUEST_HEADERS - -# Support input for Python 2 and 3 -get_input = input -if version_info[:2] <= (2, 7): - get_input = raw_input - - -def set_args(): - """Set arguments, disallow bad combination""" - parser = get_args() - args = parser.parse_args() - - # Convert status argument to format used by Launchpad - args.min_status = args.min_status.replace('-', ' ') - if not args.ping_only and (args.min_status != 'unknown'): - args.min_status = args.min_status.capitalize() - - if args.choose and (not args.top_number or args.top_number < 2): - parser.print_usage() - exit(( - "error: -c/--choose option requires -t/--top-number NUMBER " - "where NUMBER is greater than 1." - )) - - if not args.country: - stderr.write('WARNING: no country code provided. defaulting to US.\n') - args.country = DEFAULT_COUNTRY - elif not re.match(r'^[a-zA-Z]{2}$', args.country): - exit(( - "Invalid country. 
%s is not in ISO 3166-1 alpha-2 " - "format" % args.country - )) - - return args - - -def get_mirrors(mirrors_url, country): - """Fetch list of Ubuntu mirrors""" - stderr.write("Getting list of mirrors...") - response = requests.get(mirrors_url, headers=DEFAULT_REQUEST_HEADERS) - if response.status_code == requests.codes.NOT_FOUND: - exit( - "The mirror list for country: %s was not found at %s" % ( - country, mirrors_url - ) - ) - - stderr.write("done.\n") - - return response.text.splitlines() - - -def print_status(info, rank): - """Print full mirror status report for ranked item""" - for key in ("Org", "Speed"): - info.setdefault(key, "N/A") - - print(( - "%(rank)d. %(mirror)s\n" - "%(tab)sLatency: %(ms).2f ms\n" - "%(tab)sOrg: %(org)s\n" - "%(tab)sStatus: %(status)s\n" - "%(tab)sSpeed: %(speed)s" % { - 'tab': ' ', - 'rank': rank , - 'mirror': info['Host'], - 'ms': info['Latency'], - 'org': info['Organisation'], - 'status': info['Status'], - 'speed': info['Speed'] - } - )) - - -def print_latency(info, rank, max_host_len): - """Print latency information for mirror in ranked report""" - print("%(rank)d. 
%(mirror)s: %(padding)s%(ms).2f ms" % { - 'rank': rank, - 'padding': (max_host_len - info.get('host_len', max_host_len)) * ' ', - 'mirror': info['Host'], - 'ms': info['Latency'] - }) - - -def ask(query): - """Ask for unput from user""" - answer = get_input(query) - return answer - - -def get_selected_mirror(list_size): - """Prompt for user input to select desired mirror""" - key = ask("Choose a mirror (1 - %d)\n'q' to quit " % list_size) - while True: - try: - key = int(key) - except ValueError: - if key == 'q': - exit() - else: - if (key >= 1) and (key <= list_size): - break - - key = ask("Invalid entry ") - - return key - - -def yes_or_no(query): - """Get definitive answer""" - opts = ('yes', 'no') - answer = ask(query) - while answer != opts[0]: - if answer == opts[1]: - exit(0) - answer = ask("Please enter '%s' or '%s': " % opts) - - -def apt_select(): - """Run apt-select: Ubuntu archive mirror reporting tool""" - - try: - system = System() - except OSError as err: - exit("Error setting system information:\n\t%s" % err) - - try: - sources = Sources(system.codename) - except SourcesFileError as err: - exit("Error with current apt sources:\n\t%s" % err) - - args = set_args() - mirrors_loc = "mirrors.ubuntu.com" - mirrors_url = "http://%s/%s.txt" % (mirrors_loc, args.country.upper()) - mirrors_list = get_mirrors(mirrors_url, args.country) - - archives = Mirrors(mirrors_list, args.ping_only, args.min_status) - archives.get_rtts() - if archives.got["ping"] < args.top_number: - args.top_number = archives.got["ping"] - - if args.top_number == 0: - exit("Cannot connect to any mirrors in %s\n." 
% mirrors_list) - - if not args.ping_only: - archives.get_launchpad_urls() - if not archives.abort_launch: - # Mirrors needs a limit to stop launching threads - archives.status_num = args.top_number - stderr.write("Looking up %d status(es)\n" % args.top_number) - archives.lookup_statuses( - system.codename.capitalize(), - system.arch, - args.min_status - ) - - if args.top_number > 1: - stderr.write('\n') - - if args.ping_only or archives.abort_launch: - archives.top_list = archives.ranked[:args.top_number] - - sources.set_current_archives() - current_url = sources.urls['current'] - if archives.urls.get(current_url): - archives.urls[current_url]['Host'] += " (current)" - - show_status = False - max_host_len = 0 - if not args.ping_only and not archives.abort_launch: - show_status = True - else: - def set_hostname_len(url, i): - hostname_len = len(str(i) + archives.urls[url]['Host']) - archives.urls[url]['host_len'] = hostname_len - return hostname_len - - max_host_len = max([set_hostname_len(url, i+1) - for i, url in enumerate(archives.top_list)]) - for i, url in enumerate(archives.top_list): - info = archives.urls[url] - rank = i + 1 - if show_status: - print_status(info, rank) - else: - print_latency(info, rank, max_host_len) - - key = 0 - if args.choose: - key = get_selected_mirror(len(archives.top_list)) - 1 - - if args.list_only: - exit() - - new_mirror = archives.top_list[key] - print("Selecting mirror %(mirror)s ..." 
% {'mirror': new_mirror}) - if current_url == new_mirror: - stderr.write( - "%(url)s is the currently used mirror.\n" - "%(message)s\n" % { - 'url': current_url, - 'message': sources.skip_gen_msg - }) - exit(SKIPPED_FILE_GENERATION) - - work_dir = getcwd() - if work_dir == sources.DIRECTORY[0:-1]: - query = ( - "'%(dir)s' is the current directory.\n" - "Generating a new '%(apt)s' file will " - "overwrite the current file.\n" - "You should copy or backup '%(apt)s' before replacing it.\n" - "Continue?\n[yes|no] " % { - 'dir': sources.DIRECTORY, - 'apt': sources.APT_FILE - } - ) - yes_or_no(query) - - new_mirror = archives.top_list[key] - try: - sources.generate_new_config(work_dir, new_mirror) - except SourcesFileError as err: - exit("Error generating new config file" % err) - else: - print("New config file saved to %s" % sources.new_file_path) - - exit() - - -def main(): - try: - apt_select() - except KeyboardInterrupt: - stderr.write("Aborting...\n") - -if __name__ == '__main__': - main() diff --git a/apt_select/apt.py b/apt_select/apt.py deleted file mode 100644 index 3b4fa28..0000000 --- a/apt_select/apt.py +++ /dev/null @@ -1,185 +0,0 @@ -#!/usr/bin/env python - -from subprocess import check_output -from os import path -from apt_select.utils import utf8_decode - -SUPPORTED_KERNEL = 'Linux' -SUPPORTED_DISTRIBUTION_TYPE = 'Ubuntu' - -UNAME = 'uname' -KERNEL_COMMAND = (UNAME, '-s') -MACHINE_COMMAND = (UNAME, '-m') -RELEASE_COMMAND = ('lsb_release', '-ics') -RELEASE_FILE = '/etc/lsb-release' - -LAUNCHPAD_ARCH_32 = 'i386' -LAUNCHPAD_ARCH_64 = 'amd64' -LAUNCHPAD_ARCHES = frozenset([ - LAUNCHPAD_ARCH_32, - LAUNCHPAD_ARCH_64 -]) - - -class System(object): - """System information for use in apt related operations""" - - def __init__(self): - _kernel = utf8_decode(check_output(KERNEL_COMMAND)).strip() - if _kernel != SUPPORTED_KERNEL: - raise OSError( - "Invalid kernel found: %s. Expected %s." 
% ( - _kernel, - SUPPORTED_KERNEL, - ) - ) - - try: - self.dist, self.codename = tuple( - utf8_decode(s).strip() - for s in check_output(RELEASE_COMMAND).split() - ) - except OSError: - # Fall back to using lsb-release info file if lsb_release command - # is not available. e.g. Ubuntu minimal (core, docker image). - try: - with open(RELEASE_FILE, 'rU') as release_file: - try: - lsb_info = dict( - line.strip().split('=') - for line in release_file.readlines() - ) - except ValueError: - raise OSError( - "Unexpected release file format found in %s." % RELEASE_FILE - ) - - try: - self.dist = lsb_info['DISTRIB_ID'] - self.codename = lsb_info['DISTRIB_CODENAME'] - except KeyError: - raise OSError( - "Expected distribution keys missing from %s." % RELEASE_FILE - ) - - except (IOError, OSError): - raise OSError(( - "Unable to determine system distribution. " - "%s is required." % SUPPORTED_DISTRIBUTION_TYPE - )) - - if self.dist != SUPPORTED_DISTRIBUTION_TYPE: - raise OSError( - "%s distributions are not supported. %s is required." 
% ( - self.dist, SUPPORTED_DISTRIBUTION_TYPE - ) - ) - - self.arch = LAUNCHPAD_ARCH_32 - if utf8_decode(check_output(MACHINE_COMMAND).strip()) == 'x86_64': - self.arch = LAUNCHPAD_ARCH_64 - - -class SourcesFileError(Exception): - """Error class for operations on an apt configuration file - - Operations include: - - verifying/reading from the current system file - - generating a new config file""" - pass - - -class Sources(object): - """Class for apt configuration files""" - - DEB_SCHEMES = frozenset(['deb', 'deb-src']) - PROTOCOLS = frozenset(['http', 'ftp', 'https']) - - DIRECTORY = '/etc/apt/' - LIST_FILE = 'sources.list' - _CONFIG_PATH = DIRECTORY + LIST_FILE - - def __init__(self, codename): - self._codename = codename.lower() - if not path.isfile(self._CONFIG_PATH): - raise SourcesFileError(( - "%s must exist as file" % self._CONFIG_PATH - )) - - self._required_component = "main" - self._lines = [] - self.urls = [] - self.skip_gen_msg = "Skipping file generation" - self.new_file_path = None - - def __set_sources_lines(self): - """Read system config file and store the lines in memory for parsing - and generation of new config file""" - try: - with open(self._CONFIG_PATH, 'r') as f: - self._lines = f.readlines() - except IOError as err: - raise SourcesFileError(( - "Unable to read system apt file: %s" % err - )) - - def __confirm_apt_source_uri(self, uri): - """Check if line follows correct sources.list URI""" - if (uri and (uri[0] in self.DEB_SCHEMES) and - uri[1].split('://')[0] in self.PROTOCOLS): - return True - - return False - - def __get_current_archives(self): - """Parse through all lines of the system apt file to find current - mirror urls""" - urls = {} - for line in self._lines: - fields = line.split() - if self.__confirm_apt_source_uri(fields): - if (not urls and - (self._codename in fields[2]) and - (fields[3] == self._required_component)): - urls['current'] = fields[1] - elif urls and (fields[2] == '%s-security' % self._codename): - 
urls['security'] = fields[1] - break - - return urls - - def set_current_archives(self): - """Read in the system apt config, parse to find current mirror urls - to set as attribute""" - try: - self.__set_sources_lines() - except SourcesFileError as err: - raise SourcesFileError(err) - - urls = self.__get_current_archives() - if not urls: - raise SourcesFileError(( - "Error finding current %s URI in %s\n%s\n" % - (self._required_component, self._CONFIG_PATH, - self.skip_gen_msg) - )) - - self.urls = urls - - def __set_config_lines(self, new_mirror): - """Replace all instances of the current urls with the new mirror""" - self._lines = ''.join(self._lines) - for url in self.urls.values(): - self._lines = self._lines.replace(url, new_mirror) - - def generate_new_config(self, work_dir, new_mirror): - """Write new configuration file to current working directory""" - self.__set_config_lines(new_mirror) - self.new_file_path = work_dir.rstrip('/') + '/' + self.LIST_FILE - try: - with open(self.new_file_path, 'w') as f: - f.write(self._lines) - except IOError as err: - raise SourcesFileError(( - "Unable to generate new sources.list:\n\t%s\n" % err - )) diff --git a/apt_select/utils.py b/apt_select/utils.py deleted file mode 100644 index 1bffd12..0000000 --- a/apt_select/utils.py +++ /dev/null @@ -1,38 +0,0 @@ -#!/usr/bin/env python -"""Collection of module neutral utility functions""" - -from sys import stderr - -import requests - -DEFAULT_REQUEST_HEADERS = { - 'User-Agent': 'apt-select' -} - - -def utf8_decode(encoded): - return encoded.decode('utf-8') - - -class URLGetTextError(Exception): - """Error class for fetching text from a URL""" - pass - - -def get_text(url): - """Return text from GET request response content""" - try: - result = requests.get(url, headers=DEFAULT_REQUEST_HEADERS) - result.raise_for_status() - except requests.HTTPError as err: - raise URLGetTextError(err) - - return result.text - - -def progress_msg(processed, total): - """Update user on percent 
done""" - if total > 1: - percent = int((float(processed) / total) * 100) - stderr.write("\r[%d/%d] %d%%" % (processed, total, percent)) - stderr.flush() diff --git a/pyproject.toml b/pyproject.toml new file mode 100644 index 0000000..4566a00 --- /dev/null +++ b/pyproject.toml @@ -0,0 +1,693 @@ +[build-system] +requires = ["hatchling>=1.18.0", "hatch-vcs>=0.3.0"] +build-backend = "hatchling.build" + +[project] +name = "apt-select" +dynamic = ["version"] +description = "Ubuntu Archive Mirror reporting tool for apt sources configuration" +readme = "README.rst" +license-files = { paths = ["LICENSE"] } +authors = [ + { name = "John Blakeman", email = "john@johnblakeman.com" }, +] +keywords = [ + "apt", + "configuration", + "latency", + "rank", + "reporting", + "status", +] +classifiers = [ + "Development Status :: 4 - Beta", + "Environment :: Console", + "Intended Audience :: Developers", + "Intended Audience :: System Administrators", + "License :: OSI Approved :: MIT License", + "Operating System :: POSIX :: Linux", + "Programming Language :: Python :: 3.10", + "Programming Language :: Python :: 3.11", + "Programming Language :: Python :: 3.12", + "Topic :: System :: Installation/Setup", + "Topic :: System :: Networking", + "Topic :: System :: Software Distribution", + "Topic :: System :: Systems Administration", + "Topic :: Utilities", +] +dependencies = [ + "beautifulsoup4", + "requests", +] + +[project.scripts] +apt-select = "apt_select.__main__:main" + +[project.urls] +Homepage = "https://github.com/jblakeman/apt-select" + +[tool.hatch.build.hooks.vcs] +version-file = "src/apt_select/_version.py" + +[tool.hatch.build.targets.sdist] +include = [ + "/src/apt_select", + "/typeshed/pyi", +] + +[tool.hatch.build.targets.wheel.force-include] +"typeshed/pyi" = "typeshed/pyi" + +[tool.hatch.envs.default] +python = "3.11" +dependencies = [ + "black", + "build", + "hatch", + "mypy", + "pylint", + "pytest", + "pytest-clarity", + "pytest-cov", + "pytest-describe", + 
"pytest-expecter", + "pytest-random", + "pytest-sugar", + "pytest-xdist[psutil]", + "ruff", + "types-beautifulsoup4", + "types-requests", + "wheel", +] + +[tool.hatch.envs.default.scripts] +dist = [ + "black src test", + "rm -rf 'typeshed/pyi'", + "stubgen --output=typeshed/pyi --search-path=src src", + "hatch build", +] +format = [ + "black src test", +] +lint = [ + "black src test", + "ruff check --fix src test", + "mypy src test", +] +lint-check = [ + "black --check src test", + "pylint src/apt_select", + "ruff check src test", + "mypy src test", +] +test = "pytest" +test-cov-xml = "pytest --cov-report=xml" + +[tool.hatch.version] +source = "vcs" + +[tool.mypy] +check_untyped_defs = true +disallow_any_generics = true +disallow_incomplete_defs = true +disallow_subclassing_any = true +disallow_untyped_calls = true +disallow_untyped_decorators = true +disallow_untyped_defs = true +ignore_missing_imports = true +mypy_path = ["typeshed/pyi"] +no_implicit_optional = true +python_version = "3.11" +warn_redundant_casts = true +warn_return_any = true +warn_unused_configs = true +warn_unused_ignores = true + +[tool.pylint.main] +# Analyse import fallback blocks. This can be used to support both Python 2 and 3 +# compatible code, which means that the block might have code that exists only in +# one or another interpreter, leading to false positives when analysed. +# analyse-fallback-blocks = + +# Clear in-memory caches upon conclusion of linting. Useful if running pylint in +# a server-like mode. +# clear-cache-post-run = + +# Always return a 0 (non-error) status code, even if lint errors are found. This +# is primarily useful in continuous integration scripts. +# exit-zero = + +# A comma-separated list of package or module names from where C extensions may +# be loaded. Extensions are loading into the active Python interpreter and may +# run arbitrary code. 
+# extension-pkg-allow-list = + +# A comma-separated list of package or module names from where C extensions may +# be loaded. Extensions are loading into the active Python interpreter and may +# run arbitrary code. (This is an alternative name to extension-pkg-allow-list +# for backward compatibility.) +# extension-pkg-whitelist = + +# Return non-zero exit code if any of these messages/categories are detected, +# even if score is above --fail-under value. Syntax same as enable. Messages +# specified are enabled, while categories only check already-enabled messages. +# fail-on = + +# Specify a score threshold under which the program will exit with error. +fail-under = 10.0 + +# Interpret the stdin as a python script, whose filename needs to be passed as +# the module_or_package argument. +# from-stdin = + +# Files or directories to be skipped. They should be base names, not paths. +ignore = ["CVS"] + +# Add files or directories matching the regular expressions patterns to the +# ignore-list. The regex matches against paths and can be in Posix or Windows +# format. Because "\\" represents the directory delimiter on Windows systems, it +# can"t be used as an escape character. +# ignore-paths = + +# Files or directories matching the regular expression patterns are skipped. The +# regex matches against base names, not paths. The default value ignores Emacs +# file locks +# ignore-patterns = ["^\\.#"] +ignore-patterns = [ + "^_version.py$" +] + +# List of module names for which member attributes should not be checked (useful +# for modules/projects where namespaces are manipulated during runtime and thus +# existing member attributes cannot be deduced by static analysis). It supports +# qualified module names, as well as Unix pattern matching. +# ignored-modules = + +# Python code to execute, usually for sys.path manipulation such as +# pygtk.require(). +# init-hook = + +# Use multiple processes to speed up Pylint. 
Specifying 0 will auto-detect the +# number of processors available to use, and will cap the count on Windows to +# avoid hangs. +jobs = 4 + +# Control the amount of potential inferred values when inferring a single object. +# This can help the performance when dealing with large functions or complex, +# nested conditions. +limit-inference-results = 100 + +# List of plugins (as comma separated values of python module names) to load, +# usually to register additional checkers. +# load-plugins = + +# Pickle collected data for later comparisons. +persistent = true + +# Minimum Python version to use for version dependent checks. Will default to the +# version used to run pylint. +py-version = "3.11" + +# Discover python modules and packages in the file system subtree. +# recursive = + +# Add paths to the list of the source roots. Supports globbing patterns. The +# source root is an absolute path or a path relative to the current working +# directory used to determine a package namespace for modules located under the +# source root. +# source-roots = + +# When enabled, pylint would attempt to guess common misconfiguration and emit +# user-friendly hints instead of false-positive error messages. +suggestion-mode = true + +# Allow loading of arbitrary C extensions. Extensions are imported into the +# active Python interpreter and may run arbitrary code. +# unsafe-load-any-extension = + +[tool.pylint.basic] +# Naming style matching correct argument names. +argument-naming-style = "snake_case" + +# Regular expression matching correct argument names. Overrides argument-naming- +# style. If left empty, argument names will be checked with the set naming style. +# argument-rgx = + +# Naming style matching correct attribute names. +attr-naming-style = "snake_case" + +# Regular expression matching correct attribute names. Overrides attr-naming- +# style. If left empty, attribute names will be checked with the set naming +# style. 
+# attr-rgx = + +# Bad variable names which should always be refused, separated by a comma. +bad-names = ["foo", "bar", "baz", "toto", "tutu", "tata"] + +# Bad variable names regexes, separated by a comma. If names match any regex, +# they will always be refused +# bad-names-rgxs = + +# Naming style matching correct class attribute names. +class-attribute-naming-style = "any" + +# Regular expression matching correct class attribute names. Overrides class- +# attribute-naming-style. If left empty, class attribute names will be checked +# with the set naming style. +# class-attribute-rgx = + +# Naming style matching correct class constant names. +class-const-naming-style = "UPPER_CASE" + +# Regular expression matching correct class constant names. Overrides class- +# const-naming-style. If left empty, class constant names will be checked with +# the set naming style. +# class-const-rgx = + +# Naming style matching correct class names. +class-naming-style = "PascalCase" + +# Regular expression matching correct class names. Overrides class-naming-style. +# If left empty, class names will be checked with the set naming style. +# class-rgx = + +# Naming style matching correct constant names. +const-naming-style = "UPPER_CASE" + +# Regular expression matching correct constant names. Overrides const-naming- +# style. If left empty, constant names will be checked with the set naming style. +# const-rgx = + +# Minimum line length for functions/classes that require docstrings, shorter ones +# are exempt. +docstring-min-length = -1 + +# Naming style matching correct function names. +function-naming-style = "snake_case" + +# Regular expression matching correct function names. Overrides function-naming- +# style. If left empty, function names will be checked with the set naming style. +# function-rgx = + +# Good variable names which should always be accepted, separated by a comma. 
+good-names = ["i", "j", "k", "ex", "Run", "_"] + +# Good variable names regexes, separated by a comma. If names match any regex, +# they will always be accepted +# good-names-rgxs = + +# Include a hint for the correct naming format with invalid-name. +# include-naming-hint = + +# Naming style matching correct inline iteration names. +inlinevar-naming-style = "any" + +# Regular expression matching correct inline iteration names. Overrides +# inlinevar-naming-style. If left empty, inline iteration names will be checked +# with the set naming style. +# inlinevar-rgx = + +# Naming style matching correct method names. +method-naming-style = "snake_case" + +# Regular expression matching correct method names. Overrides method-naming- +# style. If left empty, method names will be checked with the set naming style. +# method-rgx = + +# Naming style matching correct module names. +module-naming-style = "snake_case" + +# Regular expression matching correct module names. Overrides module-naming- +# style. If left empty, module names will be checked with the set naming style. +# module-rgx = + +# Colon-delimited sets of names that determine each other"s naming style when the +# name regexes allow several styles. +# name-group = + +# Regular expression which should only match function or class names that do not +# require a docstring. +no-docstring-rgx = "^_" + +# List of decorators that produce properties, such as abc.abstractproperty. Add +# to this list to register other decorators that produce valid properties. These +# decorators are taken in consideration only for invalid-name. +property-classes = ["abc.abstractproperty"] + +# Regular expression matching correct type alias names. If left empty, type alias +# names will be checked with the set naming style. +# typealias-rgx = + +# Regular expression matching correct type variable names. If left empty, type +# variable names will be checked with the set naming style. 
+# typevar-rgx = + +# Naming style matching correct variable names. +variable-naming-style = "snake_case" + +# Regular expression matching correct variable names. Overrides variable-naming- +# style. If left empty, variable names will be checked with the set naming style. +# variable-rgx = + +[tool.pylint.classes] +# Warn about protected attribute access inside special methods +# check-protected-access-in-special-methods = + +# List of method names used to declare (i.e. assign) instance attributes. +defining-attr-methods = ["__init__", "__new__", "setUp", "asyncSetUp", "__post_init__"] + +# List of member names, which should be excluded from the protected access +# warning. +exclude-protected = ["_asdict", "_fields", "_replace", "_source", "_make", "os._exit"] + +# List of valid names for the first argument in a class method. +valid-classmethod-first-arg = ["cls"] + +# List of valid names for the first argument in a metaclass class method. +valid-metaclass-classmethod-first-arg = ["mcs"] + +[tool.pylint.design] +# List of regular expressions of class ancestor names to ignore when counting +# public methods (see R0903) +# exclude-too-few-public-methods = + +# List of qualified class names to ignore when counting class parents (see R0901) +# ignored-parents = + +# Maximum number of arguments for function / method. +max-args = 5 + +# Maximum number of attributes for a class (see R0902). +max-attributes = 7 + +# Maximum number of boolean expressions in an if statement (see R0916). +max-bool-expr = 5 + +# Maximum number of branch for function / method body. +max-branches = 12 + +# Maximum number of locals for function / method body. +max-locals = 15 + +# Maximum number of parents for a class (see R0901). +max-parents = 7 + +# Maximum number of public methods for a class (see R0904). +max-public-methods = 20 + +# Maximum number of return / yield for function / method body. +max-returns = 6 + +# Maximum number of statements in function / method body. 
+max-statements = 50 + +# Minimum number of public methods for a class (see R0903). +min-public-methods = 2 + +[tool.pylint.exceptions] +# Exceptions that will emit a warning when caught. +overgeneral-exceptions = ["builtins.BaseException", "builtins.Exception"] + +[tool.pylint.format] +# Expected format of line ending, e.g. empty (any line ending), LF or CRLF. +# expected-line-ending-format = + +# Regexp for a line that is allowed to be longer than the limit. +ignore-long-lines = "^\\s*(# )??$" + +# Number of spaces of indent required inside a hanging or continued line. +indent-after-paren = 4 + +# String used as indentation unit. This is usually " " (4 spaces) or "\t" (1 +# tab). +indent-string = " " + +# Maximum number of characters on a single line. +max-line-length = 120 + +# Maximum number of lines in a module. +max-module-lines = 1000 + +# Allow the body of a class to be on the same line as the declaration if body +# contains single statement. +# single-line-class-stmt = + +# Allow the body of an if to be on the same line as the test if there is no else. +# single-line-if-stmt = + +[tool.pylint.imports] +# List of modules that can be imported at any level, not just the top level one. +# allow-any-import-level = + +# Allow explicit reexports by alias from a package __init__. +# allow-reexport-from-package = + +# Allow wildcard imports from modules that define __all__. +# allow-wildcard-with-all = + +# Deprecated modules which should not be used, separated by a comma. +# deprecated-modules = + +# Output a graph (.gv or any supported image format) of external dependencies to +# the given file (report RP0402 must not be disabled). +# ext-import-graph = + +# Output a graph (.gv or any supported image format) of all (i.e. internal and +# external) dependencies to the given file (report RP0402 must not be disabled). 
+# import-graph = + +# Output a graph (.gv or any supported image format) of internal dependencies to +# the given file (report RP0402 must not be disabled). +# int-import-graph = + +# Force import order to recognize a module as part of the standard compatibility +# libraries. +# known-standard-library = + +# Force import order to recognize a module as part of a third party library. +known-third-party = [ + "play_downloader", + "vtt2srt" +] + +# Couples of modules and preferred modules, separated by a comma. +# preferred-modules = + +[tool.pylint.logging] +# The type of string formatting that logging methods do. `old` means using % +# formatting, `new` is for `{}` formatting. +logging-format-style = "old" + +# Logging modules to check that the string format arguments are in logging +# function parameter format. +logging-modules = ["logging"] + +[tool.pylint."messages control"] +# Only show warnings with the listed confidence levels. Leave empty to show all. +# Valid levels: HIGH, CONTROL_FLOW, INFERENCE, INFERENCE_FAILURE, UNDEFINED. +confidence = ["HIGH", "CONTROL_FLOW", "INFERENCE", "INFERENCE_FAILURE", "UNDEFINED"] + +# Disable the message, report, category or checker with the given id(s). You can +# either give multiple identifiers separated by comma (,) or put this option +# multiple times (only on the command line, not in the configuration file where +# it should appear only once). You can also use "--disable=all" to disable +# everything first and then re-enable specific checks. For example, if you want +# to run only the similarities checker, you can use "--disable=all +# --enable=similarities". If you want to run only the classes checker, but have +# no Warning level messages displayed, use "--disable=all --enable=classes +# --disable=W". 
+# disable = ["raw-checker-failed", "bad-inline-option", "locally-disabled", "file-ignored", "suppressed-message", "useless-suppression", "deprecated-pragma", "use-symbolic-message-instead", "empty-docstring", "missing-module-docstring", "missing-class-docstring", "missing-function-docstring", "too-few-public-methods"] +disable = [ + "broad-exception-caught", + "empty-docstring", + "missing-docstring", + "too-few-public-methods", + "too-many-arguments", + "too-many-branches", + "too-many-instance-attributes", + "too-many-locals", + "too-many-return-statements", + "too-many-statements", +] + +# Enable the message, report, category or checker with the given id(s). You can +# either give multiple identifier separated by comma (,) or put this option +# multiple time (only on the command line, not in the configuration file where it +# should appear only once). See also the "--disable" option for examples. +enable = ["c-extension-no-member"] + +[tool.pylint.method_args] +# List of qualified names (i.e., library.method) which require a timeout +# parameter e.g. "requests.api.get,requests.api.post" +timeout-methods = ["requests.api.delete", "requests.api.get", "requests.api.head", "requests.api.options", "requests.api.patch", "requests.api.post", "requests.api.put", "requests.api.request"] + +[tool.pylint.miscellaneous] +# List of note tags to take in consideration, separated by a comma. +notes = ["FIXME", "XXX", "TODO"] + +# Regular expression of note tags to take in consideration. +# notes-rgx = + +[tool.pylint.refactoring] +# Maximum number of nested blocks for function / method body +max-nested-blocks = 5 + +# Complete name of functions that never returns. When checking for inconsistent- +# return-statements if a never returning function is called then it will be +# considered as an explicit return statement and no message will be printed. 
+never-returning-functions = ["sys.exit", "argparse.parse_error"] + +[tool.pylint.reports] +# Python expression which should return a score less than or equal to 10. You +# have access to the variables "fatal", "error", "warning", "refactor", +# "convention", and "info" which contain the number of messages in each category, +# as well as "statement" which is the total number of statements analyzed. This +# score is used by the global evaluation report (RP0004). +evaluation = "max(0, 0 if fatal else 10.0 - ((float(5 * error + warning + refactor + convention) / statement) * 10))" + +# Template used to display messages. This is a python new-style format string +# used to format the message information. See doc for all details. +# msg-template = + +# Set the output format. Available formats are text, parseable, colorized, json +# and msvs (visual studio). You can also give a reporter class, e.g. +# mypackage.mymodule.MyReporterClass. +# output-format = + +# Tells whether to display a full report or only the messages. +# reports = + +# Activate the evaluation score. +score = true + +[tool.pylint.similarities] +# Comments are removed from the similarity computation +ignore-comments = true + +# Docstrings are removed from the similarity computation +ignore-docstrings = true + +# Imports are removed from the similarity computation +ignore-imports = true + +# Signatures are removed from the similarity computation +ignore-signatures = true + +# Minimum lines number of a similarity. +min-similarity-lines = 4 + +[tool.pylint.spelling] +# Limits count of emitted suggestions for spelling mistakes. +max-spelling-suggestions = 4 + +# Spelling dictionary name. No available dictionaries : You need to install both +# the python package and the system dependency for enchant to work.. +# spelling-dict = + +# List of comma separated words that should be considered directives if they +# appear at the beginning of a comment and should not be checked. 
+spelling-ignore-comment-directives = "fmt: on,fmt: off,noqa:,noqa,nosec,isort:skip,mypy:" + +# List of comma separated words that should not be checked. +# spelling-ignore-words = + +# A path to a file that contains the private dictionary; one word per line. +# spelling-private-dict-file = + +# Tells whether to store unknown words to the private dictionary (see the +# --spelling-private-dict-file option) instead of raising a message. +# spelling-store-unknown-words = + +[tool.pylint.typecheck] +# List of decorators that produce context managers, such as +# contextlib.contextmanager. Add to this list to register other decorators that +# produce valid context managers. +contextmanager-decorators = ["contextlib.contextmanager"] + +# List of members which are set dynamically and missed by pylint inference +# system, and so shouldn"t trigger E1101 when accessed. Python regular +# expressions are accepted. +# generated-members = + +# Tells whether missing members accessed in mixin class should be ignored. A +# class is considered mixin if its name matches the mixin-class-rgx option. +# Tells whether to warn about missing members when the owner of the attribute is +# inferred to be None. +ignore-none = true + +# This flag controls whether pylint should warn about no-member and similar +# checks whenever an opaque object is returned when inferring. The inference can +# return multiple potential results while evaluating a Python object, but some +# branches might not be evaluated, which results in partial inference. In that +# case, it might be useful to still emit no-member and other checks for the rest +# of the inferred objects. +ignore-on-opaque-inference = true + +# List of symbolic message names to ignore for Mixin members. 
+ignored-checks-for-mixins = ["no-member", "not-async-context-manager", "not-context-manager", "attribute-defined-outside-init"] + +# List of class names for which member attributes should not be checked (useful +# for classes with dynamically set attributes). This supports the use of +# qualified names. +ignored-classes = ["optparse.Values", "thread._local", "_thread._local", "argparse.Namespace"] + +# Show a hint with possible names when a member name was not found. The aspect of +# finding the hint is based on edit distance. +missing-member-hint = true + +# The minimum edit distance a name should have in order to be considered a +# similar match for a missing member name. +missing-member-hint-distance = 1 + +# The total number of similar names that should be taken in consideration when +# showing a hint for a missing member. +missing-member-max-choices = 1 + +# Regex pattern to define which classes are considered mixins. +mixin-class-rgx = ".*[Mm]ixin" + +# List of decorators that change the signature of a decorated function. +# signature-mutators = + +[tool.pylint.variables] +# List of additional names supposed to be defined in builtins. Remember that you +# should avoid defining new builtins when possible. +# additional-builtins = + +# Tells whether unused global variables should be treated as a violation. +allow-global-unused-variables = true + +# List of names allowed to shadow builtins +# allowed-redefined-builtins = + +# List of strings which can identify a callback function by name. A callback name +# must start or end with one of those strings. +callbacks = ["cb_", "_cb"] + +# A regular expression matching the name of dummy variables (i.e. expected to not +# be used). +dummy-variables-rgx = "_+$|(_[a-zA-Z0-9_]*[a-zA-Z0-9]+?$)|dummy|^ignored_|^unused_" + +# Argument names that match this expression will be ignored. +ignored-argument-names = "_.*|^ignored_|^unused_" + +# Tells whether we should check for unused import in __init__ files. 
+# init-import = + +# List of qualified module names which can have objects that can redefine +# builtins. +redefining-builtins-modules = ["six.moves", "past.builtins", "future.builtins", "builtins", "io"] + +[tool.pytest.ini_options] +addopts = "-n 4 --cov=src/apt_select/ --cov-report=term-missing" +minversion = "7.2" +python_files = [ + "*Test.py", + "*_test.py", + "test_*.py", +] +testpaths = [ + "src", + "test", +] diff --git a/setup.cfg b/setup.cfg deleted file mode 100644 index 3c6e79c..0000000 --- a/setup.cfg +++ /dev/null @@ -1,2 +0,0 @@ -[bdist_wheel] -universal=1 diff --git a/setup.py b/setup.py deleted file mode 100644 index c10c259..0000000 --- a/setup.py +++ /dev/null @@ -1,52 +0,0 @@ -"""A setuptools based setup module. -See: -https://packaging.python.org/en/latest/distributing.html """ - -from setuptools import setup, find_packages -from codecs import open -from os import path -from apt_select import __version__ - -here = path.abspath(path.dirname(__file__)) - -with open(path.join(here, 'README.rst'), encoding='utf-8') as f: - long_description = f.read() - -setup( - name='apt-select', - version=__version__, - description='Ubuntu Archive Mirror reporting tool for apt sources configuration', - long_description=long_description, - url='https://github.com/jblakeman/apt-select', - author='John Blakeman', - author_email='john@johnblakeman.com', - license='MIT', - classifiers=[ - 'Development Status :: 4 - Beta', - 'Environment :: Console', - 'Intended Audience :: Developers', - 'Intended Audience :: System Administrators', - 'License :: OSI Approved :: MIT License', - 'Operating System :: POSIX :: Linux', - 'Programming Language :: Python :: 2', - 'Programming Language :: Python :: 2.7', - 'Programming Language :: Python :: 3', - 'Programming Language :: Python :: 3.2', - 'Programming Language :: Python :: 3.3', - 'Programming Language :: Python :: 3.4', - 'Programming Language :: Python :: 3.5', - 'Topic :: System :: Installation/Setup', - 'Topic :: 
System :: Networking', - 'Topic :: System :: Software Distribution', - 'Topic :: System :: Systems Administration', - 'Topic :: Utilities', - ], - keywords='latency status rank reporting apt configuration', - packages=find_packages(exclude=['tests']), - install_requires=['requests', 'beautifulsoup4'], - entry_points = { - 'console_scripts': [ - 'apt-select = apt_select.__main__:main' - ] - } -) diff --git a/src/apt_select/__init__.py b/src/apt_select/__init__.py new file mode 100644 index 0000000..ac64ce1 --- /dev/null +++ b/src/apt_select/__init__.py @@ -0,0 +1 @@ +from apt_select._version import __version__, __version_tuple__ # noqa: F401 diff --git a/src/apt_select/__main__.py b/src/apt_select/__main__.py new file mode 100644 index 0000000..ee9e80d --- /dev/null +++ b/src/apt_select/__main__.py @@ -0,0 +1,258 @@ +#!/usr/bin/env python +"""Main apt-select script""" + +import os +import re +import sys +from argparse import Namespace + +import requests +from apt_select import apt, argument, constant, mirror + + +def set_args() -> tuple[str | Namespace, int]: + """Set arguments, disallow bad combination""" + parser = argument.get_arg_parser() + args = parser.parse_args() + + # Convert status argument to format used by Launchpad + args.min_status = args.min_status.replace("-", " ") + if not args.ping_only and (args.min_status != "unknown"): + args.min_status = args.min_status.capitalize() + + if args.choose and (not args.top_number or args.top_number < 2): + parser.print_usage() + return ( + "error: -c/--choose option requires -t/--top-number NUMBER " + "where NUMBER is greater than 1." + ), constant.NOK + + if not args.country: + sys.stderr.write("WARNING: no country code provided. defaulting to US.\n") + args.country = argument.DEFAULT_COUNTRY + elif not re.match(r"^[a-zA-Z]{2}$", args.country): + return ( + f"Invalid country. 
{args.country} is not in ISO 3166-1 alpha-2 format",
+            constant.NOK,
+        )
+
+    return args, constant.OK
+
+
+def get_mirrors(
+    mirrors_url: str,
+    country: str,
+    timeout_sec: float = constant.DEFAULT_REQUEST_TIMEOUT_SEC,
+) -> tuple[list[str], int]:
+    """Fetch list of Ubuntu mirrors"""
+    sys.stderr.write("Getting list of mirrors...")
+    response = requests.get(
+        mirrors_url, headers=constant.DEFAULT_REQUEST_HEADERS, timeout=timeout_sec
+    )
+    not_found = requests.codes.get("NOT_FOUND", None)
+    if response.status_code == not_found:
+        return (
+            [f"The mirror list for country: {country} was not found at {mirrors_url}"],
+            constant.NOK,
+        )
+
+    sys.stderr.write("done.\n")
+
+    return response.text.splitlines(), constant.OK
+
+
+def print_status(info: dict[str, float | str], rank: int) -> None:
+    """Print full mirror status report for ranked item"""
+    for key in ("Organisation", "Speed"):
+        info.setdefault(key, "N/A")
+
+    print(
+        (
+            f"{rank}. {info['Host']}\n"
+            f"{' '}Latency: {info['Latency']:.2f} ms\n"
+            f"{' '}Org: {info['Organisation']}\n"
+            f"{' '}Status: {info['Status']}\n"
+            f"{' '}Speed: {info['Speed']}"
+        )
+    )
+
+
+def print_latency(
+    info: dict[str, float | str], rank: int, max_hostname_length: int
+) -> None:
+    """Print latency information for mirror in ranked report"""
+    hostname_length = info.get("host_length", max_hostname_length)
+    if isinstance(hostname_length, int):
+        print(
+            f"{rank}. 
{info['Host']}: "
+            f"{(max_hostname_length - hostname_length) * ' '}{info['Latency']:.2f} ms"
+        )
+
+
+def ask(query: str) -> str:
+    """Ask for input from user"""
+    answer = input(query)
+    return answer
+
+
+def get_selected_mirror(list_size: int) -> tuple[int | None, int]:
+    """Prompt for user input to select desired mirror"""
+    key = ask(f"Choose a mirror (1 - {list_size})\n'q' to quit ")
+    while True:
+        if key == "q":
+            return None, constant.USER_INTERRUPT
+        try:
+            if 1 <= int(key) <= list_size:
+                break
+        except ValueError:
+            pass
+        key = ask("Invalid entry ")
+
+    return int(key), constant.OK
+
+
+def yes_or_no(query: str) -> int:
+    """Get definitive answer"""
+    opts = ("yes", "no")
+    answer = ask(query)
+    while answer != opts[0]:
+        if answer == opts[1]:
+            return constant.USER_INTERRUPT
+        answer = ask(f"Please enter '{opts[0]}' or '{opts[1]}': ")
+    return constant.OK
+
+
+def apt_select() -> tuple[str | None, int]:
+    """Run apt-select: Ubuntu archive mirror reporting tool"""
+
+    try:
+        system = apt.System()
+    except OSError as err:
+        return f"Error setting system information:\n\t{err}", constant.NOK
+
+    try:
+        sources = apt.Sources(codename=system.codename)
+    except apt.SourcesFileError as err:
+        return f"Error with current apt sources:\n\t{err}", constant.NOK
+
+    args, status = set_args()
+    if status != constant.OK or isinstance(args, str):
+        return f"{args}", status
+
+    mirrors_loc = "mirrors.ubuntu.com"
+    mirrors_url = f"http://{mirrors_loc}/{args.country.upper()}.txt"
+    mirrors_list, status = get_mirrors(mirrors_url=mirrors_url, country=args.country)
+    if status != constant.OK:
+        return "".join(mirrors_list), status
+
+    mirrors = mirror.Mirrors(
+        url_list=mirrors_list, min_status=args.min_status, ping_only=args.ping_only
+    )
+    mirrors.measure_rtts()
+    if mirrors.got["ping"] < args.top_number:
+        args.top_number = mirrors.got["ping"]
+
+    if args.top_number == 0:
+        return f"Cannot connect to any mirrors in {mirrors_list}.\n", constant.NOK
+
+    if not args.ping_only:
+ mirrors.fetch_launchpad_urls() + if not mirrors.abort_launch: + # Mirrors needs a limit to stop launching threads + mirrors.status_num = args.top_number + sys.stderr.write(f"Looking up {args.top_number} status(es)\n") + mirrors.lookup_statuses( + codename=system.codename.capitalize(), arch=system.arch + ) + + if args.top_number > 1: + sys.stderr.write("\n") + + if args.ping_only or mirrors.abort_launch: + mirrors.top_list = mirrors.ranked[: args.top_number] + + sources.set_current_archives() + current_url = sources.urls["current"] + if mirrors.urls.get(current_url): + _v1: float | str = mirrors.urls[current_url]["Host"] + if isinstance(_v1, str): + mirrors.urls[current_url]["Host"] = f"{_v1} (current)" + + show_status = False + max_hostname_length = 0 + if not args.ping_only and not mirrors.abort_launch: + show_status = True + else: + max_hostname_length = max( + _set_hostname_length(index=i + 1, entry=mirrors.urls[url]) + for i, url in enumerate(mirrors.top_list) + ) + for i, url in enumerate(mirrors.top_list): + info = mirrors.urls[url] + rank = i + 1 + if show_status: + print_status(info=info, rank=rank) + else: + print_latency(info=info, rank=rank, max_hostname_length=max_hostname_length) + + key = 0 + if args.choose: + maybe_key, status = get_selected_mirror(list_size=len(mirrors.top_list)) + if status != constant.OK: + return None, constant.USER_INTERRUPT + if maybe_key is None: + return "Invalid mirror index", constant.INVALID_MIRROR_INDEX + key = maybe_key - 1 + + if args.list_only: + return None, constant.OK + + new_mirror = mirrors.top_list[key] + print(f"Selecting mirror {new_mirror} ...") + if current_url == new_mirror: + return ( + f"[{current_url}] is the currently used mirror.\n" + f"{sources.skip_gen_msg}\n" + ), constant.SKIPPED_FILE_GENERATION + + work_dir = os.getcwd() + if work_dir == sources.DIRECTORY[0:-1]: + query = ( + f"'{sources.DIRECTORY}' is the current directory.\n" + f"Generating a new '{sources.LIST_FILE}' file will " + "overwrite 
the current file.\n"
+            f"You should copy or backup '{sources.DIRECTORY}{sources.LIST_FILE}' before replacing it.\n"
+            "Continue?\n[yes|no] "
+        )
+        status = yes_or_no(query=query)
+        if status != constant.OK:
+            return None, status
+
+    new_mirror = mirrors.top_list[key]
+    try:
+        sources.generate_new_config(work_dir=work_dir, new_mirror=new_mirror)
+    except apt.SourcesFileError as err:
+        return f"Error generating new config file {err}", constant.NOK
+    print(f"New config file saved to {sources.new_file_path}")
+
+    return None, constant.OK
+
+
+def _set_hostname_length(index: int, entry: dict[str, float | int | str]) -> int:
+    hostname_len = len(f"{index}{entry['Host']}")
+    entry["host_length"] = hostname_len
+    return hostname_len
+
+
+def main() -> int:
+    try:
+        msg, status = apt_select()
+        if msg:
+            sys.stderr.write(msg)
+        return status
+    except KeyboardInterrupt:
+        sys.stderr.write("Aborting...\n")
+        return 1
+
+
+if __name__ == "__main__":
+    sys.exit(main())
diff --git a/src/apt_select/apt.py b/src/apt_select/apt.py
new file mode 100644
index 0000000..94b4496
--- /dev/null
+++ b/src/apt_select/apt.py
@@ -0,0 +1,182 @@
+#!/usr/bin/env python
+
+from os import path
+from subprocess import check_output
+
+from apt_select import constant, utility
+
+SUPPORTED_KERNEL = "Linux"
+SUPPORTED_DISTRIBUTION_TYPE = "Ubuntu"
+
+UNAME = "uname"
+KERNEL_COMMAND = (UNAME, "-s")
+MACHINE_COMMAND = (UNAME, "-m")
+RELEASE_COMMAND = ("lsb_release", "-ics")
+RELEASE_FILE = "/etc/lsb-release"
+
+LAUNCHPAD_ARCH_32 = "i386"
+LAUNCHPAD_ARCH_64 = "amd64"
+LAUNCHPAD_ARCHES = frozenset([LAUNCHPAD_ARCH_32, LAUNCHPAD_ARCH_64])
+
+
+class System:
+    """System information for use in apt related operations"""
+
+    def __init__(self) -> None:
+        _kernel = utility.utf8_decode(check_output(KERNEL_COMMAND)).strip()
+        if _kernel != SUPPORTED_KERNEL:
+            raise OSError(
+                f"Invalid kernel found: {_kernel}. Expected {SUPPORTED_KERNEL}."
+ ) + + try: + self.dist, self.codename = tuple( + utility.utf8_decode(s).strip() + for s in check_output(RELEASE_COMMAND).split() + ) + except OSError: + # Fall back to using lsb-release info file if lsb_release command + # is not available. e.g. Ubuntu minimal (core, docker image). + try: + with open( + RELEASE_FILE, "r", encoding=constant.ENCODING_UTF_8 + ) as release_file: + try: + lsb_info = dict( + line.strip().split("=") for line in release_file.readlines() + ) + except ValueError as err: + raise OSError( + f"Unexpected release file format found in {RELEASE_FILE}." + ) from err + + try: + self.dist = lsb_info["DISTRIB_ID"] + self.codename = lsb_info["DISTRIB_CODENAME"] + except KeyError as err: + raise OSError( + f"Expected distribution keys missing from {RELEASE_FILE}." + ) from err + + except (IOError, OSError) as err: + raise OSError( + ( + "Unable to determine system distribution. " + f"{SUPPORTED_DISTRIBUTION_TYPE} is required." + ) + ) from err + + if self.dist != SUPPORTED_DISTRIBUTION_TYPE: + raise OSError( + f"{self.dist} distributions are not supported. {SUPPORTED_DISTRIBUTION_TYPE} is required." 
+ ) + + self.arch = LAUNCHPAD_ARCH_32 + if utility.utf8_decode(check_output(MACHINE_COMMAND).strip()) == "x86_64": + self.arch = LAUNCHPAD_ARCH_64 + + +class SourcesFileError(Exception): + """Error class for operations on an apt configuration file + + Operations include: + - verifying/reading from the current system file + - generating a new config file""" + + +class Sources: + """Class for apt configuration files""" + + DEB_SCHEMES = frozenset(["deb", "deb-src"]) + PROTOCOLS = frozenset(["http", "ftp", "https"]) + + DIRECTORY = "/etc/apt/" + LIST_FILE = "sources.list" + _CONFIG_PATH = DIRECTORY + LIST_FILE + + def __init__(self, codename: str) -> None: + self._codename = codename.lower() + if not path.isfile(self._CONFIG_PATH): + raise SourcesFileError(f"{self._CONFIG_PATH} must exist as file") + + self._required_component = "main" + self._lines: str | list[str] = [] + self.urls: dict[str, str] = {} + self.skip_gen_msg = "Skipping file generation" + self.new_file_path: str | None = None + + def __set_sources_lines(self) -> None: + """Read system config file and store the lines in memory for parsing + and generation of new config file""" + try: + with open(self._CONFIG_PATH, "r", encoding=constant.ENCODING_UTF_8) as f: + self._lines = f.readlines() + except IOError as err: + raise SourcesFileError(f"Unable to read system apt file: {err}") from err + + def __confirm_apt_source_uri(self, uris: list[str]) -> bool: + """Check if line follows correct sources.list URI""" + if ( + uris + and (uris[0] in self.DEB_SCHEMES) + and uris[1].split("://")[0] in self.PROTOCOLS + ): + return True + + return False + + def __get_current_archives(self) -> dict[str, str]: + """Parse through all lines of the system apt file to find current + mirror urls""" + urls: dict[str, str] = {} + for line in self._lines: + fields = line.split() + if self.__confirm_apt_source_uri(uris=fields): + if ( + not urls + and (self._codename in fields[2]) + and (fields[3] == self._required_component) + ): + 
urls["current"] = fields[1] + elif urls and (fields[2] == f"{self._codename}-security"): + urls["security"] = fields[1] + break + + return urls + + def set_current_archives(self) -> None: + """Read in the system apt config, parse to find current mirror urls + to set as attribute""" + try: + self.__set_sources_lines() + except SourcesFileError as err: + raise SourcesFileError(err) from err + + urls = self.__get_current_archives() + if not urls: + raise SourcesFileError( + f"Error finding current {self._required_component} URI in {self._CONFIG_PATH}\n{self.skip_gen_msg}\n" + ) + + self.urls = urls + + def __set_config_lines(self, new_mirror: str) -> None: + """Replace all instances of the current urls with the new mirror""" + self._lines = "".join(self._lines) + for url in self.urls.values(): + self._lines = self._lines.replace(url, new_mirror) + + def generate_new_config(self, work_dir: str, new_mirror: str) -> None: + """Write new configuration file to current working directory""" + self.__set_config_lines(new_mirror=new_mirror) + self.new_file_path = work_dir.rstrip("/") + "/" + self.LIST_FILE + try: + if isinstance(self._lines, str): + with open( + self.new_file_path, "w", encoding=constant.ENCODING_UTF_8 + ) as f: + f.write(self._lines) + except IOError as err: + raise SourcesFileError( + f"Unable to generate new sources.list:\n\t{err}\n" + ) from err diff --git a/apt_select/arguments.py b/src/apt_select/argument.py similarity index 56% rename from apt_select/arguments.py rename to src/apt_select/argument.py index d0fe763..d24e26e 100644 --- a/apt_select/arguments.py +++ b/src/apt_select/argument.py @@ -3,60 +3,60 @@ from argparse import ArgumentParser, RawTextHelpFormatter -DEFAULT_COUNTRY = 'US' +from apt_select import constant + +DEFAULT_COUNTRY = "US" DEFAULT_NUMBER = 1 STATUS_ARGS = ( "up-to-date", "one-day-behind", "two-days-behind", "one-week-behind", - "unknown" + "unknown", ) -SKIPPED_FILE_GENERATION = 4 -def get_args(): + +def get_arg_parser() -> 
ArgumentParser: """Get parsed command line arguments""" parser = ArgumentParser( description=( - "Find the fastest Ubuntu apt mirrors.\n" - "Generate new sources.list file." + "Find the fastest Ubuntu apt mirrors.\nGenerate new sources.list file." + ), + epilog=( + f"The exit code is {constant.OK} on success, {constant.NOK} on error" + f", and {constant.SKIPPED_FILE_GENERATION} if sources.list already has the chosen\n" + "mirror and a new one was not generated." ), - epilog="The exit code is 0 on success, 1 on error, and %d if "\ - "sources.list already has the chosen\n"\ - "mirror and a new one was not generated." % SKIPPED_FILE_GENERATION, - formatter_class=RawTextHelpFormatter + formatter_class=RawTextHelpFormatter, ) parser.add_argument( - '-C', - '--country', - nargs='?', + "-C", + "--country", + nargs="?", type=str, help=( "specify a country to test its list of mirrors\n" "used to match country list file names found at mirrors.ubuntu.com\n" "COUNTRY should follow ISO 3166-1 alpha-2 format\n" - "default: %s" % DEFAULT_COUNTRY + f"default: {DEFAULT_COUNTRY}" ), - metavar='COUNTRY' + metavar="COUNTRY", ) parser.add_argument( - '-t', - '--top-number', - nargs='?', + "-t", + "--top-number", + nargs="?", type=int, - help=( - "specify number of mirrors to return\n" - "default: 1\n" - ), + help="specify number of mirrors to return\n" "default: 1\n", const=DEFAULT_NUMBER, default=DEFAULT_NUMBER, - metavar='NUMBER' + metavar="NUMBER", ) test_group = parser.add_mutually_exclusive_group(required=False) test_group.add_argument( - '-m', - '--min-status', - nargs='?', + "-m", + "--min-status", + nargs="?", choices=STATUS_ARGS, type=str, help=( @@ -67,53 +67,55 @@ def get_args(): " %(two_day)s\n" " %(week)s\n" " %(unknown)s\n" - "default: %(up)s\n" % { - 'up': STATUS_ARGS[0], - 'day': STATUS_ARGS[1], - 'two_day': STATUS_ARGS[2], - 'week': STATUS_ARGS[3], - 'unknown': STATUS_ARGS[4] + "default: %(up)s\n" + % { + "up": STATUS_ARGS[0], + "day": STATUS_ARGS[1], + "two_day": 
STATUS_ARGS[2], + "week": STATUS_ARGS[3], + "unknown": STATUS_ARGS[4], } ), const=STATUS_ARGS[0], default=STATUS_ARGS[0], - metavar='STATUS' + metavar="STATUS", ) test_group.add_argument( - '-p', - '--ping-only', - action='store_true', + "-p", + "--ping-only", + action="store_true", help=( "rank mirror(s) by latency only, disregard status(es)\n" "cannot be used with -m/--min-status\n" ), - default=False + default=False, ) output_group = parser.add_mutually_exclusive_group(required=False) output_group.add_argument( - '-c', - '--choose', - action='store_true', + "-c", + "--choose", + action="store_true", help=( "choose mirror from a list\n" "requires -t/--top-num NUMBER where NUMBER > 1\n" ), - default=False + default=False, ) output_group.add_argument( - '-l', - '--list', - dest='list_only', - action='store_true', + "-l", + "--list", + dest="list_only", + action="store_true", help=( "print list of mirrors only, don't generate file\n" "cannot be used with -c/--choose\n" ), - default=False + default=False, ) return parser -if __name__ == '__main__': - get_args().parse_args() + +if __name__ == "__main__": + get_arg_parser().parse_args() diff --git a/src/apt_select/constant.py b/src/apt_select/constant.py new file mode 100644 index 0000000..00b514d --- /dev/null +++ b/src/apt_select/constant.py @@ -0,0 +1,9 @@ +DEFAULT_REQUEST_HEADERS = {"User-Agent": "apt-select"} +DEFAULT_REQUEST_TIMEOUT_SEC = 120.0 +ENCODING_UTF_8 = "utf-8" + +OK = 0 +NOK = 1 +INVALID_MIRROR_INDEX = 2 +USER_INTERRUPT = 3 +SKIPPED_FILE_GENERATION = 4 diff --git a/apt_select/mirrors.py b/src/apt_select/mirror.py similarity index 52% rename from apt_select/mirrors.py rename to src/apt_select/mirror.py index 2296f6f..8c3625f 100644 --- a/apt_select/mirrors.py +++ b/src/apt_select/mirror.py @@ -1,140 +1,131 @@ #!/usr/bin/env python """The mirrors module defines classes and methods for Ubuntu archive mirrors. 
- Provides latency testing and mirror attribute getting from Launchpad.""" +Provides latency testing and mirror attribute getting from Launchpad.""" -from sys import stderr -from socket import (socket, AF_INET, SOCK_STREAM, - gethostbyname, error, timeout, gaierror) +import sys +from queue import Queue, Empty +from socket import socket, AF_INET, SOCK_STREAM, gethostbyname, error, timeout, gaierror +from threading import Thread from time import time -from apt_select.utils import progress_msg, get_text, URLGetTextError -try: - from urlparse import urlparse -except ImportError: - from urllib.parse import urlparse +from urllib.parse import urlparse -from threading import Thread +from bs4 import BeautifulSoup, FeatureNotFound -try: - from queue import Queue, Empty -except ImportError: - from Queue import Queue, Empty +from apt_select import utility -from bs4 import BeautifulSoup, FeatureNotFound PARSER = "lxml" try: BeautifulSoup("", PARSER) except FeatureNotFound: PARSER = "html.parser" -try: - xrange -except NameError: - xrange = range - class ConnectError(Exception): """Socket connection errors""" - pass -class Mirrors(object): +class Mirrors: """Base for collection of archive mirrors""" - def __init__(self, url_list, ping_only, min_status): - self.urls = {} + def __init__( + self, url_list: list[str], min_status: int, ping_only: bool | None = None + ) -> None: + if ping_only is None: + ping_only = False + self.urls: dict[str, dict[str, float | int | str]] = {} self._url_list = url_list self._num_trips = 0 self.got = {"ping": 0, "data": 0} - self.ranked = [] - self.top_list = [] - self._trip_queue = Queue() + self.ranked: list[str] = [] + self.top_list: list[str] = [] + self._trip_queue: Queue[tuple[str, float] | None] = Queue() if not ping_only: self._launchpad_base = "https://launchpad.net" - self._launchpad_url = ( - self._launchpad_base + "/ubuntu/+archivemirrors" - ) + self._launchpad_url = self._launchpad_base + "/ubuntu/+archivemirrors" self._launchpad_html = 
"" self.abort_launch = False - self._status_opts = ( + self._status_opts: tuple[str, ...] = ( "unknown", "One week behind", "Two days behind", "One day behind", - "Up to date" + "Up to date", ) index = self._status_opts.index(min_status) self._status_opts = self._status_opts[index:] # Default to top self.status_num = 1 - def get_launchpad_urls(self): + def fetch_launchpad_urls(self) -> None: """Obtain mirrors' corresponding launchpad URLs""" - stderr.write("Getting list of launchpad URLs...") + sys.stderr.write("Getting list of launchpad URLs...") try: - self._launchpad_html = get_text(self._launchpad_url) - except URLGetTextError as err: - stderr.write(( - "%s: %s\nUnable to retrieve list of launchpad sites\n" - "Reverting to latency only\n" % (self._launchpad_url, err) - )) + self._launchpad_html = utility.get_text(self._launchpad_url) + except utility.URLGetTextError as err: + sys.stderr.write( + ( + f"{self._launchpad_url}: {err}\nUnable to retrieve list of launchpad sites\n" + "Reverting to latency only\n" + ) + ) self.abort_launch = True else: - stderr.write("done.\n") + sys.stderr.write("done.\n") self.__parse_launchpad_list() - def __parse_launchpad_list(self): + def __parse_launchpad_list(self) -> None: """Parse Launchpad's list page to find each mirror's - Official page""" + Official page""" soup = BeautifulSoup(self._launchpad_html, PARSER) prev = "" - for element in soup.table.descendants: - try: - url = element.a - except AttributeError: - pass - else: + if soup.table is not None: + for element in soup.table.descendants: try: - url = url["href"] - except TypeError: + url = element.a + except AttributeError: pass else: - if url in self.urls: - self.urls[url]["Launchpad"] = ( - self._launchpad_base + prev - ) - - if url.startswith("/ubuntu/+mirror/"): - prev = url - - def __kickoff_trips(self): + try: + url = url["href"] + except TypeError: + pass + else: + if url in self.urls: + self.urls[url][ + "Launchpad" + ] = f"{self._launchpad_base}{prev}" + + if 
url.startswith("/ubuntu/+mirror/"): + prev = url + + def __kickoff_trips(self) -> None: """Instantiate round trips class for all, initiating queued threads""" for url in self._url_list: host = urlparse(url).netloc try: - thread = Thread( - target=_RoundTrip(url, host, self._trip_queue).min_rtt - ) + round_trip = _RoundTrip(url=url, host=host, trip_queue=self._trip_queue) + thread = Thread(target=round_trip.min_rtt) except gaierror as err: - stderr.write("%s: %s ignored\n" % (err, url)) + sys.stderr.write(f"{err}: {url} ignored\n") else: self.urls[url] = {"Host": host} thread.daemon = True thread.start() self._num_trips += 1 - def get_rtts(self): + def measure_rtts(self) -> None: """Test latency to all mirrors""" - stderr.write("Testing latency to mirror(s)\n") + sys.stderr.write("Testing latency to mirror(s)\n") self.__kickoff_trips() processed = 0 - progress_msg(processed, self._num_trips) - for _ in xrange(self._num_trips): + utility.progress_msg(processed=processed, total=self._num_trips) + for _ in range(self._num_trips): try: - min_rtt = self._trip_queue.get(block=True) + min_rtt: tuple[str, float] | None = self._trip_queue.get(block=True) except Empty: pass else: @@ -146,43 +137,45 @@ def get_rtts(self): self.got["ping"] += 1 processed += 1 - progress_msg(processed, self._num_trips) + utility.progress_msg(processed=processed, total=self._num_trips) - stderr.write('\n') + sys.stderr.write("\n") # Mirrors without latency info are removed - self.urls = { - key: val for key, val in self.urls.items() if "Latency" in val - } + self.urls = {key: val for key, val in self.urls.items() if "Latency" in val} - self.ranked = sorted( - self.urls, key=lambda x: self.urls[x]["Latency"] - ) + self.ranked = sorted(self.urls, key=lambda x: self.urls[x]["Latency"]) - def __queue_lookups(self, codename, arch, data_queue): + def __queue_lookups( + self, + codename: str, + arch: str, + data_queue: Queue[tuple[str, dict[str, str] | None]], + ) -> int: """Queue threads for data 
retrieval from launchpad.net - Returns number of threads started to fulfill number of - requested statuses""" + Returns number of threads started to fulfill number of + requested statuses""" num_threads = 0 for url in self.ranked: + launch_url: float | str | None = None try: launch_url = self.urls[url]["Launchpad"] except KeyError: pass else: - thread = Thread( - target=_LaunchData( - url, - launch_url, - codename, - arch, - data_queue - ).get_info - ) - thread.daemon = True - thread.start() - - num_threads += 1 + if launch_url is not None and isinstance(launch_url, str): + thread = Thread( + target=_LaunchData( + url=url, + launch_url=launch_url, + codename=codename, + arch=arch, + data_queue=data_queue, + ).get_info + ) + thread.daemon = True + thread.start() + num_threads += 1 # We expect number of retrieved status requests may already # be greater than 0. This would be the case anytime an initial @@ -192,16 +185,18 @@ def __queue_lookups(self, codename, arch, data_queue): return num_threads - def lookup_statuses(self, codename, arch, min_status): + def lookup_statuses(self, codename: str, arch: str) -> None: """Scrape statuses/info in from launchpad.net mirror pages""" while (self.got["data"] < self.status_num) and self.ranked: - data_queue = Queue() - num_threads = self.__queue_lookups(codename, arch, data_queue) + data_queue: Queue[tuple[str, dict[str, str] | None]] = Queue() + num_threads = self.__queue_lookups( + codename=codename, arch=arch, data_queue=data_queue + ) if num_threads == 0: break # Get output of all started thread methods from queue - progress_msg(self.got["data"], self.status_num) - for _ in xrange(num_threads): + utility.progress_msg(processed=self.got["data"], total=self.status_num) + for _ in range(num_threads): try: # We don't care about timeouts longer than 7 seconds as # we're only getting 16 KB @@ -214,14 +209,14 @@ def lookup_statuses(self, codename, arch, min_status): self.urls[info[0]].update(info[1]) self.got["data"] += 1 
self.top_list.append(info[0]) - progress_msg(self.got["data"], self.status_num) + utility.progress_msg(self.got["data"], self.status_num) # Eliminate the url from the ranked list as long as # something is received from the queue (for selective # iteration if another queue needs to be built) self.ranked.remove(info[0]) - if (self.got["data"] == self.status_num): + if self.got["data"] == self.status_num: break # Reorder by latency as queue returns vary building final list @@ -230,93 +225,98 @@ def lookup_statuses(self, codename, arch, min_status): data_queue.join() -class _RoundTrip(object): +class _RoundTrip: """Socket connections for latency reporting""" - def __init__(self, url, host, trip_queue): + def __init__( + self, url: str, host: str, trip_queue: Queue[tuple[str, float] | None] + ) -> None: self._url = url self._host = host - self._trip_queue = trip_queue + self._trip_queue: Queue[tuple[str, float] | None] = trip_queue self._addr = gethostbyname(host) - def __tcp_ping(self): + def __tcp_ping(self) -> float: """Return socket latency to host's resolved IP address""" port = 80 sock = socket(AF_INET, SOCK_STREAM) sock.settimeout(2.5) - send_tstamp = time()*1000 + send_tstamp = time() * 1000 try: sock.connect((self._addr, port)) except (timeout, error) as err: - raise ConnectError(err) + raise ConnectError(err) from err - recv_tstamp = time()*1000 + recv_tstamp = time() * 1000 rtt = recv_tstamp - send_tstamp sock.close() return rtt - def min_rtt(self): + def min_rtt(self) -> None: """Return lowest rtt""" rtts = [] - for _ in xrange(3): + for _ in range(3): try: rtt = self.__tcp_ping() except ConnectError as err: - stderr.write("\tconnection to %s: %s\n" % (self._host, err)) + sys.stderr.write(f"\tconnection to {self._host}: {err}\n") self._trip_queue.put_nowait(None) return - else: - rtts.append(rtt) + rtts.append(rtt) self._trip_queue.put((self._url, min(rtts))) -class _LaunchData(object): - def __init__(self, url, launch_url, codename, arch, data_queue): 
+class _LaunchData: + def __init__( + self, + url: str, + launch_url: str, + codename: str, + arch: str, + data_queue: Queue[tuple[str, dict[str, str] | None]], + ) -> None: self._url = url self._launch_url = launch_url self._codename = codename self._arch = arch self._data_queue = data_queue - def __parse_mirror_html(self, launch_html): - info = {} + def __parse_mirror_html(self, launch_html: str) -> dict[str, str]: + info: dict[str, str] = {} soup = BeautifulSoup(launch_html, PARSER) # Find elements of the ids we need - for line in soup.find_all(id=['arches', 'speed', 'organisation']): - if line.name == 'table': + for line in soup.find_all(id=["arches", "speed", "organisation"]): + if line.name == "table": # Status information lives in a table column alongside # series name and machine architecture - for tr in line.find('tbody').find_all('tr'): - arches = [x.get_text() for x in tr.find_all('td')] - if (self._codename in arches[0] and - arches[1] == self._arch): + for tr in line.find("tbody").find_all("tr"): + arches = [x.get_text() for x in tr.find_all("td")] + if self._codename in arches[0] and arches[1] == self._arch: info.update({"Status": arches[2]}) else: # "Speed" lives in a dl, and we use the key -> value as such - info.update({ - line.dt.get_text().strip(':'): line.dd.get_text() - }) + info.update({line.dt.get_text().strip(":"): line.dd.get_text()}) return info - def get_info(self): + def get_info(self) -> None: """Parse launchpad page HTML for mirror information Ideally, launchpadlib would be used to get mirror information, but the Launchpad API doesn't support access to archivemirror statuses.""" try: - launch_html = get_text(self._launch_url) - except URLGetTextError as err: - stderr.write("connection to %s: %s\n" % (self._launch_url, err)) + launch_html = utility.get_text(self._launch_url) + except utility.URLGetTextError as err: + sys.stderr.write(f"connection to {self._launch_url}: {err}\n") self._data_queue.put_nowait((self._url, None)) else: - 
info = self.__parse_mirror_html(launch_html) + info = self.__parse_mirror_html(launch_html=launch_html) if "Status" not in info: - stderr.write(( - "Unable to parse status info from %s\n" % self._launch_url - )) + sys.stderr.write( + f"Unable to parse status info from {self._launch_url}\n" + ) self._data_queue.put_nowait((self._url, None)) return diff --git a/src/apt_select/utility.py b/src/apt_select/utility.py new file mode 100644 index 0000000..ca9b8c5 --- /dev/null +++ b/src/apt_select/utility.py @@ -0,0 +1,38 @@ +#!/usr/bin/env python +"""Collection of module neutral utility functions""" + +import sys + +import requests +from apt_select import constant + + +def utf8_decode(encoded: bytes) -> str: + return encoded.decode(constant.ENCODING_UTF_8) + + +class URLGetTextError(Exception): + """Error class for fetching text from a URL""" + + +def get_text( + url: str, timeout_sec: float = constant.DEFAULT_REQUEST_TIMEOUT_SEC +) -> str: + """Return text from GET request response content""" + try: + result = requests.get( + url, headers=constant.DEFAULT_REQUEST_HEADERS, timeout=timeout_sec + ) + result.raise_for_status() + except requests.HTTPError as err: + raise URLGetTextError(err) from err + + return result.text + + +def progress_msg(processed: float | int, total: float | int) -> None: + """Update user on percent done""" + if total > 1: + percent = int((float(processed) / total) * 100) + sys.stderr.write(f"\r[{processed}/{total}] {percent}%") + sys.stderr.flush() diff --git a/test/.gitignore b/test/.gitignore new file mode 100644 index 0000000..e69de29 diff --git a/typeshed/pyi/apt_select/__init__.pyi b/typeshed/pyi/apt_select/__init__.pyi new file mode 100644 index 0000000..50d8897 --- /dev/null +++ b/typeshed/pyi/apt_select/__init__.pyi @@ -0,0 +1 @@ +from apt_select._version import __version__ as __version__, __version_tuple__ as __version_tuple__ diff --git a/typeshed/pyi/apt_select/__main__.pyi b/typeshed/pyi/apt_select/__main__.pyi new file mode 100644 index 
0000000..c490258 --- /dev/null +++ b/typeshed/pyi/apt_select/__main__.pyi @@ -0,0 +1,12 @@ +from apt_select import apt as apt, argument as argument, constant as constant, mirror as mirror +from argparse import Namespace + +def set_args() -> tuple[str | Namespace, int]: ... +def get_mirrors(mirrors_url: str, country: str, timeout_sec: float = ...) -> tuple[list[str], int]: ... +def print_status(info: dict[str, float | str], rank: int) -> None: ... +def print_latency(info: dict[str, float | str], rank: int, max_hostname_length: int) -> None: ... +def ask(query: str) -> str: ... +def get_selected_mirror(list_size: int) -> tuple[int | None, int]: ... +def yes_or_no(query: str) -> int: ... +def apt_select() -> tuple[str | None, int]: ... +def main() -> int: ... diff --git a/typeshed/pyi/apt_select/apt.pyi b/typeshed/pyi/apt_select/apt.pyi new file mode 100644 index 0000000..c078859 --- /dev/null +++ b/typeshed/pyi/apt_select/apt.pyi @@ -0,0 +1,33 @@ +from _typeshed import Incomplete +from apt_select import constant as constant, utility as utility + +SUPPORTED_KERNEL: str +SUPPORTED_DISTRIBUTION_TYPE: str +UNAME: str +KERNEL_COMMAND: Incomplete +MACHINE_COMMAND: Incomplete +RELEASE_COMMAND: Incomplete +RELEASE_FILE: str +LAUNCHPAD_ARCH_32: str +LAUNCHPAD_ARCH_64: str +LAUNCHPAD_ARCHES: Incomplete + +class System: + dist: Incomplete + codename: Incomplete + arch: Incomplete + def __init__(self) -> None: ... + +class SourcesFileError(Exception): ... + +class Sources: + DEB_SCHEMES: Incomplete + PROTOCOLS: Incomplete + DIRECTORY: str + LIST_FILE: str + urls: Incomplete + skip_gen_msg: str + new_file_path: Incomplete + def __init__(self, codename: str) -> None: ... + def set_current_archives(self) -> None: ... + def generate_new_config(self, work_dir: str, new_mirror: str) -> None: ... 
diff --git a/typeshed/pyi/apt_select/argument.pyi b/typeshed/pyi/apt_select/argument.pyi new file mode 100644 index 0000000..f3a9f53 --- /dev/null +++ b/typeshed/pyi/apt_select/argument.pyi @@ -0,0 +1,9 @@ +from _typeshed import Incomplete +from apt_select import constant as constant +from argparse import ArgumentParser + +DEFAULT_COUNTRY: str +DEFAULT_NUMBER: int +STATUS_ARGS: Incomplete + +def get_arg_parser() -> ArgumentParser: ... diff --git a/typeshed/pyi/apt_select/constant.pyi b/typeshed/pyi/apt_select/constant.pyi new file mode 100644 index 0000000..6d220e4 --- /dev/null +++ b/typeshed/pyi/apt_select/constant.pyi @@ -0,0 +1,10 @@ +from _typeshed import Incomplete + +DEFAULT_REQUEST_HEADERS: Incomplete +DEFAULT_REQUEST_TIMEOUT_SEC: float +ENCODING_UTF_8: str +OK: int +NOK: int +INVALID_MIRROR_INDEX: int +USER_INTERRUPT: int +SKIPPED_FILE_GENERATION: int diff --git a/typeshed/pyi/apt_select/mirror.pyi b/typeshed/pyi/apt_select/mirror.pyi new file mode 100644 index 0000000..e40890e --- /dev/null +++ b/typeshed/pyi/apt_select/mirror.pyi @@ -0,0 +1,27 @@ +from _typeshed import Incomplete +from apt_select import utility as utility +from queue import Queue + +PARSER: str + +class ConnectError(Exception): ... + +class Mirrors: + urls: Incomplete + got: Incomplete + ranked: Incomplete + top_list: Incomplete + abort_launch: bool + status_num: int + def __init__(self, url_list: list[str], min_status: int, ping_only: bool | None = None) -> None: ... + def fetch_launchpad_urls(self) -> None: ... + def measure_rtts(self) -> None: ... + def lookup_statuses(self, codename: str, arch: str) -> None: ... + +class _RoundTrip: + def __init__(self, url: str, host: str, trip_queue: Queue[tuple[str, float] | None]) -> None: ... + def min_rtt(self) -> None: ... + +class _LaunchData: + def __init__(self, url: str, launch_url: str, codename: str, arch: str, data_queue: Queue[tuple[str, dict[str, str] | None]]) -> None: ... + def get_info(self) -> None: ... 
diff --git a/typeshed/pyi/apt_select/utility.pyi b/typeshed/pyi/apt_select/utility.pyi new file mode 100644 index 0000000..80159bc --- /dev/null +++ b/typeshed/pyi/apt_select/utility.pyi @@ -0,0 +1,8 @@ +from apt_select import constant as constant + +def utf8_decode(encoded: bytes) -> str: ... + +class URLGetTextError(Exception): ... + +def get_text(url: str, timeout_sec: float = ...) -> str: ... +def progress_msg(processed: float | int, total: float | int) -> None: ... diff --git a/update.sh b/update.sh index 48c1864..5991ae4 100755 --- a/update.sh +++ b/update.sh @@ -1,83 +1,87 @@ -#!/bin/bash +#!/usr/bin/env bash -apt=/etc/apt -file=sources.list -apt_file=${apt}/${file} -backup=${apt_file}.backup +declare -r APT_DIR_PATH='/etc/apt' +declare -r APT_SOURCES_FILE_NAME='sources.list' +declare -r APT_SOURCES_FILE_PATH="${APT_DIR_PATH}/${APT_SOURCES_FILE_NAME}" +declare -r APT_SOURCES_BACKUP_FILE_PATH="${APT_SOURCES_FILE_PATH}.backup" -if [ $EUID -ne 0 ]; then - echo "$0 needs sudoer priveleges to modify ${apt_file}" +# shellcheck disable=SC2046 +if [ $(id -u) -ne 0 ]; then + echo "${0} needs root privilege to modify ${APT_SOURCES_FILE_PATH}" echo "please run script as super user (root)" exit 1 fi -updateApt (){ - mv $file $apt_file && - echo "apt has been updated" +update_apt (){ + mv "${APT_SOURCES_FILE_NAME}" "${APT_SOURCES_FILE_PATH}" && + echo "apt has been updated" } -updateBackup (){ - mv $apt_file $backup && - echo "$apt_file backed up to $backup" - updateApt +update_backup (){ + mv "${APT_SOURCES_FILE_PATH}" "${APT_SOURCES_BACKUP_FILE_PATH}" && + echo "${APT_SOURCES_FILE_PATH} backed up to ${APT_SOURCES_BACKUP_FILE_PATH}" + update_apt } examine (){ - less $1 2>/dev/null - isBackup - break + less "${1}" 2>/dev/null + is_backup } -isBackup (){ +is_backup (){ local query options opt - query="Backup file $backup already exists.\n" + query="Backup file ${APT_SOURCES_BACKUP_FILE_PATH} already exists.\n" query+="Choose one of the following 
options:" - echo -e "$query" + echo -e "${query}" options=( "Replace backup and update apt" "Update apt without backing up" - "Examine $backup" - "Examine $apt_file" - "Examine $PWD/$file" + "Examine ${APT_SOURCES_BACKUP_FILE_PATH}" + "Examine ${APT_SOURCES_FILE_PATH}" + "Examine ${PWD}/${APT_SOURCES_FILE_NAME}" "Quit" ) select opt in "${options[@]}"; do - case $opt in + case ${opt} in "${options[0]}") - updateBackup + update_backup break ;; "${options[1]}") - updateApt + update_apt break ;; "${options[2]}") - examine $backup + examine "${APT_SOURCES_BACKUP_FILE_PATH}" ;; "${options[3]}") - examine $apt_file + examine "${APT_SOURCES_FILE_PATH}" ;; "${options[4]}") - examine $file + examine "${APT_SOURCES_FILE_NAME}" ;; "${options[5]}") break ;; - *) + *) echo invalid option ;; esac done } -if [ "$PWD" = "$apt" ]; then - echo "Please run the update from a directory other than $apt" +if [ "${PWD}" = "${APT_DIR_PATH}" ]; then + echo "Please run the update from a directory other than ${APT_DIR_PATH}" exit 1 else - if [ -f "$file" ]; then - [ -f "$backup" ] && isBackup || updateBackup + if [ -f "${APT_SOURCES_FILE_NAME}" ]; then + if [ -f "${APT_SOURCES_BACKUP_FILE_PATH}" ]; then + is_backup + else + update_backup + fi else - echo "$file must exist in the working directory" + echo "${APT_SOURCES_FILE_NAME} must exist in the working directory" exit 1 fi fi