У меня была проблема с приложением Spark структурированной потоковой передачи (SSS), которое зависло из-за программной ошибки и не обрабатывалось в выходные дни. Когда я перезапустил его, было много сообщений по темам для повторной обработки (около 250 000 сообщений по 3 темам, которые необходимо объединить).
При перезапуске приложение снова зависало с исключением OutOfMemory. Из документации я узнал, что конфигурация maxOffsetsPerTrigger
в потоке чтения должна помочь именно в этих случаях. Я изменил код PySpark (работает на SSS 2.4.3, кстати), чтобы во всех 3 темах было что-то вроде следующего
rawstream = (spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", bootstrapServers)
.option("subscribe", topicName)
.option("maxOffsetsPerTrigger", 10000L)
.option("startingOffsets", "earliest")
.load()
Я ожидал бы, что теперь запрос SSS будет загружать ~ 33 000 смещений из каждой темы и объединять их в первом пакете. Затем во второй партии он будет очищать записи состояния из первой партии, срок действия которых истекает из-за водяного знака (что приведет к очистке большинства записей из первой партии), а затем читать еще ~ 33 тыс. Из каждой темы. Таким образом, после ~ 8 пакетов он должен обработать задержку с «разумным» объемом памяти.
Но приложение продолжало аварийно завершать работу с OOM, и когда я проверил DAG в основном интерфейсе приложения, он сообщил, что снова попытался прочитать все 250 000 сообщений.
Есть ли что-то еще, что мне нужно настроить? Как я могу проверить, что эта опция действительно используется? (когда я проверяю план, к сожалению, он обрезан и показывает только (Options: [includeTimestamp=true,subscribe=IN2,inferSchema=true,failOnDataLoss=false,kafka.b...)
, я не мог найти способ показать часть после точек)