Как рассчитать две суммы на одном наборе данных с помощью Apache Flink - PullRequest
1 голос
/ 15 января 2020

У меня есть простой поток, если данные имеют следующую форму:

id | name | eventType | eventTime
----------------------------------
1    A       PLAY        (ts of when the client fired the event)
1    B       IMPRESSION
2    A       CLICK

Конечная цель состоит в том, чтобы рассчитать сумму события CLIP eventType, деленную на сумму события EventType сгруппированного типа IMPRESSION. по идентификатору и имени для падающего окна продолжительностью 60 секунд.

в чистом SQL это будет выглядеть как

SELECT d.id, d.name, d.impressionCount, d.clickCount,  d.clickCount / d.impressionCount * 100.0 FROM
( SELECT i.id, i.name, count(*) as clickCount, c.impressionCount from events as i
LEFT JOIN
 (
    SELECT id, name, count(*) as impressionCount from events WHERE event_type = 'IMPRESSION' GROUP BY id,name
 ) as c
ON i.id = c.id and i.name = c.name
WHERE event_type = 'CLICK' 
 GROUP BY i.id, i.name
) as d

Поэтому мне сначала нужно создать столбец с количеством щелчков и новый столбец с номером показа, а затем я использую эту таблицу для деления.

Мой вопрос: что лучше всего сделать с Flink APis? Я попытался сделать это:

Table clickCountTable = eventsTable
                .where("eventType = 'CLICK'")
                .window(Tumble.over("1.minute").on("eventTime").as("minuteWindow"))
                .groupBy("id, name, minuteWindow")
                .select("concat(concat(id,'_'), name) as id, eventType.count as clickCount, minuteWindow.rowtime as minute");

и то же самое для показа, и затем я присоединяюсь к этим двум столам. Но я не получаю правильный результат, и я не уверен, что это лучший способ добиться того, что я хочу сделать, используя окно труб.

РЕДАКТИРОВАТЬ :

Вот как я преобразовываю поток в таблицы:

env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

[.....]
DataStream<EventWithCount> eventStreamWithTime = eventStream
            .assignTimestampsAndWatermarks(new AscendingTimestampExtractor<EventWithCount>() {
                @Override
                public long extractAscendingTimestamp(EventWithCount element) {
                    try {
                        DateFormat df1 = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSSSS");
                        Date parsedDate = df1.parse(element.eventTime);
                        Timestamp timestamp = new java.sql.Timestamp(parsedDate.getTime());
                        return timestamp.getTime();
                    } catch (Exception e) {
                        throw new RuntimeException(e.getMessage());
                    }
                }});
tEnv.fromDataStream(eventStreamWithTime, "id, name, eventType, eventTime.rowtime");
tEnv.registerTable("Events", eventsTable);

1 Ответ

1 голос
/ 17 января 2020

Ваш запрос API таблицы для подсчета событий CLICK на id и name в минуту выглядит хорошо.

Table clickCountTable = eventsTable
  .where("eventType = 'CLICK'")
  .window(Tumble.over("1.minute").on("eventTime").as("minuteWindow"))
  .groupBy("id, name, minuteWindow")
  .select("concat(concat(id,'_'), name) as clickId, eventType.count as clickCount, minuteWindow.rowtime as clickMin");

Сделайте то же самое для IMPRESSION:

Table impressionCountTable = eventsTable
  .where("eventType = 'IMPRESSION'")
  .window(Tumble.over("1.minute").on("eventTime").as("minuteWindow"))
  .groupBy("id, name, minuteWindow")
  .select("concat(concat(id,'_'), name) as impId, eventType.count as impCount, minuteWindow.rowtime as impMin");

Наконец, вы должны объединить обе таблицы:

Table result = impressionCountTable
  .leftOuterJoin(clickCountTable, "impId = countId && impMin = countMin")
  .select("impId as id, impMin as minute, clickCount / impCount as ratio")

Обратите внимание на условие объединения impMin = countMin. Это превратит объединение в объединение с временным окном с минимальным размером окна 1 миллисекунду (мс - гранулярность времени в Flink SQL).

Вы сказали, что запрос вел себя не так, как вы ожидали. Можете ли вы точнее указать c ожидаемый и фактический результат?

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...