
Add internal connectors to external ML providers (Bedrock/SageMaker, etc) in the ml_inference processor#6772

Merged
Zhangxunmt merged 4 commits into opensearch-project:main from Zhangxunmt:main
May 6, 2026

Conversation


@Zhangxunmt Zhangxunmt commented Apr 20, 2026

Description

This PR integrates connectors and executors directly into the MLInferenceProcessor, eliminating the need to specify a model_id in the ml-commons plugin. Instead of configuring models externally, the processor now natively supports Amazon Bedrock-hosted models, streamlining deployment and removing manual model setup. In the future, ideally, we will only need to add new connector blueprints in the resources folder to enable new models.
Specifically, this update includes three Bedrock models out of the box:

"amazon.titan-embed-text-v2:0" – Text embedding model
"amazon.titan-embed-image-v1" – Image embedding model
"amazon.nova-2-multimodal-embeddings-v1:0" – Multimodal (text + image) embeddings

For other models, you still need to set up the model and connector in ml-commons as a prerequisite to running batch inference.

    - ml_inference:
        host: "<your OpenSearch url>"
        aws_sigv4: true
        action_type: "batch_predict"
        service_name: "bedrock"
        model_id: "amazon.titan-embed-text-v2:0"
        job_role_arn: arn:aws:iam::xxxxxxxxxx:role/bedrock-test-batch-api

A new parameter, "job_role_arn", is added to the processor config; this is the role passed to Bedrock/SageMaker to run the job. The role needs the following permissions:

    "s3:GetObject",
    "s3:ListBucket"
    "s3:PutObject"

Issues Resolved

Resolves #[Issue number to be closed when this PR is merged]

Check List

  • New functionality includes testing.
  • New functionality has a documentation issue. Please link to it in this PR.
    • New functionality has javadoc added
  • Commits are signed with a real name per the DCO

By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and signing off your commits, please check here.

@Zhangxunmt Zhangxunmt changed the title Add internal connectors to bedrock/sagemaker Add internal connectors in the ml_inference processor to Bedrock/SageMaker Apr 22, 2026
@Zhangxunmt Zhangxunmt changed the title Add internal connectors in the ml_inference processor to Bedrock/SageMaker Add internal connectors to external ML providers (Bedrock/SageMaker, etc) in the ml_inference processor Apr 22, 2026
graytaylor0 previously approved these changes Apr 22, 2026
Comment on lines +123 to +125
// When a built-in connector was resolved for the model_id, send directly to the
// model provider (e.g. Bedrock) via the typed RemoteConnectorExecutor.
// All other model IDs continue to use the existing ml-commons path.
Member
Should we still be requiring the "host" opensearch endpoint for this processor? Or will we eventually not require it if the model id / model is resolved.

Collaborator Author
This is a good catch. When using the built-in connector, the host is not needed, so we can make it optional. It's only required when you want to use a model_id from the ml plugin.

Comment on lines +46 to +50
@JsonProperty("pre_process_function")
private String preProcessFunction;

@JsonProperty("post_process_function")
private String postProcessFunction;
Member

@graytaylor0 graytaylor0 Apr 22, 2026

How do these work?

Collaborator Author

@Zhangxunmt Zhangxunmt Apr 23, 2026

These are the functions that pre-process the input and post-process the output so the data fits the interface of the "text-embedding" processors in AOS. They are only needed for real-time predictions; for batch inference they are not relevant. But eventually we will support real-time predict as another Action too, so they are still defined here in the connector.
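As a sketch of where these fields live, an ml-commons-style connector action can reference built-in transform functions by name. The fragment below is illustrative only; the URL and function names are placeholders in the style of ml-commons blueprints, not necessarily the exact values shipped in this PR:

```json
{
  "action_type": "predict",
  "method": "POST",
  "url": "https://bedrock-runtime.us-east-1.amazonaws.com/model/amazon.titan-embed-text-v2:0/invoke",
  "pre_process_function": "connector.pre_process.bedrock.embedding",
  "post_process_function": "connector.post_process.bedrock.embedding"
}
```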

@graytaylor0
Member

Some checkstyle errors failing the build:

    [checkstyle] [ERROR] /home/runner/work/data-prepper/data-prepper/data-prepper-plugins/ml-inference-processor/src/test/java/org/opensearch/dataprepper/plugins/ml_inference/processor/common/BedrockBatchJobCreatorTest.java:23:8: Unused import - org.opensearch.dataprepper.plugins.ml_inference.processor.connector.ConnectorActionType. [UnusedImports]
    > Task :data-prepper-plugins:ml-inference-processor:checkstyleTest FAILED
    [checkstyle] [ERROR] /home/runner/work/data-prepper/data-prepper/data-prepper-plugins/ml-inference-processor/src/test/java/org/opensearch/dataprepper/plugins/ml_inference/processor/common/BedrockBatchJobCreatorTest.java:33:8: Unused import - java.util.Map. [UnusedImports]
    [checkstyle] [ERROR] /home/runner/work/data-prepper/data-prepper/data-prepper-plugins/ml-inference-processor/src/test/java/org/opensearch/dataprepper/plugins/ml_inference/processor/common/BedrockBatchJobCreatorTest.java:40:15: Unused import - org.mockito.ArgumentMatchers.eq. [UnusedImports]
    [checkstyle] [ERROR] /home/runner/work/data-prepper/data-prepper/data-prepper-plugins/ml-inference-processor/src/test/java/org/opensearch/dataprepper/plugins/ml_inference/processor/common/BedrockBatchJobCreatorTest.java:41:15: Unused import - org.mockito.Mockito.doThrow. [UnusedImports]


graytaylor0 previously approved these changes Apr 24, 2026

Member

@dlvenable dlvenable left a comment

Thanks @Zhangxunmt for this great new feature!

Also, I really like the approach you took with the annotations and JSON SubType for reading the JSON configurations.

this.connectorExecutor = buildConnectorExecutor(mlProcessorConfig, awsCredentialsSupplier);
}

private static RemoteConnectorExecutor buildConnectorExecutor(final MLProcessorConfig config,
Member

With my refactoring suggestion below this would go into a new class.

/**
* Model ID for Amazon Bedrock Titan Multi-modal Embeddings V1.
*/
public static final String TITAN_MULTIMODAL_EMBED_V1_MODEL_ID = "amazon.titan-embed-image-v1";
Member

Your PR description lists this:

"amazon.nova-2-multimodal-embeddings-v1:0" – Multimodal (text + image) embeddings

But I don't see it here.

Collaborator Author

Yeah, this model is supported by Bedrock for batch inference, but only in the IAD region, so I thought it could be added later. I will add it in the revision anyway.


final HttpExecuteRequest executeRequest = HttpExecuteRequest.builder()
.request(signedRequest)
.contentStreamProvider(request.contentStreamProvider().orElse(null))
Member

I believe this should be:

signedRequest.contentStreamProvider().orElse(null)

This is currently providing the unsigned request.

Collaborator Author

Sure, updated. But I believe only .request(signedRequest) matters; the contentStreamProvider should be the same between signedRequest and request. This code has actually been verified to run fine.
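The pitfall under discussion generalizes beyond the AWS SDK: when a transformation (here, SigV4 signing) returns a new immutable object, every field of the outgoing call should come from the transformed copy, not the original. A minimal, self-contained Java sketch with toy types (not the real SDK classes) showing how mixing the two can diverge if the transform also touches the field you take from the original:

```java
import java.util.Optional;

public class SignedRequestPitfall {
    // Toy stand-in for an immutable HTTP request; sign() returns a NEW copy.
    record Request(String body, Optional<String> authHeader) {
        Request sign() {
            // Besides adding the auth header, this signer also normalizes
            // the body -- so the signed copy's body differs from the original.
            return new Request(body.trim(), Optional.of("AWS4-HMAC-SHA256 ..."));
        }
    }

    public static void main(String[] args) {
        Request request = new Request("  payload  ", Optional.empty());
        Request signedRequest = request.sign();

        // Bug pattern from the review: metadata from signedRequest,
        // body taken from the original, unsigned request.
        String buggyBody = request.body();
        // Fix: take the body from the same object that was signed.
        String fixedBody = signedRequest.body();

        System.out.println(buggyBody.equals(signedRequest.body()));
        System.out.println(fixedBody.equals(signedRequest.body()));
    }
}
```

In this PR's case the stream happened to be identical between the two objects, which is why the code ran fine; taking it from `signedRequest` just removes the dependency on that coincidence.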

Member

@dlvenable dlvenable left a comment

@Zhangxunmt , just one more comment from me.

try {
final URI uri = dirUrl.toURI();
if ("jar".equals(uri.getScheme())) {
try (FileSystem fs = FileSystems.newFileSystem(uri, Collections.emptyMap());
Member

I really like the overall change here to scan the classpath for these files. But the specific implementation using newFileSystem is not ideal. Instead, you can probably use reflections or spring-core.

Using reflections, it might be something like this:

new Reflections(RESOURCE_BASE).getResources(Pattern.compile(".*\\.json"))

With spring-core, you could look into PathMatchingResourcePatternResolver.

I'd probably say try using reflections. Data Prepper currently has both dependencies.

Collaborator Author

Thanks for the callout. Updated to use the reflections lib. @dlvenable

Zhangxunmt added 4 commits May 4, 2026 13:04
Signed-off-by: Xun Zhang <xunzh@amazon.com>
Signed-off-by: Xun Zhang <xunzh@amazon.com>
…and add nova embedding model connector

Signed-off-by: Xun Zhang <xunzh@amazon.com>
Signed-off-by: Xun Zhang <xunzh@amazon.com>
Member

@dlvenable dlvenable left a comment

Thank you @Zhangxunmt for this great new feature!

@Zhangxunmt Zhangxunmt merged commit c64c147 into opensearch-project:main May 6, 2026
72 checks passed