Я определяю получателя для чтения данных из Redis.
часть упрощенного кода получателя:
class MyReceiver extends Receiver (StorageLevel.MEMORY_ONLY){
override def onStart() = {
while(!isStopped) {
val res = readMethod()
if (res != null) store(res.toIterator)
// using res.foreach(r => store(r)) the performance is almost the same
}
}
}
Мой поток операций:
val ssc = new StreamingContext(spark.sparkContext, new Duration(50))
val myReceiver = new MyReceiver()
val s = ssc.receiverStream(myReceiver)
s.foreachRDD{ r =>
r.persist()
if (!r.isEmpty) {
some short operations about 1s in total
// note this line ######1
}
}
У меня есть производитель, который производит намного быстрее, чем потребитель, так что теперь в Redis есть много записей, я проверил с номером 10000. Я отладил, и все записи можно было быстро прочитать после того, как они в Redis на readMethod()
выше. Тем не менее, в каждой микробатке я могу получить только 30 записей . (Если хранилище достаточно быстрое, оно должно получить все 10000)
С этим подозреваемым я добавил код 10 секунд ожидания Thread.sleep(10000)
к ######1
выше. Каждая микропакета по-прежнему получает около 30 записей , и время обработки каждой микропакеты увеличивается на 10 секунд. И если я увеличу Длительность до 200 мс, val ssc = new StreamingContext(spark.sparkContext, new Duration(200))
, он может получить около 120 записей.
Все эти показы при потоковой генерации только сгенерируют СДР в Duration
? После получения RDD и в основном рабочем процессе метод store
временно остановлен? Но это большая трата, если это правда. Я хочу, чтобы он также генерировал RDD (хранилище) во время работы основного рабочего процесса.
Есть идеи?