MongoDB Kafka Source Connector выбрасывает java .lang.IllegalStateException: очередь заполнена при использовании copy.existing: true - PullRequest
0 голосов
/ 13 февраля 2020

При импорте данных из mongodb в kafka, используя соединитель, https://github.com/mongodb/mongo-kafka, он выдает java.lang.IllegalStateException: Queue full.

Я использую настройку по умолчанию copy.existing.queue.size, которая равна 16000, и copy.existing: true. Какое значение я должен установить? Размер коллекции составляет 10G.

Среда:

mongo-kafka-connect: 1.0.0
Kafka: 2.4.0
Kafka-Connect: 2.4.0
MongoDB server: 3.6.14
mongodb-driver-sync: 3.12.1

Stacktrace: org.apache.kafka.connect.errors.ConnectException: java.lang.IllegalStateException: Queue full\n\tat com.mongodb.kafka.connect.source.MongoCopyDataManager.poll(MongoCopyDataManager.java:95)\n\tat com.mongodb.kafka.connect.source.MongoSourceTask.getNextDocument(MongoSourceTask.java:301)\n\tat com.mongodb.kafka.connect.source.MongoSourceTask.poll(MongoSourceTask.java:154)\n\tat org.apache.kafka.connect.runtime.WorkerSourceTask.poll(WorkerSourceTask.java:265)\n\tat org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:232)\n\tat org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:177)\n\tat org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:227)\n\tat java.base\/java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source)\n\tat java.base\/java.util.concurrent.FutureTask.run(Unknown Source)\n\tat java.base\/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)\n\tat java.base\/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)\n\tat java.base\/java.lang.Thread.run(Unknown Source)\nCaused by: java.lang.IllegalStateException: Queue full\n\tat java.base\/java.util.AbstractQueue.add(Unknown Source)\n\tat java.base\/java.util.concurrent.ArrayBlockingQueue.add(Unknown Source)\n\tat com.mongodb.client.internal.Java8ForEachHelper.forEach(Java8ForEachHelper.java:30)\n\tat com.mongodb.client.internal.Java8AggregateIterableImpl.forEach(Java8AggregateIterableImpl.java:54)\n\tat com.mongodb.kafka.connect.source.MongoCopyDataManager.copyDataFrom(MongoCopyDataManager.java:123)\n\tat com.mongodb.kafka.connect.source.MongoCopyDataManager.lambda$new$0(MongoCopyDataManager.java:87)\n\t... 5 more

1 Ответ

0 голосов
/ 21 февраля 2020

Исправлено в https://github.com/mongodb/mongo-kafka/commit/7e6bf97742f2ad75cde394d088823b86880cdf4e

и будет выпущено после 1.0.0. Так что, если кто-то сталкивается с той же проблемой, пожалуйста, обновите версию до более поздней, чем 1.0.0.

...