Я использую Kafka Connect в Kubernetes (8-16 узлов, автоматическое масштабирование). Я определил всего 44 коннектора, по одному на тему Kafka (один раздел на тему). Эти темы производятся Debezium / Postgresql. Есть 3 узла Кафки. Каждый соединитель имеет значение tasks.max, равное 4. Большинство моих соединителей (но не каждый!) Имеют одну (всегда одну) невыполненную задачу из-за java.lang.IllegalStateException: нет текущего назначения для раздела -0.
Здесь не эксперт по Kafka, обратите внимание;) Я предполагаю, что есть 3 узла Kafka, поэтому у 3 рабочих все отлично, и к 4-м задачам подключаться нечего, поэтому они не работают. Но почему иногда 4 задачи выполняются просто отлично?
Кроме того, у меня довольно часто возникает проблема «конфликтующие операции из-за перебалансировки», которая может возникать в течение нескольких минут или даже часов. Недавно я удалил все модули, и они сами перезагрузились, проблема исчезла, но это не является долгосрочным решением.
Что такое рекомендуемое значение tasks.max? Заранее спасибо!
Исключение:
java.lang.IllegalStateException: No current assignment for partition table-0
at org.apache.kafka.clients.consumer.internals.SubscriptionState.assignedState(SubscriptionState.java:259)
at org.apache.kafka.clients.consumer.internals.SubscriptionState.seek(SubscriptionState.java:264)
at org.apache.kafka.clients.consumer.KafkaConsumer.seek(KafkaConsumer.java:1501)
at org.apache.kafka.connect.runtime.WorkerSinkTask.rewind(WorkerSinkTask.java:601)
at org.apache.kafka.connect.runtime.WorkerSinkTask.access$1200(WorkerSinkTask.java:70)
at org.apache.kafka.connect.runtime.WorkerSinkTask$HandleRebalance.onPartitionsAssigned(WorkerSinkTask.java:675)
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:291)
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:406)
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:340)
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:341)
at org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1214)
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1179)
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1164)
at org.apache.kafka.connect.runtime.WorkerSinkTask.pollConsumer(WorkerSinkTask.java:445)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:318)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:226)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:194)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748
Коннектор раковины:
connector.class com.wepay.kafka.connect.bigquery.BigQuerySinkConnector
autoUpdateSchemas true
sanitizeTopics true
autoCreateTables true
topics <topic-name>
tasks.max 3
schemaRegistryLocation http://<ip>:8081
project <big-query-project>
maxWriteSize 10000
datasets .*=<big-query-dataset>
task.class com.wepay.kafka.connect.bigquery.BigQuerySinkTask
keyfile /credentials/<credentials-file>.json
name <connector-name>
schemaRetriever com.wepay.kafka.connect.bigquery.schemaregistry.schemaretriever.SchemaRegistrySchemaRetriever
tableWriteWait 1000
bufferSize 100000
И это исключение выше java.lang.IllegalStateException: No current assignment for [...]