Как установить спарк потребительский кеш? исправить ошибку «KafkaConsumer достигает максимальной емкости 64»? - PullRequest
0 голосов
/ 18 октября 2019

Я использую spark-sql 2.4.1, spark-cassandra-connector_2.11-2.4.1.jar и java8. При вставке данных из темы kafka в данные таблицы C * / Cassandra.

Я получаю сообщение об ошибке:

 org.apache.spark.sql.kafka010.KafkaDataConsumer - KafkaConsumer cache hitting max capacity of 64, removing consumer for CacheKey(spark-kafka-source-33321dde-bfad-49f3-bdf7-09f95883b6e9--1249540122-executor)

Как решить эту проблему?

Раздел 2:

Я использую следующие параметры

Dataset<Row> df = sparkSession
                      .readStream()
                      .format("kafka")
                      ///other options
                      .option("startingOffsets", "latest")
                      .option("retries", 1)
                      .option("linger.ms", 10)
                      .option("enable.auto.commit", false)
                      .option("failOnDataLoss", false)
                      .option("maxOffsetsPerTrigger", 500)
                   .option("spark.streaming.kafka.consumer.cache.enabled",false)
                      .load(); 

Тем не менее я получаю сообщение об ошибке:

 org.apache.spark.sql.kafka010.KafkaDataConsumer - KafkaConsumer cache hitting max capacity of 64, removing consumer for CacheKey(spark-kafka-source-33321dde-bfad-49f3-bdf7-09f95883b6e9--1249540122-executor)

Ответы [ 2 ]

1 голос
/ 29 октября 2019

Я не уверен, что вы ожидаете здесь, но я делюсь своими мыслями.

  1. "spark.streaming.kafka.consumer.cache.enabled" - это флаг DStreams, но упомянутый API + предупреждающее сообщение - это структурированная потоковая передача,Пожалуйста, не перепутайте, потому что это два совершенно разных продукта.

  2. Поскольку вы используете API-интерфейс структурированной потоковой передачи, предполагается, что это продукт, который вы изначально хотели. Обратите внимание, что в структурированной потоковой передаче кэширование потребителей Kafka невозможно отключить, но размер кэша можно настроить с помощью "spark.sql.kafkaConsumerCache.capacity" (мягкое ограничение). К вашему сведению, в Spark 3.0 мы переписали весь этот механизм.

  3. Если задание достигает размера кэша по умолчанию, равного 64, это означает, что по крайней мере 64 потока пытаются прочитать точно такую ​​же темуРазделение внутри одной JVM. Я вряд ли могу представить, что это эффективно в любом случае. Тесты производительности могут найти правильные числа.

Я предлагаю горизонтальное масштабирование (можно добавить больше исполнителей). Поскольку я не вижу сам код, я предлагаю понять, почему Spark пытается прочитать один и тот же раздел раздела из стольких потоков и ограничить его.

1 голос
/ 21 октября 2019

Я думаю, что это WARN, в любом случае это проблема с документацией.

Вы можете проверить эту ссылку https://issues.apache.org/jira/browse/SPARK-25466

Размер кэша можно настроить с помощью параметра spark.sql.kafkaConsumerCache.capacity.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...