Изменение Flink ElasticSearchSink, чтобы не использовать сопоставление типов - PullRequest
0 голосов
/ 31 января 2020

Я столкнулся с некоторыми проблемами при реализации Flink, использующей Flink 1.9 и ElasticSearch 7.3.2. Мой канал извлекает события из Kafka и затем записывает их в ElasticSearch (после некоторой обработки). Однако я продолжаю получать следующее предупреждение:

WARN org.elasticsearch.client.RestClient - request [POST http://IP:PORT/_bulk?timeout=1m] returned 1 warnings: [299 Elasticsearch-7.3.2-1c1faf1 "[types removal] Specifying types in bulk requests is deprecated."]

И предупреждение совершенно ясно, однако, что мне интересно, так это то, что возможно обработать это предупреждение в приемнике ElasticSearch?

Я использую flink-connector -asticsearch6_2.11 , и вот как я реализовал следующую функцию Sink:

public class ElasticsearchEventSink implements ElasticsearchSinkFunction<HashMap> {

    public IndexRequest createIndexRequest(HashMap<String, Object> event) {

        OffsetDateTime currentDate = OffsetDateTime.now();
        String dateString = String.format("%s-%02d-%02d", currentDate.getYear(), currentDate.getMonthValue(), currentDate.getDayOfMonth());
        String indexName = "events";

        return Requests.indexRequest()
                .index(indexName + '-' + dateString)
                .type("_doc")
                .source(event);
    }

    @Override
    public void process(HashMap event, RuntimeContext runtimeContext, RequestIndexer indexer) {
        indexer.add(createIndexRequest(event));
    }
}

Благодарен за любую помощь !

...