Как установить максимальное количество строк в одной микропакете? - PullRequest
2 голосов
/ 20 июня 2019

Я читаю пакетную запись из Redis, используя потоковую обработку с искрой foreachBatch по следующему коду (пытаясь установить batchSize на stream.read.batch.size)

val data = spark.readStream.format("redis")
  .option("stream.read.batch.size").load()

val query = data.writeStream.foreachBatch { 
  (batchDF: DataFrame, batchId: Long) => ...
  // we count size of batchDF here, we want to limit its size
  // some operation
}

В настоящее время мы устанавливаем stream.read.batch.size в 128но, кажется, это не работает.BatchSize кажется случайным, иногда более 1000, даже 10000.

Однако я не хочу ждать так долго (10000 записей), потому что мне нужно выполнить некоторые операции (в комментарии к коду // some operation)как можно скорее, чтобы я хотел контролировать максимальный размер пакета, чтобы, когда записи достигли этого ограничения, его можно было обработать немедленно, как это сделать?

Ответы [ 2 ]

3 голосов
/ 20 июня 2019

Я поддерживаю spark-redis.В настоящее время это не поддерживается.Параметр stream.read.batch.size управляет количеством элементов, считываемых одним вызовом Redis API (параметр count вызова XREADGROUP).Это не влияет на количество элементов на триггер (размер batchDF).Я открыл билет на github для этого запроса.

1 голос
/ 24 июня 2019

мы хотим ограничить его размер

Вы можете использовать Dataset.limit для ограничения потоковой передачи (по крайней мере, в Spark 2.4.3).

При этом,код может выглядеть следующим образом:

val data = spark
  .readStream
  .format("redis")
  .load
  .limit(...your limit here...)
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...