простой код соединения Apache Flink и эластичного поиска - PullRequest
0 голосов
/ 22 октября 2019

Я использую следующий код для связи между flink apache иasticsearch, но у меня много проблем. Я не понимаю многие из следующих классов кода. Например, как работать с классом ElasticsearchSink.Builder в следующем коде? Или как устроена структура ElasticsearchSink.Builder? Я вижу много кода на GitHub, но они очень сложны. Пожалуйста, помогите мне.

DataStream<String> input = ...;
List<HttpHost> httpHosts = new ArrayList<>();
httpHosts.add(new HttpHost("127.0.0.1", 9200, "http"));
httpHosts.add(new HttpHost("10.2.3.1", 9200, "http"));

// use a ElasticsearchSink.Builder to create an ElasticsearchSink
ElasticsearchSink.Builder<String> esSinkBuilder = new ElasticsearchSink.Builder<>(
httpHosts,
new ElasticsearchSinkFunction<String>() {
    public IndexRequest createIndexRequest(String element) {
        Map<String, String> json = new HashMap<>();
        json.put("data", element);

        return Requests.indexRequest()
                .index("my-index")
                .type("my-type")
                .source(json);
    }

    @Override
    public void process(String element, RuntimeContext ctx, RequestIndexer indexer) {
        indexer.add(createIndexRequest(element));
    }
}
);

// configuration for the bulk requests; this instructs the sink to emit after every element, 
otherwise they would be buffered
esSinkBuilder.setBulkFlushMaxActions(1);

// provide a RestClientFactory for custom configuration on the internally created REST client
esSinkBuilder.setRestClientFactory(
restClientBuilder -> {
restClientBuilder.setDefaultHeaders(...)
restClientBuilder.setMaxRetryTimeoutMillis(...)
restClientBuilder.setPathPrefix(...)
restClientBuilder.setHttpClientConfigCallback(...)
}
);

// finally, build and add the sink to the job's pipeline
input.addSink(esSinkBuilder.build());
...