Я читаю пакетную запись из Redis, используя потоковую обработку с искрой foreachBatch
по следующему коду (пытаясь установить batchSize на stream.read.batch.size
)
val data = spark.readStream.format("redis")
.option("stream.read.batch.size").load()
val query = data.writeStream.foreachBatch {
(batchDF: DataFrame, batchId: Long) => ...
// we count size of batchDF here, we want to limit its size
// some operation
}
В настоящее время мы устанавливаем stream.read.batch.size
в 128но, кажется, это не работает.BatchSize кажется случайным, иногда более 1000, даже 10000.
Однако я не хочу ждать так долго (10000 записей), потому что мне нужно выполнить некоторые операции (в комментарии к коду // some operation
)как можно скорее, чтобы я хотел контролировать максимальный размер пакета, чтобы, когда записи достигли этого ограничения, его можно было обработать немедленно, как это сделать?