Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
60 changes: 30 additions & 30 deletions .github/workflows/e2e.yml
Original file line number Diff line number Diff line change
@@ -1,30 +1,30 @@
# name: Test-e2e
# on: [push, pull_request]
# jobs:
# build:
# name: Test-e2e
# runs-on: ubuntu-latest
# timeout-minutes: 10
# steps:
# - name: Check out code
# uses: actions/checkout@v4
# - uses: actions/setup-python@v4
# with:
# python-version: '3.11.4'
# cache: 'pip'
# - uses: actions/setup-node@v3
# with:
# cache: 'yarn'
# node-version: 'v20.12.1'
# - name: Install dependencies
# run: |
# sudo apt-get update
# sudo apt-get install -y python3-setuptools python3-pip python3-virtualenv chromium-browser libgbm1
# make install
# - name: DB setup
# run: |
# make migrate-upgrade
# python cre.py --upstream_sync
# # - name: Run app and e2e tests
# run: |
# make e2e
name: Test-e2e
on: [push, pull_request]
jobs:
build:
name: Test-e2e
runs-on: ubuntu-latest
timeout-minutes: 10
steps:
- name: Check out code
uses: actions/checkout@v4
- uses: actions/setup-python@v4
with:
python-version: '3.11.4'
cache: 'pip'
- uses: actions/setup-node@v3
with:
cache: 'yarn'
node-version: 'v20.12.1'
- name: Install dependencies
run: |
sudo apt-get update
sudo apt-get install -y python3-setuptools python3-pip python3-virtualenv chromium-browser libgbm1
make install
- name: DB setup
run: |
make migrate-upgrade
python cre.py --upstream_sync
# - name: Run app and e2e tests
run: |
make e2e
55 changes: 55 additions & 0 deletions application/tests/web_main_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,25 @@ def test_find_by_id(self) -> None:
)
self.assertEqual(re.sub("\s", "", md_response.data.decode()), md_expected)

cdx_response = client.get(
f"/rest/v1/id/{cres['cd'].id}?format=cyclonedx",
headers={"Content-Type": "application/json"},
)
cdx_payload = json.loads(cdx_response.data.decode())
self.assertEqual(200, cdx_response.status_code)
self.assertEqual("CycloneDX", cdx_payload["bomFormat"])
self.assertTrue(len(cdx_payload["components"]) == 1)
components = cdx_payload["components"]
component_cd = next(
(c for c in components if c.get("name") == "CD"),
None,
)
self.assertIsNotNone(component_cd)
external_references = component_cd.get("externalReferences", [])
self.assertTrue(
any(ref.get("type") == "attestation" for ref in external_references)
)

def test_find_by_name(self) -> None:
collection = db.Node_collection().with_graph()
cres = {
Expand Down Expand Up @@ -384,6 +403,24 @@ def test_find_node_by_name(self) -> None:
]
self.assertCountEqual(expected, actual)

cdx_response = client.get(
f"/rest/v1/standard/{nodes['sa'].name}?format=cyclonedx"
)
cdx_payload = json.loads(cdx_response.data.decode())
self.assertEqual(200, cdx_response.status_code)
self.assertEqual("CycloneDX", cdx_payload["bomFormat"])
self.assertEqual(5, len(cdx_payload["components"]))
self.assertTrue(
any(
any(
prop.get("name") == "opencre:doctype"
and prop.get("value") == "Standard"
for prop in component.get("properties", [])
)
for component in cdx_payload["components"]
)
)

def test_find_document_by_tag(self) -> None:
collection = db.Node_collection()
cres = {
Expand All @@ -406,6 +443,12 @@ def test_find_document_by_tag(self) -> None:
self.assertEqual(200, response.status_code)
self.assertCountEqual(json.loads(response.data.decode()), expected)

cdx_response = client.get("/rest/v1/tags?tag=ta&format=cyclonedx")
cdx_payload = json.loads(cdx_response.data.decode())
self.assertEqual(200, cdx_response.status_code)
self.assertEqual("CycloneDX", cdx_payload["bomFormat"])
self.assertEqual(2, len(cdx_payload["components"]))

def test_test_search(self) -> None:
collection = db.Node_collection()
docs = {
Expand Down Expand Up @@ -442,6 +485,12 @@ def test_test_search(self) -> None:
self.assertEqual(200, resp.status_code)
self.assertDictEqual(resp.json[0], expected[0])

cdx_response = client.get("/rest/v1/text_search?text=SB&format=cyclonedx")
cdx_payload = json.loads(cdx_response.data.decode())
self.assertEqual(200, cdx_response.status_code)
self.assertEqual("CycloneDX", cdx_payload["bomFormat"])
self.assertEqual(1, len(cdx_payload["components"]))

def test_find_root_cres(self) -> None:
self.maxDiff = None
collection = db.Node_collection().with_graph()
Expand Down Expand Up @@ -485,6 +534,12 @@ def test_find_root_cres(self) -> None:
self.assertEqual(json.loads(response.data.decode()), expected)
self.assertEqual(200, response.status_code)

cdx_response = client.get("/rest/v1/root_cres?format=cyclonedx")
cdx_payload = json.loads(cdx_response.data.decode())
self.assertEqual(200, cdx_response.status_code)
self.assertEqual("CycloneDX", cdx_payload["bomFormat"])
self.assertEqual(2, len(cdx_payload["components"]))

def test_smartlink(self) -> None:
self.maxDiff = None
collection = db.Node_collection().with_graph()
Expand Down
76 changes: 76 additions & 0 deletions application/web/web_main.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,15 @@

# silence mypy for the routes file
import csv
from datetime import datetime, timezone
from functools import wraps
import json
import logging
import os
import io
import pathlib
import urllib.parse
import uuid
from alive_progress import alive_bar
from typing import Any
from application.utils import oscal_utils, redis
Expand Down Expand Up @@ -66,6 +68,68 @@ class SupportedFormats(Enum):
JSON = "json"
YAML = "yaml"
OSCAL = "oscal"
CycloneDX = "cyclonedx"


def _document_attestation_url(document: defs.Document) -> str:
if document.doctype == defs.Credoctypes.CRE and document.id:
return f"https://www.opencre.org/cre/{document.id}"
if getattr(document, "hyperlink", ""):
return document.hyperlink
identifier = str(document.id) if getattr(document, "id", None) else document.name
return f"https://www.opencre.org/node/{document.doctype.value.lower()}/{urllib.parse.quote(identifier)}"


def _document_to_cyclonedx_component(document: defs.Document) -> dict[str, Any]:
component = {
"type": "data",
"name": document.name,
"bom-ref": f"{document.doctype.value}:{document.id or document.name}",
"properties": [
{"name": "opencre:doctype", "value": document.doctype.value},
{"name": "opencre:id", "value": str(document.id or "")},
],
"externalReferences": [
{
"type": "attestation",
"url": _document_attestation_url(document),
"comment": "OpenCRE source attestation",
}
],
}
if document.description:
component["description"] = document.description
if document.tags:
component["properties"].append(
{
"name": "opencre:tags",
"value": ",".join(tag for tag in document.tags if tag),
}
)
return component


def _documents_to_cyclonedx(documents: list[defs.Document]) -> dict[str, Any]:
return {
"bomFormat": "CycloneDX",
"specVersion": "1.6",
"serialNumber": f"urn:uuid:{uuid.uuid4()}",
"version": 1,
"metadata": {
"timestamp": datetime.now(timezone.utc)
.replace(microsecond=0)
.isoformat()
.replace("+00:00", "Z"),
"component": {
"type": "application",
"name": "OpenCRE",
"externalReferences": [
{"type": "website", "url": "https://www.opencre.org/"}
],
},
},
"components": [_document_to_cyclonedx_component(doc) for doc in documents],
}


def extend_cre_with_tag_links(
Expand Down Expand Up @@ -137,6 +201,10 @@ def find_cre(creid: str = None, crename: str = None) -> Any: # refer

elif opt_format == SupportedFormats.OSCAL.value:
result = {"data": json.loads(oscal_utils.document_to_oscal(cre))}
elif opt_format == SupportedFormats.CycloneDX.value:
result = _documents_to_cyclonedx([cre])
elif opt_format is not None:
abort(400, "Unsupported format")

return jsonify(result)
abort(404, "CRE does not exist")
Expand Down Expand Up @@ -227,6 +295,8 @@ def find_node_by_name(

elif opt_format == SupportedFormats.OSCAL.value:
return jsonify(json.loads(oscal_utils.list_to_oscal(nodes)))
elif opt_format == SupportedFormats.CycloneDX.value:
return jsonify(_documents_to_cyclonedx(nodes))

# if opt_osib:
# result["osib"] = odefs.cre2osib(nodes).todict()
Expand Down Expand Up @@ -263,6 +333,8 @@ def find_document_by_tag() -> Any:
return write_csv(docs=docs).getvalue().encode("utf-8")
elif opt_format == SupportedFormats.OSCAL.value:
return jsonify(json.loads(oscal_utils.list_to_oscal(documents)))
elif opt_format == SupportedFormats.CycloneDX.value:
return jsonify(_documents_to_cyclonedx(documents))

return jsonify(result)
abort(404, "Tag does not exist")
Expand Down Expand Up @@ -423,6 +495,8 @@ def text_search() -> Any:
return write_csv(docs=docs).getvalue().encode("utf-8")
elif opt_format == SupportedFormats.OSCAL.value:
return jsonify(json.loads(oscal_utils.list_to_oscal(documents)))
elif opt_format == SupportedFormats.CycloneDX.value:
return jsonify(_documents_to_cyclonedx(documents))
Comment on lines 496 to +499
Copy link

Copilot AI Feb 26, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

CycloneDX support was added to multiple endpoints (/rest/v1/tags, /rest/v1/text_search, /rest/v1/root_cres), but the new tests only exercise the id and standard routes. Since these new format=cyclonedx branches are separate code paths, please add assertions in the existing tests for these endpoints (or new tests) to cover the CycloneDX response shape and edge cases.

Copilot uses AI. Check for mistakes.
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@copilot open a new pull request to apply changes based on this feedback


res = [doc.todict() for doc in documents]
return jsonify(res)
Expand Down Expand Up @@ -457,6 +531,8 @@ def find_root_cres() -> Any:
return write_csv(docs=docs).getvalue().encode("utf-8")
elif opt_format == SupportedFormats.OSCAL.value:
return jsonify(json.loads(oscal_utils.list_to_oscal(documents)))
elif opt_format == SupportedFormats.CycloneDX.value:
return jsonify(_documents_to_cyclonedx(documents))

return jsonify(result)
abort(404, "No root CREs")
Expand Down
Loading