diff --git a/jobs/epic-cron/Dockerfile b/jobs/epic-cron/Dockerfile index 7129f12..30c1d14 100644 --- a/jobs/epic-cron/Dockerfile +++ b/jobs/epic-cron/Dockerfile @@ -44,7 +44,12 @@ COPY ./requirements.txt . COPY ./requirements/ ./requirements/ RUN pip install --upgrade pip -RUN pip install --no-cache-dir -r requirements.txt +# The editable repo libraries currently have incompatible transitive pins. Install +# the cron image dependency set first, then install the repo libraries themselves +# without re-resolving their install_requires. +RUN sed '/^-e git+/d' requirements.txt > /tmp/requirements-no-repos.txt \ + && pip install --no-cache-dir -r /tmp/requirements-no-repos.txt \ + && pip install --no-cache-dir --no-deps -r requirements/repo-libraries.txt # Copy the rest of the application after pip install to avoid conflicts with git dependencies COPY . . @@ -64,4 +69,4 @@ RUN chown -R $user:root $HOME \ && chmod g-w $HOME/cron/crontab USER $user -ENTRYPOINT ["bash", "docker-entrypoint.sh"] \ No newline at end of file +ENTRYPOINT ["bash", "docker-entrypoint.sh"] diff --git a/jobs/epic-cron/config.py b/jobs/epic-cron/config.py index 65d37a3..b638347 100644 --- a/jobs/epic-cron/config.py +++ b/jobs/epic-cron/config.py @@ -107,8 +107,11 @@ class _Config(): # pylint: disable=too-few-public-methods CONDITION_API_BASE_URL = os.getenv("CONDITION_API_BASE_URL") EPIC_PUBLIC_BASE_URL = os.getenv("EPIC_PUBLIC_BASE_URL", "https://projects.eao.gov.bc.ca") + EPIC_PUBLIC_SEARCH_PATH = os.getenv("EPIC_PUBLIC_SEARCH_PATH", "/api/public/search") EPIC_PUBLIC_DOCUMENT_TYPE_IDS = os.getenv("EPIC_PUBLIC_DOCUMENT_TYPE_IDS", "") EPIC_PUBLIC_DOCUMENT_TYPE_ID_MAP = os.getenv("EPIC_PUBLIC_DOCUMENT_TYPE_ID_MAP", "") + EPIC_PUBLIC_MAX_PAGES = os.getenv("EPIC_PUBLIC_MAX_PAGES", "") + EPIC_PUBLIC_MAX_DOCUMENTS = os.getenv("EPIC_PUBLIC_MAX_DOCUMENTS", "") KEYCLOAK_BASE_URL = os.getenv('KEYCLOAK_BASE_URL') KEYCLOAK_REALM_NAME = os.getenv('KEYCLOAK_REALM_NAME', 'eao-epic') SERVICE_ACCOUNT_ID = os.getenv('SERVICE_ACCOUNT_ID') diff --git a/jobs/epic-cron/requirements.txt b/jobs/epic-cron/requirements.txt index 6cd9a27..a49efd4 100644 --- a/jobs/epic-cron/requirements.txt +++ b/jobs/epic-cron/requirements.txt @@ -1,58 +1,64 @@ -Flask-Caching==2.3.1 -Flask-Mail==0.10.0 -Flask-Moment==1.0.6 -Flask-SQLAlchemy==3.1.1 -Flask==3.1.3 -Jinja2==3.1.6 -MarkupSafe==3.0.3 -PyJWT==2.12.0 -SQLAlchemy-Continuum==1.6.0 -SQLAlchemy-Utils==0.42.1 -SQLAlchemy==2.0.48 -Werkzeug==3.1.6 aniso8601==10.0.1 -attrs==25.4.0 +attrs==26.1.0 +beautifulsoup4==4.13.3 blinker==1.9.0 cachelib==0.13.0 certifi==2026.2.25 -cffi==2.0.0 -charset-normalizer==3.4.5 +charset-normalizer==3.4.7 clamd==1.0.2 -click==8.3.1 -cryptography==46.0.5 -flake8_import_order==0.19.2 +click==8.3.2 +ecdsa==0.19.2 +Flask-Caching==2.3.1 flask-cors==6.0.2 flask-jwt-oidc==0.7.0 -flask-marshmallow==1.2.1 +Flask-Migrate==4.1.0 +Flask-Mail==0.10.0 +flask-marshmallow==1.4.0 +Flask-Moment==1.0.6 flask-restx==1.3.2 -gunicorn==25.1.0 +Flask-SQLAlchemy==3.1.1 +Flask==3.1.3 +fiona==1.10.1 +geopandas==1.1.3 +gunicorn==25.3.0 idna==3.11 importlib_resources==6.5.2 itsdangerous==2.2.0 +Jinja2==3.1.6 jsonschema-specifications==2025.9.1 jsonschema==4.26.0 +lxml==5.1.0 +MarkupSafe==3.0.3 marshmallow-enum==1.5.1 marshmallow-sqlalchemy==1.0.0 marshmallow==3.18.0 +openpyxl==3.1.5 packaging==26.0 +pandas==2.3.1 psycopg2-binary==2.9.11 -pycodestyle==2.14.0 -pycparser==3.0 -pyflakes==3.4.0 +pyasn1==0.6.3 +pycryptodome==3.20.0 pyhumps==3.8.0 +pyproj==3.7.2 +python-docx==1.1.2 python-dotenv==1.2.2 +python-jose==3.5.0 pytz==2026.1.post1 referencing==0.37.0 -requests==2.32.5 +requests==2.33.1 rpds-py==0.30.0 +rsa==4.9.1 secure==1.0.1 -setuptools==82.0.1 +shapely==2.1.2 six==1.17.0 -tomli==2.4.0 +SQLAlchemy-Continuum==1.6.0 +SQLAlchemy-Utils==0.42.1 +SQLAlchemy==2.0.49 +tenacity==9.0.0 typing_extensions==4.15.0 urllib3==2.6.3 -zimports==0.6.3 +Werkzeug==3.1.8 -e git+https://github.com/bcgov/EPIC.submit.git@develop#egg=submit-api&subdirectory=submit-api #-e git+https://github.com/bcgov/EPIC.track.git@develop#egg=api&subdirectory=epictrack-api -e git+https://github.com/bcgov/EPIC.compliance.git@develop#egg=compliance-api&subdirectory=compliance-api --e git+https://github.com/bcgov/EPIC.conditions.git@main#egg=condition-api&subdirectory=condition-api \ No newline at end of file +-e git+https://github.com/bcgov/EPIC.conditions.git@main#egg=condition-api&subdirectory=condition-api diff --git a/jobs/epic-cron/requirements/prod.txt b/jobs/epic-cron/requirements/prod.txt index c6859ce..ce11668 100644 --- a/jobs/epic-cron/requirements/prod.txt +++ b/jobs/epic-cron/requirements/prod.txt @@ -5,7 +5,7 @@ Flask-Moment Flask-SQLAlchemy SQLAlchemy-Continuum flask-restx -flask-marshmallow==1.2.1 +flask-marshmallow==1.4.0 flask-jwt-oidc==0.7.0 python-dotenv psycopg2-binary @@ -20,10 +20,22 @@ sqlalchemy-utils Flask-Caching sqlalchemy secure +Flask-Migrate +geopandas +fiona +pyproj +shapely +beautifulsoup4==4.13.3 +lxml==5.1.0 +openpyxl==3.1.5 +pandas==2.3.1 +pycryptodome==3.20.0 +python-docx==1.1.2 +tenacity==9.0.0 python-dotenv requests flask_cors pyhumps importlib-resources clamd -pytz \ No newline at end of file +pytz diff --git a/jobs/epic-cron/sample.env b/jobs/epic-cron/sample.env index 3a7ddb4..2c381b5 100644 --- a/jobs/epic-cron/sample.env +++ b/jobs/epic-cron/sample.env @@ -60,17 +60,28 @@ CONDITION_API_BASE_URL=http://localhost:5000 # Base URL for the BC EPIC Public API used to fetch published documents EPIC_PUBLIC_BASE_URL=https://projects.eao.gov.bc.ca +# EPIC Public Search Path +# Override only if the deployment exposes search on a non-default route. +EPIC_PUBLIC_SEARCH_PATH=/api/public/search + # EPIC Public Document Type IDs (comma-separated) # Filters document fetching to specific EPIC document types. -# Leave unset to use the keys from EPIC_PUBLIC_DOCUMENT_TYPE_ID_MAP (or the built-in defaults). +# Leave unset to use the keys from EPIC_PUBLIC_DOCUMENT_TYPE_ID_MAP. +# If both this value and the map below are empty, the extractor fetches all published PROJECT documents. EPIC_PUBLIC_DOCUMENT_TYPE_IDS= # EPIC Public Document Type ID Map (comma-separated epicId:conditionTypeId pairs) # Maps EPIC Public type IDs to condition repo document_type_id values. -# Leave unset to use the built-in defaults in EpicPublicService.DEFAULT_DOCUMENT_TYPE_ID_MAP. +# Leave unset to skip type-specific mapping and use the default condition document type ID. # Example: 5cf00c03a266b7e1877504d1:3 EPIC_PUBLIC_DOCUMENT_TYPE_ID_MAP= +# EPIC Public Local Debug Limits +# Leave unset in normal/prod runs. +# Set these locally to reduce the number of API calls and documents processed. +EPIC_PUBLIC_MAX_PAGES= +EPIC_PUBLIC_MAX_DOCUMENTS= + # ------------------------------------------------------------------------------ # KEYCLOAK AUTHENTICATION # ------------------------------------------------------------------------------ diff --git a/jobs/epic-cron/src/epic_cron/services/epic_public_service.py b/jobs/epic-cron/src/epic_cron/services/epic_public_service.py index 66d7025..6587461 100644 --- a/jobs/epic-cron/src/epic_cron/services/epic_public_service.py +++ b/jobs/epic-cron/src/epic_cron/services/epic_public_service.py @@ -12,6 +12,19 @@ class EpicPublicService: RETRY_DELAY = 5 # seconds between retries on transient errors DEFAULT_DOCUMENT_TYPE_ID = 1 + DEFAULT_SEARCH_PATH = "/api/public/search" + + @classmethod + def _get_optional_int_config(cls, key): + value = current_app.config.get(key, "") + if value in (None, ""): + return None + try: + parsed = int(value) + return parsed if parsed > 0 else None + except (TypeError, ValueError): + current_app.logger.warning("Invalid integer for %s: %r", key, value) + return None @classmethod def _get_document_type_id_map(cls): @@ -48,8 +61,30 @@ def fetch_all_documents(cls): Returns: list[dict]: Combined list of mapped document dicts from all types. """ + type_ids = cls._get_document_type_ids() + current_app.logger.info( + "EPIC Public fetch starting with base_url=%s search_path=%s configured_type_ids=%s " + "type_map_size=%s max_pages=%s max_documents=%s", + current_app.config.get("EPIC_PUBLIC_BASE_URL", "https://projects.eao.gov.bc.ca"), + current_app.config.get("EPIC_PUBLIC_SEARCH_PATH", cls.DEFAULT_SEARCH_PATH), + type_ids, + len(cls._get_document_type_id_map()), + cls._get_optional_int_config("EPIC_PUBLIC_MAX_PAGES"), + cls._get_optional_int_config("EPIC_PUBLIC_MAX_DOCUMENTS"), + ) + + if not type_ids: + current_app.logger.warning( + "No EPIC Public document type IDs configured; fetching all published PROJECT " + "documents without a type filter." + ) + raw_docs = cls._fetch_documents_by_type() + mapped = cls._map_documents(raw_docs) + current_app.logger.info(f"Fetched {len(mapped)} documents without type filtering.") + return mapped + all_documents = [] - for type_id in cls._get_document_type_ids(): + for type_id in type_ids: raw_docs = cls._fetch_documents_by_type(type_id) mapped = cls._map_documents(raw_docs, type_id) all_documents.extend(mapped) @@ -59,21 +94,27 @@ def fetch_all_documents(cls): return all_documents @classmethod - def _fetch_documents_by_type(cls, type_id): + def _fetch_documents_by_type(cls, type_id=None): """Fetch all documents for a single document type, paginating until exhausted. Args: - type_id: EPIC Public document type ID. + type_id: Optional EPIC Public document type ID. Returns: list[dict]: Raw document dicts for this type. """ base_url = current_app.config.get("EPIC_PUBLIC_BASE_URL", "https://projects.eao.gov.bc.ca") - endpoint = f"{base_url}/api/public/search" + search_path = current_app.config.get("EPIC_PUBLIC_SEARCH_PATH", cls.DEFAULT_SEARCH_PATH) + endpoint = f"{base_url}{search_path}" page_num = 0 documents = [] + max_pages = cls._get_optional_int_config("EPIC_PUBLIC_MAX_PAGES") + max_documents = cls._get_optional_int_config("EPIC_PUBLIC_MAX_DOCUMENTS") - current_app.logger.info(f"Fetching documents for type: {type_id}") + if type_id: + current_app.logger.info(f"Fetching documents for type: {type_id}") + else: + current_app.logger.info("Fetching documents without type filter") while True: params = { @@ -81,18 +122,43 @@ def _fetch_documents_by_type(cls, type_id): "pageNum": page_num, "pageSize": cls.DOCUMENT_PAGE_SIZE, "projectLegislation": "default", - "sortBy": "-datePosted,", + "sortBy": "-datePosted", "populate": "true", "fields": "", - "fuzzy": "true", + "fuzzy": "false", "and[documentSource]": "PROJECT", - "and[type]": type_id, } + if type_id: + params["and[type]"] = type_id + + current_app.logger.info( + "Requesting EPIC Public documents endpoint=%s type_id=%s page=%s page_size=%s params=%s", + endpoint, + type_id, + page_num, + cls.DOCUMENT_PAGE_SIZE, + params, + ) data = cls._fetch_page(endpoint, params, type_id, page_num) results = data[0].get("searchResults", []) if isinstance(data, list) and data else [] + meta = data[0].get("meta", []) if isinstance(data, list) and data else [] + total = meta[0].get("searchResultsTotal") if meta and isinstance(meta[0], dict) else None + + current_app.logger.info( + "Received EPIC Public response type_id=%s page=%s result_count=%s reported_total=%s", + type_id, + page_num, + len(results), + total, + ) if not results: + current_app.logger.warning( + "No EPIC Public documents returned for type_id=%s on page=%s. Stopping pagination.", + type_id, + page_num, + ) break documents.extend(results) @@ -100,12 +166,32 @@ def _fetch_documents_by_type(cls, type_id): f" Type {type_id} | Page {page_num}: fetched {len(results)} documents" ) + if max_documents and len(documents) >= max_documents: + documents = documents[:max_documents] + current_app.logger.warning( + "Stopping EPIC Public fetch early because EPIC_PUBLIC_MAX_DOCUMENTS=%s was reached " + "for type_id=%s.", + max_documents, + type_id, + ) + break + if len(results) < cls.DOCUMENT_PAGE_SIZE: break + if max_pages and (page_num + 1) >= max_pages: + current_app.logger.warning( + "Stopping EPIC Public fetch early because EPIC_PUBLIC_MAX_PAGES=%s was reached " + "for type_id=%s.", + max_pages, + type_id, + ) + break + page_num += 1 - current_app.logger.info(f" Type {type_id}: {len(documents)} documents total") + scope = f"Type {type_id}" if type_id else "Unfiltered search" + current_app.logger.info(f" {scope}: {len(documents)} documents total") return documents @classmethod @@ -113,6 +199,13 @@ def _fetch_page(cls, endpoint, params, type_id, page_num): """Fetch a single page with retry on transient server errors (5xx).""" for attempt in range(1, cls.MAX_RETRIES + 1): try: + current_app.logger.info( + "Fetching EPIC Public page attempt=%s/%s type_id=%s page=%s", + attempt, + cls.MAX_RETRIES, + type_id, + page_num, + ) response = requests.get(endpoint, params=params, timeout=60) response.raise_for_status() return response.json() @@ -141,25 +234,32 @@ def _fetch_page(cls, endpoint, params, type_id, page_num): raise @classmethod - def _map_documents(cls, raw_docs, type_id): + def _map_documents(cls, raw_docs, type_id=None): """Map raw EPIC Public document records to the format expected by the extractor. Args: raw_docs: Raw document dicts from the EPIC Public API. - type_id: The EPIC Public type ID used to fetch these docs. + type_id: The optional EPIC Public type ID used to fetch these docs. Returns: list[dict]: Mapped documents, skipping any with missing required fields. """ mapped = [] condition_type_id = cls._get_document_type_id_map().get(type_id, cls.DEFAULT_DOCUMENT_TYPE_ID) + skipped_missing_document_id = 0 + skipped_missing_project_id = 0 for item in raw_docs: document_id = item.get("_id") project = item.get("project") or {} project_id = project.get("_id") if isinstance(project, dict) else project - if not document_id or not project_id: + if not document_id: + skipped_missing_document_id += 1 + continue + + if not project_id: + skipped_missing_project_id += 1 continue mapped.append({ @@ -172,4 +272,13 @@ def _map_documents(cls, raw_docs, type_id): "document_type_id": condition_type_id, }) + current_app.logger.info( + "Mapped EPIC Public documents type_id=%s mapped=%s skipped_missing_document_id=%s " + "skipped_missing_project_id=%s condition_type_id=%s", + type_id, + len(mapped), + skipped_missing_document_id, + skipped_missing_project_id, + condition_type_id, + ) return mapped diff --git a/jobs/epic-cron/tasks/epic_public_extractor.py b/jobs/epic-cron/tasks/epic_public_extractor.py index edf7855..1412c9a 100644 --- a/jobs/epic-cron/tasks/epic_public_extractor.py +++ b/jobs/epic-cron/tasks/epic_public_extractor.py @@ -15,6 +15,13 @@ class EpicPublicExtractor: def do_sync(cls): """Perform the sync from EPIC Public to the Condition Repo.""" current_app.logger.info(f"Starting EPIC Public Extractor at {datetime.now()}") + current_app.logger.info( + "EPIC Public extractor config summary: base_url=%s search_path=%s type_ids=%s type_map=%s", + current_app.config.get("EPIC_PUBLIC_BASE_URL"), + current_app.config.get("EPIC_PUBLIC_SEARCH_PATH", "/api/public/search"), + current_app.config.get("EPIC_PUBLIC_DOCUMENT_TYPE_IDS", ""), + current_app.config.get("EPIC_PUBLIC_DOCUMENT_TYPE_ID_MAP", ""), + ) target_session = init_conditions_db(current_app) @@ -39,6 +46,8 @@ def _sync_documents(cls, documents, target_session): failed = 0 project_not_found = 0 + project_not_found_examples = [] + existing_examples = [] with target_session() as session: for doc in documents: @@ -49,6 +58,11 @@ def _sync_documents(cls, documents, target_session): ).first() if not project_exists: project_not_found += 1 + if len(project_not_found_examples) < 10: + project_not_found_examples.append({ + "document_id": doc["document_id"], + "project_id": doc["project_id"], + }) current_app.logger.debug( f"Skipping document {doc['document_id']}: " f"project {doc['project_id']} not found in Condition Repo." @@ -61,6 +75,8 @@ def _sync_documents(cls, documents, target_session): if existing: skipped += 1 + if len(existing_examples) < 10: + existing_examples.append(doc["document_id"]) continue # Parse date_issued from ISO string @@ -109,3 +125,13 @@ def _sync_documents(cls, documents, target_session): f"Document sync complete: {inserted} inserted, {skipped} skipped (existing), " f"{project_not_found} skipped (project not loaded), {failed} failed." ) + if project_not_found_examples: + current_app.logger.info( + "Sample documents skipped because project was not loaded: %s", + project_not_found_examples, + ) + if existing_examples: + current_app.logger.info( + "Sample existing documents skipped: %s", + existing_examples, + )