Контроль порядка обработанных элементов в CoProcessFunction с использованием пользовательских источников - PullRequest
0 голосов
/ 05 апреля 2020

В целях тестирования я использую следующий пользовательский источник:

class ThrottledSource[T](
  data: Array[T],
  throttling: Int,
  beginWaitingTime: Int = 0,
  endWaitingTime: Int = 0
) extends SourceFunction[T] {

  private var isRunning = true
  private var offset = 0

  override def run(ctx: SourceFunction.SourceContext[T]): Unit = {
    Thread.sleep(beginWaitingTime)

    val lock = ctx.getCheckpointLock

    while (isRunning && offset < data.length) {
      lock.synchronized {
        ctx.collect(data(offset))
        offset += 1
      }
      Thread.sleep(throttling)
    }

    Thread.sleep(endWaitingTime)
  }

  override def cancel(): Unit = isRunning = false

и использую его следующим образом в своем тесте

val controlStream = new ThrottledSource[Control](
  data = Array(c1,c2), endWaitingTime = 10000, throttling = 0,
)

val dataStream = new ThrottledSource[Event](
  data = Array(e1,e2,e3,e4,e5),
  throttling = 1000,
  beginWaitingTime = 2000,
  endWaitingTime = 2000,
)

val dataStream = env.addSource(events)

env.addSource(controlStream)
  .connect(dataStream)
  .process(MyProcessFunction)

Я хочу получить все элементы управления во-первых (поэтому я не указываю ни beginWaitingTime, ни throttling). В processElement1 и processElement2 в MyProcessFunction я печатаю элементы, когда получаю их. В большинстве случаев сначала я получаю два элемента управления, как и ожидалось, но для меня, как это ни удивительно, время от времени я сначала получаю элементы данных, несмотря на двухсекундную задержку, которая используется источником данных для начала излучения его элементов. Кто-нибудь может мне это объяснить?

1 Ответ

1 голос
/ 05 апреля 2020

Операторы источника управления и потока данных работают в разных потоках, и, как вы видели, нет никакой гарантии, что экземпляр источника, на котором выполняется поток управления, получит шанс запустить его до запуска потока данных.

Вы можете посмотреть ответ здесь и связанный с ним код на github , чтобы найти способ надежно выполнить sh.

...