У меня есть один поток, который постоянно передает последние значения некоторых ключей.
Поток A: DataStream[(String,Double)]
У меня есть другой поток, который хочет получить последнее значение при каждом вызове процесса,
Мой подход состоял в том, чтобы ввести concurrentHashMap
, который будет обновляться потоком А и считываться вторым потоком.
val rates = new concurrentHasMap[String,Double].asScala
val streamA : DataStream[(String,Double)]= ???
streamA.map(keyWithValue => rates(keyWithValue._1)= keyWithValue._2) //rates never gets updated
rates("testKey")=2 //this works
val streamB: DataStream[String] = ???
streamB.map(str=> rates(str) // rates does not contain the values of the streamA at this point
//some other functionality
)
Можно ли обновить карту параллельного потока из потока?Любое другое решение по обмену данными из потока с другим также приемлемо