Walkthrough

Migrates decoding from Pandas to Spark: the core APIs now accept a SparkSession and return Spark DataFrames, and decode_mplog_dataframe uses distributed processing (mapInPandas) with per-row schema lookup and metadata extraction. The CLI, utils, types, formats, and docs are updated accordingly.

Changes
Sequence Diagram(s)

```mermaid
sequenceDiagram
    participant Client
    participant Spark as SparkSession
    participant Exec as Executor (per-row mapInPandas)
    participant Cache as Schema Cache
    participant Output as SparkDataFrame
    Client->>Spark: call decode_mplog_dataframe(df, spark, ...)
    activate Spark
    Spark->>Spark: repartition (optional)
    Spark->>Exec: mapInPandas(decode_batch)
    activate Exec
    Exec->>Exec: iterate rows in Pandas batch
    loop per row
        Exec->>Cache: get_feature_schema(mp_config_id, version)
        Cache-->>Exec: schema (cached or fetched)
        Exec->>Exec: extract metadata (_extract_metadata_byte)
        Exec->>Exec: decode features (proto/arrow/parquet)
        Exec->>Exec: format floats
        Exec-->>Output: yield decoded dict row
    end
    deactivate Exec
    Spark->>Output: assemble rows into SparkDataFrame
    deactivate Spark
    Output-->>Client: return decoded SparkDataFrame
```
No actionable comments were generated in the recent review. 🎉
Actionable comments posted: 3
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (3)
py-sdk/inference_logging_client/inference_logging_client/types.py (1)

44-49: ⚠️ Potential issue | 🟠 Major

Python 3.8 compatibility issue with type hints.

The `list[str]` syntax for type hints is only available at runtime starting from Python 3.9. Since `pyproject.toml` declares `requires-python = ">=3.8"`, this will cause a `TypeError` on Python 3.8 when the module is imported.

🐛 Proposed fix for Python 3.8 compatibility

Add the annotations import at the top of the file:

```diff
 """Type definitions for inference-logging-client."""
+from __future__ import annotations
 from enum import Enum
 from dataclasses import dataclass, field
```

Alternatively, use `typing.List`:

```diff
+from typing import List
 from dataclasses import dataclass, field

 # ... later in DecodedMPLog ...
-    entities: list[str] = field(default_factory=list)
-    parent_entity: list[str] = field(default_factory=list)
+    entities: List[str] = field(default_factory=list)
+    parent_entity: List[str] = field(default_factory=list)
```

py-sdk/inference_logging_client/inference_logging_client/utils.py (1)
178-190: ⚠️ Potential issue | 🟠 Major

Python 3.8 compatibility issue with return type annotation.

The `tuple[bool, int, int]` return type syntax requires Python 3.9+. This will fail on Python 3.8, which is listed as a supported version.

🐛 Proposed fix

Add at the top of the file:

```diff
 """Utility functions for type normalization and formatting."""
+from __future__ import annotations
 from typing import Optional
```

Or use `typing.Tuple`:

```diff
-from typing import Optional
+from typing import Optional, Tuple

-def unpack_metadata_byte(metadata_byte: int) -> tuple[bool, int, int]:
+def unpack_metadata_byte(metadata_byte: int) -> Tuple[bool, int, int]:
```

py-sdk/inference_logging_client/inference_logging_client/io.py (1)
22-24: ⚠️ Potential issue | 🟠 Major

Python 3.8 compatibility issue with generic type annotations.

The `OrderedDict[tuple[str, int], list[FeatureInfo]]` syntax requires Python 3.9+. This will cause a `TypeError` on Python 3.8.

🐛 Proposed fix

Add at the top of the file:

```diff
 """I/O operations for schema fetching and protobuf parsing."""
+from __future__ import annotations
 import os
```

Or use typing generics:

```diff
 from collections import OrderedDict
+from typing import Dict, List, Tuple

-_schema_cache: OrderedDict[tuple[str, int], list[FeatureInfo]] = OrderedDict()
+_schema_cache: "OrderedDict[Tuple[str, int], List[FeatureInfo]]" = OrderedDict()
```
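Applying the `typing.Tuple` spelling from the utils.py comment, the fixed function can be exercised as below. The signature matches the review; the bit layout in the body (flag in bit 0 per the readme's metadata diagram, plus two 3-bit fields) is an invented assumption purely so the sketch runs, not the SDK's real encoding.

```python
# Python 3.8-safe annotation via typing.Tuple; bit layout is hypothetical.
from typing import Tuple

def unpack_metadata_byte(metadata_byte: int) -> Tuple[bool, int, int]:
    flag = bool(metadata_byte & 0x01)       # bit 0: generated flag (per readme)
    field_a = (metadata_byte >> 1) & 0x07   # bits 1-3 (assumed for illustration)
    field_b = (metadata_byte >> 4) & 0x07   # bits 4-6 (assumed for illustration)
    return flag, field_a, field_b
```

On Python 3.8 this imports cleanly, whereas the `tuple[bool, int, int]` spelling raises a `TypeError` at import time unless `from __future__ import annotations` is in effect.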
🤖 Fix all issues with AI agents
In `@py-sdk/inference_logging_client/inference_logging_client/__init__.py`:
- Around line 306-309: The empty-DataFrame branch currently returns
spark.createDataFrame([], StructType([])) which drops the expected output
schema; in the df.limit(1).count() == 0 block replace the empty StructType with
the actual output schema used by the transformation (e.g., reuse the computed
output_schema or derive it by applying the same schema-mapping logic used for
non-empty inputs) and return spark.createDataFrame([], output_schema) so the
returned DataFrame preserves entity_id, metadata and feature columns; update the
code around df.limit(1).count() to obtain or reference output_schema before
creating the empty DataFrame.
In `@py-sdk/inference_logging_client/inference_logging_client/cli.py`:
- Around line 171-176: The comment above the df.count() call is inaccurate
because the code performs a full count; either update the comment to reflect
that df.count() is executed, or change the implementation to the advertised
optimization by replacing row_count = df.count() with a cheap emptiness check
such as row_count = df.limit(1).count() (or similar) and only run a full count
when necessary; update the printed message using row_count and retain the
existing exception handling around the count to preserve current behavior.
In `@py-sdk/inference_logging_client/pyproject.toml`:
- Line 31: The package metadata lists Python 3.12 in classifiers but requires
"pyspark>=3.3.0" which doesn't officially support 3.12; update the dependency or
classifiers to be consistent: either change the pyspark requirement string in
pyproject.toml from "pyspark>=3.3.0" to "pyspark>=3.5.0" to enable Python 3.12
support, or remove the Python 3.12 classifier entries from the package
classifiers so they match pyspark>=3.3.0 compatibility.
🧹 Nitpick comments (2)
py-sdk/inference_logging_client/readme.md (1)
628-638: Add language specifiers to diagram code blocks.

Several code blocks containing ASCII art diagrams and text layouts are missing language specifiers, which triggers markdownlint warnings. Consider adding `text` or `plaintext` as the language identifier.

📝 Example fix for diagram blocks

```diff
-```
+```text
 ┌─────────────────────────────────────────────────────────────┐
 │ Byte 0: Generated Flag (1 = no generated values)            │
```

This applies to multiple diagram blocks throughout the document (lines 628, 647, 666, 682, 707, 818, 984, 999, 1058, 1074, 1084).
py-sdk/inference_logging_client/inference_logging_client/__init__.py (1)
488-489: High default partition count may be excessive for smaller DataFrames.

The default of 10,000 partitions is optimized for large rows (3-5 MB each per documentation), but could introduce unnecessary overhead for smaller DataFrames. Consider documenting this trade-off more prominently or adjusting the default based on estimated data size.
```python
# Check if DataFrame is empty (avoid full count: use limit(1))
if df.limit(1).count() == 0:
    from pyspark.sql.types import StructType
    return spark.createDataFrame([], StructType([]))
```
Empty DataFrame returns empty schema instead of expected output schema.
When the input DataFrame is empty, this returns a DataFrame with an empty StructType(), losing the expected output schema (entity_id + metadata columns + feature columns). This may cause issues for downstream operations that depend on a consistent schema.
Suggested fix to preserve schema

```diff
 # Check if DataFrame is empty (avoid full count: use limit(1))
 if df.limit(1).count() == 0:
-    from pyspark.sql.types import StructType
-    return spark.createDataFrame([], StructType([]))
+    from pyspark.sql.types import StringType, StructField, StructType
+    # Return empty DataFrame with at least entity_id column
+    fields = [StructField("entity_id", StringType(), True)]
+    return spark.createDataFrame([], StructType(fields))
```

🤖 Prompt for AI Agents
In `@py-sdk/inference_logging_client/inference_logging_client/__init__.py` around
lines 306 - 309, The empty-DataFrame branch currently returns
spark.createDataFrame([], StructType([])) which drops the expected output
schema; in the df.limit(1).count() == 0 block replace the empty StructType with
the actual output schema used by the transformation (e.g., reuse the computed
output_schema or derive it by applying the same schema-mapping logic used for
non-empty inputs) and return spark.createDataFrame([], output_schema) so the
returned DataFrame preserves entity_id, metadata and feature columns; update the
code around df.limit(1).count() to obtain or reference output_schema before
creating the empty DataFrame.
```python
# Avoid full count() for huge DataFrames: use limit(1).count() for empty check only
try:
    row_count = df.count()
    print(f"Rows: {row_count}", file=sys.stderr)
except Exception:
    print("Rows: (count skipped - use --output to write without summary)", file=sys.stderr)
```
Misleading comment contradicts actual behavior.
The comment states "Avoid full count() for huge DataFrames" but the code immediately performs df.count(), which is a full count operation. Consider updating the comment to accurately reflect the behavior, or implementing the optimization mentioned.
Suggested comment fix

```diff
-# Avoid full count() for huge DataFrames: use limit(1).count() for empty check only
+# Note: count() can be slow for large DataFrames; errors are caught gracefully
 try:
     row_count = df.count()
```

📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
```diff
-# Avoid full count() for huge DataFrames: use limit(1).count() for empty check only
+# Note: count() can be slow for large DataFrames; errors are caught gracefully
 try:
     row_count = df.count()
     print(f"Rows: {row_count}", file=sys.stderr)
 except Exception:
     print("Rows: (count skipped - use --output to write without summary)", file=sys.stderr)
```
🤖 Prompt for AI Agents
In `@py-sdk/inference_logging_client/inference_logging_client/cli.py` around lines
171 - 176, The comment above the df.count() call is inaccurate because the code
performs a full count; either update the comment to reflect that df.count() is
executed, or change the implementation to the advertised optimization by
replacing row_count = df.count() with a cheap emptiness check such as row_count
= df.limit(1).count() (or similar) and only run a full count when necessary;
update the printed message using row_count and retain the existing exception
handling around the count to preserve current behavior.
```toml
dependencies = [
    "pandas>=1.3.0",
    "pyspark>=3.3.0",
```
🧩 Analysis chain
🌐 Web query:
PySpark Python 3.12 support version compatibility
💡 Result:
For Python 3.12, these are the key PySpark / Spark compatibility points:
- Spark / PySpark 4.0.x: supports Python 3.9+ (so 3.12 is supported). The Spark 4.0.0 release notes also include Python-3.12-related dependency work (e.g., grpcio upgrade “for Python 3.12”). [1][2]
- Spark / PySpark 4.1.x (latest docs): supports Python 3.10+ (so 3.12 is supported). [3]
- Spark / PySpark 3.5.x: supports Python 3.8+ (so 3.12 is included by the stated support range). [4]
Practical recommendation: if you specifically care about smooth Python 3.12 operation (dependency wheels, fewer edge cases), prefer Spark 4.0+ (or newer). [1][2]
Sources
[1] Spark 4.0.0 PySpark install docs (Python 3.9+) (spark.apache.org)
[2] Spark 4.0.0 release notes (grpcio upgrade for Python 3.12) (spark.apache.org)
[3] Spark “latest” (4.1.0) PySpark install docs (Python 3.10+) (spark.apache.org)
[4] Spark 3.5.3 PySpark install docs (Python 3.8+) (spark.apache.org)
Citations:
- 1: https://spark.apache.org/docs/4.0.0/api/python/getting_started/install.html
- 2: https://spark.apache.org/releases/spark-release-4-0-0.html?utm_source=openai
- 3: https://spark.apache.org/docs/latest/api/python/getting_started/install.html
- 4: https://spark.apache.org/docs/3.5.3/api/python/getting_started/install.html?utm_source=openai
🏁 Script executed:

```shell
find . -path "*/py-sdk/inference_logging_client/pyproject.toml" -type f | head -1 | xargs cat
```

Repository: Meesho/BharatMLStack
Length of output: 1893
Fix PySpark and Python version compatibility mismatch.
Python 3.12 is listed in the classifiers, but the minimum PySpark version (3.3.0) does not officially support it. PySpark 3.5.0+ adds Python 3.12 support.
Either:

- Bump the minimum PySpark version to `>=3.5.0` if Python 3.12 support is required
- Remove Python 3.12 from classifiers if maintaining PySpark 3.3.0 compatibility is more important
🤖 Prompt for AI Agents
In `@py-sdk/inference_logging_client/pyproject.toml` at line 31, The package
metadata lists Python 3.12 in classifiers but requires "pyspark>=3.3.0" which
doesn't officially support 3.12; update the dependency or classifiers to be
consistent: either change the pyspark requirement string in pyproject.toml from
"pyspark>=3.3.0" to "pyspark>=3.5.0" to enable Python 3.12 support, or remove
the Python 3.12 classifier entries from the package classifiers so they match
pyspark>=3.3.0 compatibility.
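If the first option is taken, the relevant pyproject.toml fragment might look like the sketch below. This is only an illustration of the review's suggestion, assuming Python 3.12 support is the goal; surrounding fields and the exact pandas pin are copied from the snippet under review, not verified against the full file.

```toml
[project]
requires-python = ">=3.8"
classifiers = [
    "Programming Language :: Python :: 3.12",
]
dependencies = [
    "pandas>=1.3.0",
    "pyspark>=3.5.0",  # 3.5.0+ is the first line with stated Python 3.12 support
]
```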
🔁 Pull Request Template – BharatMLStack
📌 Summary
📂 Modules Affected
- horizon (Real-time systems / networking)
- online-feature-store (Feature serving infra)
- trufflebox-ui (Admin panel / UI)
- infra (Docker, CI/CD, GCP/AWS setup)
- docs (Documentation updates)

✅ Type of Change

📊 Benchmark / Metrics (if applicable)
Summary by CodeRabbit
New Features
Utilities
Documentation
Chores