Как написать функцию внешнего соединения в Flink с помощью Connect? - PullRequest
0 голосов
/ 20 февраля 2020

Я хочу выполнить внешнее объединение для 2 потоков данных, и лучше не помещать их в окно (я видел, что Cogroup всегда поставляется с окном).

Я пробовал это:

    val controlStream = Flink.flinkEnv.fromElements(
      (1, "mex1", "stream1_feat1"),
      (1, "mex2", "stream1_feat2")
    ).keyBy(x => (x._1, x._2))

    val wordStream = Flink.flinkEnv.fromElements(
      (1, "mex1", "stream2_feat1"),
      (1, "mex3", "stream2_feat3")
    ).keyBy(x => (x._1, x._2))

    val filteredStream = controlStream
        .connect(wordStream)
        .flatMap(new ControlFunction)

////////////////////////////////////////////////////////////////////////

class ControlFunction extends RichCoFlatMapFunction[
    (Int, String, String),
    (Int, String, String),
    (Int, String, String, String)] {

    // outer join
    private var state1: ValueState[(Int, String, String)] = _
    private var state2: ValueState[(Int, String, String)] = _

    override def open(parameters: Configuration): Unit = {
      state1 = getRuntimeContext.getState(
        new ValueStateDescriptor[(Int, String, String)]("s1", createTypeInformation[(Int, String, String)]))

      state2 = getRuntimeContext.getState(
        new ValueStateDescriptor[(Int, String, String)]("s2", createTypeInformation[(Int, String, String)]))

    }

    override def flatMap1(value: (Int, String, String),
                          out: Collector[(Int, String, String, String)]): Unit = {

      val state2Value = state2.value

      if (state2Value != null) {
        println("inside map1 not null")
        state2.clear()
        out.collect((value._1, value._2, value._3, state2Value._3))
      } else {
        println("inside map1 null")
        state1.update(value)
        out.collect((value._1, value._2, value._3, "NA"))
      }
    }

    override def flatMap2(value: (Int, String, String),
                          out: Collector[(Int, String, String, String)]): Unit = {

      val state1Value = state1.value

      if (state1Value != null) {
        println("inside map2 not null")
        state1.clear()
        out.collect((value._1, value._2, state1Value._3, value._3))
      } else {
        println("inside map2 null")
        state2.update(value)
        out.collect((value._1, value._2, "NA", value._3))
      }
    }

  }

Что дало мне:

5> (1,mex2,stream1_feat2,NA)
8> (1,mex1,stream1_feat1,NA)
2> (1,mex3,NA,stream2_feat3)
8> (1,mex1,stream1_feat1,stream2_feat1)

Где запись (1,mex1,stream1_feat1,NA) не должна производиться. В результате я хочу получить внешнее соединение:

5> (1,mex2,stream1_feat2,NA)
2> (1,mex3,NA,stream2_feat3)
8> (1,mex1,stream1_feat1,stream2_feat1)

Получив оператор печати, я обнаружил, что 2 flapMaps были переданы в последовательности, что привело к тому, что mex1 было произведено дважды, в любом случае решить эту проблему?

Заранее спасибо!

1 Ответ

1 голос
/ 20 февраля 2020

Нельзя ожидать, что потоковое внешнее объединение будет вести себя так же, как пакетное внешнее соединение. Пакетное внешнее объединение может полностью сканировать обе входные таблицы и будет выдавать только выходные строки, содержащие нули, если совпадающие записи не существуют. При реализации потоковой передачи вы не можете знать, сможете ли вы, ожидая, в конечном итоге получить соответствующую запись.

Поскольку они не могут получить доступ к будущему, приложения потоковой обработки часто вынуждены выводить поток, содержащий результаты, которые обновляются по мере поступления дополнительной информации.

Одна вещь, которую вы могли бы сделать можно было бы подождать некоторое время, чтобы увидеть, будет ли ошибкой выдавать результат, содержащий NA, но в конечном итоге вам придется прекратить ждать и получить результат.

Обратите внимание, что Table API Flink имеет внешнее объединение, но вы заметите, что оно помечено как "Обновление результатов" по причинам, описанным выше.

...