Spark SQL группировать данные по диапазону и запускать оповещения - PullRequest
0 голосов
/ 28 апреля 2019

Я обрабатываю поток данных из Кафки, используя структурированный поток с pyspark. Я хочу публиковать оповещения для Кафки, если показания ненормальные в формате avro

source  temperature timestamp
1001    21  4/28/2019 10:25
1001    22  4/28/2019 10:26
1001    23  4/28/2019 10:27
1001    24  4/28/2019 10:28
1001    25  4/28/2019 10:29
1001    34  4/28/2019 10:30
1001    37  4/28/2019 10:31
1001    36  4/28/2019 10:32
1001    38  4/28/2019 10:33
1001    40  4/28/2019 10:34
1001    41  4/28/2019 10:35
1001    42  4/28/2019 10:36
1001    45  4/28/2019 10:37
1001    47  4/28/2019 10:38
1001    50  4/28/2019 10:39
1001    41  4/28/2019 10:40
1001    42  4/28/2019 10:41
1001    45  4/28/2019 10:42
1001    47  4/28/2019 10:43
1001    50  4/28/2019 10:44

Transform 
source  range   count   alert
1001    21-25   5   HIGH
1001    26-30   5   MEDIUM
1001    40-45   5   MEDIUM
1001    45-50   5   HIGH

Я определил оконную функцию с 20-секундным и 1-секундным скольжением. Я могу публиковать оповещения с простым условием «где», но я не могу преобразовать фрейм данных, как указано выше, и инициировать оповещения, если число равно 20 для любого приоритета оповещения (все записи в окне совпадают с любым приоритетом HIGH-> count ( 20) и т. Д.) Кто-нибудь может подсказать, как это сделать?

Также я могу публиковать данные в формате json, но не могу генерировать их с помощью AVRO. В Scala и Java есть функция to_avro (), но в pyspark ее нет.

Ценю ваш ответ

1 Ответ

0 голосов
/ 28 апреля 2019

Я могу решить эту проблему, используя функцию Bucketizer из библиотеки ml в spark.

Как сделать бин в PySpark?

...