Kafka Connect: java.lang.IllegalStateException: нет текущего назначения для раздела - PullRequest
0 голосов
/ 24 марта 2019

Я использую Kafka Connect в Kubernetes (8-16 узлов, автоматическое масштабирование). Я определил всего 44 коннектора, по одному на тему Kafka (один раздел на тему). Эти темы производятся Debezium / Postgresql. Есть 3 узла Кафки. Каждый соединитель имеет значение tasks.max, равное 4. Большинство моих соединителей (но не каждый!) Имеют одну (всегда одну) невыполненную задачу из-за java.lang.IllegalStateException: нет текущего назначения для раздела -0.

Здесь не эксперт по Kafka, обратите внимание;) Я предполагаю, что есть 3 узла Kafka, поэтому у 3 рабочих все отлично, и к 4-м задачам подключаться нечего, поэтому они не работают. Но почему иногда 4 задачи выполняются просто отлично?

Кроме того, у меня довольно часто возникает проблема «конфликтующие операции из-за перебалансировки», которая может возникать в течение нескольких минут или даже часов. Недавно я удалил все модули, и они сами перезагрузились, проблема исчезла, но это не является долгосрочным решением.

Что такое рекомендуемое значение tasks.max? Заранее спасибо!

Исключение:

java.lang.IllegalStateException: No current assignment for partition table-0
    at org.apache.kafka.clients.consumer.internals.SubscriptionState.assignedState(SubscriptionState.java:259)
    at org.apache.kafka.clients.consumer.internals.SubscriptionState.seek(SubscriptionState.java:264)
    at org.apache.kafka.clients.consumer.KafkaConsumer.seek(KafkaConsumer.java:1501)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.rewind(WorkerSinkTask.java:601)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.access$1200(WorkerSinkTask.java:70)
    at org.apache.kafka.connect.runtime.WorkerSinkTask$HandleRebalance.onPartitionsAssigned(WorkerSinkTask.java:675)
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:291)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:406)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:340)
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:341)
    at org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1214)
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1179)
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1164)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.pollConsumer(WorkerSinkTask.java:445)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:318)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:226)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:194)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748

Коннектор раковины:

connector.class com.wepay.kafka.connect.bigquery.BigQuerySinkConnector
autoUpdateSchemas   true
sanitizeTopics  true
autoCreateTables    true
topics  <topic-name>
tasks.max   3
schemaRegistryLocation  http://<ip>:8081
project <big-query-project>
maxWriteSize    10000
datasets    .*=<big-query-dataset>
task.class  com.wepay.kafka.connect.bigquery.BigQuerySinkTask
keyfile /credentials/<credentials-file>.json
name    <connector-name>
schemaRetriever com.wepay.kafka.connect.bigquery.schemaregistry.schemaretriever.SchemaRegistrySchemaRetriever
tableWriteWait  1000
bufferSize  100000

И это исключение выше java.lang.IllegalStateException: No current assignment for [...]

1 Ответ

1 голос
/ 28 марта 2019

Стоимость имущества tasks.max зависит от нескольких факторов. Наиболее важным является конкретный разъем. Конкретный соединитель зависит от его логики и значения tasks.max, вычисляемого числа Task, которое будет создано. ех. FileStreamSourceConnector всегда создает 1 задание, поэтому даже если вы передадите значение выше 1, оно создаст только одно. Та же ситуация с PostgresConnector это параллельно одному.

tasks.max значение также должно зависеть от других факторов, таких как: режим Kafka Connect, сколько у вас экземпляров Kafka Connect, процессор компьютеров и т. Д.

Насколько я понимаю, вы используете SourceConnector (PostgresConnector). Соединители источников не опрашивают данные от Kafka. Исключение, которое вы опубликовали, относится к некоторым SinkConnector. Если вы используете SinkConnector, ваш tasks.max не должен превышать количество разделов. Если вы запустите больше заданий, чем число разделов, некоторые из них будут простаивать (состояние выполняется, но они не обрабатывают данные), и может произойти перебалансировка.

...