diff --git a/.github/workflows/python-package.yml b/.github/workflows/python-package.yml index 9d5c918..4b5e71d 100644 --- a/.github/workflows/python-package.yml +++ b/.github/workflows/python-package.yml @@ -12,12 +12,12 @@ jobs: runs-on: ubuntu-latest strategy: matrix: - python-version: [3.8, 3.9, "3.10", "3.11"] + python-version: ["3.10", 3.11, 3.12, 3.13, 3.14] steps: - - uses: actions/checkout@v4 + - uses: actions/checkout@v6 - name: Set up Python ${{ matrix.python-version }} - uses: actions/setup-python@v5 + uses: actions/setup-python@v6 with: python-version: ${{ matrix.python-version }} - name: Install dependencies diff --git a/RELEASING.md b/RELEASING.md index e065257..49e7218 100644 --- a/RELEASING.md +++ b/RELEASING.md @@ -11,6 +11,11 @@ tox -r flake8 src ``` +- Make sure pylint report a score of 10: +```shell +pylint src +``` + - Update the version number, by removing the trailing `.dev0` in: - `setup.cfg` - `src/pyconcepticon/__init__.py` diff --git a/setup.cfg b/setup.cfg index e0c2d21..ee0e5f4 100644 --- a/setup.cfg +++ b/setup.cfg @@ -22,12 +22,12 @@ classifiers = Natural Language :: English Operating System :: OS Independent Programming Language :: Python :: 3 - Programming Language :: Python :: 3.8 Programming Language :: Python :: 3.9 Programming Language :: Python :: 3.10 Programming Language :: Python :: 3.11 Programming Language :: Python :: 3.12 Programming Language :: Python :: 3.13 + Programming Language :: Python :: 3.14 Programming Language :: Python :: Implementation :: CPython Programming Language :: Python :: Implementation :: PyPy License :: OSI Approved :: Apache Software License @@ -37,18 +37,17 @@ zip_safe = False packages = find: package_dir = = src -python_requires = >=3.8 +python_requires = >=3.9 install_requires = setuptools - attrs>=18.1.0 - pybtex>=0.22.2 - csvw>=3 - clldutils>=3.4 + simplepybtex + csvw>=4 + clldutils>=4 cldfcatalog>=1.3 cdstarcat nameparser termcolor - tabulate + backports.strenum; python_version < '3.11' 
include_package_data = True [options.packages.find] @@ -95,7 +94,7 @@ show_missing = true skip_covered = true [tox:tox] -envlist = py38, py39, py310, py311, py312, py313 +envlist = py39, py310, py311, py312, py313, py314 isolated_build = true skip_missing_interpreter = true diff --git a/src/pyconcepticon/__init__.py b/src/pyconcepticon/__init__.py index fc17bcb..cdddf5c 100644 --- a/src/pyconcepticon/__init__.py +++ b/src/pyconcepticon/__init__.py @@ -1,3 +1,6 @@ +""" +Functionality to access and curate the Concepticon dataset. +""" # noqa from pyconcepticon.api import Concepticon # noqa: F401 diff --git a/src/pyconcepticon/__main__.py b/src/pyconcepticon/__main__.py index bdeae41..c9b5aac 100644 --- a/src/pyconcepticon/__main__.py +++ b/src/pyconcepticon/__main__.py @@ -11,9 +11,13 @@ concepticon [OPTIONS] [args] """ +import logging import sys import pathlib +import argparse import contextlib +from typing import Optional +from collections.abc import Sequence from clldutils.clilib import register_subcommands, get_parser_and_subparsers, ParserError from clldutils.loglib import Logging @@ -23,7 +27,12 @@ import pyconcepticon.commands -def main(args=None, catch_all=False, parsed_args=None, log=None): +def main( # pylint: disable=C0116 + args: Optional[Sequence[str]] = None, + catch_all: bool = False, + parsed_args: Optional[argparse.Namespace] = None, + log: Optional[logging.Logger] = None, +) -> Optional[int]: repos = None try: repos = cldfcatalog.Config.from_file().get_clone('concepticon') @@ -58,14 +67,14 @@ def main(args=None, catch_all=False, parsed_args=None, log=None): # use of a Catalog as context manager: stack.enter_context(cldfcatalog.Catalog(args.repos, tag=args.repos_version)) args.repos = Concepticon(args.repos) - args.log.info('concepticon/concepticon-data at {0}'.format(args.repos.repos)) + args.log.info(f'concepticon/concepticon-data at {args.repos.repos}') try: return args.main(args) or 0 except KeyboardInterrupt: # pragma: no cover return 0 except 
ParserError as e: print(e) - return main([args._command, '-h']) + return main([args._command, '-h']) # pylint: disable=W0212 except Exception as e: # pragma: no cover if catch_all: print(e) diff --git a/src/pyconcepticon/_compat.py b/src/pyconcepticon/_compat.py new file mode 100644 index 0000000..4589a78 --- /dev/null +++ b/src/pyconcepticon/_compat.py @@ -0,0 +1,10 @@ +""" +Compat with older python versions. +""" +import sys + +if sys.version_info >= (3, 11): + from enum import StrEnum + assert StrEnum +else: + from backports.strenum import StrEnum # pragma: no cover diff --git a/src/pyconcepticon/api.py b/src/pyconcepticon/api.py index 1b5b5f1..9d04486 100644 --- a/src/pyconcepticon/api.py +++ b/src/pyconcepticon/api.py @@ -1,25 +1,41 @@ +""" +API to curate and access Concepticon data. +""" import re -import typing +from typing import Any, Optional, Union, TypedDict, Literal import pathlib import warnings import functools import collections +from collections.abc import Iterable, Container +import dataclasses import cldfcatalog -import pybtex.database +import simplepybtex.database from clldutils import jsonlib from clldutils.apilib import API from clldutils.markup import iter_markdown_tables from clldutils.source import Source -from pyconcepticon.glosses import concept_map, concept_map2 +from pyconcepticon.glosses import concept_map, concept_map2, MappingDict, map_list, MapOptions, \ + Similarity, GlossLanguage # The following symbols from models can explicitly be imported from pyconcepticon.api: from pyconcepticon.models import ( # noqa: F401 - Languoid, Metadata, Concept, Conceptlist, ConceptRelations, Conceptset, REF_PATTERN, MD_SUFFIX, + Languoid, Concept, Conceptlist, ConceptRelations, Conceptset, REF_PATTERN, MD_SUFFIX, ) -from pyconcepticon.util import read_dicts, lowercase, to_dict, UnicodeWriter, BIB_PATTERN +from pyconcepticon.util import read_dicts, lowercase, to_dict, BIB_PATTERN +assert MD_SUFFIX and Concept Editor = collections.namedtuple('Editor', 
['name', 'start', 'end']) +PathType = Union[str, pathlib.Path] +ResourceType = Literal["Concept", "Conceptset", "Conceptlist"] + + +class Retirement(TypedDict): + """Retirements are stored as JSON objects.""" + id: str + comment: str + replacement: str class Concepticon(API): @@ -39,7 +55,7 @@ class Concepticon(API): 'publisher.contact': 'concepticon@eva.mpg.de', } - def __init__(self, repos: typing.Optional[typing.Union[str, pathlib.Path]] = None): + def __init__(self, repos: Optional[PathType] = None): """ :param repos: Path to a clone or source dump of concepticon-data. """ @@ -54,9 +70,10 @@ def data_path(self, *comps: str) -> pathlib.Path: return self.path('concepticondata', *comps) @functools.cached_property - def editors(self) -> typing.List[Editor]: + def editors(self) -> list[Editor]: + """The Concepticon editors, current and earlier.""" res = [] - header, rows = next( + _, rows = next( iter_markdown_tables(self.path('CONTRIBUTORS.md').read_text(encoding='utf8'))) for (period, name) in rows: start, to_, end = period.strip().partition('-') @@ -65,7 +82,7 @@ def editors(self) -> typing.List[Editor]: return res @functools.cached_property - def vocabularies(self) -> typing.Dict[str, dict]: + def vocabularies(self) -> dict[str, dict]: """ Provide access to a `dict` of controlled vocabularies. 
""" @@ -77,168 +94,128 @@ def vocabularies(self) -> typing.Dict[str, dict]: return res @property - def bibfile(self) -> pathlib.Path: + def bibfile(self) -> pathlib.Path: # pylint: disable=C0116 return self.data_path('references', 'references.bib') @functools.cached_property - def sources(self) -> dict: + def sources(self) -> dict[str, dict[str, Any]]: # pylint: disable=C0116 return jsonlib.load(self.data_path('sources', 'cdstar.json')) @functools.cached_property - def retirements(self): + def retirements(self) -> dict[str, list[Retirement]]: + """Lists of retirements by resource type.""" return jsonlib.load( self.data_path('retired.json'), object_pairs_hook=collections.OrderedDict) - def add_retirement(self, type_, repl): + def add_retirement(self, type_: ResourceType, repl: Retirement): + """Add info about a retired object to the retirements registry.""" obj = collections.OrderedDict() - for k in ['id', 'comment', 'replacement']: + for k in Retirement.__annotations__: obj[k] = repl[k] assert obj[k] if type_ not in self.retirements: self.retirements[type_] = [] + # It feels a bit hacky to mutate a cached property - but it works. self.retirements[type_].append(obj) jsonlib.dump(self.retirements, self.data_path('retired.json'), indent=2) @functools.cached_property - def bibliography(self) -> typing.Dict[str, Source]: + def bibliography(self) -> dict[str, Source]: """ :returns: `dict` mapping BibTeX IDs to `Reference` instances. """ return to_dict( - Source.from_entry(key, entry) for key, entry in pybtex.database.parse_string( + Source.from_entry(key, entry) for key, entry in simplepybtex.database.parse_string( self.bibfile.read_text(encoding='utf8'), bib_format='bibtex').entries.items()) @functools.cached_property - def conceptsets(self) -> typing.Dict[str, Conceptset]: + def conceptsets(self) -> dict[str, Conceptset]: """ :returns: `dict` mapping ConceptSet IDs to `Conceptset` instances. 
""" return to_dict( - Conceptset(api=self, **lowercase(d)) + Conceptset(_api=self, **lowercase(d)) for d in read_dicts(self.data_path('concepticon.tsv'))) @functools.cached_property - def conceptlists_dicts(self): + def conceptlists_dicts(self) -> list[dict[str, Union[str, int, float]]]: + """Read items in conceptlists.tsv into dicts.""" return read_dicts(self.data_path('conceptlists.tsv')) @functools.cached_property - def conceptlists(self): + def conceptlists(self) -> dict[str, Conceptlist]: """ :returns: `dict` mapping ConceptList IDs to `Conceptlist` instances. .. note:: Individual concepts can be accessed via `Conceptlist.concepts`. """ - return to_dict(Conceptlist(api=self, **lowercase(d)) for d in self.conceptlists_dicts) + return to_dict(Conceptlist(_api=self, **lowercase(d)) for d in self.conceptlists_dicts) @functools.cached_property - def relations(self): + def relations(self) -> ConceptRelations: """ :returns: `dict` mapping concept sets to related concepts. """ return ConceptRelations(self.data_path('conceptrelations.tsv')) @functools.cached_property - def multirelations(self): + def multirelations(self) -> ConceptRelations: """ :returns: `dict` mapping concept sets to related concepts. 
""" return ConceptRelations(self.data_path('conceptrelations.tsv'), multiple=True) @functools.cached_property - def frequencies(self): - D = collections.defaultdict(int) + def frequencies(self) -> collections.Counter: + """Maps concepticon conceptset glosses to the number of concepts linked to them.""" + d = collections.Counter() for cl in self.conceptlists.values(): - for concept in cl.concepts.values(): - if concept.concepticon_id: - D[concept.concepticon_gloss] += 1 - return D + d.update( + concept.concepticon_gloss for concept in cl.concepts.values() + if concept.concepticon_id) + return d - def _get_map_for_language(self, language, otherlist=None): + def _get_map_for_language( + self, + language, + otherlist: Optional[PathType] = None, + ) -> list[Union[tuple[str, str], tuple[str, str, str]]]: if (language, otherlist) not in self._to_mapping: if otherlist is not None: to = [] for item in read_dicts(otherlist): to.append((item['ID'], item.get('GLOSS', item.get('ENGLISH')))) else: - mapfile = self.repos / 'mappings' / 'map-{0}.tsv'.format(language) + mapfile = self.repos / 'mappings' / f'map-{language}.tsv' to = [(cs['ID'], cs['GLOSS']) for cs in read_dicts(mapfile)] self._to_mapping[(language, otherlist)] = to return self._to_mapping[(language, otherlist)] - def map(self, + def map(self, # pylint: disable=R0913,R0917 clist, otherlist=None, out=None, - full_search=False, - similarity_level=5, - language='en', - skip_multiple=False): - assert clist.exists(), "File %s does not exist" % clist - from_ = read_dicts(clist) - - to = self._get_map_for_language(language, otherlist) - gloss = { - 'fr': 'FRENCH', - 'en': 'ENGLISH', - 'es': 'SPANISH', - 'de': 'GERMAN', - 'pl': 'POLISH', - 'lt': 'LATIN', - 'zh': 'CHINESE', - 'pt': 'PORTUGUESE', - 'ru': 'RUSSIAN', - 'it': 'ITALIAN', - }.get(language, 'GLOSS') - cmap = (concept_map if full_search else concept_map2)( - [i.get('GLOSS', i.get(gloss)) for i in from_], - [i[1] for i in to], - similarity_level=similarity_level, - 
language=language + full_search: bool = False, + similarity_level: Optional[Union[Similarity, int]] = Similarity.SAME_LONGEST, + language: Optional[Union[GlossLanguage, str]] = GlossLanguage.ENGLISH, + skip_multiple: bool = False): + """Map items in a conceptlist to concepticon.""" + map_list( + clist, + self._get_map_for_language(language, otherlist), + out=out, + options=MapOptions( + full_search=full_search, + similarity_level=Similarity.from_int(similarity_level), + language=GlossLanguage.from_string(language), + skip_multiple=skip_multiple, + ), ) - good_matches = 0 - with UnicodeWriter(out) as writer: - writer.writerow( - list(from_[0].keys()) - + ['CONCEPTICON_ID', 'CONCEPTICON_GLOSS', 'SIMILARITY']) - for i, item in enumerate(from_): - row = list(item.values()) - matches, sim = cmap.get(i, ([], 10)) - if sim <= similarity_level: - good_matches += 1 - if not matches: - writer.writerow(row + ['', '???', '']) - elif len(matches) == 1: - row.extend([ - to[matches[0]][0], to[matches[0]][1].split('///')[0], sim]) - writer.writerow(row) - else: - assert not full_search - # we need a list to retain the order by frequency - visited = [] - for j in matches: - gls, cid = to[j][0], to[j][1].split('///')[0] - if (gls, cid) not in visited: - visited += [(gls, cid)] - if len(visited) > 1: - if not skip_multiple: - writer.writeblock( - row + [gls, cid, sim] for gls, cid in visited) - else: - row.extend([visited[0][0], visited[0][1], sim]) - writer.writerow(row) - writer.writerow( - ['#', - '{0}/{1}'.format(good_matches, len(from_)), - '{0:.0f}%'.format(100 * good_matches / len(from_))] - + (len(from_[0]) - 1) * ['']) - - if out is None: - print(writer.read().decode('utf-8')) - - def lookup( + + def lookup( # pylint: disable=R0913,R0917 self, - entries, - full_search=False, + entries: Iterable[str], + full_search: bool = False, similarity_level=5, language='en', mincsid=None, @@ -248,209 +225,259 @@ def lookup( :returns: `generator` of tuples (searchterm, concepticon_id, 
concepticon_gloss, similarity). """ if to is None: - to = [ - t for t in self._get_map_for_language(language, None) - if mincsid is None or (int(t[0]) >= mincsid)] - tox = [i[1] for i in to] - cfunc = concept_map2 if full_search else concept_map - cmap = cfunc( + to = [t for t in self._get_map_for_language(language, None) + if mincsid is None or (int(t[0]) >= mincsid)] + cmap: MappingDict = (concept_map2 if full_search else concept_map)( entries, - tox, + [i[1] for i in to], similarity_level=similarity_level, language=language) for i, e in enumerate(entries): - match, simil = cmap.get(i, [[], 100]) - yield set((e, to[m][0], to[m][1].split("///")[0], simil) for m in match) + mapping = cmap.get_mapping(i) + yield { + (e, to[m][0], to[m][1].split("///")[0], mapping.similarity) + for m in mapping.to_keys} - def check(self, *clids): - errors = [] + def check(self, *clids) -> bool: + """Returns the success of the checks.""" assert self.retirements - print('testing {0} concept lists'.format(len(clids) if clids else len(self.conceptlists))) + print(f'testing {len(clids) if clids else len(self.conceptlists)} concept lists') - def _msg(type_, msg, name, line): # pragma: no cover - if line: - line = ':%s' % line - return '%s:%s%s: %s' % (type_.upper(), name, line or '', msg) + with Checker(self) as checker: + checker.check_conceptlists(clids) + if checker.errors: # pragma: no cover + return False # Exit early in case of structural errors. - def error(msg, name, line=0): # pragma: no cover - errors.append((msg, name, line)) + # Make sure all language-specific mappings are well specified + checker.check_language_mappings() - def warning(msg, name, line=0): # pragma: no cover - warnings.warn(_msg('warning', msg, name, line), Warning) + # We collect all cite keys used to refer to references. 
+ all_refs: set[str] = set() + refs_in_bib: set[str] = set(ref for ref in self.bibliography) - for i, d in enumerate(self.conceptlists_dicts, start=1): - if (not clids) or d['ID'] in clids: - try: - Conceptlist(api=self, **lowercase(d)) - except ValueError as e: # pragma: no cover - error(str(e), 'conceptlists.tsv', i) + # Make sure only records in the BibTeX file references.bib are referenced by + # concept lists. + for i, cl in enumerate(self.conceptlists.values()): + if not (clids and cl.id not in clids): + checker.check_refs(cl, i, refs_in_bib, all_refs) + + all_refs.add('List2016a') + + if not clids: + # Only report unused references if we check all concept lists! + for ref in refs_in_bib - all_refs: # pragma: no cover + checker.error(f'unused bibtex record: {ref}', 'references.bib') + + checker.check_relations() + + for fname in self.data_path('conceptlists').glob('*.tsv'): + if clids and fname.stem not in clids: + continue # pragma: no cover + if fname.stem not in self.conceptlists: # pragma: no cover + checker.error(f'conceptlist missing in conceptlists.tsv: {fname.name}') + + broken_cls = [] + + for cl in self.conceptlists.values(): + if clids and cl.id not in clids: + continue # pragma: no cover + + # Check consistency between the csvw metadata and the column names in the list. + checker.check_schema(cl, broken_cls) + + checker.check_conceptsets(broken_cls) - def exit(): - for msg, name, line in errors: - print(_msg('error', msg, name, line)) - return not bool(errors) + return not bool(checker.errors) - if errors: # pragma: no cover - return exit() # Exit early in case of structural errors. 
- REF_WITHOUT_LABEL_PATTERN = re.compile(r'[^]]\(:(ref|bib):[A-Za-z0-9\-]+\)') - REF_WITHOUT_LINK_PATTERN = re.compile('[^(]:(ref|bib):[A-Za-z0-9-]+') +@dataclasses.dataclass +class Checker: + """Implements consistency checks for the concepticon data.""" + api: Concepticon + errors: list = dataclasses.field(default_factory=list) + ref_without_label_pattern: re.Pattern = re.compile(r'[^]]\(:(ref|bib):[A-Za-z0-9\-]+\)') + ref_without_link_pattern: re.Pattern = re.compile('[^(]:(ref|bib):[A-Za-z0-9-]+') - # Make sure all language-specific mappings are well specified + concepticon_ids: set[str] = dataclasses.field(default_factory=set) + concepticon_glosses: set[str] = dataclasses.field(default_factory=set) + + def __post_init__(self): + self.concepticon_ids = set(self.api.conceptsets.keys()) + self.concepticon_glosses = set(cs.gloss for cs in self.api.conceptsets.values()) + + @staticmethod + def _msg(type_, msg, name, line): # pragma: no cover # pylint: disable=C0116 + if line: + line = f':{line}' + return f"{type_.upper()}:{name}{line or ''}: {msg}" + + def error(self, msg, name=None, line=0): # pragma: no cover # pylint: disable=C0116 + self.errors.append((msg, name, line)) + + @classmethod + def warning(cls, msg, name, line=0): # pragma: no cover # pylint: disable=C0116 + warnings.warn(cls._msg('warning', msg, name, line), Warning) + + def __enter__(self): + return self + + def __exit__(self, exc_type, exc_val, exc_tb): + for msg, name, line in self.errors: + print(self._msg('error', msg, name, line)) + + def check_conceptlists(self, clids: Optional[Union[Container[str], Iterable[str]]] = None): + """Make sure Conceptlist objects can be instantiated.""" + for i, d in enumerate(self.api.conceptlists_dicts, start=1): + if (not clids) or d['ID'] in clids: + try: + Conceptlist(_api=self, **lowercase(d)) + except ValueError as e: # pragma: no cover + self.error(str(e), 'conceptlists.tsv', i) + + def check_relations(self): + """Make sure relations only reference valid 
objects.""" + for i, rel in enumerate(self.api.relations.raw): + for attr, set_ in [ + ('SOURCE', self.concepticon_ids), + ('TARGET', self.concepticon_ids), + ('SOURCE_GLOSS', self.concepticon_glosses), + ('TARGET_GLOSS', self.concepticon_glosses), + ]: + if rel[attr] not in set_: # pragma: no cover + self.error(f'invalid {attr}: {rel[attr]}', 'conceptrelations', i + 2) + + def check_language_mappings(self): + """Make sure only valid gloss languages are specified.""" iso_langs = [ - lang.iso2 for lang in self.vocabularies['COLUMN_TYPES'].values() + lang.iso2 for lang in self.api.vocabularies['COLUMN_TYPES'].values() if isinstance(lang, Languoid) and lang.iso2] if len(iso_langs) != len(set(iso_langs)): - error( - 'Duplicate ISO codes: {}'.format(collections.Counter(iso_langs).most_common(1)), + self.error( + f'Duplicate ISO codes: {collections.Counter(iso_langs).most_common(1)}', 'concepticon.json') - assert set(p.stem.split('-')[1] for p in self.path('mappings').glob('map-*.tsv'))\ + assert set(p.stem.split('-')[1] for p in self.api.path('mappings').glob('map-*.tsv')) \ .issubset(iso_langs) - # We collect all cite keys used to refer to references. - all_refs = set() - refs_in_bib = set(ref for ref in self.bibliography) - - # Make sure only records in the BibTeX file references.bib are referenced by - # concept lists. 
- for i, cl in enumerate(self.conceptlists.values()): - if clids and cl.id not in clids: - continue # pragma: no cover - fl = ('conceptlists.tsv', i + 2) - for ref in re.findall(BIB_PATTERN, cl.note) + cl.refs: - if ref not in refs_in_bib: - error('cited bibtex record not in bib: {0}'.format(ref), *fl) - else: - all_refs.add(ref) - - for m in REF_WITHOUT_LABEL_PATTERN.finditer(cl.note): - error('link without label: {0}'.format(m.string[m.start():m.end()]), *fl) - - for m in REF_WITHOUT_LINK_PATTERN.finditer(cl.note): # pragma: no cover - error('reference not in link: {0}'.format(m.string[m.start():m.end()]), *fl) - - for m in REF_PATTERN.finditer(cl.note): - if m.group('id') not in self.conceptlists: # pragma: no cover - error('invalid conceptlist ref: {0}'.format(m.group('id')), *fl) - - # make also sure that all sources are accompanied by a PDF, but only write a - # warning if this is not the case - for ref in cl.pdf: - if ref not in self.sources: # pragma: no cover - warning('no PDF found for {0}'.format(ref), 'conceptlists.tsv') - all_refs.add('List2016a') - - if not clids: - # Only report unused references if we check all concept lists! 
- for ref in refs_in_bib - all_refs: # pragma: no cover - error('unused bibtex record: {0}'.format(ref), 'references.bib') - - ref_cols = { - 'concepticon_id': set(self.conceptsets.keys()), - 'concepticon_gloss': set(cs.gloss for cs in self.conceptsets.values()), - } - - for i, rel in enumerate(self.relations.raw): - for attr, type_ in [ - ('SOURCE', 'concepticon_id'), - ('TARGET', 'concepticon_id'), - ('SOURCE_GLOSS', 'concepticon_gloss'), - ('TARGET_GLOSS', 'concepticon_gloss'), - ]: - if rel[attr] not in ref_cols[type_]: # pragma: no cover - error( - 'invalid {0}: {1}'.format(attr, rel[attr]), 'conceptrelations', i + 2) - - for fname in self.data_path('conceptlists').glob('*.tsv'): - if clids and fname.stem not in clids: - continue # pragma: no cover - if fname.stem not in self.conceptlists: # pragma: no cover - error( - 'conceptlist missing in conceptlists.tsv: {0}'.format(fname.name), '') - - broken_cls = [] + def check_refs(self, cl: Conceptlist, i: int, refs_in_bib: set[str], all_refs: set[str]): + """ + Check items referenced in a conceptlists note or refs field. + """ + err = functools.partial(self.error, name='conceptlists.tsv', line=i + 2) - for cl in self.conceptlists.values(): - if clids and cl.id not in clids: - continue # pragma: no cover - # - # Check consistency between the csvw metadata and the column names in the list. 
- # - missing_in_md, missing_in_list = [], [] - cols_in_md = [] - for col in cl.metadata.tableSchema.columns: - cnames = [] # all names or aliases csvw will recognize for this column - if col.name in cols_in_md: # pragma: no cover - error('Duplicate name ot title in table schema: {0}'.format(col.name), cl.id) - cnames.append(col.name) - if col.titles: - c = col.titles.getfirst() - if c in cols_in_md: # pragma: no cover - error('Duplicate name or title in table schema: {0}'.format(c), cl.id) - cnames.append(c) - cols_in_md.extend(cnames) - if not any(name in cl.cols_in_list for name in cnames): - # Neither name nor title of the column is in the actual list header. - missing_in_list.append(col.name) - for col in cl.cols_in_list: - if col not in cols_in_md: - missing_in_md.append(col) - - for col in missing_in_list: - error('Column in metadata but missing in list: {0}'.format(col), cl.id) - for col in missing_in_md: - error('Column in list but missing in metadata: {0}'.format(col), cl.id) - - try: - # Now check individual concepts: - for i, concept in enumerate(cl.concepts.values()): - if not concept.id.startswith(cl.id): # pragma: no cover - error( - 'concept ID does not match concept list ID pattern %s' % concept.id, - cl.id) - - if concept.concepticon_id: - cs = self.conceptsets.get(concept.concepticon_id) - if not cs: # pragma: no cover - error('invalid conceptset ID %s' % concept.concepticon_id, cl.id) - elif cs.gloss != concept.concepticon_gloss: # pragma: no cover - error( - 'wrong conceptset GLOSS for ID {0}: {1} -> {2}'.format( - cs.id, concept.concepticon_gloss, cs.gloss), - cl.id) - - if i == 0: # pragma: no cover - for lg in cl.source_language: - if lg.lower() not in concept.cols: - error('missing source language col %s' % lg.upper(), cl.id) - - for lg in cl.source_language: # pragma: no cover - if not (concept.attributes.get(lg.lower()) - or getattr(concept, lg.lower(), None) - or (lg.lower() == 'english' and not concept.gloss)): - error('missing source 
language translation %s' % lg, cl.id, i + 2) - for attr, values in ref_cols.items(): - val = getattr(concept, attr) - if val: - # check that there are not leading and trailing spaces - # (while computationally expensive, this helps catch really - # hard to find typos) - if val != val.strip(): # pragma: no cover - error("leading or trailing spaces in value for %s: '%s'" % - (attr, val), cl.id, i + 2) - - if val not in values: # pragma: no cover - error('invalid value for %s: %s' % (attr, val), cl.id, i + 2) - except TypeError as e: # pragma: no cover - broken_cls.append(cl.id) - error(str(e), cl.id) - raise - - sameas = {} - glosses = set() - for cs in self.conceptsets.values(): + for ref in re.findall(BIB_PATTERN, cl.note) + cl.refs: + if ref not in refs_in_bib: + err(f'cited bibtex record not in bib: {ref}') + else: + all_refs.add(ref) + + for m in self.ref_without_label_pattern.finditer(cl.note): + err(f'link without label: {m.string[m.start():m.end()]}') + + for m in self.ref_without_link_pattern.finditer(cl.note): # pragma: no cover + err(f'reference not in link: {m.string[m.start():m.end()]}') + + for m in REF_PATTERN.finditer(cl.note): + if m.group('id') not in self.api.conceptlists: # pragma: no cover + err(f'invalid conceptlist ref: {m.group("id")}') + + # make also sure that all sources are accompanied by a PDF, but only write a + # warning if this is not the case + for ref in cl.pdf: + if ref not in self.api.sources: # pragma: no cover + self.warning(f'no PDF found for {ref}', 'conceptlists.tsv') + + def _check_cols_in_md(self, cl, err): + cols_in_md = {} + for col in cl.metadata.tableSchema.columns: + cnames = [] # all names or aliases csvw will recognize for this column + if col.name in cols_in_md: # pragma: no cover + err(f'Duplicate name ot title in table schema: {col.name}') + cnames.append(col.name) + if col.titles: + c = col.titles.getfirst() + if c in cols_in_md: # pragma: no cover + err(f'Duplicate name or title in table schema: {c}') + 
cnames.append(c) + cols_in_md[col.name] = cnames + return cols_in_md + + def _check_concept( # pylint: disable=R0912 + self, + concept: Concept, + i: int, + cl: Conceptlist, + err): + if not concept.id.startswith(cl.id): # pragma: no cover + err(f'concept ID does not match concept list ID pattern {concept.id}') + + if concept.concepticon_id: + cs = self.api.conceptsets.get(concept.concepticon_id) + if not cs: # pragma: no cover + err(f'invalid conceptset ID {concept.concepticon_id}') + elif cs.gloss != concept.concepticon_gloss: # pragma: no cover + err(f'wrong conceptset GLOSS for ID ' + f'{cs.id}: {concept.concepticon_gloss} -> {cs.gloss}') + + if i == 0: # pragma: no cover + for lg in cl.source_language: + if lg.lower() not in concept.cols: + err(f'missing source language col {lg.upper()}') + + for lg in cl.source_language: # pragma: no cover + if not any(( + concept.attributes.get(lg.lower()), + getattr(concept, lg.lower(), None), + lg.lower() == 'english' and not concept.gloss)): + err(f'missing source language translation {lg}', line=i + 2) + + for attr, values in [ + ('concepticon_id', self.concepticon_ids), + ('concepticon_gloss', self.concepticon_glosses), + ]: + val = getattr(concept, attr) + if val: + # check that there are no leading and trailing spaces (while computationally + # expensive, this helps catch really hard to find typos) + if val != val.strip(): # pragma: no cover + err(f"leading or trailing spaces in value for {attr}: '{val}'", line=i + 2) + + if val not in values: # pragma: no cover + err(f'invalid value for {attr}: {val}', line=i + 2) + + def check_schema(self, cl: Conceptlist, broken_cls): + """Check consistency between the csvw metadata and the column names in the list.""" + err = functools.partial(self.error, name=cl.id) + + cols_in_md = self._check_cols_in_md(cl, err) + for cname, cnames in cols_in_md.items(): + if not any(name in cl.cols_in_list for name in cnames): + # Neither name nor title of the column is in the actual list 
header. + err(f'Column in metadata but missing in list: {cname}') + + for col in cl.cols_in_list: + if not any(col in cnames for cnames in cols_in_md.values()): + err(f'Column in list but missing in metadata: {col}') + + try: + # Now check individual concepts: + for i, concept in enumerate(cl.concepts.values()): + self._check_concept(concept, i, cl, err) + except TypeError as e: # pragma: no cover + broken_cls.append(cl.id) + self.error(str(e), cl.id) + raise + + def check_conceptsets(self, broken_cls): + """ + Determine deprecated conceptsets and make sure they are not referenced anymore. + """ + # We partition conceptsets via the "sameas" relation ... + sameas: dict[str, set[str]] = {} + # ... and also check for duplicate glosses. + glosses: set[str] = set() + for cs in self.api.conceptsets.values(): if cs.gloss in glosses: # pragma: no cover - error('duplicate conceptset gloss: {0}'.format(cs.gloss), cs.id) + self.error(f'duplicate conceptset gloss: {cs.gloss}', cs.id) glosses.add(cs.gloss) for target, rel in cs.relations.items(): if rel == 'sameas': @@ -461,19 +488,18 @@ def exit(): else: sameas[cs.gloss] = {cs.id, target} - deprecated = {} + # Conceptsets marked as "sameas" some other conceptset are considered deprecated + # (except for the "earliest" conceptset among the same). + deprecated: dict[str, str] = {} for s in sameas.values(): - csids = sorted(s, key=lambda j: int(j)) + csids = sorted(s, key=int) for csid in csids[1:]: assert csid not in deprecated deprecated[csid] = csids[0] - for cl in self.conceptlists.values(): - if cl.id in broken_cls: - continue # pragma: no cover + # Make sure no deprecated conceptsets are referenced in conceptlists. 
+ for cl in (cl_ for cl_ in self.api.conceptlists.values() if cl_.id not in broken_cls): for concept in cl.concepts.values(): if concept.concepticon_id in deprecated: # pragma: no cover - error('deprecated concept set {0} linked for {1}'.format( - concept.concepticon_id, concept.id), cl.id) - - return exit() + self.error(f'deprecated concept set {concept.concepticon_id} linked ' + f'for {concept.id}', cl.id) diff --git a/src/pyconcepticon/cli_util.py b/src/pyconcepticon/cli_util.py index 5a91aab..c454bd3 100644 --- a/src/pyconcepticon/cli_util.py +++ b/src/pyconcepticon/cli_util.py @@ -1,16 +1,23 @@ +""" +Helpers called from concepticon commands. +""" import pathlib +import argparse +from typing import Union from clldutils.clilib import ParserError from pyconcepticon.models import Conceptlist -def readme(outdir, text): +def readme(outdir, text: Union[str, list[str]]): + """Write text to a README in outdir.""" outdir.joinpath("README.md").write_text( "\n".join(text) if isinstance(text, list) else text, encoding="utf8") def add_search(parser): + """Add options to specify a concept mapping strategy.""" parser.add_argument( '--full-search', help="select between approximate search (default) and full search", @@ -24,7 +31,8 @@ def add_search(parser): def add_conceptlist(parser, multiple=False): - kw = dict( + """Add an option to specify one or more conceptlists.""" + kw = dict( # pylint: disable=R1735 metavar='CONCEPTLIST', help='Path to (or ID of) concept list in TSV format', type=pathlib.Path) @@ -33,7 +41,11 @@ def add_conceptlist(parser, multiple=False): parser.add_argument('conceptlist', **kw) -def get_conceptlist(args, path_only=False): +def get_conceptlist( + args: argparse.Namespace, + path_only: bool = False, +) -> Union[Union[pathlib.Path, Conceptlist], list[Union[pathlib.Path, Conceptlist]]]: + """Get conceptlist(s) as specified in args.""" if isinstance(args.conceptlist, list): return [_get_conceptlist(cl, args, path_only=path_only) for cl in 
args.conceptlist] return _get_conceptlist(args.conceptlist, args, path_only=path_only) @@ -58,4 +70,4 @@ def _get_conceptlist(cl, args, path_only=False): if cl.name in args.repos.conceptlists: return args.repos.conceptlists[cl.name] - raise ParserError("no conceptlist %s found" % cl) + raise ParserError(f"no conceptlist {cl} found") diff --git a/src/pyconcepticon/commands/attributes.py b/src/pyconcepticon/commands/attributes.py index ba0f119..e3f0511 100644 --- a/src/pyconcepticon/commands/attributes.py +++ b/src/pyconcepticon/commands/attributes.py @@ -10,7 +10,7 @@ from clldutils.clilib import Table, add_format -def register(parser): +def register(parser): # pylint: disable=C0116 add_format(parser, default='simple') parser.add_argument( '--min-occurs', @@ -20,7 +20,7 @@ def register(parser): ) -def run(args): +def run(args): # pylint: disable=C0116 attrs = collections.Counter() for cl in args.repos.conceptlists.values(): attrs.update(cl.attributes) diff --git a/src/pyconcepticon/commands/check.py b/src/pyconcepticon/commands/check.py index 518171f..795c8d4 100644 --- a/src/pyconcepticon/commands/check.py +++ b/src/pyconcepticon/commands/check.py @@ -9,7 +9,12 @@ - NUMBER - CONCEPTICON_GLOSS """ +import json +import argparse import collections +from collections.abc import Generator +import dataclasses +from typing import Optional, Any import termcolor from clldutils.clilib import Table, add_format @@ -18,10 +23,10 @@ from pyconcepticon.util import read_dicts, CS_ID, CS_GLOSS from pyconcepticon.models import CONCEPT_NETWORK_COLUMNS -import json +ItemListType = list[tuple[int, dict[str, str]]] -def register(parser): +def register(parser): # pylint: disable=C0116 add_conceptlist(parser, multiple=True) add_format(parser, default='simple') parser.add_argument( @@ -31,18 +36,18 @@ def register(parser): default=False) -def run(args): +def run(args): # pylint: disable=C0116 for cl in get_conceptlist(args, path_only=True): print(termcolor.colored(cl, attrs=['bold', 
'underline'])) - items = list(enumerate(read_dicts(cl), start=2)) + items: ItemListType = list(enumerate(read_dicts(cl), start=2)) for check in CHECKS: - print(termcolor.colored('Check: {0}'.format(check.__name__), attrs=['bold'])) + print(termcolor.colored(f'Check: {check.__name__}', attrs=['bold'])) if args.verbose and check.__doc__: print(check.__doc__) # pragma: no cover try: check(items, args) - except Exception as e: # pragma: no cover - print(termcolor.colored('{0}: {1}'.format(e.__class__.__name__, e), color='red')) + except Exception as e: # pragma: no cover # pylint: disable=W0718 + print(termcolor.colored(f'{e.__class__.__name__}: {e}', color='red')) print() @@ -50,15 +55,16 @@ def run(args): # helpers # class Result(Table): + """Results, i.e. error reporter.""" def __exit__(self, exc_type, *args): - if self: + if self: # There are table rows, so render them. super().__exit__(exc_type, *args) else: if not exc_type: print(termcolor.colored('OK', color='green')) -def id_number_gloss(item): +def id_number_gloss(item): # pylint: disable=C0116 return [item.get('ID', ''), item.get('NUMBER', ''), item.get('GLOSS', item.get('ENGLISH', ''))] @@ -94,7 +100,7 @@ def valid_concepticon_gloss(items, args): t.append([cgloss, line] + id_number_gloss(item)) # pragma: no cover -def valid_concepticon_id(items, args): +def valid_concepticon_id(items, args): # pylint: disable=C0116 valid = set(cs.id for cs in args.repos.conceptsets.values() if not cs.replacement_id) with Result( args, 'CONCEPTICON_ID', 'LINE_NO', 'ID', 'NUMBER', 'GLOSS') as t: @@ -110,7 +116,7 @@ def _unique(items, args, *cols): for line, item in items: col = [c for c in cols if c in item] if not col: # pragma: no cover - print(termcolor.colored('no column {0}'.format(' or '.join(cols)), color='red')) + print(termcolor.colored(f'no column {" or ".join(cols)}', color='red')) return col = col[0] clashes[item[col]].append([line] + id_number_gloss(item)) @@ -121,71 +127,94 @@ def _unique(items, args, *cols): 
t.append([val] + item) -def unique_concepticon_gloss(items, args): +def unique_concepticon_gloss(items, args): # pylint: disable=C0116 _unique(items, args, CS_ID, CS_GLOSS) -def unique_id(items, args): +def unique_id(items, args): # pylint: disable=C0116 _unique(items, args, 'ID') -def unique_number(items, args): +def unique_number(items, args): # pylint: disable=C0116 _unique(items, args, 'NUMBER') -def good_graph(items, args): - cids = { - "ID": {b["ID"] for a, b in items}, - "NAME": {b.get("ENGLISH", b.get("GLOSS")) for a, b in items}} - # name suffixes for columns - all_problems = collections.OrderedDict({ - "ID": {name: [] for name in CONCEPT_NETWORK_COLUMNS}, - "NAME": {name: [] for name in CONCEPT_NETWORK_COLUMNS} - }) +@dataclasses.dataclass(frozen=True) +class NetworkValue: + """The value of a network column in a conceptlist with metadata.""" + line_no: int + row: dict[str, Any] + network_col: str + nodes: list[dict[str, Any]] + +def _iter_nodelists(items, cols=CONCEPT_NETWORK_COLUMNS) -> Generator[NetworkValue, None, None]: for cid, concept in items: - for name in CONCEPT_NETWORK_COLUMNS: - nodes_ = concept.get(name) - if nodes_: - nodes = json.loads(nodes_) - for node in nodes: - for itm in ["ID", "NAME"]: - if not node.get(itm) or not node.get(itm) in cids[itm]: - all_problems[itm][name].append([cid] + id_number_gloss(concept)) - - graph_problems = [] - # assemble edges and make sure they make sense - edges, id2num = collections.defaultdict(dict), {} - for i, (cid, concept) in enumerate(items): + for name in cols: + nodes = concept.get(name) + if nodes: + yield NetworkValue(cid, concept, name, json.loads(nodes)) + + +def _iter_duplicate_edges( + items +) -> Generator[tuple[tuple[str, str], dict[str, Any], dict[str, Any]], None, None]: + edges = collections.defaultdict(dict) + for nv in _iter_nodelists(items, cols=['LINKED_CONCEPTS']): # LINKED_CONCEPTS are considered undirected. They may be specified twice - i.e. 
in both # directions - but then they must carry the same exact attributes. - nodes_ = concept.get("LINKED_CONCEPTS") - id2num[concept["ID"]] = (concept["NUMBER"], i + 2) - if nodes_: - nodes = json.loads(nodes_) - for node in nodes: - for k, v in node.items(): - if isinstance(v, (float, int)): - edges[concept["ID"], node["ID"]][k] = v - for nA, nB in list(edges): - if (nB, nA) in edges: # Check attributes: - for attr in edges[nA, nB]: - if edges[nA, nB][attr] != edges[nB, nA].get(attr): - graph_problems.append([ - "different values for {} / {} in {}".format(nA, nB, attr), - id2num[nA][1], nA, id2num[nA][0]]) + for node in nv.nodes: + for k, v in node.items(): + if isinstance(v, (float, int)): + edges[nv.row["ID"], node["ID"]][k] = v + + keys = list(edges) + for a, b in keys: + if (a, b) in edges and (b, a) in edges: + yield (a, b), edges.pop((a, b)), edges.pop((b, a)) + + +@dataclasses.dataclass(frozen=True) +class Problem: + """Error reporting for the concept network check.""" + comment: str + line_no: int + id: str + number: str + gloss: Optional[str] = None + + +def good_graph(items: ItemListType, args: argparse.Namespace): + """Check node dicts of a concept networks.""" + cids = { + "ID": {b["ID"] for _, b in items}, + "NAME": {b.get("ENGLISH", b.get("GLOSS")) for _, b in items}} + id2num = {concept['ID']: (concept['NUMBER'], lid) for lid, concept in items} + problems: list[Problem] = [] + + for nv in _iter_nodelists(items): + for node in nv.nodes: + for itm in ["ID", "NAME"]: + if not node.get(itm) or not node.get(itm) in cids[itm]: + problems.append( + Problem( + f"Attribute {itm} in column {nv.network_col} not in concept list", + nv.line_no, + *id_number_gloss(nv.row))) + + for (n_a, n_b), props_a, props_b in _iter_duplicate_edges(items): + for attr in props_a: + if props_a[attr] != props_b.get(attr): + problems.append( + Problem( + f"different values for {n_a} / {n_b} in {attr}", + id2num[n_a][1], + n_a, + id2num[n_a][0])) with Result(args, "good graph", 
'LINE_NO', 'ID', 'NUMBER', 'GLOSS') as t: - for item, problems in all_problems.items(): - for name in CONCEPT_NETWORK_COLUMNS: - for problem in problems[name]: - problem.insert( - 0, - "Attribute {} in column {}_CONCEPTS does not occur in concept list".format( - item, name)) - t.append(problem) - for problem in graph_problems: - t.append(problem) + for problem in problems: + t.append(dataclasses.astuple(problem)) CHECKS = [ diff --git a/src/pyconcepticon/commands/citation.py b/src/pyconcepticon/commands/citation.py index 188d270..e58889f 100644 --- a/src/pyconcepticon/commands/citation.py +++ b/src/pyconcepticon/commands/citation.py @@ -10,18 +10,18 @@ from nameparser import HumanName -def register(parser): +def register(parser): # pylint: disable=C0116 parser.add_argument('--version', default=None) parser.add_argument('--year', default=date.today().year, type=int) -def zenodo_json(citation, version, editors): +def zenodo_json(citation, version, editors): # pylint: disable=C0116 return collections.OrderedDict([ ("upload_type", "dataset"), - ("description", "

{}

".format(html.escape(citation))), + ("description", f"

{html.escape(citation)}

"), ("alternate_identifiers", [{"scheme": "url", "identifier": "https://concepticon.clld.org"}]), - ("title", "CLLD Concepticon {}".format(version.replace('v', ''))), + ("title", f"CLLD Concepticon {version.replace('v', '')}"), ("access_right", "open"), ("license", {"id": "CC-BY-4.0"}), ("keywords", ["linguistics"]), @@ -30,7 +30,7 @@ def zenodo_json(citation, version, editors): ]) -def run(args): +def run(args): # pylint: disable=C0116 if not args.version: # pragma: no cover args.version = git_describe(args.repos.repos) if args.version.startswith('v'): @@ -41,12 +41,11 @@ def run(args): editor_names = [] for e in current_editors: name = HumanName(e.name) - editor_names.append('{0.last}, {0.first} {0.middle}'.format(name).strip()) + editor_names.append(f'{name.last}, {name.first} {name.middle}'.strip()) editor_names = ' & '.join(editor_names) - res = "{0} (eds.) {1.year}. {2.title} {1.version}. {2.description}. "\ - "{2.publisher.place}: {2.publisher.name}. Available online at {2.url}".format( - editor_names, args, args.repos.dataset_metadata, - ) + md = args.repos.dataset_metadata + res = (f"{editor_names} (eds.) {args.year}. {md.title} {args.version}. {md.description}. " + f"{md.publisher.place}: {md.publisher.name}. 
Available online at {md.url}") print(res) dump( zenodo_json(res, args.version, current_editors), diff --git a/src/pyconcepticon/commands/create_metadata.py b/src/pyconcepticon/commands/create_metadata.py index 084efd9..f19de36 100644 --- a/src/pyconcepticon/commands/create_metadata.py +++ b/src/pyconcepticon/commands/create_metadata.py @@ -4,7 +4,7 @@ from csvw import Column -def run(args): +def run(args): # pylint: disable=C0116 for cl in args.repos.conceptlists.values(): mdpath = cl.path.parent.joinpath(cl.path.name + '-metadata.json') if not mdpath.exists(): @@ -20,6 +20,6 @@ def run(args): for col in cl.cols_in_list: if col not in cols_in_md: cl.metadata.tableSchema.columns.append( - Column.fromvalue(dict(name=col, datatype='string'))) + Column.fromvalue({'name': col, 'datatype': 'string'})) cl.tg.to_file(mdpath) diff --git a/src/pyconcepticon/commands/dump.py b/src/pyconcepticon/commands/dump.py index cef7566..47f3423 100644 --- a/src/pyconcepticon/commands/dump.py +++ b/src/pyconcepticon/commands/dump.py @@ -5,10 +5,10 @@ import zipfile import collections -from csvw.dsv import UnicodeDictReader +from pyconcepticon.util import reader -def register(parser): +def register(parser): # pylint: disable=C0116 parser.add_argument( "--destination", default=None, @@ -16,9 +16,8 @@ def register(parser): ) -def run(args): - paths = {p.stem.split('-')[1]: p for p in args.repos.path( - 'mappings').glob('map-*.tsv')} +def run(args): # pylint: disable=C0116 + paths = {p.stem.split('-')[1]: p for p in args.repos.path('mappings').glob('map-*.tsv')} translate = { 'Person/Thing': 'noun', 'Other': 'other', @@ -30,28 +29,20 @@ def run(args): mappings = {} for language, path in paths.items(): mappings[language] = collections.defaultdict(set) - with UnicodeDictReader(path, delimiter='\t') as reader: - for line in reader: - gloss = line['GLOSS'].split('///')[1] - oc = translate.get( - args.repos.conceptsets[line['ID']].ontological_category, - 'other') - cgl = 
args.repos.conceptsets[line['ID']].gloss - mappings[language][gloss].add( - (line['ID'], cgl, int(line['PRIORITY']), oc, 1)) - for gloss in list(mappings[language].keys()): - if gloss.lower() not in mappings[language]: - mappings[language][gloss.lower()] = set([ - (x[0], x[1], x[2], x[3], 0) for x in - mappings[language][gloss]]) + for line in reader(path, dicts=True): + gloss = line['GLOSS'].split('///')[1] + oc = translate.get(args.repos.conceptsets[line['ID']].ontological_category, 'other') + cgl = args.repos.conceptsets[line['ID']].gloss + mappings[language][gloss].add((line['ID'], cgl, int(line['PRIORITY']), oc, 1)) + + for gloss in list(mappings[language].keys()): + if gloss.lower() not in mappings[language]: + mappings[language][gloss.lower()] = { + (x[0], x[1], x[2], x[3], 0) for x in mappings[language][gloss]} for language, path in paths.items(): for k, v in mappings[language].items(): mappings[language][k] = sorted(v, key=lambda x: x[1], reverse=True) - with zipfile.ZipFile( - args.destination, - mode='w', - compression=zipfile.ZIP_DEFLATED - ) as myzip: + with zipfile.ZipFile(args.destination, mode='w', compression=zipfile.ZIP_DEFLATED) as myzip: myzip.writestr('concepticon.json', json.dumps(mappings)) diff --git a/src/pyconcepticon/commands/graph.py b/src/pyconcepticon/commands/graph.py index ed18184..ff3baaf 100644 --- a/src/pyconcepticon/commands/graph.py +++ b/src/pyconcepticon/commands/graph.py @@ -9,15 +9,17 @@ - NUMBER - CONCEPTICON_GLOSS """ +import json +import dataclasses +from typing import Any + from clldutils.clilib import Table, add_format from pyconcepticon.cli_util import add_conceptlist, get_conceptlist from pyconcepticon.util import read_dicts -import json - -def register(parser): +def register(parser): # pylint: disable=C0116 add_conceptlist(parser, multiple=True) add_format(parser, default='simple') parser.add_argument( @@ -25,11 +27,6 @@ def register(parser): action="store", default="LINKED_CONCEPTS", help="specify the column 
containing linked concepts") - parser.add_argument( - '--verbose', - action='store_true', - help='print check descriptions', - default=False) parser.add_argument( "--threshold", action='store', @@ -51,18 +48,45 @@ def register(parser): ) -def run(args): - header = args.weights +@dataclasses.dataclass(frozen=True) +class Link: + """A Link is a JSON object in a graph-valued column, linking to a conceptset.""" + id: str + name: str + properties: dict[str, Any] + + @classmethod + def from_json_object(cls, obj: dict[str, Any]): + """Turn JSON object into a Link.""" + return cls(id=obj.pop('ID'), name=obj.pop('NAME'), properties=obj) + + +@dataclasses.dataclass(frozen=True) +class GraphItem: + """A list of graph nodes specified in a column in c conceptlist.""" + links: list[Link] + id: str + gloss: str + + +def run(args): # pylint: disable=C0116 + header, rows = args.weights, [] + + for item in read_dicts(get_conceptlist(args, path_only=True)[0]): + item: GraphItem = GraphItem( + links=[Link.from_json_object(obj) for obj in json.loads(item[args.graph_column])], + id=item["ID"], + gloss=item.get("ENGLISH", item.get("GLOSS", "?"))) + + for link in item.links: + if args.threshold and args.threshold_property: + if link.properties[args.threshold_property] < args.threshold: + continue + if not header: + header = list(link.properties.keys()) + rows.append( + [item.id, item.gloss, link.id, link.name] + [link.properties[h] for h in header]) - with Table(args, *["SOURCE_ID", "SOURCE_NAME", "TARGET_ID", "TARGET_NAME"] + header) as t: - for idx, item in enumerate(read_dicts(get_conceptlist(args, path_only=True)[0]), start=2): - links = json.loads(item[args.graph_column]) - source_id, source_name = (item["ID"], item.get("ENGLISH", item.get("GLOSS", "?"))) - for link in links: - link_id, link_name = link["ID"], link["NAME"] - if args.threshold and args.threshold_property: - if link[args.threshold_property] < args.threshold: - continue - if not header: - header = [key for key in link 
if key not in ["ID", "NAME"]] - t.append([source_id, source_name, link_id, link_name] + [link[h] for h in header]) + with Table(args, "SOURCE_ID", "SOURCE_NAME", "TARGET_ID", "TARGET_NAME", *header) as t: + for row in rows: + t.append(row) diff --git a/src/pyconcepticon/commands/link.py b/src/pyconcepticon/commands/link.py index 38194ea..6d31a2e 100644 --- a/src/pyconcepticon/commands/link.py +++ b/src/pyconcepticon/commands/link.py @@ -5,73 +5,97 @@ ----- If either CONCEPTICON_GLOSS or CONCEPTICON_ID is given in the list, the other is added. """ -from pyconcepticon.util import rewrite, CS_GLOSS, CS_ID +import dataclasses +from typing import Optional, get_args, Literal +from collections.abc import Iterable + +from pyconcepticon.models import Conceptset +from pyconcepticon.util import rewrite from pyconcepticon.cli_util import add_conceptlist, get_conceptlist +LinkColType = Literal['CONCEPTICON_ID', 'CONCEPTICON_GLOSS'] +CS_ID = get_args(LinkColType)[0] +CS_GLOSS = get_args(LinkColType)[1] +RowType = list[str] + -def register(parser): +def register(parser): # pylint: disable=C0116 add_conceptlist(parser) -def run(args): +def run(args): # pylint: disable=C0116 cl = get_conceptlist(args, path_only=True) rewrite(cl, Linker(cl.stem, args.repos.conceptsets.values())) -class Linker(object): - def __init__(self, clid, conceptsets): - self.clid = clid - self.concepts = { +@dataclasses.dataclass(frozen=True) +class LinkCol: + """Index and name of a column that links to a concepticon conceptset.""" + index: int + col: LinkColType + + +@dataclasses.dataclass +class ColIndex: + """Bag to store indices of certain columns in the rows of a conceptlist.""" + cid: Optional[int] = None + cgloss: Optional[int] = None + number: Optional[int] = None + + +class Linker: # pylint: disable=R0903 + """Implements the rewriting of the conceptlist rows.""" + def __init__(self, clid: str, conceptsets: Iterable[Conceptset]): + self.clid: str = clid + self.concepts: dict[LinkColType, dict[str, 
str]] = { CS_ID: {cs.id: cs.gloss for cs in conceptsets}, # maps ID to GLOSS CS_GLOSS: {cs.gloss: cs.id for cs in conceptsets}, # maps GLOSS to ID } + self.col_index: ColIndex = ColIndex() + self.link_col: Optional[LinkCol] = None - self._cid_index = None - self._cgloss_index = None - self._link_col = (None, None) - self._number_index = None + def _header_row(self, row: RowType) -> RowType: + assert any(col in row for col in get_args(LinkColType)) + assert "NUMBER" in row + if all(col in row for col in get_args(LinkColType)): + self.col_index.cid = row.index(CS_ID) + self.col_index.cgloss = row.index(CS_GLOSS) + else: + # either CONCEPTICON_ID or CONCEPTICON_GLOSS is given, and the other is missing. + add = {CS_ID: CS_GLOSS, CS_GLOSS: CS_ID} + for j, col in enumerate(row): + if col in add: + col: LinkColType + row = [add[col]] + row + self.link_col = LinkCol(j, col) + break + if "ID" not in row: + self.col_index.number = row.index("NUMBER") + row = ["ID"] + row + return row - def __call__(self, i, row): + def __call__(self, i: int, row: RowType) -> RowType: if i == 0: - assert (CS_ID in row) or (CS_GLOSS in row) - assert "NUMBER" in row - if (CS_ID in row) and (CS_GLOSS in row): - self._cid_index = row.index(CS_ID) - self._cgloss_index = row.index(CS_GLOSS) - else: - # either CONCEPTICON_ID or CONCEPTICON_GLOSS is given, and the - # other is missing. 
- add = {CS_ID: CS_GLOSS, CS_GLOSS: CS_ID} - for j, col in enumerate(row): - if col in add: - row = [add[col]] + row - self._link_col = (j, col) - break - if "ID" not in row: - self._number_index = row.index("NUMBER") - row = ["ID"] + row - return row - - if self._link_col[1]: - val = self.concepts[self._link_col[1]].get(row[self._link_col[0]], "") + return self._header_row(row) + + if self.link_col: + val = self.concepts[self.link_col.col].get(row[self.link_col.index], "") if not val: # pragma: no cover - print("unknown %s: %s" % (self._link_col[1], row[self._link_col[0]])) + print(f"unknown {self.link_col}") row = [val] + row else: - cid = self.concepts[CS_GLOSS].get(row[self._cgloss_index], "") + cid = self.concepts[CS_GLOSS].get(row[self.col_index.cgloss], "") if not cid: - print("unknown CONCEPTICON_GLOSS: {0}".format(row[self._cgloss_index])) - elif cid != row[self._cid_index]: - if not row[self._cid_index]: - row[self._cid_index] = cid + print(f"unknown CONCEPTICON_GLOSS: {row[self.col_index.cgloss]}") + elif cid != row[self.col_index.cid]: + if not row[self.col_index.cid]: + row[self.col_index.cid] = cid else: - print( - "unknown CONCEPTICON_ID/GLOSS mismatch: %s %s" - % (row[self._cid_index], row[self._cgloss_index]) - ) + print(f"unknown CONCEPTICON_ID/GLOSS mismatch: " + f"{row[self.col_index.cid]} {row[self.col_index.cgloss]}") - if self._number_index is not None: - row = ["%s-%s" % (self.clid, row[self._number_index])] + row + if self.col_index.number is not None: + row = [f"{self.clid}-{row[self.col_index.number]}"] + row return row diff --git a/src/pyconcepticon/commands/lookup.py b/src/pyconcepticon/commands/lookup.py index 1cb3a2f..472311a 100644 --- a/src/pyconcepticon/commands/lookup.py +++ b/src/pyconcepticon/commands/lookup.py @@ -6,7 +6,7 @@ from pyconcepticon.cli_util import add_search -def register(parser): +def register(parser): # pylint: disable=C0116 parser.add_argument('gloss', metavar='GLOSS', nargs='+') add_format(parser, 
default='simple') parser.add_argument( @@ -17,7 +17,7 @@ def register(parser): add_search(parser) -def run(args): +def run(args): # pylint: disable=C0116 found = args.repos.lookup( args.gloss, language=args.language, diff --git a/src/pyconcepticon/commands/make_app.py b/src/pyconcepticon/commands/make_app.py index cbb6de4..febc8d3 100644 --- a/src/pyconcepticon/commands/make_app.py +++ b/src/pyconcepticon/commands/make_app.py @@ -12,19 +12,19 @@ from pyconcepticon import Concepticon -def register(parser): +def register(parser): # pylint: disable=C0116 parser.add_argument('--recreate', default=True, help=argparse.SUPPRESS) @Concepticon.app_wrapper -def run(args): +def run(args): # pylint: disable=C0116 data = collections.defaultdict(list) def key(g, l_): - return "{0}---{1}".format(g, l_) + return f"{g}---{l_}" for lang in ["en", "de", "zh", "fr", "ru", "es", "pt"]: - for cidx, gloss in args.api._get_map_for_language(lang): + for cidx, gloss in args.api._get_map_for_language(lang): # pylint: disable=W0212 g0, _, g1 = gloss.partition("///") csspec = ( cidx, @@ -38,6 +38,5 @@ def key(g, l_): data[key(g0.lower(), lang)].append(csspec) data["language"] = "en" args.api.appdatadir.joinpath("data.js").write_text( - "var Concepticon = {0};\n".format(json.dumps(data, indent=2)), - encoding='utf-8') + f"var Concepticon = {json.dumps(data, indent=2)};\n", encoding='utf-8') args.log.info("app data recreated") diff --git a/src/pyconcepticon/commands/make_linkdata.py b/src/pyconcepticon/commands/make_linkdata.py index 301866d..2caabe6 100644 --- a/src/pyconcepticon/commands/make_linkdata.py +++ b/src/pyconcepticon/commands/make_linkdata.py @@ -6,21 +6,25 @@ map* files contain lists of all concept-to-word-in-language mappings available within Concepticon. 
""" +import functools import collections -from csvw.dsv import UnicodeWriter +from pyconcepticon.util import UnicodeWriter -def run(args): +def run(args): # pylint: disable=C0116 + rep = _current_conceptsets(args.repos) for lang in args.repos.vocabularies["COLUMN_TYPES"].values(): if getattr(lang, "iso2", None): - _write_linking_data(args.repos, lang, args) + args.log.info(lang) + _write_linking_data(args.repos, lang, rep) -def _write_linking_data(api, lang, args): - out, freqs = collections.defaultdict(int), collections.defaultdict(int) - # find those concept sets that are wrongly linked, they should not go into - # the mapping, so we just make a re-linker here +def _current_conceptsets(api): + """ + find those concept sets that are wrongly linked, they should not go into + the mapping, so we just make a re-linker here + """ rep = {} for c in api.conceptsets.values(): if c.replacement_id: @@ -29,9 +33,18 @@ def _write_linking_data(api, lang, args): else: rep[c.id] = c.id rep[c.gloss] = c.gloss + return rep + + +def _local_gloss(rep, concepticon_gloss, local): + return f'{rep[concepticon_gloss]}///{local}' + + +def _get_frequencies(api, lang, rep): + out, freqs = collections.defaultdict(int), collections.defaultdict(int) + local_gloss = functools.partial(_local_gloss, rep) for clist in api.conceptlists.values(): - args.log.info("checking {clist.id}".format(clist=clist)) for row in clist.concepts.values(): if row.concepticon_id: gls = None @@ -43,29 +56,34 @@ def _write_linking_data(api, lang, args): gls = row.attributes[lang.name].strip("*$-—+") if gls: - out[rep[row.concepticon_gloss] + "///" + gls, rep[row.concepticon_id]] += 1 + out[local_gloss(row.concepticon_gloss, gls), rep[row.concepticon_id]] += 1 freqs[rep[row.concepticon_id]] += 1 + return out, freqs + + +def _write_linking_data(api, lang: str, rep): + out, freqs = _get_frequencies(api, lang, rep) if lang.iso2 == "en": for cset in api.conceptsets.values(): - gloss = rep[cset.gloss] - cid = rep[cset.id] + 
lgloss = cset.gloss.lower() + if cset.ontological_category == "Person/Thing": - out[gloss + "///the " + cset.gloss.lower(), cid] = freqs[cid] - out[gloss + "///the " + cset.gloss.lower() + "s", cid] = freqs[cid] + lgloss = "the " + lgloss + out[_local_gloss(rep, cset.gloss, lgloss + "s"), rep[cset.id]] = freqs[rep[cset.id]] elif cset.ontological_category == "Action/Process": - out[gloss + "///to " + cset.gloss.lower(), cid] = freqs[cid] + lgloss = "to " + lgloss elif cset.ontological_category == "Property": - out[gloss + "///" + cset.gloss.lower() + " (adjective)", cid] = freqs[cid] + lgloss += " (adjective)" elif cset.ontological_category == "Classifier": - out[gloss + "///" + cset.gloss.lower() + " (classifier)", cid] = freqs[cid] - else: - out[gloss + "///" + cset.gloss.lower(), cid] = freqs[cid] + lgloss += " (classifier)" + + out[_local_gloss(rep, cset.gloss, lgloss), rep[cset.id]] = freqs[rep[cset.id]] - p = api.path("mappings", "map-{0}.tsv".format(lang.iso2)) + p = api.path("mappings", f"map-{lang.iso2}.tsv") if not p.parent.exists(): p.parent.mkdir() - with UnicodeWriter(p, delimiter="\t") as f: + with UnicodeWriter(p) as f: f.writerow(["ID", "GLOSS", "PRIORITY"]) - for i, (gloss, cid) in enumerate(sorted(out)): + for gloss, cid in sorted(out): f.writerow([cid, gloss, out[gloss, cid]]) diff --git a/src/pyconcepticon/commands/map_concepts.py b/src/pyconcepticon/commands/map_concepts.py index ecef467..cf52324 100644 --- a/src/pyconcepticon/commands/map_concepts.py +++ b/src/pyconcepticon/commands/map_concepts.py @@ -14,7 +14,7 @@ from pyconcepticon.cli_util import add_conceptlist, get_conceptlist, _get_conceptlist, add_search -def register(parser): +def register(parser): # pylint: disable=C0116 add_conceptlist(parser) parser.add_argument( '--reference-list', @@ -33,7 +33,7 @@ def register(parser): default=None) -def run(args): +def run(args): # pylint: disable=C0116 # Note: Due to https://github.com/concepticon/pyconcepticon/issues/10 we require 
specification # of an output file on Windows: if platform.system() == 'Windows' and not args.output: # pragma: no cover diff --git a/src/pyconcepticon/commands/mergers.py b/src/pyconcepticon/commands/mergers.py index 8141645..ed5ef25 100644 --- a/src/pyconcepticon/commands/mergers.py +++ b/src/pyconcepticon/commands/mergers.py @@ -4,13 +4,11 @@ from pyconcepticon.cli_util import add_conceptlist, get_conceptlist -def register(parser): +def register(parser): # pylint: disable=C0116 add_conceptlist(parser) -def run(args): +def run(args): # pylint: disable=C0116 # @todo: check output - cl = get_conceptlist(args) - mapped, mapped_ratio, mergers = cl.stats() - for k, v in mergers: + for k, v in get_conceptlist(args).stats().mergers: print(k, v) diff --git a/src/pyconcepticon/commands/notlinked.py b/src/pyconcepticon/commands/notlinked.py index d6bf81e..6200473 100644 --- a/src/pyconcepticon/commands/notlinked.py +++ b/src/pyconcepticon/commands/notlinked.py @@ -4,7 +4,7 @@ import re -def register(parser): +def register(parser): # pylint: disable=C0116 parser.add_argument( '--full', action='store_true', @@ -29,7 +29,7 @@ def register(parser): ) -def run(args): +def run(args): # pylint: disable=C0116 i, notlinked = 0, [] for _, cl in sorted(args.repos.conceptlists.items(), key=lambda p: p[0]): if (not args.inid) or args.inid in cl.id: @@ -38,6 +38,7 @@ def run(args): key=lambda p: int(re.match('([0-9]+)', p.number).groups()[0])): if not concept.concepticon_id: notlinked.append(concept) + to = [('1', args.gloss)] if args.gloss else None for j, matches in enumerate(args.repos.lookup( [c.label for c in notlinked], full_search=not args.full, to=to)): @@ -46,4 +47,4 @@ def run(args): cid, cgl = candidates[0][2:4] if cgl <= args.similarity_threshold: i += 1 - print('{0} {1.id}: {1.label}: {2} [{3}]'.format(i, notlinked[j], cid, cgl)) + print(f'{i} {notlinked[j].id}: {notlinked[j].label}: {cid} [{cgl}]') diff --git a/src/pyconcepticon/commands/recreate_networks.py 
b/src/pyconcepticon/commands/recreate_networks.py index edf6bea..7f93554 100644 --- a/src/pyconcepticon/commands/recreate_networks.py +++ b/src/pyconcepticon/commands/recreate_networks.py @@ -4,13 +4,16 @@ import json import shutil import subprocess - -from csvw.dsv import reader +import dataclasses +from typing import Any, Union +import collections +from collections.abc import Sequence, Generator from pyconcepticon.models import CONCEPT_NETWORK_COLUMNS +from pyconcepticon.util import reader -def register(parser): +def register(parser): # pylint: disable=C0116 parser.add_argument( '--download', action='store_true', @@ -23,37 +26,79 @@ def register(parser): help="Do not overwrite lists, but compute diff") -def idname(t): - d = dict(t) - rem = '\t'.join('{}: {}'.format(k, v) for k, v in t if k not in ['ID', 'NAME']) - return '{}\t{}\t{}'.format(d['ID'], d.get('NAME', ''), rem) +# A dict represented as sequence of key-value pairs. +HashableDictType = Sequence[tuple[str, Union[str, int, float, Sequence[Union[str, int, float]]]]] +# An unordered sequence of HashableDictType. +ComparableJsonType = set[HashableDictType] +RowIdColNameType = tuple[str, str] -def hashable_dict(d): - return tuple(sorted([(k, tuple(v) if isinstance(v, list) else v) for k, v in d.items()])) +@dataclasses.dataclass +class NetworkDiffer: + """ + Differ for unordered sequences (aka sets) of JSON objects. + >>> d = NetworkDiffer() + >>> d.add_pair('row', 'col', '[{"ID": 5},{"ID": 3}]', '[{"ID": 3},{"ID": 7}]') + >>> for r, c, old, new in d.iter_diff(): + ... print(old - new) + ... print(new - old) + ... 
+ {(('ID', 5),)} + {(('ID', 7),)} + """ + pairs: collections.OrderedDict[ + RowIdColNameType, tuple[ComparableJsonType, ComparableJsonType] + ] = dataclasses.field(default_factory=collections.OrderedDict) + + @staticmethod + def _hashable_dicts(jsonval) -> ComparableJsonType: + return set( + tuple(sorted([(k, tuple(v) if isinstance(v, list) else v) for k, v in d.items()])) + for d in json.loads(jsonval or '[]')) + + def add_pair(self, rowid: str, col: str, jsonval1: str, jsonval2: str): + """Add a pair of (possibly different) values for the same row and column.""" + self.pairs[rowid, col] = (self._hashable_dicts(jsonval1), self._hashable_dicts(jsonval2)) + + def iter_diff(self) -> Generator[ + tuple[str, str, list[collections.OrderedDict], list[collections.OrderedDict]], + None, + None + ]: + """Yields a quadruple (rowid, col, minus-items, plus-items) for each different pair.""" + for (rowid, col), (old, new) in self.pairs.items(): + if old != new: + yield ( + rowid, + col, + [collections.OrderedDict(i) for i in old - new], + [collections.OrderedDict(i) for i in new - old]) -def diff(new, old): - old = {r['ID']: r for r in reader(old, dicts=True, delimiter='\t')} - new = {r['ID']: r for r in reader(new, dicts=True, delimiter='\t')} - for k, i1 in old.items(): - i2 = new[k] +def diff(new, old): + """Compute and print differences between network-valued columns.""" + differ = NetworkDiffer() + new = {r['ID']: r for r in reader(new, dicts=True)} + for oldrow in reader(old, dicts=True): for col in CONCEPT_NETWORK_COLUMNS: - if col in i1: - v1 = set(hashable_dict(i) for i in json.loads(i1[col] or '[]')) - v2 = set(hashable_dict(i) for i in json.loads(i2[col] or '[]')) - if v1 != v2: - print('== {}\t{}'.format(k, col)) - for ii in v1: - if ii not in v2: - print('-- {}'.format(idname(ii))) - for ii in v2: - if ii not in v1: - print('++ {}'.format(idname(ii))) - - -def run(args): + if col in oldrow: + differ.add_pair(oldrow['ID'], col, oldrow[col], new[oldrow['ID']][col]) + 
+ def idname(d: collections.OrderedDict[str, Any]) -> str: + """Format an item as tab-separated ID, NAME and remaining "key: value" pairs.""" + rem = '\t'.join(f'{k}: {v}' for k, v in d.items() if k not in ['ID', 'NAME']) + return f'{d.get("ID", "")}\t{d.get("NAME", "")}\t{rem}' + + for rowid, col, minus, plus in differ.iter_diff(): + print(f'== {rowid}\t{col}') + for ii in minus: + print(f'-- {idname(ii)}') + for ii in plus: + print(f'++ {idname(ii)}') + + +def run(args): # pylint: disable=C0116 for cl in args.repos.conceptlists.values(): d = cl.path.parent / cl.path.stem if d.exists() and d.is_dir(): diff --git a/src/pyconcepticon/commands/rename.py b/src/pyconcepticon/commands/rename.py index 61fa97e..0f64364 100644 --- a/src/pyconcepticon/commands/rename.py +++ b/src/pyconcepticon/commands/rename.py @@ -7,14 +7,14 @@ """ import collections -from csvw.dsv import UnicodeWriter, reader from clldutils.clilib import ParserError from clldutils import jsonlib from pyconcepticon.models import MD_SUFFIX, CONCEPTLIST_ID_PATTERN +from pyconcepticon.util import UnicodeWriter, reader, rewrite -def register(parser): +def register(parser): # pylint: disable=C0116 parser.add_argument( 'from_', metavar='FROM', @@ -27,33 +27,32 @@ def register(parser): ) -def run(args): +def run(args): # pylint: disable=C0116 if not CONCEPTLIST_ID_PATTERN.match(args.to): - raise ParserError('Invalid conceptlist ID {0}'.format(args.to)) # pragma: no cover + raise ParserError(f'Invalid conceptlist ID {args.to}') # pragma: no cover if args.to in args.repos.conceptlists: - raise ParserError('Target ID {0} exists!'.format(args.to)) # pragma: no cover + raise ParserError(f'Target ID {args.to} exists!') # pragma: no cover try: cl = args.repos.conceptlists[args.from_] - except KeyError: # pragma: no cover - raise ParserError('Source conceptlist {0} does not exist!'.format(args.from_)) + except KeyError as e: # pragma: no cover + raise ParserError(f'Source conceptlist {args.from_} does not exist!') from e + + def retire(what, from_, to_): + 
args.repos.add_retirement(what, {'id': from_, 'comment': 'renaming', 'replacement': to_}) # write the adapted concept list to the new path: - with UnicodeWriter( - cl.path.parent / cl.path.name.replace(args.from_, args.to), delimiter='\t') as writer: - header = [] - for i, row in enumerate(reader(cl.path, delimiter='\t')): + with UnicodeWriter(cl.path.parent / cl.path.name.replace(args.from_, args.to)) as writer: + header: dict[str, int] = {} + for i, row in enumerate(reader(cl.path)): if i == 0: - header = row - writer.writerow(row) - header = {v: k for k, v in enumerate(header)} # Map col name to row index + header = {v: k for k, v in enumerate(row)} # Map col name to row index else: oid = row[header['ID']] assert oid.startswith(args.from_) nid = oid.replace(args.from_, args.to) - args.repos.add_retirement( - 'Concept', dict(id=oid, comment='renaming', replacement=nid)) + retire('Concept', oid, nid) row[header['ID']] = nid - writer.writerow(row) + writer.writerow(row) # write adapted metadata to the new path: fname_md = cl.path.name.replace(args.from_, args.to) + MD_SUFFIX @@ -69,17 +68,11 @@ def run(args): cl.path.parent.joinpath(cl.path.name + MD_SUFFIX).unlink() # adapt conceptlists.tsv - rows = [] - for row in reader(args.repos.data_path('conceptlists.tsv'), delimiter='\t'): - rows.append([col.replace(args.from_, args.to) if col else col for col in row]) - - with UnicodeWriter(args.repos.data_path('conceptlists.tsv'), delimiter='\t') as writer: - writer.writerows(rows) - - args.repos.add_retirement( - 'Conceptlist', dict(id=args.from_, comment='renaming', replacement=args.to)) - - print("""Please run -grep -r "{0}" concepticondata/ | grep -v retired.json + rewrite( + args.repos.data_path('conceptlists.tsv'), + lambda _, row: [col.replace(args.from_, args.to) if col else col for col in row]) -to confirm the renaming was complete!""".format(args.from_)) + retire('Conceptlist', args.from_, args.to) + print(f'Please run\n' + f'grep -r "{args.from_}" 
concepticondata/ | grep -v retired.json' + f'\n\nto confirm the renaming was complete!') diff --git a/src/pyconcepticon/commands/shrink.py b/src/pyconcepticon/commands/shrink.py index a8835f9..92e9119 100644 --- a/src/pyconcepticon/commands/shrink.py +++ b/src/pyconcepticon/commands/shrink.py @@ -10,7 +10,7 @@ from pyconcepticon.cli_util import add_conceptlist, get_conceptlist -def register(parser): +def register(parser): # pylint: disable=C0116 add_conceptlist(parser) parser.add_argument('column', metavar='COLUMN') parser.add_argument( @@ -19,7 +19,7 @@ def register(parser): default=None) -def run(args): +def run(args): # pylint: disable=C0116 dicts = list(dsv.reader(get_conceptlist(args, path_only=True), delimiter="\t", dicts=True)) out_dict = collections.OrderedDict() diff --git a/src/pyconcepticon/commands/stats.py b/src/pyconcepticon/commands/stats.py index f396bc1..d0443e6 100644 --- a/src/pyconcepticon/commands/stats.py +++ b/src/pyconcepticon/commands/stats.py @@ -3,79 +3,104 @@ """ import operator import collections +import dataclasses from clldutils.markup import Table from pyconcepticon.cli_util import readme -def run(args): +def run(args): # pylint: disable=C0116 cls = args.repos.conceptlists.values() readme_conceptlists(args.repos, cls, args) readme_concepticondata(args.repos, cls) def readme_conceptlists(api, cls, args): + """Write README.md in the conceptlists directory.""" table = Table("name", "# mapped", "% mapped", "mergers") for cl in cls: args.log.info("processing <" + cl.path.name + ">") - mapped, mapped_ratio, mergers = cl.stats() - table.append(["[%s](%s) " % (cl.id, cl.path.name), len(mapped), mapped_ratio, len(mergers)]) + stats = cl.stats() + table.append([ + f"[{cl.id}]({cl.path.name}) ", + len(stats.mapped), + stats.mapped_ratio_percent, + len(stats.mergers)]) readme( api.data_path("conceptlists"), - "# Concept Lists\n\n{0}".format(table.render(verbose=True, sortkey=operator.itemgetter(0))), + f"# Concept 
Lists\n\n{table.render(verbose=True, sortkey=operator.itemgetter(0))}", ) +@dataclasses.dataclass +class Concepts: + """Container for concept info suitable to derive summary stats.""" + by_concepticon_gloss: dict[str, tuple[str, str]] = dataclasses.field( + default_factory=lambda: collections.defaultdict(list)) + by_label: dict[str, tuple[str, str, str]] = dataclasses.field( + default_factory=lambda: collections.defaultdict(list)) + label_counter: dict[str, int] = dataclasses.field(default_factory=collections.Counter) + + def add(self, concept, cl): # pylint: disable=C0116 + self.by_concepticon_gloss[concept.concepticon_gloss].append((cl.id, concept.label)) + self.by_label[concept.label].append( + (concept.concepticon_id, concept.concepticon_gloss, cl.id)) + self.label_counter.update([concept.label]) + + @property + def n_conceptsets(self) -> int: # pylint: disable=C0116 + return len(self.by_concepticon_gloss) + + @property + def concepts_per_conceptset(self) -> float: # pylint: disable=C0116 + return sum(len(v) for v in self.by_concepticon_gloss.values()) / self.n_conceptsets + + @property + def unique_labels_per_conceptset(self) -> float: # pylint: disable=C0116 + return sum(len({label for _, label in v}) for v in self.by_concepticon_gloss.values()) \ + / self.n_conceptsets + + def readme_concepticondata(api, cls): """ Returns a dictionary with concept set label as value and tuples of concept list identifier and concept label as values. 
""" - D, G = collections.defaultdict(list), collections.defaultdict(list) - labels = collections.Counter() - + concepts = Concepts() for cl in cls: for concept in [c for c in cl.concepts.values() if c.concepticon_id]: - D[concept.concepticon_gloss].append((cl.id, concept.label)) - G[concept.label].append((concept.concepticon_id, concept.concepticon_gloss, cl.id)) - labels.update([concept.label]) - txt = [""" -# Concepticon Statistics -* concept sets (used): {0} -* concept lists: {1} -* concept labels: {2} -* concept labels (unique): {3} -* Ø concepts per list: {4:.2f} -* Ø concepts per concept set: {5:.2f} -* Ø unique concept labels per concept set: {6:.2f} - -""".format( - len(D), - len(cls), - sum(list(labels.values())), - len(labels), - sum(list(labels.values())) / len(cls), - sum([len(v) for k, v in D.items()]) / len(D), - sum([len(set([label for _, label in v])) for k, v in D.items()]) / len(D), - )] + concepts.add(concept, cl) + + txt = [ + "", + "# Concepticon Statistics", + f"* concept sets (used): {concepts.n_conceptsets}", + f"* concept lists: {len(cls)}", + f"* concept labels: {sum(concepts.label_counter.values())}", + f"* concept labels (unique): {len(concepts.label_counter)}", + f"* Ø concepts per list: {sum(concepts.label_counter.values()) / len(cls):.2f}", + f"* Ø concepts per concept set: {concepts.concepts_per_conceptset:.2f}", + f"* Ø unique concept labels per concept set: {concepts.unique_labels_per_conceptset:.2f}", + "", + ] for attr, key in [ - ("Diverse", lambda x: (len(set([label for _, label in x[1]])), x[0] or "")), - ("Frequent", lambda x: (len(set([clist for clist, _ in x[1]])), x[0] or "18G18G")), + # Most diverse conceptsets, i.e. the ones with the highest number of distinct glosses + # mapped. + ("Diverse", lambda x: (-len({label for _, label in x[1]}), x[0] or "")), + # Most frequent conceptsets, i.e. the ones for which there are concepts in most lists. 
+ ("Frequent", lambda x: (-len({clist for clist, _ in x[1]}), x[0] or "18G18G")), ]: table = Table("No.", "concept set", "distinct labels", "concept lists", "examples") - for i, (k, v) in enumerate(sorted(D.items(), key=key, reverse=True)[:20]): + for i, (k, v) in enumerate(sorted(concepts.by_concepticon_gloss.items(), key=key)[:20]): table.append([ i + 1, k, - len(set([label for _, label in v])), - len(set([clist for clist, _ in v])), - ", ".join( - sorted(set(["«{0}»".format(label.replace("*", "`*`")) for _, label in v])) - ), + len({label for _, label in v}), + len({clist for clist, _ in v}), + ", ".join(sorted({f'«{label.replace("*", "`*`")}»' for _, label in v})), ]) - txt.append("## Twenty Most {0} Concept Sets\n\n{1}\n".format(attr, table.render())) + txt.append(f"## Twenty Most {attr} Concept Sets\n\n{table.render()}\n") readme(api.data_path(), txt) - return D, G diff --git a/src/pyconcepticon/commands/test.py b/src/pyconcepticon/commands/test.py index 094a1c6..5ecde4e 100644 --- a/src/pyconcepticon/commands/test.py +++ b/src/pyconcepticon/commands/test.py @@ -9,7 +9,7 @@ """ -def register(parser): +def register(parser): # pylint: disable=C0116 parser.add_argument( 'clids', metavar='CONCEPTLIST_ID', @@ -17,8 +17,8 @@ def register(parser): nargs='*') -def run(args): +def run(args): # pylint: disable=C0116 if args.repos.check(*args.clids): # pragma: no cover args.log.info("all integrity tests passed: OK") else: # pragma: no cover - args.log.error("inconsistent data in repository {0}".format(args.repos.repos)) + args.log.error("inconsistent data in repository %s", args.repos.repos) diff --git a/src/pyconcepticon/commands/upload_sources.py b/src/pyconcepticon/commands/upload_sources.py index b214e11..01e6b22 100644 --- a/src/pyconcepticon/commands/upload_sources.py +++ b/src/pyconcepticon/commands/upload_sources.py @@ -17,14 +17,14 @@ from pyconcepticon.cli_util import readme -def register(parser): +def register(parser): # pylint: disable=C0116 
parser.add_argument( '--cdstar-catalog', default=os.environ.get("CDSTAR_CATALOG"), help='Path to global CDSTAR catalog') -def run(args): +def run(args): # pylint: disable=C0116 toc = ["# Sources\n"] with SourcesCatalog(args.repos.data_path("sources", "cdstar.json")) as lcat: with Catalog( @@ -43,6 +43,6 @@ def run(args): for key in sorted(lcat.items): spec = lcat.get(key) - toc.append("- [{0} [PDF {1}]]({2})".format(key, format_size(spec["size"]), spec["url"])) + toc.append(f'- [{key} [PDF {format_size(spec["size"])}]]({spec["url"]})') readme(args.repos.data_path("sources"), toc) diff --git a/src/pyconcepticon/commands/validate.py b/src/pyconcepticon/commands/validate.py index 1cfcb23..fa5110a 100644 --- a/src/pyconcepticon/commands/validate.py +++ b/src/pyconcepticon/commands/validate.py @@ -8,8 +8,8 @@ """ -def run(args): +def run(args): # pylint: disable=C0116 for cl in args.repos.conceptlists.values(): items = list(cl.metadata) if set(items[0].keys()) != set(c.name for c in cl.metadata.tableSchema.columns): - print("unspecified column in concept list {0}".format(cl.id)) + print(f"unspecified column in concept list {cl.id}") diff --git a/src/pyconcepticon/glosses.py b/src/pyconcepticon/glosses.py index 1e4a5ed..5d585fa 100644 --- a/src/pyconcepticon/glosses.py +++ b/src/pyconcepticon/glosses.py @@ -2,68 +2,309 @@ Module provides functions for the handling of concept glosses in linguistic datasets. 
""" import re -import typing +import enum +from typing import Union, Literal, Callable, Any, Optional, get_args import functools +import itertools import collections +from collections.abc import Iterable, Generator +import dataclasses -import attr +from .util import read_dicts, UnicodeWriter +from ._compat import StrEnum -__all__ = ['parse_gloss', 'Gloss', 'concept_map'] +__all__ = [ + 'parse_gloss', 'Gloss', 'concept_map', 'Mapping', 'Similarity', 'Pos', 'map_list', + 'GlossLanguage', 'MapOptions'] -@attr.s -class Gloss(object): - main = attr.ib(default='') +class Pos(enum.Enum): + """Recognized parts of speech in glosses.""" + NOUN = enum.auto() + VERB = enum.auto() + ADJECTIVE = enum.auto() + ADVERB = enum.auto() + CLASSIFIER = enum.auto() + + @classmethod + def from_string(cls, s: str) -> 'Pos': + """Get the enum symbol from its name.""" + return getattr(cls, s.upper()) + + +class Similarity(enum.IntEnum): + """Enum to make similarity measures more transparent.""" + SAME = 1 + SAME_DIFFERENT_POS = 2 + SAME_MAIN = 3 + SAME_MAIN_DIFFERENT_POS = 4 + SAME_LONGEST = 5 + SAME_LONGEST_DIFFERENT_POS = 6 + LONGEST_IS_CONTAINED = 7 + LONGEST_CONTAINS = 8 + DIFFERENT = 100 + + @classmethod + def from_int(cls, n: int = 5) -> Optional['Similarity']: + """Get the enum symbol from its integer value.""" + if isinstance(n, cls): + return n + for sim in cls: + if sim.value == n: + return sim + return None # pragma: no cover + + +class GlossLanguage(StrEnum): + """Languages used for glossing in conceptlists.""" + FRENCH = 'fr' + ENGLISH = 'en' + SPANISH = 'es' + GERMAN = 'de' + POLISH = 'pl' + LATIN = 'lt' + CHINESE = 'zh' + PORTUGUESE = 'pt' + RUSSIAN = 'ru' + ITALIAN = 'it' + + @classmethod + def from_string(cls, s: str = 'en') -> Optional['GlossLanguage']: + """Get the enum symbol from its value, i.e. the language code.""" + if isinstance(s, cls): + return s + for lg in cls: + if lg.value == s: + return lg + return None # pragma: no cover + + +LanguageType = Union[str, GlossLanguage] + + 
+@dataclasses.dataclass +class Gloss: # pylint: disable=too-many-instance-attributes + """ + A gloss, as parsed from a string with several constituent parts. + + >>> Gloss.from_string('word [comment]').comment + 'comment' + """ + main: str = '' # the start character indicating a potential comment: - comment_start = attr.ib(default='') + comment_start: str = '' # the comment (everything occurring in brackets in the input string: - comment = attr.ib(default='') + comment: str = '' # the end character indicating the end of a potential comment: - comment_end = attr.ib(default='') + comment_end: str = '' # the part of speech, in case this was specificied by a preceding "the" or a # preceding "to" in the mainpart of the string: - pos = attr.ib(default='') + pos: Optional[Pos] = None # the prefix, that is, words, like, eg. "be", "in", which may precede the main # gloss in concept lists, as in "be quiet": - prefix = attr.ib(default='') + prefix: str = '' # the longest constituent, which is identical with the main part if there's no # whitespace in the main part, otherwise the longest part part of the main gloss # split by whitespace: - longest_part = attr.ib(default='') + longest_part: str = '' # the original gloss (for the purpose of testing): - gloss = attr.ib(default='', converter=lambda s: s.lower().replace('*', '')) + gloss: str = '' - frequency = attr.ib(default=0) + frequency: int = 0 - @functools.cached_property - def tokens(self): - return ' '.join(s for s in self.gloss.split() if s not in ['or']) + def __post_init__(self): + self.gloss = self.gloss.lower().replace('*', '') + if isinstance(self.pos, str): + if self.pos: + self.pos = Pos.from_string(self.pos) + else: + self.pos = None - def similarity(self, other): + def similarity(self, other) -> Similarity: # pylint: disable=R0911 + """Compute similarity between two glosses.""" + same_pos = self.pos and self.pos == other.pos # first-order-match: identical glosses if self.gloss == other.gloss: - if self.pos and 
self.pos == other.pos: - return 1 - return 2 + if same_pos: + return Similarity.SAME + return Similarity.SAME_DIFFERENT_POS # second-order match: identical main-parts - if self.main == other.gloss or self.gloss == other.main or\ - self.main == other.main: + if self.main == other.gloss or self.gloss == other.main or self.main == other.main: # best match if pos matches - return 3 if self.pos and self.pos == other.pos else 4 + if same_pos: + return Similarity.SAME_MAIN + return Similarity.SAME_MAIN_DIFFERENT_POS if self.longest_part == other.longest_part: - return 5 if self.pos and self.pos == other.pos else 6 + if same_pos: + return Similarity.SAME_LONGEST + return Similarity.SAME_LONGEST_DIFFERENT_POS if other.longest_part in self.main.split(): - return 7 + return Similarity.LONGEST_IS_CONTAINED if self.longest_part in other.main.split(): - return 8 - return 100 + return Similarity.LONGEST_CONTAINS + return Similarity.DIFFERENT @classmethod - def from_string(cls, s, language='en'): + def from_string(cls, s: str, language: LanguageType = GlossLanguage.ENGLISH) -> 'Gloss': + """Parse a gloss from the string.""" + if isinstance(language, str): + language = GlossLanguage.from_string(language) return parse_gloss(s, language=language)[0] -def parse_gloss(gloss, language='en'): +POS_MARKERS_BY_LANGUAGE = { + 'en': {'the': 'noun', 'a': 'noun', 'to': 'verb'}, + 'de': {'der': 'noun', 'die': 'noun', 'das': 'noun'}, + 'fr': { + 'le': 'noun', + 'la': 'noun', + 'les': 'noun', + 'du': 'noun', + 'des': 'noun', + 'de': 'noun', + 'un': 'noun', + 'une': 'noun', + }, + 'es': { + "el": "noun", + "la": "noun", + "los": "noun", + "mi": "noun", + "un": "noun", + "una": "noun", + "unos": "noun", + "las": "noun", + "su": "noun", + } +} +PREFIXES_BY_LANGUAGE = { + 'en': ['be', 'in', 'at'], + 'fr': ['il', 'est'], + 'es': ["lo", "les", "le"], +} +POS_ABBREVIATIONS = [ + ('vb', 'verb'), + ('v.', 'verb'), + ('v', 'verb'), + ('adj', 'adjective'), + ('nn', 'noun'), + ('n.', 'noun'), + ('adv', 
'adverb'), + ('noun', 'noun'), + ('verb', 'verb'), + ('adjective', 'adjective'), + ('cls', 'classifier') +] + + +@dataclasses.dataclass +class ParseSpec: + """Specification (and implementation) for the parsing of glosses for comparison.""" + pos_markers: dict[str, Pos] + prefixes: list[str] + pos_abbreviations: list[tuple[str, Pos]] + punctuation: str = '?!"¨:;,»«´“”*+-' + split_pattern: re.Pattern = re.compile(r',|;|:|/| or | OR ') + comment_marker: dict[str, str] = dataclasses.field( + default_factory=lambda: {'(': ')', '[': ']', '{': '}', '(': ')', '<': '>'}) + + @classmethod + def for_language( + cls, + language: Optional[Union[str, GlossLanguage]] = GlossLanguage.ENGLISH, + ) -> 'ParseSpec': + """Get a ParseSpec, optionally tuned to a particular gloss language.""" + if isinstance(language, str): + language = GlossLanguage.from_string(language) + pos_markers = POS_MARKERS_BY_LANGUAGE.get(language.value, {}) + pos_markers = {k: Pos.from_string(v) for k, v in pos_markers.items()} + abbreviations = [(k, Pos.from_string(v)) for k, v in POS_ABBREVIATIONS] + return cls( + pos_markers, + PREFIXES_BY_LANGUAGE.get(language.value, []), + # Sort abbreviations by descending length. 
+ sorted(abbreviations, key=lambda x: len(x[0]), reverse=True), + ) + + def split_constituents(self, gloss): + """ + >>> spec = ParseSpec.for_language('en') + >>> spec.split_constituents('arm OR hand') + ['arm', 'hand', 'arm / hand'] + """ + constituents = [x.strip() for x in self.split_pattern.split(gloss) if x.strip()] + if len(constituents) > 1: + constituents += [' / '.join(sorted([c.strip() for c in constituents]))] + return constituents + + def _strip_comments(self, constituent: str, res: Gloss) -> str: + mainpart = '' + in_comment: list[str] = [] + for char in constituent: + if char in self.comment_marker: + in_comment.append(self.comment_marker[char]) + if not res.comment_start: + res.comment_start = char + else: + res.comment += char + continue + if in_comment and char == in_comment[-1]: + in_comment.pop() + if not in_comment: + res.comment_end = char + else: + res.comment += char + continue + if in_comment: + res.comment += char + else: + mainpart += char + return mainpart + + def _strip_punctuation(self, s: str) -> str: + return ''.join(c for c in s if c not in self.punctuation) + + def parse_constituent( + self, + full_gloss, + constituent, + gpos: Optional[Pos] = None, + ) -> tuple[Optional[Gloss], Optional[Pos]]: + """Parse a gloss constituent into a proper Gloss or part-of-speech information.""" + gloss = Gloss(gloss=full_gloss) + mainpart = self._strip_comments(constituent, gloss) + mainpart = self._strip_punctuation(mainpart).strip().lower().split() + + # search for pos-markers + if gpos: + gloss.pos = gpos + else: + if len(mainpart) > 1 and mainpart[0] in self.pos_markers: + gpos = gloss.pos = self.pos_markers[mainpart.pop(0)] + + # search for strip-off-prefixes + if len(mainpart) > 1 and mainpart[0] in self.prefixes: + gloss.prefix = mainpart.pop(0) + + if mainpart: + # check for a "first part" in case we encounter white space in the + # data (and return only the largest string of them) + gloss.longest_part = sorted(mainpart, key=len)[-1] + + # 
search for pos in comment + if not gloss.pos: + cparts = gloss.comment.split() + for p, t in self.pos_abbreviations: + if p in cparts or p in mainpart or t.name in cparts or t.name in mainpart: + gloss.pos = t + break + + gloss.main = ' '.join(mainpart) + return gloss, gpos + return None, gpos + + +def parse_gloss(gloss: str, language: LanguageType = GlossLanguage.ENGLISH) -> list[Gloss]: """ Parse a gloss into its constituents by applying some general logic. @@ -103,161 +344,182 @@ def parse_gloss(gloss, language='en'): and may thus help to compare different glosses across different resources. """ if not gloss: - print(gloss) raise ValueError("Your gloss is empty") - G = [] - gpos = '' - pos_markers = { - 'en': {'the': 'noun', 'a': 'noun', 'to': 'verb'}, - 'de': {'der': 'noun', 'die': 'noun', 'das': 'noun'}, - 'fr': { - 'le': 'noun', - 'la': 'noun', - 'les': 'noun', - 'du': 'noun', - 'des': 'noun', - 'de': 'noun', - 'un': 'noun', - 'une': 'noun', - }, - 'es': { - "el": "noun", - "la": "noun", - "los": "noun", - "mi": "noun", - "un": "noun", - "una": "noun", - "unos": "noun", - "las": "noun", - "su": "noun", - } - }.get(language, {}) - prefixes = { - 'en': ['be', 'in', 'at'], - 'fr': ['il', 'est'], - 'es': ["lo", "les", "le"], - }.get(language, []) - abbreviations = [ - ('vb', 'verb'), - ('v.', 'verb'), - ('v', 'verb'), - ('adj', 'adjective'), - ('nn', 'noun'), - ('n.', 'noun'), - ('adv', 'adverb'), - ('noun', 'noun'), - ('verb', 'verb'), - ('adjective', 'adjective'), - ('cls', 'classifier') - ] + spec = ParseSpec.for_language(language) # we use /// as our internal marker for glosses preceded by concepticon # gloss information and followed by literal readings if '///' in gloss: gloss = gloss.split('///')[1] - # if the gloss consists of multiple parts, we store both the separate part - # and a normalized form of the full gloss - constituents = [x.strip() for x in re.split(',|;|:|/| or | OR ', gloss) if x.strip()] - if len(constituents) > 1: - constituents += [' / 
'.join(sorted([c.strip() for c in constituents]))] - - for constituent in constituents: + glosses = [] + gpos = None + for constituent in spec.split_constituents(gloss): if constituent.strip(): - res = Gloss(gloss=gloss) - mainpart = '' - in_comment = False - for char in constituent: - if char in '([{(<': - in_comment = True - res.comment_start += char - elif char in ')]})>': - in_comment = False - res.comment_end += char - else: - if in_comment: - res.comment += char - else: - mainpart += char - - mainpart = ''.join(m for m in mainpart if m not in '?!"¨:;,»«´“”*+-')\ - .strip().lower().split() + res, gpos = spec.parse_constituent(gloss, constituent, gpos) + if res: + glosses.append(res) - # search for pos-markers - if gpos: - res.pos = gpos - else: - if len(mainpart) > 1 and mainpart[0] in pos_markers: - gpos = res.pos = pos_markers[mainpart.pop(0)] + return glosses - # search for strip-off-prefixes - if len(mainpart) > 1 and mainpart[0] in prefixes: - res.prefix = mainpart.pop(0) - if mainpart: - # check for a "first part" in case we encounter white space in the - # data (and return only the largest string of them) - res.longest_part = sorted(mainpart, key=lambda x: len(x))[-1] +GlossDictType = dict[int, list[Gloss]] - # search for pos in comment - if not res.pos: - cparts = res.comment.split() - for p, t in sorted( - abbreviations, key=lambda x: len(x[0]), reverse=True): - if p in cparts or p in mainpart or t in cparts or t in mainpart: - res.pos = t - break - res.main = ' '.join(mainpart) - G.append(res) +@functools.total_ordering +@dataclasses.dataclass(frozen=True) +class SimilarPair: + """Information about a pair of similar glosses in two conceptlists.""" + from_key: int + to_key: int + similarity: Similarity + frequency: int - return G + def __lt__(self, other): + """ + Order from best to worst. + Smaller level is better. Higher frequency is better. 
+ """ + return (self.similarity, -self.frequency) < (other.similarity, -other.frequency) -def concept_map2(from_, to, freqs=None, language='en', **_): - # get frequencies - freqs = freqs or collections.defaultdict(int) +@dataclasses.dataclass +class Mapping: + """ + Items of a conceptlist can be associated with a Mapping, identifying similar items in a + different list. + """ + to_keys: Union[list[int]] = dataclasses.field(default_factory=list) + similarity: Similarity = Similarity.DIFFERENT + + def sort_keys(self, sortkey: Callable[[int], Any]): + """Sort keys in the mapping by descending sortkey.""" + self.to_keys = sorted(self.to_keys, key=sortkey, reverse=True) + + +class MappingDict(dict): + """Map conceptlist items identified by index to a Mapping.""" + def get_mapping(self, item: int) -> Mapping: + """Get the associated mapping or the default, i.e. "null" mapping.""" + return self.get(item, Mapping()) + + +ListIdentifierType = Literal["from_list", "to_list"] + + +@dataclasses.dataclass +class GlossMapper: + """Bundle functionality to map glosses with the data from two concept lists.""" + from_list: GlossDictType = dataclasses.field(default_factory=dict) + to_list: GlossDictType = dataclasses.field(default_factory=dict) + mapped: dict[str, dict[ListIdentifierType, list[int]]] = dataclasses.field( + default_factory=lambda: collections.defaultdict(lambda: collections.defaultdict(list))) + + def add(self, # pylint: disable=R0913,R0917 + key: ListIdentifierType, + i: int, + glosses: Iterable[Gloss], + pos: Optional[Pos] = None, + frequency: Optional[int] = None): + """Add glosses associated with a concept list item.""" + if pos or frequency: + for gloss in glosses: + gloss.pos = pos + gloss.frequency = frequency + getattr(self, key)[i] = glosses + + for gloss in glosses: + self.mapped[gloss.main][key] += [i] + + def _iter_mapped_values(self) -> Generator[tuple[list[int], list[int]]]: + for v in self.mapped.values(): + if all(arg in v for arg in 
get_args(ListIdentifierType)): + yield v['from_list'], v['to_list'] + + def _iter_similarpairs( + self, + similarity_level: Similarity, + ) -> Generator[SimilarPair, None, None]: + # now that we have prepared all the glossed lists as planned, we compare them item by + # item and check for similarity + for i, fglosses in self.from_list.items(): + for fgloss in fglosses: + for j, tglosses in self.to_list.items(): + for tgloss in tglosses: + sim = fgloss.similarity(tgloss) + if sim and sim <= similarity_level: + yield SimilarPair(i, j, sim, tgloss.frequency) + + def best_matches(self, similarity_level: Similarity) -> MappingDict: + """ + The default matching implementation. + """ + # we keep track of which target concepts have already been chosen as best matches: + best, consumed, alternatives = MappingDict(), set(), collections.defaultdict(list) + # go through *all* matches from best to worst: + for pair in sorted(list(self._iter_similarpairs(similarity_level))): + if pair.from_key not in best and pair.to_key not in consumed: + best[pair.from_key] = Mapping([pair.to_key], pair.similarity) + consumed.add(pair.to_key) + elif pair.to_key not in alternatives[pair.from_key]: + alternatives[pair.from_key].append(pair.to_key) + return best + + def best_matches_2(self) -> MappingDict: + """ + An alternative matching implementation. 
+ """ + mappings = MappingDict() + for from_list, to_list in self._iter_mapped_values(): + for i in from_list: + current = Mapping() + if i in mappings: + current = Mapping(mappings[i].to_keys, mappings[i].similarity) + for j in to_list: + for gloss_a, gloss_b in itertools.product(self.from_list[i], self.to_list[j]): + sim = gloss_a.similarity(gloss_b) + if sim < current.similarity: + current.to_keys = [j] + current.similarity = sim + elif sim == current.similarity: + current.to_keys.append(j) + mappings[i] = current + return mappings + + +def concept_map2( + from_: Iterable[str], + to: Iterable[str], + freqs: Optional[dict[str, int]] = None, + language: Optional[LanguageType] = GlossLanguage.ENGLISH, + **_, +) -> MappingDict: + """ + Match concepts from one list to the concepts of another one, optionally taking into account + frequencies. + """ # extract glossing information from the data - glosses = {'from': collections.defaultdict(list), 'to': collections.defaultdict(list)} - mapped = collections.defaultdict(lambda: collections.defaultdict(list)) - for l_, key in [(from_, 'from'), (to, 'to')]: + glosses = GlossMapper() + key: ListIdentifierType + + for l_, key in [(from_, 'from_list'), (to, 'to_list')]: for i, concept in enumerate(l_): - for gloss in parse_gloss(concept, language=language): - glosses[key][i] += [gloss] - mapped[gloss.main][key] += [i] - mapping = {} - sims = {} - for k, v in mapped.items(): - if 'from' in v and 'to' in v: - for i in v['from']: - current_sim = sims.get(i, 10) - best = mapping.get(i, set()) - for j in v['to']: - for glossA in glosses['from'][i]: - for glossB in glosses['to'][j]: - sim = glossA.similarity(glossB) or 10 - if sim < current_sim: - best = {j} - current_sim = sim - elif sim == current_sim: - best.add(j) - mapping[i] = best - sims[i] = current_sim - for i in mapping: - mapping[i] = ( - sorted( - mapping[i], key=lambda x: freqs.get(to[x].split('///')[0], 0), - reverse=True), - sims[i]) - return mapping - - -def 
concept_map(from_: typing.Iterable[typing.Union[typing.Tuple[str, str, float], str]], - to: typing.Iterable[typing.Union[typing.Tuple[str, str, float], str]], - similarity_level=5, - language='en', - **kw) -> typing.Dict[int, typing.Tuple[typing.List[int], int]]: + glosses.add(key, i, parse_gloss(concept, language=language)) + + freqs = freqs or collections.defaultdict(int) + mappings = glosses.best_matches_2() + for m in mappings.values(): + m.sort_keys(lambda x: freqs.get(to[x].split('///')[0], 0)) + return mappings + + +def concept_map( + from_: Iterable[Union[tuple[str, str, float], str]], + to: Iterable[Union[tuple[str, str, float], str]], + similarity_level: Similarity = Similarity.SAME_LONGEST, + language: Optional[LanguageType] = GlossLanguage.ENGLISH, +) -> MappingDict: """ Function compares two concept lists and outputs suggestions for mapping. @@ -269,38 +531,91 @@ def concept_map(from_: typing.Iterable[typing.Union[typing.Tuple[str, str, float textform or in other forms. """ # extract glossing information from the data - glosses = {'from': {}, 'to': {}} - for l_, key in [(from_, 'from'), (to, 'to')]: + glosses = GlossMapper() + key: ListIdentifierType + for l_, key in [(from_, 'from_list'), (to, 'to_list')]: for i, concept in enumerate(l_): if isinstance(concept, tuple): concept, pos, frequency = concept else: pos, frequency = None, 0 - glosses[key][i] = parse_gloss(concept, language=language) - if pos or frequency: - for gloss in glosses[key][i]: - gloss.pos = pos - gloss.frequency = frequency - # now that we have prepared all the glossed list as planned, we compare them item by - # item and check for similarity - sims = [] - for i, fglosses in glosses['from'].items(): - for fgloss in fglosses: - for j, tglosses in glosses['to'].items(): - for tgloss in tglosses: - sim = fgloss.similarity(tgloss) - if sim and sim <= similarity_level: - sims.append((i, j, sim, tgloss.frequency)) - - # we keep track of which target concepts have already been chosen as 
best matches: - best, consumed, alternatives = {}, set(), collections.defaultdict(list) - - # go through *all* matches from best to worst: - for i, j, sim, frequency in sorted(sims, key=lambda x: (x[2], -x[3])): - if i not in best and j not in consumed: - best[i] = ([j], sim) - consumed.add(j) - elif j not in alternatives[i]: - alternatives[i].append(j) - - return best + glosses.add( + key, i, parse_gloss(concept, language=language), pos=pos, frequency=frequency) + + return glosses.best_matches(similarity_level) + + +@dataclasses.dataclass +class MapOptions: + """Bag of options informing the mapping of glosses.""" + language: GlossLanguage = GlossLanguage.ENGLISH + full_search: bool = False + similarity_level: Similarity = Similarity.SAME_LONGEST + skip_multiple: bool = False + + def __post_init__(self): + if isinstance(self.language, str): + self.language = GlossLanguage.from_string(self.language) or GlossLanguage.ENGLISH + + +def map_list( + clist, + to, + out=None, + options: MapOptions = MapOptions(), +): + """Map items in a conceptlist to concepticon.""" + assert clist.exists(), f"File {clist} does not exist" + from_ = read_dicts(clist) + + language = GlossLanguage.from_string(options.language) + gloss = language.name if language else 'GLOSS' + cmap: MappingDict = (concept_map if options.full_search else concept_map2)( + [i.get('GLOSS', i.get(gloss)) for i in from_], + [i[1] for i in to], + similarity_level=options.similarity_level, + language=language, + ) + good_matches = 0 + + with UnicodeWriter(out) as writer: + writer.writerow( + list(from_[0].keys()) + + ['CONCEPTICON_ID', 'CONCEPTICON_GLOSS', 'SIMILARITY']) + for i, item in enumerate(from_): + row = list(item.values()) + mapping = cmap.get_mapping(i) + if mapping.similarity <= options.similarity_level: + good_matches += 1 + _map_row(row, mapping, to, options, writer) + writer.writerow( + ['#', f'{good_matches}/{len(from_)}', f'{100 * good_matches / len(from_):.0f}%'] + + (len(from_[0]) - 1) * ['']) + + 
if out is None: + print(writer.read().decode('utf-8')) + + +def _map_row(row, mapping: Mapping, to, options: MapOptions, writer): + if not mapping.to_keys: + writer.writerow(row + ['', '???', '']) + return + if len(mapping.to_keys) == 1: + row.extend([ + to[mapping.to_keys[0]][0], + to[mapping.to_keys[0]][1].split('///')[0], + mapping.similarity.value]) + writer.writerow(row) + return + assert not options.full_search + # we need a list to retain the order by frequency + visited = [] + for j in mapping.to_keys: + gls, cid = to[j][0], to[j][1].split('///')[0] + if (gls, cid) not in visited: + visited += [(gls, cid)] + if len(visited) > 1: + if not options.skip_multiple: + writer.writeblock(row + [gls, cid, mapping.similarity] for gls, cid in visited) + else: + writer.writerow(row + [visited[0][0], visited[0][1], mapping.similarity]) diff --git a/src/pyconcepticon/models.py b/src/pyconcepticon/models.py index cac775b..9da9bde 100644 --- a/src/pyconcepticon/models.py +++ b/src/pyconcepticon/models.py @@ -1,12 +1,17 @@ +""" +OO wrappers for the data in the Concepticon TSV files. 
+""" import re import pathlib import operator import warnings import functools import collections +from collections.abc import Generator, Sequence +import dataclasses +from typing import Optional, Any, Union -import attr -from clldutils.apilib import DataObject +import csvw from clldutils.jsonlib import load from csvw.dsv import reader from csvw.metadata import TableGroup, Link @@ -14,9 +19,11 @@ from pyconcepticon.util import split, split_ids, read_dicts, to_dict __all__ = [ - 'Languoid', 'Concept', 'Conceptlist', 'ConceptRelations', 'Conceptset', 'Metadata', + 'Languoid', 'Concept', 'Conceptlist', 'ConceptRelations', 'Conceptset', 'REF_PATTERN', 'MD_SUFFIX'] +RelationsType = dict[str, dict[str, Union[str, set[str]]]] + CONCEPTLIST_ID_PATTERN = re.compile( '(?P[A-Za-z]+)-(?P[0-9]+)-(?P[0-9]+)(?P[a-z]?)$') REF_PATTERN = re.compile(':ref:(?P[a-zA-Z0-9-]+)') @@ -27,81 +34,84 @@ CONCEPT_NETWORK_COLUMNS = {c + '_CONCEPTS': c != 'LINKED' for c in ["TARGET", "SOURCE", "LINKED"]} -@attr.s -class Languoid(object): - name = attr.ib(converter=lambda s: s.lower()) - glottocode = attr.ib() - iso2 = attr.ib() - - -class Bag(DataObject): - @classmethod - def public_fields(cls): - return [n for n in cls.fieldnames() if not n.startswith('_')] - +@dataclasses.dataclass +class Languoid: + """A bag of attributes identifying a languoid.""" + name: str + glottocode: str + iso2: str -def valid_int(attr_name, value): - try: - int(value) - except ValueError: # pragma: no cover - raise ValueError('invalid integer {0}: {1}'.format(attr_name, value)) + def __post_init__(self): + self.name = self.name.lower() -def valid_conceptlist_id(instance, attribute, value): - if not instance.local: - if not CONCEPTLIST_ID_PATTERN.match(value): - raise ValueError('invalid {0}.{1}: {2}'.format( - instance.__class__.__name__, - attribute.name, - value)) - +@dataclasses.dataclass +class Bag: + """Mixin class to make access to dataclass fields simpler.""" + @classmethod + def fieldnames(cls): # pylint: 
disable=C0116 + return [f.name for f in dataclasses.fields(cls)] -def valid_conceptlist_author(instance, attribute, value): - if value.count(',') > 1 and (not any(s in value for s in [' and ', ' AND '])): - raise ValueError('invalid format for multiple authors: {}'.format(value)) - if any(len(s) > 200 for s in re.split(r'\s+(?:and|AND)\s+', value)): - raise ValueError('suspiciously long author name in {}'.format(value)) + @classmethod + def public_fields(cls) -> list[str]: # pylint: disable=C0116 + return [n for n in cls.fieldnames() if not n.startswith('_')] -def valid_key(instance, attribute, value): +def valid_key(instance: object, attribute: str, value: Union[str, list[str], tuple[str]]): + """Raises ValueError on invalid value.""" vocabulary = None - if hasattr(instance._api, 'vocabularies'): - vocabulary = instance._api.vocabularies[attribute.name.upper()] + if hasattr(instance._api, 'vocabularies'): # pylint: disable=W0212 + vocabulary = instance._api.vocabularies[attribute.upper()] # pylint: disable=W0212 if value and vocabulary: if not isinstance(value, (list, tuple)): value = [value] if not all(v in vocabulary for v in value): - raise ValueError('invalid {0}.{1}: {2}'.format( - instance.__class__.__name__, - attribute.name, - value)) + raise ValueError(f'invalid {instance.__class__.__name__}.{attribute}: {value}') -@attr.s +@dataclasses.dataclass class Conceptset(Bag): - id = attr.ib() - gloss = attr.ib() - semanticfield = attr.ib(validator=valid_key) - definition = attr.ib() - ontological_category = attr.ib(validator=valid_key) - replacement_id = attr.ib() - _api = attr.ib(default=None) + """A Concepticon Concept Set, i.e. 
a row in concepticon.tsv.""" + id: str + gloss: str + semanticfield: str + definition: str + ontological_category: str + replacement_id: str + _api: Any = None + + def __post_init__(self): + valid_key(self, 'semanticfield', self.semanticfield) + valid_key(self, 'ontological_category', self.ontological_category) @property - def superseded(self): + def superseded(self) -> bool: + """If a conceptset has a replacement, it's superseded.""" return bool(self.replacement_id) @property - def replacement(self): + def replacement(self) -> Optional['Conceptset']: + """The conceptset that replaces self - or None.""" if self._api and self.replacement_id: return self._api.conceptsets[self.replacement_id] + return None # pragma: no cover @functools.cached_property - def relations(self): + def relations(self) -> dict[str, str]: + """ + >>> c = Concepticon('src/pyconcepticon/test_repos') + >>> c.conceptsets['2461'].relations + {'2460': 'narrower', '2448': 'narrower', '522': 'narrower', '2009': 'narrower'} + """ return self._api.relations.get(self.id, {}) if self._api else {} @functools.cached_property - def concepts(self): + def concepts(self) -> list['Concept']: + """ + >>> c = Concepticon('src/pyconcepticon/test_repos') + >>> c.conceptsets['1360'].concepts[0].id + 'Sun-1991-1004-138' + """ res = [] if self._api: for clist in self._api.conceptlists.values(): @@ -111,22 +121,6 @@ def concepts(self): return res -@attr.s -class Metadata(Bag): - id = attr.ib() - meta = attr.ib(default=attr.Factory(dict)) - values = attr.ib(default=attr.Factory(dict)) - - -def valid_concept(instance, attribute, value): - if not value: - raise ValueError('missing concept id %s' % instance) - if not re.match('[0-9]+.*', instance.number): - raise ValueError('invalid concept number: %s' % instance) - if not instance.label: - raise ValueError('fields gloss *and* english missing: %s' % instance) - - _INVERSE_RELATIONS = {'broader': 'narrower'} _INVERSE_RELATIONS.update({v: k for k, v in 
_INVERSE_RELATIONS.items()}) @@ -136,7 +130,7 @@ class ConceptRelations(dict): Class handles relations between concepts. """ def __init__(self, path, multiple=False): - rels = collections.defaultdict(lambda: collections.defaultdict(set)) + rels: RelationsType = collections.defaultdict(lambda: collections.defaultdict(set)) self.raw = list(read_dicts(path)) for item in self.raw: if multiple: @@ -155,9 +149,17 @@ def __init__(self, path, multiple=False): _INVERSE_RELATIONS[item['RELATION']] rels[item['TARGET_GLOSS']][item['SOURCE_GLOSS']] = \ _INVERSE_RELATIONS[item['RELATION']] - dict.__init__(self, rels.items()) - - def iter_related(self, concept, relation, max_degree_of_separation=2): + dict.__init__( + self, + ((k, {x: y for x, y in v.items()}) for k, v in rels.items()) # pylint: disable=R1721 + ) + + def iter_related( + self, + concept: str, + relation: str, + max_degree_of_separation: int = 2, + ) -> Generator[tuple[str, int], None, None]: """ Search for concept relations of a given concept. 
@@ -176,48 +178,102 @@ def iter_related(self, concept, relation, max_degree_of_separation=2): yield target, depth -@attr.s -class Concept(Bag): - id = attr.ib(validator=valid_concept) - number = attr.ib() - concepticon_id = attr.ib( - default=None, converter=lambda s: s if s is None else '{0}'.format(s)) - concepticon_gloss = attr.ib(default=None) - gloss = attr.ib(default=None) - english = attr.ib(default=None) - attributes = attr.ib(default=attr.Factory(dict)) - _list = attr.ib(default=None) +@dataclasses.dataclass +class Concept(Bag): # pylint: disable=R0902 + """Concepts are the items in conceptlists.""" + id: str + number: str + concepticon_id: Optional[str] = None + concepticon_gloss: Optional[str] = None + gloss: Optional[str] = None + english: Optional[str] = None + attributes: dict = dataclasses.field(default_factory=dict) + _list: Any = None + + def __post_init__(self): + if not self.id: + raise ValueError(f'missing concept id {self}') + if not re.match('[0-9]+.*', self.number): + raise ValueError(f'invalid concept number: {self}') + if not self.label: + raise ValueError(f'fields gloss *and* english missing: {self}') + + self.concepticon_id = self.concepticon_id \ + if self.concepticon_id is None else f'{self.concepticon_id}' @property - def label(self): + def label(self) -> str: + """A description of the concept.""" return self.gloss or self.english @functools.cached_property - def cols(self): + def cols(self) -> list[str]: + """Column names of the concept list to which the concept belongs.""" return Concept.public_fields() + list(self.attributes.keys()) -@attr.s -class Conceptlist(Bag): - _api = attr.ib() - id = attr.ib(validator=valid_conceptlist_id) - author = attr.ib(validator=valid_conceptlist_author) - year = attr.ib(converter=int, validator=lambda i, a, v: valid_int(a, v)) - list_suffix = attr.ib() - items = attr.ib(converter=int, validator=lambda i, a, v: valid_int(a, v)) - tags = attr.ib(converter=split_ids, validator=valid_key) - 
source_language = attr.ib(converter=lambda v: split(v.lower())) - target_language = attr.ib() - url = attr.ib() - refs = attr.ib(converter=split_ids) - pdf = attr.ib(converter=split_ids) - note = attr.ib() - pages = attr.ib() - alias = attr.ib(converter=lambda s: [] if s is None else split(s)) - local = attr.ib(default=False) +@dataclasses.dataclass(frozen=True) +class ConceptStats: + """Summary statistics on concepts.""" + mapped: list[Concept] + mapped_ratio_percent: int + mergers: list[tuple[str, int]] + + @classmethod + def from_concepts(cls, concepts: Sequence[Concept]) -> 'ConceptStats': + """Compute stats on a bunch of concepts.""" + mapped = [c for c in concepts if c.concepticon_id] + mapped_ratio = 0 + if concepts: + mapped_ratio = int((len(mapped) / len(concepts)) * 100) + concepticon_ids = collections.Counter(c.concepticon_id for c in concepts) + mergers = [(k, v) for k, v in concepticon_ids.items() if k and v > 1] + return cls(mapped, mapped_ratio, mergers) + + +@dataclasses.dataclass +class Conceptlist(Bag): # pylint: disable=R0902 + """Concept lists are the core entities of the Concepticon.""" + _api: Any + id: str + author: str + year: int + list_suffix: str + items: int + tags: Union[str, list[str]] + source_language: Union[str, list[str]] + target_language: str + url: str + refs: Union[str, list[str]] + pdf: Union[str, list[str]] + note: str + pages: str + alias: Union[str, list[str]] + local: bool = False + + def __post_init__(self): + if not self.local: + if not CONCEPTLIST_ID_PATTERN.match(self.id): + raise ValueError(f'Conceptlist.id: {self.id}') + + if self.author.count(',') > 1 and (not any(s in self.author for s in [' and ', ' AND '])): + raise ValueError(f'invalid format for multiple authors: {self.author}') + if any(len(s) > 200 for s in re.split(r'\s+(?:and|AND)\s+', self.author)): + raise ValueError(f'suspiciously long author name in {self.author}') + + self.year = int(self.year) + self.items = int(self.items) + self.tags = 
split_ids(self.tags) + valid_key(self, 'tags', self.tags) + if isinstance(self.source_language, str): + self.source_language = split(self.source_language.lower()) + self.refs = split_ids(self.refs) + self.pdf = split_ids(self.pdf) + self.alias = [] if self.alias is None else split(self.alias) @functools.cached_property - def tg(self): + def tg(self) -> csvw.TableGroup: + """A CSVW TableGroup instance describing the TSV file of the list.""" md = self.path.parent.joinpath(self.path.name + MD_SUFFIX) if not md.exists(): if hasattr(self._api, 'repos'): @@ -233,40 +289,48 @@ def tg(self): tg = TableGroup.from_file(md, data=metadata) if isinstance(self._api, pathlib.Path): - tg._fname = self._api.parent.joinpath(self._api.name + MD_SUFFIX) - tg.tables[0].url = Link('{0}.tsv'.format(self.id)) + tg._fname = self._api.parent.joinpath( # pylint: disable=W0212 + self._api.name + MD_SUFFIX) + tg.tables[0].url = Link(f'{self.id}.tsv') return tg @functools.cached_property - def metadata(self): + def metadata(self) -> csvw.Table: + """CSVW metadata for the TSV file of the conceptlist.""" return self.tg.tables[0] @property - def path(self): + def path(self) -> pathlib.Path: + """Path of the TSV file of the conceptlist.""" if isinstance(self._api, pathlib.Path): return self._api return self._api.data_path('conceptlists', self.id + '.tsv') @functools.cached_property - def cols_in_list(self): + def cols_in_list(self) -> list[str]: + """Actual column names in the TSV file of the conceptlist.""" return list(next(reader(self.path, dicts=True, delimiter='\t')).keys()) @functools.cached_property - def attributes(self): + def attributes(self) -> list[str]: + """Attributes are additional, non-standard columns in a conceptlist.""" return [c.name for c in self.metadata.tableSchema.columns if c.name.lower() not in Concept.public_fields()] @functools.cached_property - def concepts(self): + def concepts(self) -> dict[str, Concept]: + """List of concepts which are mapped to this conceptset.""" 
res = [] if self.path.exists(): for item in self.metadata: + # Partition the data read from the TSV table for instantiation of a Concept. kw, attributes = {}, {} for k, v in item.items(): if k: kl = k.lower() - operator.setitem(kw if kl in Concept.public_fields() else attributes, kl, v) - res.append(Concept(list=self, attributes=attributes, **kw)) + d = kw if kl in Concept.public_fields() else attributes + operator.setitem(d, kl, v) + res.append(Concept(_list=self, attributes=attributes, **kw)) return to_dict(res) @classmethod @@ -284,17 +348,9 @@ def from_file(cls, path, **keywords): items=keywords.get('items', len(read_dicts(path))), year=keywords.get('year', 0), local=True) - return cls(api=path, **attrs) + return cls(_api=path, **attrs) - def stats(self): + def stats(self) -> ConceptStats: """Return simple statistics for a given concept list""" # @todo: refine for custom-concept lists - concepts = self.concepts.values() - mapped = [c for c in concepts if c.concepticon_id] - mapped_ratio = 0 - if concepts: - mapped_ratio = int((len(mapped) / len(concepts)) * 100) - concepticon_ids = collections.Counter( - [c.concepticon_id for c in concepts if c.concepticon_id]) - mergers = [(k, v) for k, v in concepticon_ids.items() if v > 1] - return mapped, mapped_ratio, mergers + return ConceptStats.from_concepts(self.concepts.values()) diff --git a/src/pyconcepticon/test_repos/concepticondata/conceptlists/Sun-1991-1004/convert.py b/src/pyconcepticon/test_repos/concepticondata/conceptlists/Sun-1991-1004/convert.py index c74f88e..df139e8 100644 --- a/src/pyconcepticon/test_repos/concepticondata/conceptlists/Sun-1991-1004/convert.py +++ b/src/pyconcepticon/test_repos/concepticondata/conceptlists/Sun-1991-1004/convert.py @@ -1,7 +1,10 @@ +""" +test +""" import pathlib d = pathlib.Path(__file__).resolve().parent -t = d.parent.joinpath('{}.tsv'.format(d.name)).read_text(encoding='utf8') +t = d.parent.joinpath(f'{d.name}.tsv').read_text(encoding='utf8') t = t.replace( 
'[{"ID":"Sun-1991-1004-83","DEGREE":7}]', '[{"ID":"Sun-1991-1004-87","DEGREE":7}]') -d.joinpath('{}.tsv'.format(d.name)).write_text(t, encoding='utf8') +d.joinpath(f'{d.name}.tsv').write_text(t, encoding='utf8') diff --git a/src/pyconcepticon/test_util.py b/src/pyconcepticon/test_util.py index 9c9ac70..6b50bdb 100644 --- a/src/pyconcepticon/test_util.py +++ b/src/pyconcepticon/test_util.py @@ -1,9 +1,13 @@ +""" +Functionality to provide a Concepticon API for testing purposes. +""" import pathlib TEST_REPOS = pathlib.Path(__file__).parent / 'test_repos' def get_test_api(): - from pyconcepticon import Concepticon + """Returns a Concepticon API object to access the data in the test repos.""" + from pyconcepticon import Concepticon # pylint: disable=C0415 return Concepticon(TEST_REPOS) diff --git a/src/pyconcepticon/util.py b/src/pyconcepticon/util.py index ec13e75..9fea405 100644 --- a/src/pyconcepticon/util.py +++ b/src/pyconcepticon/util.py @@ -1,21 +1,28 @@ +""" +Utilities. +""" import re import json import pathlib import operator import functools import collections +from collections.abc import Sequence, Iterable +from typing import Any, Optional, Union, TypeVar, Callable +from pycdstar.resource import Object from clldutils import jsonlib from csvw import dsv -import pyconcepticon - __all__ = [ - 'natural_sort', 'to_dict', 'SourcesCatalog', 'UnicodeWriter', 'visit', - 'load_conceptlist', 'write_conceptlist', 'read_dicts', 'ConceptlistWithNetworksWriter'] + 'natural_sort', 'to_dict', 'SourcesCatalog', 'UnicodeWriter', 'reader', 'read_dicts', + 'ConceptlistWithNetworksWriter'] + +T = TypeVar('T') +PathType = Union[str, pathlib.Path] -REPOS_PATH = pathlib.Path(pyconcepticon.__file__).parent.parent -PKG_PATH = pathlib.Path(pyconcepticon.__file__).parent +REPOS_PATH = pathlib.Path(__file__).parent.parent +PKG_PATH = pathlib.Path(__file__).parent ID_SEP_PATTERN = re.compile(r'[.,;]') PREFIX = 'CONCEPTICON' CS_GLOSS = PREFIX + '_GLOSS' @@ -25,7 +32,10 @@ rewrite = 
functools.partial(dsv.rewrite, delimiter='\t') -def to_dict(iterobjects, key=operator.attrgetter('id')): +def to_dict( + iterobjects: Iterable[T], + key: Callable[[T], str] = operator.attrgetter('id'), +) -> collections.OrderedDict[str, T]: """ Turns an iterable into an `OrderedDict` mapping unique keys to items. @@ -41,39 +51,43 @@ def to_dict(iterobjects, key=operator.attrgetter('id')): if keys: k, n = keys.most_common(1)[0] if n > 1: - raise ValueError('non-unique key: %s' % k) + raise ValueError(f'non-unique key: {k}') return res -def read_all(fname, **kw): +def read_all(fname: PathType, **kw) -> list[dict[str, str]]: + """Read all rows in a TSV file.""" + kw['dicts'] = True kw.setdefault('delimiter', '\t') - if not kw.get('dicts'): - kw.setdefault('namedtuples', True) return list(dsv.reader(fname, **kw)) -def read_dicts(fname, schema=None, **kw): - kw['dicts'] = True +def read_dicts(fname: PathType, schema=None, **kw) -> list[dict[str, Union[str, int, float]]]: + """Read TSV rows a lightly typed dicts.""" res = read_all(fname, **kw) if schema: def identity(x): return x colspec = {} for col in schema['columns']: - conv = { - 'integer': int, - 'float': float, - }.get(col['datatype']) + conv = {'integer': int, 'float': float}.get(col['datatype']) colspec[col['name']] = conv or identity res = [{k: colspec.get(k, identity)(v) for k, v in d.items()} for d in res] return res +def reader(p, **kw): + """Convenience wrapper prepping dsv.reader for tab-separated values.""" + kw.setdefault('delimiter', '\t') + return dsv.reader(p, **kw) + + class UnicodeWriter(dsv.UnicodeWriter): + """A tab-separated values writer with a custom method.""" def __init__(self, *args, **kw): kw.setdefault('delimiter', '\t') self._rownum = None - super(UnicodeWriter, self).__init__(*args, **kw) + super().__init__(*args, **kw) def writerow(self, row): if self._rownum is None: @@ -81,6 +95,9 @@ def writerow(self, row): dsv.UnicodeWriter.writerow(self, row) def writeblock(self, rows, 
start='#<<<', end='#>>>'): + """ + Write "commented" rows, i.e. rows enclosed in a start and an end row of a particular format. + """ assert self._rownum self.writerow([start] + (self._rownum - 1) * ['']) for row in rows: @@ -88,81 +105,37 @@ def writeblock(self, rows, start='#<<<', end='#>>>'): self.writerow([end] + (self._rownum - 1) * ['']) -def lowercase(d): +def lowercase(d: dict[str, Any]) -> dict[str, Any]: + """Lowercases first-level dict keys.""" return {k.lower(): v for k, v in d.items()} -def unique(iterable): +def unique(iterable: Iterable[T]) -> list[T]: + """List of unique items in iterable.""" return list(sorted(set(i for i in iterable if i))) -def split(s, sep=','): +def split(s: str, sep: str = ',') -> list[str]: + """Unique items separated by sep in s.""" return unique(ss.strip() for ss in s.split(sep) if ss.strip()) -def split_ids(s): +def split_ids(s: str) -> list[str]: + """Unique IDs in s.""" return unique(id_.strip() for id_ in ID_SEP_PATTERN.split(s) if id_.strip()) -def visit(visitor, fname): - return rewrite(fname, visitor) - - -def load_conceptlist(idf): +def natural_sort(string: Sequence[str]) -> list[str]: """ - Load a concept list and display it as a complex dictionary (json-style). - - :rtype: dict / - A dictionary with IDs as keys and OrderedDicts with the data from the row as / - values. Duplicate links are passed as "splits" in a specific entry of the / - dictionary (named "splits"). + >>> natural_sort(['b123', 'a234']) + ['a234', 'b123'] """ - data = read_dicts(idf) - if data: - clist = dict(header=list(data[0].keys()), splits=[], mergers=[]) - cidxs = collections.defaultdict(list) - - previous_item = None - for item in data: - if item['ID'] and item['ID'] not in clist: - previous_item = clist[item['ID']] = item - else: - # a concept without ID or with duplicate ID - if previous_item: - # complete data in item with that of the previous one (?) 
- for k, v in previous_item.items(): - if not item[k]: - item[k] = v - clist['splits'].append(item) - else: # pragma: no cover - raise ValueError("item {0} is wrong".format(item)) - cidxs[previous_item[CS_ID]].append(previous_item['ID']) - - clist['mergers'] = [cidxs[k] for k in cidxs if len(cidxs[k]) > 1] - return clist - - -def natural_sort(string): def alphanum_key(key): return [int(c) if c.isdigit() else c.lower() for c in re.split('([0-9]+)', key)] return sorted(string, key=alphanum_key) -def write_conceptlist(clist, filename, header=False): - """ - Write conceptlist to file. - """ - header = header or clist['header'] - keys = natural_sort(list(clist.keys())) - with UnicodeWriter(filename) as writer: - writer.writerow(header) - for k in keys: - v = clist[k] - if k not in ['splits', 'mergers', 'header']: - writer.writerow([v[h] for h in header]) - - class ConceptlistWithNetworksWriter(list): """ Support for writing conceptlists which contain concept networks. @@ -174,32 +147,35 @@ def __init__(self, name): def __enter__(self): return self - def __exit__(self, type, value, traceback): + def __exit__(self, *_): assert self, 'empty list' header = list(self[0].keys()) if 'NUMBER' not in header: header.insert(0, 'NUMBER') header.insert(0, 'ID') - with UnicodeWriter('{}.tsv'.format(self.name), delimiter="\t") as writer: + with UnicodeWriter(f'{self.name}.tsv', delimiter="\t") as writer: writer.writerow(header) for i, row in enumerate(self, start=1): if 'NUMBER' not in row: row['NUMBER'] = str(i) - row['ID'] = '{}-{}'.format(self.name, row['NUMBER']) + row['ID'] = f"{self.name}-{row['NUMBER']}" writer.writerow([ json.dumps(row[key]) if key.endswith('_CONCEPTS') else row[key] for key in header]) -class SourcesCatalog(object): +class SourcesCatalog: + """A catalog for the metadata of conceptlist sources.""" def __init__(self, path): self.path = pathlib.Path(path) - self.items = jsonlib.load(self.path) if self.path.exists() else {} + self.items: dict[str, dict[str, Any]] 
= {} + if self.path.exists(): + self.items = jsonlib.load(self.path) def __contains__(self, item): return item in self.items - def get(self, item): + def get(self, item) -> Optional[dict[str, Any]]: # pylint: disable=C0116 return self.items.get(item) def __enter__(self): @@ -208,15 +184,16 @@ def __enter__(self): def __exit__(self, exc_type, exc_val, exc_tb): jsonlib.dump( collections.OrderedDict( - [(k, collections.OrderedDict([i for i in sorted(v.items())])) + [(k, collections.OrderedDict(sorted(v.items()))) for k, v in sorted(self.items.items())]), self.path, indent=4) - def add(self, key, obj): + def add(self, key: str, obj: Object) -> dict[str, Any]: + """Add the metadata of a pycdstar Object to the catalog.""" bsid = obj.bitstreams[0].id self.items[key] = collections.OrderedDict([ - ('url', 'https://cdstar.eva.mpg.de/bitstreams/{0}/{1}'.format(obj.id, bsid)), + ('url', f'https://cdstar.eva.mpg.de/bitstreams/{obj.id}/{bsid}'), ('objid', obj.id), ('original', bsid), ('size', obj.bitstreams[0].size), diff --git a/tests/test_api.py b/tests/test_api.py index a93dbab..0ad70a1 100644 --- a/tests/test_api.py +++ b/tests/test_api.py @@ -37,7 +37,7 @@ def test_Conceptlist(fixturedir, api): with pytest.raises(ValueError): Conceptlist( - api=None, + _api=None, id='xy', author='x', year='1234', @@ -60,7 +60,7 @@ def test_Conceptset(api): d = {a: '' for a in Conceptset.public_fields()} d['semanticfield'] = 'xx' - d['api'] = api + d['_api'] = api with pytest.raises(ValueError): Conceptset(**d) diff --git a/tests/test_commands.py b/tests/test_commands.py index ce244d0..50cc1c0 100644 --- a/tests/test_commands.py +++ b/tests/test_commands.py @@ -35,7 +35,8 @@ def test_citation(capsys, _main, tmprepos): def test_recreate_networks(capsys, _main): _main('recreate_networks', '--diff') out, _ = capsys.readouterr() - assert 'Sun-1991-1004-79' in out + assert '-- Sun-1991-1004-83' in out + assert '++ Sun-1991-1004-87' in out _main('recreate_networks') @@ -59,6 +60,12 @@ def 
test_rename(capsys, _main, tmprepos): _main('create_metadata') _main('rename', 'Sun-1991-1004', 'Moon-2011-234') assert tmprepos.joinpath('concepticondata/conceptlists/Moon-2011-234.tsv').exists() + for d in read_all(tmprepos.joinpath('concepticondata', 'conceptlists.tsv')): + assert 'Sun-1991-1004' not in str(d) + if d['ID'] == 'Moon-2011-234': + break + else: + assert False, 'New ID not found!' # pragma: no cover def test_graph(capsys, _main, tmprepos): @@ -94,7 +101,7 @@ def test_make_linkdata(tmprepos, _main, caplog): with caplog.at_level(logging.INFO): _main('make_linkdata') assert caplog.records - assert 'checking' in caplog.records[-1].message + assert 'Languoid' in caplog.records[-1].message assert tmprepos.joinpath('mappings').exists() @@ -117,7 +124,6 @@ def test_check(api, capsys, tmp_path, _main): test.write_text(t.replace('Sun-1991-1004-1', 'Sun-1991-1004-2'), encoding='utf8') _main('check', str(test)) out, err = capsys.readouterr() - print(out) assert 'Sun-1991-1004-2 ' in out @@ -141,7 +147,7 @@ def test_link(fixturedir, tmp_path, capsys, _main): _main('link', '.') def nattr(p, attr): - return len(nfilter([getattr(i, attr, None) for i in read_all(str(p))])) + return len(nfilter([i.get(attr) for i in read_all(str(p))])) test = tmp_path / 'test.tsv' shutil.copy(fixturedir.joinpath('conceptlist.tsv'), test) diff --git a/tests/test_glosses.py b/tests/test_glosses.py index aeea2eb..56dd5c2 100644 --- a/tests/test_glosses.py +++ b/tests/test_glosses.py @@ -1,6 +1,21 @@ +import posix + import pytest from pyconcepticon.glosses import * +from pyconcepticon.glosses import ParseSpec + + +def test_ParseSpec_parse_constituent(): + spec = ParseSpec.for_language('en') + gloss, pos = spec.parse_constituent('full gloss', 'word [with (nested) comment]', '') + assert gloss.comment_start == '[' + assert gloss.comment_end == ']' + assert gloss.comment == 'with (nested) comment' + + +def test_Similarity(): + assert Similarity.from_int(1) == Similarity.SAME 
@pytest.mark.parametrize( @@ -31,21 +46,26 @@ def test_parse_gloss(g, res): def test_parse_gloss_2(): - assert parse_gloss('the mountain or hill')[1].pos == 'noun' + assert parse_gloss('the mountain or hill')[1].pos == Pos.NOUN - g = Gloss.from_string('the mountain or hill') - assert g.tokens == 'the mountain hill' + g1 = Gloss.from_string('der Berg', language='de') + g2 = Gloss.from_string('Berg (n.)') + assert g1.similarity(g2) == Similarity.SAME_MAIN g1 = Gloss.from_string('der Berg', language='de') g2 = Gloss.from_string('Berg') - assert g1.similarity(g2) == 4 + assert g1.similarity(g2) == Similarity.SAME_MAIN_DIFFERENT_POS + + g1 = Gloss.from_string('der Berg a', language='de') + g2 = Gloss.from_string('Berg b (n.)') + assert g1.similarity(g2) == Similarity.SAME_LONGEST g = Gloss.from_string('la montagne', language='fr') - assert g.pos == 'noun' + assert g.pos == Pos.NOUN g1 = Gloss.from_string('montagne', language='fr') g2 = Gloss.from_string('la montagne', language='fr') - assert g1.similarity(g2) == 4 + assert g1.similarity(g2) == Similarity.SAME_MAIN_DIFFERENT_POS # error on invalid gloss with pytest.raises(ValueError): @@ -57,7 +77,7 @@ def test_parse_gloss_2(): def test_concept_map(): f, t = ['the dog', 'to kill'], ['kill', 'dog (verb)', 'to kill'] - assert concept_map(f, t) == {0: ([1], 4), 1: ([2], 1)} + assert concept_map(f, t) == {0: Mapping([1], 4), 1: Mapping([2], 1)} assert 0 not in concept_map(f, t, similarity_level=1) - assert concept_map([('house', 'noun', 5)], [('house', 'noun', 4)]) == {0: ([0], 1)} + assert concept_map([('house', 'noun', 5)], [('house', 'noun', 4)]) == {0: Mapping([0], 1)} diff --git a/tests/test_models.py b/tests/test_models.py index 8cb31a1..53fcc1e 100644 --- a/tests/test_models.py +++ b/tests/test_models.py @@ -11,7 +11,7 @@ def sun1991(tmprepos): def test_Conceptlist(sun1991, api): def kw(**kwargs): res = dict( - api=api, + _api=api, id='Abc-1234-12', author='Some One', year='1234', diff --git a/tests/test_util.py 
b/tests/test_util.py index 22dbd8d..22bbc0f 100644 --- a/tests/test_util.py +++ b/tests/test_util.py @@ -33,22 +33,6 @@ def test_to_dict(): to_dict([None, None], id) -def test_load_conceptlist(tmp_path): - fname = tmp_path / 'cl.tsv' - fname.write_text("""\ -ID NUMBER ENGLISH PROTOWORLD CONCEPTICON_ID CONCEPTICON_GLOSS -Bengtson-1994-27-1 1 mother, older femaile relative AJA 1216 MOTHER -Bengtson-1994-27-1 2 knee, to bend BU(N)KA 1371 -""", encoding='utf8') - - res = load_conceptlist(fname) - assert res['splits'] - out = tmp_path / 'clist' - write_conceptlist(res, out) - assert out.read_text('utf8') - visit(lambda l, r: r, str(fname)) - - def test_SourcesCatalog(tmp_path): cat_path = tmp_path / 'test.json' with SourcesCatalog(cat_path) as cat: