From 55832f97843e2cfb104275bc3bf37efe51740106 Mon Sep 17 00:00:00 2001 From: Ian Later Date: Wed, 17 Sep 2025 14:23:20 -0700 Subject: [PATCH 1/3] python(feat): Try using faster yaml loader with fallback for older python versions. --- .../lib/sift_py/ingestion/config/yaml/load.py | 11 +--- .../lib/sift_py/yaml/calculated_channels.py | 65 +++++++++---------- python/lib/sift_py/yaml/report_templates.py | 32 +++++---- python/lib/sift_py/yaml/rule.py | 47 +++++++------- python/lib/sift_py/yaml/utils.py | 17 ++++- 5 files changed, 86 insertions(+), 86 deletions(-) diff --git a/python/lib/sift_py/ingestion/config/yaml/load.py b/python/lib/sift_py/ingestion/config/yaml/load.py index 5d7b9ebd0..4c33eb83b 100644 --- a/python/lib/sift_py/ingestion/config/yaml/load.py +++ b/python/lib/sift_py/ingestion/config/yaml/load.py @@ -1,8 +1,6 @@ from pathlib import Path from typing import Any, Dict, List, cast -import yaml - import sift_py.yaml.rule as rule_yaml from sift_py.ingestion.config.yaml.error import YamlConfigError from sift_py.ingestion.config.yaml.spec import ( @@ -11,7 +9,7 @@ ) from sift_py.yaml.channel import ChannelConfigYamlSpec, _validate_channel, _validate_channel_anchor from sift_py.yaml.rule import RuleYamlSpec -from sift_py.yaml.utils import _type_fqn +from sift_py.yaml.utils import _type_fqn, try_fast_yaml_load load_named_expression_modules = rule_yaml.load_named_expression_modules @@ -22,7 +20,7 @@ def read_and_validate(path: Path) -> TelemetryConfigYamlSpec: step will return an error whose source is the `yaml` package. Any errors that may occur during the validation step will return a `sift_py.ingestion.config.yaml.error.YamlConfigError`. 
""" - raw_config = _read_yaml(path) + raw_config = try_fast_yaml_load(path) return _validate_yaml(raw_config) @@ -88,11 +86,6 @@ def _validate_yaml(raw_config: Dict[Any, Any]) -> TelemetryConfigYamlSpec: return cast(TelemetryConfigYamlSpec, raw_config) -def _read_yaml(path: Path) -> Dict[Any, Any]: - with open(path, "r") as f: - return cast(Dict[Any, Any], yaml.safe_load(f.read())) - - def _validate_flow(val: Any): flow = cast(Dict[Any, Any], val) diff --git a/python/lib/sift_py/yaml/calculated_channels.py b/python/lib/sift_py/yaml/calculated_channels.py index e7a5a97e0..94cc5fa33 100644 --- a/python/lib/sift_py/yaml/calculated_channels.py +++ b/python/lib/sift_py/yaml/calculated_channels.py @@ -1,11 +1,9 @@ from pathlib import Path -from typing import Any, Dict, List, cast - -import yaml +from typing import List from sift_py.calculated_channels.config import CalculatedChannelConfig from sift_py.ingestion.config.yaml.error import YamlConfigError -from sift_py.yaml.utils import _handle_subdir +from sift_py.yaml.utils import _handle_subdir, try_fast_yaml_load def load_calculated_channels(paths: List[Path]) -> List[CalculatedChannelConfig]: @@ -29,36 +27,35 @@ def update_calculated_channels(path: Path): def _read_calculated_channels_yaml(path: Path) -> List[CalculatedChannelConfig]: calculated_channel_configs = [] - with open(path, "r") as f: - channel_config_yaml = cast(Dict[str, Any], yaml.safe_load(f.read())) - - calculated_channel_list = channel_config_yaml.get("calculated_channels", []) - for calc_channel in calculated_channel_list: - if not isinstance(calc_channel, dict): + channel_config_yaml = try_fast_yaml_load(path) + + calculated_channel_list = channel_config_yaml.get("calculated_channels", []) + for calc_channel in calculated_channel_list: + if not isinstance(calc_channel, dict): + raise YamlConfigError( + f"Expected 'calculated_channels' to be a list of dictionaries in yaml: '{path}'" + ) + for channel_ref in calc_channel.get("channel_references", []): + 
parsed_channel_refs = [] + if not isinstance(channel_ref, dict): raise YamlConfigError( - f"Expected 'calculated_channels' to be a list of dictionaries in yaml: '{path}'" + f"Expected 'channel_references' to be a list of dictionaries in yaml: '{path}'" ) - for channel_ref in calc_channel.get("channel_references", []): - parsed_channel_refs = [] - if not isinstance(channel_ref, dict): - raise YamlConfigError( - f"Expected 'channel_references' to be a list of dictionaries in yaml: '{path}'" - ) - if "channel_reference" not in channel_ref: - for k, v in channel_ref.items(): - parsed_channel_refs.append(dict(channel_reference=k, channel_identifier=v)) - else: - parsed_channel_refs.append(channel_ref) - calc_channel["channel_references"] = parsed_channel_refs - - if not isinstance(calculated_channel_list, list): - raise YamlConfigError(f"Expected 'calculated_channels' to be a list in yaml: '{path}'") + if "channel_reference" not in channel_ref: + for k, v in channel_ref.items(): + parsed_channel_refs.append(dict(channel_reference=k, channel_identifier=v)) + else: + parsed_channel_refs.append(channel_ref) + calc_channel["channel_references"] = parsed_channel_refs + + if not isinstance(calculated_channel_list, list): + raise YamlConfigError(f"Expected 'calculated_channels' to be a list in yaml: '{path}'") + + for calc_channel in calculated_channel_list: + try: + calc_channel_cfg = CalculatedChannelConfig(**calc_channel) + calculated_channel_configs.append(calc_channel_cfg) + except Exception as e: + raise YamlConfigError(f"Error parsing calculated channel '{calc_channel}'") from e - for calc_channel in calculated_channel_list: - try: - calc_channel_cfg = CalculatedChannelConfig(**calc_channel) - calculated_channel_configs.append(calc_channel_cfg) - except Exception as e: - raise YamlConfigError(f"Error parsing calculated channel '{calc_channel}'") from e - - return calculated_channel_configs + return calculated_channel_configs diff --git 
a/python/lib/sift_py/yaml/report_templates.py b/python/lib/sift_py/yaml/report_templates.py index defeb0aab..06a947612 100644 --- a/python/lib/sift_py/yaml/report_templates.py +++ b/python/lib/sift_py/yaml/report_templates.py @@ -1,13 +1,12 @@ from datetime import datetime from pathlib import Path -from typing import Any, Dict, List, cast +from typing import List -import yaml from typing_extensions import NotRequired, TypedDict from sift_py.ingestion.config.yaml.error import YamlConfigError from sift_py.report_templates.config import ReportTemplateConfig -from sift_py.yaml.utils import _handle_subdir +from sift_py.yaml.utils import _handle_subdir, try_fast_yaml_load def load_report_templates(paths: List[Path]) -> List[ReportTemplateConfig]: @@ -31,23 +30,22 @@ def update_report_templates(path: Path): def _read_report_template_yaml(path: Path) -> List[ReportTemplateConfig]: report_templates = [] - with open(path, "r") as f: - report_templates_yaml = cast(Dict[str, Any], yaml.safe_load(f.read())) + report_templates_yaml = try_fast_yaml_load(path) - report_template_list = report_templates_yaml.get("report_templates") - if not isinstance(report_template_list, list): - raise YamlConfigError( - f"Expected 'report_templates' to be a list in report template yaml: '{path}'" - ) + report_template_list = report_templates_yaml.get("report_templates") + if not isinstance(report_template_list, list): + raise YamlConfigError( + f"Expected 'report_templates' to be a list in report template yaml: '{path}'" + ) - for report_template in report_template_list: - try: - report_template_config = ReportTemplateConfig(**report_template) - report_templates.append(report_template_config) - except Exception as e: - raise YamlConfigError(f"Error parsing report template '{report_template}'") from e + for report_template in report_template_list: + try: + report_template_config = ReportTemplateConfig(**report_template) + report_templates.append(report_template_config) + except Exception as e: + 
raise YamlConfigError(f"Error parsing report template '{report_template}'") from e - return report_templates + return report_templates class ReportTemplateYamlSpec(TypedDict): diff --git a/python/lib/sift_py/yaml/rule.py b/python/lib/sift_py/yaml/rule.py index 8f509268c..abb05d965 100644 --- a/python/lib/sift_py/yaml/rule.py +++ b/python/lib/sift_py/yaml/rule.py @@ -4,7 +4,6 @@ from pathlib import Path from typing import Any, Dict, List, Literal, Union, cast -import yaml from typing_extensions import NotRequired, TypedDict from sift_py.ingestion.config.yaml.error import YamlConfigError @@ -13,7 +12,7 @@ ChannelConfigYamlSpec, _validate_channel_reference, ) -from sift_py.yaml.utils import _handle_subdir, _type_fqn +from sift_py.yaml.utils import _handle_subdir, _type_fqn, try_fast_yaml_load _SUB_EXPRESSION_REGEX = re.compile(r"^\$[a-zA-Z_]+$") @@ -63,36 +62,34 @@ def update_rule_modules(rule_module_path: Path): def _read_named_expression_module_yaml(path: Path) -> Dict[str, str]: - with open(path, "r") as f: - named_expressions = cast(Dict[Any, Any], yaml.safe_load(f.read())) + named_expressions = try_fast_yaml_load(path) - for key, value in named_expressions.items(): - if not isinstance(key, str): - raise YamlConfigError( - f"Expected '{key}' to be a string in named expression module '{path}'." - ) - if not isinstance(value, str): - raise YamlConfigError( - f"Expected expression of '{key}' to be a string in named expression module '{path}'." - ) + for key, value in named_expressions.items(): + if not isinstance(key, str): + raise YamlConfigError( + f"Expected '{key}' to be a string in named expression module '{path}'." + ) + if not isinstance(value, str): + raise YamlConfigError( + f"Expected expression of '{key}' to be a string in named expression module '{path}'." 
+ ) - return cast(Dict[str, str], named_expressions) + return cast(Dict[str, str], named_expressions) def _read_rule_module_yaml(path: Path) -> List[RuleYamlSpec]: - with open(path, "r") as f: - module_rules = cast(Dict[Any, Any], yaml.safe_load(f.read())) - rules = module_rules.get("rules") - if not isinstance(rules, list): - raise YamlConfigError( - f"Expected '{rules}' to be a list in rule module yaml: '{path}'" - f"{_type_fqn(RuleYamlSpec)}" - ) + module_rules = try_fast_yaml_load(path) + rules = module_rules.get("rules") + if not isinstance(rules, list): + raise YamlConfigError( + f"Expected '{rules}' to be a list in rule module yaml: '{path}'" + f"{_type_fqn(RuleYamlSpec)}" + ) - for rule in cast(List[Any], rules): - _validate_rule(rule) + for rule in cast(List[Any], rules): + _validate_rule(rule) - return cast(List[RuleYamlSpec], rules) + return cast(List[RuleYamlSpec], rules) def _validate_rule(val: Any): diff --git a/python/lib/sift_py/yaml/utils.py b/python/lib/sift_py/yaml/utils.py index 0132d5f46..c512080a6 100644 --- a/python/lib/sift_py/yaml/utils.py +++ b/python/lib/sift_py/yaml/utils.py @@ -1,5 +1,7 @@ from pathlib import Path -from typing import Callable, Type +from typing import Any, Callable, Dict, Type, cast + +import yaml def _handle_subdir(path: Path, file_handler: Callable): @@ -13,3 +15,16 @@ def _handle_subdir(path: Path, file_handler: Callable): def _type_fqn(typ: Type) -> str: return f"{typ.__module__}.{typ.__name__}" + + +def try_fast_yaml_load(path: Path) -> Dict[Any, Any]: + """ + Try to load the YAML file using the CSafeLoader, which is faster than the pyyaml safe loader but not built into the wheel for earlier versions of Python. + If the CSafeLoader is not available, use the pyyaml safe loader. 
+ """ + with open(path, "r") as f: + try: + loader = yaml.CSafeLoader + return cast(Dict[Any, Any], yaml.load(f.read(), Loader=loader)) + except AttributeError: + return cast(Dict[Any, Any], yaml.safe_load(f.read())) From b3ca39bb7cbd681626687b5fbeb8a6a2291bbdb6 Mon Sep 17 00:00:00 2001 From: Ian Later Date: Thu, 18 Sep 2025 15:43:20 -0700 Subject: [PATCH 2/3] hasattr --- python/examples/ingestion_with_yaml_config/main.py | 4 ++-- python/lib/sift_py/yaml/utils.py | 7 +++---- 2 files changed, 5 insertions(+), 6 deletions(-) diff --git a/python/examples/ingestion_with_yaml_config/main.py b/python/examples/ingestion_with_yaml_config/main.py index 85f367c58..c5c0a550f 100644 --- a/python/examples/ingestion_with_yaml_config/main.py +++ b/python/examples/ingestion_with_yaml_config/main.py @@ -20,7 +20,7 @@ load_dotenv() - apikey = os.getenv("SIFT_API_KEY") + apikey = os.getenv("SIFT_LOCAL_API_KEY") if apikey is None: raise Exception("Missing 'SIFT_API_KEY' environment variable.") @@ -34,7 +34,7 @@ telemetry_config = nostromos_lv_426() # Create a gRPC transport channel configured specifically for the Sift API - sift_channel_config = SiftChannelConfig(uri=base_uri, apikey=apikey) + sift_channel_config = SiftChannelConfig(uri=base_uri, apikey=apikey, use_ssl=False) with use_sift_channel(sift_channel_config) as channel: # Create ingestion service using the telemetry config we loaded in diff --git a/python/lib/sift_py/yaml/utils.py b/python/lib/sift_py/yaml/utils.py index c512080a6..4270046f2 100644 --- a/python/lib/sift_py/yaml/utils.py +++ b/python/lib/sift_py/yaml/utils.py @@ -23,8 +23,7 @@ def try_fast_yaml_load(path: Path) -> Dict[Any, Any]: If the CSafeLoader is not available, use the pyyaml safe loader. 
""" with open(path, "r") as f: - try: - loader = yaml.CSafeLoader - return cast(Dict[Any, Any], yaml.load(f.read(), Loader=loader)) - except AttributeError: + if hasattr(yaml, "CSafeLoader"): + return cast(Dict[Any, Any], yaml.load(f.read(), Loader=yaml.CSafeLoader)) + else: return cast(Dict[Any, Any], yaml.safe_load(f.read())) From 488dbced0a3acc389859ad1d9af148f730fadb5e Mon Sep 17 00:00:00 2001 From: Ian Later Date: Tue, 23 Sep 2025 12:42:03 -0400 Subject: [PATCH 3/3] Undo testing changes to example. --- python/examples/ingestion_with_yaml_config/main.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/python/examples/ingestion_with_yaml_config/main.py b/python/examples/ingestion_with_yaml_config/main.py index c5c0a550f..85f367c58 100644 --- a/python/examples/ingestion_with_yaml_config/main.py +++ b/python/examples/ingestion_with_yaml_config/main.py @@ -20,7 +20,7 @@ load_dotenv() - apikey = os.getenv("SIFT_LOCAL_API_KEY") + apikey = os.getenv("SIFT_API_KEY") if apikey is None: raise Exception("Missing 'SIFT_API_KEY' environment variable.") @@ -34,7 +34,7 @@ telemetry_config = nostromos_lv_426() # Create a gRPC transport channel configured specifically for the Sift API - sift_channel_config = SiftChannelConfig(uri=base_uri, apikey=apikey, use_ssl=False) + sift_channel_config = SiftChannelConfig(uri=base_uri, apikey=apikey) with use_sift_channel(sift_channel_config) as channel: # Create ingestion service using the telemetry config we loaded in