Bug in lambda variable resolution #23068

@fbx31

Describe the bug

The first parameter of array_filter and array_transform does not seem to be resolved correctly when it is a lambda variable; the function receives Null instead.

To Reproduce

    use datafusion::arrow::record_batch::RecordBatch; // needed for Vec<RecordBatch> below
    use datafusion::logical_expr::{col, lit, lambda, lambda_var};
    use datafusion::functions::expr_fn::*;
    use datafusion::functions_nested::expr_fn::*;
    use datafusion::functions_aggregate::expr_fn::*;
    use futures::TryStreamExt;

    let mut df = dataframe!(
        "id" => vec![1, 2],
        "name" => vec!["Alice", "Bob"],
    )?;

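    // expr1: a single lambda over "grouped"; this one works.
    let expr1 = array_transform(
        col("grouped"),
        lambda(
            ["x"],
            concat_ws(
                lit("+"),
                vec![
                    get_field(lambda_var("x"), "name"),
                    get_field(lambda_var("x"), "l"),
                ],
            ),
        ),
    );
    // expr2: same as expr1, but the "l" field of the outer lambda variable "x"
    // is passed to a nested array_filter; this one fails.
    let expr2 = array_transform(
        col("grouped"),
        lambda(
            ["x"],
            concat_ws(
                lit("+"),
                vec![
                    get_field(lambda_var("x"), "name"),
                    array_filter(
                        get_field(lambda_var("x"), "l"),
                        lambda(["y"], lambda_var("y").gt_eq(lit(1))),
                    ),
                ],
            ),
        ),
    );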

    df = df
        .with_column("l", make_array(vec![lit("0"), lit("1"), lit("2")]))?
        .aggregate(
            vec![lit("true")],
            vec![array_agg(named_struct(vec![
                lit("id"), col("id"),
                lit("name"), col("name"),
                lit("l"), col("l"),
            ]))
            .alias("grouped")],
        )?
        .with_column("test", expr1)?
        // .with_column("test2", expr2)?
        ;

    let plan = df.into_unoptimized_plan().resolve_lambda_variables()?.data;
    let session =  datafusion::execution::context::SessionContext::new();
    let exec = session.state().create_physical_plan(&plan).await?;
    let context = session.task_ctx();
    let data: Vec<RecordBatch> = datafusion::physical_plan::execute_stream(exec, context)?.try_collect::<Vec<_>>().await?;
    let format = datafusion::config::FormatOptions { types_info:true, ..Default::default() };
    println!("{}", datafusion::arrow::util::pretty::pretty_format_batches_with_options(&data, &(&format).try_into()?)?);

With this code, if you uncomment the line .with_column("test2", expr2)? you get the following plan error:

    Error: Plan("array_filter expected a list as first argument, got Null")

Perhaps I missed something...?

Thanks

Expected behavior

expr2 should run normally and produce a column test2, similar but not identical to column test, in addition to the following schema and output:

+--------------+------------------------------------------------------------------------+----------------------------------+
| Utf8("true") | grouped                                                                | test                             |
| Utf8         | List(Struct("id": Int32, "name": Utf8, "l": List(Utf8)))               | List(Utf8)                       |
+--------------+------------------------------------------------------------------------+----------------------------------+
| true         | [{id: 1, name: Alice, l: [0, 1, 2]}, {id: 2, name: Bob, l: [0, 1, 2]}] | [Alice+[0, 1, 2], Bob+[0, 1, 2]] |
+--------------+------------------------------------------------------------------------+----------------------------------+
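
To make the expected content of test2 concrete, here is a minimal plain-Rust sketch of what expr2 is meant to compute for each struct in grouped. It does not use DataFusion at all. `Row` and `expected_test2` are hypothetical names introduced for illustration, the list rendering `[a, b]` simply mirrors how the test column is printed above, and the sketch assumes the `>= 1` predicate keeps the elements "1" and "2" and drops "0".

```rust
// Plain-Rust model of the intended semantics of expr2; not DataFusion code.
// `Row` and `expected_test2` are hypothetical names used only for illustration.
struct Row {
    name: &'static str,
    l: Vec<&'static str>,
}

fn expected_test2(grouped: &[Row]) -> Vec<String> {
    grouped
        .iter()
        // Outer lambda: x is one struct of the "grouped" list.
        .map(|x| {
            // Inner lambda: keep the elements y of x.l with y >= 1.
            // Assumption: elements are compared as integers; values that do
            // not parse as integers are dropped.
            let kept: Vec<&str> = x
                .l
                .iter()
                .copied()
                .filter(|y| y.parse::<i64>().map_or(false, |n| n >= 1))
                .collect();
            // Join name and filtered list with "+", as concat_ws(lit("+"), ...) does.
            format!("{}+[{}]", x.name, kept.join(", "))
        })
        .collect()
}
```

Under these assumptions, calling `expected_test2` on the two rows from the reproduction (Alice and Bob, each with l = ["0", "1", "2"]) returns `["Alice+[1, 2]", "Bob+[1, 2]"]`, which is the shape expected for test2.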

Additional context

No response
