Как перебрать DataStream - PullRequest
0 голосов
/ 10 мая 2019

Я новичок в скале. У меня есть собственный класс Analytics.scala, который имеет несколько переменных (var a, var b, var c). В моем тестовом примере я получаю DataStream типа Analytics и хочу установить значение var c равным 0 для каждого объекта.

Я пытался использовать функцию map поверх DataStream, но это не помогло. Я также попытался преобразовать поток в список, а затем перебрать этот список, но это тоже не сработало.

поток имеет тип DataStream [Analytics]. Вот что я пробовал:

stream.map(x => x.c=0)
val a = DataStreamUtils.collect(stream.javaStream).asScala.toArray.iterator
a.foreach(x => x.c=0)

значение var c не меняется на 0 в моем тестовом случае.

Ответы [ 2 ]

1 голос
/ 11 мая 2019

В общем, Flink DataStream - это не конечная коллекция, которую вы можете повторить один раз и все готово - это потенциально неограниченный поток, который просто продолжает содержать больше данных.

Использование карты - правильный путь. Но когда вы применяете карту к потоку, как в

stream.map(x => x.c=0)

вы описываете преобразование потока, а не изменяете сам поток. Вместо этого вы должны попробовать

streamWhereCisZero = stream.map(x => x.c=0)

Это создает новый поток, в котором каждый элемент будет иметь нулевое значение.

0 голосов
/ 10 мая 2019

Вот как я итерировал.Не уверен, что это лучшее решение.

val collection = DataStreamUtils.collect(stream.javaStream)
val results: Seq[Analytics] = collection.asScala.toSeq
for (result <- results){
    result.c=0
}
...