Flink - Присоединяйтесь к тому же потоку, чтобы отфильтровать некоторые события - PullRequest
2 голосов
/ 07 августа 2020

У меня есть поток данных, который выглядит следующим образом:

impressionId | id | name | eventType | timestamp

Мне нужно отфильтровать (игнорировать) событие типа «щелчок», которое не имеет соответствующего 'impressionId' типа 'impression '(поэтому в основном игнорируйте события кликов, которые не имеют показа), а затем подсчитайте, сколько всего у меня показов и сколько у меня кликов (для пары идентификатор / имя) для определенного временного окна.

Вот как я подошел к решению:

[...]
Table eventsTable = tEnv.fromDataStream(eventStreamWithTimeStamp, "impressionId, id, name, eventType, eventTime.rowtime");
tEnv.registerTable("Events", eventsTable);
   

Table clicksTable = eventsTable
      .where("eventType = 'click'")
      .window(Slide.over("24.hour").every("1.minute").on("eventTime").as("minuteWindow"))
      .groupBy("impressionId, id, name, eventType, minuteWindow")
      .select("impressionId as clickImpressionId, eventType as clickEventType, concat(concat(id,'_'), name) as concatClickId, id as clickId, name as clickName, minuteWindow.rowtime as clickMinute");

Table impressionsTable = eventsTable
      .where("eventType = 'impression'")
      .window(Slide.over("24.hour").every("1.minute").on("eventTime").as("minuteWindow"))
      .groupBy("impressionId, id, name, eventType, minuteWindow")
      .select("impressionId as impressionImpressionId, eventType as impressionEventType, concat(concat(id,'_'), name) as concatImpId, id as impId, name as impName, minuteWindow.rowtime as impMinute");

Table filteredClickCount = clicksTable
      .join(impressionsTable, "clickImpressionId = impressionImpressionId && concatClickId = concatImpId && clickMinute = impMinute")
      .window(Slide.over("24.hour").every("1.minute").on("clickMinute").as("minuteWindow"))
      .groupBy("concatClickId, clickMinute")
      .select("concatClickId, concatClickId.count as clickCount, clickMinute as eventTime");
 DataStream<Test3> result = tEnv.toAppendStream(filteredClickCount, Test3.class);
result.print();

То, что я пытаюсь сделать, это просто создать две таблицы, одну с кликами, а другую с показами, «внутреннее» соединение кликов с показами и ту, которая соединены, это означает, что это клики, которые производят совпадающий показ.

Теперь это не работает, и я не знаю почему!?

счетчик, произведенный последней объединенной таблицей, не верный. Он работает в течение первой минуты, но после этого счетчик уменьшается почти вдвое.

Затем я попытался изменить последнюю таблицу следующим образом:

Table clickWithMatchingImpression2 = clicksTable
      .join(impressionsTable, "clickImpressionId = impressionImpressionId && concatClickId = concatImpId && clickMinute = impMinute")
      .groupBy("concatClickId, clickMinute")
      .select("concatClickId, concatClickId.count as clickCount, clickMinute as eventTime");

DataStream<Tuple3<Boolean, Tuple3>> result2 = tEnv.toRetractStream(clickWithMatchingImpression2, Test3.class);
    result2.print();

И .... это работает !? Однако я не знаю почему, и я не знаю, что делать с этим DataStream. > формат ... Flink отказывается использовать toAppendStream, когда в таблице нет окна. Мне нужна простая структура, содержащая только последние числа.

1) Правильный ли мой подход? Есть ли более простой способ отфильтровать клики, по которым нет показов?

2) Почему в моем решении подсчеты неверны?

1 Ответ

1 голос
/ 10 августа 2020

Я не совсем уверен, правильно ли я понял ваш вариант использования, здесь определенно поможет пример с некоторыми точками данных.

Позвольте мне объяснить, что делает ваш код. Сначала в двух таблицах подсчитывается, сколько кликов / показов было за последние 24 часа. Для ввода

new Event("1", "1", "ABC", "...", 1),
new Event("1", "2", "ABC", "...", 2),
new Event("1", "3", "ABC", "...", 3),
new Event("1", "4", "ABC", "...", 4)

вы получите windows (массив , window_start, window_end, rowtime):

[1], 1969-12-31-01T00:01:00.000, 1970-01-01T00:01:00.000, 1970-01-01T00:00:59.999
[1, 2], 1969-12-31-01T00:02:00.000, 1970-01-01T00:02:00.000, 1970-01-01T00:01:59.999
[1, 2, 3], 1969-12-31-01T00:03:00.000, 1970-01-01T00:03:00.000, 1970-01-01T00:02:59.999
...

Следовательно, когда вы группируете и по id, и по имени, вы получить sth, например:

1, '...', '1_ABC', 1, 'ABC', 1970-01-01T00:00:59.999
1, '...', '1_ABC', 1, 'ABC', 1970-01-01T00:01:59.999
1, '...', '1_ABC', 1, 'ABC', 1970-01-01T00:02:59.999
...

, который, если вы снова сгруппируете через 24 часа windows, вы будете подсчитывать каждое событие с тем же идентификатором несколько раз.

Если я правильно понимаю ваш вариант использования и вы ищете, сколько показов произошло за 1 минуту после появления клика, возможно, вы ищете интервальное соединение . Вы можете реализовать свой случай с помощью следующего запроса:

Table clicks = eventsTable
        .where($("eventType").isEqual("click"))
        .select(
                $("impressionId").as("clickImpressionId"),
                concat($("id"), "_", $("name")).as("concatClickId"),
                $("id").as("clickId"),
                $("name").as("clickName"),
                $("eventTime").as("clickEventTime")
        );

Table impressions = eventsTable
        .where($("eventType").isEqual("impression"))
        .select(
                $("impressionId").as("impressionImpressionId"),
                concat($("id"), "_", $("name")).as("concatImpressionId"),
                $("id").as("impressionId"),
                $("name").as("impressionName"),
                $("eventTime").as("impressionEventTime")
        );

Table table = impressions.join(
        clicks,
        $("clickImpressionId").isEqual($("impressionImpressionId"))
                .and(
                        $("clickEventTime").between(
                                $("impressionEventTime").minus(lit(1).minutes()),
                                $("impressionEventTime"))
                ))
        .select($("concatClickId"), $("impressionEventTime"));

table
        .window(Slide.over("24.hour").every("1.minute").on("impressionEventTime").as("minuteWindow"))
        .groupBy($("concatClickId"), $("minuteWindow"))
        .select($("concatClickId"), $("concatClickId").count())
        .execute()
        .print();

Что касается того, почему Flink иногда не может создавать поток добавления, а только убирает поток см. . Вкратце, если операция не работает на основе атрибута времени, не существует единого момента времени, когда результат был бы «действительным». Поэтому он должен выдавать поток изменений вместо одного добавленного значения. Первое поле в кортеже сообщает вам, является ли запись вставкой (истина) или ретракцией / удалением (ложь).

...