Я не совсем уверен, правильно ли я понял ваш вариант использования, здесь определенно поможет пример с некоторыми точками данных.
Позвольте мне объяснить, что делает ваш код. Сначала в двух таблицах подсчитывается, сколько кликов / показов было за последние 24 часа. Для ввода
new Event("1", "1", "ABC", "...", 1),
new Event("1", "2", "ABC", "...", 2),
new Event("1", "3", "ABC", "...", 3),
new Event("1", "4", "ABC", "...", 4)
вы получите windows (массив , window_start, window_end, rowtime):
[1], 1969-12-31-01T00:01:00.000, 1970-01-01T00:01:00.000, 1970-01-01T00:00:59.999
[1, 2], 1969-12-31-01T00:02:00.000, 1970-01-01T00:02:00.000, 1970-01-01T00:01:59.999
[1, 2, 3], 1969-12-31-01T00:03:00.000, 1970-01-01T00:03:00.000, 1970-01-01T00:02:59.999
...
Следовательно, когда вы группируете и по id, и по имени, вы получить sth, например:
1, '...', '1_ABC', 1, 'ABC', 1970-01-01T00:00:59.999
1, '...', '1_ABC', 1, 'ABC', 1970-01-01T00:01:59.999
1, '...', '1_ABC', 1, 'ABC', 1970-01-01T00:02:59.999
...
, который, если вы снова сгруппируете через 24 часа windows, вы будете подсчитывать каждое событие с тем же идентификатором несколько раз.
Если я правильно понимаю ваш вариант использования и вы ищете, сколько показов произошло за 1 минуту после появления клика, возможно, вы ищете интервальное соединение . Вы можете реализовать свой случай с помощью следующего запроса:
Table clicks = eventsTable
.where($("eventType").isEqual("click"))
.select(
$("impressionId").as("clickImpressionId"),
concat($("id"), "_", $("name")).as("concatClickId"),
$("id").as("clickId"),
$("name").as("clickName"),
$("eventTime").as("clickEventTime")
);
Table impressions = eventsTable
.where($("eventType").isEqual("impression"))
.select(
$("impressionId").as("impressionImpressionId"),
concat($("id"), "_", $("name")).as("concatImpressionId"),
$("id").as("impressionId"),
$("name").as("impressionName"),
$("eventTime").as("impressionEventTime")
);
Table table = impressions.join(
clicks,
$("clickImpressionId").isEqual($("impressionImpressionId"))
.and(
$("clickEventTime").between(
$("impressionEventTime").minus(lit(1).minutes()),
$("impressionEventTime"))
))
.select($("concatClickId"), $("impressionEventTime"));
table
.window(Slide.over("24.hour").every("1.minute").on("impressionEventTime").as("minuteWindow"))
.groupBy($("concatClickId"), $("minuteWindow"))
.select($("concatClickId"), $("concatClickId").count())
.execute()
.print();
Что касается того, почему Flink иногда не может создавать поток добавления, а только убирает поток см. . Вкратце, если операция не работает на основе атрибута времени, не существует единого момента времени, когда результат был бы «действительным». Поэтому он должен выдавать поток изменений вместо одного добавленного значения. Первое поле в кортеже сообщает вам, является ли запись вставкой (истина) или ретракцией / удалением (ложь).