Я пытаюсь отправить некоторые данные из DataSet в эластичный поиск с помощью нового соединителя эластичного поиска, но я не могу найти здесь никаких ресурсов, кроме ресурсов для структуры потока данных:
https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/elasticsearch.html
Мой Набор данных - это набор данных строки (из запроса sql), это содержимое:
199947,6
199958,3
199964,2
199985,2
Я создал статический вложенный класс, который реализует ElasticsearchSinkFunction :
public static class NumberOfTransactionsByBlocks implements ElasticsearchSinkFunction<Row> {
public void process(Row element, RuntimeContext ctx, RequestIndexer indexer) {
indexer.add(createIndexRequest(element));
}
public IndexRequest createIndexRequest(Row element) {
Map<String, String> json = new HashMap<>();
json.put("block_number", element.getField(0).toString());
json.put("numberOfTransactions", element.getField(1).toString());
return Requests.indexRequest()
.index("nbOfTransactionsByBlocks")
.type("count-transactions")
.source(json);
}
}
И тогда моя проблема в том, что я не знаю, как отправить экземпляр моего внутреннего класса ...
DataSet<Row> data = tableEnv.toDataSet(sqlResult, Row.class);
List<HttpHost> httpHosts = new ArrayList<>();
httpHosts.add(new HttpHost("127.0.0.1", 9200, "http"));
httpHosts.add(new HttpHost("10.2.3.1", 9200, "http"));
Map<String, String> config = new HashMap<>();
config.put("bulk.flush.max.actions", "1"); // flush inserts after every event
config.put("cluster.name", "elasticsearch"); // default cluster name
data.output(new ElasticsearchSink<>(config, httpHosts, new NumberOfTransactionsByBlocks()));
У меня возникает ошибка при создании экземпляра ElasticsearchSink, в котором говорится:
не может вывести аргументы
Но когда я указываю тип (Row), он говорит:
ElasticsearchSink (java.util.Map,
java.util.List,
org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction,
org.apache.flink.streaming.connectors.elasticsearch.ActionRequestFailureHandler,
org.apache.flink.streaming.connectors.elasticsearch6.RestClientFactory)»
имеет частный доступ в
'Org.apache.flink.streaming.connectors.elasticsearch6.ElasticsearchSink'
Я что-то не так делаю?