diff --git a/.github/CODEOWNERS b/.github/CODEOWNERS new file mode 100644 index 0000000..c903a16 --- /dev/null +++ b/.github/CODEOWNERS @@ -0,0 +1,6 @@ +# CODEOWNERS +# These owners will be the default owners for everything in +# the repo. Unless a later match takes precedence, +# @garyedwards and @gnathoir will be requested for +# review when someone opens a pull request. +* @tomtitherington @garyedwards @gnathoi diff --git a/.gitignore b/.gitignore index 856c297..6685d36 100644 --- a/.gitignore +++ b/.gitignore @@ -30,7 +30,7 @@ MANIFEST # Virtual environments venv/ ENV/ -env/ +env*/ .venv/ .ENV/ .env/ diff --git a/CHANGELOG.md b/CHANGELOG.md new file mode 100644 index 0000000..5573c5f --- /dev/null +++ b/CHANGELOG.md @@ -0,0 +1,45 @@ +# Changelog + +All notable changes to this project will be documented in this file. + +The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), +and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). + +## [Unreleased] + +### Added +- Add a `CODEOWNERS` file for ownership management. +- Add a `CHANGELOG.md` file for ckear documentation of the development of the asset-api. +- Add `nl_query_assist.py` file for simple natural language querying. + +### Changed +- Refactor the query_assist.py output folder structure for dates of observations. +- New triplestore endpoint. + +--- + +## [0.0.2] - 2025-06-25 + +### Added +- Implement testing infrastructure. + +### Changed +- Refactor the query_assist.py output folder structure for types. + +--- + +## [0.0.1] - 2025-05-02 + +### Added +- Add initial Python examples. +- Create a `LICENSE` file. +- Add `.gitignore` to the project setup. +- Add the initial OpenAPI specification. +- Add the project `README.md` file. + +### Changed +- Update and enhance Python examples. +- Update the `LICENSE` file. + +### Fixed +- Correct formatting to be comma-separated. diff --git a/examples/README.md b/examples/README.md index 741662d..93e39bb 100644 --- a/examples/README.md +++ b/examples/README.md @@ -1,6 +1,17 @@ # Python examples -This directory contains a few Python scripts that will load and provision the graph database with provided turtle (.ttl) files and execute queries against it. Some will also then use results from queries to download assets from the API. +This directory contains Python scripts for interacting with the DID triplestore (SPARQL endpoint) and the Asset API: + +1. `query_assist.py` – Structured CLI for downloading assets, listing UPRNs by output area, or mapping ODS→UPRN. +2. `nl_query_assist.py` – Natural language (NL) wrapper that plans and executes one- or two-stage workflows using `query_assist.py` underneath. + +The scripts can: + +- Discover UPRNs by output area codes. +- Map NHS ODS codes to UPRNs (with recommendation codes). +- Download assets for specified UPRNs with optional sensor and asset-type filters. +- Accept CSV file inputs for batch operations. +- Use NL instructions (via `nl_query_assist.py`) to infer plans automatically. ## Setup @@ -19,13 +30,11 @@ source venv/bin/activate ``` **Windows (CMD)** - ```bash venv\Scripts\activate.bat ``` **Windows (PowerShell)** - ```powershell venv\Scripts\Activate.ps1 ``` @@ -39,103 +48,47 @@ pip install -r requirements.txt ### 4. Set your API key **Temporary (session only)** - ```bash export API_KEY="your_api_key" ``` -**Permanent (automatic when Virtualenv is activated)** - -If you want this variable to be automatically set every time you activate your virtual environment, add the export line to the activate script inside your virtual environment. For example (macOS/Linux): - -Open `venv/bin/activate` and add the environment variable near the bottom (but before any final `unset` lines if present). - -## Running scripts - -Once your environment is set up then you can run any of the `.py` files in the `/examples` directory. **Before you run a script** verify that the constants in the file are set correctly to match your local environment. - -### Scripts - -Granular scripts to improve legibility when viewing the SPARQL queries: -- `get_all_assets_for_a_list_of_uprns.py` -- `get_all_assets_for_a_uprn.py` -- `get_all_assets_for_a_uprn_made_by_a_sensor.py` -- `get_all_assets_of_type_for_list_of_uprns.py` - -Unified script: -- `query_assist.py` - -This unified script replaces and extends the above utilities by allowing you to: - -- Specify one or more UPRNs via **`--uprn`** (space- or comma-separated), or provide a CSV file path (column `uprn`) to `--uprn`. -- Specify one or more ODS codes via **`--ods`** (space- or comma-separated), or provide a CSV file path (column `ods`) to `--ods` for ODS→UPRN mapping. -- Specify one or more output-area IRIs or codes via **`--output-area`**/`--oa` (space- or comma-separated), or provide a CSV file path (column `output_area`) to list UPRNs by output area. -- Filter by **sensor** type (`--sensor`, e.g. `bess:OusterLidarSensor`). -- Filter by **asset type** (`--types`, e.g. `did:rgb-image,did:lidar-pointcloud-merged`). -- Override the **SPARQL endpoint** (`--db-url`). -- Change the **download directory** (`--download-dir`). -- Use a custom **API key** environment variable (`--api-key-env`). +## Running the structured CLI (`query_assist.py`) -#### Supported sensors +`query_assist.py` supports three modes (mutually exclusive per invocation): -- `bess:PhidgetHumiditySensor` -- `bess:PhidgetTemperatureSensor` -- `bess:OusterLidarSensor` -- `bess:FlirOryxCamera` -- `bess:FlirA70Camera` - -#### Supported asset types +1. Asset download: `--uprn` (one/many or CSV path with column `uprn`) +2. Output area → UPRN listing: `--output-area` / `--oa` (codes or CSV path column `output_area`) +3. ODS → UPRN mapping: `--ods` (codes or CSV path column `ods`) -- **Merged lidar point clouds**: `did:lidar-pointcloud-merged` -- **Pointcloud frame**: `did:lidar-pointcloud-frame` -- **Lidar range panorama images**: `did:lidar-range-pano` -- **Lidar reflectance for panorama**: `did:lidar-reflectance-pano` -- **Lidar signal intensity for panoramas**: `did:lidar-signal-pano` -- **Lidar Near Infrared for panoramas**: `did:lidar-nearir-pano` -- **Temperature in celsius** (no contentUrl): `did:celsius-temperature` -- **Relative humidity** (no contentUrl): `did:relative-humidity` -- **IR false colour**: `did:ir-false-color-image` -- **IR temperature array**: `did:ir-temperature-array` -- **IR counts**: `did:ir-count-image` -- **RGB image**: `did:rgb-image` +Optional filters / overrides: +- `--sensor bess:OusterLidarSensor` (or other supported sensor IRI) +- `--types did:rgb-image,did:lidar-pointcloud-merged` (comma separated IRIs) +- `--db-url http://host:3030/didtriplestore/query` (override SPARQL endpoint) +- `--download-dir /path/to/downloads` (default `./downloads`) +- `--api-key-env MY_KEY` (environment var containing API key; default `API_KEY`) -Pointclouds are brotli compressed .pcd files. These can be decompressed using the Brotli CLI tool +Example usages: ```bash -brew install brotli -``` - -Or using the `br_decompress.py` script. - -```bash -python3 br_decompress.py --directory ./downloads -``` - -#### `query_assist.py` Usage - -```bash -# Single UPRN +# Single UPRN asset download python3 query_assist.py --uprn 100023334911 -# Multiple UPRNs (space-separated) +# Multiple UPRNs (space separated) python3 query_assist.py --uprn 100023334911 100023268138 -# Multiple UPRNs (comma-separated) -python3 query_assist.py --uprn 100023334911, 100023268138, 46251044 +# Multiple UPRNs (comma separated in one argument) +python3 query_assist.py --uprn 100023334911,100023268138,46251044 -# CSV-only for UPRNs +# CSV of UPRNs python3 query_assist.py --uprn path/to/uprns.csv -# ODS→UPRN mapping with recommendation code A (accepted) I (intervention recommended) +# ODS→UPRN mapping python3 query_assist.py --ods G85013 -# Output-area mode (single code) -python3 query_assist.py --output-area E00004550 - -# Output-area mode (multiple codes) +# Output areas → UPRN listing (mixed raw codes) python3 query_assist.py --output-area E00004550 E00032882 E00063193 E00047411 -# CSV-only for output-area +# CSV of output areas python3 query_assist.py --output-area path/to/areas.csv # Sensor filter @@ -154,7 +107,7 @@ python3 query_assist.py --uprn 5045394 --download-dir /data/assets export MY_KEY="..." python3 query_assist.py --uprn 5045394 --api-key-env MY_KEY -# A Few options at once +# Multiple options combined export MY_KEY="..." python3 query_assist.py \ --uprn 200003455212,5045394 \ @@ -165,39 +118,111 @@ python3 query_assist.py \ --api-key-env MY_KEY ``` -Run `python3 query_assist.py -h` to see the full list of command-line options and examples. +Run `python3 query_assist.py -h` for full help text. + +## Natural language workflow (`nl_query_assist.py`) + +`nl_query_assist.py` lets you describe tasks conversationally; it plans steps (e.g. output-area lookup → asset download) and calls `query_assist.py` accordingly. + +Prerequisite: An [Ollama](https://ollama.com/) server must be running locally (or remotely) with the desired model already pulled. Set the server URL via `export OLLAMA_HOST=http://host:port` (defaults to `http://localhost:11434`). Pull a model first, e.g.: + +```bash +ollama pull gpt-oss:20b +``` + +If you use a different model tag, pass it with `--model-id`. + +Key flags: +- `--once "your NL request"` run a single NL instruction and exit. +- `--dry-run` plan and show commands without executing downloads. +- `--plan-only` output the inferred plan (JSON-like) and exit. +- `--model-id gpt-oss:20b` choose Ollama model (set `OLLAMA_HOST` to change server URL). +- Decoding knobs: `--temperature`, `--top-p`, `--num-predict`, `--num-ctx`, `--keep-alive`, `--no-force-json`. + +Interactive session: +```bash +python3 nl_query_assist.py +> download merged lidar point clouds and rgb images for UPRNs 5045394 and 200003455212 into /tmp/assets +``` + +Single command: +```bash +python3 nl_query_assist.py --once "list UPRNs in output areas E00004550 and E00032882 then download rgb images" +``` + +Dry run: +```bash +python3 nl_query_assist.py --dry-run --once "download point clouds for ODS G85013" +``` + +Verbose (show planning internals): +```bash +python3 nl_query_assist.py -vv --once "rgb images for UPRNs in areas E00004550,E00032882" +``` + +## Supported sensors + +- `bess:PhidgetHumiditySensor` +- `bess:PhidgetTemperatureSensor` +- `bess:OusterLidarSensor` +- `bess:FlirOryxCamera` +- `bess:FlirA70Camera` + +## Supported asset types + +- Merged lidar point clouds: `did:lidar-pointcloud-merged` +- Pointcloud frame: `did:lidar-pointcloud-frame` +- Lidar range panorama images: `did:lidar-range-pano` +- Lidar reflectance panorama images: `did:lidar-reflectance-pano` +- Lidar signal intensity panorama images: `did:lidar-signal-pano` +- Lidar Near Infrared panorama images: `did:lidar-nearir-pano` +- Temperature in celsius (no contentUrl): `did:celsius-temperature` +- Relative humidity (no contentUrl): `did:relative-humidity` +- IR false colour images: `did:ir-false-color-image` +- IR temperature arrays: `did:ir-temperature-array` +- IR counts images: `did:ir-count-image` +- RGB images: `did:rgb-image` +Point clouds are now provided as LAZ (.laz) compressed files. Most point cloud processing tools (e.g. PDAL, CloudCompare, Potree converters) handle `.laz` directly—no manual decompression step is required. -# Additional Data Information +## Additional Data Information -## RGB +### RGB -sRGB images are provided in the API at a resolution optimised for computer vision tasks. Vehicles and humans are masked out using an automated process, if a user finds an unmasked person or vehicle (most critically the number plate), please report it to [xRI](mailto:info@xri.online). +sRGB images are optimised for computer vision tasks. Vehicles and humans are masked out automatically. Please report any unmasked person or vehicle (especially number plates) to [xRI](mailto:info@xri.online). +### IR -## IR +Edge regions are masked due to sensor heating. In temperature arrays masked regions are NaN. Radiometric assumptions: -The outermost regions of the IR images and temperature arrays have been masked out, this is due to hot edges due to the IR detector heating itself up during operation. In the temperature arrays the masked areas are NaN elements in the compressed numpy array. +1. Pixel distance is currently a sensible hard-coded value (dynamic derivation from LiDAR is in progress). +2. Emissivity is assumed constant (typical building materials fall in $\epsilon \in [0.85, 0.93]$; dynamic estimation is in development). +3. Daytime data is reflectance dominated; radiometric temperatures are only provided for night hours (1h after sunset to 1h before sunrise). +4. Sky regions fall outside reliable radiometric interpretation and are excluded. -Additionally when working with the IR data there are some assumptions to note about the way in which radiometic temperature pixels themselves are calculated. +### LiDAR -- The formula requires the distance of each pixel from the detector, currently this is a sensible hard coded value, we are in the process of calculating these distances from the lidar. -- Building materials tend to be in a narrow range of emissivities $\epsilon\in [0.85,0.93]$, we currently hard code a single sensible value for emissivity but are developing methods for estimating building materials dynamically. -- During the day, we are in a reflectance dominated regime due to the influence of the sun, radiometric temperatures calculated in this regime are not reliable. Thermal data is provided for the night hours only (1 hour after sunset to 1 hour before sunrise). -- The sky is an object outside the scope of the radiometric temperature calculation, this is a low reflectance, low emissivity regime that our radiometric temperature calculations cannot say anything meaningful about. +Four 360° grayscale panoramas are provided: -## LiDAR +- Near-infrared (NIR): captures near-infrared spectrum for vegetation and surface texture analysis. +- Range: distance (mm) from sensor to objects (depth map). +- Reflectance: intensity of returned signal (material/angle dependent). +- Signal strength: quality of LiDAR return (helps assess reliability & environmental conditions). -We have four 360 degree grey scale panormas these are: +Point cloud modalities: -- Near-infrared (NIR) capturing light in the near-infrared spectrum (just beyond visible light). NIR is often used to assess vegetation health, surface properties, and for capturing detailed textures in low-light conditions. +- Merged point cloud: dense, registered aggregate from multiple frames using Iterative Closest Point (ICP). +- Single frame point cloud: most orthogonal frame (fallback if ICP merge is unusable). -- The range modality provides the distance from the LiDAR sensor to objects in the environment. Each pixel in this image represents a distance measurement in millimeters, creating a depth map of the scene. +ICP can fail, producing dense but misaligned merged clouds; use the single frame as a fallback. -- The reflectivity image captures the intensity of the LiDAR signal that bounces back to the sensor. Reflectivity depends on the surface material and angle of incidence, making it useful for distinguishing between materials or identifying road markings, signs, and other objects. +## Troubleshooting -- The signal strength or return signal intensity measures the quality of the LiDAR return. Stronger signals usually indicate clearer, more reliable measurements. It can also reflect surface properties and environmental conditions. +- Missing downloads? Ensure the API key environment variable (`API_KEY` or your override) is exported in the same shell session. +- Empty CSV outputs: Verify the codes (UPRN / ODS / Output area) exist in the triplestore and that `--db-url` is correct. +- Slow queries: Consider filtering with `--types` and/or `--sensor` to reduce result size. +- NL planning returns "No actionable plan": add explicit codes (e.g. UPRNs) or clarify intent ("download rgb images" vs. "rgb"). -We also have two pointcloud types one is a single frame that is closest to orthogonal to the UPRN, the other is a dense, orchstrated pointcloud created by merging many pointcloud frames on either side of the most orthogonal frame using the [Iterative Closes Point (ICP) registration algorithm](http://ki-www.cvl.iis.u-tokyo.ac.jp/class2013/2013w/paper/correspondingAndRegistration/03_Levoy.pdf). +## License -ICP registration can also fail completely resulting in dense but unaligned pointclouds. The single centre frame is provided as a failback pointcloud in the event of an unusable merged pointcloud. +See the root `LICENSE` file for details. diff --git a/examples/br_decompress.py b/examples/br_decompress.py deleted file mode 100644 index 09fbfa6..0000000 --- a/examples/br_decompress.py +++ /dev/null @@ -1,85 +0,0 @@ -#!/usr/bin/env python3 - -import argparse -import logging -import os - -import brotli - -# Configure logging -logging.basicConfig( - level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s" -) - - -def decompress_and_replace(input_file): - """ - Decompresses a Brotli-compressed .pcd.br file and replaces it with the decompressed .pcd file. - - :param input_file: Path to the Brotli-compressed .pcd.br file - """ - try: - output_file = input_file[:-3] - with open(input_file, "rb") as compressed_file: - compressed_data = compressed_file.read() - - decompressed_data = brotli.decompress(compressed_data) - - with open(output_file, "wb") as decompressed_file: - decompressed_file.write(decompressed_data) - - os.remove(input_file) - - logging.info(f"Decompressed and replaced: {input_file} -> {output_file}") - except Exception as e: - logging.error(f"Error processing {input_file}: {e}") - - -def find_and_replace_pcd_br(directory): - """ - Finds all Brotli-compressed .pcd.br files in the specified directory recursively, - decompresses them, and replaces the original files with the decompressed .pcd files. - - :param directory: Path to the directory to search for .pcd.br files - """ - if not os.path.exists(directory): - logging.error(f"Directory not found: {directory}") - return - - # Find all .pcd.br files - pcd_br_files = [] - for root, _, files in os.walk(directory): - for file in files: - if file.endswith(".pcd.br"): - pcd_br_files.append(os.path.join(root, file)) - - if not pcd_br_files: - logging.warning(f"No .pcd.br files found in directory: {directory}") - return - - logging.info(f"Found {len(pcd_br_files)} .pcd.br files to decompress.") - - for file_path in pcd_br_files: - decompress_and_replace(file_path) - - logging.info("All decompression and replacement tasks completed.") - - -def main(): - parser = argparse.ArgumentParser( - description="Decompress Brotli-compressed .pcd.br files." - ) - parser.add_argument( - "-d", - "--directory", - default=os.path.join(os.path.dirname(os.path.abspath(__file__)), "./downloads"), - help="Directory to search for .pcd.br files (default: ./downloads)", - ) - args = parser.parse_args() - - logging.info(f"Using directory: {args.directory}") - find_and_replace_pcd_br(args.directory) - - -if __name__ == "__main__": - main() diff --git a/examples/get_all_assets_for_a_list_of_uprns.py b/examples/get_all_assets_for_a_list_of_uprns.py deleted file mode 100644 index 0d30e6d..0000000 --- a/examples/get_all_assets_for_a_list_of_uprns.py +++ /dev/null @@ -1,119 +0,0 @@ -import logging -import os -import re - -import httpx -from rdflib.plugins.stores.sparqlstore import SPARQLStore -from rdflib.query import ResultRow - -# Configure logging -logging.basicConfig( - level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s" -) - -# --- Configuration --------------------------------------------------------- - -# SPARQL endpoint -DB_URL = "http://ec2-18-175-116-201.eu-west-2.compute.amazonaws.com:3030/didtriplestore/query" -endpoint = SPARQLStore(query_endpoint=DB_URL, returnFormat="json") - -# Base download directory -here = os.path.dirname(os.path.abspath(__file__)) -DOWNLOAD_DIR = os.path.join(here, "downloads") -os.makedirs(DOWNLOAD_DIR, exist_ok=True) - -# Comma-separated list of UPRNs (note the space after each comma) -UPRNs = "200003455212, 5045394" - -# SPARQL query: return both the UPRN value and the asset URL -QUERY = f""" -PREFIX dob: -PREFIX so: -PREFIX sosa: -PREFIX prov: - -SELECT DISTINCT ?uprnValue ?contentUrl -WHERE {{ - # 1) Grab any resource (?res) carrying a contentUrl - ?res - so:contentUrl ?contentUrl . - - # 2) Crawl back through: - # – sosa:hasResult from an Observation - # – prov:generated / prov:used chains from Processing → DerivedResult → Result - # (including any number of chained DerivedResults) - ?res - ( - ^sosa:hasResult - | ^prov:generated / prov:used - )* - / sosa:hasFeatureOfInterest - / so:identifier - / so:value - ?uprnValue . - - # 3) Restrict to only the UPRNs you care about - FILTER (?uprnValue IN ({UPRNs})) -}} -""" - - -# Your API key from environment -API_KEY = os.getenv("API_KEY") -if not API_KEY: - raise ValueError( - "API_KEY environment variable is not set. Please set it to your API key." - ) - - -# --- Helper to download a single asset into a given folder ---------------- - - -def download_asset(url: str, save_dir: str) -> None: - try: - resp = httpx.get(url, headers={"x-api-key": API_KEY}) - resp.raise_for_status() - - # Derive filename from Content-Disposition or fallback to URL basename - cd_header = resp.headers.get("Content-Disposition", "") - m = re.search(r'filename="([^"]+)"', cd_header) - filename = m.group(1) if m else os.path.basename(url) - - # Ensure the target folder exists - os.makedirs(save_dir, exist_ok=True) - save_path = os.path.join(save_dir, filename) - - # Write out the file - with open(save_path, "wb") as f: - f.write(resp.content) - - logging.info(f"✔ Saved {url} → {save_path}") - except Exception as e: - logging.error(f"✖ Failed to download {url}: {e}") - - -# --- Main execution -------------------------------------------------------- - - -def main(): - # Run the SPARQL query - results = endpoint.query(QUERY) - - # Iterate and dispatch each download into its UPRN folder - for row in results: - if not isinstance(row, ResultRow): - continue - - uprn_val = str(row["uprnValue"]) - content_url = str(row["contentUrl"]) - uprn_folder = os.path.join(DOWNLOAD_DIR, uprn_val) - - logging.info(f"⤷ Downloading {content_url} into {uprn_folder}/ …") - download_asset(content_url, uprn_folder) - - -if __name__ == "__main__": - try: - main() - except Exception as e: - logging.error(f"Error: {e}") diff --git a/examples/get_all_assets_for_a_uprn.py b/examples/get_all_assets_for_a_uprn.py deleted file mode 100644 index d8db0cd..0000000 --- a/examples/get_all_assets_for_a_uprn.py +++ /dev/null @@ -1,105 +0,0 @@ -import logging -import os -import re - -import httpx -from rdflib.plugins.stores.sparqlstore import SPARQLStore -from rdflib.query import ResultRow - -# Configure logging -logging.basicConfig( - level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s" -) - -# --- Configuration --------------------------------------------------------- - -# SPARQL endpoint -DB_URL = "http://ec2-18-175-116-201.eu-west-2.compute.amazonaws.com:3030/didtriplestore/query" -endpoint = SPARQLStore(query_endpoint=DB_URL, returnFormat="json") - -# Base download directory -here = os.path.dirname(os.path.abspath(__file__)) -DOWNLOAD_DIR = os.path.join(here, "downloads") -os.makedirs(DOWNLOAD_DIR, exist_ok=True) - - -# Query parameters -UPRN = "5045394" - -# SPARQL query: return all asset URLs for the given UPRN -QUERY = f""" -PREFIX dob: -PREFIX rdfs: -PREFIX sosa: -PREFIX so: -PREFIX owl: - - -SELECT DISTINCT ?contentUrl -WHERE {{ - ?result a sosa:Result ; - so:contentUrl ?contentUrl . - ?observation a sosa:Observation ; - sosa:hasResult ?result ; - sosa:hasFeatureOfInterest ?foi . - ?foi a sosa:FeatureOfInterest ; - so:identifier ?uprn . - ?uprn a dob:UPRNValue ; - so:value ?uprnValue . - FILTER(str(?uprnValue) = "{UPRN}") -}} -""" - -# Your API key from environment -API_KEY = os.getenv("API_KEY") -if not API_KEY: - raise ValueError( - "API_KEY environment variable is not set. Please set it to your API key." - ) - -# --- Helper to download a single asset into a given folder ---------------- - - -def download_asset(url: str, save_dir: str) -> None: - try: - resp = httpx.get(url, headers={"x-api-key": API_KEY}) - resp.raise_for_status() - - # Derive filename from Content-Disposition or fallback to URL basename - cd_header = resp.headers.get("Content-Disposition", "") - m = re.search(r'filename="([^"]+)"', cd_header) - filename = m.group(1) if m else os.path.basename(url) - - os.makedirs(save_dir, exist_ok=True) - path = os.path.join(save_dir, filename) - - with open(path, "wb") as f: - f.write(resp.content) - - logging.info(f"✔ Saved {url} → {path}") - except Exception as e: - logging.error(f"✖ Failed to download {url}: {e}") - - -# --- Main execution -------------------------------------------------------- - - -def main(): - # Run the SPARQL query - results = endpoint.query(QUERY) - - # Download each asset into the base download directory - for row in results: - if not isinstance(row, ResultRow): - continue - - content_url = str(row["contentUrl"]) - logging.info(f"⤷ Downloading {content_url} …") - download_asset(content_url, DOWNLOAD_DIR) - - -if __name__ == "__main__": - try: - main() - except Exception as e: - logging.error(f"Error: {e}") diff --git a/examples/get_all_assets_for_a_uprn_made_by_a_sensor.py b/examples/get_all_assets_for_a_uprn_made_by_a_sensor.py deleted file mode 100644 index 52061b7..0000000 --- a/examples/get_all_assets_for_a_uprn_made_by_a_sensor.py +++ /dev/null @@ -1,130 +0,0 @@ -import logging -import os -import re - -import httpx -from rdflib.plugins.stores.sparqlstore import SPARQLStore -from rdflib.query import ResultRow - -# Configure logging -logging.basicConfig( - level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s" -) - -# --- Configuration --------------------------------------------------------- - -# SPARQL endpoint -DB_URL = "http://ec2-18-175-116-201.eu-west-2.compute.amazonaws.com:3030/didtriplestore/query" -endpoint = SPARQLStore(query_endpoint=DB_URL, returnFormat="json") - -# Base download directory -here = os.path.dirname(os.path.abspath(__file__)) -DOWNLOAD_DIR = os.path.join(here, "downloads") -os.makedirs(DOWNLOAD_DIR, exist_ok=True) - -# Query parameters -UPRN = "5045394" -SENSOR = "bess:OusterLidarSensor" - -# The available sensors are: -# - bess:PhidgetHumiditySensor -# - bess:PhidgetTemperatureSensor -# - bess:OusterLidarSensor -# - bess:FlirOryxCamera -# - bess:FlirA70Camera - -# SPARQL query: return the asset URL for the given UPRN and sensor -QUERY = f""" -PREFIX dob: -PREFIX rdfs: -PREFIX sosa: -PREFIX bess: -PREFIX so: -PREFIX prov: -PREFIX owl: - -SELECT DISTINCT ?uprnValue ?contentUrl -WHERE {{ - # 1) Grab any resource carrying a contentUrl - ?res - so:contentUrl ?contentUrl . - - # 2) Crawl back through either: - # - Observation → sosa:hasResult - # - Processing → DerivedResult → (prov:generated / prov:used) - # (any number of times) - ?res - ( - ^sosa:hasResult - | ^prov:generated / prov:used - )* - ?obs . - - # 3) Now we’re at the Observation; pull out sensor & UPRN - ?obs a sosa:Observation ; - sosa:madeBySensor ?sensor ; - sosa:hasFeatureOfInterest/so:identifier/so:value ?uprnValue . - - # 4) Filter on specific sensor type and UPRN - ?sensor a {SENSOR} . - FILTER(str(?uprnValue) = "{UPRN}") -}} -""" - -# Your API key from environment -API_KEY = os.getenv("API_KEY") -if not API_KEY: - raise ValueError( - "API_KEY environment variable is not set. Please set it to your API key." - ) - - -# --- Helper to download a single asset into a given folder ---------------- - - -def download_asset(url: str, save_dir: str) -> None: - try: - resp = httpx.get(url, headers={"x-api-key": API_KEY}) - resp.raise_for_status() - - # Derive filename from Content-Disposition or fallback to URL basename - cd = resp.headers.get("Content-Disposition", "") - m = re.search(r'filename="([^"]+)"', cd) - filename = m.group(1) if m else os.path.basename(url) - - os.makedirs(save_dir, exist_ok=True) - path = os.path.join(save_dir, filename) - - with open(path, "wb") as f: - f.write(resp.content) - - logging.info(f"✔ Saved {url} → {path}") - except Exception as e: - logging.error(f"✖ Failed to download {url}: {e}") - - -# --- Main execution -------------------------------------------------------- - - -def main(): - # Run the SPARQL query - results = endpoint.query(QUERY) - - # Dispatch each download into the UPRN folder - for row in results: - if not isinstance(row, ResultRow): - continue - - uprn_val = str(row["uprnValue"]) - content_url = str(row["contentUrl"]) - target_dir = os.path.join(DOWNLOAD_DIR, uprn_val) - - logging.info(f"⤷ Downloading {content_url} into {target_dir}/ …") - download_asset(content_url, target_dir) - - -if __name__ == "__main__": - try: - main() - except Exception as e: - logging.error(f"Error: {e}") diff --git a/examples/get_all_assets_of_type_for_list_of_uprns.py b/examples/get_all_assets_of_type_for_list_of_uprns.py deleted file mode 100644 index 84b9b0d..0000000 --- a/examples/get_all_assets_of_type_for_list_of_uprns.py +++ /dev/null @@ -1,123 +0,0 @@ -import logging -import os -import re - -import httpx -from rdflib.plugins.stores.sparqlstore import SPARQLStore -from rdflib.query import ResultRow - -# Configure logging -logging.basicConfig( - level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s" -) - -# --- Configuration --------------------------------------------------------- - -# SPARQL endpoint -DB_URL = "http://ec2-18-175-116-201.eu-west-2.compute.amazonaws.com:3030/didtriplestore/query" -endpoint = SPARQLStore(query_endpoint=DB_URL, returnFormat="json") - -# Base download directory -here = os.path.dirname(os.path.abspath(__file__)) -DOWNLOAD_DIR = os.path.join(here, "downloads") -os.makedirs(DOWNLOAD_DIR, exist_ok=True) - -# Assets can be of the following types: -# - Merged lidar point clouds: https://w3id.org/dob/id/lidar-pointcloud-merged -# - Lidar range panorama images: https://w3id.org/dob/id/lidar-range-pano -# - Lidar reflectance for panorama: https://w3id.org/dob/id/lidar-reflectance-pano -# - Temperature in celsius: https://w3id.org/dob/id/celsius-temperature (no contentUrl) -# - Lidar signal intensity for panoramas: https://w3id.org/dob/id/lidar-signal-pano -# - Lidar Near Infrared for panoramas: https://w3id.org/dob/id/lidar-nearir-pano -# - Relative humidity: https://w3id.org/dob/id/relative-humidity (no contentUrl) -# - Pointcloud frame: https://w3id.org/dob/id/lidar-pointcloud-frame -# - IR false colour: https://w3id.org/dob/id/ir-false-color-image -# - IR temperature array: https://w3id.org/dob/id/ir-temperature-array -# - IR counts: https://w3id.org/dob/id/ir-count-image -# - RBG image: https://w3id.org/dob/id/rgb-image - -# Which UPRNs and which enums (types) to pull -UPRNs = "200003455212, 5045394" -TYPES = "did:rgb-image, did:lidar-pointcloud-merged, did:ir-temperature-array" - -# --- SPARQL: find any resource with your enum & contentUrl, then crawl back to UPRN --- -QUERY = f""" -PREFIX dob: -PREFIX sosa: -PREFIX so: -PREFIX prov: -PREFIX did: - -SELECT DISTINCT ?uprnValue ?contentUrl -WHERE {{ - # 1) Pick up any resource carrying the enum & contentUrl - ?res - dob:typeQualifier ?enum ; - so:contentUrl ?contentUrl . - FILTER(?enum IN ({TYPES})) - - # 2) Crawl back arbitrarily through DerivedResult→Processing→Result→Observation - # (and even chained DerivedResults) to get the UPRN literal - ?res - ( - ^prov:generated / prov:used - | ^sosa:hasResult - )* - / sosa:hasFeatureOfInterest - / so:identifier - / so:value - ?uprnValue . - - # 3) Only the UPRNs you care about - FILTER(?uprnValue IN ({UPRNs})) -}} -""" - -# Your API key from environment -API_KEY = os.getenv("API_KEY") -if not API_KEY: - raise RuntimeError("API_KEY environment variable is not set.") - -# --- Download helper ------------------------------------------------------- - - -def download_asset(url: str, save_dir: str) -> None: - try: - resp = httpx.get(url, headers={"x-api-key": API_KEY}, timeout=60) - resp.raise_for_status() - - # Derive filename - cd = resp.headers.get("Content-Disposition", "") - m = re.search(r'filename="([^"]+)"', cd) - fn = m.group(1) if m else os.path.basename(url) - - os.makedirs(save_dir, exist_ok=True) - path = os.path.join(save_dir, fn) - with open(path, "wb") as f: - f.write(resp.content) - - logging.info(f"✔ {url} → {path}") - except Exception as e: - logging.error(f"✖ Failed {url}: {e}") - - -# --- Main ------------------------------------------------------------------ - - -def main(): - results = endpoint.query(QUERY) - - for row in results: - if not isinstance(row, ResultRow): - continue - - uprn = str(row["uprnValue"]) - url = str(row["contentUrl"]) - folder = os.path.join(DOWNLOAD_DIR, uprn) - - logging.info(f"Downloading into {folder}/ ← {url}") - download_asset(url, folder) - - -if __name__ == "__main__": - main() diff --git a/examples/nl_query_assist.py b/examples/nl_query_assist.py new file mode 100644 index 0000000..2b954ef --- /dev/null +++ b/examples/nl_query_assist.py @@ -0,0 +1,963 @@ +#!/usr/bin/env python3 + +from __future__ import annotations + +import argparse +import dataclasses +import json +import logging +import os +import re +import shlex +import shutil +import subprocess +import sys +import textwrap +import time +from typing import Any, Literal, TypedDict + +import requests +from langgraph.checkpoint.memory import MemorySaver +from langgraph.graph import END, START, StateGraph + +SYSTEM_ROUTER_PROMPT = """You are a rigorous function-call router for a Python CLI named query_assist.py. + +Supported commands and how to populate them: + +1) download_assets + Required: uprn (string CSV path OR array of strings like ["5045394","200003455212"]) + Optional: sensor (string, e.g., "bess:OusterLidarSensor") + types (array of strings; each a type IRI, e.g., ["did:rgb-image","did:lidar-pointcloud-merged"]) + download_dir (string path) + api_key_env (string, name of env var with API key) + db_url (string URL to SPARQL endpoint) + +2) ods_to_uprn + Required: ods (string CSV path OR array of strings like ["G85013","Q12345"]) + +3) uprns_by_output_area + Required: output_area (string CSV path OR array of strings, e.g., ["E00004550","E00032882"]) + +Schema (MUST output exactly one JSON object with these keys as needed): +{ + "command": "download_assets" | "ods_to_uprn" | "uprns_by_output_area", + "uprn": string | string[] | null, + "ods": string | string[] | null, + "output_area": string | string[] | null, + "sensor": string | null, + "types": string[] | null, + "download_dir": string | null, + "api_key_env": string | null, + "db_url": string | null +} + +Constraints: +- Return ONLY the JSON object. No prose, no markdown. +- If the user request implies asset types, map them to the supported IRIs if possible: + - RGB image -> "did:rgb-image" + - merged lidar point cloud -> "did:lidar-pointcloud-merged" + - lidar range panorama -> "did:lidar-range-pano" + - lidar reflectance panorama -> "did:lidar-reflectance-pano" + - lidar signal panorama -> "did:lidar-signal-pano" + - lidar near-infrared panorama -> "did:lidar-nearir-pano" + - IR false color -> "did:ir-false-color-image" + - IR temperature array -> "did:ir-temperature-array" + - IR counts -> "did:ir-count-image" + - temperature (no contentUrl) -> "did:celsius-temperature" + - relative humidity (no contentUrl) -> "did:relative-humidity" + - UPRNs are the UK OS Unique Property Reference Numbers. Queries may call them buildings or other built environment associated words. + - Output areas may be called OAs. + - ODS codes are unique identifiers for UK NHS buildings, hence words like medical, practice, hospital, etc... may be used. +- Prefer being decisive. When in doubt, infer sensible defaults. +""" + +TYPE_ALIASES = { + "rgb": "did:rgb-image", + "rgb image": "did:rgb-image", + "merged lidar": "did:lidar-pointcloud-merged", + "merged lidar point cloud": "did:lidar-pointcloud-merged", + "lidar point cloud": "did:lidar-pointcloud-frame", + "point cloud": "did:lidar-pointcloud-frame", + "point clouds": None, # expands to both merged + frame + "lidar range panorama": "did:lidar-range-pano", + "lidar reflectance panorama": "did:lidar-reflectance-pano", + "lidar signal panorama": "did:lidar-signal-pano", + "lidar nearir panorama": "did:lidar-nearir-pano", + "ir false color": "did:ir-false-color-image", + "ir temperature array": "did:ir-temperature-array", + "ir counts": "did:ir-count-image", + "temperature": "did:celsius-temperature", + "relative humidity": "did:relative-humidity", +} + +POINTCLOUD_BOTH = ["did:lidar-pointcloud-merged", "did:lidar-pointcloud-frame"] + + +def _render_box(title: str, body: str) -> str: + term_width = shutil.get_terminal_size(fallback=(100, 24)).columns + max_width = max(60, min(term_width - 2, 100)) + wrap_width = max_width - 4 + body_lines = [] + for para in body.splitlines(): + if not para.strip(): + body_lines.append("") + else: + body_lines.extend(textwrap.wrap(para, width=wrap_width)) + title = title.strip() + title_line = f" {title} " + top = "┌" + "─" * (max_width - 2) + "┐" + sep = "├" + "─" * (max_width - 2) + "┤" + bot = "└" + "─" * (max_width - 2) + "┘" + if len(title_line) <= (max_width - 2): + left = (max_width - 2 - len(title_line)) // 2 + right = max_width - 2 - len(title_line) - left + top = "┌" + "─" * left + title_line + "─" * right + "┐" + content = "\n".join("│ " + line.ljust(max_width - 4) + " │" for line in body_lines) + return "\n".join([top, sep, content, bot]) + + +def _extract_first_json(text: str) -> dict | None: + start = text.find("{") + if start == -1: + return None + depth = 0 + for i, c in enumerate(text[start:], start=start): + if c == "{": + depth += 1 + elif c == "}": + depth -= 1 + if depth == 0: + try: + return json.loads(text[start : i + 1]) + except Exception: + return None + return None + + +def _find_csvs_emitted(stream_text: str) -> list[str]: + """Parse query_assist.py logs to discover created CSVs.""" + csvs: list[str] = [] + patterns = [ + r"Saved CSV for .*? → ([^\s]+\.csv)", + r"Saved CSV for .*? -> ([^\s]+\.csv)", + r"Saved ODS.?UPRN CSV .*? → ([^\s]+\.csv)", + r"Saved ODS.?UPRN CSV .*? -> ([^\s]+\.csv)", + ] + for pat in patterns: + for m in re.finditer(pat, stream_text): + csvs.append(m.group(1)) + seen: set[str] = set() + out: list[str] = [] + for p in csvs: + if p not in seen: + out.append(p) + seen.add(p) + return out + + +def _ensure_list_or_path(v: None | str | list[str]) -> list[str]: + """Convert (None | str | list[str]) to a flat argv-ready list.""" + if v is None: + return [] + if isinstance(v, list): + return [str(x) for x in v if str(x).strip()] + s = str(v).strip() + return [s] if s else [] + + +def _build_argv(spec: dict[str, Any], py: str, qa_path: str) -> list[str]: + cmd = [py, qa_path] + command = spec.get("command") + if command == "download_assets": + uprn = _ensure_list_or_path(spec.get("uprn")) + if not uprn: + raise ValueError("download_assets requires 'uprn'.") + cmd += ["--uprn"] + uprn + if spec.get("sensor"): + cmd += ["--sensor", str(spec["sensor"])] + if spec.get("types"): + cmd += ["--types", ",".join(spec["types"])] + elif command == "ods_to_uprn": + ods = _ensure_list_or_path(spec.get("ods")) + if not ods: + raise ValueError("ods_to_uprn requires 'ods'.") + cmd += ["--ods"] + ods + elif command == "uprns_by_output_area": + oa = _ensure_list_or_path(spec.get("output_area")) + if not oa: + raise ValueError("uprns_by_output_area requires 'output_area'.") + cmd += ["--output-area"] + oa + else: + raise ValueError(f"Unsupported command: {command!r}") + + if spec.get("db_url"): + cmd += ["--db-url", str(spec["db_url"])] + if spec.get("download_dir"): + cmd += ["--download-dir", str(spec["download_dir"])] + if spec.get("api_key_env"): + cmd += ["--api-key-env", str(spec["api_key_env"])] + return cmd + + +def _map_types_from_text(lowered: str) -> list[str] | None: + wants_pointclouds = re.search(r"\bpoint\s*clouds?\b", lowered) is not None + wants_merged = "merged lidar" in lowered or re.search( + r"merged\s+lidar\s+point\s*cloud", lowered + ) + wants_frame = "pointcloud frame" in lowered or "single frame" in lowered + + types: list[str] = [] + if wants_pointclouds: + types.extend(POINTCLOUD_BOTH) + if wants_merged: + types.append("did:lidar-pointcloud-merged") + if wants_frame: + types.append("did:lidar-pointcloud-frame") + + if "rgb" in lowered and "image" in lowered: + types.append("did:rgb-image") + if "range panorama" in lowered: + types.append("did:lidar-range-pano") + if "reflectance panorama" in lowered: + types.append("did:lidar-reflectance-pano") + if "signal panorama" in lowered: + types.append("did:lidar-signal-pano") + if "nearir" in lowered or "near-infrared" in lowered: + types.append("did:lidar-nearir-pano") + if "ir false" in lowered: + types.append("did:ir-false-color-image") + if ( + "ir temperature array" in lowered + or re.search(r"thermal\s+arrays?", lowered) + or re.search(r"temperature\s+arrays?", lowered) + ): + types.append("did:ir-temperature-array") + if re.search(r"thermal\s+images?", lowered): + types.append("did:ir-false-color-image") + + if not types: + return None + + out: list[str] = [] + seen: set[str] = set() + for t in types: + if t not in seen: + out.append(t) + seen.add(t) + return out + + +def ollama_chat( + base_url: str, + model: str, + messages: list[dict[str, str]], + temperature: float = 0.0, + top_p: float = 0.95, + num_predict: int = 256, + num_ctx: int | None = None, + keep_alive: str | None = None, + force_json: bool = True, + timeout_s: float = 120.0, +) -> dict[str, Any]: + url = base_url.rstrip("/") + "/api/chat" + payload: dict[str, Any] = { + "model": model, + "messages": messages, + "stream": False, + "options": { + "temperature": float(temperature), + "top_p": float(top_p), + "num_predict": int(num_predict), + }, + } + if num_ctx is not None: + payload["options"]["num_ctx"] = int(num_ctx) + if keep_alive: + payload["keep_alive"] = str(keep_alive) + if force_json: + payload["format"] = "json" + r = requests.post(url, json=payload, timeout=(5.0, timeout_s)) + r.raise_for_status() + return r.json() + + +class StepSpec(TypedDict, total=False): + command: Literal["download_assets", "ods_to_uprn", "uprns_by_output_area"] + uprn: list[str] | str | None + ods: list[str] | str | None + output_area: list[str] | str | None + sensor: str | None + types: list[str] | None + download_dir: str | None + api_key_env: str | None + db_url: str | None + uprn_from_previous_csvs: bool + + +@dataclasses.dataclass +class WFState: + nl: str + plan: list[StepSpec] + current: int = 0 + artifacts: dict[str, Any] = dataclasses.field(default_factory=dict) + log: list[str] = dataclasses.field(default_factory=list) + actions: list[dict[str, Any]] = dataclasses.field(default_factory=list) + dry_run: bool = False + plan_only: bool = False + py_exe: str = sys.executable + qa_path: str = os.path.join(os.path.dirname(__file__), "query_assist.py") + base_url: str = os.environ.get("OLLAMA_HOST", "http://localhost:11434") + model_id: str = "gpt-oss:20b" + temperature: float = 0.0 + top_p: float = 0.95 + num_predict: int = 256 + num_ctx: int | None = None + keep_alive: str | None = None + force_json: bool = True + verbose_level: int = logging.INFO + max_steps: int = 8 + + +def heuristic_plan(nl: str, defaults: dict[str, Any]) -> list[StepSpec] | None: + text = nl.strip() + if not text: + return None + lowered = text.lower() + + csv_paths = re.findall(r'(?:(?:[A-Za-z]:)?[^\s"\'<>|]+\.csv)\b', text) + oa_codes = re.findall(r"\bE\d{8}\b", text) + ods_codes = re.findall(r"\b[A-Z]\d{5}\b", text) + uprns = re.findall(r"\b\d{6,}\b", text) + endpoint_match = re.search(r"(https?://[\w\.-:%/]+)", text) + endpoint_url = endpoint_match.group(1) if endpoint_match else None + + download_dir = None + m = re.search(r"(?: to | into )\s+(/[^ ]+)", lowered) + if m: + download_dir = m.group(1) + + types = _map_types_from_text(lowered) + + if csv_paths and ("uprn" in lowered or "uprns" in lowered): + step: dict[str, Any] = { + "command": "download_assets", + "uprn": csv_paths if len(csv_paths) > 1 else csv_paths[0], + "types": types, + "download_dir": download_dir or defaults.get("download_dir"), + "db_url": endpoint_url or defaults.get("db_url"), + } + return [step] + + if (oa_codes or "output area" in lowered or "output areas" in lowered) and ( + types + or "point cloud" in lowered + or "assets" in lowered + or "download" in lowered + ): + oa_list: list[str] = [] + if oa_codes: + oa_list.extend(oa_codes) + if csv_paths and ("uprn" not in lowered): + oa_list.extend(csv_paths) + if not oa_list: + return None + second: dict[str, Any] = { + "command": "download_assets", + "uprn_from_previous_csvs": True, + "download_dir": download_dir or defaults.get("download_dir"), + "db_url": defaults.get("db_url"), + } + if types: + second["types"] = types + return [ + { + "command": "uprns_by_output_area", + "output_area": oa_list + if oa_list + else (csv_paths[0] if csv_paths else None), + "download_dir": defaults.get("download_dir"), + "db_url": defaults.get("db_url"), + }, + second, + ] + + if (ods_codes or "ods" in lowered) and (types is not None): + ods_list: list[str] = [] + if ods_codes: + ods_list.extend(ods_codes) + if csv_paths: + ods_list.extend(csv_paths) + if not ods_list: + return None + return [ + { + "command": "ods_to_uprn", + "ods": ods_list, + "download_dir": defaults.get("download_dir"), + "db_url": defaults.get("db_url"), + }, + { + "command": "download_assets", + "uprn_from_previous_csvs": True, + "types": types, + "download_dir": download_dir or defaults.get("download_dir"), + "db_url": defaults.get("db_url"), + }, + ] + + if oa_codes or ("output area" in lowered or "output areas" in lowered): + oa_list: list[str] = [] + if oa_codes: + oa_list.extend(oa_codes) + if csv_paths and ("uprn" not in lowered): + oa_list.extend(csv_paths) + if not oa_list: + return None + return [ + { + "command": "uprns_by_output_area", + "output_area": oa_list if len(oa_list) > 1 else oa_list[0], + "download_dir": defaults.get("download_dir"), + "db_url": defaults.get("db_url"), + } + ] + + if ods_codes or "ods" in lowered: + ods_list: list[str] = [] + if ods_codes: + ods_list.extend(ods_codes) + if csv_paths: + ods_list.extend(csv_paths) + if not ods_list: + return None + return [ + { + "command": "ods_to_uprn", + "ods": ods_list if len(ods_list) > 1 else ods_list[0], + "download_dir": defaults.get("download_dir"), + "db_url": defaults.get("db_url"), + } + ] + + if uprns and ( + "download" in lowered + or "assets" in lowered + or "point" in lowered + or "rgb" in lowered + or "image" in lowered + or "lidar" in lowered + ): + step: dict[str, Any] = { + "command": "download_assets", + "uprn": uprns, + "types": types, + "download_dir": download_dir or defaults.get("download_dir"), + "db_url": endpoint_url or defaults.get("db_url"), + } + return [step] + + return None + + +def llm_plan( + nl: str, + defaults: dict[str, Any], + base_url: str, + model_id: str, + temperature: float, + top_p: float, + num_predict: int, + num_ctx: int | None, + keep_alive: str | None, + force_json: bool, +) -> list[StepSpec] | None: + messages = [ + { + "role": "system", + "content": ( + "You are a planning assistant that compiles an ordered plan for query_assist.py.\n" + "Return JSON with a 'steps' array; use 'uprn_from_previous_csvs': true when a download_assets step should consume prior CSVs.\n" + "Types to use when implied: did:rgb-image, did:lidar-pointcloud-merged, did:lidar-pointcloud-frame, did:lidar-range-pano, did:lidar-reflectance-pano, did:lidar-signal-pano, did:lidar-nearir-pano, did:ir-false-color-image, did:ir-temperature-array, did:ir-count-image, did:celsius-temperature, did:relative-humidity." + ), + }, + {"role": "user", "content": nl}, + ] + try: + resp = ollama_chat( + base_url=base_url, + model=model_id, + messages=messages, + temperature=temperature, + top_p=top_p, + num_predict=num_predict, + num_ctx=num_ctx, + keep_alive=keep_alive, + force_json=force_json, + ) + content = None + if isinstance(resp.get("message"), dict): + content = resp["message"].get("content") + if not content and isinstance(resp.get("response"), str): + content = resp["response"] + if not content: + return None + plan_obj = None + try: + plan_obj = json.loads(content) + except Exception: + plan_obj = _extract_first_json(content) + if not isinstance(plan_obj, dict): + return None + steps_raw = plan_obj.get("steps") + if not isinstance(steps_raw, list) or not steps_raw: + return None + steps: list[StepSpec] = [] + for s in steps_raw: + if not isinstance(s, dict): + continue + cmd = s.get("command") + if cmd not in {"download_assets", "ods_to_uprn", "uprns_by_output_area"}: + continue + step: StepSpec = {"command": cmd} + for key in [ + "uprn", + "ods", + "output_area", + "types", + "sensor", + "download_dir", + "api_key_env", + "db_url", + ]: + if key in s: + step[key] = s[key] + if s.get("uprn_from_previous_csvs"): + step["uprn_from_previous_csvs"] = True + if "download_dir" not in step and defaults.get("download_dir"): + step["download_dir"] = defaults["download_dir"] + steps.append(step) + return steps or None + except Exception: + return None + + +def llm_route_to_spec(nl: str, base_url: str, model_id: str, **opts) -> StepSpec | None: + messages = [ + {"role": "system", "content": SYSTEM_ROUTER_PROMPT}, + {"role": "user", "content": nl}, + ] + try: + resp = ollama_chat(base_url, model_id, messages, **opts) + except Exception: + return None + content = None + if isinstance(resp.get("message"), dict): + content = resp["message"].get("content") + if not content and isinstance(resp.get("response"), str): + content = resp["response"] + if not content: + return None + obj = _extract_first_json(content) or None + if not obj or "command" not in obj: + return None + step: StepSpec = { + "command": obj["command"], + "uprn": obj.get("uprn"), + "ods": obj.get("ods"), + "output_area": obj.get("output_area"), + "sensor": obj.get("sensor"), + "types": obj.get("types"), + "download_dir": obj.get("download_dir"), + "api_key_env": obj.get("api_key_env"), + "db_url": obj.get("db_url"), + } + return step + + +def upgrade_single_spec_to_plan( + nl: str, spec: StepSpec, defaults: dict[str, Any] +) -> list[StepSpec]: + lowered = nl.lower() + types = spec.get("types") or _map_types_from_text(lowered) + if spec.get("command") == "uprns_by_output_area" and ( + types + or "point cloud" in lowered + or "download" in lowered + or "assets" in lowered + ): + second: dict[str, Any] = { + "command": "download_assets", + "uprn_from_previous_csvs": True, + "download_dir": spec.get("download_dir"), + "db_url": spec.get("db_url"), + } + if types: + second["types"] = types + return [spec, second] + if spec.get("command") == "ods_to_uprn" and (types is not None): + return [ + spec, + { + "command": "download_assets", + "uprn_from_previous_csvs": True, + "types": types, + "download_dir": spec.get("download_dir"), + "api_key_env": spec.get("api_key_env"), + "db_url": spec.get("db_url"), + }, + ] + return [spec] + + +def run_query_assist_step( + step: StepSpec, py_exe: str, qa_path: str, dry_run: bool +) -> tuple[int, str]: + argv = _build_argv(step, py_exe, qa_path) + printable = " ".join(shlex.quote(x) for x in argv) + logging.info("Command: %s", printable) + if dry_run: + return 0, f"[dry-run] {printable}\n" + + p = subprocess.Popen( + argv, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, text=True + ) + captured_lines: list[str] = [] + try: + assert p.stdout is not None + for line in p.stdout: + sys.stdout.write(line) + captured_lines.append(line) + finally: + rc = p.wait() + return rc, "".join(captured_lines) + + +def materialize_previous_uprn_csvs(state: WFState) -> list[str]: + from_logs = state.artifacts.get("csvs", []) + if from_logs: + return from_logs + dl_base = state.plan[0].get("download_dir") or os.path.join( + os.getcwd(), "downloads" + ) + candidate = os.path.join(dl_base, "ods_to_uprn.csv") + if os.path.isfile(candidate): + return [candidate] + return [] + + +def node_plan(state: WFState) -> WFState: + defaults = { + "download_dir": None, + "api_key_env": "API_KEY", + "db_url": None, + } + # 1) LLM multi-step plan + plan = llm_plan( + state.nl, + defaults, + base_url=state.base_url, + model_id=state.model_id, + temperature=state.temperature, + top_p=state.top_p, + num_predict=state.num_predict, + num_ctx=state.num_ctx, + keep_alive=state.keep_alive, + force_json=state.force_json, + ) + # 2) Heuristic plan + if not plan: + plan = heuristic_plan(state.nl, defaults) + # 3) LLM single-step → upgrade + if not plan: + spec = llm_route_to_spec( + state.nl, + base_url=state.base_url, + model_id=state.model_id, + temperature=state.temperature, + top_p=state.top_p, + num_predict=state.num_predict, + num_ctx=state.num_ctx, + keep_alive=state.keep_alive, + force_json=state.force_json, + ) + if spec: + plan = upgrade_single_spec_to_plan(state.nl, spec, defaults) + + # Validate required args; if invalid (e.g., output_area missing), try fallback routes + def _valid(p: list[StepSpec]) -> bool: + for st in p: + cmd = st.get("command") + if cmd == "uprns_by_output_area" and not _ensure_list_or_path( + st.get("output_area") + ): + return False + if cmd == "ods_to_uprn" and not _ensure_list_or_path(st.get("ods")): + return False + if cmd == "download_assets": + if st.get("uprn_from_previous_csvs"): + continue + if not _ensure_list_or_path(st.get("uprn")): + return False + return True + + if plan and not _valid(plan): + plan = heuristic_plan(state.nl, defaults) + if plan and not _valid(plan): + spec = llm_route_to_spec( + state.nl, + state.base_url, + state.model_id, + temperature=state.temperature, + top_p=state.top_p, + num_predict=state.num_predict, + num_ctx=state.num_ctx, + keep_alive=state.keep_alive, + force_json=state.force_json, + ) + if spec: + plan = upgrade_single_spec_to_plan(state.nl, spec, defaults) + + state.plan = plan or [] + + if state.plan and state.verbose_level <= logging.INFO: + print("Plan:") + for i, step in enumerate(state.plan): + show = {k: v for k, v in step.items() if k != "uprn_from_previous_csvs"} + print(f" {i+1}. {json.dumps(show, ensure_ascii=False)}") + print() + if not state.plan: + state.log.append("No actionable plan could be inferred.") + return state + + +def node_execute(state: WFState) -> WFState: + if state.current >= len(state.plan): + return state + step = state.plan[state.current] + + if step.get("uprn_from_previous_csvs"): + csvs = materialize_previous_uprn_csvs(state) + if not csvs: + state.log.append("No CSVs found from previous step(s).") + state.current = len(state.plan) + return state + step = dict(step) + step.pop("uprn_from_previous_csvs", None) + step["uprn"] = csvs + + rc, captured = run_query_assist_step( + step, state.py_exe, state.qa_path, state.dry_run + ) + state.log.append(captured) + + newly_found = _find_csvs_emitted(captured) + if newly_found: + extant = state.artifacts.get("csvs", []) + state.artifacts["csvs"] = list(dict.fromkeys(extant + newly_found)) + + try: + argv_for_record = _build_argv(step, state.py_exe, state.qa_path) + except Exception: + argv_for_record = [] + state.actions.append( + { + "index": state.current + 1, + "command": step.get("command"), + "argv": argv_for_record, + "rc": rc, + "emitted_csvs": newly_found, + } + ) + + if rc != 0: + state.log.append(f"Step {state.current} returned non-zero exit {rc}.") + state.current = len(state.plan) + else: + state.current += 1 + return state + + +def after_plan(state: WFState) -> str: + if not state.plan: + return END + if state.plan_only: + return END + return "execute" + + +def check_done(state: WFState) -> str: + if state.current >= len(state.plan): + return END + if state.current >= state.max_steps: + state.log.append(f"Aborting: exceeded max_steps={state.max_steps}") + return END + return "execute" + + +def parse_args() -> argparse.Namespace: + ap = argparse.ArgumentParser( + description="LangGraph NL workflow for query_assist.py (one- and two-stage)" + ) + ap.add_argument("--model-id", default="gpt-oss:20b", help="Ollama model name/tag") + ap.add_argument( + "--query-assist-path", + default=os.path.join(os.path.dirname(__file__), "query_assist.py"), + help="Path to query_assist.py", + ) + ap.add_argument( + "--base-url", + default=os.environ.get("OLLAMA_HOST", "http://localhost:11434"), + help="Base URL of the Ollama server (or set OLLAMA_HOST)", + ) + ap.add_argument( + "--dry-run", + action="store_true", + help="Plan & print commands but do not execute", + ) + ap.add_argument( + "--plan-only", action="store_true", help="Only compile/print the plan and exit" + ) + ap.add_argument("--once", "-q", help="Run a single NL query and exit") + + # Decoding/runtime knobs + ap.add_argument("--temperature", type=float, default=0.0) + ap.add_argument("--top-p", type=float, default=0.95) + ap.add_argument("--num-predict", type=int, default=256) + ap.add_argument("--num-ctx", type=int, default=None) + ap.add_argument("--keep-alive", default=None) + ap.add_argument("--no-force-json", action="store_true") + ap.add_argument("--max-steps", type=int, default=8) + + # Logging controls + ap.add_argument( + "-v", "--verbose", action="count", default=0, help="-v=info, -vv=debug" + ) + return ap.parse_args() + + +def main() -> None: + args = parse_args() + + # Logging level + if args.verbose >= 2: + level = logging.DEBUG + elif args.verbose == 1: + level = logging.INFO + else: + level = logging.WARNING + logging.basicConfig(level=level, format="%(levelname)s: %(message)s") + + if level <= logging.INFO: + body = ( + "• Parses natural language into a multi-stage plan to execute SPARQL and retrieve assets.\n" + "• Executes via LangGraph with artifact passing.\n" + "• Optional filters: sensor, types; optional overrides: download_dir, api_key_env, db_url \n" + "• Supports dry-run and plan-only modes." + ) + print(_render_box(f"Query Assist AI — {args.model_id}", body)) + + # Build LangGraph + builder = StateGraph(WFState) + builder.add_node("plan", node_plan) + builder.add_node("execute", node_execute) + builder.add_edge(START, "plan") + builder.add_conditional_edges("plan", after_plan, {"execute": "execute", END: END}) + builder.add_conditional_edges( + "execute", check_done, {"execute": "execute", END: END} + ) + memory = MemorySaver() + graph = builder.compile(checkpointer=memory) + + def run_once(nl: str) -> int: + st = WFState( + nl=nl, + plan=[], + dry_run=bool(args.dry_run), + plan_only=bool(args.plan_only), + qa_path=args.query_assist_path, + base_url=args.base_url, + model_id=args.model_id, + temperature=args.temperature, + top_p=args.top_p, + num_predict=args.num_predict, + num_ctx=args.num_ctx, + keep_alive=args.keep_alive, + force_json=(not args.no_force_json), + verbose_level=level, + max_steps=args.max_steps, + ) + final_state = graph.invoke( + st, config={"configurable": {"thread_id": f"tid-{time.time_ns()}"}} + ) + logs = ( + final_state.get("log") + if isinstance(final_state, dict) + else getattr(final_state, "log", []) + ) or [] + trailing = [ + l + for l in logs + if any( + t in l + for t in ( + "[dry-run]", + "No CSVs found", + "non-zero exit", + "Planner", + "No actionable plan", + ) + ) + ] + if trailing: + print("\n" + "\n".join(l.strip() for l in trailing)) + actions = ( + final_state.get("actions") + if isinstance(final_state, dict) + else getattr(final_state, "actions", []) + ) or [] + if actions and level <= logging.INFO: + print("\nACTIONS (LangGraph Execution):") + for a in actions: + argv = " ".join(shlex.quote(x) for x in a.get("argv", [])) + rc = a.get("rc") + em = ", ".join(a.get("emitted_csvs", []) or []) + print( + f" {a.get('index')}. {a.get('command')} [rc={rc}]\n argv: {argv}" + ) + if em: + print(f" emitted CSVs: {em}") + return 0 if not any("non-zero exit" in l for l in logs) else 1 + + try: + if args.once: + rc = run_once(args.once) + if rc != 0: + logging.warning("Workflow exited with code %d", rc) + return + if level <= logging.INFO: + print( + "LangGraph NL workflow for query_assist.py. Type 'exit' or Ctrl-D to quit." + ) + while True: + try: + nl = input("> ") + except EOFError: + break + if not nl.strip(): + continue + if nl.strip().lower() in {"exit", "quit"}: + break + rc = run_once(nl) + if rc != 0: + logging.warning("Workflow exited with code %d", rc) + except KeyboardInterrupt: + print() + logging.info("Interrupted.") + sys.exit(130) + except Exception as e: + logging.error("Fatal error: %s", e) + sys.exit(1) + + +if __name__ == "__main__": + main() diff --git a/examples/query_assist.py b/examples/query_assist.py index 099bbef..ece4c0a 100644 --- a/examples/query_assist.py +++ b/examples/query_assist.py @@ -4,6 +4,7 @@ import logging import os import re +from datetime import datetime import httpx from rdflib.plugins.stores.sparqlstore import SPARQLStore @@ -15,6 +16,7 @@ def parse_args(): + """Parses command-line arguments.""" parser = argparse.ArgumentParser( description="Download assets, list UPRNs by output area, or map ODS→UPRN from a DID triplestore." ) @@ -41,7 +43,7 @@ def parse_args(): ) parser.add_argument( "--db-url", - default="http://ec2-18-175-116-201.eu-west-2.compute.amazonaws.com:3030/didtriplestore/query", + default="http://ec2-3-10-233-191.eu-west-2.compute.amazonaws.com:3030/mytriplestore/query", help="SPARQL endpoint URL", ) parser.add_argument( @@ -58,6 +60,7 @@ def parse_args(): def load_column_from_csv(path, column): + """Loads a single column from a CSV file.""" values = [] with open(path, newline="") as cf: reader = csv.DictReader(cf) @@ -82,19 +85,22 @@ def asset_subdir(enum_iri: str) -> str: def build_asset_query(uprn_list, args): + """Builds the SPARQL query to fetch asset data including phenomenon times.""" prefixes = """ PREFIX did: PREFIX dob: PREFIX so: PREFIX sosa: PREFIX prov: + PREFIX xsd: """ - select = "SELECT DISTINCT ?uprnValue ?contentUrl ?enum\n" + select = "SELECT DISTINCT ?uprnValue ?contentUrl ?enum ?phenomenonTime\n" where = [ " ?res so:contentUrl ?contentUrl .", - " ?res dob:typeQualifier ?enum .", # ① now unconditional + " ?res dob:typeQualifier ?enum .", " ?res ( ^sosa:hasResult | ^prov:generated / prov:used )* ?obs .", " ?obs a sosa:Observation ;", + " sosa:phenomenonTime ?phenomenonTime ;", " sosa:hasFeatureOfInterest ?foi .", " ?foi so:identifier ?uprnRes .", " ?uprnRes a dob:UPRNValue ; so:value ?uprnValue .", @@ -102,15 +108,17 @@ def build_asset_query(uprn_list, args): if args.sensor: where.append(f" ?obs sosa:madeBySensor {args.sensor} .") - quoted = ", ".join(f'"{u}"' for u in uprn_list) - where.append(f" FILTER(str(?uprnValue) IN ({quoted}))") + quoted_uprns = ", ".join(f'"{u}"' for u in uprn_list) + where.append(f" FILTER(str(?uprnValue) IN ({quoted_uprns}))") if args.types: - where.append(f" FILTER(?enum IN ({args.types}))") + quoted_types = ", ".join(f"<{t.strip()}>" for t in args.types.split(",")) + where.append(f" FILTER(?enum IN ({quoted_types}))") return prefixes + select + "WHERE {\n" + "\n".join(where) + "\n}" def build_output_area_query(area_list): + """Builds the SPARQL query to fetch UPRNs within given output areas.""" prefixes = """ PREFIX spr: PREFIX so: @@ -129,6 +137,7 @@ def build_output_area_query(area_list): def build_ods_to_uprn_query(ods_list): + """Builds the SPARQL query to map ODS codes to UPRNs.""" prefixes = """ PREFIX dob: PREFIX so: @@ -148,22 +157,34 @@ def build_ods_to_uprn_query(ods_list): def download_asset(url: str, save_dir: str, api_key: str): + """Downloads a single asset from a URL to a specified directory.""" try: - resp = httpx.get(url, headers={"x-api-key": api_key}, timeout=120) - resp.raise_for_status() - cd = resp.headers.get("Content-Disposition", "") - m = re.search(r'filename="([^"]+)"', cd) - fn = m.group(1) if m else os.path.basename(url) - os.makedirs(save_dir, exist_ok=True) - path = os.path.join(save_dir, fn) - with open(path, "wb") as f: - f.write(resp.content) - logging.info(f"✔ Saved {url} → {path}") + with httpx.Client(timeout=120.0) as client: + resp = client.get(url, headers={"x-api-key": api_key}) + resp.raise_for_status() + + # Determine filename from Content-Disposition or URL + cd = resp.headers.get("Content-Disposition", "") + m = re.search(r'filename="([^"]+)"', cd) + fn = m.group(1) if m else os.path.basename(url) + + os.makedirs(save_dir, exist_ok=True) + path = os.path.join(save_dir, fn) + + with open(path, "wb") as f: + f.write(resp.content) + logging.info(f"✔ Saved {url} → {path}") + + except httpx.HTTPStatusError as e: + logging.error( + f"✖ HTTP error downloading {url}: {e.response.status_code} - {e.response.text}" + ) except Exception as e: logging.error(f"✖ Failed to download {url}: {e}") def main(): + """Main execution function.""" args = parse_args() download_base = args.download_dir or os.path.join(os.getcwd(), "downloads") os.makedirs(download_base, exist_ok=True) @@ -176,11 +197,11 @@ def main(): ods_list.extend(load_column_from_csv(entry, "ods")) else: ods_list.extend(o.strip() for o in entry.split(",") if o.strip()) - ods_list = list(dict.fromkeys(ods_list)) + ods_list = sorted(list(dict.fromkeys(ods_list))) store = SPARQLStore(query_endpoint=args.db_url, returnFormat="json") q = build_ods_to_uprn_query(ods_list) - logging.info("SPARQL query for ODS→UPRN mapping with recCodeAddress:\n%s", q) + logging.info("SPARQL query for ODS→UPRN mapping:\n%s", q) res = store.query(q) out_csv = os.path.join(download_base, "ods_to_uprn.csv") @@ -224,7 +245,9 @@ def main(): grouping = {} for row in res: - grouping.setdefault(row["outputArea"], []).append(row["uprnValue"]) + grouping.setdefault(str(row["outputArea"]), []).append( + str(row["uprnValue"]) + ) for oa, uprns in grouping.items(): name = oa.split("/")[-1] @@ -232,7 +255,7 @@ def main(): with open(out_csv, "w", newline="") as cf: writer = csv.writer(cf) writer.writerow(["uprn"]) - for u in uprns: + for u in sorted(uprns): writer.writerow([u]) logging.info(f"✔ Saved CSV for {oa} → {out_csv}") @@ -244,13 +267,15 @@ def main(): uprn_list.extend(load_column_from_csv(entry, "uprn")) else: uprn_list.extend(u.strip() for u in entry.split(",") if u.strip()) - uprn_list = list(dict.fromkeys(uprn_list)) + uprn_list = sorted(list(dict.fromkeys(uprn_list))) # Sort for consistent query if uprn_list: api_key = os.getenv(args.api_key_env) if not api_key: - logging.error(f"Env var {args.api_key_env!r} is not set") - raise RuntimeError(f"Env var {args.api_key_env!r} is not set") + logging.error( + f"API key environment variable {args.api_key_env!r} is not set." + ) + return store = SPARQLStore(query_endpoint=args.db_url, returnFormat="json") q = build_asset_query(uprn_list, args) @@ -258,15 +283,34 @@ def main(): res = store.query(q) for row in res: - uprn_val = str(row["uprnValue"]) - url = str(row["contentUrl"]) - enum_iri = str(row["enum"]) - subdir = asset_subdir(enum_iri) + try: + uprn_val = str(row["uprnValue"]) + url = str(row["contentUrl"]) + enum_iri = str(row["enum"]) - tgt_dir = os.path.join(download_base, uprn_val, subdir) - logging.info(f"⤷ Downloading {url} into {tgt_dir}/") + phenomenon_time_obj = row["phenomenonTime"].value + if isinstance(phenomenon_time_obj, datetime): + date_str = phenomenon_time_obj.strftime("%Y-%m-%d") + else: + date_str = str(phenomenon_time_obj).split("T")[0] - download_asset(url, tgt_dir, api_key) + asset_type_subdir = asset_subdir(enum_iri) + + tgt_dir = os.path.join( + download_base, uprn_val, date_str, asset_type_subdir + ) + + logging.info(f"⤷ Queuing download for {url} into {tgt_dir}/") + download_asset(url, tgt_dir, api_key) + + except KeyError as e: + logging.error( + f"✖ Query result row was missing expected key: {e}. Row: {row}" + ) + except Exception as e: + logging.error( + f"✖ An unexpected error occurred while processing row {row}: {e}" + ) if __name__ == "__main__": diff --git a/examples/tests/test_br_decompression.py b/examples/tests/test_br_decompression.py deleted file mode 100644 index 6eaf47c..0000000 --- a/examples/tests/test_br_decompression.py +++ /dev/null @@ -1,73 +0,0 @@ -import br_decompress as br -import brotli -import query_assist as qa - - -def _make_compressed_pair(): - """Return (raw_bytes, brotli_compressed_bytes).""" - raw = b"FOR UNIT TEST ONLY - pretend this is a PCD header\n" - return raw, brotli.compress(raw) - - -class _DummyResponse: - def __init__(self, data): - self.status_code = 200 - self.headers = {"Content-Disposition": 'attachment; filename="cloud.pcd.br"'} - self.content = data - - def raise_for_status(self): - pass - - -class _DummyStore: - """Fake rdflib SPARQLStore that yields exactly one result row.""" - - def __init__(self, *_, **__): - pass - - def query(self, *_): - return [ - { - "uprnValue": "999", - "contentUrl": "https://example.com/cloud.pcd.br", - "enum": "did:lidar-pointcloud-merged", - } - ] - - -def test_download_and_decompress_brotli(tmp_path, monkeypatch): - raw, compressed = _make_compressed_pair() - - monkeypatch.setattr(qa, "SPARQLStore", _DummyStore) - - monkeypatch.setattr(qa.httpx, "get", lambda *a, **k: _DummyResponse(compressed)) - - monkeypatch.setenv("API_KEY", "DUMMY") - - monkeypatch.setattr( - qa, - "parse_args", - lambda: qa.argparse.Namespace( - uprn=["999"], - ods=None, - sensor=None, - types=None, - output_area=None, - db_url="http://dummy", - download_dir=str(tmp_path), - api_key_env="API_KEY", - ), - ) - - qa.main() - - p_br = tmp_path / "999" / "lidar-pointcloud-merged" / "cloud.pcd.br" - assert p_br.is_file(), "compressed asset should have been saved by query_assist" - - br.find_and_replace_pcd_br(str(tmp_path)) - - p_raw = p_br.with_suffix("") - assert p_raw.is_file(), "decompressed .pcd should exist" - assert not p_br.exists(), ".pcd.br should have been removed" - - assert p_raw.read_bytes() == raw, "decompressed bytes should match original" diff --git a/examples/tests/test_download_scripts.py b/examples/tests/test_download_scripts.py deleted file mode 100644 index 324a8e9..0000000 --- a/examples/tests/test_download_scripts.py +++ /dev/null @@ -1,96 +0,0 @@ -import importlib -from pathlib import Path - -import pytest - - -class _DummyRow(dict): - """row['uprnValue'] / row['contentUrl'] lookup just like ResultRow""" - - def __getitem__(self, key): - return super().get(key) - - -class _DummyEndpoint: - """Replaces SPARQLStore instance inside each script.""" - - def __init__(self, rows): - self._rows = rows - - def query(self, *_): - return self._rows - - -def _fake_response(): - class _R: - status_code = 200 - headers = {"Content-Disposition": 'attachment; filename="file.bin"'} - content = b"DUMMY" - - def raise_for_status(self): - pass - - return _R() - - -@pytest.mark.parametrize( - "mod_name, expects_uprn_subfolder", - [ - ("examples.get_all_assets_for_a_list_of_uprns", True), - ("examples.get_all_assets_for_a_uprn", False), - ("examples.get_all_assets_for_a_uprn_made_by_a_sensor", True), - ("examples.get_all_assets_of_type_for_list_of_uprns", True), - ], -) -def test_script_downloads(tmp_path, monkeypatch, mod_name, expects_uprn_subfolder): - """Import the script as a module, monkey-patch, run main(), check the file.""" - - import httpx - - monkeypatch.setattr(httpx, "get", lambda *a, **k: _fake_response()) - - monkeypatch.setenv("API_KEY", "UNIT-TEST-KEY") - - mod = importlib.import_module(mod_name) - - monkeypatch.setattr(mod, "DOWNLOAD_DIR", str(tmp_path)) - - monkeypatch.setattr(mod, "ResultRow", _DummyRow) - - dummy_rows = [_DummyRow({"uprnValue": "999", "contentUrl": "https://x/y.bin"})] - if hasattr(mod, "endpoint"): - monkeypatch.setattr(mod, "endpoint", _DummyEndpoint(dummy_rows)) - else: - monkeypatch.setattr(mod, "endpoint", _DummyEndpoint(dummy_rows)) - - mod.main() - - if expects_uprn_subfolder: - expected = Path(tmp_path) / "999" / "file.bin" - else: - expected = Path(tmp_path) / "file.bin" - - assert expected.is_file(), f"{mod_name}: expected {expected} to exist" - - -@pytest.mark.parametrize( - "mod_name, substrings", - [ - ("examples.get_all_assets_for_a_list_of_uprns", ["200003455212", "5045394"]), - ("examples.get_all_assets_for_a_uprn", ["5045394"]), - ( - "examples.get_all_assets_for_a_uprn_made_by_a_sensor", - ["5045394", "bess:OusterLidarSensor"], - ), - ( - "examples.get_all_assets_of_type_for_list_of_uprns", - ["did:rgb-image", "lidar-pointcloud-merged"], - ), - ], -) -def test_query_contains_expected_literals(mod_name, substrings): - """Make sure the hard-coded constants really appear in the QUERY string.""" - mod = importlib.import_module(mod_name) - q = mod.QUERY - for s in substrings: - assert s in q, f"{mod_name}: missing {s} in QUERY" diff --git a/examples/tests/test_query_assist.py b/examples/tests/test_query_assist.py index a431aee..9f39c81 100644 --- a/examples/tests/test_query_assist.py +++ b/examples/tests/test_query_assist.py @@ -69,11 +69,19 @@ def __init__(self, *a, **k): pass def query(self, *_): + class _PhenomenonTime: + def __init__(self): + # fixed date for deterministic path + from datetime import datetime + + self.value = datetime(2024, 1, 2) + return [ { "uprnValue": "42", "contentUrl": "https://example.com/file.bin", "enum": "did:rgb-image", + "phenomenonTime": _PhenomenonTime(), } ] @@ -81,7 +89,22 @@ def query(self, *_): def test_cli_download_creates_nested_dir(tmp_path, monkeypatch): """Full happy-path run – ensures ///file.bin is created.""" monkeypatch.setattr(qa, "SPARQLStore", _DummyStore) - monkeypatch.setattr(qa.httpx, "get", lambda *a, **k: _dummy_http_response()) + + # Mock httpx.Client context manager used in download_asset + class _DummyClient: + def __init__(self, *a, **k): + pass + + def __enter__(self): + return self + + def __exit__(self, *a): + return False + + def get(self, *a, **k): + return _dummy_http_response() + + monkeypatch.setattr(qa.httpx, "Client", _DummyClient) monkeypatch.setenv("API_KEY", "FAKE-KEY") argv = ["query_assist", "--uprn", "42", "--download-dir", str(tmp_path)] @@ -102,12 +125,13 @@ def test_cli_download_creates_nested_dir(tmp_path, monkeypatch): qa.main() - expected = tmp_path / "42" / "rgb-image" / "file.bin" + # Includes date directory now (YYYY-MM-DD) + expected = tmp_path / "42" / "2024-01-02" / "rgb-image" / "file.bin" assert expected.is_file(), f"expected {expected} to exist" -def test_cli_fails_without_api_key(monkeypatch): - """Main should raise RuntimeError if API_KEY env var is missing.""" +def test_cli_missing_api_key_logs_error(monkeypatch, caplog): + """Modern behaviour: when API key missing, log error and return without raising.""" monkeypatch.setattr(qa, "SPARQLStore", _DummyStore) monkeypatch.delenv("API_KEY", raising=False) @@ -125,5 +149,9 @@ def test_cli_fails_without_api_key(monkeypatch): api_key_env="API_KEY", ), ) - with pytest.raises(RuntimeError, match="Env var 'API_KEY' is not set"): - qa.main() + caplog.clear() + qa.main() + assert any( + "API key environment variable 'API_KEY' is not set." in r.message + for r in caplog.records + ), "Expected error log for missing API key"