Среднее из нескольких разных полей из одного потока - PullRequest
0 голосов
/ 16 апреля 2019

Я еще не выбрал потоковый фреймворк, но сейчас я возиться с Флинком. Но я открыт для использования Beam, Spark Streaming, и все, что я выясню, подходит для моего использования. Как бы вы сделали эквивалент следующего SQL:

SELECT a,b,c, avg(d), avg(e), ..., avg(z)
FROM whatever
GROUP BY a,b,c,d,e, ..., z

Похоже, что для Флинка среднее значение достигается через AggregateFunction https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/api/common/functions/AggregateFunction.java#L61

Но я не понимаю, как вы делаете этот «масштаб». Похоже, что это много шаблонных только для среднего поля. Что если у меня есть несколько разных потоков с разными полями, которые мне нужно усреднить?

Делает ли это что-либо из: Flink, Beam, структурированного потокового вещания и т. Д. Проще?

В качестве примечания, есть ли простой способ эмулировать этот приятный маленький синтаксис фильтра подсчета из Postgres,

SELECT
  COUNT(*) AS unfiltered,
  COUNT(*) FILTER (WHERE some_condition) AS filtered
FROM whatever

1 Ответ

1 голос
/ 16 апреля 2019

Как правило, в заданиях Flink я создаю определенные пользовательские функции как отдельные классы, которые затем я могу применять к любым полям, которые мне нравятся. У Flink также есть SQL API, с которым я не так хорошо знаком, но вот пример, основанный на коде, который я нашел здесь (https://gist.github.com/mustafaakin/457859b8bf703c64029071c1139b593d):

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

StreamTableEnvironment table = TableEnvironment.getTableEnvironment(env);
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
DataStream<String> text = env.socketTextStream("localhost", 9999, "\n");
DataStream<Tuple3<String, Double, Time>> dataset = text.map(...);

table.registerDataStream("dataset", dataset, "p1, p2, p3");
String query = "SELECT p1, AVG(p2) AS avgp2 FROM dataset GROUP p1";
Table tableResult = table.sql(query);

// print to System.out
table.toAppendStream(tableResult, Row.class).print();

env.execute();

Я бы также посмотрел на Apache Ignite для потоковой передачи данных с SQL-запросами. Я никогда не использовал это сам, но я слышал хорошие вещи.

...