Pyspark base64 #274

Open

dhruvgupta-meesho wants to merge 4 commits into develop from pyspark-base64

Conversation

@dhruvgupta-meesho (Contributor) commented Feb 4, 2026

🔁 Pull Request Template – BharatMLStack

Please fill out the following sections to help us review your changes efficiently.


📌 Summary

e.g., adds or optimizes Redis fetch latency in online-feature-store, or improves search UI responsiveness in trufflebox-ui.


📂 Modules Affected

  • horizon (Real-time systems / networking)
  • online-feature-store (Feature serving infra)
  • trufflebox-ui (Admin panel / UI)
  • infra (Docker, CI/CD, GCP/AWS setup)
  • docs (Documentation updates)
  • Other: ___________

✅ Type of Change

  • Feature addition
  • Bug fix
  • Infra / build system change
  • Performance improvement
  • Refactor
  • Documentation
  • Other: ___________

📊 Benchmark / Metrics (if applicable)

Summary by CodeRabbit

Release Notes

  • New Features

    • Replaced pandas DataFrames with Apache Spark DataFrames for improved scalability and performance
    • Enhanced CLI with new arguments for encoding options (hex, base64), output formats (JSON, CSV), and Spark configuration
    • Added schema cache clearing functionality and expanded metadata fields in decoded results
    • Increased public API exports for better programmatic access
  • Refactor

    • Rewrote CLI processing pipeline for Spark-based data handling
    • Enhanced format decoders for Arrow and Parquet support
  • Documentation

    • Comprehensive README with detailed guides, examples, and troubleshooting sections
  • Chores

    • Updated core dependency from pandas to Apache Spark (v0.2.0)

@coderabbitai bot commented Feb 4, 2026

Important

Review skipped

Auto reviews are disabled on base/target branches other than the default branch.

Please check the settings in the CodeRabbit UI or the .coderabbit.yaml file in this repository. To trigger a single review, invoke the @coderabbitai review command.

You can disable this status message by setting the reviews.review_status to false in the CodeRabbit configuration file.

Comment @coderabbitai help to get the list of available commands and usage tips.

@coderabbitai bot left a comment

Actionable comments posted: 6

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
py-sdk/inference_logging_client/inference_logging_client/cli.py (1)

77-82: ⚠️ Potential issue | 🟡 Minor

Enable strict base64 validation to detect malformed input.

base64.b64decode without the validate=True flag silently ignores invalid characters instead of raising an error. The current error handler cannot catch this silent failure. Normalize whitespace and enable validation to provide clear feedback to users:

🛠️ Suggested change
-            data = base64.b64decode(data)
+            normalized = b"".join(data.split())
+            data = base64.b64decode(normalized, validate=True)
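The difference is easy to demonstrate with the standard library. `decode_base64_strict` is a hypothetical helper wrapping the suggested change, not a function from the client itself:

```python
import base64
import binascii

def decode_base64_strict(data: bytes) -> bytes:
    # Normalize whitespace (newlines are common in piped input), then
    # reject any character outside the base64 alphabet.
    normalized = b"".join(data.split())
    return base64.b64decode(normalized, validate=True)

# Default decoding silently discards the invalid "!!" and still succeeds:
assert base64.b64decode(b"aGVsbG8=!!") == b"hello"

# Strict decoding surfaces the malformed input instead:
try:
    decode_base64_strict(b"aGVsbG8=!!")
except binascii.Error as exc:
    print(f"rejected: {exc}")
```

The whitespace normalization matters because `validate=True` also rejects newlines, which MIME-wrapped base64 and shell pipelines routinely contain.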
🤖 Fix all issues with AI agents
In `@py-sdk/inference_logging_client/inference_logging_client/__init__.py`:
- Around line 350-363: The metadata preservation and column-ordering arrays use
the hard-coded string "mp_config_id" causing custom mp_config_id_column values
to be dropped; update the code to use the mp_config_id_column parameter instead
of the literal by replacing occurrences of "mp_config_id" in the
row_metadata_columns list and any column-ordering lists/operations (and the
metadata extraction logic that references that key) with the mp_config_id_column
variable (ensure mp_config_id_column has a sensible default like "mp_config_id"
if None so behavior is unchanged). Use the existing symbols
row_metadata_columns, mp_config_id_column, and the functions/blocks that perform
metadata extraction and column reordering to locate and change the hard-coded
string to the parameter.
- Around line 287-319: The _extract_metadata_byte function must be extended to
handle raw binary types and integer-list metadata: update _extract_metadata_byte
to (1) check for bytes/bytearray/memoryview at the top-level and return the
first byte if present, (2) when handling list formats in the parsed and
already-parsed branches, treat a first_item that is an int (0-255) as the
metadata byte and return it, and (3) treat list first_item values that are
bytes-like (bytes/bytearray/memoryview) by decoding or taking the first byte
similarly; ensure existing JSON/string parsing still occurs and keep all
existing exception handling.
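The two `__init__.py` fixes above can be sketched as plain-Python helpers. The names (`build_metadata_columns`, `extract_metadata_byte`) and signatures are illustrative assumptions standing in for the package's internals, not its actual code:

```python
import json

def build_metadata_columns(mp_config_id_column=None, extra=()):
    # Use the parameter rather than the hard-coded "mp_config_id" literal,
    # defaulting to the old name so existing callers are unaffected.
    if mp_config_id_column is None:
        mp_config_id_column = "mp_config_id"
    return [mp_config_id_column, *extra]

def extract_metadata_byte(value):
    # (1) raw binary at the top level: the first byte is the metadata byte
    if isinstance(value, (bytes, bytearray, memoryview)):
        buf = bytes(value)
        return buf[0] if buf else None
    # existing behavior: parse JSON strings into Python values first
    if isinstance(value, str):
        try:
            value = json.loads(value)
        except (ValueError, TypeError):
            return None
    # (2) and (3): list formats whose first item is an int 0-255 or bytes-like
    if isinstance(value, list) and value:
        first = value[0]
        if isinstance(first, bool):
            return None  # bool is an int subclass; reject it explicitly
        if isinstance(first, int) and 0 <= first <= 255:
            return first
        if isinstance(first, (bytes, bytearray, memoryview)):
            buf = bytes(first)
            return buf[0] if buf else None
    return None
```

With these shapes, a custom `mp_config_id_column="model_cfg"` survives the metadata/reordering path, and both `b"\x07..."` payloads and `[7, ...]` lists yield the metadata byte `7`.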

In `@py-sdk/inference_logging_client/inference_logging_client/cli.py`:
- Around line 53-55: Update the misleading help and success text to reflect that
Spark writes directories of part-* files and that JSON output is supported:
change the parser.add_argument("--output", "-o", help=...) help to say "Output
directory (Spark creates part-* files; CSV by default or JSON with --json)" and
update the script's success message (the message printed after writing,
referenced near the write/finish logic) to state "Wrote output to directory
'<output>' as CSV" or "as JSON" depending on the --json flag, mentioning the
directory contains Spark part-* files rather than a single file.
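A minimal sketch of the clarified help and success text, assuming an `argparse`-based CLI; the flag names come from the review comment, and the surrounding parser setup is hypothetical:

```python
import argparse

parser = argparse.ArgumentParser(prog="inference-logging-client")
parser.add_argument(
    "--output", "-o",
    help="Output directory (Spark creates part-* files; CSV by default or JSON with --json)",
)
parser.add_argument("--json", action="store_true", help="Write JSON instead of CSV")

args = parser.parse_args(["-o", "decoded_out", "--json"])
fmt = "JSON" if args.json else "CSV"
# Success message names a directory and a format, not a single file
message = f"Wrote output to directory '{args.output}' as {fmt} (Spark part-* files)"
print(message)
```

Spelling out "directory" in both strings matters because `df.write.csv(path)` in Spark always produces a directory of `part-*` files, which surprises users expecting one `output.csv`.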

In `@py-sdk/inference_logging_client/inference_logging_client/io.py`:
- Around line 48-54: The User-Agent string in the HTTP request construction
(where req = urllib.request.Request(...)) is hard-coded to
"inference-logging-client/0.1.0" and must be updated to the package's current
version; change that header to "inference-logging-client/0.2.0" (or derive it
from the package version constant if one exists) in the Request headers so the
User-Agent matches the package release.
- Around line 31-77: The _fetch_schema_with_retry function can raise TypeError
when max_retries <= 0 because the loop never runs and last_exception remains
None; add an upfront guard in _fetch_schema_with_retry to validate max_retries
(e.g., if max_retries < 1) and either set it to a sane default or raise a
SchemaFetchError/ValueError with a clear message, and as an extra safety ensure
that before the final raise you raise a concrete SchemaFetchError (not None)
with a descriptive message if last_exception is still None so callers of
_fetch_schema_with_retry never get a TypeError.
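Both `io.py` fixes can be sketched together. This is a stand-in, not the client's code: `fetch` is injected as a callable here (the real function builds its own HTTP request), `SchemaFetchError` mirrors the client's exception type, and the version is derived from package metadata with a pinned fallback:

```python
import time
from importlib import metadata

PACKAGE = "inference-logging-client"

def user_agent(fallback="0.2.0"):
    # Derive the User-Agent from the installed distribution so it cannot
    # drift from the release; fall back when running from a source checkout.
    try:
        version = metadata.version(PACKAGE)
    except metadata.PackageNotFoundError:
        version = fallback
    return f"{PACKAGE}/{version}"

class SchemaFetchError(Exception):
    pass

def fetch_schema_with_retry(fetch, max_retries=3, backoff_seconds=0.0):
    # Guard: a non-positive retry count would skip the loop entirely and
    # end in `raise None`, i.e. a confusing TypeError, in the original code.
    if max_retries < 1:
        raise ValueError(f"max_retries must be >= 1, got {max_retries}")
    last_exception = None
    for attempt in range(max_retries):
        try:
            return fetch()
        except Exception as exc:
            last_exception = exc
            if attempt < max_retries - 1:
                time.sleep(backoff_seconds)
    # Safety net: always raise a concrete, descriptive error
    raise SchemaFetchError(
        f"schema fetch failed after {max_retries} attempts"
    ) from last_exception
```

Chaining with `from last_exception` preserves the underlying network error in the traceback while still guaranteeing the raised object is never `None`.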

In `@py-sdk/inference_logging_client/readme.md`:
- Around line 628-639: The fenced code block showing the ASCII table uses triple
backticks with no language (MD040); update that block to include an appropriate
language tag such as text (i.e., change ``` to ```text) so markdownlint passes
and readability improves—search for the ASCII box starting with
"┌─────────────────────────────────────────────────────────────┐" and update its
opening fence; apply the same pattern to any other unlabeled fenced blocks (use
bash for shell snippets, python for code examples).
🧹 Nitpick comments (2)
py-sdk/inference_logging_client/inference_logging_client/formats.py (1)

31-35: Close Arrow stream readers after reading the table.

pa.ipc.open_stream returns a reader that should be closed to avoid resource leakage in long-lived processes. A context manager (or explicit close) makes this safer.

♻️ Suggested change
-    try:
-        reader = pa.ipc.open_stream(io.BytesIO(encoded_bytes))
-        table = reader.read_all()
-    except Exception as e:
+    try:
+        with pa.ipc.open_stream(io.BytesIO(encoded_bytes)) as reader:
+            table = reader.read_all()
+    except Exception as e:
         raise FormatError(f"Failed to read Arrow IPC data: {e}")
py-sdk/inference_logging_client/inference_logging_client/__init__.py (1)

266-280: Avoid an extra Spark action before collect.

df.count() triggers a full job and df.collect() immediately triggers another. You can collect once and check emptiness.

♻️ Suggested change
-    if df.count() == 0:
-        from pyspark.sql.types import StructType
-        return spark.createDataFrame([], StructType([]))
-
     # Collect to driver for processing
     # Note: For large datasets, consider using mapInPandas or processing in partitions
     rows = df.collect()
+    if not rows:
+        from pyspark.sql.types import StructType
+        return spark.createDataFrame([], StructType([]))
