Я обрабатываю поток данных из Кафки, используя структурированный поток с pyspark. Я хочу публиковать оповещения для Кафки, если показания ненормальные в формате avro
source temperature timestamp
1001 21 4/28/2019 10:25
1001 22 4/28/2019 10:26
1001 23 4/28/2019 10:27
1001 24 4/28/2019 10:28
1001 25 4/28/2019 10:29
1001 34 4/28/2019 10:30
1001 37 4/28/2019 10:31
1001 36 4/28/2019 10:32
1001 38 4/28/2019 10:33
1001 40 4/28/2019 10:34
1001 41 4/28/2019 10:35
1001 42 4/28/2019 10:36
1001 45 4/28/2019 10:37
1001 47 4/28/2019 10:38
1001 50 4/28/2019 10:39
1001 41 4/28/2019 10:40
1001 42 4/28/2019 10:41
1001 45 4/28/2019 10:42
1001 47 4/28/2019 10:43
1001 50 4/28/2019 10:44
Transform
source range count alert
1001 21-25 5 HIGH
1001 26-30 5 MEDIUM
1001 40-45 5 MEDIUM
1001 45-50 5 HIGH
Я определил оконную функцию с 20-секундным и 1-секундным скольжением. Я могу публиковать оповещения с простым условием «где», но я не могу преобразовать фрейм данных, как указано выше, и инициировать оповещения, если число равно 20 для любого приоритета оповещения (все записи в окне совпадают с любым приоритетом HIGH-> count ( 20) и т. Д.) Кто-нибудь может подсказать, как это сделать?
Также я могу публиковать данные в формате json, но не могу генерировать их с помощью AVRO. В Scala и Java есть функция to_avro (), но в pyspark ее нет.
Ценю ваш ответ