
Bottlenecks#277

Open
dhruvgupta-meesho wants to merge 9 commits into develop from bottlenecks

Conversation

Contributor

@dhruvgupta-meesho dhruvgupta-meesho commented Feb 9, 2026

🔁 Pull Request Template – BharatMLStack

Please fill out the following sections to help us review your changes efficiently.


📌 Summary

e.g., Optimizes Redis fetch latency in online-feature-store, or improves search UI responsiveness in trufflebox-ui.


📂 Modules Affected

  • horizon (Real-time systems / networking)
  • online-feature-store (Feature serving infra)
  • trufflebox-ui (Admin panel / UI)
  • infra (Docker, CI/CD, GCP/AWS setup)
  • docs (Documentation updates)
  • Other: ___________

✅ Type of Change

  • Feature addition
  • Bug fix
  • Infra / build system change
  • Performance improvement
  • Refactor
  • Documentation
  • Other: ___________

📊 Benchmark / Metrics (if applicable)

Summary by CodeRabbit

  • New Features

    • Spark-based distributed decoding and tuning (partitions, batch sizing)
    • Arrow and Parquet single-entity feature decoding
    • CLI: Spark master, input format flags, output modes (JSON/CSV/stream) and summary preview
  • Utilities

    • Spark-aware float formatting and memory-safe output workflows
    • Cache clearing utility exposed
  • Documentation

    • Complete README overhaul with examples and API reference
  • Chores

    • Core dependency and version updated for PySpark integration


coderabbitai bot commented Feb 9, 2026

Walkthrough

Migrates decoding from Pandas to Spark: core APIs now accept a SparkSession and return Spark DataFrames, decode_mplog_dataframe uses distributed processing (mapInPandas) with per-row schema lookup and metadata extraction; CLI, utils, types, formats, and docs updated accordingly.
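The mapInPandas step described above consumes an iterator of Pandas batches and yields decoded batches. A minimal sketch of such a per-batch decode function, runnable with plain Pandas — the payload format, column names, and `_decode_payload` stub are hypothetical stand-ins for the SDK's real proto/arrow/parquet decoding:

```python
from typing import Iterator

import pandas as pd

def _decode_payload(payload: bytes) -> dict:
    # Stand-in for the SDK's real decoding; here we just report the payload size.
    return {"feature_count": len(payload)}

def decode_batch(batches: Iterator[pd.DataFrame]) -> Iterator[pd.DataFrame]:
    """Generator of the shape df.mapInPandas expects: Pandas batches in, decoded batches out."""
    for batch in batches:
        rows = []
        for _, row in batch.iterrows():
            decoded = _decode_payload(row["payload"])
            decoded["entity_id"] = row["entity_id"]
            rows.append(decoded)
        yield pd.DataFrame(rows)

# On Spark this would be wired up roughly as:
#   decoded = df.mapInPandas(decode_batch, schema="feature_count long, entity_id string")
# Locally, the same generator can be exercised without a SparkSession:
out = pd.concat(decode_batch(iter([
    pd.DataFrame({"entity_id": ["a", "b"], "payload": [b"\x01\x02", b"\x03"]}),
])))
print(out.to_dict("records"))
```

Because mapInPandas only requires a generator over Pandas frames, the decode logic stays unit-testable without a cluster.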

Changes

Cohort / File(s) Summary

Core API Refactoring
py-sdk/inference_logging_client/inference_logging_client/__init__.py
Replaced Pandas paths with Spark: decode_mplog requires SparkSession and returns SparkDataFrame; decode_mplog_dataframe accepts SparkDataFrame/spark and uses mapInPandas with num_partitions/max_records_per_batch; added _extract_metadata_byte, TYPE_CHECKING Spark types, and exported Spark-based helpers.

CLI Integration
py-sdk/inference_logging_client/inference_logging_client/cli.py
CLI now creates/configures SparkSession (--spark-master), supports --no-decompress, --json, --output, --format, uses Spark decoding flow, writes outputs via Spark (single-partition files or streamed JSON), and prints a lazy metadata summary.

Type & Schema Management
py-sdk/inference_logging_client/inference_logging_client/types.py, .../io.py
Added fields to DecodedMPLog (entities, parent_entity, metadata_byte, compression_enabled, version, format_type); exposed clear_schema_cache() and centralized metadata/schema parsing refinements.

Feature Decoding Helpers
py-sdk/inference_logging_client/inference_logging_client/formats.py
Added decode_arrow_features and decode_parquet_features to decode single-entity Arrow IPC and Parquet payloads into feature name→value dicts (use existing decode_feature_value).

Utilities & Float Formatting
py-sdk/inference_logging_client/inference_logging_client/utils.py
format_dataframe_floats refactored to operate on Spark DataFrames using Spark round() for float/double columns; type helpers preserved.

Decoder & Exceptions Cleanup
py-sdk/inference_logging_client/inference_logging_client/decoder.py, .../exceptions.py
Cosmetic changes: quote/style/whitespace normalization in decoder; explicit pass added to exception classes—no behavioral changes.

Dependencies & Documentation
py-sdk/inference_logging_client/pyproject.toml, .../readme.md
Project version bumped to 0.2.1; replaced pandas dependency with pyspark>=3.3.0; README expanded into a full user/developer guide and examples.

Sequence Diagram(s)

sequenceDiagram
    participant Client
    participant Spark as SparkSession
    participant Exec as Executor (per-row mapInPandas)
    participant Cache as Schema Cache
    participant Output as SparkDataFrame

    Client->>Spark: call decode_mplog_dataframe(df, spark, ...)
    activate Spark
    Spark->>Spark: repartition (optional)
    Spark->>Exec: mapInPandas(decode_batch)
    activate Exec
    Exec->>Exec: iterate rows in Pandas batch
    loop per row
        Exec->>Cache: get_feature_schema(mp_config_id, version)
        Cache-->>Exec: schema (cached or fetched)
        Exec->>Exec: extract metadata (_extract_metadata_byte)
        Exec->>Exec: decode features (proto/arrow/parquet)
        Exec->>Exec: format floats
        Exec-->>Output: yield decoded dict row
    end
    deactivate Exec
    Spark->>Output: assemble rows into SparkDataFrame
    deactivate Spark
    Output-->>Client: return decoded SparkDataFrame

No actionable comments were generated in the recent review. 🎉


Comment @coderabbitai help to get the list of available commands and usage tips.


@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 3

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (3)
py-sdk/inference_logging_client/inference_logging_client/types.py (1)

44-49: ⚠️ Potential issue | 🟠 Major

Python 3.8 compatibility issue with type hints.

The list[str] syntax for type hints is only available at runtime starting from Python 3.9. Since pyproject.toml declares requires-python = ">=3.8", this will cause a TypeError on Python 3.8 when the module is imported.

🐛 Proposed fix for Python 3.8 compatibility

Add the annotations import at the top of the file:

 """Type definitions for inference-logging-client."""

+from __future__ import annotations
 from enum import Enum
 from dataclasses import dataclass, field

Alternatively, use typing.List:

+from typing import List
 from dataclasses import dataclass, field

 # ... later in DecodedMPLog ...
-    entities: list[str] = field(default_factory=list)
-    parent_entity: list[str] = field(default_factory=list)
+    entities: List[str] = field(default_factory=list)
+    parent_entity: List[str] = field(default_factory=list)
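Why the __future__ import works: under PEP 563 every annotation is stored as a lazy string, so `list[str]` is never evaluated at import time and the module loads on Python 3.8 as well. A trimmed, runnable illustration (field names borrowed from the DecodedMPLog fields above; the rest of the dataclass is omitted):

```python
from __future__ import annotations  # must precede all other imports in the module

from dataclasses import dataclass, field

@dataclass
class DecodedMPLog:
    entities: list[str] = field(default_factory=list)
    parent_entity: list[str] = field(default_factory=list)

log = DecodedMPLog(entities=["user", "product"])
print(log.entities)                              # ['user', 'product']
print(DecodedMPLog.__annotations__["entities"])  # stored as the string 'list[str]'
```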
py-sdk/inference_logging_client/inference_logging_client/utils.py (1)

178-190: ⚠️ Potential issue | 🟠 Major

Python 3.8 compatibility issue with return type annotation.

The tuple[bool, int, int] return type syntax requires Python 3.9+. This will fail on Python 3.8 which is listed as a supported version.

🐛 Proposed fix

Add at the top of the file:

 """Utility functions for type normalization and formatting."""

+from __future__ import annotations
 from typing import Optional

Or use typing.Tuple:

-from typing import Optional
+from typing import Optional, Tuple

-def unpack_metadata_byte(metadata_byte: int) -> tuple[bool, int, int]:
+def unpack_metadata_byte(metadata_byte: int) -> Tuple[bool, int, int]:
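For context on what `unpack_metadata_byte` returns, a sketch with an assumed bit layout — the actual wire format is defined by the SDK, so the flag/version/format positions below are purely illustrative:

```python
from typing import Tuple

def unpack_metadata_byte(metadata_byte: int) -> Tuple[bool, int, int]:
    # Assumed layout: [c vvv ffff] -- 1 compression bit, 3 version bits, 4 format bits.
    compression_enabled = bool(metadata_byte & 0b1000_0000)
    version = (metadata_byte >> 4) & 0b0111
    format_type = metadata_byte & 0b0000_1111
    return compression_enabled, version, format_type

print(unpack_metadata_byte(0b1010_0011))  # (True, 2, 3)
```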
py-sdk/inference_logging_client/inference_logging_client/io.py (1)

22-24: ⚠️ Potential issue | 🟠 Major

Python 3.8 compatibility issue with generic type annotations.

The OrderedDict[tuple[str, int], list[FeatureInfo]] syntax requires Python 3.9+. This will cause a TypeError on Python 3.8.

🐛 Proposed fix

Add at the top of the file:

 """I/O operations for schema fetching and protobuf parsing."""

+from __future__ import annotations
 import os

Or use typing generics:

-from collections import OrderedDict
+from collections import OrderedDict
+from typing import Dict, List, Tuple

-_schema_cache: OrderedDict[tuple[str, int], list[FeatureInfo]] = OrderedDict()
+_schema_cache: "OrderedDict[Tuple[str, int], List[FeatureInfo]]" = OrderedDict()
🤖 Fix all issues with AI agents
In `@py-sdk/inference_logging_client/inference_logging_client/__init__.py`:
- Around line 306-309: The empty-DataFrame branch currently returns
spark.createDataFrame([], StructType([])) which drops the expected output
schema; in the df.limit(1).count() == 0 block replace the empty StructType with
the actual output schema used by the transformation (e.g., reuse the computed
output_schema or derive it by applying the same schema-mapping logic used for
non-empty inputs) and return spark.createDataFrame([], output_schema) so the
returned DataFrame preserves entity_id, metadata and feature columns; update the
code around df.limit(1).count() to obtain or reference output_schema before
creating the empty DataFrame.

In `@py-sdk/inference_logging_client/inference_logging_client/cli.py`:
- Around line 171-176: The comment above the df.count() call is inaccurate
because the code performs a full count; either update the comment to reflect
that df.count() is executed, or change the implementation to the advertised
optimization by replacing row_count = df.count() with a cheap emptiness check
such as row_count = df.limit(1).count() (or similar) and only run a full count
when necessary; update the printed message using row_count and retain the
existing exception handling around the count to preserve current behavior.

In `@py-sdk/inference_logging_client/pyproject.toml`:
- Line 31: The package metadata lists Python 3.12 in classifiers but requires
"pyspark>=3.3.0" which doesn't officially support 3.12; update the dependency or
classifiers to be consistent: either change the pyspark requirement string in
pyproject.toml from "pyspark>=3.3.0" to "pyspark>=3.5.0" to enable Python 3.12
support, or remove the Python 3.12 classifier entries from the package
classifiers so they match pyspark>=3.3.0 compatibility.
🧹 Nitpick comments (2)
py-sdk/inference_logging_client/readme.md (1)

628-638: Add language specifiers to diagram code blocks.

Several code blocks containing ASCII art diagrams and text layouts are missing language specifiers, which triggers markdownlint warnings. Consider adding text or plaintext as the language identifier.

📝 Example fix for diagram blocks
-```
+```text
 ┌─────────────────────────────────────────────────────────────┐
 │ Byte 0: Generated Flag (1 = no generated values)           │

This applies to multiple diagram blocks throughout the document (lines 628, 647, 666, 682, 707, 818, 984, 999, 1058, 1074, 1084).

py-sdk/inference_logging_client/inference_logging_client/__init__.py (1)

488-489: High default partition count may be excessive for smaller DataFrames.

The default of 10,000 partitions is optimized for large rows (3-5 MB each per documentation), but could introduce unnecessary overhead for smaller DataFrames. Consider documenting this trade-off more prominently or adjusting the default based on estimated data size.
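One way to make the trade-off explicit is to derive the partition count from an estimated input size instead of a fixed default; this heuristic is a sketch, not SDK code, and the 128 MB target is just a common Spark rule of thumb:

```python
def choose_num_partitions(estimated_total_bytes,
                          target_partition_bytes=128 * 1024 * 1024,
                          default=10_000):
    """Pick a partition count from an estimated input size (illustrative heuristic)."""
    if estimated_total_bytes is None:
        return default                # no estimate: fall back to the library default
    # Ceiling division: aim for roughly target_partition_bytes per partition.
    return max(1, -(-estimated_total_bytes // target_partition_bytes))

print(choose_num_partitions(None))        # 10000 (unknown size -> default)
print(choose_num_partitions(1024 ** 3))   # 8 (1 GiB at ~128 MB per partition)
```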

Comment on lines +306 to +309
# Check if DataFrame is empty (avoid full count: use limit(1))
if df.limit(1).count() == 0:
from pyspark.sql.types import StructType
return spark.createDataFrame([], StructType([]))

⚠️ Potential issue | 🟡 Minor

Empty DataFrame returns empty schema instead of expected output schema.

When the input DataFrame is empty, this returns a DataFrame with an empty StructType(), losing the expected output schema (entity_id + metadata columns + feature columns). This may cause issues for downstream operations that depend on a consistent schema.

Suggested fix to preserve schema
     # Check if DataFrame is empty (avoid full count: use limit(1))
     if df.limit(1).count() == 0:
-        from pyspark.sql.types import StructType
-        return spark.createDataFrame([], StructType([]))
+        from pyspark.sql.types import StringType, StructField, StructType
+        # Return empty DataFrame with at least entity_id column
+        fields = [StructField("entity_id", StringType(), True)]
+        return spark.createDataFrame([], StructType(fields))

Comment on lines +171 to +176
# Avoid full count() for huge DataFrames: use limit(1).count() for empty check only
try:
row_count = df.count()
print(f"Rows: {row_count}", file=sys.stderr)
except Exception:
print("Rows: (count skipped - use --output to write without summary)", file=sys.stderr)

⚠️ Potential issue | 🟡 Minor

Misleading comment contradicts actual behavior.

The comment states "Avoid full count() for huge DataFrames" but the code immediately performs df.count(), which is a full count operation. Consider updating the comment to accurately reflect the behavior, or implementing the optimization mentioned.

Suggested comment fix
-        # Avoid full count() for huge DataFrames: use limit(1).count() for empty check only
+        # Note: count() can be slow for large DataFrames; errors are caught gracefully
         try:
             row_count = df.count()


 dependencies = [
-    "pandas>=1.3.0",
+    "pyspark>=3.3.0",

⚠️ Potential issue | 🟠 Major

🧩 Analysis chain

🌐 Web query:

PySpark Python 3.12 support version compatibility

💡 Result:

For Python 3.12, these are the key PySpark / Spark compatibility points:

  • Spark / PySpark 4.0.x: supports Python 3.9+ (so 3.12 is supported). The Spark 4.0.0 release notes also include Python-3.12-related dependency work (e.g., grpcio upgrade “for Python 3.12”). [1][2]
  • Spark / PySpark 4.1.x (latest docs): supports Python 3.10+ (so 3.12 is supported). [3]
  • Spark / PySpark 3.5.x: supports Python 3.8+ (so 3.12 is included by the stated support range). [4]

Practical recommendation: if you specifically care about smooth Python 3.12 operation (dependency wheels, fewer edge cases), prefer Spark 4.0+ (or newer). [1][2]

Sources
[1] Spark 4.0.0 PySpark install docs (Python 3.9+) (spark.apache.org)
[2] Spark 4.0.0 release notes (grpcio upgrade for Python 3.12) (spark.apache.org)
[3] Spark “latest” (4.1.0) PySpark install docs (Python 3.10+) (spark.apache.org)
[4] Spark 3.5.3 PySpark install docs (Python 3.8+) (spark.apache.org)



🏁 Script executed:

find . -path "*/py-sdk/inference_logging_client/pyproject.toml" -type f | head -1 | xargs cat

Repository: Meesho/BharatMLStack

Length of output: 1893


Fix PySpark and Python version compatibility mismatch.

Python 3.12 is listed in the classifiers, but the minimum PySpark version (3.3.0) does not officially support it. PySpark 3.5.0+ adds Python 3.12 support.

Either:

  1. Bump the minimum PySpark version to >=3.5.0 if Python 3.12 support is required
  2. Remove Python 3.12 from classifiers if maintaining PySpark 3.3.0 compatibility is more important
