Я пишу приложение Stateful Streaming, в котором я использую mapGroupsWithState для создания агрегатов для групп, но мне нужно создать Группы на основе нескольких столбцов в строке ввода . Все примеры в «Spark: Полное руководство» используют только один столбец, например «Пользователь» или «Устройство». Я использую код, аналогичный приведенному ниже. Как мне указать более одного поля в 'groupByKey'?
Есть и другие проблемы. В книге сказано, что мы можем использовать updateAcrossEvents, как указано ниже, но я получаю сообщение об ошибке времени компиляции: Ошибка: (43, 65) список пропущенных аргументов для метода updateAcrossEvents в объекте Main Непримененные методы преобразуются только в функции когда ожидается тип функции. Вы можете сделать это преобразование явным, написав updateAcrossEvents _
или updateAcrossEvents(_,_,_,_,_)
вместо updateAcrossEvents
. .mapGroupsWithState (GroupStateTimeout.EventTimeTimeout ()) (updateAcrossEvents)
Другая проблема: компилятор также жалуется на мой MyReport: Ошибка: (41, 12) Невозможно найти кодировщик для типа, хранящегося в наборе данных. Примитивные типы (Int, String и др. c) и типы продуктов (классы дел) поддерживаются за счет импорта spark.implicits._ В следующих выпусках будет добавлена поддержка сериализации других типов.
Помощь в устранении этих ошибок будет принята с благодарностью. Заранее спасибо.
withEventTime
.as[MyReport]
.groupByKey(_.getKeys.getKey1). // How do I add _.getKeys.getKey2?
.mapGroupsWithState(GroupStateTimeout.EventTimeTimeout())(updateAcrossEvents)
.writeStream
.queryName("test_query")
.format("memory")
.outputMode("update")
.start()
updateAcrossEvents:
def updateAcrossEvents (tuple3: Tuple3[String, String, String], inputs: Iterator[MyReport], oldState: GroupState[MyState]): MyState = {
var state: MyState = if (oldState.exists) oldState.get else MyState.getNewState(tuple3._1, tuple3._2, tuple3._3)
for (input <- inputs) {
state = updateWithEvent(state, input)
oldState.update(state)
}
state
}
updateWithEvent:
def updateWithEvent(state: MyState, report: MyReport): MyState = {
state.someField1 = state.someField1 ++ Array(report.getSomeField1.longValue())
state.someField2 = state.someField2 ++ Array(report.getSomeField2.longValue())
state
}