[FLINK-AGENTS-524] Add Amazon OpenSearch and S3 Vectors vector store integrations #533
avichaym wants to merge 8 commits into apache:main
…grations
Add two new integration modules for Amazon Bedrock:
- Chat model using the Converse API with native tool calling support, SigV4 auth via
DefaultCredentialsProvider, and token metrics reporting. Supports all Bedrock models
accessible via Converse API.
- Embedding model using Titan Text Embeddings V2 via InvokeModel. Batch
embed(List<String>) parallelizes via configurable thread pool (embed_concurrency
parameter, default 4).
Includes unit tests for constructors, parameter handling, and inheritance.
…ockEmbeddingModelConnection
56b9836 to 9f4f768
@avichaym Please add the following content to your PR description and select a checkbox:
9f4f768 to c492b13
…ences
- Add exponential backoff retry (MAX_RETRIES=5) for ThrottlingException,
ServiceUnavailableException, ModelErrorException, 429, 503 — consistent
with BedrockEmbeddingModelConnection in this PR.
- Remove {..} JSON extraction fallback from stripMarkdownFences that could
corrupt normal text responses containing braces.
- Only apply markdown fence stripping on non-tool-call responses.
- Add 5 unit tests for stripMarkdownFences covering: text with braces,
clean JSON, json fences, plain fences, and null input.
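The fence-stripping behavior described in this commit message can be sketched as follows. This is a minimal illustration, not the PR's implementation: the class name `MarkdownFences` and the exact trimming rules are assumptions, and only the method name `stripMarkdownFences` and the five scenarios (text with braces, clean JSON, json fences, plain fences, null input) come from the message.

```java
/** Hypothetical helper; only the method name is taken from the commit message. */
final class MarkdownFences {
    // Built with repeat() so this example never contains a literal fence marker.
    private static final String FENCE = "`".repeat(3);

    private MarkdownFences() {}

    /** Unwraps a response that is entirely wrapped in one fenced block. */
    static String stripMarkdownFences(String text) {
        if (text == null) {
            return null;
        }
        String trimmed = text.trim();
        boolean fenced =
                trimmed.length() >= 2 * FENCE.length()
                        && trimmed.startsWith(FENCE)
                        && trimmed.endsWith(FENCE);
        int firstNewline = trimmed.indexOf('\n');
        if (!fenced || firstNewline < 0) {
            // No brace-based extraction: unfenced text is returned untouched,
            // so ordinary prose that contains { } is never rewritten.
            return text;
        }
        // Drop the opening line (fence plus optional language tag) and the closing fence.
        String inner =
                trimmed.substring(firstNewline + 1, trimmed.length() - FENCE.length());
        return inner.trim();
    }
}
```

Returning the input unchanged whenever it is not fully fenced is the design choice that avoids the corruption the removed fallback could cause.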
…grations
Add two new vector store integration modules:
- OpenSearch: supports Serverless (AOSS) and Service domains, IAM (SigV4) or basic auth.
Implements CollectionManageableVectorStore. ANN search via knn query with ef_search,
min_score, and filter_query support. Bulk writes chunked by configurable max_bulk_mb.
- S3 Vectors: uses S3 Vectors SDK for PutVectors/QueryVectors/GetVectors/DeleteVectors.
PutVectors chunked at 500 (API limit).
Both override add() for batch embedding via embed(List<String>).
Includes unit tests and integration tests (auto-enabled via OPENSEARCH_ENDPOINT /
S3V_BUCKET environment variables). Validated against real OpenSearch domain and
S3 Vectors bucket.
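The 500-item chunking mentioned for PutVectors could look like the sketch below. The class and method names (`Chunker`, `chunk`) are hypothetical, and a real module would pass each chunk to the S3 Vectors client rather than return it; the value 500 is the limit stated in the commit message.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper that splits a batch into request-sized chunks. */
final class Chunker {
    // Per-request limit as stated in the commit message.
    static final int MAX_VECTORS_PER_PUT = 500;

    private Chunker() {}

    static <T> List<List<T>> chunk(List<T> items, int chunkSize) {
        if (chunkSize <= 0) {
            throw new IllegalArgumentException("chunkSize must be positive");
        }
        List<List<T>> chunks = new ArrayList<>();
        for (int start = 0; start < items.size(); start += chunkSize) {
            int end = Math.min(start + chunkSize, items.size());
            // Copy the view so each chunk is independent of the source list.
            chunks.add(new ArrayList<>(items.subList(start, end)));
        }
        return chunks;
    }
}
```

With this helper, a batch of 1201 vectors would be sent as three requests of 500, 500, and 201 items.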
…docs
Bedrock (chat + embedding):
- Add close() to release AWS SDK clients and thread pools
- Wire max_tokens through BedrockChatModelSetup into InferenceConfiguration
- Add retry jitter to BedrockEmbeddingModelConnection
- Add typed getConnection() override to BedrockEmbeddingModelSetup
- Document stripMarkdownFences necessity and future work
OpenSearch vector store:
- Add close() to release SdkHttpClient
- Cache DefaultCredentialsProvider (was creating new instance per request)
- Add constructor validation for required endpoint/index params
- Add limit support in get() via extraArgs
- Add TODO for Aws4Signer deprecation and batch add() dedup
S3 Vectors vector store:
- Add close() to release S3VectorsClient
- Add constructor validation for required vector_bucket/vector_index params
- size() now throws UnsupportedOperationException instead of returning -1
- Add TODO for batch add() dedup
All files: expand wildcard imports, add usage example Javadocs
c492b13 to dd77d50
OpenSearchVectorStore:
- Add retry with exponential backoff for 429/502/503 in executeRequest()
- Only ignore 404s in getCollection/deleteCollection (not all exceptions)
- Close credentialsProvider in close()
S3VectorsVectorStore:
- Add retry with backoff for putVectors (ThrottlingException, 429, 503)
Consistent with retry patterns in BedrockChatModelConnection and
BedrockEmbeddingModelConnection.
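The retry pattern these commits describe (exponential backoff plus jitter, applied only to throttling-style failures) can be sketched generically. Everything named here is an assumption for illustration: `BackoffRetry`, `callWithRetry`, the delay parameters, and the "full jitter" variant are not taken from the PR, which only states that retries use exponential backoff with a maximum number of attempts.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ThreadLocalRandom;
import java.util.function.Predicate;

/** Hypothetical retry helper; not the PR's implementation. */
final class BackoffRetry {
    private BackoffRetry() {}

    static <T> T callWithRetry(
            Callable<T> call,
            Predicate<Exception> retryable,
            int maxRetries,
            long baseDelayMs,
            long maxDelayMs)
            throws Exception {
        for (int attempt = 0; ; attempt++) {
            try {
                return call.call();
            } catch (Exception e) {
                // Give up on non-retryable errors or once the retry budget is spent.
                if (attempt >= maxRetries || !retryable.test(e)) {
                    throw e;
                }
                // The delay ceiling doubles on each attempt, capped at maxDelayMs.
                long ceiling =
                        Math.max(0L, Math.min(maxDelayMs, baseDelayMs * (1L << Math.min(attempt, 20))));
                // Full jitter: sleep a random duration in [0, ceiling].
                Thread.sleep(ThreadLocalRandom.current().nextLong(ceiling + 1));
            }
        }
    }
}
```

A caller would supply a predicate that matches the retryable cases listed above (for example HTTP 429/502/503 for OpenSearch), so that errors such as a 404 or a validation failure surface immediately.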
     * Batch-embeds all documents in a single call, then delegates to addEmbedding.
     *
     * <p>TODO: This batch embedding logic is duplicated in S3VectorsVectorStore. Consider
     * extracting to BaseVectorStore in a follow-up (would also benefit ElasticsearchVectorStore).
+1 for implementing this batch embedding logic in BaseVectorStore directly.
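One way the shared logic suggested here could be shaped is a template method in a base class: embed the whole batch once, then hand the results to a store-specific hook. The types below (`BatchEmbedder`, `BatchEmbeddingStoreSketch`) are hypothetical stand-ins and do not reflect the actual BaseVectorStore API; only the idea of a single batch embed call followed by delegation to addEmbedding comes from the Javadoc above.

```java
import java.util.List;

/** Hypothetical stand-in for an embedding model with a batch embed call. */
interface BatchEmbedder {
    List<float[]> embed(List<String> texts);
}

/** Hypothetical base class; concrete stores implement only the write step. */
abstract class BatchEmbeddingStoreSketch {
    private final BatchEmbedder embedder;

    protected BatchEmbeddingStoreSketch(BatchEmbedder embedder) {
        this.embedder = embedder;
    }

    /** Embeds all texts in one call, then delegates storage to the subclass. */
    public final void add(List<String> texts) {
        if (texts.isEmpty()) {
            return;
        }
        List<float[]> vectors = embedder.embed(texts);
        if (vectors.size() != texts.size()) {
            throw new IllegalStateException(
                    "expected " + texts.size() + " embeddings but got " + vectors.size());
        }
        addEmbedding(texts, vectors);
    }

    /** Store-specific write, for example a bulk request or a chunked put. */
    protected abstract void addEmbedding(List<String> texts, List<float[]> vectors);
}
```

Keeping `add` final in the base class would remove the duplicated override from each store while still letting every backend choose its own write strategy.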
    this.index = descriptor.getArgument("index");
    if (this.index == null || this.index.isBlank()) {
        throw new IllegalArgumentException("index is required for OpenSearchVectorStore");
Could index be allowed to be null here, with the index specified in each operation instead?
            @Nullable List<String> ids, @Nullable String collection, Map<String, Object> extraArgs)
            throws IOException {
        if (ids == null || ids.isEmpty()) {
            return;
In the current design, if ids is not provided, the vector store should get/delete all the documents in the collection. The S3 Vectors behavior is inconsistent with this: we need to either throw an exception when ids is not provided for S3 Vectors, or emphasize this point in the documentation.
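The first option raised in this comment, failing loudly instead of returning silently, might look like the guard below. The helper name `requireIds`, the class `IdArguments`, and the message wording are hypothetical; the PR may resolve the comment differently, for example through documentation alone.

```java
import java.util.List;

/** Hypothetical argument check for stores that cannot operate without ids. */
final class IdArguments {
    private IdArguments() {}

    static List<String> requireIds(List<String> ids, String operation) {
        if (ids == null || ids.isEmpty()) {
            // Fail fast so callers do not assume "no ids" means "all documents".
            throw new IllegalArgumentException(
                    operation + " requires explicit ids for this vector store");
        }
        return ids;
    }
}
```

Calling such a guard at the top of get() and delete() would make the difference from other stores visible at the call site rather than in silently empty results.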
            body.put("size", ids.size());
            return parseHits(executeRequest("POST", "/" + idx + "/_search", body.toString()));
        }
        int limit = 10000;
Maybe a static constant would be better here?
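A sketch of what this suggestion could look like: the default is hoisted into a named constant, and a caller-supplied limit from extraArgs takes precedence. The constant name `DEFAULT_GET_LIMIT`, the `"limit"` key, and the `resolveLimit` helper are assumptions; only the value 10000 and the idea of limit support via extraArgs appear in the PR.

```java
import java.util.Map;

/** Hypothetical helper showing a named default instead of an inline literal. */
final class GetLimit {
    // Default taken from the literal in the diff under review.
    static final int DEFAULT_GET_LIMIT = 10_000;

    private GetLimit() {}

    static int resolveLimit(Map<String, Object> extraArgs) {
        // The "limit" key is an assumed name for the extraArgs entry.
        Object raw = extraArgs == null ? null : extraArgs.get("limit");
        if (raw instanceof Number) {
            int requested = ((Number) raw).intValue();
            if (requested > 0) {
                return requested;
            }
        }
        // Missing, non-numeric, or non-positive values fall back to the default.
        return DEFAULT_GET_LIMIT;
    }
}
```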
        this.client =
                S3VectorsClient.builder()
                        .region(Region.of(regionStr != null ? regionStr : "us-east-1"))
                        .credentialsProvider(DefaultCredentialsProvider.create())
According to https://sdk.amazonaws.com/java/api/latest/software/amazon/awssdk/auth/credentials/DefaultCredentialsProvider.html#create(), create() is deprecated; builder().build() is preferred.
Linked issue: #524
Depends on #534 — please merge that first.
Purpose of change
Add Amazon OpenSearch and S3 Vectors as vector store providers.
- OpenSearchVectorStore: Supports Serverless (AOSS) and Service domains, IAM/basic auth, implements CollectionManageableVectorStore for Long-Term Memory, KNN search with filter support, chunked bulk writes
- S3VectorsVectorStore: S3 Vectors SDK, PutVectors chunked at 500 (API limit)
Both override add() for batch embedding optimization.
New modules: integrations/vector-stores/opensearch/, integrations/vector-stores/s3vectors/
Tests
- Integration tests (auto-enabled via OPENSEARCH_ENDPOINT, S3V_BUCKET): collection CRUD, document CRUD, filtered query
API
No public API changes. New integration modules only.
Documentation
- doc-needed
- doc-not-needed
- doc-included