Агрегированные данные Kinesis на основе идентификатора события - PullRequest
0 голосов
/ 26 февраля 2019

Я новичок в Kinesis Data Analytics.Есть следующие вопросы.Мой поток входных данных имеет следующую схему: каждая запись - это событие, которое может произойти во время просмотра видео (запись генерируется во время просмотра видео):

videoViewId, имя-события, intPayload, timeStamp, someMetadataField1, someMetadataField2 123, «пауза», 1,2019-02-26T00: 23: 00, «Некоторое статическое поле 1», «Некоторое статическое поле 2»

123, «резюме», 1,2019-02-26T00: 23: 01, «Некоторое статическое поле 1», «Некоторое статическое поле 2»

123, «viewDuration», 230,2019-02-26T00: 23: 02, «Некоторое статическоеполе 1 »,« Некоторое статическое поле 2 »

123,« skipForward », 1,2019-02-26T00: 23: 01,« Некоторое статическое поле 3 »,« Некоторое статическое поле 4 »

456, «viewDuration», 550,2019-02-26T00: 23: 06, «Некоторое статическое поле 3», «Некоторое статическое поле 4»

456, «отключение звука», 1,2019-02-26T00: 23: 05, «Некоторое статическое поле 3», «Некоторое статическое поле 4»

Мои выходные данные должны быть агрегированы на основе videoViewId и иметь «обратную» схемугде имена столбцов определяются через имя события (у меня есть список из 20-30 возможных событий, которые могут или не могут произойти).

Это схема выходных данных:

videoViewId, pauseCount, resumeCount, skipForwardCount, muteCount, unmuteCount, viewDuration, someMetadataField1, someMetadataField2

123,1,1,1,0,0,230, «Некоторое статическое поле 1», «Некоторое статическое поле 2»

456,0,0,0,1,0,550, «Некоторое статическое поле 3», «Некоторое статическое поле 4»

Что будет лучшим вариантомдля такой агрегации?

Я имею в виду следующие подходы:

  • Kinesis Data Analytics.Можно ли выполнить такое агрегирование с помощью Kinesis Data Analytics? Будет ли работать агрегация скользящих окон в этом случае, поскольку я не знаю, когда все события для определенного идентификатора просмотра видео представлены и готовы к агрегации.

  • Немного лямбды?

  • Вывод данных в поток доставки S3 Firehose, а затем запускать задачу AWS EMR каждый час / день для агрегирования результатов?

...