From c35ed8911275c520de68d78fb198b7d3387a42d8 Mon Sep 17 00:00:00 2001 From: Johan Westin Date: Sun, 3 Mar 2024 10:40:27 +0100 Subject: [PATCH 1/7] Migrate to pyproject toml * Use automatic SCM tag based versioning. --- .gitignore | 5 + MANIFEST.in | 4 + pyproject.toml | 643 +++++++++++++++++++++++++++++++++++++++++++++++++ setup.cfg | 2 - setup.py | 52 ---- 5 files changed, 652 insertions(+), 54 deletions(-) create mode 100644 MANIFEST.in create mode 100644 pyproject.toml delete mode 100644 setup.cfg delete mode 100644 setup.py diff --git a/.gitignore b/.gitignore index b11ca80..2cfee8f 100644 --- a/.gitignore +++ b/.gitignore @@ -1,3 +1,8 @@ __pycache__ *.pyc sources.list +_version.py* + +/.idea/ +/.venv/ +/dist/ diff --git a/MANIFEST.in b/MANIFEST.in new file mode 100644 index 0000000..5e692b3 --- /dev/null +++ b/MANIFEST.in @@ -0,0 +1,4 @@ +include *.rst *.txt *.md +recursive-include docs *.rst *.txt *.md +recursive-include apt_select * +recursive-include *.py diff --git a/pyproject.toml b/pyproject.toml new file mode 100644 index 0000000..e3cbeff --- /dev/null +++ b/pyproject.toml @@ -0,0 +1,643 @@ +[build-system] +requires = ["hatchling>=1.18.0", "hatch-vcs>=0.3.0"] +build-backend = "hatchling.build" + +[project] +name = "apt-select" +dynamic = ["version"] +description = "Ubuntu Archive Mirror reporting tool for apt sources configuration" +readme = "README.rst" +license-files = { paths = ["LICENSE"] } +authors = [ + { name = "John Blakeman", email = "john@johnblakeman.com" }, +] +keywords = [ + "apt", + "configuration", + "latency", + "rank", + "reporting", + "status", +] +classifiers = [ + "Development Status :: 4 - Beta", + "Environment :: Console", + "Intended Audience :: Developers", + "Intended Audience :: System Administrators", + "License :: OSI Approved :: MIT License", + "Operating System :: POSIX :: Linux", + "Programming Language :: Python :: 3.10", + "Programming Language :: Python :: 3.11", + "Programming Language :: Python :: 3.12", + "Topic :: System :: Installation/Setup", + "Topic :: System :: Networking", + "Topic :: System :: Software Distribution", + "Topic :: System :: Systems Administration", + "Topic :: Utilities", +] +dependencies = [ + "beautifulsoup4", + "requests", +] + +[project.scripts] +apt-select = "apt_select.__main__:main" + +[project.urls] +Homepage = "https://github.com/jblakeman/apt-select" + +[tool.hatch.build.hooks.vcs] +version-file = "apt_select/_version.py" + +[tool.hatch.build.targets.sdist] +include = [ + "/apt_select", +] + +[tool.hatch.envs.default] +python = "3.11" +dependencies = [ + "black", + "build", + "hatch", + "ruff", + "wheel", +] + +[tool.hatch.envs.default.scripts] +dist = [ + "black apt_select", + "hatch build", +] +format = [ + "black apt_select", +] +lint = [ + "black apt_select", + "ruff check --fix apt_select", +] +lint-check = [ + "black --check apt_select", + "pylint apt_select", + "ruff check apt_select", +] +test = "pytest" +test-cov-xml = "pytest --cov-report=xml" + +[tool.hatch.version] +source = "vcs" + +[tool.pylint.main] +# Analyse import fallback blocks. This can be used to support both Python 2 and 3 +# compatible code, which means that the block might have code that exists only in +# one or another interpreter, leading to false positives when analysed. +# analyse-fallback-blocks = + +# Clear in-memory caches upon conclusion of linting. Useful if running pylint in +# a server-like mode. +# clear-cache-post-run = + +# Always return a 0 (non-error) status code, even if lint errors are found. This +# is primarily useful in continuous integration scripts. +# exit-zero = + +# A comma-separated list of package or module names from where C extensions may +# be loaded. Extensions are loading into the active Python interpreter and may +# run arbitrary code. +# extension-pkg-allow-list = + +# A comma-separated list of package or module names from where C extensions may +# be loaded. Extensions are loading into the active Python interpreter and may +# run arbitrary code. (This is an alternative name to extension-pkg-allow-list +# for backward compatibility.) +# extension-pkg-whitelist = + +# Return non-zero exit code if any of these messages/categories are detected, +# even if score is above --fail-under value. Syntax same as enable. Messages +# specified are enabled, while categories only check already-enabled messages. +# fail-on = + +# Specify a score threshold under which the program will exit with error. +fail-under = 10.0 + +# Interpret the stdin as a python script, whose filename needs to be passed as +# the module_or_package argument. +# from-stdin = + +# Files or directories to be skipped. They should be base names, not paths. +ignore = ["CVS"] + +# Add files or directories matching the regular expressions patterns to the +# ignore-list. The regex matches against paths and can be in Posix or Windows +# format. Because "\\" represents the directory delimiter on Windows systems, it +# can"t be used as an escape character. +# ignore-paths = + +# Files or directories matching the regular expression patterns are skipped. The +# regex matches against base names, not paths. The default value ignores Emacs +# file locks +# ignore-patterns = ["^\\.#"] +ignore-patterns = [ + "^_version.py$" +] + +# List of module names for which member attributes should not be checked (useful +# for modules/projects where namespaces are manipulated during runtime and thus +# existing member attributes cannot be deduced by static analysis). It supports +# qualified module names, as well as Unix pattern matching. +# ignored-modules = + +# Python code to execute, usually for sys.path manipulation such as +# pygtk.require(). +# init-hook = + +# Use multiple processes to speed up Pylint. Specifying 0 will auto-detect the +# number of processors available to use, and will cap the count on Windows to +# avoid hangs. +jobs = 4 + +# Control the amount of potential inferred values when inferring a single object. +# This can help the performance when dealing with large functions or complex, +# nested conditions. +limit-inference-results = 100 + +# List of plugins (as comma separated values of python module names) to load, +# usually to register additional checkers. +# load-plugins = + +# Pickle collected data for later comparisons. +persistent = true + +# Minimum Python version to use for version dependent checks. Will default to the +# version used to run pylint. +py-version = "3.11" + +# Discover python modules and packages in the file system subtree. +# recursive = + +# Add paths to the list of the source roots. Supports globbing patterns. The +# source root is an absolute path or a path relative to the current working +# directory used to determine a package namespace for modules located under the +# source root. +# source-roots = + +# When enabled, pylint would attempt to guess common misconfiguration and emit +# user-friendly hints instead of false-positive error messages. +suggestion-mode = true + +# Allow loading of arbitrary C extensions. Extensions are imported into the +# active Python interpreter and may run arbitrary code. +# unsafe-load-any-extension = + +[tool.pylint.basic] +# Naming style matching correct argument names. +argument-naming-style = "snake_case" + +# Regular expression matching correct argument names. Overrides argument-naming- +# style. If left empty, argument names will be checked with the set naming style. +# argument-rgx = + +# Naming style matching correct attribute names. +attr-naming-style = "snake_case" + +# Regular expression matching correct attribute names. Overrides attr-naming- +# style. If left empty, attribute names will be checked with the set naming +# style. +# attr-rgx = + +# Bad variable names which should always be refused, separated by a comma. +bad-names = ["foo", "bar", "baz", "toto", "tutu", "tata"] + +# Bad variable names regexes, separated by a comma. If names match any regex, +# they will always be refused +# bad-names-rgxs = + +# Naming style matching correct class attribute names. +class-attribute-naming-style = "any" + +# Regular expression matching correct class attribute names. Overrides class- +# attribute-naming-style. If left empty, class attribute names will be checked +# with the set naming style. +# class-attribute-rgx = + +# Naming style matching correct class constant names. +class-const-naming-style = "UPPER_CASE" + +# Regular expression matching correct class constant names. Overrides class- +# const-naming-style. If left empty, class constant names will be checked with +# the set naming style. +# class-const-rgx = + +# Naming style matching correct class names. +class-naming-style = "PascalCase" + +# Regular expression matching correct class names. Overrides class-naming-style. +# If left empty, class names will be checked with the set naming style. +# class-rgx = + +# Naming style matching correct constant names. +const-naming-style = "UPPER_CASE" + +# Regular expression matching correct constant names. Overrides const-naming- +# style. If left empty, constant names will be checked with the set naming style. +# const-rgx = + +# Minimum line length for functions/classes that require docstrings, shorter ones +# are exempt. +docstring-min-length = -1 + +# Naming style matching correct function names. +function-naming-style = "snake_case" + +# Regular expression matching correct function names. Overrides function-naming- +# style. If left empty, function names will be checked with the set naming style. +# function-rgx = + +# Good variable names which should always be accepted, separated by a comma. +good-names = ["i", "j", "k", "ex", "Run", "_"] + +# Good variable names regexes, separated by a comma. If names match any regex, +# they will always be accepted +# good-names-rgxs = + +# Include a hint for the correct naming format with invalid-name. +# include-naming-hint = + +# Naming style matching correct inline iteration names. +inlinevar-naming-style = "any" + +# Regular expression matching correct inline iteration names. Overrides +# inlinevar-naming-style. If left empty, inline iteration names will be checked +# with the set naming style. +# inlinevar-rgx = + +# Naming style matching correct method names. +method-naming-style = "snake_case" + +# Regular expression matching correct method names. Overrides method-naming- +# style. If left empty, method names will be checked with the set naming style. +# method-rgx = + +# Naming style matching correct module names. +module-naming-style = "snake_case" + +# Regular expression matching correct module names. Overrides module-naming- +# style. If left empty, module names will be checked with the set naming style. +# module-rgx = + +# Colon-delimited sets of names that determine each other"s naming style when the +# name regexes allow several styles. +# name-group = + +# Regular expression which should only match function or class names that do not +# require a docstring. +no-docstring-rgx = "^_" + +# List of decorators that produce properties, such as abc.abstractproperty. Add +# to this list to register other decorators that produce valid properties. These +# decorators are taken in consideration only for invalid-name. +property-classes = ["abc.abstractproperty"] + +# Regular expression matching correct type alias names. If left empty, type alias +# names will be checked with the set naming style. +# typealias-rgx = + +# Regular expression matching correct type variable names. If left empty, type +# variable names will be checked with the set naming style. +# typevar-rgx = + +# Naming style matching correct variable names. +variable-naming-style = "snake_case" + +# Regular expression matching correct variable names. Overrides variable-naming- +# style. If left empty, variable names will be checked with the set naming style. +# variable-rgx = + +[tool.pylint.classes] +# Warn about protected attribute access inside special methods +# check-protected-access-in-special-methods = + +# List of method names used to declare (i.e. assign) instance attributes. +defining-attr-methods = ["__init__", "__new__", "setUp", "asyncSetUp", "__post_init__"] + +# List of member names, which should be excluded from the protected access +# warning. +exclude-protected = ["_asdict", "_fields", "_replace", "_source", "_make", "os._exit"] + +# List of valid names for the first argument in a class method. +valid-classmethod-first-arg = ["cls"] + +# List of valid names for the first argument in a metaclass class method. +valid-metaclass-classmethod-first-arg = ["mcs"] + +[tool.pylint.design] +# List of regular expressions of class ancestor names to ignore when counting +# public methods (see R0903) +# exclude-too-few-public-methods = + +# List of qualified class names to ignore when counting class parents (see R0901) +# ignored-parents = + +# Maximum number of arguments for function / method. +max-args = 5 + +# Maximum number of attributes for a class (see R0902). +max-attributes = 7 + +# Maximum number of boolean expressions in an if statement (see R0916). +max-bool-expr = 5 + +# Maximum number of branch for function / method body. +max-branches = 12 + +# Maximum number of locals for function / method body. +max-locals = 15 + +# Maximum number of parents for a class (see R0901). +max-parents = 7 + +# Maximum number of public methods for a class (see R0904). +max-public-methods = 20 + +# Maximum number of return / yield for function / method body. +max-returns = 6 + +# Maximum number of statements in function / method body. +max-statements = 50 + +# Minimum number of public methods for a class (see R0903). +min-public-methods = 2 + +[tool.pylint.exceptions] +# Exceptions that will emit a warning when caught. +overgeneral-exceptions = ["builtins.BaseException", "builtins.Exception"] + +[tool.pylint.format] +# Expected format of line ending, e.g. empty (any line ending), LF or CRLF. +# expected-line-ending-format = + +# Regexp for a line that is allowed to be longer than the limit. +ignore-long-lines = "^\\s*(# )??$" + +# Number of spaces of indent required inside a hanging or continued line. +indent-after-paren = 4 + +# String used as indentation unit. This is usually " " (4 spaces) or "\t" (1 +# tab). +indent-string = " " + +# Maximum number of characters on a single line. +max-line-length = 120 + +# Maximum number of lines in a module. +max-module-lines = 1000 + +# Allow the body of a class to be on the same line as the declaration if body +# contains single statement. +# single-line-class-stmt = + +# Allow the body of an if to be on the same line as the test if there is no else. +# single-line-if-stmt = + +[tool.pylint.imports] +# List of modules that can be imported at any level, not just the top level one. +# allow-any-import-level = + +# Allow explicit reexports by alias from a package __init__. +# allow-reexport-from-package = + +# Allow wildcard imports from modules that define __all__. +# allow-wildcard-with-all = + +# Deprecated modules which should not be used, separated by a comma. +# deprecated-modules = + +# Output a graph (.gv or any supported image format) of external dependencies to +# the given file (report RP0402 must not be disabled). +# ext-import-graph = + +# Output a graph (.gv or any supported image format) of all (i.e. internal and +# external) dependencies to the given file (report RP0402 must not be disabled). +# import-graph = + +# Output a graph (.gv or any supported image format) of internal dependencies to +# the given file (report RP0402 must not be disabled). +# int-import-graph = + +# Force import order to recognize a module as part of the standard compatibility +# libraries. +# known-standard-library = + +# Force import order to recognize a module as part of a third party library. +known-third-party = [ + "play_downloader", + "vtt2srt" +] + +# Couples of modules and preferred modules, separated by a comma. +# preferred-modules = + +[tool.pylint.logging] +# The type of string formatting that logging methods do. `old` means using % +# formatting, `new` is for `{}` formatting. +logging-format-style = "old" + +# Logging modules to check that the string format arguments are in logging +# function parameter format. +logging-modules = ["logging"] + +[tool.pylint."messages control"] +# Only show warnings with the listed confidence levels. Leave empty to show all. +# Valid levels: HIGH, CONTROL_FLOW, INFERENCE, INFERENCE_FAILURE, UNDEFINED. +confidence = ["HIGH", "CONTROL_FLOW", "INFERENCE", "INFERENCE_FAILURE", "UNDEFINED"] + +# Disable the message, report, category or checker with the given id(s). You can +# either give multiple identifiers separated by comma (,) or put this option +# multiple times (only on the command line, not in the configuration file where +# it should appear only once). You can also use "--disable=all" to disable +# everything first and then re-enable specific checks. For example, if you want +# to run only the similarities checker, you can use "--disable=all +# --enable=similarities". If you want to run only the classes checker, but have +# no Warning level messages displayed, use "--disable=all --enable=classes +# --disable=W". +# disable = ["raw-checker-failed", "bad-inline-option", "locally-disabled", "file-ignored", "suppressed-message", "useless-suppression", "deprecated-pragma", "use-symbolic-message-instead", "empty-docstring", "missing-module-docstring", "missing-class-docstring", "missing-function-docstring", "too-few-public-methods"] +disable = [ + "broad-exception-caught", + "empty-docstring", + "missing-docstring", + "too-few-public-methods", + "too-many-arguments", + "too-many-branches", + "too-many-instance-attributes", + "too-many-locals", + "too-many-return-statements", + "too-many-statements", +] + +# Enable the message, report, category or checker with the given id(s). You can +# either give multiple identifier separated by comma (,) or put this option +# multiple time (only on the command line, not in the configuration file where it +# should appear only once). See also the "--disable" option for examples. +enable = ["c-extension-no-member"] + +[tool.pylint.method_args] +# List of qualified names (i.e., library.method) which require a timeout +# parameter e.g. "requests.api.get,requests.api.post" +timeout-methods = ["requests.api.delete", "requests.api.get", "requests.api.head", "requests.api.options", "requests.api.patch", "requests.api.post", "requests.api.put", "requests.api.request"] + +[tool.pylint.miscellaneous] +# List of note tags to take in consideration, separated by a comma. +notes = ["FIXME", "XXX", "TODO"] + +# Regular expression of note tags to take in consideration. +# notes-rgx = + +[tool.pylint.refactoring] +# Maximum number of nested blocks for function / method body +max-nested-blocks = 5 + +# Complete name of functions that never returns. When checking for inconsistent- +# return-statements if a never returning function is called then it will be +# considered as an explicit return statement and no message will be printed. +never-returning-functions = ["sys.exit", "argparse.parse_error"] + +[tool.pylint.reports] +# Python expression which should return a score less than or equal to 10. You +# have access to the variables "fatal", "error", "warning", "refactor", +# "convention", and "info" which contain the number of messages in each category, +# as well as "statement" which is the total number of statements analyzed. This +# score is used by the global evaluation report (RP0004). +evaluation = "max(0, 0 if fatal else 10.0 - ((float(5 * error + warning + refactor + convention) / statement) * 10))" + +# Template used to display messages. This is a python new-style format string +# used to format the message information. See doc for all details. +# msg-template = + +# Set the output format. Available formats are text, parseable, colorized, json +# and msvs (visual studio). You can also give a reporter class, e.g. +# mypackage.mymodule.MyReporterClass. +# output-format = + +# Tells whether to display a full report or only the messages. +# reports = + +# Activate the evaluation score. +score = true + +[tool.pylint.similarities] +# Comments are removed from the similarity computation +ignore-comments = true + +# Docstrings are removed from the similarity computation +ignore-docstrings = true + +# Imports are removed from the similarity computation +ignore-imports = true + +# Signatures are removed from the similarity computation +ignore-signatures = true + +# Minimum lines number of a similarity. +min-similarity-lines = 4 + +[tool.pylint.spelling] +# Limits count of emitted suggestions for spelling mistakes. +max-spelling-suggestions = 4 + +# Spelling dictionary name. No available dictionaries : You need to install both +# the python package and the system dependency for enchant to work.. +# spelling-dict = + +# List of comma separated words that should be considered directives if they +# appear at the beginning of a comment and should not be checked. +spelling-ignore-comment-directives = "fmt: on,fmt: off,noqa:,noqa,nosec,isort:skip:" + +# List of comma separated words that should not be checked. +# spelling-ignore-words = + +# A path to a file that contains the private dictionary; one word per line. +# spelling-private-dict-file = + +# Tells whether to store unknown words to the private dictionary (see the +# --spelling-private-dict-file option) instead of raising a message. +# spelling-store-unknown-words = + +[tool.pylint.typecheck] +# List of decorators that produce context managers, such as +# contextlib.contextmanager. Add to this list to register other decorators that +# produce valid context managers. +contextmanager-decorators = ["contextlib.contextmanager"] + +# List of members which are set dynamically and missed by pylint inference +# system, and so shouldn"t trigger E1101 when accessed. Python regular +# expressions are accepted. +# generated-members = + +# Tells whether missing members accessed in mixin class should be ignored. A +# class is considered mixin if its name matches the mixin-class-rgx option. +# Tells whether to warn about missing members when the owner of the attribute is +# inferred to be None. +ignore-none = true + +# This flag controls whether pylint should warn about no-member and similar +# checks whenever an opaque object is returned when inferring. The inference can +# return multiple potential results while evaluating a Python object, but some +# branches might not be evaluated, which results in partial inference. In that +# case, it might be useful to still emit no-member and other checks for the rest +# of the inferred objects. +ignore-on-opaque-inference = true + +# List of symbolic message names to ignore for Mixin members. +ignored-checks-for-mixins = ["no-member", "not-async-context-manager", "not-context-manager", "attribute-defined-outside-init"] + +# List of class names for which member attributes should not be checked (useful +# for classes with dynamically set attributes). This supports the use of +# qualified names. +ignored-classes = ["optparse.Values", "thread._local", "_thread._local", "argparse.Namespace"] + +# Show a hint with possible names when a member name was not found. The aspect of +# finding the hint is based on edit distance. +missing-member-hint = true + +# The minimum edit distance a name should have in order to be considered a +# similar match for a missing member name. +missing-member-hint-distance = 1 + +# The total number of similar names that should be taken in consideration when +# showing a hint for a missing member. +missing-member-max-choices = 1 + +# Regex pattern to define which classes are considered mixins. +mixin-class-rgx = ".*[Mm]ixin" + +# List of decorators that change the signature of a decorated function. +# signature-mutators = + +[tool.pylint.variables] +# List of additional names supposed to be defined in builtins. Remember that you +# should avoid defining new builtins when possible. +# additional-builtins = + +# Tells whether unused global variables should be treated as a violation. +allow-global-unused-variables = true + +# List of names allowed to shadow builtins +# allowed-redefined-builtins = + +# List of strings which can identify a callback function by name. A callback name +# must start or end with one of those strings. +callbacks = ["cb_", "_cb"] + +# A regular expression matching the name of dummy variables (i.e. expected to not +# be used). +dummy-variables-rgx = "_+$|(_[a-zA-Z0-9_]*[a-zA-Z0-9]+?$)|dummy|^ignored_|^unused_" + +# Argument names that match this expression will be ignored. +ignored-argument-names = "_.*|^ignored_|^unused_" + +# Tells whether we should check for unused import in __init__ files. +# init-import = + +# List of qualified module names which can have objects that can redefine +# builtins. +redefining-builtins-modules = ["six.moves", "past.builtins", "future.builtins", "builtins", "io"] diff --git a/setup.cfg b/setup.cfg deleted file mode 100644 index 3c6e79c..0000000 --- a/setup.cfg +++ /dev/null @@ -1,2 +0,0 @@ -[bdist_wheel] -universal=1 diff --git a/setup.py b/setup.py deleted file mode 100644 index c10c259..0000000 --- a/setup.py +++ /dev/null @@ -1,52 +0,0 @@ -"""A setuptools based setup module. -See: -https://packaging.python.org/en/latest/distributing.html """ - -from setuptools import setup, find_packages -from codecs import open -from os import path -from apt_select import __version__ - -here = path.abspath(path.dirname(__file__)) - -with open(path.join(here, 'README.rst'), encoding='utf-8') as f: - long_description = f.read() - -setup( - name='apt-select', - version=__version__, - description='Ubuntu Archive Mirror reporting tool for apt sources configuration', - long_description=long_description, - url='https://github.com/jblakeman/apt-select', - author='John Blakeman', - author_email='john@johnblakeman.com', - license='MIT', - classifiers=[ - 'Development Status :: 4 - Beta', - 'Environment :: Console', - 'Intended Audience :: Developers', - 'Intended Audience :: System Administrators', - 'License :: OSI Approved :: MIT License', - 'Operating System :: POSIX :: Linux', - 'Programming Language :: Python :: 2', - 'Programming Language :: Python :: 2.7', - 'Programming Language :: Python :: 3', - 'Programming Language :: Python :: 3.2', - 'Programming Language :: Python :: 3.3', - 'Programming Language :: Python :: 3.4', - 'Programming Language :: Python :: 3.5', - 'Topic :: System :: Installation/Setup', - 'Topic :: System :: Networking', - 'Topic :: System :: Software Distribution', - 'Topic :: System :: Systems Administration', - 'Topic :: Utilities', - ], - keywords='latency status rank reporting apt configuration', - packages=find_packages(exclude=['tests']), - install_requires=['requests', 'beautifulsoup4'], - entry_points = { - 'console_scripts': [ - 'apt-select = apt_select.__main__:main' - ] - } -) From 0c3da08c1a468821336951dd7c3dc8df9210715b Mon Sep 17 00:00:00 2001 From: Johan Westin Date: Sun, 3 Mar 2024 10:46:24 +0100 Subject: [PATCH 2/7] Spelling correction --- README.rst | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README.rst b/README.rst index 7dc716e..cd1d6ad 100644 --- a/README.rst +++ b/README.rst @@ -7,7 +7,7 @@ Features -------- * Tests latency to mirrors in a given country's mirror list at `mirrors.ubuntu.com `_. - - 3 requests are sent to each mirror, minumum round trip time being used for rank. + - 3 requests are sent to each mirror, minimum round trip time being used for rank. * Reports latency, status, and bandwidth capacity of the fastest mirrors in a ranked list. - Status and bandwidth are scraped from `launchpad `_. From 97b6453bfbc879ee9b5a2de933ea9bf62617bccf Mon Sep 17 00:00:00 2001 From: Johan Westin Date: Sun, 3 Mar 2024 10:51:47 +0100 Subject: [PATCH 3/7] Apply shellcheck --- update.sh | 74 +++++++++++++++++++++++++++++-------------------------- 1 file changed, 39 insertions(+), 35 deletions(-) diff --git a/update.sh b/update.sh index 48c1864..5991ae4 100755 --- a/update.sh +++ b/update.sh @@ -1,83 +1,87 @@ -#!/bin/bash +#!/usr/bin/env bash -apt=/etc/apt -file=sources.list -apt_file=${apt}/${file} -backup=${apt_file}.backup +declare -r APT_DIR_PATH='/etc/apt' +declare -r APT_SOURCES_FILE_NAME='sources.list' +declare -r APT_SOURCES_FILE_PATH="${APT_DIR_PATH}/${APT_SOURCES_FILE_NAME}" +declare -r APT_SOURCES_BACKUP_FILE_PATH="${APT_SOURCES_FILE_PATH}.backup" -if [ $EUID -ne 0 ]; then - echo "$0 needs sudoer priveleges to modify ${apt_file}" +# shellcheck disable=SC2046 +if [ $(id -u) -ne 0 ]; then + echo "${0} needs root privilege to modify ${APT_SOURCES_FILE_PATH}" echo "please run script as super user (root)" exit 1 fi -updateApt (){ - mv $file $apt_file && - echo "apt has been updated" +update_apt (){ + mv "${APT_SOURCES_FILE_NAME}" "${APT_SOURCES_FILE_PATH}" && + echo "APT_DIR_PATH has been updated" } -updateBackup (){ - mv $apt_file $backup && - echo "$apt_file backed up to $backup" - updateApt +update_backup (){ + mv "${APT_SOURCES_FILE_PATH}" "${APT_SOURCES_BACKUP_FILE_PATH}" && + echo "${APT_SOURCES_FILE_PATH} backed up to ${APT_SOURCES_BACKUP_FILE_PATH}" + update_apt } examine (){ - less $1 2>/dev/null - isBackup - break + less "${1}" 2>/dev/null + is_backup } -isBackup (){ +is_backup (){ local query options opt - query="Backup file $backup already exists.\n" + query="Backup APT_SOURCES_FILE_NAME ${APT_SOURCES_BACKUP_FILE_PATH} already exists.\n" query+="Choose one of the following options:" - echo -e "$query" + echo -e "${query}" options=( "Replace backup and update apt" "Update apt without backing up" - "Examine $backup" - "Examine $apt_file" - "Examine $PWD/$file" + "Examine ${APT_SOURCES_BACKUP_FILE_PATH}" + "Examine ${APT_SOURCES_FILE_PATH}" + "Examine ${PWD}/${APT_SOURCES_FILE_NAME}" "Quit" ) select opt in "${options[@]}"; do - case $opt in + case ${opt} in "${options[0]}") - updateBackup + update_backup break ;; "${options[1]}") - updateApt + update_apt break ;; "${options[2]}") - examine $backup + examine "${APT_SOURCES_BACKUP_FILE_PATH}" ;; "${options[3]}") - examine $apt_file + examine "${APT_SOURCES_FILE_PATH}" ;; "${options[4]}") - examine $file + examine "${APT_SOURCES_FILE_PATH}" ;; "${options[5]}") break ;; - *) + *) echo invalid option ;; esac done } -if [ "$PWD" = "$apt" ]; then - echo "Please run the update from a directory other than $apt" +if [ "${PWD}" = "${APT_DIR_PATH}" ]; then + echo "Please run the update from a directory other than ${APT_DIR_PATH}" exit 1 else - if [ -f "$file" ]; then - [ -f "$backup" ] && isBackup || updateBackup + if [ -f "${APT_SOURCES_FILE_NAME}" ]; then + if [ -f "${APT_SOURCES_BACKUP_FILE_PATH}" ]; then + is_backup + else + update_backup + fi else - echo "$file must exist in the working directory" + echo "${APT_SOURCES_FILE_NAME} must exist in the working directory" exit 1 fi fi From 2c0b47e1f9ebf3f1f2a3a120cf46eee3be5e8658 Mon Sep 17 00:00:00 2001 From: Johan Westin Date: Sun, 3 Mar 2024 10:58:37 +0100 Subject: [PATCH 4/7] Move Python code to src directory --- MANIFEST.in | 2 +- pyproject.toml | 18 +++++++++--------- {apt_select => src/apt_select}/__init__.py | 0 {apt_select => src/apt_select}/__main__.py | 0 {apt_select => src/apt_select}/apt.py | 0 {apt_select => src/apt_select}/arguments.py | 0 {apt_select => src/apt_select}/mirrors.py | 0 {apt_select => src/apt_select}/utils.py | 0 8 files changed, 10 insertions(+), 10 deletions(-) rename {apt_select => src/apt_select}/__init__.py (100%) rename {apt_select => src/apt_select}/__main__.py (100%) rename {apt_select => src/apt_select}/apt.py (100%) rename {apt_select => src/apt_select}/arguments.py (100%) rename {apt_select => src/apt_select}/mirrors.py (100%) rename {apt_select => src/apt_select}/utils.py (100%) diff --git a/MANIFEST.in b/MANIFEST.in index 5e692b3..4f2f121 100644 --- a/MANIFEST.in +++ b/MANIFEST.in @@ -1,4 +1,4 @@ include *.rst *.txt *.md recursive-include docs *.rst *.txt *.md -recursive-include apt_select * +recursive-include src/apt_select * recursive-include *.py diff --git a/pyproject.toml b/pyproject.toml index e3cbeff..1893a03 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -47,11 +47,11 @@ apt-select = "apt_select.__main__:main" Homepage = "https://github.com/jblakeman/apt-select" [tool.hatch.build.hooks.vcs] -version-file = "apt_select/_version.py" +version-file = "src/apt_select/_version.py" [tool.hatch.build.targets.sdist] include = [ - "/apt_select", + "/src/apt_select", ] [tool.hatch.envs.default] @@ -66,20 +66,20 @@ dependencies = [ [tool.hatch.envs.default.scripts] dist = [ - "black apt_select", + "black src", "hatch build", ] format = [ - "black apt_select", + "black src", ] lint = [ - "black apt_select", - "ruff check --fix apt_select", + "black src", + "ruff check --fix src", ] lint-check = [ - "black --check apt_select", - "pylint apt_select", - "ruff check apt_select", + "black --check src", + "pylint src", + "ruff check src", ] test = "pytest" test-cov-xml = "pytest --cov-report=xml" diff --git a/apt_select/__init__.py b/src/apt_select/__init__.py similarity index 100% rename from apt_select/__init__.py rename to src/apt_select/__init__.py diff --git a/apt_select/__main__.py b/src/apt_select/__main__.py similarity index 100% rename from apt_select/__main__.py rename to src/apt_select/__main__.py diff --git a/apt_select/apt.py b/src/apt_select/apt.py similarity index 100% rename from apt_select/apt.py rename to src/apt_select/apt.py diff --git a/apt_select/arguments.py b/src/apt_select/arguments.py similarity index 100% rename from apt_select/arguments.py rename to src/apt_select/arguments.py diff --git a/apt_select/mirrors.py b/src/apt_select/mirrors.py similarity index 100% rename from apt_select/mirrors.py rename to src/apt_select/mirrors.py diff --git a/apt_select/utils.py b/src/apt_select/utils.py similarity index 100% rename from apt_select/utils.py rename to src/apt_select/utils.py From 37944867324b677653e433d511c0fb6f0f404758 Mon Sep 17 00:00:00 2001 From: Johan Westin Date: Sun, 3 Mar 2024 11:03:23 +0100 Subject: [PATCH 5/7] Reformat with black 24.2.0 --- src/apt_select/__init__.py | 2 +- src/apt_select/__main__.py | 127 ++++++++++++++++++----------------- src/apt_select/apt.py | 130 ++++++++++++++++++------------------ src/apt_select/arguments.py | 85 ++++++++++++----------- src/apt_select/mirrors.py | 79 +++++++++------------- src/apt_select/utils.py | 7 +- 6 files changed, 210 insertions(+), 220 deletions(-) diff --git a/src/apt_select/__init__.py b/src/apt_select/__init__.py index 36a511e..b19ee4b 100644 --- a/src/apt_select/__init__.py +++ b/src/apt_select/__init__.py @@ -1 +1 @@ -__version__ = '2.2.1' +__version__ = "2.2.1" diff --git a/src/apt_select/__main__.py b/src/apt_select/__main__.py index 200a579..e5da157 100644 --- a/src/apt_select/__main__.py +++ b/src/apt_select/__main__.py @@ -23,25 +23,29 @@ def set_args(): args = parser.parse_args() # Convert status argument to format used by Launchpad - args.min_status = args.min_status.replace('-', ' ') - if not args.ping_only and (args.min_status != 'unknown'): + args.min_status = args.min_status.replace("-", " ") + if not args.ping_only and (args.min_status != "unknown"): args.min_status = args.min_status.capitalize() if args.choose and (not args.top_number or args.top_number < 2): parser.print_usage() - exit(( - "error: -c/--choose option requires -t/--top-number NUMBER " - "where NUMBER is greater than 1." - )) + exit( + ( + "error: -c/--choose option requires -t/--top-number NUMBER " + "where NUMBER is greater than 1." + ) + ) if not args.country: - stderr.write('WARNING: no country code provided. defaulting to US.\n') + stderr.write("WARNING: no country code provided. defaulting to US.\n") args.country = DEFAULT_COUNTRY - elif not re.match(r'^[a-zA-Z]{2}$', args.country): - exit(( - "Invalid country. %s is not in ISO 3166-1 alpha-2 " - "format" % args.country - )) + elif not re.match(r"^[a-zA-Z]{2}$", args.country): + exit( + ( + "Invalid country. %s is not in ISO 3166-1 alpha-2 " + "format" % args.country + ) + ) return args @@ -52,9 +56,8 @@ def get_mirrors(mirrors_url, country): response = requests.get(mirrors_url, headers=DEFAULT_REQUEST_HEADERS) if response.status_code == requests.codes.NOT_FOUND: exit( - "The mirror list for country: %s was not found at %s" % ( - country, mirrors_url - ) + "The mirror list for country: %s was not found at %s" + % (country, mirrors_url) ) stderr.write("done.\n") @@ -65,33 +68,39 @@ def get_mirrors(mirrors_url, country): def print_status(info, rank): """Print full mirror status report for ranked item""" for key in ("Org", "Speed"): - info.setdefault(key, "N/A") - - print(( - "%(rank)d. %(mirror)s\n" - "%(tab)sLatency: %(ms).2f ms\n" - "%(tab)sOrg: %(org)s\n" - "%(tab)sStatus: %(status)s\n" - "%(tab)sSpeed: %(speed)s" % { - 'tab': ' ', - 'rank': rank , - 'mirror': info['Host'], - 'ms': info['Latency'], - 'org': info['Organisation'], - 'status': info['Status'], - 'speed': info['Speed'] - } - )) + info.setdefault(key, "N/A") + + print( + ( + "%(rank)d. %(mirror)s\n" + "%(tab)sLatency: %(ms).2f ms\n" + "%(tab)sOrg: %(org)s\n" + "%(tab)sStatus: %(status)s\n" + "%(tab)sSpeed: %(speed)s" + % { + "tab": " ", + "rank": rank, + "mirror": info["Host"], + "ms": info["Latency"], + "org": info["Organisation"], + "status": info["Status"], + "speed": info["Speed"], + } + ) + ) def print_latency(info, rank, max_host_len): """Print latency information for mirror in ranked report""" - print("%(rank)d. %(mirror)s: %(padding)s%(ms).2f ms" % { - 'rank': rank, - 'padding': (max_host_len - info.get('host_len', max_host_len)) * ' ', - 'mirror': info['Host'], - 'ms': info['Latency'] - }) + print( + "%(rank)d. %(mirror)s: %(padding)s%(ms).2f ms" + % { + "rank": rank, + "padding": (max_host_len - info.get("host_len", max_host_len)) * " ", + "mirror": info["Host"], + "ms": info["Latency"], + } + ) def ask(query): @@ -107,7 +116,7 @@ def get_selected_mirror(list_size): try: key = int(key) except ValueError: - if key == 'q': + if key == "q": exit() else: if (key >= 1) and (key <= list_size): @@ -120,7 +129,7 @@ def get_selected_mirror(list_size): def yes_or_no(query): """Get definitive answer""" - opts = ('yes', 'no') + opts = ("yes", "no") answer = ask(query) while answer != opts[0]: if answer == opts[1]: @@ -161,34 +170,34 @@ def apt_select(): archives.status_num = args.top_number stderr.write("Looking up %d status(es)\n" % args.top_number) archives.lookup_statuses( - system.codename.capitalize(), - system.arch, - args.min_status + system.codename.capitalize(), system.arch, args.min_status ) if args.top_number > 1: - stderr.write('\n') + stderr.write("\n") if args.ping_only or archives.abort_launch: - archives.top_list = archives.ranked[:args.top_number] + archives.top_list = archives.ranked[: args.top_number] sources.set_current_archives() - current_url = sources.urls['current'] + current_url = sources.urls["current"] if archives.urls.get(current_url): - archives.urls[current_url]['Host'] += " (current)" + archives.urls[current_url]["Host"] += " (current)" show_status = False max_host_len = 0 if not args.ping_only and not archives.abort_launch: show_status = True else: + def set_hostname_len(url, i): - hostname_len = len(str(i) + archives.urls[url]['Host']) - archives.urls[url]['host_len'] = hostname_len + hostname_len = len(str(i) + archives.urls[url]["Host"]) + archives.urls[url]["host_len"] = hostname_len return hostname_len - max_host_len = max([set_hostname_len(url, i+1) - for i, url in enumerate(archives.top_list)]) + max_host_len = max( + [set_hostname_len(url, i + 1) for i, url in enumerate(archives.top_list)] + ) for i, url in enumerate(archives.top_list): info = archives.urls[url] rank = i + 1 @@ -205,14 +214,12 @@ def set_hostname_len(url, i): exit() new_mirror = archives.top_list[key] - print("Selecting mirror %(mirror)s ..." % {'mirror': new_mirror}) + print("Selecting mirror %(mirror)s ..." % {"mirror": new_mirror}) if current_url == new_mirror: stderr.write( "%(url)s is the currently used mirror.\n" - "%(message)s\n" % { - 'url': current_url, - 'message': sources.skip_gen_msg - }) + "%(message)s\n" % {"url": current_url, "message": sources.skip_gen_msg} + ) exit(SKIPPED_FILE_GENERATION) work_dir = getcwd() @@ -222,10 +229,7 @@ def set_hostname_len(url, i): "Generating a new '%(apt)s' file will " "overwrite the current file.\n" "You should copy or backup '%(apt)s' before replacing it.\n" - "Continue?\n[yes|no] " % { - 'dir': sources.DIRECTORY, - 'apt': sources.APT_FILE - } + "Continue?\n[yes|no] " % {"dir": sources.DIRECTORY, "apt": sources.APT_FILE} ) yes_or_no(query) @@ -246,5 +250,6 @@ def main(): except KeyboardInterrupt: stderr.write("Aborting...\n") -if __name__ == '__main__': + +if __name__ == "__main__": main() diff --git a/src/apt_select/apt.py b/src/apt_select/apt.py index 3b4fa28..b3d65e6 100644 --- a/src/apt_select/apt.py +++ b/src/apt_select/apt.py @@ -4,21 +4,18 @@ from os import path from apt_select.utils import utf8_decode -SUPPORTED_KERNEL = 'Linux' -SUPPORTED_DISTRIBUTION_TYPE = 'Ubuntu' +SUPPORTED_KERNEL = "Linux" +SUPPORTED_DISTRIBUTION_TYPE = "Ubuntu" -UNAME = 'uname' -KERNEL_COMMAND = (UNAME, '-s') -MACHINE_COMMAND = (UNAME, '-m') -RELEASE_COMMAND = ('lsb_release', '-ics') -RELEASE_FILE = '/etc/lsb-release' +UNAME = "uname" +KERNEL_COMMAND = (UNAME, "-s") +MACHINE_COMMAND = (UNAME, "-m") +RELEASE_COMMAND = ("lsb_release", "-ics") +RELEASE_FILE = "/etc/lsb-release" -LAUNCHPAD_ARCH_32 = 'i386' -LAUNCHPAD_ARCH_64 = 'amd64' -LAUNCHPAD_ARCHES = frozenset([ - LAUNCHPAD_ARCH_32, - LAUNCHPAD_ARCH_64 -]) +LAUNCHPAD_ARCH_32 = "i386" +LAUNCHPAD_ARCH_64 = "amd64" +LAUNCHPAD_ARCHES = frozenset([LAUNCHPAD_ARCH_32, LAUNCHPAD_ARCH_64]) class System(object): @@ -28,7 +25,8 @@ def __init__(self): _kernel = utf8_decode(check_output(KERNEL_COMMAND)).strip() if _kernel != SUPPORTED_KERNEL: raise OSError( - "Invalid kernel found: %s. Expected %s." % ( + "Invalid kernel found: %s. Expected %s." + % ( _kernel, SUPPORTED_KERNEL, ) @@ -36,18 +34,16 @@ def __init__(self): try: self.dist, self.codename = tuple( - utf8_decode(s).strip() - for s in check_output(RELEASE_COMMAND).split() + utf8_decode(s).strip() for s in check_output(RELEASE_COMMAND).split() ) except OSError: # Fall back to using lsb-release info file if lsb_release command # is not available. e.g. Ubuntu minimal (core, docker image). try: - with open(RELEASE_FILE, 'rU') as release_file: + with open(RELEASE_FILE, "rU") as release_file: try: lsb_info = dict( - line.strip().split('=') - for line in release_file.readlines() + line.strip().split("=") for line in release_file.readlines() ) except ValueError: raise OSError( @@ -55,56 +51,56 @@ def __init__(self): ) try: - self.dist = lsb_info['DISTRIB_ID'] - self.codename = lsb_info['DISTRIB_CODENAME'] + self.dist = lsb_info["DISTRIB_ID"] + self.codename = lsb_info["DISTRIB_CODENAME"] except KeyError: raise OSError( "Expected distribution keys missing from %s." % RELEASE_FILE ) except (IOError, OSError): - raise OSError(( - "Unable to determine system distribution. " - "%s is required." % SUPPORTED_DISTRIBUTION_TYPE - )) + raise OSError( + ( + "Unable to determine system distribution. " + "%s is required." % SUPPORTED_DISTRIBUTION_TYPE + ) + ) if self.dist != SUPPORTED_DISTRIBUTION_TYPE: raise OSError( - "%s distributions are not supported. %s is required." % ( - self.dist, SUPPORTED_DISTRIBUTION_TYPE - ) + "%s distributions are not supported. %s is required." + % (self.dist, SUPPORTED_DISTRIBUTION_TYPE) ) self.arch = LAUNCHPAD_ARCH_32 - if utf8_decode(check_output(MACHINE_COMMAND).strip()) == 'x86_64': + if utf8_decode(check_output(MACHINE_COMMAND).strip()) == "x86_64": self.arch = LAUNCHPAD_ARCH_64 class SourcesFileError(Exception): """Error class for operations on an apt configuration file - Operations include: - - verifying/reading from the current system file - - generating a new config file""" + Operations include: + - verifying/reading from the current system file + - generating a new config file""" + pass class Sources(object): """Class for apt configuration files""" - DEB_SCHEMES = frozenset(['deb', 'deb-src']) - PROTOCOLS = frozenset(['http', 'ftp', 'https']) + DEB_SCHEMES = frozenset(["deb", "deb-src"]) + PROTOCOLS = frozenset(["http", "ftp", "https"]) - DIRECTORY = '/etc/apt/' - LIST_FILE = 'sources.list' + DIRECTORY = "/etc/apt/" + LIST_FILE = "sources.list" _CONFIG_PATH = DIRECTORY + LIST_FILE def __init__(self, codename): self._codename = codename.lower() if not path.isfile(self._CONFIG_PATH): - raise SourcesFileError(( - "%s must exist as file" % self._CONFIG_PATH - )) + raise SourcesFileError(("%s must exist as file" % self._CONFIG_PATH)) self._required_component = "main" self._lines = [] @@ -114,43 +110,46 @@ def __init__(self, codename): def __set_sources_lines(self): """Read system config file and store the lines in memory for parsing - and generation of new config file""" + and generation of new config file""" try: - with open(self._CONFIG_PATH, 'r') as f: + with open(self._CONFIG_PATH, "r") as f: self._lines = f.readlines() except IOError as err: - raise SourcesFileError(( - "Unable to read system apt file: %s" % err - )) + raise SourcesFileError(("Unable to read system apt file: %s" % err)) def __confirm_apt_source_uri(self, uri): """Check if line follows correct sources.list URI""" - if (uri and (uri[0] in self.DEB_SCHEMES) and - uri[1].split('://')[0] in self.PROTOCOLS): + if ( + uri + and (uri[0] in self.DEB_SCHEMES) + and uri[1].split("://")[0] in self.PROTOCOLS + ): return True return False def __get_current_archives(self): """Parse through all lines of the system apt file to find current - mirror urls""" + mirror urls""" urls = {} for line in self._lines: fields = line.split() if self.__confirm_apt_source_uri(fields): - if (not urls and - (self._codename in fields[2]) and - (fields[3] == self._required_component)): - urls['current'] = fields[1] - elif urls and (fields[2] == '%s-security' % self._codename): - urls['security'] = fields[1] + if ( + not urls + and (self._codename in fields[2]) + and (fields[3] == self._required_component) + ): + urls["current"] = fields[1] + elif urls and (fields[2] == "%s-security" % self._codename): + urls["security"] = fields[1] break return urls def set_current_archives(self): """Read in the system apt config, parse to find current mirror urls - to set as attribute""" + to set as attribute""" try: self.__set_sources_lines() except SourcesFileError as err: @@ -158,28 +157,29 @@ def set_current_archives(self): urls = self.__get_current_archives() if not urls: - raise SourcesFileError(( - "Error finding current %s URI in %s\n%s\n" % - (self._required_component, self._CONFIG_PATH, - self.skip_gen_msg) - )) + raise SourcesFileError( + ( + "Error finding current %s URI in %s\n%s\n" + % (self._required_component, self._CONFIG_PATH, self.skip_gen_msg) + ) + ) self.urls = urls def __set_config_lines(self, new_mirror): """Replace all instances of the current urls with the new mirror""" - self._lines = ''.join(self._lines) + self._lines = "".join(self._lines) for url in self.urls.values(): self._lines = self._lines.replace(url, new_mirror) def generate_new_config(self, work_dir, new_mirror): """Write new configuration file to current working directory""" self.__set_config_lines(new_mirror) - self.new_file_path = work_dir.rstrip('/') + '/' + self.LIST_FILE + self.new_file_path = work_dir.rstrip("/") + "/" + self.LIST_FILE try: - with open(self.new_file_path, 'w') as f: + with open(self.new_file_path, "w") as f: f.write(self._lines) except IOError as err: - raise SourcesFileError(( - "Unable to generate new sources.list:\n\t%s\n" % err - )) + raise SourcesFileError( + ("Unable to generate new sources.list:\n\t%s\n" % err) + ) diff --git a/src/apt_select/arguments.py b/src/apt_select/arguments.py index d0fe763..e78a5b9 100644 --- a/src/apt_select/arguments.py +++ b/src/apt_select/arguments.py @@ -3,33 +3,33 @@ from argparse import ArgumentParser, RawTextHelpFormatter -DEFAULT_COUNTRY = 'US' +DEFAULT_COUNTRY = "US" DEFAULT_NUMBER = 1 STATUS_ARGS = ( "up-to-date", "one-day-behind", "two-days-behind", "one-week-behind", - "unknown" + "unknown", ) SKIPPED_FILE_GENERATION = 4 + def get_args(): """Get parsed command line arguments""" parser = ArgumentParser( description=( - "Find the fastest Ubuntu apt mirrors.\n" - "Generate new sources.list file." + "Find the fastest Ubuntu apt mirrors.\n" "Generate new sources.list file." ), - epilog="The exit code is 0 on success, 1 on error, and %d if "\ - "sources.list already has the chosen\n"\ + epilog="The exit code is 0 on success, 1 on error, and %d if " + "sources.list already has the chosen\n" "mirror and a new one was not generated." % SKIPPED_FILE_GENERATION, - formatter_class=RawTextHelpFormatter + formatter_class=RawTextHelpFormatter, ) parser.add_argument( - '-C', - '--country', - nargs='?', + "-C", + "--country", + nargs="?", type=str, help=( "specify a country to test its list of mirrors\n" @@ -37,26 +37,23 @@ def get_args(): "COUNTRY should follow ISO 3166-1 alpha-2 format\n" "default: %s" % DEFAULT_COUNTRY ), - metavar='COUNTRY' + metavar="COUNTRY", ) parser.add_argument( - '-t', - '--top-number', - nargs='?', + "-t", + "--top-number", + nargs="?", type=int, - help=( - "specify number of mirrors to return\n" - "default: 1\n" - ), + help=("specify number of mirrors to return\n" "default: 1\n"), const=DEFAULT_NUMBER, default=DEFAULT_NUMBER, - metavar='NUMBER' + metavar="NUMBER", ) test_group = parser.add_mutually_exclusive_group(required=False) test_group.add_argument( - '-m', - '--min-status', - nargs='?', + "-m", + "--min-status", + nargs="?", choices=STATUS_ARGS, type=str, help=( @@ -67,53 +64,55 @@ def get_args(): " %(two_day)s\n" " %(week)s\n" " %(unknown)s\n" - "default: %(up)s\n" % { - 'up': STATUS_ARGS[0], - 'day': STATUS_ARGS[1], - 'two_day': STATUS_ARGS[2], - 'week': STATUS_ARGS[3], - 'unknown': STATUS_ARGS[4] + "default: %(up)s\n" + % { + "up": STATUS_ARGS[0], + "day": STATUS_ARGS[1], + "two_day": STATUS_ARGS[2], + "week": STATUS_ARGS[3], + "unknown": STATUS_ARGS[4], } ), const=STATUS_ARGS[0], default=STATUS_ARGS[0], - metavar='STATUS' + metavar="STATUS", ) test_group.add_argument( - '-p', - '--ping-only', - action='store_true', + "-p", + "--ping-only", + action="store_true", help=( "rank mirror(s) by latency only, disregard status(es)\n" "cannot be used with -m/--min-status\n" ), - default=False + default=False, ) output_group = parser.add_mutually_exclusive_group(required=False) output_group.add_argument( - '-c', - '--choose', - action='store_true', + "-c", + "--choose", + action="store_true", help=( "choose mirror from a list\n" "requires -t/--top-num NUMBER where NUMBER > 1\n" ), - default=False + default=False, ) output_group.add_argument( - '-l', - '--list', - dest='list_only', - action='store_true', + "-l", + "--list", + dest="list_only", + action="store_true", help=( "print list of mirrors only, don't generate file\n" "cannot be used with -c/--choose\n" ), - default=False + default=False, ) return parser -if __name__ == '__main__': + +if __name__ == "__main__": get_args().parse_args() diff --git a/src/apt_select/mirrors.py b/src/apt_select/mirrors.py index 2296f6f..90ad0f9 100644 --- a/src/apt_select/mirrors.py +++ b/src/apt_select/mirrors.py @@ -4,10 +4,10 @@ Provides latency testing and mirror attribute getting from Launchpad.""" from sys import stderr -from socket import (socket, AF_INET, SOCK_STREAM, - gethostbyname, error, timeout, gaierror) +from socket import socket, AF_INET, SOCK_STREAM, gethostbyname, error, timeout, gaierror from time import time from apt_select.utils import progress_msg, get_text, URLGetTextError + try: from urlparse import urlparse except ImportError: @@ -21,6 +21,7 @@ from Queue import Queue, Empty from bs4 import BeautifulSoup, FeatureNotFound + PARSER = "lxml" try: BeautifulSoup("", PARSER) @@ -35,6 +36,7 @@ class ConnectError(Exception): """Socket connection errors""" + pass @@ -51,9 +53,7 @@ def __init__(self, url_list, ping_only, min_status): self._trip_queue = Queue() if not ping_only: self._launchpad_base = "https://launchpad.net" - self._launchpad_url = ( - self._launchpad_base + "/ubuntu/+archivemirrors" - ) + self._launchpad_url = self._launchpad_base + "/ubuntu/+archivemirrors" self._launchpad_html = "" self.abort_launch = False self._status_opts = ( @@ -61,7 +61,7 @@ def __init__(self, url_list, ping_only, min_status): "One week behind", "Two days behind", "One day behind", - "Up to date" + "Up to date", ) index = self._status_opts.index(min_status) self._status_opts = self._status_opts[index:] @@ -74,10 +74,12 @@ def get_launchpad_urls(self): try: self._launchpad_html = get_text(self._launchpad_url) except URLGetTextError as err: - stderr.write(( - "%s: %s\nUnable to retrieve list of launchpad sites\n" - "Reverting to latency only\n" % (self._launchpad_url, err) - )) + stderr.write( + ( + "%s: %s\nUnable to retrieve list of launchpad sites\n" + "Reverting to latency only\n" % (self._launchpad_url, err) + ) + ) self.abort_launch = True else: stderr.write("done.\n") @@ -85,7 +87,7 @@ def get_launchpad_urls(self): def __parse_launchpad_list(self): """Parse Launchpad's list page to find each mirror's - Official page""" + Official page""" soup = BeautifulSoup(self._launchpad_html, PARSER) prev = "" for element in soup.table.descendants: @@ -100,9 +102,7 @@ def __parse_launchpad_list(self): pass else: if url in self.urls: - self.urls[url]["Launchpad"] = ( - self._launchpad_base + prev - ) + self.urls[url]["Launchpad"] = self._launchpad_base + prev if url.startswith("/ubuntu/+mirror/"): prev = url @@ -113,9 +113,7 @@ def __kickoff_trips(self): for url in self._url_list: host = urlparse(url).netloc try: - thread = Thread( - target=_RoundTrip(url, host, self._trip_queue).min_rtt - ) + thread = Thread(target=_RoundTrip(url, host, self._trip_queue).min_rtt) except gaierror as err: stderr.write("%s: %s ignored\n" % (err, url)) else: @@ -148,21 +146,17 @@ def get_rtts(self): processed += 1 progress_msg(processed, self._num_trips) - stderr.write('\n') + stderr.write("\n") # Mirrors without latency info are removed - self.urls = { - key: val for key, val in self.urls.items() if "Latency" in val - } + self.urls = {key: val for key, val in self.urls.items() if "Latency" in val} - self.ranked = sorted( - self.urls, key=lambda x: self.urls[x]["Latency"] - ) + self.ranked = sorted(self.urls, key=lambda x: self.urls[x]["Latency"]) def __queue_lookups(self, codename, arch, data_queue): """Queue threads for data retrieval from launchpad.net - Returns number of threads started to fulfill number of - requested statuses""" + Returns number of threads started to fulfill number of + requested statuses""" num_threads = 0 for url in self.ranked: try: @@ -172,11 +166,7 @@ def __queue_lookups(self, codename, arch, data_queue): else: thread = Thread( target=_LaunchData( - url, - launch_url, - codename, - arch, - data_queue + url, launch_url, codename, arch, data_queue ).get_info ) thread.daemon = True @@ -221,7 +211,7 @@ def lookup_statuses(self, codename, arch, min_status): # iteration if another queue needs to be built) self.ranked.remove(info[0]) - if (self.got["data"] == self.status_num): + if self.got["data"] == self.status_num: break # Reorder by latency as queue returns vary building final list @@ -244,13 +234,13 @@ def __tcp_ping(self): port = 80 sock = socket(AF_INET, SOCK_STREAM) sock.settimeout(2.5) - send_tstamp = time()*1000 + send_tstamp = time() * 1000 try: sock.connect((self._addr, port)) except (timeout, error) as err: raise ConnectError(err) - recv_tstamp = time()*1000 + recv_tstamp = time() * 1000 rtt = recv_tstamp - send_tstamp sock.close() return rtt @@ -283,20 +273,17 @@ def __parse_mirror_html(self, launch_html): info = {} soup = BeautifulSoup(launch_html, PARSER) # Find elements of the ids we need - for line in soup.find_all(id=['arches', 'speed', 'organisation']): - if line.name == 'table': + for line in soup.find_all(id=["arches", "speed", "organisation"]): + if line.name == "table": # Status information lives in a table column alongside # series name and machine architecture - for tr in line.find('tbody').find_all('tr'): - arches = [x.get_text() for x in tr.find_all('td')] - if (self._codename in arches[0] and - arches[1] == self._arch): + for tr in line.find("tbody").find_all("tr"): + arches = [x.get_text() for x in tr.find_all("td")] + if self._codename in arches[0] and arches[1] == self._arch: info.update({"Status": arches[2]}) else: # "Speed" lives in a dl, and we use the key -> value as such - info.update({ - line.dt.get_text().strip(':'): line.dd.get_text() - }) + info.update({line.dt.get_text().strip(":"): line.dd.get_text()}) return info @@ -314,9 +301,9 @@ def get_info(self): else: info = self.__parse_mirror_html(launch_html) if "Status" not in info: - stderr.write(( - "Unable to parse status info from %s\n" % self._launch_url - )) + stderr.write( + ("Unable to parse status info from %s\n" % self._launch_url) + ) self._data_queue.put_nowait((self._url, None)) return diff --git a/src/apt_select/utils.py b/src/apt_select/utils.py index 1bffd12..183ab51 100644 --- a/src/apt_select/utils.py +++ b/src/apt_select/utils.py @@ -5,17 +5,16 @@ import requests -DEFAULT_REQUEST_HEADERS = { - 'User-Agent': 'apt-select' -} +DEFAULT_REQUEST_HEADERS = {"User-Agent": "apt-select"} def utf8_decode(encoded): - return encoded.decode('utf-8') + return encoded.decode("utf-8") class URLGetTextError(Exception): """Error class for fetching text from a URL""" + pass From 65d768ae4ababbd42ed30b1033a8f2906ee981c6 Mon Sep 17 00:00:00 2001 From: Johan Westin Date: Sun, 3 Mar 2024 11:10:45 +0100 Subject: [PATCH 6/7] Add Python type hints --- MANIFEST.in | 2 +- pyproject.toml | 30 +- src/apt_select/__init__.py | 2 +- src/apt_select/__main__.py | 287 ++++++++++--------- src/apt_select/apt.py | 109 ++++--- src/apt_select/{arguments.py => argument.py} | 21 +- src/apt_select/constant.py | 9 + src/apt_select/{mirrors.py => mirror.py} | 211 +++++++------- src/apt_select/utility.py | 38 +++ src/apt_select/utils.py | 37 --- typeshed/pyi/apt_select/__init__.pyi | 1 + typeshed/pyi/apt_select/__main__.pyi | 12 + typeshed/pyi/apt_select/apt.pyi | 33 +++ typeshed/pyi/apt_select/argument.pyi | 9 + typeshed/pyi/apt_select/constant.pyi | 10 + typeshed/pyi/apt_select/mirror.pyi | 27 ++ typeshed/pyi/apt_select/utility.pyi | 8 + 17 files changed, 500 insertions(+), 346 deletions(-) rename src/apt_select/{arguments.py => argument.py} (82%) create mode 100644 src/apt_select/constant.py rename src/apt_select/{mirrors.py => mirror.py} (60%) create mode 100644 src/apt_select/utility.py delete mode 100644 src/apt_select/utils.py create mode 100644 typeshed/pyi/apt_select/__init__.pyi create mode 100644 typeshed/pyi/apt_select/__main__.pyi create mode 100644 typeshed/pyi/apt_select/apt.pyi create mode 100644 typeshed/pyi/apt_select/argument.pyi create mode 100644 typeshed/pyi/apt_select/constant.pyi create mode 100644 typeshed/pyi/apt_select/mirror.pyi create mode 100644 typeshed/pyi/apt_select/utility.pyi diff --git a/MANIFEST.in b/MANIFEST.in index 4f2f121..9f7c803 100644 --- a/MANIFEST.in +++ b/MANIFEST.in @@ -1,4 +1,4 @@ include *.rst *.txt *.md recursive-include docs *.rst *.txt *.md recursive-include src/apt_select * -recursive-include *.py +recursive-include typeshed/pyi *.py *.pyi diff --git a/pyproject.toml b/pyproject.toml index 1893a03..a7bc47f 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -52,21 +52,30 @@ version-file = "src/apt_select/_version.py" [tool.hatch.build.targets.sdist] include = [ "/src/apt_select", + "/typeshed/pyi", ] +[tool.hatch.build.targets.wheel.force-include] +"typeshed/pyi" = "typeshed/pyi" + [tool.hatch.envs.default] python = "3.11" dependencies = [ "black", "build", "hatch", + "mypy", "ruff", + "types-beautifulsoup4", + "types-requests", "wheel", ] [tool.hatch.envs.default.scripts] dist = [ "black src", + "rm -rf 'typeshed/pyi'", + "stubgen --output=typeshed/pyi --search-path=src src", "hatch build", ] format = [ @@ -75,11 +84,13 @@ format = [ lint = [ "black src", "ruff check --fix src", + "mypy src", ] lint-check = [ "black --check src", "pylint src", "ruff check src", + "mypy src", ] test = "pytest" test-cov-xml = "pytest --cov-report=xml" @@ -87,6 +98,23 @@ test-cov-xml = "pytest --cov-report=xml" [tool.hatch.version] source = "vcs" +[tool.mypy] +check_untyped_defs = true +disallow_any_generics = true +disallow_incomplete_defs = true +disallow_subclassing_any = true +disallow_untyped_calls = true +disallow_untyped_decorators = true +disallow_untyped_defs = true +ignore_missing_imports = true +mypy_path = ["typeshed/pyi"] +no_implicit_optional = true +python_version = "3.11" +warn_redundant_casts = true +warn_return_any = true +warn_unused_configs = true +warn_unused_ignores = true + [tool.pylint.main] # Analyse import fallback blocks. This can be used to support both Python 2 and 3 # compatible code, which means that the block might have code that exists only in @@ -550,7 +578,7 @@ max-spelling-suggestions = 4 # List of comma separated words that should be considered directives if they # appear at the beginning of a comment and should not be checked. -spelling-ignore-comment-directives = "fmt: on,fmt: off,noqa:,noqa,nosec,isort:skip:" +spelling-ignore-comment-directives = "fmt: on,fmt: off,noqa:,noqa,nosec,isort:skip,mypy:" # List of comma separated words that should not be checked. # spelling-ignore-words = diff --git a/src/apt_select/__init__.py b/src/apt_select/__init__.py index b19ee4b..ac64ce1 100644 --- a/src/apt_select/__init__.py +++ b/src/apt_select/__init__.py @@ -1 +1 @@ -__version__ = "2.2.1" +from apt_select._version import __version__, __version_tuple__ # noqa: F401 diff --git a/src/apt_select/__main__.py b/src/apt_select/__main__.py index e5da157..ee9e80d 100644 --- a/src/apt_select/__main__.py +++ b/src/apt_select/__main__.py @@ -1,25 +1,18 @@ #!/usr/bin/env python """Main apt-select script""" -import requests +import os import re +import sys +from argparse import Namespace -from sys import exit, stderr, version_info -from os import getcwd -from apt_select.arguments import get_args, DEFAULT_COUNTRY, SKIPPED_FILE_GENERATION -from apt_select.mirrors import Mirrors -from apt_select.apt import System, Sources, SourcesFileError -from apt_select.utils import DEFAULT_REQUEST_HEADERS - -# Support input for Python 2 and 3 -get_input = input -if version_info[:2] <= (2, 7): - get_input = raw_input +import requests +from apt_select import apt, argument, constant, mirror -def set_args(): +def set_args() -> tuple[str | Namespace, int]: """Set arguments, disallow bad combination""" - parser = get_args() + parser = argument.get_arg_parser() args = parser.parse_args() # Convert status argument to format used by Launchpad @@ -29,227 +22,237 @@ def set_args(): if args.choose and (not args.top_number or args.top_number < 2): parser.print_usage() - exit( - ( - "error: -c/--choose option requires -t/--top-number NUMBER " - "where NUMBER is greater than 1." - ) - ) + return ( + "error: -c/--choose option requires -t/--top-number NUMBER " + "where NUMBER is greater than 1." + ), constant.NOK if not args.country: - stderr.write("WARNING: no country code provided. defaulting to US.\n") - args.country = DEFAULT_COUNTRY + sys.stderr.write("WARNING: no country code provided. defaulting to US.\n") + args.country = argument.DEFAULT_COUNTRY elif not re.match(r"^[a-zA-Z]{2}$", args.country): - exit( - ( - "Invalid country. %s is not in ISO 3166-1 alpha-2 " - "format" % args.country - ) + return ( + f"Invalid country. {args.country} is not in ISO 3166-1 alpha-2 format", + constant.NOK, ) - return args + return args, constant.OK -def get_mirrors(mirrors_url, country): +def get_mirrors( + mirrors_url: str, + country: str, + timeout_sec: float = constant.DEFAULT_REQUEST_TIMEOUT_SEC, +) -> tuple[list[str], int]: """Fetch list of Ubuntu mirrors""" - stderr.write("Getting list of mirrors...") - response = requests.get(mirrors_url, headers=DEFAULT_REQUEST_HEADERS) - if response.status_code == requests.codes.NOT_FOUND: - exit( - "The mirror list for country: %s was not found at %s" - % (country, mirrors_url) + sys.stderr.write("Getting list of mirrors...") + response = requests.get( + mirrors_url, headers=constant.DEFAULT_REQUEST_HEADERS, timeout=timeout_sec + ) + not_found = requests.codes.get("NOT_FOUND", None) + if response.status_code == not_found: + return ( + [f"The mirror list for country: {country} was not found at {mirrors_url}"], + constant.NOK, ) - stderr.write("done.\n") + sys.stderr.write("done.\n") - return response.text.splitlines() + return response.text.splitlines(), constant.OK -def print_status(info, rank): +def print_status(info: dict[str, float | str], rank: int) -> None: """Print full mirror status report for ranked item""" for key in ("Org", "Speed"): info.setdefault(key, "N/A") print( ( - "%(rank)d. %(mirror)s\n" - "%(tab)sLatency: %(ms).2f ms\n" - "%(tab)sOrg: %(org)s\n" - "%(tab)sStatus: %(status)s\n" - "%(tab)sSpeed: %(speed)s" - % { - "tab": " ", - "rank": rank, - "mirror": info["Host"], - "ms": info["Latency"], - "org": info["Organisation"], - "status": info["Status"], - "speed": info["Speed"], - } + f"{rank}. {info['Host']}\n" + f"{' '}Latency: {info['Latency']:.2f} ms\n" + f"{' '}Org: {info['Organisation']}\n" + f"{' '}Status: {info['Status']}\n" + f"{' '}Speed: {info['Speed']}" ) ) -def print_latency(info, rank, max_host_len): +def print_latency( + info: dict[str, float | str], rank: int, max_hostname_length: int +) -> None: """Print latency information for mirror in ranked report""" - print( - "%(rank)d. %(mirror)s: %(padding)s%(ms).2f ms" - % { - "rank": rank, - "padding": (max_host_len - info.get("host_len", max_host_len)) * " ", - "mirror": info["Host"], - "ms": info["Latency"], - } - ) + hostname_length = info.get("host_length", max_hostname_length) + if isinstance(hostname_length, int): + print( + f"{rank}. {info['Host']}: " + f"{(max_hostname_length - hostname_length) * ' '}{info['Latency']:.2f} ms" + ) -def ask(query): +def ask(query: str) -> str: """Ask for unput from user""" - answer = get_input(query) + answer = input(query) return answer -def get_selected_mirror(list_size): +def get_selected_mirror(list_size: int) -> tuple[int | None, int]: """Prompt for user input to select desired mirror""" - key = ask("Choose a mirror (1 - %d)\n'q' to quit " % list_size) + key = ask(f"Choose a mirror (1 - {list_size})\n'q' to quit ") while True: + if key == "q": + return None, constant.USER_INTERRUPT try: - key = int(key) - except ValueError: - if key == "q": - exit() - else: - if (key >= 1) and (key <= list_size): + if 1 <= int(key) <= list_size: break + except ValueError: + key = ask("Invalid entry ") - key = ask("Invalid entry ") - - return key + return int(key), constant.OK -def yes_or_no(query): +def yes_or_no(query: str) -> int: """Get definitive answer""" opts = ("yes", "no") answer = ask(query) while answer != opts[0]: if answer == opts[1]: - exit(0) - answer = ask("Please enter '%s' or '%s': " % opts) + return constant.USER_INTERRUPT + answer = ask(f"Please enter '{opts[0]}' or '{opts[1]}': ") + return constant.OK -def apt_select(): +def apt_select() -> tuple[str | None, int]: """Run apt-select: Ubuntu archive mirror reporting tool""" try: - system = System() + system = apt.System() except OSError as err: - exit("Error setting system information:\n\t%s" % err) + return f"Error setting system information:\n\t{err}", constant.NOK try: - sources = Sources(system.codename) - except SourcesFileError as err: - exit("Error with current apt sources:\n\t%s" % err) + sources = apt.Sources(codename=system.codename) + except apt.SourcesFileError as err: + return f"Error with current apt sources:\n\t{err}", constant.NOK + + args, status = set_args() + if status != constant.OK or isinstance(args, str): + return f"{args}", status - args = set_args() mirrors_loc = "mirrors.ubuntu.com" - mirrors_url = "http://%s/%s.txt" % (mirrors_loc, args.country.upper()) - mirrors_list = get_mirrors(mirrors_url, args.country) + mirrors_url = f"http://{mirrors_loc}/{args.country.upper()}.txt" + mirrors_list, status = get_mirrors(mirrors_url=mirrors_url, country=args.country) + if status != constant.OK: + return "".join(mirrors_list), status - archives = Mirrors(mirrors_list, args.ping_only, args.min_status) - archives.get_rtts() - if archives.got["ping"] < args.top_number: - args.top_number = archives.got["ping"] + mirrors = mirror.Mirrors( + url_list=mirrors_list, min_status=args.min_status, ping_only=args.ping_only + ) + mirrors.measure_rtts() + if mirrors.got["ping"] < args.top_number: + args.top_number = mirrors.got["ping"] if args.top_number == 0: - exit("Cannot connect to any mirrors in %s\n." % mirrors_list) + return f"Cannot connect to any mirrors in {mirrors_list}\n.", constant.NOK if not args.ping_only: - archives.get_launchpad_urls() - if not archives.abort_launch: + mirrors.fetch_launchpad_urls() + if not mirrors.abort_launch: # Mirrors needs a limit to stop launching threads - archives.status_num = args.top_number - stderr.write("Looking up %d status(es)\n" % args.top_number) - archives.lookup_statuses( - system.codename.capitalize(), system.arch, args.min_status + mirrors.status_num = args.top_number + sys.stderr.write(f"Looking up {args.top_number} status(es)\n") + mirrors.lookup_statuses( + codename=system.codename.capitalize(), arch=system.arch ) if args.top_number > 1: - stderr.write("\n") + sys.stderr.write("\n") - if args.ping_only or archives.abort_launch: - archives.top_list = archives.ranked[: args.top_number] + if args.ping_only or mirrors.abort_launch: + mirrors.top_list = mirrors.ranked[: args.top_number] sources.set_current_archives() current_url = sources.urls["current"] - if archives.urls.get(current_url): - archives.urls[current_url]["Host"] += " (current)" + if mirrors.urls.get(current_url): + _v1: float | str = mirrors.urls[current_url]["Host"] + if isinstance(_v1, str): + mirrors.urls[current_url]["Host"] = f"{_v1} (current)" show_status = False - max_host_len = 0 - if not args.ping_only and not archives.abort_launch: + max_hostname_length = 0 + if not args.ping_only and not mirrors.abort_launch: show_status = True else: - - def set_hostname_len(url, i): - hostname_len = len(str(i) + archives.urls[url]["Host"]) - archives.urls[url]["host_len"] = hostname_len - return hostname_len - - max_host_len = max( - [set_hostname_len(url, i + 1) for i, url in enumerate(archives.top_list)] + max_hostname_length = max( + _set_hostname_length(index=i + 1, entry=mirrors.urls[url]) + for i, url in enumerate(mirrors.top_list) ) - for i, url in enumerate(archives.top_list): - info = archives.urls[url] + for i, url in enumerate(mirrors.top_list): + info = mirrors.urls[url] rank = i + 1 if show_status: - print_status(info, rank) + print_status(info=info, rank=rank) else: - print_latency(info, rank, max_host_len) + print_latency(info=info, rank=rank, max_hostname_length=max_hostname_length) key = 0 if args.choose: - key = get_selected_mirror(len(archives.top_list)) - 1 + maybe_key, status = get_selected_mirror(list_size=len(mirrors.top_list)) + if status != constant.OK: + return None, constant.USER_INTERRUPT + if maybe_key is None: + return "Invalid mirror index", constant.INVALID_MIRROR_INDEX + key = maybe_key - 1 if args.list_only: - exit() + return None, constant.OK - new_mirror = archives.top_list[key] - print("Selecting mirror %(mirror)s ..." % {"mirror": new_mirror}) + new_mirror = mirrors.top_list[key] + print(f"Selecting mirror {new_mirror} ...") if current_url == new_mirror: - stderr.write( - "%(url)s is the currently used mirror.\n" - "%(message)s\n" % {"url": current_url, "message": sources.skip_gen_msg} - ) - exit(SKIPPED_FILE_GENERATION) + return ( + f"[{current_url}] is the currently used mirror.\n" + f"{sources.skip_gen_msg}\n" + ), constant.SKIPPED_FILE_GENERATION - work_dir = getcwd() + work_dir = os.getcwd() if work_dir == sources.DIRECTORY[0:-1]: query = ( - "'%(dir)s' is the current directory.\n" - "Generating a new '%(apt)s' file will " + f"'{sources.DIRECTORY}' is the current directory.\n" + f"Generating a new '{sources.LIST_FILE}' file will " "overwrite the current file.\n" "You should copy or backup '%(apt)s' before replacing it.\n" - "Continue?\n[yes|no] " % {"dir": sources.DIRECTORY, "apt": sources.APT_FILE} + "Continue?\n[yes|no] " ) - yes_or_no(query) + status = yes_or_no(query=query) + if status != constant.OK: + return None, status - new_mirror = archives.top_list[key] + new_mirror = mirrors.top_list[key] try: - sources.generate_new_config(work_dir, new_mirror) - except SourcesFileError as err: - exit("Error generating new config file" % err) - else: - print("New config file saved to %s" % sources.new_file_path) + sources.generate_new_config(work_dir=work_dir, new_mirror=new_mirror) + except apt.SourcesFileError as err: + return f"Error generating new config file {err}", constant.NOK + print(f"New config file saved to {sources.new_file_path}") + + return None, constant.OK + - exit() +def _set_hostname_length(index: int, entry: dict[str, float | int | str]) -> int: + hostname_len = len(f"{index}{entry['Host']}") + entry["host_length"] = hostname_len + return hostname_len -def main(): +def main() -> int: try: - apt_select() + msg, status = apt_select() + if msg: + sys.stderr.write(msg) + return status except KeyboardInterrupt: - stderr.write("Aborting...\n") + sys.stderr.write("Aborting...\n") + return 1 if __name__ == "__main__": - main() + sys.exit(main()) diff --git a/src/apt_select/apt.py b/src/apt_select/apt.py index b3d65e6..94b4496 100644 --- a/src/apt_select/apt.py +++ b/src/apt_select/apt.py @@ -1,8 +1,9 @@ #!/usr/bin/env python -from subprocess import check_output from os import path -from apt_select.utils import utf8_decode +from subprocess import check_output + +from apt_select import constant, utility SUPPORTED_KERNEL = "Linux" SUPPORTED_DISTRIBUTION_TYPE = "Ubuntu" @@ -18,62 +19,60 @@ LAUNCHPAD_ARCHES = frozenset([LAUNCHPAD_ARCH_32, LAUNCHPAD_ARCH_64]) -class System(object): +class System: """System information for use in apt related operations""" - def __init__(self): - _kernel = utf8_decode(check_output(KERNEL_COMMAND)).strip() + def __init__(self) -> None: + _kernel = utility.utf8_decode(check_output(KERNEL_COMMAND)).strip() if _kernel != SUPPORTED_KERNEL: raise OSError( - "Invalid kernel found: %s. Expected %s." - % ( - _kernel, - SUPPORTED_KERNEL, - ) + f"Invalid kernel found: {_kernel}. Expected {SUPPORTED_KERNEL}." ) try: self.dist, self.codename = tuple( - utf8_decode(s).strip() for s in check_output(RELEASE_COMMAND).split() + utility.utf8_decode(s).strip() + for s in check_output(RELEASE_COMMAND).split() ) except OSError: # Fall back to using lsb-release info file if lsb_release command # is not available. e.g. Ubuntu minimal (core, docker image). try: - with open(RELEASE_FILE, "rU") as release_file: + with open( + RELEASE_FILE, "r", encoding=constant.ENCODING_UTF_8 + ) as release_file: try: lsb_info = dict( line.strip().split("=") for line in release_file.readlines() ) - except ValueError: + except ValueError as err: raise OSError( - "Unexpected release file format found in %s." % RELEASE_FILE - ) + f"Unexpected release file format found in {RELEASE_FILE}." + ) from err try: self.dist = lsb_info["DISTRIB_ID"] self.codename = lsb_info["DISTRIB_CODENAME"] - except KeyError: + except KeyError as err: raise OSError( - "Expected distribution keys missing from %s." % RELEASE_FILE - ) + f"Expected distribution keys missing from {RELEASE_FILE}." + ) from err - except (IOError, OSError): + except (IOError, OSError) as err: raise OSError( ( "Unable to determine system distribution. " - "%s is required." % SUPPORTED_DISTRIBUTION_TYPE + f"{SUPPORTED_DISTRIBUTION_TYPE} is required." ) - ) + ) from err if self.dist != SUPPORTED_DISTRIBUTION_TYPE: raise OSError( - "%s distributions are not supported. %s is required." - % (self.dist, SUPPORTED_DISTRIBUTION_TYPE) + f"{self.dist} distributions are not supported. {SUPPORTED_DISTRIBUTION_TYPE} is required." ) self.arch = LAUNCHPAD_ARCH_32 - if utf8_decode(check_output(MACHINE_COMMAND).strip()) == "x86_64": + if utility.utf8_decode(check_output(MACHINE_COMMAND).strip()) == "x86_64": self.arch = LAUNCHPAD_ARCH_64 @@ -84,10 +83,8 @@ class SourcesFileError(Exception): - verifying/reading from the current system file - generating a new config file""" - pass - -class Sources(object): +class Sources: """Class for apt configuration files""" DEB_SCHEMES = frozenset(["deb", "deb-src"]) @@ -97,89 +94,89 @@ class Sources(object): LIST_FILE = "sources.list" _CONFIG_PATH = DIRECTORY + LIST_FILE - def __init__(self, codename): + def __init__(self, codename: str) -> None: self._codename = codename.lower() if not path.isfile(self._CONFIG_PATH): - raise SourcesFileError(("%s must exist as file" % self._CONFIG_PATH)) + raise SourcesFileError(f"{self._CONFIG_PATH} must exist as file") self._required_component = "main" - self._lines = [] - self.urls = [] + self._lines: str | list[str] = [] + self.urls: dict[str, str] = {} self.skip_gen_msg = "Skipping file generation" - self.new_file_path = None + self.new_file_path: str | None = None - def __set_sources_lines(self): + def __set_sources_lines(self) -> None: """Read system config file and store the lines in memory for parsing and generation of new config file""" try: - with open(self._CONFIG_PATH, "r") as f: + with open(self._CONFIG_PATH, "r", encoding=constant.ENCODING_UTF_8) as f: self._lines = f.readlines() except IOError as err: - raise SourcesFileError(("Unable to read system apt file: %s" % err)) + raise SourcesFileError(f"Unable to read system apt file: {err}") from err - def __confirm_apt_source_uri(self, uri): + def __confirm_apt_source_uri(self, uris: list[str]) -> bool: """Check if line follows correct sources.list URI""" if ( - uri - and (uri[0] in self.DEB_SCHEMES) - and uri[1].split("://")[0] in self.PROTOCOLS + uris + and (uris[0] in self.DEB_SCHEMES) + and uris[1].split("://")[0] in self.PROTOCOLS ): return True return False - def __get_current_archives(self): + def __get_current_archives(self) -> dict[str, str]: """Parse through all lines of the system apt file to find current mirror urls""" - urls = {} + urls: dict[str, str] = {} for line in self._lines: fields = line.split() - if self.__confirm_apt_source_uri(fields): + if self.__confirm_apt_source_uri(uris=fields): if ( not urls and (self._codename in fields[2]) and (fields[3] == self._required_component) ): urls["current"] = fields[1] - elif urls and (fields[2] == "%s-security" % self._codename): + elif urls and (fields[2] == f"{self._codename}-security"): urls["security"] = fields[1] break return urls - def set_current_archives(self): + def set_current_archives(self) -> None: """Read in the system apt config, parse to find current mirror urls to set as attribute""" try: self.__set_sources_lines() except SourcesFileError as err: - raise SourcesFileError(err) + raise SourcesFileError(err) from err urls = self.__get_current_archives() if not urls: raise SourcesFileError( - ( - "Error finding current %s URI in %s\n%s\n" - % (self._required_component, self._CONFIG_PATH, self.skip_gen_msg) - ) + f"Error finding current {self._required_component} URI in {self._CONFIG_PATH}\n{self.skip_gen_msg}\n" ) self.urls = urls - def __set_config_lines(self, new_mirror): + def __set_config_lines(self, new_mirror: str) -> None: """Replace all instances of the current urls with the new mirror""" self._lines = "".join(self._lines) for url in self.urls.values(): self._lines = self._lines.replace(url, new_mirror) - def generate_new_config(self, work_dir, new_mirror): + def generate_new_config(self, work_dir: str, new_mirror: str) -> None: """Write new configuration file to current working directory""" - self.__set_config_lines(new_mirror) + self.__set_config_lines(new_mirror=new_mirror) self.new_file_path = work_dir.rstrip("/") + "/" + self.LIST_FILE try: - with open(self.new_file_path, "w") as f: - f.write(self._lines) + if isinstance(self._lines, str): + with open( + self.new_file_path, "w", encoding=constant.ENCODING_UTF_8 + ) as f: + f.write(self._lines) except IOError as err: raise SourcesFileError( - ("Unable to generate new sources.list:\n\t%s\n" % err) - ) + f"Unable to generate new sources.list:\n\t{err}\n" + ) from err diff --git a/src/apt_select/arguments.py b/src/apt_select/argument.py similarity index 82% rename from src/apt_select/arguments.py rename to src/apt_select/argument.py index e78a5b9..d24e26e 100644 --- a/src/apt_select/arguments.py +++ b/src/apt_select/argument.py @@ -3,6 +3,8 @@ from argparse import ArgumentParser, RawTextHelpFormatter +from apt_select import constant + DEFAULT_COUNTRY = "US" DEFAULT_NUMBER = 1 STATUS_ARGS = ( @@ -12,18 +14,19 @@ "one-week-behind", "unknown", ) -SKIPPED_FILE_GENERATION = 4 -def get_args(): +def get_arg_parser() -> ArgumentParser: """Get parsed command line arguments""" parser = ArgumentParser( description=( - "Find the fastest Ubuntu apt mirrors.\n" "Generate new sources.list file." + "Find the fastest Ubuntu apt mirrors.\nGenerate new sources.list file." + ), + epilog=( + f"The exit code is {constant.OK} on success, {constant.NOK} on error" + f", and {constant.SKIPPED_FILE_GENERATION} if sources.list already has the chosen\n" + "mirror and a new one was not generated." ), - epilog="The exit code is 0 on success, 1 on error, and %d if " - "sources.list already has the chosen\n" - "mirror and a new one was not generated." % SKIPPED_FILE_GENERATION, formatter_class=RawTextHelpFormatter, ) parser.add_argument( @@ -35,7 +38,7 @@ def get_args(): "specify a country to test its list of mirrors\n" "used to match country list file names found at mirrors.ubuntu.com\n" "COUNTRY should follow ISO 3166-1 alpha-2 format\n" - "default: %s" % DEFAULT_COUNTRY + f"default: {DEFAULT_COUNTRY}" ), metavar="COUNTRY", ) @@ -44,7 +47,7 @@ def get_args(): "--top-number", nargs="?", type=int, - help=("specify number of mirrors to return\n" "default: 1\n"), + help="specify number of mirrors to return\n" "default: 1\n", const=DEFAULT_NUMBER, default=DEFAULT_NUMBER, metavar="NUMBER", @@ -115,4 +118,4 @@ def get_args(): if __name__ == "__main__": - get_args().parse_args() + get_arg_parser().parse_args() diff --git a/src/apt_select/constant.py b/src/apt_select/constant.py new file mode 100644 index 0000000..00b514d --- /dev/null +++ b/src/apt_select/constant.py @@ -0,0 +1,9 @@ +DEFAULT_REQUEST_HEADERS = {"User-Agent": "apt-select"} +DEFAULT_REQUEST_TIMEOUT_SEC = 120.0 +ENCODING_UTF_8 = "utf-8" + +OK = 0 +NOK = 1 +INVALID_MIRROR_INDEX = 2 +USER_INTERRUPT = 3 +SKIPPED_FILE_GENERATION = 4 diff --git a/src/apt_select/mirrors.py b/src/apt_select/mirror.py similarity index 60% rename from src/apt_select/mirrors.py rename to src/apt_select/mirror.py index 90ad0f9..8c3625f 100644 --- a/src/apt_select/mirrors.py +++ b/src/apt_select/mirror.py @@ -1,62 +1,51 @@ #!/usr/bin/env python """The mirrors module defines classes and methods for Ubuntu archive mirrors. - Provides latency testing and mirror attribute getting from Launchpad.""" +Provides latency testing and mirror attribute getting from Launchpad.""" -from sys import stderr +import sys +from queue import Queue, Empty from socket import socket, AF_INET, SOCK_STREAM, gethostbyname, error, timeout, gaierror -from time import time -from apt_select.utils import progress_msg, get_text, URLGetTextError - -try: - from urlparse import urlparse -except ImportError: - from urllib.parse import urlparse - from threading import Thread - -try: - from queue import Queue, Empty -except ImportError: - from Queue import Queue, Empty +from time import time +from urllib.parse import urlparse from bs4 import BeautifulSoup, FeatureNotFound +from apt_select import utility + PARSER = "lxml" try: BeautifulSoup("", PARSER) except FeatureNotFound: PARSER = "html.parser" -try: - xrange -except NameError: - xrange = range - class ConnectError(Exception): """Socket connection errors""" - pass - -class Mirrors(object): +class Mirrors: """Base for collection of archive mirrors""" - def __init__(self, url_list, ping_only, min_status): - self.urls = {} + def __init__( + self, url_list: list[str], min_status: int, ping_only: bool | None = None + ) -> None: + if ping_only is None: + ping_only = False + self.urls: dict[str, dict[str, float | int | str]] = {} self._url_list = url_list self._num_trips = 0 self.got = {"ping": 0, "data": 0} - self.ranked = [] - self.top_list = [] - self._trip_queue = Queue() + self.ranked: list[str] = [] + self.top_list: list[str] = [] + self._trip_queue: Queue[tuple[str, float] | None] = Queue() if not ping_only: self._launchpad_base = "https://launchpad.net" self._launchpad_url = self._launchpad_base + "/ubuntu/+archivemirrors" self._launchpad_html = "" self.abort_launch = False - self._status_opts = ( + self._status_opts: tuple[str, ...] = ( "unknown", "One week behind", "Two days behind", @@ -68,71 +57,75 @@ def __init__(self, url_list, ping_only, min_status): # Default to top self.status_num = 1 - def get_launchpad_urls(self): + def fetch_launchpad_urls(self) -> None: """Obtain mirrors' corresponding launchpad URLs""" - stderr.write("Getting list of launchpad URLs...") + sys.stderr.write("Getting list of launchpad URLs...") try: - self._launchpad_html = get_text(self._launchpad_url) - except URLGetTextError as err: - stderr.write( + self._launchpad_html = utility.get_text(self._launchpad_url) + except utility.URLGetTextError as err: + sys.stderr.write( ( - "%s: %s\nUnable to retrieve list of launchpad sites\n" - "Reverting to latency only\n" % (self._launchpad_url, err) + f"{self._launchpad_url}: {err}\nUnable to retrieve list of launchpad sites\n" + "Reverting to latency only\n" ) ) self.abort_launch = True else: - stderr.write("done.\n") + sys.stderr.write("done.\n") self.__parse_launchpad_list() - def __parse_launchpad_list(self): + def __parse_launchpad_list(self) -> None: """Parse Launchpad's list page to find each mirror's Official page""" soup = BeautifulSoup(self._launchpad_html, PARSER) prev = "" - for element in soup.table.descendants: - try: - url = element.a - except AttributeError: - pass - else: + if soup.table is not None: + for element in soup.table.descendants: try: - url = url["href"] - except TypeError: + url = element.a + except AttributeError: pass else: - if url in self.urls: - self.urls[url]["Launchpad"] = self._launchpad_base + prev - - if url.startswith("/ubuntu/+mirror/"): - prev = url - - def __kickoff_trips(self): + try: + url = url["href"] + except TypeError: + pass + else: + if url in self.urls: + self.urls[url][ + "Launchpad" + ] = f"{self._launchpad_base}{prev}" + + if url.startswith("/ubuntu/+mirror/"): + prev = url + + def __kickoff_trips(self) -> None: """Instantiate round trips class for all, initiating queued threads""" for url in self._url_list: host = urlparse(url).netloc try: - thread = Thread(target=_RoundTrip(url, host, self._trip_queue).min_rtt) + round_trip = _RoundTrip(url=url, host=host, trip_queue=self._trip_queue) + thread = Thread(target=round_trip.min_rtt) except gaierror as err: - stderr.write("%s: %s ignored\n" % (err, url)) + sys.stderr.write(f"{err}: {url} ignored\n") else: self.urls[url] = {"Host": host} thread.daemon = True thread.start() self._num_trips += 1 - def get_rtts(self): + def measure_rtts(self) -> None: """Test latency to all mirrors""" - stderr.write("Testing latency to mirror(s)\n") + sys.stderr.write("Testing latency to mirror(s)\n") self.__kickoff_trips() processed = 0 - progress_msg(processed, self._num_trips) - for _ in xrange(self._num_trips): + utility.progress_msg(processed=processed, total=self._num_trips) + for _ in range(self._num_trips): try: - min_rtt = self._trip_queue.get(block=True) + min_rtt: tuple[str, float] | None = self._trip_queue.get(block=True) except Empty: pass else: @@ -144,35 +137,45 @@ def get_rtts(self): self.got["ping"] += 1 processed += 1 - progress_msg(processed, self._num_trips) + utility.progress_msg(processed=processed, total=self._num_trips) - stderr.write("\n") + sys.stderr.write("\n") # Mirrors without latency info are removed self.urls = {key: val for key, val in self.urls.items() if "Latency" in val} self.ranked = sorted(self.urls, key=lambda x: self.urls[x]["Latency"]) - def __queue_lookups(self, codename, arch, data_queue): + def __queue_lookups( + self, + codename: str, + arch: str, + data_queue: Queue[tuple[str, dict[str, str] | None]], + ) -> int: """Queue threads for data retrieval from launchpad.net Returns number of threads started to fulfill number of requested statuses""" num_threads = 0 for url in self.ranked: + launch_url: float | str | None = None try: launch_url = self.urls[url]["Launchpad"] except KeyError: pass else: - thread = Thread( - target=_LaunchData( - url, launch_url, codename, arch, data_queue - ).get_info - ) - thread.daemon = True - thread.start() - - num_threads += 1 + if launch_url is not None and isinstance(launch_url, str): + thread = Thread( + target=_LaunchData( + url=url, + launch_url=launch_url, + codename=codename, + arch=arch, + data_queue=data_queue, + ).get_info + ) + thread.daemon = True + thread.start() + num_threads += 1 # We expect number of retrieved status requests may already # be greater than 0. This would be the case anytime an initial @@ -182,16 +185,18 @@ def __queue_lookups(self, codename, arch, data_queue): return num_threads - def lookup_statuses(self, codename, arch, min_status): + def lookup_statuses(self, codename: str, arch: str) -> None: """Scrape statuses/info in from launchpad.net mirror pages""" while (self.got["data"] < self.status_num) and self.ranked: - data_queue = Queue() - num_threads = self.__queue_lookups(codename, arch, data_queue) + data_queue: Queue[tuple[str, dict[str, str] | None]] = Queue() + num_threads = self.__queue_lookups( + codename=codename, arch=arch, data_queue=data_queue + ) if num_threads == 0: break # Get output of all started thread methods from queue - progress_msg(self.got["data"], self.status_num) - for _ in xrange(num_threads): + utility.progress_msg(processed=self.got["data"], total=self.status_num) + for _ in range(num_threads): try: # We don't care about timeouts longer than 7 seconds as # we're only getting 16 KB @@ -204,7 +209,7 @@ def lookup_statuses(self, codename, arch, min_status): self.urls[info[0]].update(info[1]) self.got["data"] += 1 self.top_list.append(info[0]) - progress_msg(self.got["data"], self.status_num) + utility.progress_msg(self.got["data"], self.status_num) # Eliminate the url from the ranked list as long as # something is received from the queue (for selective @@ -220,16 +225,18 @@ def lookup_statuses(self, codename, arch, min_status): data_queue.join() -class _RoundTrip(object): +class _RoundTrip: """Socket connections for latency reporting""" - def __init__(self, url, host, trip_queue): + def __init__( + self, url: str, host: str, trip_queue: Queue[tuple[str, float] | None] + ) -> None: self._url = url self._host = host - self._trip_queue = trip_queue + self._trip_queue: Queue[tuple[str, float] | None] = trip_queue self._addr = gethostbyname(host) - def __tcp_ping(self): + def __tcp_ping(self) -> float: """Return socket latency to host's resolved IP address""" port = 80 sock = socket(AF_INET, SOCK_STREAM) @@ -238,39 +245,45 @@ def __tcp_ping(self): try: sock.connect((self._addr, port)) except (timeout, error) as err: - raise ConnectError(err) + raise ConnectError(err) from err recv_tstamp = time() * 1000 rtt = recv_tstamp - send_tstamp sock.close() return rtt - def min_rtt(self): + def min_rtt(self) -> None: """Return lowest rtt""" rtts = [] - for _ in xrange(3): + for _ in range(3): try: rtt = self.__tcp_ping() except ConnectError as err: - stderr.write("\tconnection to %s: %s\n" % (self._host, err)) + sys.stderr.write(f"\tconnection to {self._host}: {err}\n") self._trip_queue.put_nowait(None) return - else: - rtts.append(rtt) + rtts.append(rtt) self._trip_queue.put((self._url, min(rtts))) -class _LaunchData(object): - def __init__(self, url, launch_url, codename, arch, data_queue): +class _LaunchData: + def __init__( + self, + url: str, + launch_url: str, + codename: str, + arch: str, + data_queue: Queue[tuple[str, dict[str, str] | None]], + ) -> None: self._url = url self._launch_url = launch_url self._codename = codename self._arch = arch self._data_queue = data_queue - def __parse_mirror_html(self, launch_html): - info = {} + def __parse_mirror_html(self, launch_html: str) -> dict[str, str]: + info: dict[str, str] = {} soup = BeautifulSoup(launch_html, PARSER) # Find elements of the ids we need for line in soup.find_all(id=["arches", "speed", "organisation"]): @@ -287,22 +300,22 @@ def __parse_mirror_html(self, launch_html): return info - def get_info(self): + def get_info(self) -> None: """Parse launchpad page HTML for mirror information Ideally, launchpadlib would be used to get mirror information, but the Launchpad API doesn't support access to archivemirror statuses.""" try: - launch_html = get_text(self._launch_url) - except URLGetTextError as err: - stderr.write("connection to %s: %s\n" % (self._launch_url, err)) + launch_html = utility.get_text(self._launch_url) + except utility.URLGetTextError as err: + sys.stderr.write(f"connection to {self._launch_url}: {err}\n") self._data_queue.put_nowait((self._url, None)) else: - info = self.__parse_mirror_html(launch_html) + info = self.__parse_mirror_html(launch_html=launch_html) if "Status" not in info: - stderr.write( - ("Unable to parse status info from %s\n" % self._launch_url) + sys.stderr.write( + f"Unable to parse status info from {self._launch_url}\n" ) self._data_queue.put_nowait((self._url, None)) return diff --git a/src/apt_select/utility.py b/src/apt_select/utility.py new file mode 100644 index 0000000..ca9b8c5 --- /dev/null +++ b/src/apt_select/utility.py @@ -0,0 +1,38 @@ +#!/usr/bin/env python +"""Collection of module neutral utility functions""" + +import sys + +import requests +from apt_select import constant + + +def utf8_decode(encoded: bytes) -> str: + return encoded.decode(constant.ENCODING_UTF_8) + + +class URLGetTextError(Exception): + """Error class for fetching text from a URL""" + + +def get_text( + url: str, timeout_sec: float = constant.DEFAULT_REQUEST_TIMEOUT_SEC +) -> str: + """Return text from GET request response content""" + try: + result = requests.get( + url, headers=constant.DEFAULT_REQUEST_HEADERS, timeout=timeout_sec + ) + result.raise_for_status() + except requests.HTTPError as err: + raise URLGetTextError(err) from err + + return result.text + + +def progress_msg(processed: float | int, total: float | int) -> None: + """Update user on percent done""" + if total > 1: + percent = int((float(processed) / total) * 100) + sys.stderr.write(f"\r[{processed}/{total}] {percent}%") + sys.stderr.flush() diff --git a/src/apt_select/utils.py b/src/apt_select/utils.py deleted file mode 100644 index 183ab51..0000000 --- a/src/apt_select/utils.py +++ /dev/null @@ -1,37 +0,0 @@ -#!/usr/bin/env python -"""Collection of module neutral utility functions""" - -from sys import stderr - -import requests - -DEFAULT_REQUEST_HEADERS = {"User-Agent": "apt-select"} - - -def utf8_decode(encoded): - return encoded.decode("utf-8") - - -class URLGetTextError(Exception): - """Error class for fetching text from a URL""" - - pass - - -def get_text(url): - """Return text from GET request response content""" - try: - result = requests.get(url, headers=DEFAULT_REQUEST_HEADERS) - result.raise_for_status() - except requests.HTTPError as err: - raise URLGetTextError(err) - - return result.text - - -def progress_msg(processed, total): - """Update user on percent done""" - if total > 1: - percent = int((float(processed) / total) * 100) - stderr.write("\r[%d/%d] %d%%" % (processed, total, percent)) - stderr.flush() diff --git a/typeshed/pyi/apt_select/__init__.pyi b/typeshed/pyi/apt_select/__init__.pyi new file mode 100644 index 0000000..50d8897 --- /dev/null +++ b/typeshed/pyi/apt_select/__init__.pyi @@ -0,0 +1 @@ +from apt_select._version import __version__ as __version__, __version_tuple__ as __version_tuple__ diff --git a/typeshed/pyi/apt_select/__main__.pyi b/typeshed/pyi/apt_select/__main__.pyi new file mode 100644 index 0000000..c490258 --- /dev/null +++ b/typeshed/pyi/apt_select/__main__.pyi @@ -0,0 +1,12 @@ +from apt_select import apt as apt, argument as argument, constant as constant, mirror as mirror +from argparse import Namespace + +def set_args() -> tuple[str | Namespace, int]: ... +def get_mirrors(mirrors_url: str, country: str, timeout_sec: float = ...) -> tuple[list[str], int]: ... +def print_status(info: dict[str, float | str], rank: int) -> None: ... +def print_latency(info: dict[str, float | str], rank: int, max_hostname_length: int) -> None: ... +def ask(query: str) -> str: ... +def get_selected_mirror(list_size: int) -> tuple[int | None, int]: ... +def yes_or_no(query: str) -> int: ... +def apt_select() -> tuple[str | None, int]: ... +def main() -> int: ... diff --git a/typeshed/pyi/apt_select/apt.pyi b/typeshed/pyi/apt_select/apt.pyi new file mode 100644 index 0000000..c078859 --- /dev/null +++ b/typeshed/pyi/apt_select/apt.pyi @@ -0,0 +1,33 @@ +from _typeshed import Incomplete +from apt_select import constant as constant, utility as utility + +SUPPORTED_KERNEL: str +SUPPORTED_DISTRIBUTION_TYPE: str +UNAME: str +KERNEL_COMMAND: Incomplete +MACHINE_COMMAND: Incomplete +RELEASE_COMMAND: Incomplete +RELEASE_FILE: str +LAUNCHPAD_ARCH_32: str +LAUNCHPAD_ARCH_64: str +LAUNCHPAD_ARCHES: Incomplete + +class System: + dist: Incomplete + codename: Incomplete + arch: Incomplete + def __init__(self) -> None: ... + +class SourcesFileError(Exception): ... + +class Sources: + DEB_SCHEMES: Incomplete + PROTOCOLS: Incomplete + DIRECTORY: str + LIST_FILE: str + urls: Incomplete + skip_gen_msg: str + new_file_path: Incomplete + def __init__(self, codename: str) -> None: ... + def set_current_archives(self) -> None: ... + def generate_new_config(self, work_dir: str, new_mirror: str) -> None: ... diff --git a/typeshed/pyi/apt_select/argument.pyi b/typeshed/pyi/apt_select/argument.pyi new file mode 100644 index 0000000..f3a9f53 --- /dev/null +++ b/typeshed/pyi/apt_select/argument.pyi @@ -0,0 +1,9 @@ +from _typeshed import Incomplete +from apt_select import constant as constant +from argparse import ArgumentParser + +DEFAULT_COUNTRY: str +DEFAULT_NUMBER: int +STATUS_ARGS: Incomplete + +def get_arg_parser() -> ArgumentParser: ... diff --git a/typeshed/pyi/apt_select/constant.pyi b/typeshed/pyi/apt_select/constant.pyi new file mode 100644 index 0000000..6d220e4 --- /dev/null +++ b/typeshed/pyi/apt_select/constant.pyi @@ -0,0 +1,10 @@ +from _typeshed import Incomplete + +DEFAULT_REQUEST_HEADERS: Incomplete +DEFAULT_REQUEST_TIMEOUT_SEC: float +ENCODING_UTF_8: str +OK: int +NOK: int +INVALID_MIRROR_INDEX: int +USER_INTERRUPT: int +SKIPPED_FILE_GENERATION: int diff --git a/typeshed/pyi/apt_select/mirror.pyi b/typeshed/pyi/apt_select/mirror.pyi new file mode 100644 index 0000000..e40890e --- /dev/null +++ b/typeshed/pyi/apt_select/mirror.pyi @@ -0,0 +1,27 @@ +from _typeshed import Incomplete +from apt_select import utility as utility +from queue import Queue + +PARSER: str + +class ConnectError(Exception): ... + +class Mirrors: + urls: Incomplete + got: Incomplete + ranked: Incomplete + top_list: Incomplete + abort_launch: bool + status_num: int + def __init__(self, url_list: list[str], min_status: int, ping_only: bool | None = None) -> None: ... + def fetch_launchpad_urls(self) -> None: ... + def measure_rtts(self) -> None: ... + def lookup_statuses(self, codename: str, arch: str) -> None: ... + +class _RoundTrip: + def __init__(self, url: str, host: str, trip_queue: Queue[tuple[str, float] | None]) -> None: ... + def min_rtt(self) -> None: ... + +class _LaunchData: + def __init__(self, url: str, launch_url: str, codename: str, arch: str, data_queue: Queue[tuple[str, dict[str, str] | None]]) -> None: ... + def get_info(self) -> None: ... diff --git a/typeshed/pyi/apt_select/utility.pyi b/typeshed/pyi/apt_select/utility.pyi new file mode 100644 index 0000000..80159bc --- /dev/null +++ b/typeshed/pyi/apt_select/utility.pyi @@ -0,0 +1,8 @@ +from apt_select import constant as constant + +def utf8_decode(encoded: bytes) -> str: ... + +class URLGetTextError(Exception): ... + +def get_text(url: str, timeout_sec: float = ...) -> str: ... +def progress_msg(processed: float | int, total: float | int) -> None: ... From b470c0aa13bca745a9abfa2742652abef4c3c3b8 Mon Sep 17 00:00:00 2001 From: Johan Westin Date: Sun, 3 Mar 2024 11:17:41 +0100 Subject: [PATCH 7/7] Prepare for pytest --- pyproject.toml | 40 +++++++++++++++++++++++++++++++--------- test/.gitignore | 0 2 files changed, 31 insertions(+), 9 deletions(-) create mode 100644 test/.gitignore diff --git a/pyproject.toml b/pyproject.toml index a7bc47f..4566a00 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -65,6 +65,15 @@ dependencies = [ "build", "hatch", "mypy", + "pylint", + "pytest", + "pytest-clarity", + "pytest-cov", + "pytest-describe", + "pytest-expecter", + "pytest-random", + "pytest-sugar", + "pytest-xdist[psutil]", "ruff", "types-beautifulsoup4", "types-requests", @@ -73,24 +82,24 @@ dependencies = [ [tool.hatch.envs.default.scripts] dist = [ - "black src", + "black src test", "rm -rf 'typeshed/pyi'", "stubgen --output=typeshed/pyi --search-path=src src", "hatch build", ] format = [ - "black src", + "black src test", ] lint = [ - "black src", - "ruff check --fix src", - "mypy src", + "black src test", + "ruff check --fix src test", + "mypy src test", ] lint-check = [ - "black --check src", - "pylint src", - "ruff check src", - "mypy src", + "black --check src test", + "pylint src/apt_select", + "ruff check src test", + "mypy src test", ] test = "pytest" test-cov-xml = "pytest --cov-report=xml" @@ -669,3 +678,16 @@ ignored-argument-names = "_.*|^ignored_|^unused_" # List of qualified module names which can have objects that can redefine # builtins. redefining-builtins-modules = ["six.moves", "past.builtins", "future.builtins", "builtins", "io"] + +[tool.pytest.ini_options] +addopts = "-n 4 --cov=src/apt_select/ --cov-report=term-missing" +minversion = "7.2" +python_files = [ + "*Test.py", + "*_test.py", + "test_*.py", +] +testpaths = [ + "src", + "test", +] diff --git a/test/.gitignore b/test/.gitignore new file mode 100644 index 0000000..e69de29