Метод Spark Streaming Store работает только в окне Duration, но не в рабочем процессе foreachRDD в настроенном приемнике. - PullRequest
0 голосов
/ 26 марта 2020

Я определяю получателя для чтения данных из Redis.

часть упрощенного кода получателя:

class MyReceiver extends Receiver (StorageLevel.MEMORY_ONLY){
  override def onStart() = {
    while(!isStopped) {
      val res = readMethod()
      if (res != null) store(res.toIterator) 
      // using res.foreach(r => store(r)) the performance is almost the same
    }
  }
}

Мой поток операций:

val ssc = new StreamingContext(spark.sparkContext, new Duration(50))
val myReceiver = new MyReceiver()
val s = ssc.receiverStream(myReceiver)
s.foreachRDD{ r => 
  r.persist()
  if (!r.isEmpty) {
    some short operations about 1s in total
    // note this line ######1
  }
}

У меня есть производитель, который производит намного быстрее, чем потребитель, так что теперь в Redis есть много записей, я проверил с номером 10000. Я отладил, и все записи можно было быстро прочитать после того, как они в Redis на readMethod() выше. Тем не менее, в каждой микробатке я могу получить только 30 записей . (Если хранилище достаточно быстрое, оно должно получить все 10000)

С этим подозреваемым я добавил код 10 секунд ожидания Thread.sleep(10000) к ######1 выше. Каждая микропакета по-прежнему получает около 30 записей , и время обработки каждой микропакеты увеличивается на 10 секунд. И если я увеличу Длительность до 200 мс, val ssc = new StreamingContext(spark.sparkContext, new Duration(200)), он может получить около 120 записей.

Все эти показы при потоковой генерации только сгенерируют СДР в Duration? После получения RDD и в основном рабочем процессе метод store временно остановлен? Но это большая трата, если это правда. Я хочу, чтобы он также генерировал RDD (хранилище) во время работы основного рабочего процесса.

Есть идеи?

1 Ответ

0 голосов
/ 08 апреля 2020

Я не могу оставить комментарий, просто у меня недостаточно репутации. Возможно ли, что свойство spark.streaming.receiver.maxRate установлено где-то в вашем коде?

...