Обновить одновременную карту внутри потоковой карты на Flink - PullRequest
0 голосов
/ 20 мая 2019

У меня есть один поток, который постоянно передает последние значения некоторых ключей.

Поток A: DataStream[(String,Double)]

У меня есть другой поток, который хочет получить последнее значение при каждом вызове процесса,

Мой подход состоял в том, чтобы ввести concurrentHashMap, который будет обновляться потоком А и считываться вторым потоком.

val rates = new concurrentHasMap[String,Double].asScala
val streamA : DataStream[(String,Double)]= ???
streamA.map(keyWithValue => rates(keyWithValue._1)= keyWithValue._2) //rates never gets updated
rates("testKey")=2 //this works
val streamB: DataStream[String] = ???
streamB.map(str=> rates(str)  // rates does not contain the values of the streamA at this point
  //some other functionality
) 

Можно ли обновить карту параллельного потока из потока?Любое другое решение по обмену данными из потока с другим также приемлемо

1 Ответ

4 голосов
/ 20 мая 2019

Поведение, которое вы пытаетесь использовать, не будет работать распределенным образом, в основном, если у вас будет parellelism> 1, оно не будет работать. В вашем коде rates фактически обновляются, но в другом экземпляре параллельного оператора.

На самом деле, в этом случае вы хотели бы использовать BroadcastState, который был разработан для решения именно той проблемы, с которой вы столкнулись.

В вашем конкретном случае использования это будет выглядеть примерно так:

val streamA : DataStream[(String,Double)]= ???
val streamABroadcasted = streamA.broadcast(<Your Map State Definition>)
val streamB: DataStream[String] = ???
streamB.connect(streamABroadcasted)

Тогда Вы можете легко использовать BroadcastProcessFunction для реализации Вашей логики. Подробнее о шаблоне состояния вещания можно найти здесь

...