Я пытаюсь поиграть с расширенным состоянием Флинка в простом случае.
Я просто хочу умножить поток целых чисел на другое целое число в широковещательном потоке.
Поведение моей трансляции«странно», если я добавлю слишком мало элементов в свой поток ввода (например, 10), ничего не произойдет, и мой MapState
будет пустым, но если я добавлю больше элементов (например, 100), я получу желаемое поведение (умножьте целое числопоток на 2 здесь).
Почему вещательный поток не учитывается, если я дал слишком мало элементов?
Как я могу контролировать, когда вещательный потокработает?
Необязательно: Я хочу сохранить только последний элемент моего широковещательного потока, .clear()
хороший способ?
Спасибо!
Вот мои BroadcastProcessFunction
:
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction
import org.apache.flink.util.Collector
import scala.collection.JavaConversions._
class BroadcastProcess extends BroadcastProcessFunction[Int, Int, Int] {
override def processElement(value: Int, ctx: BroadcastProcessFunction[Int, Int, Int]#ReadOnlyContext, out: Collector[Int]) = {
val currentBroadcastState = ctx.getBroadcastState(State.mapState).immutableEntries()
if (currentBroadcastState.isEmpty) {
out.collect(value)
} else {
out.collect(currentBroadcastState.last.getValue * value)
}
}
override def processBroadcastElement(value: Int, ctx: BroadcastProcessFunction[Int, Int, Int]#Context, out: Collector[Int]) = {
// Keep only last state
ctx.getBroadcastState(State.mapState).clear()
// Add state
ctx.getBroadcastState(State.mapState).put("key", value)
}
}
И мои MapState
:
import org.apache.flink.api.common.state.MapStateDescriptor
import org.apache.flink.api.scala._
object State {
val mapState: MapStateDescriptor[String, Int] =
new MapStateDescriptor(
"State",
createTypeInformation[String],
createTypeInformation[Int]
)
}
И мои Main
:
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.api.scala._
object Broadcast {
def main(args: Array[String]): Unit = {
val numberElements = 100
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)
val broadcastStream = env.fromElements(2).broadcast(State.mapState)
val input = (1 to numberElements).toList
val inputStream = env.fromCollection(input)
val outputStream = inputStream
.connect(broadcastStream)
.process(new BroadcastProcess())
outputStream.print()
env.execute()
}
}
Редактировать: я использую Flink 1.5, документация Broadcast State здесь здесь .