Запись большого DataFrame из PySpark в Kafka заканчивается временем ожидания - PullRequest
0 голосов
/ 13 декабря 2018

Я пытаюсь записать фрейм данных, который содержит около 230 миллионов записей для Кафки.В частности, к концентратору событий Azure с поддержкой Kafka , но я не уверен, является ли это источником моей проблемы.

EH_SASL = 'kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="Endpoint=sb://myeventhub.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=****";'

dfKafka \
.write  \
.format("kafka") \
.option("kafka.sasl.mechanism", "PLAIN") \
.option("kafka.security.protocol", "SASL_SSL") \
.option("kafka.sasl.jaas.config", EH_SASL) \
.option("kafka.bootstrap.servers", "myeventhub.servicebus.windows.net:9093") \
.option("topic", "mytopic") \
.option("checkpointLocation", "/mnt/telemetry/cp.txt") \
.save()

Это нормально запускается и пишет о 3-4 миллиона записей успешно (и довольно быстро) в очереди.Но затем через несколько минут задание останавливается с такими сообщениями:

org.apache.spark.SparkException: задание прервано из-за сбоя этапа: задание 6 на этапе 7.0 не выполнено 4 раза, самое последнееОшибка: потерянное задание 6.3 на этапе 7.0 (TID 248, 10.139.64.5, исполнитель 1): kafkashaded.org.apache.kafka.common.errors.TimeoutException: Срок действия истек 61 записей для mytopic-18: с тех пор прошло 32839 мспоследнее добавление

или

org.apache.spark.SparkException: задание прервано из-за сбоя этапа: задание 13 на этапе 8.0 не выполнено 4 раза, последний сбой: потерянзадача 13.3 на этапе 8.0 (TID 348, 10.139.64.5, исполнитель 1): kafkashaded.org.apache.kafka.common.errors.TimeoutException: истекло время ожидания запроса.

Кроме того, я никогда не вижуфайл контрольной точки создается / записывается в.Я также играл с .option("kafka.delivery.timeout.ms", 30000) и другими значениями, но это, похоже, не оказало никакого влияния.

Я запускаю это в кластере Azure Databricks версии 5.0 (включает Apache Spark 2.4.0, Scala 2.11)

Я не вижу каких-либо ошибок, таких как регулирование на моем концентраторе событий, так что все должно быть в порядке.

1 Ответ

0 голосов
/ 03 января 2019

Наконец понял (в основном):

Оказалось, что размер пакета по умолчанию около 16000 сообщений был слишком большим для конечной точки.После того, как я установил для параметра batch.size значение 5000, он заработал и записывает около 700 тыс. Сообщений в минуту в концентратор событий.Кроме того, указанный выше параметр времени ожидания был неверным и просто игнорировался.Это kafka.request.timeout.ms

Единственная проблема заключается в том, что случайным образом он все еще работает в тайм-аутах и, очевидно, начинается снова с самого начала, так что я получаю дубликаты.Откроется другой вопрос для этого.

dfKafka \
.write  \
.format("kafka") \
.option("kafka.sasl.mechanism", "PLAIN") \
.option("kafka.security.protocol", "SASL_SSL") \
.option("kafka.sasl.jaas.config", EH_SASL) \
.option("kafka.batch.size", 5000) \
.option("kafka.bootstrap.servers", "myeventhub.servicebus.windows.net:9093") \
.option("kafka.request.timeout.ms", 120000) \
.option("topic", "raw") \
.option("checkpointLocation", "/mnt/telemetry/cp.txt") \
.save()
...