How to connect Elasticsearch to Kafka?
September 27, 2018

I'm new to Kafka and Elasticsearch.

I want to move Elasticsearch data into a Kafka topic using a (source) connector.

I can find connectors from Kafka to Elasticsearch (sink connectors), but I want the opposite direction: Elasticsearch to Kafka.

I built this pipeline with Logstash, but that isn't quite what I want: I want Kafka to listen to Elasticsearch and pull every change in the database into the topic.

I found a source connector plugin for Confluent's Kafka Connect [here][1].

but it doesn't work; I get this error:

    {"error":{"root_cause":[{"type":"query_shard_exception","reason":"No mapping found for [@timestamp] in order to sort on","index_uuid":"bkQiBFqdQL-hCbWApo6bBQ","index":"metric"}],"type":"search_phase_execution_exception","reason":"all shards failed","phase":"query","grouped":true,"failed_shards":[{"shard":0,"index":"metric","node":"wMGlz0c9Q5yViW0bsKhKOA","reason":{"type":"query_shard_exception","reason":"No mapping found for [@timestamp] in order to sort on","index_uuid":"bkQiBFqdQL-hCbWApo6bBQ","index":"metric"}}]},"status":400}
            at org.elasticsearch.client.RestClient$1.completed(RestClient.java:357)
            at org.elasticsearch.client.RestClient$1.completed(RestClient.java:346)
            at org.apache.http.concurrent.BasicFuture.completed(BasicFuture.java:119)
            at org.apache.http.impl.nio.client.DefaultClientExchangeHandlerImpl.responseCompleted(DefaultClientExchangeHandlerImpl.java:177)
            at org.apache.http.nio.protocol.HttpAsyncRequestExecutor.processResponse(HttpAsyncRequestExecutor.java:432)
            at org.apache.http.nio.protocol.HttpAsyncRequestExecutor.inputReady(HttpAsyncRequestExecutor.java:325)
            at org.apache.http.impl.nio.DefaultNHttpClientConnection.consumeInput(DefaultNHttpClientConnection.java:267)
            at org.apache.http.impl.nio.client.InternalIODispatch.onInputReady(InternalIODispatch.java:81)
            at org.apache.http.impl.nio.client.InternalIODispatch.onInputReady(InternalIODispatch.java:39)
            at org.apache.http.impl.nio.reactor.AbstractIODispatch.inputReady(AbstractIODispatch.java:116)
            at org.apache.http.impl.nio.reactor.BaseIOReactor.readable(BaseIOReactor.java:164)
            at org.apache.http.impl.nio.reactor.AbstractIOReactor.processEvent(AbstractIOReactor.java:339)
            at org.apache.http.impl.nio.reactor.AbstractIOReactor.processEvents(AbstractIOReactor.java:317)
            at org.apache.http.impl.nio.reactor.AbstractIOReactor.execute(AbstractIOReactor.java:278)
            at org.apache.http.impl.nio.reactor.BaseIOReactor.execute(BaseIOReactor.java:106)
            at org.apache.http.impl.nio.reactor.AbstractMultiworkerIOReactor$Worker.run(AbstractMultiworkerIOReactor.java:590)
            ... 1 more

The connector uses this configuration:

    {
        "name": "elastic-source",
        "config": {
            "connector.class": "com.github.dariobalinzo.ElasticSourceConnector",
            "tasks.max": "1",
            "es.host": "localhost",
            "es.port": "9200",
            "index.prefix": "metric",
            "topic.prefix": "es_",
            "incrementing.field.name": "@timestamp"
        }
    }
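The first error (`No mapping found for [@timestamp] in order to sort on`) says that Elasticsearch was asked to sort the `metric` index by `@timestamp`, but that index has no such field in its mapping. A minimal sketch, assuming you have already fetched and parsed the body of `GET /metric/_mapping`, that checks whether a dotted field path is actually mapped before you put it in `incrementing.field.name`. The helper name `field_in_mapping` and the sample mapping are hypothetical; the sketch handles both the ES 6.x layout (with a type level such as `movie`) and the typeless ES 7+ layout.

```python
from typing import Any, Dict


def field_in_mapping(mapping_response: Dict[str, Any], index: str, field: str) -> bool:
    """Return True if the dotted `field` path is mapped in `index`.

    `mapping_response` is the parsed JSON body of GET /<index>/_mapping.
    Handles typeless mappings (ES 7+) and mappings with a type level (ES 6.x).
    """
    mappings = mapping_response.get(index, {}).get("mappings", {})
    # ES 7+: {"properties": {...}}; ES 6.x: {"<type>": {"properties": {...}}}
    candidates = [mappings] if "properties" in mappings else list(mappings.values())
    parts = field.split(".")
    for root in candidates:
        node = root
        found = True
        for part in parts:
            props = node.get("properties", {}) if isinstance(node, dict) else {}
            if part not in props:
                found = False
                break
            node = props[part]
        if found:
            return True
    return False


# Hypothetical ES 6.x mapping for the "movies" sample document (no @timestamp field).
sample_mapping = {
    "movies": {
        "mappings": {
            "movie": {
                "properties": {
                    "fields": {
                        "properties": {
                            "release_date": {"type": "date"},
                            "title": {"type": "text"},
                        }
                    },
                    "id": {"type": "keyword"},
                }
            }
        }
    }
}

print(field_in_mapping(sample_mapping, "movies", "@timestamp"))           # False
print(field_in_mapping(sample_mapping, "movies", "fields.release_date"))  # True
```

If the check returns `False`, the connector's sort on that field will fail the same way the `metric` index did.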

This is an example of the data imported into my index:

    {"index":{"_index": "movies","_type":"movie","_id":1}}
    {"fields" : {"directors" : ["Joseph Gordon-Levitt"],"release_date" : "2013-01-18T00:00:00Z","rating" : 7.4,"genres" : ["Comedy","Drama"],"image_url" : "http://ia.media-imdb.com/images/M/MV5BMTQxNTc3NDM2MF5BMl5BanBnXkFtZTcwNzQ5NTQ3OQ@@._V1_SX400_.jpg","plot" : "A New Jersey guy dedicated to his family, friends, and church, develops unrealistic expectations from watching porn and works to find happiness and intimacy with his potential true love.","title" : "Don Jon","rank" : 1,"running_time_secs" : 5400,"actors" : ["Joseph Gordon-Levitt","Scarlett Johansson","Julianne Moore"],"year" : 2013},"id" : "tt2229499","type" : "add"}
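The sample document has no top-level `@timestamp`; its only date value is nested under `fields`. A small sketch that flattens a document into dotted paths (the form Elasticsearch uses for object sub-fields) and lists the values that parse as ISO-8601 timestamps, which are the natural candidates for an incrementing field. The helper names are hypothetical and the document is a trimmed copy of the sample above.

```python
import json
from datetime import datetime
from typing import Any, Dict, Iterator, Tuple


def flatten(doc: Dict[str, Any], prefix: str = "") -> Iterator[Tuple[str, Any]]:
    """Yield (dotted_path, value) pairs for every leaf of a JSON object."""
    for key, value in doc.items():
        path = f"{prefix}.{key}" if prefix else key
        if isinstance(value, dict):
            yield from flatten(value, path)
        else:
            yield path, value


def looks_like_timestamp(value: Any) -> bool:
    """True for ISO-8601 strings such as 2013-01-18T00:00:00Z."""
    if not isinstance(value, str):
        return False
    try:
        # fromisoformat() only accepts a trailing "Z" from Python 3.11 on.
        datetime.fromisoformat(value.replace("Z", "+00:00"))
        return True
    except ValueError:
        return False


# Trimmed copy of the sample document from the question.
source = json.loads(
    '{"fields": {"directors": ["Joseph Gordon-Levitt"], '
    '"release_date": "2013-01-18T00:00:00Z", "rating": 7.4, '
    '"title": "Don Jon", "year": 2013}, "id": "tt2229499", "type": "add"}'
)

paths = dict(flatten(source))
print("@timestamp" in paths)                                       # False
print([p for p, v in paths.items() if looks_like_timestamp(v)])    # ['fields.release_date']
```

Note that the config above points at the `metric` index while this document goes into `movies`; whichever index the connector reads must contain the chosen field.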

Thanks for your help! :)

EDIT

I changed my index to contain this data:

    {"index":{"_index": "test","_type":"test","_id":1}}
    {"fields" : {"directors" : "Joseph Gordon-Levitt","release_date" : "2013-01-18T00:00:00Z"}}

and this config:

    {
        "name": "elastic-source",
        "config": {
            "connector.class": "com.github.dariobalinzo.ElasticSourceConnector",
            "tasks.max": "1",
            "es.host": "localhost",
            "es.port": "9200",
            "index.prefix": "test",
            "topic.prefix": "es_",
            "incrementing.field.name": "_score"
        }
    }
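`_score` is a relevance value Elasticsearch computes per query rather than a field stored in the document, so it is unlikely to work as an incrementing offset. A sketch, assuming the connector accepts a dotted path to a nested date field (not confirmed by its docs here), that builds the same config with `fields.release_date` instead and registers it through the Kafka Connect REST API (`POST /connectors`, default worker port 8083). The helper names are hypothetical; only `build_connector_payload` runs without a live worker.

```python
import json
import urllib.request
from typing import Any, Dict


def build_connector_payload(index_prefix: str, incrementing_field: str) -> Dict[str, Any]:
    """Build the JSON body for POST /connectors, mirroring the question's config."""
    return {
        "name": "elastic-source",
        "config": {
            "connector.class": "com.github.dariobalinzo.ElasticSourceConnector",
            "tasks.max": "1",
            "es.host": "localhost",
            "es.port": "9200",
            "index.prefix": index_prefix,
            "topic.prefix": "es_",
            # Assumption: a mapped, sortable date field instead of the query-time _score.
            "incrementing.field.name": incrementing_field,
        },
    }


def register_connector(payload: Dict[str, Any], connect_url: str = "http://localhost:8083") -> int:
    """POST the payload to the Kafka Connect REST API and return the HTTP status."""
    req = urllib.request.Request(
        f"{connect_url}/connectors",
        data=json.dumps(payload).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )
    with urllib.request.urlopen(req) as resp:
        return resp.status


payload = build_connector_payload("test", "fields.release_date")
print(json.dumps(payload, indent=2))
# register_connector(payload)  # requires a running Kafka Connect worker
```

The payload is equivalent to POSTing the JSON above with `curl`; building it in code just makes it easy to switch the index prefix or field while experimenting.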

and now I get this error:

    [2018-09-28 11:00:06,367] INFO index test-elasticsearch-sink total messages: null  (com.github.dariobalinzo.task.ElasticSourceTask:149)
    [2018-09-28 11:00:06,367] INFO fetching from test (com.github.dariobalinzo.task.ElasticSourceTask:143)
    [2018-09-28 11:00:06,373] INFO total shard 5, successuful: 5 (com.github.dariobalinzo.task.ElasticSourceTask:200)
    [2018-09-28 11:00:06,373] ERROR error fetching min value (com.github.dariobalinzo.task.ElasticSourceTask:220)
    java.lang.NullPointerException
        at com.github.dariobalinzo.task.ElasticSourceTask.fetchLastOffset(ElasticSourceTask.java:215)
        at com.github.dariobalinzo.task.ElasticSourceTask.lambda$poll$0(ElasticSourceTask.java:144)
        at java.util.Arrays$ArrayList.forEach(Arrays.java:3880)
        at com.github.dariobalinzo.task.ElasticSourceTask.poll(ElasticSourceTask.java:140)
        at org.apache.kafka.connect.runtime.WorkerSourceTask.poll(WorkerSourceTask.java:244)
        at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:220)
        at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)
        at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
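The log says the task failed while "fetching min value" in `fetchLastOffset`. As a generic illustration of how an incrementing-field source usually polls (a sketch of the technique, not the connector's actual code): it asks for the smallest value of the field with a `min` aggregation, then repeatedly requests documents with a `range` query strictly after the last offset, sorted ascending. A `min` aggregation over a field with no values returns `"value": null`, which is one plausible source of the `NullPointerException`. All helper names and the sample response below are hypothetical.

```python
from typing import Any, Dict, Optional


def min_value_query(field: str) -> Dict[str, Any]:
    """Search body asking only for the smallest value of `field`."""
    return {"size": 0, "aggs": {"min_offset": {"min": {"field": field}}}}


def extract_min(agg_response: Dict[str, Any]) -> Optional[Any]:
    """Read the min value from a search response; None if the field had no values."""
    return agg_response["aggregations"]["min_offset"]["value"]


def next_batch_query(field: str, last_value: Optional[Any], batch_size: int = 100) -> Dict[str, Any]:
    """Search body for documents strictly after the last offset, oldest first."""
    if last_value is None:
        # No offset yet: read from the beginning, still ordered by the field.
        query: Dict[str, Any] = {"match_all": {}}
    else:
        query = {"range": {field: {"gt": last_value}}}
    return {
        "size": batch_size,
        "query": query,
        "sort": [{field: {"order": "asc"}}],
    }


# Hypothetical response for a field that has no values in the index:
empty = {"aggregations": {"min_offset": {"value": None}}}
print(extract_min(empty))  # None
print(next_batch_query("fields.release_date", "2013-01-18T00:00:00Z", batch_size=10))
```

Both queries only make sense for a field that exists in the mapping and whose values only grow as new documents arrive, which is why a stored date such as `fields.release_date` is a better candidate than `_score`.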

Do you have any idea what's wrong? Thanks!!

  [1]: https://medium.com/@dariobalinzo/kafka-connect-elasticsearch-source-connector-1a8c16a0e8eb
...