У меня есть простой поток, если данные имеют следующую форму:
id | name | eventType | eventTime
----------------------------------
1 A PLAY (ts of when the client fired the event)
1 B IMPRESSION
2 A CLICK
Конечная цель состоит в том, чтобы рассчитать сумму события CLIP eventType, деленную на сумму события EventType сгруппированного типа IMPRESSION. по идентификатору и имени для падающего окна продолжительностью 60 секунд.
в чистом SQL это будет выглядеть как
SELECT d.id, d.name, d.impressionCount, d.clickCount, d.clickCount / d.impressionCount * 100.0 FROM
( SELECT i.id, i.name, count(*) as clickCount, c.impressionCount from events as i
LEFT JOIN
(
SELECT id, name, count(*) as impressionCount from events WHERE event_type = 'IMPRESSION' GROUP BY id,name
) as c
ON i.id = c.id and i.name = c.name
WHERE event_type = 'CLICK'
GROUP BY i.id, i.name
) as d
Поэтому мне сначала нужно создать столбец с количеством щелчков и новый столбец с номером показа, а затем я использую эту таблицу для деления.
Мой вопрос: что лучше всего сделать с Flink APis? Я попытался сделать это:
Table clickCountTable = eventsTable
.where("eventType = 'CLICK'")
.window(Tumble.over("1.minute").on("eventTime").as("minuteWindow"))
.groupBy("id, name, minuteWindow")
.select("concat(concat(id,'_'), name) as id, eventType.count as clickCount, minuteWindow.rowtime as minute");
и то же самое для показа, и затем я присоединяюсь к этим двум столам. Но я не получаю правильный результат, и я не уверен, что это лучший способ добиться того, что я хочу сделать, используя окно труб.
РЕДАКТИРОВАТЬ :
Вот как я преобразовываю поток в таблицы:
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
[.....]
DataStream<EventWithCount> eventStreamWithTime = eventStream
.assignTimestampsAndWatermarks(new AscendingTimestampExtractor<EventWithCount>() {
@Override
public long extractAscendingTimestamp(EventWithCount element) {
try {
DateFormat df1 = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSSSS");
Date parsedDate = df1.parse(element.eventTime);
Timestamp timestamp = new java.sql.Timestamp(parsedDate.getTime());
return timestamp.getTime();
} catch (Exception e) {
throw new RuntimeException(e.getMessage());
}
}});
tEnv.fromDataStream(eventStreamWithTime, "id, name, eventType, eventTime.rowtime");
tEnv.registerTable("Events", eventsTable);