Apache мигает состояние вещания сбрасывается - PullRequest
0 голосов
/ 21 мая 2019

Я использую шаблон широковещания для соединения двух потоков и чтения данных из одного в другой.Код выглядит следующим образом

case class Broadcast extends BroadCastProcessFunction[MyObject,(String,Double), MyObject]{
  override def processBroadcastElement(in2: (String, Double), 
                                       context: BroadcastProcessFunction[MyObject, (String, Double), MyObject]#Context,
                                       collector:Collector[MyObject]):Unit={
    context.getBroadcastState(broadcastStateDescriptor).put(in2._1,in2._2)
  }

  override def processElement(obj: MyObject,
                            readOnlyContext:BroadCastProcessFunction[MyObject, (String,Double), 
                            MyObject]#ReadOnlyContext, collector: Collector[MyObject]):Unit={
    val theValue = readOnlyContext.getBroadccastState(broadcastStateDesriptor).get(obj.prop)
    //If I print the context of the state here sometimes it is empty.
    out.collect(MyObject(new, properties, go, here))
  }
}

Дескриптор состояния:

val broadcastStateDescriptor: MapStateDescriptor[String, Double) = new MapStateDescriptor[String, Double]("name_for_this", classOf[String], classOf[Double])

Мой код выполнения выглядит следующим образом.

val streamA :DataStream[MyObject] = ... 
val streamB :DataStream[(String,Double)] = ... 
val broadcastedStream = streamB.broadcast(broadcastStateDescriptor)

streamA.connect(streamB).process(new Broadcast)

Проблема в processElement Функция состояния иногда пуста, а иногда нет.Состояние должно всегда содержать данные, поскольку я постоянно выполняю потоковую передачу из файла, который, как я знаю, содержит данные.Я не понимаю, почему это сбрасывает состояние, и я не могу получить данные.

Я попытался добавить некоторую печать в processBroadcastElement до и после перевода данных в состояние, и в результате получается следующее

0 - 1
1 - 2 
2 - 3 
.. all the way to 48 where it resets back to 0

ОБНОВЛЕНИЕ: кое-что, что я заметил, когда я уменьшаю значение тайм-аута контекста потокового выполнения, результаты немного лучше.когда я его увеличиваю, карта всегда пуста.

env.setBufferTimeout(1) //better results 
env.setBufferTimeout(200) //worse result (default is 100)

Ответы [ 2 ]

0 голосов
/ 22 мая 2019

Как сказал Дэвид, работа может быть возобновлена. Я отключил контрольные точки, чтобы я мог видеть любое возможное исключение вместо тихого сбоя и перезапуска задания.

Оказалось, что при попытке разбора файла произошла ошибка. Таким образом, работа продолжала перезапускаться, поэтому состояние было пустым, а мигание продолжало потреблять поток снова и снова.

0 голосов
/ 21 мая 2019

Всякий раз, когда два потока соединяются в Flink, вы не можете контролировать время, с которым Flink будет доставлять события из двух потоков в вашу пользовательскую функцию. Так, например, если есть событие, доступное для обработки из streamA, и событие, доступное для обработки из streamB, любое из них может быть обработано следующим. Вы не можете ожидать, что broadcastedStream каким-то образом будет иметь приоритет над другим потоком.

Существуют различные стратегии, которые вы можете использовать, чтобы справиться с этой гонкой между двумя потоками, в зависимости от ваших требований. Например, вы можете использовать KeyedBroadcastProcessFunction и использовать его метод applyToKeyedState для итерации по всему существующему ключевому состоянию при получении нового широковещательного события.

...