kafka mon go conctor не работает должным образом - PullRequest
0 голосов
/ 22 января 2020

Я использую Kafka Connect (источник) с mon go, работник Connect работает, но не записывает данные в топику Kafka c, я использую исходный соединитель и следующий файл конфигурации для разъем:

name=mongo-ff
tasks.max=1
connector.class =com.mongodb.kafka.connect.MongoSourceConnector
database=haifa
collection=alerts
key.converter = org.apache.kafka.connect.storage.StringConverter
value.converter = org.apache.kafka.connect.storage.StringConverter

topic.prefix=someprefix
poll.max.batch.size=1000
poll.await.time.ms=5000

# Change stream options
pipeline=[]
batch.size=0
change.stream.full.document=updateLookup

, но когда я пытаюсь получить данные из топи c:

 bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic someprefix  --from-beginning

я не получил никаких данных.

Кажется как будто он пишет не так, как ожидалось.

часть стека журналов:

    ... 3 more
[2020-01-22 17:19:52,727] INFO Opened connection [connectionId{localValue:3, serverValue:20}] to localhost:27017 (org.mongodb.driver.connection:71)
[2020-01-22 17:19:52,732] INFO Monitor thread successfully connected to server with description ServerDescription{address=localhost:27017, type=STANDALONE, state=CONNECTED, ok=true, version=ServerVersion{versionList=[3, 6, 3]}, minWireVersion=0, maxWireVersion=6, maxDocumentSize=16777216, logicalSessionTimeoutMinutes=30, roundTripTimeNanos=2854274} (org.mongodb.driver.cluster:71)
[2020-01-22 17:19:52,732] INFO Discovered cluster type of STANDALONE (org.mongodb.driver.cluster:71)
[2020-01-22 17:19:52,733] ERROR Expecting a single STANDALONE, but found more than one.  Removing localhost:27017 from client view of cluster. (org.mongodb.driver.cluster:101)
[2020-01-22 17:19:52,735] INFO Cluster ID: sZ64WgvDRBmrJnawsRJ_7A (org.apache.kafka.clients.Metadata:379)
[2020-01-22 17:19:52,756] INFO Cluster description not yet available. Waiting for 30000 ms before timing out (org.mongodb.driver.cluster:71)
[2020-01-22 17:20:02,647] INFO WorkerSourceTask{id=mongo-fff-0} Committing offsets (org.apache.kafka.connect.runtime.WorkerSourceTask:398)
[2020-01-22 17:20:02,648] INFO WorkerSourceTask{id=mongo-fff-0} flushing 0 outstanding messages for offset commit (org.apache.kafka.connect.runtime.WorkerSourceTask:415)
[2020-01-22 17:20:12,649] INFO WorkerSourceTask{id=mongo-fff-0} Committing offsets (org.apache.kafka.connect.runtime.WorkerSourceTask:398)
[2020-01-22 17:20:12,649] INFO WorkerSourceTask{id=mongo-fff-0} flushing 0 outstanding messages for offset commit (org.apache.kafka.connect.runtime.WorkerSourceTask:415)

getting some error:
ERROR WorkerSourceTask{id=mongo-fff-0} Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:177)
com.mongodb.MongoTimeoutException: Timed out after 30000 ms while waiting to connect. Client view of cluster state is {type=UNKNOWN, servers=[{address=localhost:27018, type=UNKNOWN, state=CONNECTING, exception={com.mongodb.MongoSocketOpenException: Exception opening socket}, caused by {java.net.ConnectException: Connexion refusée (Connection refused)}}, {address=localhost:27019, type=UNKNOWN, state=CONNECTING, exception={com.mongodb.MongoSocketOpenException: Exception opening socket}, caused by {java.net.ConnectException: Connexion refusée (Connection refused)}}]
    at com.mongodb.internal.connection.BaseCluster.getDescription(BaseCluster.java:182)
    at com.mongodb.internal.connection.AbstractMultiServerCluster.getDescription(AbstractMultiServerCluster.java:54)
    at com.mongodb.client.internal.MongoClientDelegate.getConnectedClusterDescription(MongoClientDelegate.java:152)
    at com.mongodb.client.internal.MongoClientDelegate.createClientSession(MongoClientDelegate.java:103)
    at com.mongodb.client.internal.MongoClientDelegate$DelegateOperationExecutor.getClientSession(MongoClientDelegate.java:284)
    at com.mongodb.client.internal.MongoClientDelegate$DelegateOperationExecutor.execute(MongoClientDelegate.java:188)
    at com.mongodb.client.internal.ChangeStreamIterableImpl.execute(ChangeStreamIterableImpl.java:203)
    at com.mongodb.client.internal.ChangeStreamIterableImpl.access$000(ChangeStreamIterableImpl.java:53)
    at com.mongodb.client.internal.ChangeStreamIterableImpl$1.cursor(ChangeStreamIterableImpl.java:128)
    at com.mongodb.client.internal.ChangeStreamIterableImpl$1.iterator(ChangeStreamIterableImpl.java:123)
    at com.mongodb.kafka.connect.source.MongoSourceTask.createCursor(MongoSourceTask.java:236)
    at com.mongodb.kafka.connect.source.MongoSourceTask.start(MongoSourceTask.java:136)
    at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:199)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:834)
[2020-01-22 18:29:09,590] ERROR WorkerSourceTask{id=mongo-fff-0} Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask:178)
[2020-01-22 18:29:09,593] INFO [Producer clientId=producer-1] Closing the Kafka producer with timeoutMillis = 30000 ms. (org.apache.kafka.clients.producer.KafkaProducer:1145)
[2020-01-22 18:29:19,482] INFO WorkerSourceTask{id=mongo-fff-0} Committing offsets (org.apache.kafka.connect.runtime.WorkerSourceTask:398)

1 Ответ

0 голосов
/ 22 января 2020

Я думаю, что ваши ожидания не совсем верны.

В файле конфигурации вы установили префикс topi c:

topic.prefix=someprefix

, но вы пытаетесь использовать из topi c с именем someprefix:

 bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic someprefix  --from-beginning

Обратите внимание, что параметр конфигурации topic.prefix - это не имя топика c, который будет создан, а просто его префикс:

topic.prefix

Префикс, добавляемый к именам таблиц, для создания имени Kafka topi c для публикации sh данных или в случае пользовательских запрос, полное имя topi c to publi sh to.

Тип: string

По умолчанию: “”

Важность: high

Следовательно, если в вашей базе данных haifa есть таблица с именем users, то topi c будет созданный на основе вашего файла конфигурации будет иметь имя someprefixusers (я предлагаю вам использовать дефис, например topic.prefix=someprefix-, чтобы конечное имя topi c было более читабельным). Поэтому вам придется использовать записи из этой топи c:

 bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic someprefixusers  --from-beginning

РЕДАКТИРОВАТЬ:

Вы получаете ошибку Connection Refused, которая означает, что либо ваш mongodb не запущен или вы неправильно подключаетесь к базе данных.

Во-первых, убедитесь, что mongodb запущен и работает через mongoclient:

mongo mongoHost:mongoPort/dbname

Во-вторых, кажется, что connection.uri отсутствует в файле конфигурации вашего коннектора.

...