Pyspark Структурированные потоковые динамические раздвижные окна - PullRequest
0 голосов
/ 16 января 2019

Я пытаюсь получить максимальное, среднее для всех строк за последние x минут (например, 10 минут) по столбцу s1 относительно столбца времени события. За исключением того, что данные поступают в виде потоков из Кафки. Для каждой входящей строки я хочу рассчитать максимальное и среднее значение столбца для всех строк за последние x минут.

Входящий поток строк находится в миллисекундах (именно с этим я хочу скользить мое окно). Итак, скажем:

  Streaming Input to spark                              Output from spark to kafka
1 {ts:"2018-10-04 10:18:32.001", s1:"24"}               {ts:"2018-10-04 10:18:32.001", s1:"24", s1max10mins:"somevalue"}
2 {ts:"2018-10-04 10:18:32.300", s1:"36"}               {ts:"2018-10-04 10:18:32.300", s1:"36", s1max10mins:"somevalue"}
3 {ts:"2018-10-04 10:18:33.600", s1:"11"}               {ts:"2018-10-04 10:18:32.600", s1:"11", s1max10mins:"somevalue"}
4 {ts:"2018-10-04 10:18:33.800", s1:"10"}               {ts:"2018-10-04 10:18:32.800", s1:"10", s1max10mins:"somevalue"}
.
.
.
.
100 {ts:"2018-10-04 10:18:42.000", s1:"6"}             {ts:"2018-10-04 10:18:42.000", s1:"6", s1max10mins:"36"}
101 {ts:"2018-10-04 10:18:42.400", s1:"7"}             {ts:"2018-10-04 10:18:42.400", s1:"7", s1max10mins:"11"}

Если бы у меня было скользящее окно в минуту, я бы получил «s1max10mins» как 36 для обеих строк 100 и 101. Но я хочу, чтобы окно скользило динамически относительно текущей временной метки (времени события) до миллисекунд.

Возможно ли это любым другим способом, кроме оконного. Не вставлять здесь никакого кода, потому что мой код управления окнами / водяными знаками отлично работает для окна 10 минут и статических интервалов скольжения 1 минута. Установка скользящего окна в 10 секунд действительно увеличивает системные ресурсы. Поэтому даже не хотел пробовать сдвижное окно на что-то вроде 500 мс. Также мой код прекрасно работает для чтения потоков из Кафки и возврата в Кафку.

Я просто хочу max / avg столбца для всех записей за последние 10 минут относительно текущей отметки времени и поместить его в другой столбец (например, "s1max10mins") и передать обратно в Kafka, который запускает внешний скрипт. Какие-либо параметры, кроме окон, которые я могу использовать, чтобы точно рассчитать 10 минут от текущего времени события?

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...