
Enable 14 PPL datetime scalar functions on analytics-engine route#21582

Open
mengweieric wants to merge 1 commit into opensearch-project:main from mengweieric:ae-wave-a-datetime

Conversation

@mengweieric
Contributor

Description

Extends the PPL datetime function surface of analytics-backend-datafusion with a Wave A bundle of 14 scalar functions, routing them through the force-routable /_plugins/_ppl endpoint so they execute on DataFusion without a Calcite fallback.

Functions landed in this PR:

| Category | Functions |
| --- | --- |
| Constructors / casts | `date(expr)`, `datetime(expr)`, `makedate(year, doy)`, `maketime(h, m, s)`, `from_unixtime(sec)` |
| Component extraction | `dayofweek` / `day_of_week`, `second` / `second_of_minute`, `extract(unit FROM …)` |
| Wall-clock | `sysdate()` |
| MySQL-format rendering / parsing | `strftime`, `date_format`, `time_format`, `str_to_date` |
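
For orientation, a couple of illustrative PPL queries exercising this surface (the index name `logs` and field `ts` here are hypothetical, not taken from the PR's tests):

```
source=logs | eval d = date_format(ts, '%Y-%m-%d %H:%i:%s') | fields d
source=logs | eval dow = dayofweek(date('2026-05-09')) | fields dow
```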

Routing strategy

Each function picks the narrowest reliable implementation path:

  • Calcite built-ins reused where DataFusion already exposes matching semantics (date, datetime, dayofweek, second, sysdate). These go through name-mapping adapters that preserve PPL's declared return type via AbstractNameMappingAdapter.
  • Rust UDFs added for the MySQL-flavored behaviors with no 1:1 DataFusion built-in: strftime, extract (22-unit MySQL set incl. digit-concatenation composites), from_unixtime, maketime, makedate, date_format, time_format, str_to_date. The *_format / str_to_date family shares a single mysql_format token table to keep rendering/parsing in lock-step.
  • MySQL quirks preserved end-to-end: makedate doy overflow / year remapping, maketime rounding rules, strftime's abs(v) >= 1e11 ms auto-detect, date_format ordinal-day suffixes, time_format's date-token collapse rules.
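
The shared-token-table idea behind the `*_format` / `str_to_date` family can be sketched as follows. This is a simplified illustration with made-up names covering only three specifiers, not the PR's actual `mysql_format` module; the point is that one table drives both directions so rendering and parsing cannot drift apart:

```rust
// Sketch: a single token table shared by the renderer (date_format-style)
// and the parser (str_to_date-style). All names are illustrative.

#[derive(Debug, PartialEq)]
enum Token {
    Year4,         // %Y: four-digit year
    Month2,        // %m: zero-padded month
    Day2,          // %d: zero-padded day
    Literal(char), // anything else must match verbatim
}

fn tokenize(format: &str) -> Vec<Token> {
    let mut tokens = Vec::new();
    let mut chars = format.chars();
    while let Some(c) = chars.next() {
        if c == '%' {
            match chars.next() {
                Some('Y') => tokens.push(Token::Year4),
                Some('m') => tokens.push(Token::Month2),
                Some('d') => tokens.push(Token::Day2),
                Some(other) => tokens.push(Token::Literal(other)),
                None => tokens.push(Token::Literal('%')),
            }
        } else {
            tokens.push(Token::Literal(c));
        }
    }
    tokens
}

/// Render a date through the token table.
fn render(year: i32, month: u32, day: u32, format: &str) -> String {
    tokenize(format)
        .iter()
        .map(|tok| match tok {
            Token::Year4 => format!("{year:04}"),
            Token::Month2 => format!("{month:02}"),
            Token::Day2 => format!("{day:02}"),
            Token::Literal(c) => c.to_string(),
        })
        .collect()
}

fn take_digits(bytes: &[u8], pos: &mut usize, width: usize) -> Option<u64> {
    let end = *pos + width;
    if end > bytes.len() {
        return None; // input ended before the field was complete
    }
    let value = std::str::from_utf8(&bytes[*pos..end]).ok()?.parse().ok()?;
    *pos = end;
    Some(value)
}

/// Parse an input string through the same token table.
fn parse(input: &str, format: &str) -> Option<(i32, u32, u32)> {
    let bytes = input.as_bytes();
    let mut pos = 0;
    let (mut y, mut m, mut d) = (1970_i32, 1_u32, 1_u32);
    for tok in tokenize(format) {
        match tok {
            Token::Year4 => y = take_digits(bytes, &mut pos, 4)? as i32,
            Token::Month2 => m = take_digits(bytes, &mut pos, 2)? as u32,
            Token::Day2 => d = take_digits(bytes, &mut pos, 2)? as u32,
            Token::Literal(c) => {
                let mut buf = [0u8; 4];
                let lit = c.encode_utf8(&mut buf).as_bytes();
                if bytes.get(pos..pos + lit.len()) != Some(lit) {
                    return None; // literal mismatch
                }
                pos += lit.len();
            }
        }
    }
    Some((y, m, d))
}

fn main() {
    let rendered = render(2026, 5, 9, "%Y-%m-%d");
    assert_eq!(rendered, "2026-05-09");
    assert_eq!(parse(&rendered, "%Y-%m-%d"), Some((2026, 5, 9)));
    assert_eq!(parse("2026/05-09", "%Y-%m-%d"), None);
}
```

Because both `render` and `parse` consume the output of the same `tokenize`, adding or fixing a specifier in one place updates both directions at once.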

Scope boundaries

  • Engine-side only. No PPL grammar, registry, or binding changes — the SQL plugin remains untouched.
  • 1-arg timestamp(expr) is excluded from Wave A and stays on legacy Calcite: the function's return type collides with an existing enum entry and is better handled in a follow-up.
  • Certain TIME-operand and Time64 return paths (e.g. hour(time(...)), end-to-end maketime surface assertions) remain blocked by a known substrait-java 0.89.1 ToTypeString gap on ParameterizedType.PrecisionTime. Rust-level unit tests cover the semantics; integration-layer coverage ships with the upstream substrait-java fix.

Files changed

  • Java adapters (sandbox/plugins/analytics-backend-datafusion/src/main/java/org/opensearch/be/datafusion/): new StrftimeFunctionAdapter, SecondAdapter, DayOfWeekAdapter, RustUdfDateTimeAdapters; extensions to DateTimeAdapters, DataFusionAnalyticsBackendPlugin (capability wiring), DataFusionFragmentConvertor (additional scalar signatures).
  • Substrait catalog: opensearch_scalar_functions.yaml extended with the new signatures.
  • Rust UDFs (sandbox/plugins/analytics-backend-datafusion/rust/src/udf/): strftime, extract, from_unixtime, maketime, makedate, date_format, time_format, str_to_date, mysql_format (shared token table).
  • SPI: ScalarFunction enum extended with Wave A entries.
  • Tests: StrftimeFunctionAdapterTests (adapter-level), new qa-module DateTimeScalarFunctionsIT, new cases in internal-cluster ScalarDateTimeFunctionIT, plus ~70 Rust unit tests across the new UDFs.

Verification

End-to-end verified against a force-routed runTask cluster on /_plugins/_ppl (-Dplugins.query.analytics.force_route=true). Routing confirmed via explain (viableBackends=[datafusion]) and cluster logs (ShardFragmentStageExecution state=CREATED). All 14 functions return correct values on parquet-formatted indices.

Pre-PR gates (all green on this branch):

  • ./gradlew -Dsandbox.enabled=true :sandbox:plugins:analytics-backend-datafusion:spotlessApply
  • cargo test --lib on each new UDF module: 70 passed / 0 failed
  • ./gradlew -Dsandbox.enabled=true :sandbox:plugins:analytics-backend-datafusion:check
  • ./gradlew check -p sandbox -Dsandbox.enabled=true (includes analytics-engine-rest qa integTest / integTestMemtable / integTestStreaming)

Check List

  • New functionality includes testing.
  • New functionality has been documented via adapter/UDF doc comments.
  • API changes companion pull request created, if applicable.
  • Commits are signed per the DCO using --signoff.
  • Public documentation issue/PR created, if applicable.

By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license. For more information on following Developer Certificate of Origin and signing off your commits, please check here.

@github-actions
Contributor

github-actions Bot commented May 9, 2026

PR Reviewer Guide 🔍

(Review updated until commit ccf8920)

Here are some key observations to aid the review process:

🧪 PR contains tests
🔒 No security concerns identified
✅ No TODO sections
🔀 No multiple PR themes
⚡ Recommended focus areas for review

Possible Issue

In render_from_seconds, when nanos < 0, the code adjusts adj_seconds = seconds - 1 and adj_nanos = (nanos + 1_000_000_000) as u32. However, if seconds is already at the minimum boundary (-MAX_UNIX_SECONDS), subtracting 1 will push it below the valid range, yet the range check (-MAX_UNIX_SECONDS..=MAX_UNIX_SECONDS).contains(&seconds) happens before this adjustment. This means a value like (-MAX_UNIX_SECONDS, -1_nanos) would pass the range check but then produce adj_seconds = -MAX_UNIX_SECONDS - 1, which is out of range and could cause timestamp_opt to return None unexpectedly or produce incorrect results.

if !(-MAX_UNIX_SECONDS..=MAX_UNIX_SECONDS).contains(&seconds) {
    return None;
}
let fraction = seconds_value - seconds as f64;
let nanos = (fraction * 1_000_000_000.0) as i32;
// `Instant.ofEpochSecond(s, n)` normalizes negative n; chrono requires
// non-negative nanos paired with an adjusted seconds value.
let (adj_seconds, adj_nanos) = if nanos < 0 {
    (seconds - 1, (nanos + 1_000_000_000) as u32)
} else {
    (seconds, nanos as u32)
};
let dt = Utc.timestamp_opt(adj_seconds, adj_nanos).single()?;
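
For reference, a branch-free normalization using `div_euclid` / `rem_euclid` sidesteps both the boundary interaction and the assumed `[-1_000_000_000, 0)` range. This is a sketch, not the PR's code:

```rust
// Sketch: fold a signed nanosecond remainder into chrono's expected
// (seconds, non-negative nanos < 1e9) shape. div_euclid/rem_euclid
// handle any i64 remainder, including values below -1_000_000_000.
fn normalize(seconds: i64, nanos: i64) -> (i64, u32) {
    let adj_seconds = seconds + nanos.div_euclid(1_000_000_000);
    let adj_nanos = nanos.rem_euclid(1_000_000_000) as u32;
    (adj_seconds, adj_nanos)
}

fn main() {
    assert_eq!(normalize(10, -1), (9, 999_999_999));
    assert_eq!(normalize(10, 500), (10, 500));
    assert_eq!(normalize(0, -1_000_000_001), (-2, 999_999_999));
}
```

With this shape, any check against `MAX_UNIX_SECONDS` would belong after the adjustment, on `adj_seconds`, which addresses the ordering issue described above.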
Possible Issue

In week_number_sunday_first_01, if dt.year() - 1 underflows (e.g., when dt.year() is the minimum representable year), the call to chrono::NaiveDate::from_ymd_opt(dt.year() - 1, 12, 31) will fail and return None, causing an unwrap() panic. This can occur if a user passes a timestamp near the minimum date boundary and the week number calculation triggers the prior-year lookup.

fn week_number_sunday_first_01(dt: DateTime<Utc>) -> u32 {
    let wn = week_number_sunday_first(dt);
    if wn == 0 {
        let last = chrono::NaiveDate::from_ymd_opt(dt.year() - 1, 12, 31).unwrap();
        let last_dt = Utc.from_utc_datetime(&last.and_hms_opt(0, 0, 0).unwrap());
        week_number_sunday_first(last_dt)
    } else {
        wn
    }
}
Possible Issue

In days_since_epoch, the code casts year (an i64) to i32 via try_into(). If the year value is outside the i32 range (e.g., a very large positive or negative year after rounding), try_into() will fail and return None. However, the function does not explicitly document or test this boundary, so users passing extreme year values (e.g., year = 3_000_000_000.0) will silently get NULL without a clear indication that the year itself is out of range for the date calculation.

fn days_since_epoch(year: f64, day_of_year: f64) -> Option<i32> {
    if !year.is_finite() || !day_of_year.is_finite() {
        return None;
    }
    let year = year.round() as i64;
    let doy = day_of_year.round() as i64;
    if doy <= 0 || year < 0 {
        return None;
    }
    let year = if year == 0 { 2000 } else { year };
    let year: i32 = year.try_into().ok()?;
    let start = NaiveDate::from_yo_opt(year, 1)?;
    let date = start.checked_add_days(chrono::Days::new((doy - 1) as u64))?;
    Some(date.num_days_from_ce() - CE_TO_UNIX_EPOCH_DAYS)
}
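
The year-range concern above can be distilled into a standalone sketch (function name hypothetical; the chrono parts are omitted), showing that extreme years fall out as `None` rather than panicking. Note that in Rust, float-to-int `as` casts saturate rather than trigger undefined behavior:

```rust
// Sketch (no chrono): the year guard in isolation. Going through i64
// plus try_into makes rejection of out-of-range years explicit instead
// of relying on the saturating behavior of a direct `as i32` cast.
fn year_to_i32(year: f64) -> Option<i32> {
    if !year.is_finite() {
        return None; // NaN / infinity
    }
    let year = year.round();
    if year < i64::MIN as f64 || year > i64::MAX as f64 {
        return None; // would saturate if cast directly
    }
    let year: i32 = (year as i64).try_into().ok()?;
    Some(year)
}

fn main() {
    assert_eq!(year_to_i32(2026.0), Some(2026));
    assert_eq!(year_to_i32(3_000_000_000.0), None);
    assert_eq!(year_to_i32(f64::NAN), None);
}
```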

@github-actions
Contributor

github-actions Bot commented May 9, 2026

PR Code Suggestions ✨

Latest suggestions up to ccf8920

Explore these optional code suggestions:

Category | Suggestion | Impact
General
Prevent overflow in nanosecond adjustment

When nanos is negative, the adjustment logic assumes nanos >= -1_000_000_000, but if
fraction computation produces a value outside this range due to floating-point
precision issues, the cast to u32 may overflow. Add a bounds check or clamp nanos to
ensure it stays within valid range.

sandbox/plugins/analytics-backend-datafusion/rust/src/udf/strftime.rs [193-197]

 let (adj_seconds, adj_nanos) = if nanos < 0 {
-    (seconds - 1, (nanos + 1_000_000_000) as u32)
+    let adjusted_nanos = nanos + 1_000_000_000;
+    if adjusted_nanos < 0 || adjusted_nanos >= 1_000_000_000 {
+        return None;
+    }
+    (seconds - 1, adjusted_nanos as u32)
 } else {
     (seconds, nanos as u32)
 };
Suggestion importance[1-10]: 6


Why: The suggestion addresses a potential edge case where nanos could be outside the expected range due to floating-point precision issues. The current code computes nanos = (fraction * 1_000_000_000.0) as i32 where fraction is in [0, 1) or negative. Adding bounds validation before the cast to u32 would prevent potential overflow, though the likelihood is low given the controlled input range. This is a valid defensive programming improvement.

Low
Validate operand type before transformation

The adapter does not validate that the operand is a valid date/timestamp type before
calling date_part. If the operand is a string or other incompatible type, the
generated expression may fail at runtime. Add type validation or rely on upstream
coercion to ensure the operand is temporal.

sandbox/plugins/analytics-backend-datafusion/src/main/java/org/opensearch/be/datafusion/DayOfWeekAdapter.java [42-44]

-RexNode datePart = rexBuilder.makeCall(SqlLibraryOperators.DATE_PART, partLiteral, original.getOperands().get(0));
+RexNode operand = original.getOperands().get(0);
+if (!SqlTypeName.DATETIME_TYPES.contains(operand.getType().getSqlTypeName())) {
+    return original;
+}
+RexNode datePart = rexBuilder.makeCall(SqlLibraryOperators.DATE_PART, partLiteral, operand);
 RexNode sum = rexBuilder.makeCall(SqlStdOperatorTable.PLUS, datePart, rexBuilder.makeExactLiteral(BigDecimal.ONE));
 return rexBuilder.makeCast(original.getType(), sum);
Suggestion importance[1-10]: 5


Why: The suggestion to validate operand types before transformation is reasonable for defensive programming. However, the adapter already checks arity (original.getOperands().size() != 1) and returns the original call if invalid. The upstream planner and coercion logic should ensure temporal types reach this adapter. Adding explicit type validation adds safety but may be redundant given the existing architecture.

Low
Validate temporal operand before transformation

The adapter assumes date_part returns a type that can be cast to DOUBLE, but does
not verify the operand is a valid temporal type. If the operand is non-temporal, the
expression may fail at runtime. Add operand type validation before transformation.

sandbox/plugins/analytics-backend-datafusion/src/main/java/org/opensearch/be/datafusion/SecondAdapter.java [42-47]

-RexNode datePart = rexBuilder.makeCall(SqlLibraryOperators.DATE_PART, partLiteral, original.getOperands().get(0));
+RexNode operand = original.getOperands().get(0);
+if (!SqlTypeName.DATETIME_TYPES.contains(operand.getType().getSqlTypeName())) {
+    return original;
+}
+RexNode datePart = rexBuilder.makeCall(SqlLibraryOperators.DATE_PART, partLiteral, operand);
 RelDataType doubleType = cluster.getTypeFactory()
     .createTypeWithNullability(cluster.getTypeFactory().createSqlType(SqlTypeName.DOUBLE), datePart.getType().isNullable());
 RexNode datePartDouble = rexBuilder.makeCast(doubleType, datePart);
 RexNode floored = rexBuilder.makeCall(SqlStdOperatorTable.FLOOR, datePartDouble);
 return rexBuilder.makeCast(original.getType(), floored);
Suggestion importance[1-10]: 5


Why: Similar to the first suggestion, this proposes adding operand type validation before calling date_part. The adapter already validates arity and returns the original call if invalid. While adding type checks improves robustness, the upstream coercion and planning layers should ensure only temporal types reach this adapter, making this validation potentially redundant.

Low

Previous suggestions

Suggestions up to commit 7caf76c
Category | Suggestion | Impact
General
Prevent overflow in nanosecond calculation

The cast (fraction * 1_000_000_000.0) as i32 can silently saturate if fraction is near 1.0
due to floating-point precision errors, producing an out-of-range nanosecond count. Clamp the
result to [0, 999_999_999] before casting to ensure safety.

sandbox/plugins/analytics-backend-datafusion/rust/src/udf/strftime.rs [186-190]

 let seconds = seconds_value.trunc() as i64;
 if !(-MAX_UNIX_SECONDS..=MAX_UNIX_SECONDS).contains(&seconds) {
     return None;
 }
 let fraction = seconds_value - seconds as f64;
-let nanos = (fraction * 1_000_000_000.0) as i32;
+let nanos = ((fraction * 1_000_000_000.0).clamp(0.0, 999_999_999.0)) as i32;
Suggestion importance[1-10]: 5


Why: The suggestion correctly identifies a potential edge case where floating-point precision errors could cause fraction * 1_000_000_000.0 to slightly exceed 999_999_999, though this is unlikely in practice. The proposed clamp adds defensive programming to ensure the value stays within valid bounds before casting to i32. However, the impact is minor since the upstream code already validates the input range, making overflow extremely unlikely.

Low
Suggestions up to commit 8096d30
Category | Suggestion | Impact
Possible issue
Prevent potential out-of-bounds access in parser

The loop condition if pos > input_bytes.len() allows pos == input_bytes.len() to
proceed, which can cause out-of-bounds access in subsequent input_bytes[pos] reads
when the input ends exactly at a token boundary. Change the check to if pos >=
input_bytes.len() or ensure all downstream accesses validate pos < input_bytes.len()
before indexing.

sandbox/plugins/analytics-backend-datafusion/rust/src/udf/mysql_format.rs [535-549]

-pub(crate) fn parse_mysql_format(input: &str, format: &str) -> Option<Parsed> {
-    let tokens = tokenize(format);
-    let input_bytes = input.as_bytes();
-    let mut pos = 0;
-    let mut f = Parsed::default();
-    for tok in tokens {
-        if pos > input_bytes.len() {
-            return None;
-        }
-        ...
+for tok in tokens {
+    if pos >= input_bytes.len() && !matches!(tok, Token::Literal(c) if c.is_whitespace()) {
+        return None;
     }
-    Some(f)
+    ...
 }
Suggestion importance[1-10]: 8


Why: The suggestion identifies a real bug: the condition if pos > input_bytes.len() allows pos == input_bytes.len(), which can cause out-of-bounds access in subsequent input_bytes[pos] reads. The improved code correctly checks pos >= input_bytes.len() before proceeding, preventing potential panics.

Medium
Prevent undefined behavior in float-to-int cast

The cast year.round() as i64 saturates when year is outside the
representable range of i64 (e.g., year = 1e100), silently clamping to i64::MAX or i64::MIN. Use year.round().clamp(i64::MIN as
f64, i64::MAX as f64) as i64 or validate the range before casting so extreme inputs are rejected explicitly.

sandbox/plugins/analytics-backend-datafusion/rust/src/udf/makedate.rs [148-157]

-fn days_since_epoch(year: f64, day_of_year: f64) -> Option<i32> {
-    if !year.is_finite() || !day_of_year.is_finite() {
-        return None;
-    }
-    let year = year.round() as i64;
-    let doy = day_of_year.round() as i64;
-    if doy <= 0 || year < 0 {
-        return None;
-    }
-    let year = if year == 0 { 2000 } else { year };
-    let year: i32 = year.try_into().ok()?;
-    ...
+let year = year.round();
+if year < i64::MIN as f64 || year > i64::MAX as f64 {
+    return None;
 }
+let year = year as i64;
Suggestion importance[1-10]: 8


Why: The suggestion correctly identifies that casting an out-of-range f64 to i64 silently saturates in Rust. Adding a range check before the cast makes the rejection explicit and ensures the function returns None for extreme inputs like 1e100.

Medium
Fix nanos normalization for edge cases

The negative nanos normalization assumes nanos is in the range [-1_000_000_000, 0),
but the calculation (fraction * 1_000_000_000.0) as i32 can produce values outside
this range for extreme fractional inputs. Add a bounds check or use rem_euclid to
ensure adj_nanos stays within [0, 1_000_000_000).

sandbox/plugins/analytics-backend-datafusion/rust/src/udf/strftime.rs [286-291]

-let (adj_seconds, adj_nanos) = if nanos < 0 {
-    (seconds - 1, (nanos + 1_000_000_000) as u32)
-} else {
-    (seconds, nanos as u32)
-};
+let adj_seconds = seconds + (nanos / 1_000_000_000);
+let adj_nanos = nanos.rem_euclid(1_000_000_000) as u32;
Suggestion importance[1-10]: 7


Why: The suggestion correctly identifies that the normalization logic assumes nanos is in [-1_000_000_000, 0), but the calculation can produce values outside this range for extreme fractional inputs. Using rem_euclid is a more robust approach that handles all cases correctly.

Medium
Validate i64 range before casting

The multiplication seconds * 1_000_000.0 followed by truncation and cast to i64 could saturate
when seconds approaches i64::MAX / 1_000_000. Even though
MAX_UNIX_SECONDS_EXCLUSIVE provides an upper bound, verify that the result fits
within i64 range before casting rather than relying on the cast's saturating behavior.

sandbox/plugins/analytics-backend-datafusion/rust/src/udf/from_unixtime.rs [139-147]

 fn to_micros(seconds: f64) -> Option<i64> {
     if !seconds.is_finite() || !(0.0..MAX_UNIX_SECONDS_EXCLUSIVE).contains(&seconds) {
         return None;
     }
-    Some((seconds * 1_000_000.0).trunc() as i64)
+    let micros_f64 = (seconds * 1_000_000.0).trunc();
+    if micros_f64 < i64::MIN as f64 || micros_f64 > i64::MAX as f64 {
+        return None;
+    }
+    Some(micros_f64 as i64)
 }
Suggestion importance[1-10]: 3

__

Why: The overflow concern is theoretically valid but practically mitigated by MAX_UNIX_SECONDS_EXCLUSIVE (32,536,771,200.0), which when multiplied by 1,000,000 gives ~3.25e16, well below i64::MAX (~9.22e18). The additional check adds defensive programming but has minimal impact given the existing bounds.

Low
Prevent potential overflow in nanosecond conversion

The conversion from microseconds remainder to nanoseconds may overflow when
micros_rem is at its maximum value (999,999). Multiplying by 1,000 could produce a
value exceeding u32::MAX in edge cases. Validate the range or use checked arithmetic
to prevent potential panics.

sandbox/plugins/analytics-backend-datafusion/rust/src/udf/date_format.rs [181-186]

 fn render_at(micros: i64, format: &str, mode: FormatMode) -> Option<String> {
     let seconds = micros.div_euclid(1_000_000);
     let micros_rem = micros.rem_euclid(1_000_000) as u32;
-    let dt = Utc.timestamp_opt(seconds, micros_rem * 1_000).single()?;
+    let nanos = micros_rem.checked_mul(1_000)?;
+    let dt = Utc.timestamp_opt(seconds, nanos).single()?;
     format_datetime(dt, format, mode)
 }
Suggestion importance[1-10]: 2

__

Why: The concern about overflow is incorrect. micros_rem is guaranteed to be in range [0, 999_999] by rem_euclid(1_000_000), so multiplying by 1,000 yields at most 999,999,000, which is well within u32::MAX (4,294,967,295). The suggestion adds unnecessary complexity.

Low
General
Guard against exponent overflow in concat

The function uses checked_pow which returns None on overflow, but u32 exponents up
to 19 can overflow i64 (e.g., 10^19 exceeds i64::MAX). For composite units like
DAY_MICROSECOND, the total width is 14 digits, which is safe, but the function
should document this assumption or add an explicit width validation to prevent
silent failures on unexpected inputs.

sandbox/plugins/analytics-backend-datafusion/rust/src/udf/mysql_format.rs [293-303]

 fn concat(parts: &[(i64, u32)]) -> Option<i64> {
     let mut acc: i64 = 0;
     for &(v, w) in parts {
-        if v < 0 {
+        if v < 0 || w > 18 {
             return None;
         }
         let pow = 10_i64.checked_pow(w)?;
         acc = acc.checked_mul(pow)?.checked_add(v)?;
     }
     Some(acc)
 }
Suggestion importance[1-10]: 5


Why: The suggestion adds a defensive check for exponent width (w > 18), which prevents potential overflow issues. While the current usage is safe (max width is 14), this guard improves robustness against future changes or unexpected inputs.

Low

@mengweieric mengweieric changed the title from "Wave A: enable 14 PPL datetime scalar functions on analytics-engine route" to "Enable 14 PPL datetime scalar functions on analytics-engine route" on May 9, 2026
@github-actions
Contributor

github-actions Bot commented May 9, 2026

❌ Gradle check result for 8096d30: null

Please examine the workflow log, locate, and copy-paste the failure(s) below, then iterate to green. Is the failure a flaky test unrelated to your change?

@mengweieric mengweieric force-pushed the ae-wave-a-datetime branch from 8096d30 to 7caf76c on May 10, 2026 03:01
@mengweieric mengweieric marked this pull request as ready for review May 10, 2026 03:01
@mengweieric mengweieric requested a review from a team as a code owner May 10, 2026 03:01
@github-actions
Contributor

Persistent review updated to latest commit 7caf76c

…oute

Extends the PPL datetime surface of analytics-backend-datafusion with a
Wave A bundle of 14 functions: strftime, dayofweek / day_of_week,
second / second_of_minute, date, datetime, sysdate, extract,
from_unixtime (1-arg), maketime, makedate, date_format, time_format,
str_to_date. Each function is routed through
DataFusionAnalyticsBackendPlugin's scalar capabilities and wired end-to-end
to Substrait so that force-routed queries on /_plugins/_ppl execute on
DataFusion without any Calcite fallback.

Routing strategy per function:
 - Calcite builtins reused (date, datetime, dayofweek, second, sysdate)
   via name-mapping adapters that preserve PPL's declared return type.
 - Rust UDFs added for the MySQL-flavored behaviors that have no 1:1
   DataFusion builtin (strftime, extract, from_unixtime, maketime,
   makedate, date_format, time_format, str_to_date) with a shared
   mysql_format token table underpinning the *_format family.

End-to-end verification: all 14 functions pass against a force-routed
runTask cluster on /_plugins/_ppl, confirmed via explain
(viableBackends=[datafusion]) and ShardFragmentStageExecution traces.

Signed-off-by: Eric Wei <mengwei.eric@gmail.com>
@mengweieric mengweieric force-pushed the ae-wave-a-datetime branch from 7caf76c to ccf8920 on May 10, 2026 03:12
@github-actions
Contributor

Persistent review updated to latest commit ccf8920

@github-actions
Contributor

✅ Gradle check result for ccf8920: SUCCESS

@codecov

codecov Bot commented May 10, 2026

Codecov Report

✅ All modified and coverable lines are covered by tests.
✅ Project coverage is 73.44%. Comparing base (36809cc) to head (ccf8920).
⚠️ Report is 3 commits behind head on main.

Additional details and impacted files
@@             Coverage Diff              @@
##               main   #21582      +/-   ##
============================================
- Coverage     73.50%   73.44%   -0.06%     
+ Complexity    74644    74618      -26     
============================================
  Files          5980     5980              
  Lines        338777   338777              
  Branches      48848    48848              
============================================
- Hits         249011   248818     -193     
- Misses        69946    70158     +212     
+ Partials      19820    19801      -19     
