Поведение Флинка - PullRequest
       65

Поведение Флинка

0 голосов
/ 25 июня 2018

Я пытаюсь поиграть с расширенным состоянием Флинка в простом случае.

Я просто хочу умножить поток целых чисел на другое целое число в широковещательном потоке.

Поведение моей трансляции«странно», если я добавлю слишком мало элементов в свой поток ввода (например, 10), ничего не произойдет, и мой MapState будет пустым, но если я добавлю больше элементов (например, 100), я получу желаемое поведение (умножьте целое числопоток на 2 здесь).

Почему вещательный поток не учитывается, если я дал слишком мало элементов?

Как я могу контролировать, когда вещательный потокработает?

Необязательно: Я хочу сохранить только последний элемент моего широковещательного потока, .clear() хороший способ?

Спасибо!

Вот мои BroadcastProcessFunction:

import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction
import org.apache.flink.util.Collector
import scala.collection.JavaConversions._

class BroadcastProcess extends BroadcastProcessFunction[Int, Int, Int] {
  override def processElement(value: Int, ctx: BroadcastProcessFunction[Int, Int, Int]#ReadOnlyContext, out: Collector[Int]) = {
    val currentBroadcastState = ctx.getBroadcastState(State.mapState).immutableEntries()
    if (currentBroadcastState.isEmpty) {
      out.collect(value)
    } else {
      out.collect(currentBroadcastState.last.getValue * value)
    }
  }

  override def processBroadcastElement(value: Int, ctx: BroadcastProcessFunction[Int, Int, Int]#Context, out: Collector[Int]) = {
    // Keep only last state
    ctx.getBroadcastState(State.mapState).clear()
    // Add state
    ctx.getBroadcastState(State.mapState).put("key", value)
  }
}

И мои MapState:

import org.apache.flink.api.common.state.MapStateDescriptor
import org.apache.flink.api.scala._

object State {
  val mapState: MapStateDescriptor[String, Int] =
    new MapStateDescriptor(
      "State",
      createTypeInformation[String],
      createTypeInformation[Int]
    )
}

И мои Main:

import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.api.scala._

object Broadcast {
  def main(args: Array[String]): Unit = {
    val numberElements = 100
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
    val broadcastStream = env.fromElements(2).broadcast(State.mapState)
    val input = (1 to numberElements).toList
    val inputStream = env.fromCollection(input)
    val outputStream = inputStream
      .connect(broadcastStream)
      .process(new BroadcastProcess())
    outputStream.print()
    env.execute()
  }
}

Редактировать: я использую Flink 1.5, документация Broadcast State здесь здесь .

1 Ответ

0 голосов
/ 25 июня 2018

Flink не синхронизирует поступление потоков, то есть потоки выдают данные, как только могут. Это верно для обычных и широковещательных входов. BroadcastProcess не будет ожидать поступления первого широковещательного входа, прежде чем будет принят обычный вход.

Когда вы добавляете больше чисел в обычный ввод, просто требуется больше времени для сериализации, десериализации и обслуживания входа таким образом, что вход широковещания уже присутствует, когда поступает первое обычное число.

...