Слишком много текущих снимков. Увеличьте размер пула производителей кафки или уменьшите количество одновременных контрольных точек - PullRequest
1 голос
/ 21 марта 2020

Я работаю над приложением Flink, которое опускается до Кафки. Я создал производителя Kafka с размером пула по умолчанию 5. Я включил контрольные точки со следующей конфигурацией:

    env.enableCheckpointing(1800000);//checkpointing for every 30 minutes.

    // set mode to exactly-once (this is the default)
    env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);

    // make sure 500 ms of progress happen between checkpoints
    env.getCheckpointConfig().setMinPauseBetweenCheckpoints(5000);

    // checkpoints have to complete within one minute, or are discarded
    env.getCheckpointConfig().setCheckpointTimeout(60000);

    // allow only one checkpoint to be in progress at the same time
    env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);

Приложение иногда продолжает падать, за исключением следующего. Эта проблема связана с размером пула производителей kafka или контрольными точками?

2020-03-20 22:31:23,859 INFO  org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction  - FlinkKafkaProducer011 0/1 aborted recovered transaction TransactionHolder{handle=KafkaTransactionState [transactionalId=FileSplitReader -> metrics-map -> Sink: components-topic-sink-4ab008489d4c8ed0fe577883438cc1ff-1, producerId=21, epoch=3], transactionStartTime=1584742933826}
2020-03-20 22:31:23,860 ERROR org.apache.flink.streaming.runtime.tasks.StreamTask           - Error during disposal of stream operator.
java.lang.NullPointerException
    at org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperator.dispose(ContinuousFileReaderOperator.java:164)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:668)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.cleanUpInvoke(StreamTask.java:579)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:481)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:707)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:532)
    at java.lang.Thread.run(Thread.java:748)
2020-03-20 22:31:23,861 INFO  org.apache.flink.runtime.taskmanager.Task                     - FileSplitReader -> metrics-map -> Sink: components-topic-sink (1/1) (92b7f3ed8f6362fe0087efd40eb94016) switched from RUNNING to FAILED.
org.apache.flink.streaming.connectors.kafka.FlinkKafka011Exception: Too many ongoing snapshots. Increase kafka producers pool size or decrease number of concurrent checkpoints.
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.createTransactionalProducer(FlinkKafkaProducer011.java:934)
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.beginTransaction(FlinkKafkaProducer011.java:701)
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.beginTransaction(FlinkKafkaProducer011.java:97)
    at org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.beginTransactionInternal(TwoPhaseCommitSinkFunction.java:394)
    at org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.initializeState(TwoPhaseCommitSinkFunction.java:385)
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.initializeState(FlinkKafkaProducer011.java:862)
    at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:178)
    at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:160)
    at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:96)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:284)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeStateAndOpen(StreamTask.java:1006)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:454)
    at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:94)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:449)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:461)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:707)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:532)
    at java.lang.Thread.run(Thread.java:748)

Ответы [ 2 ]

0 голосов
/ 21 марта 2020

Рекомендую обновить до новейшего разъема flink / kafka - похоже, вы используете FlinkKafkaProducer011, который предназначен для Kafka 0.11.

Вы должны использовать FlinkKafkaProducer от универсального Разъем кафки: флинк-коннектор-кафка. Начиная с Flink 1.9, здесь используется клиент Kafka 2.2.0.

В maven вы хотите указать

<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-connector-kafka_2.11</artifactId>
  <version>1.10.0</version>
</dependency>

или заменить 2.11 на 2.12, если вы используете Scala 2.12.

0 голосов
/ 21 марта 2020

Трудно сказать без доступа к среде.

Это может быть связано с указанным c кодом, который вы используете. В основном вы нажимаете это исключение.

Пара вещей:

  1. Это аналогичная проблема, связанная с массивом в коде : Прерывается при присоединении к ioThread / Error при удалении оператора потока в приложении flink

  2. Похоже, вы работаете в Kubernetes, и если вы посмотрите на , это вы видите, что проблема может быть связана с неудачным разрывом соединения или отсутствием связи между менеджерами заданий и задач, поэтому вы можете проверить сеть в вашем кластере Kubernetes и убедиться, что все ваши модули Flink могут общаться друг с другом.

Надеюсь, это поможет!

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...