diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml
index fa08c7bb..a4f578b5 100644
--- a/.pre-commit-config.yaml
+++ b/.pre-commit-config.yaml
@@ -23,7 +23,6 @@ repos:
- id: fix-byte-order-marker
- id: name-tests-test
args: [ '--pytest-test-first' ]
- exclude: ^tests/_duplicates.py$
- id: no-commit-to-branch
args: [ '--branch', 'main' ]
- id: trailing-whitespace
diff --git a/CHANGELOG.rst b/CHANGELOG.rst
index 27971f4f..69bab7ad 100644
--- a/CHANGELOG.rst
+++ b/CHANGELOG.rst
@@ -20,6 +20,18 @@ Breaking changes
^^^^^^^^^^^^^^^^
* Development dependencies ("dev", "docs") are now installed via the new `dependency-groups` conventions (`PEP 735 `_) (:pull:`419`)
* `prek` is now the suggested pre-commit runner (installed by default via `pip install --group dev`) (:pull:`419`)
+* delete submodule ``src.cdm_reader_mapper.duplicates`` (:issue:`152`, :issue:`283`, :pull:`434`)
+
+ * ``cdm_reader_mapper.DupDetect`` is not importable anymore
+ * ``cdm_reader_mapper.duplicate_check`` is not importable anymore
+ * ``cdm_reader_mapper.DataBundle.duplicate_check`` is not callable anymore
+ * ``cdm_reader_mapper.DataBundle.get_duplicates`` is not callable anymore
+ * ``cdm_reader_mapper.DataBundle.flag_duplicates`` is not callable anymore
+ * ``cdm_reader_mapper.DataBundle.remove_duplicates`` is not callable anymore
+ * ``cdm_reader_mapper.DataBundle`` does not have attribute ``DupDetect`` anymore
+
+* submodule ``src.cdm_reader_mapper.duplicates`` has been moved to `marine_qc `_ (:issue:`283`, :pull:`434`)
+
Internal changes
^^^^^^^^^^^^^^^^
diff --git a/docs/api.rst b/docs/api.rst
index 83e7e1ff..2f2de05a 100755
--- a/docs/api.rst
+++ b/docs/api.rst
@@ -43,9 +43,6 @@ Useful functions
.. autofunction:: cdm_reader_mapper.correct_pt
:noindex:
-.. autofunction:: cdm_reader_mapper.duplicate_check
- :noindex:
-
.. autofunction:: cdm_reader_mapper.map_model
:noindex:
@@ -84,12 +81,3 @@ Useful functions
.. autofunction:: cdm_reader_mapper.write_tables
:noindex:
-
-.. _dupdetect:
-
-DupDetect
-=========
-
-.. autoclass:: cdm_reader_mapper.DupDetect
- :members:
- :noindex:
diff --git a/docs/getting-started.rst b/docs/getting-started.rst
index c3901db2..4c581e1c 100755
--- a/docs/getting-started.rst
+++ b/docs/getting-started.rst
@@ -41,24 +41,7 @@ In this case deck 704: US Marine Meteorological Journal collection of data code:
cdm_tables = db_cdm.data
-4. Detect duplicated observations
-~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
-
-Detect and flag duplicated observations without overwriting the original CDM tables:
-
-.. code-block:: console
-
- db_dup = db.duplicate_check()
-
- db_dup_f = db_dup.flag_duplicates()
-
- flagged_tables = db_dup_f.data
-
- db_dup_r = db_dup.remove_duplicates()
-
- removed_tables = db_dup_r.data
-
-5. Write the output
+4. Write the output
~~~~~~~~~~~~~~~~~~~
This writes the output to an ascii file with a pipe delimited format using the following function:
diff --git a/docs/hyperlinks.rst b/docs/hyperlinks.rst
index 89d11e7c..237eb26a 100755
--- a/docs/hyperlinks.rst
+++ b/docs/hyperlinks.rst
@@ -8,8 +8,6 @@
.. _CDM: https://github.com/glamod/common_data_model/blob/master/cdm_latest.pdf
-.. _CDM code tables for duplicate_status: https://glamod.github.io/cdm-obs-documentation/tables/code_tables/duplicate_status/duplicate_status.html
-
.. _CDM code tables for report_quality: https://glamod.github.io/cdm-obs-documentation/tables/code_tables/quality_flag/quality_flag.html
.. _conda: https://docs.conda.io/en/latest/
diff --git a/docs/index.rst b/docs/index.rst
index 31a2dd3c..fc623f00 100755
--- a/docs/index.rst
+++ b/docs/index.rst
@@ -7,7 +7,6 @@ The **cdm_reader_mapper** toolbox is a python3_ tool designed for
* reading original marine-meteorological data files compliant with a user specified data model (:ref:`data-models`) into a Marine Data Format (MDF) file.
* mapping observed meteorological variables and its associated metadata from a data model (:ref:`data-models`) to the C3S CDS Common Data Model (CDM_) format or **imodel** as called in this tool.
-* to detect and flag or remove duplicated observations
It was developed with the initial idea of reading data from the International Comprehensive Ocean-Atmosphere Data Set (ICOADS_) stored in the International Maritime Meteorological Archive (IMMA_) data format. In the meanwhile, it can read data C-RAID_ Copernicus in situ project too.
@@ -34,9 +33,6 @@ The reader allows for basic transformations of the data. This feature includes `
In addition, the **cdm_reader_mapper.DataBundle** object has several main method functions:
* :py:func:`DataBundle.map_model`: map observed variables and its associated metadata from a data model or models combination to the standardized C3S CDS Common Data Model (CDM_) format.
-* :py:func:`DataBundle.duplicate_check`: detect duplicated observations
-* :py:func:`DataBundle.flag_duplicates`: flag detected duplicated observations
-* :py:func:`DataBundle.remove_duplicates`: remove detected duplicated observations
* :py:func:`DataBundle.write`: save both observational MDF files as a coma-separated list and observational standardized CDM tables as pipe-seperated lists
.. toctree::
diff --git a/docs/tool-overview-databundle.rst b/docs/tool-overview-databundle.rst
index 123849b4..1e98fbeb 100755
--- a/docs/tool-overview-databundle.rst
+++ b/docs/tool-overview-databundle.rst
@@ -84,22 +84,4 @@ Now the meteorological data can be maqpped to the Common Data Model (CDM_) using
For more information how the mapping is working, please see :ref:`tool-overview-mapper` and/or :ref:`how-to-register-a-new-data-model-mapping`.
-:ref:`dupdetect`
-^^^^^^^^^^^^^^^^
-
-After mapping to the CDM format it is useful to check if the CDM tables contain any duplicates. The duplicate checker included in the ``cdm_reader_mapper`` toolbox is based on python record linkage toolkit RecordLinkage_.
-
-The first step is to call the method function :func:`.DataBundle.duplicate_check`. This function scans the CDM tables for any duplicates.
-
-.. code-block:: console
-
- db_dup = db.duplicate_check()
-
-Afterwards there are two options how to deal with the detected duplicates:
-
-1. :func:`.DataBundle.flag_duplicates`
-2. :func:`.DataBundle.remove_duplicates`
-
-The first function flags the detected duplicates. For more information about the flags see `CDM code tables for duplicate_status`_ and `CDM code tables for report_quality`_. The second function removes the detected duplicates.
-
.. include:: hyperlinks.rst
diff --git a/environment-docs.yml b/environment-docs.yml
index fb537a14..e67b4c5b 100755
--- a/environment-docs.yml
+++ b/environment-docs.yml
@@ -57,4 +57,3 @@ dependencies:
- msgpack
- requests
- platformdirs >4.0.0
- - recordlinkage >=0.15
diff --git a/pyproject.toml b/pyproject.toml
index 7a6dda0f..22dab2e4 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -96,7 +96,6 @@ dependencies = [
"pandas>=2.2.0",
"platformdirs >4.0.0",
"pyarrow >=15.0.0",
- "recordlinkage >= 0.15",
"requests",
"timezonefinder >6.5.0,<9.0.0",
"xarray >=2023.11.0,!=2024.10.0"
diff --git a/src/cdm_reader_mapper/__init__.py b/src/cdm_reader_mapper/__init__.py
index 93c3c0ca..aebc705c 100755
--- a/src/cdm_reader_mapper/__init__.py
+++ b/src/cdm_reader_mapper/__init__.py
@@ -19,10 +19,6 @@
from .core.reader import read
from .core.writer import write
from .data import test_data
-from .duplicates.duplicates import (
- DupDetect,
- duplicate_check,
-)
from .mdf_reader.reader import read_data, read_mdf
from .mdf_reader.writer import write_data
from .metmetpy import (
@@ -35,11 +31,9 @@
__all__ = [
"DataBundle",
- "DupDetect",
"cdm_tables",
"correct_datetime",
"correct_pt",
- "duplicate_check",
"map_model",
"read",
"read_data",
diff --git a/src/cdm_reader_mapper/core/databundle.py b/src/cdm_reader_mapper/core/databundle.py
index 9b4b4cbd..4b3f20a8 100755
--- a/src/cdm_reader_mapper/core/databundle.py
+++ b/src/cdm_reader_mapper/core/databundle.py
@@ -17,7 +17,6 @@
split_by_index,
)
from cdm_reader_mapper.common.iterators import ParquetStreamReader, is_valid_iterator
-from cdm_reader_mapper.duplicates.duplicates import DupDetect, duplicate_check
from cdm_reader_mapper.metmetpy import (
correct_datetime,
correct_pt,
@@ -154,7 +153,6 @@ def __init__(
self._mask: pd.DataFrame | ParquetStreamReader = mask
self._imodel = imodel
self._mode = mode
- self.DupDetect: DupDetect | None = None
def __len__(self) -> int:
"""
@@ -1414,208 +1412,3 @@ def write(
mode=mode,
**kwargs,
)
-
- def duplicate_check(self, inplace: bool = False, **kwargs: Any) -> DataBundle | None:
- r"""
- Duplicate check in :py:attr:`data`.
-
- Parameters
- ----------
- inplace : bool, default: False
- If True overwrite :py:attr:`data` in :py:class:`~DataBundle`
- else return a copy of :py:class:`~DataBundle` with :py:attr:`data` as CDM tables.
- \**kwargs : Any
- Additional keyword-arguments for duplicate check.
-
- Returns
- -------
- :py:class:`~DataBundle` or None
- DataBundle containing new :py:class:`~DupDetect` class for further duplicate check methods or None if "inplace=True".
-
- See Also
- --------
- DataBundle.get_duplicates : Get duplicate matches in `data`.
- DataBundle.flag_duplicates : Flag detected duplicates in `data`.
- DataBundle.remove_duplicates : Remove detected duplicates in `data`.
-
- Notes
- -----
- Following columns have to be provided:
-
- * `longitude`
- * `latitude`
- * `primary_station_id`
- * `report_timestamp`
- * `station_course`
- * `station_speed`
-
- This adds a new class :py:class:`~DupDetect` to :py:class:`~DataBundle`.
- This class is necessary for further duplicate check methods.
-
- For more information see :py:func:`duplicate_check`
-
- Examples
- --------
- >>> db.duplicate_check()
- """
- db_ = self._get_db(inplace)
- if db_ is None:
- return None
- if db_._mode == "tables" and "header" in db_._data:
- data = db_._data["header"]
- else:
- data = db_._data
- db_.DupDetect = duplicate_check(data, **kwargs)
- return self._return_db(db_, inplace)
-
- def flag_duplicates(self, inplace: bool = False, **kwargs: Any) -> DataBundle | None:
- r"""
- Flag detected duplicates in :py:attr:`data`.
-
- Parameters
- ----------
- inplace : bool, default: False
- If True overwrite :py:attr:`data` in :py:class:`~DataBundle`
- else return a copy of :py:class:`~DataBundle` with :py:attr:`data` containing flagged duplicates.
- \**kwargs : Any
- Additional keyword-arguments for flagging duplicates.
-
- Returns
- -------
- :py:class:`~DataBundle` or None
- DataBundle containing duplicate flags in :py:attr:`data` or None if "inplace=True".
-
- Raises
- ------
- RuntimeError
- Before flagging duplicates, a duplictate check has to be done, :py:func:`DataBundle.duplicate_check`.
-
- See Also
- --------
- DataBundle.remove_duplicates : Remove detected duplicates in `data`.
- DataBundle.get_duplicates : Get duplicate matches in `data`.
- DataBundle.duplicate_check : Duplicate check in `data`.
-
- Notes
- -----
- For more information see :py:func:`DupDetect.flag_duplicates`
-
- Examples
- --------
- Flag duplicates without overwriting :py:attr:`data`.
-
- >>> flagged_tables = db.flag_duplicates()
-
- Flag duplicates with overwriting :py:attr:`data`.
-
- >>> db.flag_duplicates(inplace=True)
- >>> flagged_tables = db.data
- """
- db_ = self._get_db(inplace)
- if db_ is None:
- return None
-
- if db_.DupDetect is None:
- raise RuntimeError("Before flagging duplicates, a duplictate check has to be done: 'db.duplicate_check()'")
-
- db_.DupDetect.flag_duplicates(**kwargs)
-
- if db_._mode == "tables" and "header" in db_._data:
- db_._data["header"] = db_.DupDetect.result
- else:
- db_._data = db_.DupDetect.result
- return self._return_db(db_, inplace)
-
- def get_duplicates(self, **kwargs: Any) -> pd.DataFrame:
- r"""
- Get duplicate matches in :py:attr:`data`.
-
- Parameters
- ----------
- \**kwargs : Any
- Additional keyword-arguments used for getting duplicates.
-
- Returns
- -------
- pd.DataFrame
- DataFrame containing duplicate matches.
-
- Raises
- ------
- RuntimeError
- Before getting duplicates, a duplictate check has to be done, :py:func:`DataBundle.duplicate_check`.
-
- See Also
- --------
- DataBundle.remove_duplicates : Remove detected duplicates in `data`.
- DataBundle.flag_duplicates : Flag detected duplicates in `data`.
- DataBundle.duplicate_check : Duplicate check in `data`.
-
- Notes
- -----
- For more information see :py:func:`DupDetect.get_duplicates`
-
- Examples
- --------
- >>> matches = db.get_duplicates()
- """
- if self.DupDetect is None:
- raise RuntimeError("Before getting duplicates, a duplictate check has to be done: 'db.duplicate_check()'")
- return self.DupDetect.get_duplicates(**kwargs)
-
- def remove_duplicates(self, inplace: bool = False, **kwargs: Any) -> DataBundle | None:
- r"""
- Remove detected duplicates in :py:attr:`data`.
-
- Parameters
- ----------
- inplace : bool, default: False
- If True overwrite :py:attr:`data` in :py:class:`~DataBundle`
- else return a copy of :py:class:`~DataBundle` with :py:attr:`data` containing no duplicates.
- \**kwargs : Any
- Additional keyword-arguments used to remove duplicates.
-
- Returns
- -------
- :py:class:`~DataBundle` or None
- DataBundle without duplicated rows or None if "inplace=True".
-
- Raises
- ------
- RuntimeError
- Before removing duplicates, a duplictate check has to be done, :py:func:`DataBundle.duplicate_check`.
-
- See Also
- --------
- DataBundle.flag_duplicates : Flag detected duplicates in `data`.
- DataBundle.get_duplicates : Get duplicate matches in `data`.
- DataBundle.duplicate_check : Duplicate check in `data`.
-
- Notes
- -----
- For more information see :py:func:`DupDetect.remove_duplicates`
-
- Examples
- --------
- Remove duplicates without overwriting :py:attr:`data`.
-
- >>> removed_tables = db.remove_duplicates()
-
- Remove duplicates with overwriting :py:attr:`data`.
-
- >>> db.remove_duplicates(inplace=True)
- >>> removed_tables = db.data
- """
- db_ = self._get_db(inplace)
- if db_ is None:
- return None
-
- if db_.DupDetect is None:
- raise RuntimeError("Before removing duplicates, a duplictate check has to be done: 'db.duplicate_check()'")
-
- db_.DupDetect.remove_duplicates(**kwargs)
- header_ = db_.DupDetect.result
- if not isinstance(db_._data, pd.DataFrame):
- raise TypeError("data has unsupported type: {type(db_._data)}.")
- db_._data = db_._data[db_._data.index.isin(header_.index)]
- return self._return_db(db_, inplace)
diff --git a/src/cdm_reader_mapper/data/__init__.py b/src/cdm_reader_mapper/data/__init__.py
index 9f1431a4..4f585219 100755
--- a/src/cdm_reader_mapper/data/__init__.py
+++ b/src/cdm_reader_mapper/data/__init__.py
@@ -181,7 +181,7 @@ def _get_data_dict(self, data_file: str, data_model: str, source_ext: str = "csv
@property
def test_icoads_r300_d714(self) -> LazyDataDict:
"""
- Retrieve IMMA1 deck 714 test dataset.
+ Test dataset for IMMA1 deck 714.
Returns
-------
@@ -197,7 +197,7 @@ def test_icoads_r300_d714(self) -> LazyDataDict:
@property
def test_icoads_r300_d701(self) -> LazyDataDict:
"""
- Retrieve IMMA1 deck 701 test dataset.
+ Test dataset for IMMA1 deck 701.
Returns
-------
@@ -213,7 +213,7 @@ def test_icoads_r300_d701(self) -> LazyDataDict:
@property
def test_icoads_r300_d706(self) -> LazyDataDict:
"""
- Retrieve IMMA1 deck 706 test dataset.
+ Test dataset for IMMA1 deck 706.
Returns
-------
@@ -229,7 +229,7 @@ def test_icoads_r300_d706(self) -> LazyDataDict:
@property
def test_icoads_r300_d705(self) -> LazyDataDict:
"""
- Retrieve IMMA1 deck 705 test dataset.
+ Test dataset for IMMA1 deck 705.
Returns
-------
@@ -245,7 +245,7 @@ def test_icoads_r300_d705(self) -> LazyDataDict:
@property
def test_icoads_r300_d702(self) -> LazyDataDict:
"""
- Retrieve IMMA1 deck 702 test dataset.
+ Test dataset for IMMA1 deck 702.
Returns
-------
@@ -261,7 +261,7 @@ def test_icoads_r300_d702(self) -> LazyDataDict:
@property
def test_icoads_r300_d707(self) -> LazyDataDict:
"""
- Retrieve IMMA1 deck 707 test dataset.
+ Test dataset for IMMA1 deck 707.
Returns
-------
@@ -277,7 +277,7 @@ def test_icoads_r300_d707(self) -> LazyDataDict:
@property
def test_icoads_r300_mixed(self) -> LazyDataDict:
"""
- Retrieve IMMA1 mixed test dataset.
+ Test dataset for IMMA1 mixed.
Returns
-------
@@ -293,7 +293,7 @@ def test_icoads_r300_mixed(self) -> LazyDataDict:
@property
def test_icoads_r302_d794(self) -> LazyDataDict:
"""
- Retrieve IMMA1 deck 794 test dataset.
+ Test dataset for IMMA1 deck 794.
Returns
-------
@@ -309,7 +309,7 @@ def test_icoads_r302_d794(self) -> LazyDataDict:
@property
def test_icoads_r300_d704(self) -> LazyDataDict:
"""
- Retrieve IMMA1 deck 704 test dataset.
+ Test dataset for IMMA1 deck 704.
Returns
-------
@@ -325,7 +325,7 @@ def test_icoads_r300_d704(self) -> LazyDataDict:
@property
def test_icoads_r300_d721(self) -> LazyDataDict:
"""
- Retrieve IMMA1 deck 721 test dataset.
+ Test dataset for IMMA1 deck 721.
Returns
-------
@@ -341,7 +341,7 @@ def test_icoads_r300_d721(self) -> LazyDataDict:
@property
def test_icoads_r300_d730(self) -> LazyDataDict:
"""
- Retrieve IMMA1 deck 730 test dataset.
+ Test dataset for IMMA1 deck 730.
Returns
-------
@@ -357,7 +357,7 @@ def test_icoads_r300_d730(self) -> LazyDataDict:
@property
def test_icoads_r300_d781(self) -> LazyDataDict:
"""
- Retrieve IMMA1 deck 781 test dataset.
+ Test dataset for IMMA1 deck 781.
Returns
-------
@@ -373,7 +373,7 @@ def test_icoads_r300_d781(self) -> LazyDataDict:
@property
def test_icoads_r300_d703(self) -> LazyDataDict:
"""
- Retrieve IMMA1 deck 703 test dataset.
+ Test dataset for IMMA1 deck 703.
Returns
-------
@@ -389,7 +389,7 @@ def test_icoads_r300_d703(self) -> LazyDataDict:
@property
def test_icoads_r300_d201(self) -> LazyDataDict:
"""
- Retrieve IMMA1 deck 201 test dataset.
+ Test dataset for IMMA1 deck 201.
Returns
-------
@@ -405,7 +405,7 @@ def test_icoads_r300_d201(self) -> LazyDataDict:
@property
def test_icoads_r300_d892(self) -> LazyDataDict:
"""
- Retrieve IMMA1 deck 892 test dataset.
+ Test dataset for IMMA1 deck 892.
Returns
-------
@@ -421,7 +421,7 @@ def test_icoads_r300_d892(self) -> LazyDataDict:
@property
def test_icoads_r300_d700(self) -> LazyDataDict:
"""
- Retrieve IMMA1 deck 700 test dataset.
+ Test dataset for IMMA1 deck 700.
Returns
-------
@@ -437,7 +437,7 @@ def test_icoads_r300_d700(self) -> LazyDataDict:
@property
def test_icoads_r302_d792(self) -> LazyDataDict:
"""
- Retrieve IMMA1 deck 792 test dataset.
+ Test dataset for IMMA1 deck 792.
Returns
-------
@@ -453,7 +453,7 @@ def test_icoads_r302_d792(self) -> LazyDataDict:
@property
def test_icoads_r302_d992(self) -> LazyDataDict:
"""
- Retrieve IMMA1 deck 992 test dataset.
+ Test dataset for IMMA1 deck 992.
Returns
-------
@@ -469,7 +469,7 @@ def test_icoads_r302_d992(self) -> LazyDataDict:
@property
def test_gdac(self) -> LazyDataDict:
"""
- Retrieve IMMT test dataset.
+ Test dataset for IMMT.
Returns
-------
@@ -485,7 +485,7 @@ def test_gdac(self) -> LazyDataDict:
@property
def test_craid(self) -> LazyDataDict:
"""
- Retrieve C-RAID 1260810 test dataset.
+ Test dataset for C-RAID 1260810.
Returns
-------
@@ -497,7 +497,7 @@ def test_craid(self) -> LazyDataDict:
@property
def test_marob(self) -> LazyDataDict:
"""
- Retrieve MAROB (DWD database) test dataset.
+ Test dataset for MAROB (DWD database).
Returns
-------
@@ -509,7 +509,7 @@ def test_marob(self) -> LazyDataDict:
@property
def test_cmems(self) -> LazyDataDict:
"""
- Retrieve CMEMS (copernicusmarine) test dataset.
+ Test dataset for CMEMS (copernicusmarine).
Returns
-------
@@ -521,7 +521,7 @@ def test_cmems(self) -> LazyDataDict:
@property
def test_pub47(self) -> dict[str, Any]:
"""
- Retrieve Pub47 v202501 test dataset.
+ Test dataset for Pub47 v202501.
Returns
-------
diff --git a/src/cdm_reader_mapper/duplicates/__init__.py b/src/cdm_reader_mapper/duplicates/__init__.py
deleted file mode 100755
index 5e57deb7..00000000
--- a/src/cdm_reader_mapper/duplicates/__init__.py
+++ /dev/null
@@ -1,3 +0,0 @@
-"""Climate Data Model (CDM) mapper package."""
-
-from __future__ import annotations
diff --git a/src/cdm_reader_mapper/duplicates/_duplicate_settings.py b/src/cdm_reader_mapper/duplicates/_duplicate_settings.py
deleted file mode 100755
index 5f108749..00000000
--- a/src/cdm_reader_mapper/duplicates/_duplicate_settings.py
+++ /dev/null
@@ -1,77 +0,0 @@
-"""Settings for duplicate check."""
-
-from __future__ import annotations
-from typing import Any
-
-from recordlinkage import Compare
-from recordlinkage.compare import Numeric
-
-
-__all__ = ["Compare"]
-
-_method_kwargs = {
- "left_on": "report_timestamp",
- "window": 5,
- "block_on": ["primary_station_id"],
-}
-
-_compare_kwargs = {
- "primary_station_id": {"method": "exact"},
- "longitude": {
- "method": "numeric",
- "kwargs": {"method": "step", "offset": 0.11},
- },
- "latitude": {
- "method": "numeric",
- "kwargs": {"method": "step", "offset": 0.11},
- },
- "report_timestamp": {
- "method": "date2",
- "kwargs": {"method": "gauss", "offset": 60.0},
- },
- "station_speed": {
- "method": "numeric",
- "kwargs": {"method": "step", "offset": 0.09},
- },
- "station_course": {
- "method": "numeric",
- "kwargs": {"method": "step", "offset": 0.9},
- },
-}
-
-_histories = {
- "duplicate_status": "Added duplicate information - flag",
- "duplicates": "Added duplicate information - duplicates",
-}
-
-
-class Date2(Numeric): # type: ignore[misc]
- """Copy of ``rl.compare.Numeric`` class."""
-
- pass
-
-
-def date2(object: Compare, *args: Any, **kwargs: Any) -> Compare:
- r"""
- New method for ``rl.Compare`` object using ``Date2`` object.
-
- Parameters
- ----------
- object : Compare
- Object to with the new method should be added.
- \*args : Any
- Positional argument for `Date2`.
- \**kwargs : Any
- Keyword-arguments for `Date2`.
-
- Returns
- -------
- Compare
- Compare object with new method.
- """
- compare = Date2(*args, **kwargs)
- object.add(compare)
- return object
-
-
-Compare.date2 = date2
diff --git a/src/cdm_reader_mapper/duplicates/duplicates.py b/src/cdm_reader_mapper/duplicates/duplicates.py
deleted file mode 100755
index 8722c721..00000000
--- a/src/cdm_reader_mapper/duplicates/duplicates.py
+++ /dev/null
@@ -1,903 +0,0 @@
-"""Common Data Model (CDM) pandas duplicate check."""
-
-from __future__ import annotations
-import datetime
-from collections.abc import Iterable
-from copy import deepcopy
-from typing import Any
-
-import numpy as np
-import pandas as pd
-import recordlinkage as rl
-
-from ._duplicate_settings import Compare, _compare_kwargs, _histories, _method_kwargs
-
-
-def convert_series(df: pd.DataFrame, conversion: dict[Any, Any]) -> pd.DataFrame:
- """
- Convert data types in Dataframe.
-
- Parameters
- ----------
- df : pd.DataFrame
- Input DataFrame.
- conversion : dict
- Conversion dictionary conating columns and
- new data type as key-value pairs.
-
- Returns
- -------
- pd.DataFrame
- DataFrame with converted data types.
- """
-
- def convert_date_to_float(date: pd.Series | pd.DatetimeIndex) -> pd.Series:
- """
- Convert datetime values to float seconds relative to the minimum value.
-
- Parameters
- ----------
- date : pd.Series or pd.DatetimeIndex
- Datetime-like values to convert.
-
- Returns
- -------
- pd.Series
- Float values representing seconds since the minimum datetime in `date`.
- """
- date = date.astype("datetime64[ns]")
- return (date - date.min()) / np.timedelta64(1, "s")
-
- df = df.copy()
- for column, method in conversion.items():
- try:
- df[column] = df[column].astype(method)
- except TypeError:
- df[column] = locals()[method](df[column])
-
- df = df.infer_objects(copy=False).fillna(9999.0)
- return df
-
-
-def add_history(df: pd.DataFrame, indexes: Iterable[int]) -> pd.DataFrame:
- """
- Append duplicate information to the 'history' column of a DataFrame.
-
- Parameters
- ----------
- df : pd.DataFrame
- The DataFrame containing a 'history' column.
- indexes : list[int] or pd.Index
- Row indexes where history should be updated.
-
- Returns
- -------
- pd.DataFrame
- A new DataFrame with updated 'history' column for the selected rows.
-
- Notes
- -----
- - If 'history' column does not exist, it will be created with empty strings.
- - Each message is prefixed with a UTC timestamp in "YYYY-MM-DD HH:MM:SS" format.
- """
-
- def _datetime_now() -> str:
- """
- Get actual datetime.
-
- Returns
- -------
- str
- Actual datetime as string representative ("%Y-%m-%d %H:%M:%S").
- """
- try:
- now = datetime.datetime.now(datetime.UTC)
- except AttributeError:
- now = datetime.datetime.utcnow()
-
- return now.strftime("%Y-%m-%d %H:%M:%S")
-
- df = df.copy()
-
- if "history" not in df.columns:
- df["history"] = ""
-
- history_tstmp = _datetime_now()
- addition = "".join([f"; {history_tstmp}. {add}" for add in _histories.items()])
- df.loc[indexes, "history"] = df.loc[indexes, "history"] + addition
- return df
-
-
-def add_duplicates(df: pd.DataFrame, dups: pd.DataFrame) -> pd.DataFrame:
- """
- Add duplicate information to the DataFrame based on the `dups` table.
-
- Parameters
- ----------
- df : pd.DataFrame
- DataFrame containing a 'report_id' column.
- dups : pd.DataFrame
- DataFrame where the index corresponds to rows in `df` and
- the values are lists of duplicate indices or duplicate IDs.
-
- Returns
- -------
- pd.DataFrame
- A new DataFrame with a 'duplicates' column containing duplicates
- as a sorted string list, e.g., "{ID1,ID2}".
-
- Notes
- -----
- - If a row has no duplicates, its 'duplicates' column is left unchanged.
- - Supports duplicates represented either by IDs (str) or by indices (int) of `report_id`.
- """
-
- def _add_dups(row: pd.Series) -> pd.Series:
- """
- Add duplicates as string representatives to series.
-
- Parameters
- ----------
- row : pd.Series
- Single row of a pd.DataFrame.
-
- Returns
- -------
- pd.Series
- Duplicates as string representatives added to `row`.
- """
- idx = row.name
- if idx not in dups.index:
- return row
-
- dup_idx = dups.loc[idx].to_list()
- if isinstance(dup_idx[0][0], str):
- v_ = sorted(dup_idx[0])
- else:
- v_ = report_ids.iloc[dup_idx[0]]
- v_ = sorted(v_.tolist())
- row["duplicates"] = "{" + ",".join(v_) + "}"
- return row
-
- df = df.copy()
-
- if "duplicates" not in df.columns:
- df["duplicates"] = ""
-
- report_ids = df["report_id"]
-
- dtypes = df.dtypes
- result = df.apply(lambda x: _add_dups(x), axis=1)
- return result.astype(dtypes)
-
-
-def add_report_quality(df: pd.DataFrame, indexes_bad: Iterable[int]) -> pd.DataFrame:
- """
- Update the 'report_quality' column in a DataFrame for bad reports.
-
- Parameters
- ----------
- df : pd.DataFrame
- DataFrame containing at least a 'report_quality' column.
- indexes_bad : iterable of int
- Row indices in the DataFrame to mark as bad quality (value=1).
-
- Returns
- -------
- pd.DataFrame
- DataFrame with updated 'report_quality' column.
- """
- df = df.copy()
- df["report_quality"] = df["report_quality"].astype(int)
- df.loc[indexes_bad, "report_quality"] = 1
- return df
-
-
-class DupDetect:
- """
- Class to detect, flag, and remove duplicate entries in a DataFrame using a comparison matrix from recordlinkage.
-
- Parameters
- ----------
- data : pd.DataFrame
- Original dataset.
- compared : pd.DataFrame
- Comparison matrix of the dataset.
- method : str
- Duplicate detection method used for recordlinkage indexing.
- method_kwargs : dict
- Keyword arguments for recordlinkage indexing method.
- compare_kwargs : dict
- Keyword arguments used for recordlinkage.Compare.
- """
-
- def __init__(
- self,
- data: pd.DataFrame,
- compared: pd.DataFrame,
- method: str,
- method_kwargs: dict[Any, Any],
- compare_kwargs: dict[Any, Any],
- ) -> None:
- """
- Initialize a DupDetect instance.
-
- Parameters
- ----------
- data : pd.DataFrame
- Original dataset.
- compared : pd.DataFrame
- Comparison matrix of the dataset.
- method : str
- Duplicate detection method used for recordlinkage indexing.
- method_kwargs : dict
- Keyword arguments for recordlinkage indexing method.
- compare_kwargs : dict
- Keyword arguments used for recordlinkage.Compare.
- """
- self.data = data.copy()
- self.compared = compared
- self.method = method
- self.method_kwargs = method_kwargs
- self.compare_kwargs = compare_kwargs
-
- def _get_limit(self, limit: str | float | None) -> float:
- """
- Resolve the duplicate threshold limit.
-
- Parameters
- ----------
- limit : str or float
- 'default', None, or a numeric limit.
-
- Returns
- -------
- float
- Threshold for total score to consider duplicates.
- """
- default_limit = 0.991
- if limit is None or limit == "default":
- return default_limit
-
- return float(limit)
-
- def _get_equal_musts(self) -> list[str]:
- """
- Identify columns that must be equal for duplicates.
-
- Returns
- -------
- list[str]
- Columns that must match exactly to consider duplicates.
- """
- equal_musts: list[str] = []
- for value in self.compare_kwargs.keys():
- if isinstance(value, str):
- value_lst = [value]
- else:
- value_lst = list(value)
- equal_musts.extend(v for v in value_lst if v in self.data.columns)
- return equal_musts
-
- def _total_score(self) -> None:
- """Compute total similarity score for each row in `self.compared`."""
- pcmax = self.compared.shape[1]
- self.score = 1 - (abs(self.compared.sum(axis=1) - pcmax) / pcmax)
-
- def get_duplicates(
- self,
- keep: str | int = "first",
- limit: str | float | None = "default",
- equal_musts: str | list[str] | None = None,
- overwrite: bool = True,
- ) -> pd.DataFrame:
- """
- Identify duplicate matches based on the comparison matrix.
-
- Parameters
- ----------
- keep : str or int
- Which entry to keep: 'first', 'last', or -1, 0.
- limit : str or float, optional, default: default
- Threshold of total similarity score to consider as duplicate.
- equal_musts : str or list[str], optional
- Columns that must exactly match.
- overwrite : bool, default: True
- Whether to recompute matches if already calculated.
-
- Returns
- -------
- pd.DataFrame
- DataFrame containing matched duplicates.
- """
- if keep not in ["first", "last", -1, 0]:
- raise ValueError("keep has to be one of 'first', 'last', -1 or 0.")
-
- if keep == "first":
- keep = -1
- elif keep == "last":
- keep = 0
-
- self.keep = keep
- if keep == 0:
- self.drop = -1
- elif keep == -1:
- self.drop = 0
-
- if overwrite is True:
- self._total_score()
- self.limit = self._get_limit(limit)
- cond = self.score >= self.limit
- if equal_musts is None:
- equal_musts = self._get_equal_musts()
- if isinstance(equal_musts, str):
- equal_musts = [equal_musts]
- for must in equal_musts:
- cond = cond & (self.compared[must])
- self.matches = self.compared[cond]
- return self.matches
-
- def flag_duplicates(
- self,
- keep: str | int = "first",
- limit: str | float | None = "default",
- equal_musts: str | list[str] | None = None,
- ) -> pd.DataFrame:
- r"""
- Get result dataset with flagged duplicates.
-
- Parameters
- ----------
- keep : str or int, default: first
- Which entry should be kept in result dataset.
- limit : str, int or float, optional
- Limit of total score that as to be exceeded to be declared as a duplicate.
- Defaults to .991.
- equal_musts : str or list, optional
- Hashable of column name(s) that must totally be equal to be declared as a duplicate.
- Default: All column names found in method_kwargs.
-
- Returns
- -------
- pd.DataFrame
- Input DataFrame with flagged duplicates, including duplicate_status_ and quality_flag_.
-
- References
- ----------
- .. _duplicate_status: https://glamod.github.io/cdm-obs-documentation/tables/code_tables/duplicate_status/duplicate_status.html
- .. _quality_flag: https://glamod.github.io/cdm-obs-documentation/tables/code_tables/quality_flag/quality_flag.html
- """
-
- def _get_similars(drop_dict: dict[str | int, Any], keeps: Any) -> tuple[Any, Any]:
- """
- Get similar entries from a comparison dictionary.
-
- Parameters
- ----------
- drop_dict : dict
- Dictionary containing values under keys `drop_` and `keep_` used
- to determine similarity relationships.
- keeps : Any
- Reference collection used to determine whether a value in `drop` is
- considered a match.
-
- Returns
- -------
- tuple of Any and Any
- A tuple containing the matched `drop` and `keep` values converted
- to integers if possible. If the values are not convertible or no match
- is found, returns `(None, None)`.
- """
- if drop_dict[drop] in keeps:
- drops = drop_dict[drop]
- keeps = drop_dict[keep]
- try:
- return int(drops), int(keeps)
- except ValueError:
- return drops, keeps
-
- return None, None
-
- def _get_duplicates(x: pd.DataFrame, last: Any) -> pd.Series:
- """
- Extract unique duplicate values from a DataFrame column.
-
- Parameters
- ----------
- x : pd.DataFrame
- Input DataFrame containing the column to inspect.
- last : Any
- Column name used to extract values for duplicate detection.
-
- Returns
- -------
- pd.Series
- Series containing a single key "dups" with the list of unique
- duplicate values found in the specified column.
- """
- b = list(set(x[last].values))
- return pd.Series({"dups": b})
-
- def _delete_values_equal_keys(dictionary: dict[Any, Any]) -> tuple[dict[Any, Any], list[Any]]:
- """
- Remove entries where keys and values are identical.
-
- Parameters
- ----------
- dictionary : dict
- Input mapping of keys to values.
-
- Returns
- -------
- tuple of dict and list of Any
- A tuple containing:
- - A filtered dictionary with identical key-value pairs removed
- - A list of values that were removed because key == value
- """
- new_dictionary = {}
- drops = []
- for k, v in dictionary.items():
- if k == v:
- drops.append(v)
- continue
- new_dictionary[k] = v
- return new_dictionary, drops
-
- def replace_keeps_and_drops(df: pd.DataFrame, keep: Any) -> pd.DataFrame:
- """
- Iteratively resolve and replace duplicate mappings in a DataFrame.
-
- Parameters
- ----------
- df : pd.DataFrame
- Input DataFrame containing values to be deduplicated.
- keep : Any
- Column name used to identify canonical ("keep") values.
-
- Returns
- -------
- pd.DataFrame
- Updated DataFrame with resolved duplicate mappings and cleaned
- keep-column values.
- """
- keeps = df[keep].values
- while True:
- df = df.sort_index()
- replaces = df.apply(lambda row, keeps=keeps: _get_similars(row, keeps), axis=1)
- replaces = {k: v for k, v in dict(replaces.values).items() if k is not None}
- replaces, drops = _delete_values_equal_keys(replaces)
- keys = replaces.keys()
- values = replaces.values()
- if len(drops) > 0:
- df = df.drop(drops, axis="index")
- df[keep] = df[keep].replace(replaces)
- if not set(keys).intersection(values):
- return df
-
- self.get_duplicates(keep=keep, limit=limit, equal_musts=equal_musts)
- result = self.data.copy()
-
- dtypes = result.dtypes
-
- result["duplicate_status"] = 0
- if not hasattr(self, "matches"):
- self.get_duplicates(limit="default", equal_musts=equal_musts)
-
- indexes = self.matches.index
- indexes_df = indexes.to_frame()
- drop = indexes_df.columns[self.drop]
- keep = indexes_df.columns[self.keep]
- indexes_df = indexes_df.drop_duplicates(subset=[drop])
- indexes_df = replace_keeps_and_drops(indexes_df, keep)
-
- dup_keep = indexes_df.groupby(indexes_df[keep]).apply(
- lambda x: _get_duplicates(x, drop),
- include_groups=False,
- )
- dup_drop = indexes_df.groupby(indexes_df[drop]).apply(
- lambda x: _get_duplicates(x, keep),
- include_groups=False,
- )
- duplicates = pd.concat([dup_keep, dup_drop])
-
- indexes_good = indexes_df[keep].values.tolist()
- indexes_bad = indexes_df[drop].values.tolist()
- indexes = indexes_good + indexes_bad
- result.loc[indexes_good, "duplicate_status"] = 1
- result.loc[indexes_bad, "duplicate_status"] = 3
- result = add_report_quality(result, indexes_bad=indexes_bad)
- result = add_history(result, indexes)
- result = result.sort_index(ascending=True)
- result = add_duplicates(result, duplicates)
-
- self.result = result.astype(dtypes)
- self.data = self.data.sort_index(ascending=True)
-
- return self.result
-
- def remove_duplicates(
- self,
- keep: str | int = "first",
- limit: str | float | None = "default",
- equal_musts: str | list[str] | None = None,
- ) -> pd.DataFrame:
- """
- Remove duplicate entries from the dataset.
-
- Parameters
- ----------
- keep : str or int
- Which entry to keep ('first' or 'last').
- limit : str or float, optional
- Minimum similarity score to declare duplicates.
- equal_musts : str or list[str], optional
- Columns that must exactly match.
-
- Returns
- -------
- pd.DataFrame
- Dataset without duplicates.
- """
- self.get_duplicates(keep=keep, limit=limit, equal_musts=equal_musts)
- result = self.data.copy()
- drops = self.matches.index.get_level_values(self.drop)
- result = result.drop(drops)
- self.result = result.sort_index(ascending=True)
- self.data = self.data.sort_index(ascending=True)
- return self.result
-
-
-def set_comparer(compare_dict: dict[Any, Any]) -> Compare:
- """
- Build a recordlinkage Compare object with optional conversion dictionary.
-
- Parameters
- ----------
- compare_dict : dict
- Dictionary of columns to compare,
- e.g. {"column_name": {"method": "exact" | "numeric" | "date2", "kwargs": {...}}}.
-
- Returns
- -------
- recordlinkage.Compare
- Compare object with added comparison methods and a 'conversion' attribute.
- """
- comparer = Compare()
- comparer.conversion = {}
- for column, c_dict in compare_dict.items():
- try:
- method = c_dict["method"]
- except KeyError as err:
- raise KeyError(
- "compare_kwargs must be hierarchically ordered: {: {'method': }}. 'method' not found"
- ) from err
- try:
- kwargs = c_dict["kwargs"]
- except KeyError:
- kwargs = {}
- getattr(comparer, method)(
- column,
- column,
- label=column,
- **kwargs,
- )
- if method == "numeric":
- comparer.conversion[column] = float
- if method == "date":
- comparer.conversion[column] = "datetime64[ns]"
- if method == "date2":
- comparer.conversion[column] = "convert_date_to_float"
-
- return comparer
-
-
-def remove_ignores(dic: dict[Any, Any], columns: str | list[str]) -> dict[Any, Any]:
- """
- Remove dictionary entries where keys or values match ignored columns.
-
- Parameters
- ----------
- dic : dict
- Original dictionary to filter.
- columns : str or list[str]
- Column(s) to ignore.
-
- Returns
- -------
- dict
- Filtered dictionary without the ignored columns.
- """
- new_dict = {}
- if isinstance(columns, str):
- columns = [columns]
- for k, v in dic.items():
- if k in columns:
- continue
- if v in columns:
- continue
- if isinstance(v, list):
- v2 = [v_ for v_ in v if v_ not in columns]
- if len(v2) == 0:
- continue
- v = v2
- new_dict[k] = v
- return new_dict
-
-
-def change_offsets(dic: dict[Any, Any], dic_o: dict[Any, Any]) -> dict[Any, Any]:
- """
- Update the 'offset' value in compare dictionary kwargs.
-
- Parameters
- ----------
- dic : dict
- Original compare dictionary.
- dic_o : dict
- Dictionary mapping column names to new offsets.
-
- Returns
- -------
- dict
- Updated compare dictionary with modified offsets.
- """
- for key in dic.keys():
- if key not in dic_o.keys():
- continue
- dic[key]["kwargs"]["offset"] = dic_o[key]
- return dic
-
-
-def reindex_nulls(df: pd.DataFrame, null_label: Any) -> pd.DataFrame:
- """
- Reindex a DataFrame in ascending order based on the number of 'null' strings in each row.
-
- Parameters
- ----------
- df : pd.DataFrame
- Input DataFrame. Cells with the string "null" are counted as nulls.
- null_label : Any
- Missing value representative.
-
- Returns
- -------
- pd.DataFrame
- DataFrame reindexed so that rows with fewer 'null' values appear first.
- Original row order is preserved for rows with the same null count.
- """
-
- def is_missing(x: Any) -> bool:
- """
- Determine whether a value is considered missing.
-
- This function supports scalar values as well as nested iterables
- (lists, tuples, numpy arrays). A value is considered missing if it is:
- - NaN (as defined by ``pandas.isna``)
- - Equal to ``null_label``
- - Any element inside an iterable is missing (recursively checked)
-
- Parameters
- ----------
- x : Any
- Value to check for missingness.
-
- Returns
- -------
- bool
- True if the value (or any nested value) is missing, otherwise False.
- """
- if isinstance(x, (list, tuple, np.ndarray)):
- return any(is_missing(x_) for x_ in x)
-
- if pd.isna(x):
- return True
-
- if x == null_label:
- return True
-
- return False
-
- def count_nulls(row: pd.Series) -> int:
- """
- Count the number of missing values in a pandas Series.
-
- Parameters
- ----------
- row : pd.Series
- Input row or Series to evaluate.
-
- Returns
- -------
- int
- Number of missing values in the Series.
- """
- return sum(is_missing(x) for x in row)
-
- null_counts = df.apply(count_nulls, axis=1)
-
- if null_counts.empty:
- return df
-
- sorted_index = null_counts.sort_values(kind="stable").index
- return df.loc[sorted_index]
-
-
-class Comparer:
- """
- Wrapper around recordlinkage.Compare to compute pairwise comparisons on a DataFrame.
-
- This class initializes a recordlinkage indexer and Compare object, optionally converting
- the data types before computing the comparisons.
-
- Parameters
- ----------
- data : pd.DataFrame
- The dataset to compare.
- method : str
- The indexing method from `recordlinkage.index`, e.g., 'SortedNeighbourhood'.
- method_kwargs : dict
- Keyword arguments to pass to the indexing method.
- compare_kwargs : dict
- Dictionary specifying columns and comparison methods for recordlinkage.Compare.
- pairs_df : list[pd.DataFrame], optional
- Optional pre-split DataFrames to pass to the indexer. Defaults to `[data]`.
- convert_data : bool, default False
- Whether to convert data using `compare_kwargs` conversion dictionary.
- """
-
- def __init__(
- self,
- data: pd.DataFrame,
- method: str,
- method_kwargs: dict[Any, Any],
- compare_kwargs: dict[Any, Any],
- pairs_df: list[pd.DataFrame] | None = None,
- convert_data: bool = False,
- ):
- """
- Initialize a Comparer instance.
-
- Parameters
- ----------
- data : pd.DataFrame
- The dataset to compare.
- method : str
- The indexing method from `recordlinkage.index`, e.g., 'SortedNeighbourhood'.
- method_kwargs : dict
- Keyword arguments to pass to the indexing method.
- compare_kwargs : dict
- Dictionary specifying columns and comparison methods for recordlinkage.Compare.
- pairs_df : list[pd.DataFrame], optional
- Optional pre-split DataFrames to pass to the indexer. Defaults to `[data]`.
- convert_data : bool, default False
- Whether to convert data using `compare_kwargs` conversion dictionary.
- """
- indexer = getattr(rl.index, method)(**method_kwargs)
- comparer = set_comparer(compare_kwargs)
- if convert_data is True:
- data_cp = convert_series(data, comparer.conversion)
- else:
- data_cp = data.copy()
-
- if pairs_df is None:
- pairs_df = [data_cp]
- pairs = indexer.index(*pairs_df)
- self.compared = comparer.compute(pairs, data_cp)
- self.data = data_cp
-
-
-def duplicate_check(
- data: pd.DataFrame,
- method: str = "SortedNeighbourhood",
- method_kwargs: dict[Any, Any] | None = None,
- compare_kwargs: dict[Any, Any] | None = None,
- table_name: str | None = None,
- ignore_columns: str | None = None,
- ignore_entries: dict[str, Any] | None = None,
- offsets: dict[str, Any] | None = None,
- reindex_by_null: bool = True,
- null_label: Any = "null",
-) -> DupDetect:
- """
- Run a duplicate check on a dataset using recordlinkage.
-
- Returns a DupDetect object.
-
- Parameters
- ----------
- data : pandas.DataFrame
- Dataset for duplicate check.
- method : str, default: SortedNeighbourhood
- Duplicate check method for recordlinkage.
- method_kwargs : dict, optional
- Keyword arguments for recordlinkage duplicate check.
- Defaults to _method_kwargs.
- compare_kwargs : dict, optional
- Keyword arguments for recordlinkage.Compare object.
- Defaults to _compare_kwargs.
- table_name : str, optional
- Name of the CDM table to be selected from data.
- ignore_columns : str or list, optional
- Name of data columns to be ignored for duplicate check.
- ignore_entries : dict, optional
- Key: Column name.
- Value: value to be ignored.
- E.g. offsets={"station_speed": null}.
- offsets : dict, optional
- Change offsets for recordlinkage Compare object.
- Key: Column name.
- Value: new offset.
- E.g. offsets={"latitude": 0.1}.
- reindex_by_null : bool, optional
- If True data is re-indexed in ascending order according to the number of nulls in each row.
- null_label : str, optional
- Null label which is used if `reindex_by_null` is True.
-
- Returns
- -------
- cdm_reader_mapper.DupDetect
- A DupDetect instance.
- """
- if reindex_by_null is True:
- data = reindex_nulls(data, null_label=null_label)
-
- index = data.index
- data.reset_index(drop=True)
-
- if table_name:
- data = data[table_name]
- if not method_kwargs:
- method_kwargs = deepcopy(_method_kwargs)
- if not compare_kwargs:
- compare_kwargs = deepcopy(_compare_kwargs)
- if ignore_columns:
- method_kwargs = remove_ignores(method_kwargs, ignore_columns)
- compare_kwargs = remove_ignores(compare_kwargs, ignore_columns)
- if offsets:
- compare_kwargs = change_offsets(compare_kwargs, offsets)
-
- dtypes = data.dtypes
-
- comparer = Comparer(
- data=data,
- method=method,
- method_kwargs=method_kwargs,
- compare_kwargs=compare_kwargs,
- convert_data=True,
- )
- compared = comparer.compared
- data_ = comparer.data
-
- if ignore_entries is None:
- return DupDetect(data, compared, method, method_kwargs, compare_kwargs)
-
- compared = [compared]
-
- for column_, entry_ in ignore_entries.items():
- if not isinstance(entry_, list):
- entry_ = [entry_]
- entries = data[column_].isin(entry_)
-
- d1 = data.mask(entries).dropna(how="all")
- d2 = data.where(entries).dropna(how="all")
- if d1.empty:
- continue
- if d2.empty:
- continue
-
- method_kwargs_ = remove_ignores(method_kwargs, column_)
- compare_kwargs_ = remove_ignores(compare_kwargs, column_)
-
- compared_ = Comparer(
- data=data_,
- method=method,
- method_kwargs=method_kwargs_,
- compare_kwargs=compare_kwargs_,
- pairs_df=[d2, d1],
- ).compared
- compared_[list(ignore_entries.keys())] = 1
- compared.append(compared_)
-
- compared = pd.concat(compared)
- data.set_index(index, inplace=True)
- data = data.astype(dtypes)
- return DupDetect(data, compared, method, method_kwargs, compare_kwargs)
diff --git a/tests/_duplicates.py b/tests/_duplicates.py
deleted file mode 100755
index 744e9f64..00000000
--- a/tests/_duplicates.py
+++ /dev/null
@@ -1,380 +0,0 @@
-from __future__ import annotations
-
-import pandas as pd
-
-from cdm_reader_mapper import read, test_data
-
-
-def _manipulate_header(df):
- df_ = _manipulation(df["header"])
- df_.columns = pd.MultiIndex.from_product([["header"], df_.columns])
- return df_
-
-
-def _manipulation(df):
- df = df.copy()
- # Duplicate : Different report_id's
- # Failure in data set;
- # each report needs a specific report_id
- df.loc[5] = df.loc[4]
- df.loc[5, "report_id"] = "ICOADS-302-N688EY"
- df.loc[5, "report_quality"] = 2
-
- # No Duplicate: Lat and Lon values differ to much
- # valid is .5 degrees
- df.loc[6] = df.loc[4]
- df.loc[6, "report_id"] = "ICOADS-302-N688EZ"
- df.loc[6, "latitude"] = -65.80
- df.loc[6, "longitude"] = 21.20
- df.loc[6, "report_quality"] = 2
-
- # Duplicate: report timestamp differs no enough
- # valid is 60 seconds
- df.loc[7] = df.loc[1]
- df.loc[7, "report_id"] = "ICOADS-302-N688DT"
- df.loc[7, "report_timestamp"] = "2022-02-01 00:01:00"
- df.loc[7, "report_quality"] = 2
-
- # No Duplicate: report timestamp differs to much
- # valid is 60 seconds
- df.loc[8] = df.loc[1]
- df.loc[8, "report_id"] = "ICOADS-302-N688DU"
- df.loc[8, "report_timestamp"] = "2022-02-02 00:00:00"
- df.loc[8, "report_quality"] = 2
-
- # Duplicate : Different report_id's
- # Failure in data set
- df.loc[9] = df.loc[2]
- df.loc[9, "report_id"] = "ICOADS-302-N688DW"
- df.loc[9, "report_quality"] = 2
-
- # Duplicate : Different report_id's
- # Failure in data set
- # each report needs a specific report_id
- df.loc[10] = df.loc[3]
- df.loc[10, "report_id"] = "ICOADS-302-N688EF"
- df.loc[10, "latitude"] = 66.00
- df.loc[10, "longitude"] = 8.50
- df.loc[10, "report_quality"] = 2
-
- # Duplicate: Lat and Lon values differ not enough
- # valid is .5 degrees
- df.loc[11] = df.loc[3]
- df.loc[11, "report_id"] = "ICOADS-302-N688EE"
- df.loc[11, "latitude"] = 66.05
- df.loc[11, "longitude"] = 8.15
- df.loc[11, "report_quality"] = 2
-
- # No Duplicate: primary_station_id differs
- df.loc[12] = df.loc[3]
- df.loc[12, "report_id"] = "ICOADS-302-N688ED"
- df.loc[12, "primary_station_id"] = "MASKSTIP"
- df.loc[12, "report_quality"] = 2
-
- # Duplicate: Lat and Lon values differ not enough
- # valid is .5 degrees
- df.loc[13] = df.loc[3]
- df.loc[13, "report_id"] = "ICOADS-302-N688EC"
- df.loc[13, "latitude"] = 65.95
- df.loc[13, "longitude"] = 8.05
- df.loc[13, "report_quality"] = 2
-
- # Duplicate: ignore primary_station_id SHIP
- df.loc[14] = df.loc[3]
- df.loc[14, "report_id"] = "ICOADS-302-N688EG"
- df.loc[14, "primary_station_id"] = "SHIP"
- df.loc[14, "report_quality"] = 2
-
- # No Duplicate: Lat and Lon values differ to much
- # valid is .5 degrees
- df.loc[15] = df.loc[4]
- df.loc[15, "report_id"] = "ICOADS-302-N688EV"
- df.loc[15, "latitude"] = 65.60
- df.loc[15, "longitude"] = -21.40
- df.loc[15, "report_quality"] = 2
-
- # Duplicate: Lat and Lon values differ not enough
- # valid is .5 degrees
- df.loc[16] = df.loc[4]
- df.loc[16, "report_id"] = "ICOADS-302-N688EW"
- df.loc[16, "latitude"] = 65.90
- df.loc[16, "longitude"] = -21.10
- df.loc[16, "report_quality"] = 2
-
- # No Duplicate:
- df.loc[17] = df.loc[1]
- df.loc[17, "report_id"] = "ICOADS-302-N688EK"
- df.loc[17, "station_course"] = 316.0
-
- # No Duplicate:
- df.loc[18] = df.loc[1]
- df.loc[18, "report_id"] = "ICOADS-302-N688EL"
- df.loc[18, "station_speed"] = 4.0
-
- # Duplicate:
- df.loc[19] = df.loc[1]
- df.loc[19, "report_id"] = "ICOADS-302-N688EM"
- # print(df["station_course"])
- df.loc[19, "station_course"] = pd.NA
-
- # Duplicate:
- df.loc[20] = df.loc[1]
- df.loc[20, "report_id"] = "ICOADS-302-N688EN"
- df.loc[20, "station_speed"] = pd.NA
- return df
-
-
-def _get_test_data(imodel):
- pattern = f"test_{imodel}"
- data_file = test_data[pattern]["cdm_header"]
- data_path = data_file.parent
- return read(
- data_path,
- suffix=f"{imodel}*",
- cdm_subset="header",
- mode="tables",
- extension="pq",
- )
-
-
-exp1 = {
- "duplicate_status": [0, 1, 1, 1, 1, 3, 0, 3, 0, 3, 0, 3, 0, 3, 0, 0, 3, 0, 0, 0, 0],
- "report_quality": [1, 1, 0, 1, 1, 1, 2, 1, 2, 1, 2, 1, 2, 1, 2, 2, 1, 1, 1, 1, 1],
- "duplicates": [
- None,
- "{ICOADS-302-N688DT}",
- "{ICOADS-302-N688DW}",
- "{ICOADS-302-N688EC,ICOADS-302-N688EE}",
- "{ICOADS-302-N688EW,ICOADS-302-N688EY}",
- "{ICOADS-302-N688EI}",
- None,
- "{ICOADS-302-N688DS}",
- None,
- "{ICOADS-302-N688DV}",
- None,
- "{ICOADS-302-N688EH}",
- None,
- "{ICOADS-302-N688EH}",
- None,
- None,
- "{ICOADS-302-N688EI}",
- None,
- None,
- None,
- None,
- ],
-}
-
-exp2 = {
- "duplicate_status": [0, 1, 1, 3, 1, 3, 0, 3, 0, 3, 0, 3, 1, 3, 3, 0, 3, 0, 0, 0, 0],
- "report_quality": [1, 1, 0, 1, 1, 1, 2, 1, 2, 1, 2, 1, 2, 1, 1, 2, 1, 1, 1, 1, 1],
- "duplicates": [
- None,
- "{ICOADS-302-N688DT}",
- "{ICOADS-302-N688DW}",
- "{ICOADS-302-N688ED}",
- "{ICOADS-302-N688EW,ICOADS-302-N688EY}",
- "{ICOADS-302-N688EI}",
- None,
- "{ICOADS-302-N688DS}",
- None,
- "{ICOADS-302-N688DV}",
- None,
- "{ICOADS-302-N688ED}",
- "{ICOADS-302-N688EC,ICOADS-302-N688EE,ICOADS-302-N688EG,ICOADS-302-N688EH}",
- "{ICOADS-302-N688ED}",
- "{ICOADS-302-N688ED}",
- None,
- "{ICOADS-302-N688EI}",
- None,
- None,
- None,
- None,
- ],
-}
-
-exp3 = {
- "duplicate_status": [1, 3, 3, 3, 3, 3, 3, 3, 0, 3, 3, 3, 0, 3, 0, 3, 3, 3, 3, 3, 3],
- "report_quality": [1, 1, 1, 1, 1, 1, 1, 1, 2, 1, 1, 1, 2, 1, 2, 1, 1, 1, 1, 1, 1],
- "duplicates": [
- "{ICOADS-302-N688DS,ICOADS-302-N688DT,ICOADS-302-N688DV,ICOADS-302-N688DW,ICOADS-302-N688EC,ICOADS-302-N688EE,ICOADS-302-N688EF,ICOADS-302-N688EH,ICOADS-302-N688EI,ICOADS-302-N688EK,ICOADS-302-N688EL,ICOADS-302-N688EM,ICOADS-302-N688EN,ICOADS-302-N688EV,ICOADS-302-N688EW,ICOADS-302-N688EY,ICOADS-302-N688EZ}",
- "{ICOADS-302-N688DR}",
- "{ICOADS-302-N688DR}",
- "{ICOADS-302-N688DR}",
- "{ICOADS-302-N688DR}",
- "{ICOADS-302-N688DR}",
- "{ICOADS-302-N688DR}",
- "{ICOADS-302-N688DR}",
- None,
- "{ICOADS-302-N688DR}",
- "{ICOADS-302-N688DR}",
- "{ICOADS-302-N688DR}",
- None,
- "{ICOADS-302-N688DR}",
- None,
- "{ICOADS-302-N688DR}",
- "{ICOADS-302-N688DR}",
- "{ICOADS-302-N688DR}",
- "{ICOADS-302-N688DR}",
- "{ICOADS-302-N688DR}",
- "{ICOADS-302-N688DR}",
- ],
-}
-
-exp4 = {
- "duplicate_status": [0, 1, 1, 1, 1, 3, 0, 3, 0, 3, 0, 3, 3, 3, 3, 0, 3, 0, 0, 0, 0],
- "report_quality": [1, 1, 0, 1, 1, 1, 2, 1, 2, 1, 2, 1, 1, 1, 1, 2, 1, 1, 1, 1, 1],
- "duplicates": [
- None,
- "{ICOADS-302-N688DT}",
- "{ICOADS-302-N688DW}",
- "{ICOADS-302-N688EC,ICOADS-302-N688ED,ICOADS-302-N688EE,ICOADS-302-N688EG}",
- "{ICOADS-302-N688EW,ICOADS-302-N688EY}",
- "{ICOADS-302-N688EI}",
- None,
- "{ICOADS-302-N688DS}",
- None,
- "{ICOADS-302-N688DV}",
- None,
- "{ICOADS-302-N688EH}",
- "{ICOADS-302-N688EH}",
- "{ICOADS-302-N688EH}",
- "{ICOADS-302-N688EH}",
- None,
- "{ICOADS-302-N688EI}",
- None,
- None,
- None,
- None,
- ],
-}
-
-exp5 = {
- "duplicate_status": [0, 1, 1, 1, 1, 3, 0, 3, 0, 3, 3, 3, 0, 3, 0, 3, 3, 0, 0, 0, 0],
- "report_quality": [1, 1, 0, 1, 1, 1, 2, 1, 2, 1, 1, 1, 2, 1, 2, 1, 1, 1, 1, 1, 1],
- "duplicates": [
- None,
- "{ICOADS-302-N688DT}",
- "{ICOADS-302-N688DW}",
- "{ICOADS-302-N688EC,ICOADS-302-N688EE,ICOADS-302-N688EF}",
- "{ICOADS-302-N688EV,ICOADS-302-N688EW,ICOADS-302-N688EY}",
- "{ICOADS-302-N688EI}",
- None,
- "{ICOADS-302-N688DS}",
- None,
- "{ICOADS-302-N688DV}",
- "{ICOADS-302-N688EH}",
- "{ICOADS-302-N688EH}",
- None,
- "{ICOADS-302-N688EH}",
- None,
- "{ICOADS-302-N688EI}",
- "{ICOADS-302-N688EI}",
- None,
- None,
- None,
- None,
- ],
-}
-
-exp6 = {
- "duplicate_status": [0, 0, 1, 1, 1, 3, 0, 0, 0, 3, 0, 3, 0, 3, 0, 0, 3, 0, 0, 0, 0],
- "report_quality": [1, 1, 0, 1, 1, 1, 2, 2, 2, 1, 2, 1, 2, 1, 2, 2, 1, 1, 1, 1, 1],
- "duplicates": [
- None,
- None,
- "{ICOADS-302-N688DW}",
- "{ICOADS-302-N688EC,ICOADS-302-N688EE}",
- "{ICOADS-302-N688EW,ICOADS-302-N688EY}",
- "{ICOADS-302-N688EI}",
- None,
- None,
- None,
- "{ICOADS-302-N688DV}",
- None,
- "{ICOADS-302-N688EH}",
- None,
- "{ICOADS-302-N688EH}",
- None,
- None,
- "{ICOADS-302-N688EI}",
- None,
- None,
- None,
- None,
- ],
-}
-
-exp7 = {
- "duplicate_status": [0, 1, 1, 1, 1, 3, 0, 3, 0, 3, 0, 3, 0, 3, 0, 0, 3, 0, 0, 3, 3],
- "report_quality": [1, 1, 0, 1, 1, 1, 2, 1, 2, 1, 2, 1, 2, 1, 2, 2, 1, 1, 1, 1, 1],
- "duplicates": [
- None,
- "{ICOADS-302-N688DT,ICOADS-302-N688EM,ICOADS-302-N688EN}",
- "{ICOADS-302-N688DW}",
- "{ICOADS-302-N688EC,ICOADS-302-N688EE}",
- "{ICOADS-302-N688EW,ICOADS-302-N688EY}",
- "{ICOADS-302-N688EI}",
- None,
- "{ICOADS-302-N688DS}",
- None,
- "{ICOADS-302-N688DV}",
- None,
- "{ICOADS-302-N688EH}",
- None,
- "{ICOADS-302-N688EH}",
- None,
- None,
- "{ICOADS-302-N688EI}",
- None,
- None,
- "{ICOADS-302-N688DS}",
- "{ICOADS-302-N688DS}",
- ],
-}
-
-exp8 = {
- "duplicate_status": [0, 1, 1, 3, 1, 3, 0, 3, 0, 3, 0, 3, 1, 3, 3, 0, 3, 0, 0, 3, 3],
- "report_quality": [1, 1, 0, 1, 1, 1, 2, 1, 2, 1, 2, 1, 2, 1, 1, 2, 1, 1, 1, 1, 1],
- "duplicates": [
- None,
- "{ICOADS-302-N688DT,ICOADS-302-N688EM,ICOADS-302-N688EN}",
- "{ICOADS-302-N688DW}",
- "{ICOADS-302-N688ED}",
- "{ICOADS-302-N688EW,ICOADS-302-N688EY}",
- "{ICOADS-302-N688EI}",
- None,
- "{ICOADS-302-N688DS}",
- None,
- "{ICOADS-302-N688DV}",
- None,
- "{ICOADS-302-N688ED}",
- "{ICOADS-302-N688EC,ICOADS-302-N688EE,ICOADS-302-N688EG,ICOADS-302-N688EH}",
- "{ICOADS-302-N688ED}",
- "{ICOADS-302-N688ED}",
- None,
- "{ICOADS-302-N688EI}",
- None,
- None,
- "{ICOADS-302-N688DS}",
- "{ICOADS-302-N688DS}",
- ],
-}
-
-method_kwargs_ = {
- "left_on": "report_timestamp",
- "window": 7,
- "block_on": ["primary_station_id"],
-}
-
-compare_kwargs_ = {
- "primary_station_id": {"method": "exact"},
- "report_timestamp": {
- "method": "date2",
- "kwargs": {"method": "gauss", "offset": 60.0},
- },
-}
-cdm_icoads = _get_test_data("icoads_r302_d792")
-cdm_icoads.data = _manipulate_header(cdm_icoads.data)
-
-cdm_craid = _get_test_data("craid")
diff --git a/tests/test_databundle.py b/tests/test_databundle.py
index 635247be..94eab1d1 100755
--- a/tests/test_databundle.py
+++ b/tests/test_databundle.py
@@ -6,7 +6,6 @@
from cdm_reader_mapper import DataBundle
from cdm_reader_mapper.common.iterators import ParquetStreamReader
from cdm_reader_mapper.core._utilities import SubscriptableMethod
-from cdm_reader_mapper.duplicates.duplicates import DupDetect
YR = ("core", "YR")
@@ -1036,124 +1035,3 @@ def test_map_model_psr():
)
pd.testing.assert_frame_equal(result.data.read()[expected.columns], expected, check_dtype=False)
-
-
-def test_duplicate_check_single_index():
- data = pd.DataFrame(
- {
- "report_id": ["A", "B", "C", "D", "E", "F"],
- "primary_station_id": ["S1", "S1", "S2", "S2", "S1", "S1"],
- "longitude": [0.1, 0.1, 0.2, 0.1, 0.1, 0.1],
- "latitude": [51.0, 51.2, 52.0, 51.0, 51.0, 51.0],
- "report_timestamp": pd.to_datetime(
- [
- "2023-01-01 00:00",
- "2023-01-01 00:00",
- "2023-01-01 00:00",
- "2023-01-01 00:00",
- "2023-01-01 00:00",
- "2023-01-01 00:00",
- ]
- ),
- "station_speed": [10.0, 10.0, 8.0, 10.0, 8.0, 10.0],
- "station_course": [90, 90, 180, 90, 60, 90],
- "report_quality": 2,
- ("header", "duplicates"): "",
- ("header", "duplicate_status"): 4,
- ("header", "history"): "",
- }
- )
-
- db = DataBundle(
- data=data,
- )
-
- db_dupdetect = db.duplicate_check()
-
- assert hasattr(db_dupdetect, "DupDetect")
- detector = db_dupdetect.DupDetect
-
- assert isinstance(detector, DupDetect)
- assert detector.data.shape[0] == data.shape[0]
-
- duplicates = db_dupdetect.get_duplicates()
-
- assert isinstance(duplicates, pd.DataFrame)
-
- pd.testing.assert_index_equal(duplicates.index, pd.MultiIndex.from_tuples([(5, 0)]))
-
- flagged = db_dupdetect.flag_duplicates()
-
- pd.testing.assert_series_equal(
- flagged.data["duplicates"],
- pd.Series(["{F}", "", "", "", "", "{A}"], name="duplicates"),
- )
- pd.testing.assert_series_equal(
- flagged.data["duplicate_status"],
- pd.Series([1, 0, 0, 0, 0, 3], name="duplicate_status"),
- )
-
- removed = db_dupdetect.remove_duplicates()
-
- pd.testing.assert_frame_equal(data.iloc[[0, 1, 2, 3, 4]], removed.data)
-
-
-def test_duplicate_check_multi_index():
- data = pd.DataFrame(
- {
- ("header", "report_id"): ["A", "B", "C", "D", "E", "F"],
- ("header", "primary_station_id"): ["S1", "S1", "S2", "S2", "S1", "S1"],
- ("header", "longitude"): [0.1, 0.1, 0.2, 0.1, 0.1, 0.1],
- ("header", "latitude"): [51.0, 51.2, 52.0, 51.0, 51.0, 51.0],
- ("header", "report_timestamp"): pd.to_datetime(
- [
- "2023-01-01 00:00",
- "2023-01-01 00:00",
- "2023-01-01 00:00",
- "2023-01-01 00:00",
- "2023-01-01 00:00",
- "2023-01-01 00:00",
- ]
- ),
- ("header", "station_speed"): [10.0, 10.0, 8.0, 10.0, 8.0, 10.0],
- ("header", "station_course"): [90, 90, 180, 90, 60, 90],
- ("header", "report_quality"): 2,
- ("header", "duplicates"): "",
- ("header", "duplicate_status"): 4,
- ("header", "history"): "",
- }
- )
-
- db = DataBundle(
- data=data,
- mode="tables",
- )
-
- db_dupdetect = db.duplicate_check()
-
- assert hasattr(db_dupdetect, "DupDetect")
- detector = db_dupdetect.DupDetect
-
- assert isinstance(detector, DupDetect)
- assert detector.data.shape[0] == data.shape[0]
-
- duplicates = db_dupdetect.get_duplicates()
-
- assert isinstance(duplicates, pd.DataFrame)
-
- pd.testing.assert_index_equal(duplicates.index, pd.MultiIndex.from_tuples([(5, 0)]))
-
- flagged = db_dupdetect.flag_duplicates()
-
- pd.testing.assert_series_equal(
- flagged.data[("header", "duplicates")],
- pd.Series(["{F}", "", "", "", "", "{A}"], name=("header", "duplicates")),
- )
- pd.testing.assert_series_equal(
- flagged.data[("header", "duplicate_status")],
- pd.Series([1, 0, 0, 0, 0, 3], name=("header", "duplicate_status")),
- )
-
- removed = db_dupdetect.remove_duplicates()
-
- pd.testing.assert_frame_equal(data.iloc[[0, 1, 2, 3, 4]], removed.data)
diff --git a/tests/test_duplicates.py b/tests/test_duplicates.py
deleted file mode 100755
index f4fe5016..00000000
--- a/tests/test_duplicates.py
+++ /dev/null
@@ -1,385 +0,0 @@
-from __future__ import annotations
-
-import pandas as pd
-import pytest
-
-from cdm_reader_mapper.duplicates._duplicate_settings import (
- Compare,
- _compare_kwargs,
- _histories,
- _method_kwargs,
-)
-from cdm_reader_mapper.duplicates.duplicates import (
- Comparer,
- DupDetect,
- add_duplicates,
- add_history,
- add_report_quality,
- change_offsets,
- convert_series,
- duplicate_check,
- reindex_nulls,
- remove_ignores,
- set_comparer,
-)
-
-
-def test_convert_series_basic():
- df = pd.DataFrame({"a": ["1", "2", "3"], "b": ["10.5", "20.5", "30.5"]})
- conversion = {"a": "int", "b": "float"}
-
- expected = pd.DataFrame({"a": [1, 2, 3], "b": [10.5, 20.5, 30.5]})
-
- result = convert_series(df, conversion)
- pd.testing.assert_frame_equal(result, expected)
-
-
-def test_convert_series_null_replacement():
- df = pd.DataFrame({"a": ["1", None, "3"], "b": [None, "2.5", None]})
- conversion = {"a": "float", "b": "float"}
-
- expected = pd.DataFrame({"a": [1.0, 9999.0, 3.0], "b": [9999.0, 2.5, 9999.0]})
-
- result = convert_series(df, conversion)
- pd.testing.assert_frame_equal(result, expected)
-
-
-def test_convert_series_date_to_float():
- df = pd.DataFrame({"date": ["2023-01-01", "2023-01-02", "2023-01-03"]})
- conversion = {"date": "convert_date_to_float"}
-
- result = convert_series(df, conversion)
- expected = pd.DataFrame({"date": [0.0, 86400.0, 172800.0]})
-
- pd.testing.assert_frame_equal(result, expected)
-
-
-def test_convert_series_mixed():
- df = pd.DataFrame(
- {
- "num": ["1", None, "3"],
- "val": ["10.5", "20.5", None],
- "date": ["2023-01-01", None, "2023-01-03"],
- }
- )
- conversion = {"num": "Int64", "val": "float", "date": "convert_date_to_float"}
-
- result = convert_series(df, conversion)
- expected = pd.DataFrame(
- {
- "num": [1, 9999, 3],
- "val": [10.5, 20.5, 9999.0],
- "date": [0.0, 9999.0, 172800.0],
- }
- )
-
- pd.testing.assert_frame_equal(result, expected, check_dtype=False)
-
-
-def test_add_history_basic():
- df = pd.DataFrame({"value": [10, 20, 30], "history": ["", "", ""]})
-
- updated_df = add_history(df, [0, 2])
-
- for idx in [0, 2]:
- for msg in _histories.values():
- assert msg in updated_df.loc[idx, "history"]
-
- assert updated_df.loc[1, "history"] == ""
-
-
-def test_add_history_creates_column():
- df = pd.DataFrame({"value": [1, 2, 3]})
-
- updated_df = add_history(df, [0])
- for msg in _histories.values():
- assert msg in updated_df.loc[0, "history"]
- assert updated_df.loc[1, "history"] == ""
-
-
-def test_add_duplicates_strings():
- df = pd.DataFrame({"report_id": ["A", "B", "C", "D"]})
-
- dups = pd.DataFrame(
- {
- 0: [["B", "C"], ["D"]],
- },
- index=[0, 2],
- )
-
- updated = add_duplicates(df, dups)
-
- assert updated.loc[0, "duplicates"] == "{B,C}"
- assert updated.loc[2, "duplicates"] == "{D}"
- assert updated.loc[1, "duplicates"] == ""
- assert updated.loc[3, "duplicates"] == ""
-
-
-def test_add_duplicates_indices():
- df = pd.DataFrame({"report_id": ["A", "B", "C", "D"]})
-
- dups = pd.DataFrame({0: [[1, 2], [3]]}, index=[0, 2])
-
- updated = add_duplicates(df, dups)
-
- assert updated.loc[0, "duplicates"] == "{B,C}"
- assert updated.loc[2, "duplicates"] == "{D}"
-
-
-@pytest.mark.parametrize(
- "initial, indexes_bad, expected",
- [
- ([0, 0, 0], [1], [0, 1, 0]),
- ([0, 0, 2], [0, 2], [1, 0, 1]),
- ([1, 2, 3], [], [1, 2, 3]),
- ],
-)
-def test_add_report_quality(initial, indexes_bad, expected):
- df = pd.DataFrame({"report_quality": initial})
- result = add_report_quality(df, indexes_bad)
- pd.testing.assert_series_equal(
- result["report_quality"],
- pd.Series(expected, name="report_quality"),
- check_dtype=False,
- )
-
-
-def test_set_comparer():
- compare_dict = {
- "col1": {"method": "exact"},
- "col2": {"method": "numeric", "kwargs": {"method": "step", "offset": 0.1}},
- "col3": {"method": "date2"},
- }
- comparer = set_comparer(compare_dict)
- assert isinstance(comparer, Compare)
- assert comparer.conversion["col2"] is float
- assert comparer.conversion["col3"] == "convert_date_to_float"
-
-
-def test_remove_ignores():
- dic = {"a": 1, "b": ["x", "y"], "c": "z"}
- filtered = remove_ignores(dic, ["b", "c"])
- assert "b" not in filtered
- assert "c" not in filtered
- assert "a" in filtered
-
-
-def test_change_offsets():
- dic = {"col1": {"kwargs": {"offset": 0.1}}, "col2": {"kwargs": {"offset": 0.2}}}
- new_offsets = {"col1": 0.5}
- updated = change_offsets(dic, new_offsets)
- assert updated["col1"]["kwargs"]["offset"] == 0.5
- assert updated["col2"]["kwargs"]["offset"] == 0.2
-
-
-def test_reindex_nulls_orders_by_null_count():
- df = pd.DataFrame({"a": ["null", 1, "null", 2], "b": ["null", 2, 3, "null"]})
- result = reindex_nulls(df, null_label="null")
-
- expected_order = [1, 2, 3, 0]
- assert list(result.index) == expected_order
-
-
-def test_reindex_nulls_empty_df():
- df = pd.DataFrame()
- result = reindex_nulls(df, null_label="null")
- assert result.equals(df)
-
-
-def test_comparer_basic():
- df = pd.DataFrame(
- {
- "report_id": ["A", "B", "C"],
- "primary_station_id": ["S1", "S1", "S2"],
- "longitude": [0.1, 0.15, 0.2],
- "latitude": [51.0, 51.01, 52.0],
- "report_timestamp": pd.to_datetime(["2023-01-01 00:00", "2023-01-01 00:01", "2023-01-02 00:00"]),
- "station_speed": [10.0, 12.0, 8.0],
- "station_course": [90, 180, 270],
- }
- )
-
- comp = Comparer(
- data=df,
- method="SortedNeighbourhood",
- method_kwargs=_method_kwargs,
- compare_kwargs=_compare_kwargs,
- convert_data=True,
- )
-
- assert isinstance(comp.data, pd.DataFrame)
- assert isinstance(comp.compared, pd.DataFrame)
- assert "primary_station_id" in comp.compared.columns
-
-
-def test_duplicate_check_basic():
- df = pd.DataFrame(
- {
- "report_id": ["A", "B"],
- "primary_station_id": ["S1", "S2"],
- "longitude": [10, 20],
- "latitude": [50, 60],
- "report_timestamp": pd.to_datetime(["2023-01-01", "2023-01-02"]),
- "station_speed": [5, 6],
- "station_course": [100, 200],
- }
- )
- detector = duplicate_check(df, method="SortedNeighbourhood")
- assert isinstance(detector, DupDetect)
- assert detector.data.shape[0] == df.shape[0]
-
-
-@pytest.fixture
-def dummy_data():
- df = pd.DataFrame(
- {
- "report_id": ["A", "B", "C", "D", "E", "F"],
- "primary_station_id": ["S1", "S1", "S2", "S2", "S1", "S1"],
- "longitude": [0.1, 0.1, 0.2, 0.1, 0.1, 0.1],
- "latitude": [51.0, 51.2, 52.0, 51.0, 51.0, 51.0],
- "report_timestamp": pd.to_datetime(
- [
- "2023-01-01 00:00",
- "2023-01-01 00:00",
- "2023-01-01 00:00",
- "2023-01-01 00:00",
- "2023-01-01 00:00",
- "2023-01-01 00:00",
- ]
- ),
- "station_speed": [10.0, 10.0, 8.0, 10.0, 8.0, 10.0],
- "station_course": [90, 90, 180, 90, 60, 90],
- "report_quality": 2,
- }
- )
- df.index = [0, 1, 2, 3, 4, 5]
- return df
-
-
-@pytest.mark.parametrize(
- "kwargs, exp_ids",
- [
- ({}, [(5, 0)]),
- ({"offsets": {"latitude": 0.22}}, [(1, 0), (5, 0), (5, 1)]),
- (
- {"ignore_columns": ["station_speed", "station_course"]},
- [(4, 0), (5, 0), (5, 4)],
- ),
- ({"ignore_entries": {"primary_station_id": "S2"}}, [(5, 0), (3, 0), (3, 5)]),
- ({"ignore_entries": {"primary_station_id": ["S2"]}}, [(5, 0), (3, 0), (3, 5)]),
- ],
-)
-def test_get_duplicates_kwargs(dummy_data, kwargs, exp_ids):
- dd = duplicate_check(
- dummy_data,
- method="SortedNeighbourhood",
- **kwargs,
- )
-
- assert hasattr(dd, "compared")
-
- dd.get_duplicates()
-
- assert hasattr(dd, "matches")
-
- pd.testing.assert_index_equal(dd.matches.index, pd.MultiIndex.from_tuples(exp_ids))
-
-
-def test_duplicate_check_reindex(dummy_data):
- dd = duplicate_check(
- dummy_data,
- method="SortedNeighbourhood",
- reindex_by_null=False,
- )
-
- assert hasattr(dd, "compared")
-
- result = dd.compared
-
- exp_idx = pd.MultiIndex.from_tuples([(1, 0), (3, 2), (4, 0), (4, 1), (5, 0), (5, 1), (5, 4)])
- pd.testing.assert_index_equal(dd.compared.index, exp_idx)
-
- assert list(result.columns) == [
- "primary_station_id",
- "longitude",
- "latitude",
- "report_timestamp",
- "station_speed",
- "station_course",
- ]
-
-
-def test_get_duplicates_limit_and_equal_musts(dummy_data):
- dd = duplicate_check(dummy_data, method="SortedNeighbourhood")
-
- matches_default = dd.get_duplicates(keep="first", limit=0.5)
- expected_indexes = pd.MultiIndex.from_tuples([(5, 0)])
- pd.testing.assert_index_equal(matches_default.index, expected_indexes)
-
- matches_eq_str = dd.get_duplicates(keep="first", equal_musts="primary_station_id")
- expected_indexes = pd.MultiIndex.from_tuples([(5, 0)])
- pd.testing.assert_index_equal(matches_eq_str.index, expected_indexes)
-
- matches_eq_list = dd.get_duplicates(keep="first", equal_musts=["primary_station_id", "longitude"])
- expected_indexes = pd.MultiIndex.from_tuples([(5, 0)])
- pd.testing.assert_index_equal(matches_eq_list.index, expected_indexes)
-
-
-@pytest.mark.parametrize(
- "keep, exp_duplicate_status, exp_duplicates",
- [
- ("first", [1, 0, 0, 0, 0, 3], ["{F}", "", "", "", "", "{A}"]),
- ("last", [3, 0, 0, 0, 0, 1], ["{F}", "", "", "", "", "{A}"]),
- (0, [3, 0, 0, 0, 0, 1], ["{F}", "", "", "", "", "{A}"]),
- (-1, [1, 0, 0, 0, 0, 3], ["{F}", "", "", "", "", "{A}"]),
- ],
-)
-def test_flag_duplicates(dummy_data, keep, exp_duplicate_status, exp_duplicates):
- dd = duplicate_check(dummy_data, method="SortedNeighbourhood")
-
- result = dd.flag_duplicates(keep=keep)
-
- assert "duplicate_status" in result.columns
- assert "duplicates" in result.columns
- assert "history" in result.columns
-
- expected_duplicate_status = pd.Series(exp_duplicate_status, name="duplicate_status")
- expected_duplicates = pd.Series(exp_duplicates, name="duplicates")
-
- pd.testing.assert_series_equal(result["duplicate_status"], expected_duplicate_status)
- pd.testing.assert_series_equal(result["duplicates"], expected_duplicates)
-
-
-@pytest.mark.parametrize(
- "keep, exp_idx",
- [
- ("first", [0, 1, 2, 3, 4]),
- ("last", [1, 2, 3, 4, 5]),
- (0, [1, 2, 3, 4, 5]),
- (-1, [0, 1, 2, 3, 4]),
- ],
-)
-def test_remove_duplicates(dummy_data, keep, exp_idx):
- dd = duplicate_check(dummy_data, method="SortedNeighbourhood")
-
- result = dd.remove_duplicates(keep=keep)
- pd.testing.assert_index_equal(result.index, pd.Index(exp_idx))
-
-
-def test_get_total_score(dummy_data):
- dd = duplicate_check(dummy_data, method="SortedNeighbourhood")
- dd._total_score()
-
- assert hasattr(dd, "score")
-
- expected = pd.Series(
- [5.0 / 6.0, 0.5, 2.0 / 3.0, 0.5, 1.0, 5.0 / 6.0, 2.0 / 3.0],
- index=pd.MultiIndex.from_tuples([(1, 0), (3, 2), (4, 0), (4, 1), (5, 0), (5, 1), (5, 4)]),
- )
- pd.testing.assert_series_equal(dd.score, expected)
-
-
-def test_get_duplicates_raises(dummy_data):
- dd = duplicate_check(dummy_data)
- with pytest.raises(ValueError):
- dd.get_duplicates(keep=1)
diff --git a/tests/test_duplicates_data.py b/tests/test_duplicates_data.py
deleted file mode 100755
index 71577122..00000000
--- a/tests/test_duplicates_data.py
+++ /dev/null
@@ -1,121 +0,0 @@
-from __future__ import annotations
-
-import pandas as pd
-import pytest
-from numpy.testing import assert_array_equal
-from pandas.testing import assert_frame_equal
-
-from ._duplicates import (
- cdm_craid,
- cdm_icoads,
- compare_kwargs_,
- exp1,
- exp2,
- exp3,
- exp4,
- exp5,
- exp6,
- exp7,
- exp8,
- method_kwargs_,
-)
-
-
-@pytest.mark.parametrize(
- "method, method_kwargs, compare_kwargs, ignore_columns, ignore_entries, offsets, expected",
- [
- (None, None, None, None, None, None, exp1),
- (
- None,
- None,
- None,
- None,
- {"primary_station_id": ["SHIP", "MASKSTID"]},
- None,
- exp2,
- ),
- (
- None,
- None,
- None,
- None,
- {"station_speed": pd.NA, "station_course": pd.NA},
- None,
- exp7,
- ),
- (
- None,
- None,
- None,
- None,
- {
- "primary_station_id": ["SHIP", "MASKSTID"],
- "station_speed": pd.NA,
- "station_course": pd.NA,
- },
- None,
- exp8,
- ),
- (None, method_kwargs_, None, None, None, None, exp1),
- (None, None, compare_kwargs_, None, None, None, exp3),
- (None, None, None, ["primary_station_id"], None, None, exp4),
- (
- None,
- None,
- None,
- None,
- None,
- {"latitude": 1.0, "longitude": 1.0, "report_timestamp": 360},
- exp5,
- ),
- ("Block", {"left_on": "report_timestamp"}, None, None, None, None, exp6),
- ],
-)
-def test_duplicates_flag(
- method,
- method_kwargs,
- compare_kwargs,
- ignore_columns,
- ignore_entries,
- offsets,
- expected,
-):
- if method is None:
- method = "SortedNeighbourhood"
- cdm_icoads.duplicate_check(
- method=method,
- method_kwargs=method_kwargs,
- compare_kwargs=compare_kwargs,
- ignore_columns=ignore_columns,
- ignore_entries=ignore_entries,
- offsets=offsets,
- inplace=True,
- )
- tables_dups_flagged = cdm_icoads.flag_duplicates()
- result = tables_dups_flagged["header"]
- assert_array_equal(result["duplicate_status"], expected["duplicate_status"])
- assert_array_equal(result["report_quality"], expected["report_quality"])
- assert_array_equal(result["duplicates"], expected["duplicates"])
-
-
-def test_duplicates_remove():
- cdm_icoads.duplicate_check(
- ignore_entries={
- "primary_station_id": ["SHIP", "MASKSTID"],
- "station_speed": pd.NA,
- "station_course": pd.NA,
- },
- inplace=True,
- )
-
- tables_dups_removed = cdm_icoads.remove_duplicates().data
- expected = cdm_icoads.iloc[[0, 1, 2, 4, 6, 8, 10, 12, 15, 17, 18]]
- assert_frame_equal(expected, tables_dups_removed)
-
-
-def test_duplicates_craid():
- cdm_craid.duplicate_check(ignore_columns="primary_station_id", inplace=True)
- cdm_craid.flag_duplicates(inplace=True)
- assert_array_equal(cdm_craid[("header", "duplicate_status")], [0] * 10)
- assert_array_equal(cdm_craid[("header", "report_quality")], [2] * 10)
- assert_array_equal(cdm_craid[("header", "duplicates")], [None] * 10)