Я использую следующий код для связи между flink apache иasticsearch, но у меня много проблем. Я не понимаю многие из следующих классов кода. Например, как работать с классом ElasticsearchSink.Builder в следующем коде? Или как устроена структура ElasticsearchSink.Builder? Я вижу много кода на GitHub, но они очень сложны. Пожалуйста, помогите мне.
DataStream<String> input = ...;
List<HttpHost> httpHosts = new ArrayList<>();
httpHosts.add(new HttpHost("127.0.0.1", 9200, "http"));
httpHosts.add(new HttpHost("10.2.3.1", 9200, "http"));
// use a ElasticsearchSink.Builder to create an ElasticsearchSink
ElasticsearchSink.Builder<String> esSinkBuilder = new ElasticsearchSink.Builder<>(
httpHosts,
new ElasticsearchSinkFunction<String>() {
public IndexRequest createIndexRequest(String element) {
Map<String, String> json = new HashMap<>();
json.put("data", element);
return Requests.indexRequest()
.index("my-index")
.type("my-type")
.source(json);
}
@Override
public void process(String element, RuntimeContext ctx, RequestIndexer indexer) {
indexer.add(createIndexRequest(element));
}
}
);
// configuration for the bulk requests; this instructs the sink to emit after every element,
otherwise they would be buffered
esSinkBuilder.setBulkFlushMaxActions(1);
// provide a RestClientFactory for custom configuration on the internally created REST client
esSinkBuilder.setRestClientFactory(
restClientBuilder -> {
restClientBuilder.setDefaultHeaders(...)
restClientBuilder.setMaxRetryTimeoutMillis(...)
restClientBuilder.setPathPrefix(...)
restClientBuilder.setHttpClientConfigCallback(...)
}
);
// finally, build and add the sink to the job's pipeline
input.addSink(esSinkBuilder.build());