254 changes: 250 additions & 4 deletions application/cmd/cre_main.py
@@ -31,6 +31,9 @@
logger.setLevel(logging.INFO)

app = None
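# Upstream sync configuration: the REST root below can be overridden with the
# CRE_UPSTREAM_API_URL env var, and CRE_UPSTREAM_SYNC_MAX_MAP_ANALYSIS_PAIRS
# (0 = no cap) bounds how many missing map analysis pairs are synced per run.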
DEFAULT_UPSTREAM_API_URL = "https://opencre.org/rest/v1"
UPSTREAM_SYNC_REQUEST_TIMEOUT_SECONDS = 30
UPSTREAM_SYNC_MAP_ANALYSIS_MAX_PAIRS_ENV = "CRE_UPSTREAM_SYNC_MAX_MAP_ANALYSIS_PAIRS"


def register_node(node: defs.Node, collection: db.Node_collection) -> db.Node:
@@ -461,14 +464,244 @@ def review_from_spreadsheet(cache: str, spreadsheet_url: str, share_with: str) -
# logger.info("A spreadsheet view is at %s" % sheet_url)


def _upstream_api_url() -> str:
return os.environ.get("CRE_UPSTREAM_API_URL", DEFAULT_UPSTREAM_API_URL).rstrip("/")


def _progressively_sync_weak_links_for_pair(
collection: db.Node_collection,
upstream_api_url: str,
base_standard: str,
compare_standard: str,
result_payload: Dict[str, Any],
) -> Tuple[int, int]:
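    """Fetch weak-link results for every key in ``result_payload`` that
    reports ``extra`` (weakly linked) mappings and is not already cached
    locally. Returns an ``(attempted, synced)`` tuple of counters."""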
    weak_attempted = 0
    weak_synced = 0

    # Defensive: the upstream "result" payload should be a mapping of keys to
    # per-key stats; anything else carries no weak-link entries to iterate.
    if not isinstance(result_payload, dict):
        return weak_attempted, weak_synced

for key, value in result_payload.items():
if not isinstance(key, str) or not isinstance(value, dict):
continue

extra = value.get("extra")
try:
extra = int(extra) if extra is not None else 0
except (TypeError, ValueError):
extra = 0
if extra <= 0:
continue

weak_cache_key = gap_analysis.make_subresources_key(
standards=[base_standard, compare_standard], key=key
)
if collection.gap_analysis_exists(weak_cache_key):
continue

weak_attempted += 1
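        # params is a list of tuples so that "standard" is sent twice, as a
        # repeated query parameter.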
try:
weak_response = requests.get(
f"{upstream_api_url}/map_analysis_weak_links",
params=[
("standard", base_standard),
("standard", compare_standard),
("key", key),
],
timeout=UPSTREAM_SYNC_REQUEST_TIMEOUT_SECONDS,
)
except requests.RequestException as exc:
logger.warning(
"Could not sync weak links for %s >> %s (key=%s): %s",
base_standard,
compare_standard,
key,
exc,
)
continue
if weak_response.status_code != 200:
logger.info(
"Skipping weak links for %s >> %s (key=%s) from upstream (status=%s)",
base_standard,
compare_standard,
key,
weak_response.status_code,
)
continue

try:
weak_payload = weak_response.json()
except ValueError:
logger.warning(
"Skipping weak links for %s >> %s (key=%s) due to invalid JSON payload",
base_standard,
compare_standard,
key,
)
continue
if not isinstance(weak_payload, dict) or weak_payload.get("result") is None:
continue

collection.add_gap_analysis_result(
cache_key=weak_cache_key,
ga_object=json.dumps({"result": weak_payload.get("result")}),
)
weak_synced += 1

return weak_attempted, weak_synced


def _progressively_sync_gap_analysis_from_upstream(
collection: db.Node_collection, upstream_api_url: str
) -> None:
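    """Discover the upstream standards list, then fetch and cache any map
    analysis (and per-key weak-link) results that are missing locally,
    honoring the optional cap in CRE_UPSTREAM_SYNC_MAX_MAP_ANALYSIS_PAIRS."""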
max_pairs_raw = os.environ.get(UPSTREAM_SYNC_MAP_ANALYSIS_MAX_PAIRS_ENV, "0")
try:
max_pairs = int(max_pairs_raw)
except ValueError:
logger.warning(
"%s should be an integer, got '%s'. Falling back to full sync.",
UPSTREAM_SYNC_MAP_ANALYSIS_MAX_PAIRS_ENV,
max_pairs_raw,
)
max_pairs = 0
if max_pairs < 0:
max_pairs = 0

try:
standards_response = requests.get(
f"{upstream_api_url}/standards",
timeout=UPSTREAM_SYNC_REQUEST_TIMEOUT_SECONDS,
)
except requests.RequestException as exc:
logger.warning(
"Failed to fetch standards from upstream map analysis API: %s", exc
)
return
if standards_response.status_code != 200:
logger.warning(
"Could not fetch standards from upstream (status=%s), skipping map analysis sync",
standards_response.status_code,
)
return

try:
standards = standards_response.json()
except ValueError:
logger.warning("Upstream /standards response is not valid JSON, skipping")
return
if not isinstance(standards, list):
logger.warning("Upstream /standards response is not a list, skipping")
return
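    # Keep only string entries; dict.fromkeys() de-duplicates while
    # preserving order.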
standards = [standard for standard in standards if isinstance(standard, str)]
standards = list(dict.fromkeys(standards))

total_pairs = len(standards) * (len(standards) - 1)
if total_pairs == 0:
logger.info("No standard pairs found for progressive map analysis sync")
return

logger.info(
"Starting progressive map analysis sync for up to %s pair(s) out of %s total",
max_pairs if max_pairs else "all",
total_pairs,
)

attempted_pairs = 0
synced_pairs = 0
weak_links_attempted = 0
weak_links_synced = 0

for standard_a in standards:
for standard_b in standards:
if standard_a == standard_b:
continue

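            # Skip ordered pairs whose map analysis is already cached locally.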
cache_key = gap_analysis.make_resources_key([standard_a, standard_b])
if collection.gap_analysis_exists(cache_key):
continue

if max_pairs and synced_pairs >= max_pairs:
logger.info(
"Reached %s=%s after syncing %s pair(s), stopping early",
UPSTREAM_SYNC_MAP_ANALYSIS_MAX_PAIRS_ENV,
max_pairs,
synced_pairs,
)
return

attempted_pairs += 1
try:
response = requests.get(
f"{upstream_api_url}/map_analysis",
params=[("standard", standard_a), ("standard", standard_b)],
timeout=UPSTREAM_SYNC_REQUEST_TIMEOUT_SECONDS,
)
except requests.RequestException as exc:
logger.warning(
"Could not sync map analysis for %s >> %s: %s",
standard_a,
standard_b,
exc,
)
continue
if response.status_code != 200:
logger.info(
"Skipping map analysis %s >> %s from upstream (status=%s)",
standard_a,
standard_b,
response.status_code,
)
continue

try:
payload = response.json()
except ValueError:
logger.warning(
"Skipping map analysis %s >> %s due to invalid JSON payload",
standard_a,
standard_b,
)
continue
if not isinstance(payload, dict) or payload.get("result") is None:
continue

collection.add_gap_analysis_result(
cache_key=cache_key,
ga_object=json.dumps({"result": payload.get("result")}),
)
synced_pairs += 1

weak_attempted, weak_synced = _progressively_sync_weak_links_for_pair(
collection=collection,
upstream_api_url=upstream_api_url,
base_standard=standard_a,
compare_standard=standard_b,
result_payload=payload.get("result"),
)
weak_links_attempted += weak_attempted
weak_links_synced += weak_synced

if synced_pairs % 25 == 0:
logger.info(
"Progressive map analysis sync: synced %s pair(s) so far",
synced_pairs,
)

logger.info(
"Progressive map analysis sync complete. Attempted %s missing pair(s), synced %s pair(s), attempted %s weak-link result(s), synced %s weak-link result(s)",
attempted_pairs,
synced_pairs,
weak_links_attempted,
weak_links_synced,
)


def download_graph_from_upstream(cache: str) -> None:
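    """Import the CRE graph from upstream into the local cache, then refresh
    neo4j (unless disabled) and progressively sync map analysis results."""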
imported_cres = {}
collection = db_connect(path=cache).with_graph()
upstream_api_url = _upstream_api_url()

def download_cre_from_upstream(creid: str):
cre_response = requests.get(
-            os.environ.get("CRE_UPSTREAM_API_URL", "https://opencre.org/rest/v1")
-            + f"/id/{creid}"
+            f"{upstream_api_url}/id/{creid}",
+            timeout=UPSTREAM_SYNC_REQUEST_TIMEOUT_SECONDS,
)
if cre_response.status_code != 200:
raise RuntimeError(
@@ -487,8 +720,8 @@ def download_cre_from_upstream(creid: str):
download_cre_from_upstream(link.document.id)

root_cres_response = requests.get(
-        os.environ.get("CRE_UPSTREAM_API_URL", "https://opencre.org/rest/v1")
-        + "/root_cres"
+        f"{upstream_api_url}/root_cres",
+        timeout=UPSTREAM_SYNC_REQUEST_TIMEOUT_SECONDS,
)
if root_cres_response.status_code != 200:
raise RuntimeError(
@@ -503,6 +736,19 @@ def download_cre_from_upstream(creid: str):
if link.document.doctype == defs.Credoctypes.CRE:
download_cre_from_upstream(link.document.id)

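    # Refresh the local neo4j graph unless CRE_NO_NEO4J is set; a failure
    # here is logged but does not abort the rest of the upstream sync.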
if not os.environ.get("CRE_NO_NEO4J"):
try:
populate_neo4j_db(cache)
except Exception as exc:
logger.warning(
"Could not populate local neo4j DB during upstream sync: %s", exc
)

_progressively_sync_gap_analysis_from_upstream(
collection=collection,
upstream_api_url=upstream_api_url,
)


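# A minimal sketch of exercising the new sync path locally (the staging URL
# and cache path below are hypothetical; the env var names match the
# constants at the top of this module):
#
#   os.environ["CRE_UPSTREAM_API_URL"] = "https://staging.opencre.org/rest/v1"
#   os.environ["CRE_UPSTREAM_SYNC_MAX_MAP_ANALYSIS_PAIRS"] = "10"
#   download_graph_from_upstream(cache="standards_cache.sqlite")
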
# def review_from_disk(cache: str, cre_file_loc: str, share_with: str) -> None:
# """--review --cre_loc <path>