Spark Streaming с mapGroupsWithState - PullRequest
       4

Spark Streaming с mapGroupsWithState

0 голосов
/ 03 марта 2020

Я пишу приложение Stateful Streaming, в котором я использую mapGroupsWithState для создания агрегатов для групп, но мне нужно создать Группы на основе нескольких столбцов в строке ввода . Все примеры в «Spark: Полное руководство» используют только один столбец, например «Пользователь» или «Устройство». Я использую код, аналогичный приведенному ниже. Как мне указать более одного поля в 'groupByKey'?

Есть и другие проблемы. В книге сказано, что мы можем использовать updateAcrossEvents, как указано ниже, но я получаю сообщение об ошибке времени компиляции: Ошибка: (43, 65) список пропущенных аргументов для метода updateAcrossEvents в объекте Main Непримененные методы преобразуются только в функции когда ожидается тип функции. Вы можете сделать это преобразование явным, написав updateAcrossEvents _ или updateAcrossEvents(_,_,_,_,_) вместо updateAcrossEvents. .mapGroupsWithState (GroupStateTimeout.EventTimeTimeout ()) (updateAcrossEvents)

Другая проблема: компилятор также жалуется на мой MyReport: Ошибка: (41, 12) Невозможно найти кодировщик для типа, хранящегося в наборе данных. Примитивные типы (Int, String и др. c) и типы продуктов (классы дел) поддерживаются за счет импорта spark.implicits._ В следующих выпусках будет добавлена ​​поддержка сериализации других типов.

Помощь в устранении этих ошибок будет принята с благодарностью. Заранее спасибо.

withEventTime
 .as[MyReport]
 .groupByKey(_.getKeys.getKey1). // How do I add _.getKeys.getKey2?
 .mapGroupsWithState(GroupStateTimeout.EventTimeTimeout())(updateAcrossEvents)
 .writeStream
 .queryName("test_query")
 .format("memory")
 .outputMode("update")
 .start()

updateAcrossEvents:

def updateAcrossEvents (tuple3: Tuple3[String, String, String], inputs: Iterator[MyReport], oldState: GroupState[MyState]): MyState = {

 var state: MyState = if (oldState.exists) oldState.get else MyState.getNewState(tuple3._1, tuple3._2, tuple3._3)

 for (input <- inputs) {
 state = updateWithEvent(state, input)
 oldState.update(state)
 }

 state
}

updateWithEvent:

def updateWithEvent(state: MyState, report: MyReport): MyState = {

 state.someField1 = state.someField1 ++ Array(report.getSomeField1.longValue())
 state.someField2 = state.someField2 ++ Array(report.getSomeField2.longValue())

 state
}

1 Ответ

1 голос
/ 03 марта 2020

Вы можете сформировать набор ключей - проверьте этот код:

withEventTime
 .as[MyReport]
 .groupByKey(row => (row.getKeys.getKey1,row.getKeys.getKey2)) 
 .mapGroupsWithState(GroupStateTimeout.EventTimeTimeout())(updateAcrossEvents)
 .writeStream
 .queryName("test_query")
 .format("memory")
 .outputMode("update")
 .start()

Теперь вы получаете одну уникальную группу для комбинации (getKey1, getKey2). Возможно, вам придется соответствующим образом изменить функцию обновления.

По второму вопросу:

да, по умолчанию spark поддерживает только классы дел и типы примитивов.

Чтобы избавиться от Эта ошибка, убедитесь, что «MyReport» является классом дела и имплицирует имплицит перед приведенным выше кодом, используя:

import <your_spark_session_variable>.implicits._
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...