Я столкнулся с некоторыми проблемами при реализации Flink, использующей Flink 1.9 и ElasticSearch 7.3.2. Мой канал извлекает события из Kafka и затем записывает их в ElasticSearch (после некоторой обработки). Однако я продолжаю получать следующее предупреждение:
WARN org.elasticsearch.client.RestClient - request [POST http://IP:PORT/_bulk?timeout=1m] returned 1 warnings: [299 Elasticsearch-7.3.2-1c1faf1 "[types removal] Specifying types in bulk requests is deprecated."]
И предупреждение совершенно ясно, однако, что мне интересно, так это то, что возможно обработать это предупреждение в приемнике ElasticSearch?
Я использую flink-connector -asticsearch6_2.11 , и вот как я реализовал следующую функцию Sink:
public class ElasticsearchEventSink implements ElasticsearchSinkFunction<HashMap> {
public IndexRequest createIndexRequest(HashMap<String, Object> event) {
OffsetDateTime currentDate = OffsetDateTime.now();
String dateString = String.format("%s-%02d-%02d", currentDate.getYear(), currentDate.getMonthValue(), currentDate.getDayOfMonth());
String indexName = "events";
return Requests.indexRequest()
.index(indexName + '-' + dateString)
.type("_doc")
.source(event);
}
@Override
public void process(HashMap event, RuntimeContext runtimeContext, RequestIndexer indexer) {
indexer.add(createIndexRequest(event));
}
}
Благодарен за любую помощь !