Отправка данных DataSet вasticsearch - PullRequest
0 голосов
/ 04 сентября 2018

Я пытаюсь отправить некоторые данные из DataSet в эластичный поиск с помощью нового соединителя эластичного поиска, но я не могу найти здесь никаких ресурсов, кроме ресурсов для структуры потока данных:

https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/elasticsearch.html

Мой Набор данных - это набор данных строки (из запроса sql), это содержимое:

199947,6
199958,3
199964,2
199985,2

Я создал статический вложенный класс, который реализует ElasticsearchSinkFunction :

public static class NumberOfTransactionsByBlocks implements ElasticsearchSinkFunction<Row> {

    public void process(Row element, RuntimeContext ctx, RequestIndexer indexer) {
        indexer.add(createIndexRequest(element));

    }

    public IndexRequest createIndexRequest(Row element) {
        Map<String, String> json = new HashMap<>();
        json.put("block_number", element.getField(0).toString());
        json.put("numberOfTransactions", element.getField(1).toString());

        return Requests.indexRequest()
                .index("nbOfTransactionsByBlocks")
                .type("count-transactions")
                .source(json);
    }
}

И тогда моя проблема в том, что я не знаю, как отправить экземпляр моего внутреннего класса ...

DataSet<Row> data = tableEnv.toDataSet(sqlResult, Row.class);
List<HttpHost> httpHosts = new ArrayList<>();
httpHosts.add(new HttpHost("127.0.0.1", 9200, "http"));
httpHosts.add(new HttpHost("10.2.3.1", 9200, "http"));

Map<String, String> config = new HashMap<>();
config.put("bulk.flush.max.actions", "1");   // flush inserts after every event
config.put("cluster.name", "elasticsearch"); // default cluster name


data.output(new ElasticsearchSink<>(config, httpHosts, new NumberOfTransactionsByBlocks()));

У меня возникает ошибка при создании экземпляра ElasticsearchSink, в котором говорится:

не может вывести аргументы

Но когда я указываю тип (Row), он говорит:

ElasticsearchSink (java.util.Map, java.util.List, org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction, org.apache.flink.streaming.connectors.elasticsearch.ActionRequestFailureHandler, org.apache.flink.streaming.connectors.elasticsearch6.RestClientFactory)» имеет частный доступ в 'Org.apache.flink.streaming.connectors.elasticsearch6.ElasticsearchSink'

Я что-то не так делаю?

1 Ответ

0 голосов
/ 05 сентября 2018

В настоящее время имеется (1.6.0) четыре различных разъема, предоставленных Flink для ElasticSearch.

  • v1.x : flink-connector-elasticsearch_2.11
  • v2.x : flink-connector-elasticsearch2_2.11
  • v5.x : flink-connector-elasticsearch5_2.11
  • v6.x : flink-connector-elasticsearch6_2.11

Убедитесь, что вы включили правильную зависимость maven в свой проект.

... имеет личный доступ в org.apache.flink.streaming.connectors.elasticsearch6.ElasticsearchSink

Теперь, исходя из трассировки, которой вы поделились, похоже, что вы используете зависимость для v6.x. Глядя на источник , можно предположить, что они переместили конструктор в private и добавили Builder [commit]

Итак, чтобы добавить ElasticsearchSink, вам нужно что-то вроде:

data.output(
  new ElasticsearchSink.Builder<>(httpHosts, new NumberOfTransactionsByBlocks())
    .setBulkFlushMaxActions(1)
    .build());

Кроме того, импорт будет

import org.apache.flink.streaming.connectors.elasticsearch6.ElasticsearchSink;
...