Как объединить три или более потоков данных / таблиц по заданному ключу и общему окну с помощью API данных или Flink Table API / SQL? - PullRequest
0 голосов
/ 13 июня 2018

Я хочу присоединиться к трем или более потокам данных или таблицам по заданному ключу и общему окну.Однако я не знаю, как правильно написать код.В официальном документе https://ci.apache.org/projects/flink/flink-docs-release-1.5/dev/stream/operators/ приведен пример, приведенный ниже, однако он просто объединяет два потока данных, так как объединить три или более потоков данных по заданному ключу и общему окну?

dataStream.join(otherStream)
.where(<key selector>).equalTo(<key selector>)
.window(TumblingEventTimeWindows.of(Time.seconds(3)))
.apply (new JoinFunction () {...});

Я пытался выяснить, что я сначала соединяю два потока данных с общим окном и использую поток данных результата для соединения с третьим потоком данных с общим окном?Однако кажется, что семантика времени события для этих трех потоков данных будет изменена, когда мы установим TimeCharacteristic на время события.

=================

Тот же вопрос для FlinK Table API и SQL, как объединить три или более таблиц по заданному ключу и общему окну?В официальном документе https://ci.apache.org/projects/flink/flink-docs-release-1.5/dev/table/sql.html просто приведите приведенный ниже пример для одной таблицы.

Table result1 = tableEnv.sqlQuery(
"SELECT user, " +
"  TUMBLE_START(rowtime, INTERVAL '1' DAY) as wStart,  " +
"  SUM(amount) FROM Orders " +
"GROUP BY TUMBLE(rowtime, INTERVAL '1' DAY), user");

Я попытался написать SQL, как показано ниже, для объединения трех таблиц с заданным ключом и общим окном, однако яне думаю, что это правильно.

String SQL = "SELECT" +
            " grades.user1  , SUM(salaries.amount)   FROM grades " +
            " INNER JOIN salaries ON   grades.user1 =   salaries.user1 " +
            " INNER JOIN person ON   grades.user1 =   person.user1 "+
             "GROUP BY grades.user1, TUMBLE(grades.proctime,  INTERVAL '5' SECOND) "   

Так каков правильный способ объединения трех или более потоков данных / таблиц по заданному ключу и общего окна с помощью API datastrem или Flink Table API / SQL?

Обновите от 16.06.2008, чтобы сделать вопрос более понятным.

Для Flink SQL, как и приведенный ниже псевдокод, мне понадобилось объединить три таблицыс общим TumblingEventTimeWindow, то есть альтернативной версией API DataStream, однако выраженной Flink SQL, что означает также означает объединение всех событий из трех таблиц, которые произошли в одном и том же TumblingEventTimeWindow.

SELECT A.a, B.b, C.c
FROM A, B, C
WHERE A.x = B.x AND A.x = C.x AND
window(TumblingEventTimeWindows.of(Time.seconds(3))

Кажется, что функция объединения также упоминается в следующем документе разработки Flink: «Соединения потока с потоком во время события объединяются в потоки двух потоков,Я не знаю, реализован ли в Flink SQL этот тип функции объединения Flink SQL.

https://docs.google.com/document/d/1TLayJNOTBle_-m1rQfgA6Ouj1oYsfqRjPcp1h2TVqdI/edit#

1 Ответ

0 голосов
/ 15 июня 2018

Трудно дать однозначный ответ на ваш вопрос, потому что семантика нужного вам объединения неясна.Семантика реализации оконного объединения API-интерфейса DataStream отличается от оконного объединения API-интерфейса таблиц / SQL.

В API-интерфейсе DataStream можно просто определить другое объединение следующим образом:

firstStream
  .join(secondStream)
    .where(<key selector>).equalTo(<key selector>)
    .window(TumblingEventTimeWindows.of(Time.seconds(3)))
    .apply (new JoinFunction () {...})
  .join(thirdStream)
    .where(<key selector>).equalTo(<key selector>)
    .window(TumblingEventTimeWindows.of(Time.seconds(3)))
    .apply (new JoinFunction () {...})

Поскольку Flink реализует стандартный SQL, вы можете определить объединение трех таблиц как обычно:

SELECT A.a, B.b, C.c
  FROM A, B, C
  WHERE A.x = B.x AND A.x = C.x AND
        A.ts BETWEEN B.ts - INTERVAL '10' MINUTE AND B.ts + INTERVAL '10' MINUTE AND
        A.ts BETWEEN C.ts - INTERVAL '10' MINUTE AND C.ts + INTERVAL '10' MINUTE

Диапазоны окон (A.ts BETWEEN B.ts - X AND B.ts + Y) может быть определено при необходимости.

...