Spark структурированная потоковая передача maxOffsetsPerTrigger не работает - PullRequest
1 голос
/ 25 июня 2019

У меня была проблема с приложением Spark структурированной потоковой передачи (SSS), которое зависло из-за программной ошибки и не обрабатывалось в выходные дни. Когда я перезапустил его, было много сообщений по темам для повторной обработки (около 250 000 сообщений по 3 темам, которые необходимо объединить).

При перезапуске приложение снова зависало с исключением OutOfMemory. Из документации я узнал, что конфигурация maxOffsetsPerTrigger в потоке чтения должна помочь именно в этих случаях. Я изменил код PySpark (работает на SSS 2.4.3, кстати), чтобы во всех 3 темах было что-то вроде следующего

 rawstream = (spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", bootstrapServers)
    .option("subscribe", topicName)
    .option("maxOffsetsPerTrigger", 10000L)
    .option("startingOffsets", "earliest")
    .load()

Я ожидал бы, что теперь запрос SSS будет загружать ~ 33 000 смещений из каждой темы и объединять их в первом пакете. Затем во второй партии он будет очищать записи состояния из первой партии, срок действия которых истекает из-за водяного знака (что приведет к очистке большинства записей из первой партии), а затем читать еще ~ 33 тыс. Из каждой темы. Таким образом, после ~ 8 пакетов он должен обработать задержку с «разумным» объемом памяти.

Но приложение продолжало аварийно завершать работу с OOM, и когда я проверил DAG в основном интерфейсе приложения, он сообщил, что снова попытался прочитать все 250 000 сообщений.

Есть ли что-то еще, что мне нужно настроить? Как я могу проверить, что эта опция действительно используется? (когда я проверяю план, к сожалению, он обрезан и показывает только (Options: [includeTimestamp=true,subscribe=IN2,inferSchema=true,failOnDataLoss=false,kafka.b...), я не мог найти способ показать часть после точек)

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...