Требуется объединить данные из 2 потоковых источников, а затем уменьшить их для того же ключа и применить функцию для значений, чтобы преобразовать их в другой UDO (определенный пользователем тип объекта). Я понятия не имею, как это сделать. Итак, здесь я только что создал похожую проблему, как показано ниже:
2 входных потока имеют 3 столбца, id, значение и время отправки. Сначала мы объединяем его, а затем уменьшаем на основе идентификатора и получаем конечный результат со значениями (идентификатор, пользовательская функция (значения, время ожидания)). Как этого добиться?
Если я реализую это следующим образом:
val dff = df.union(df2)
.withWatermark("posttime", "15 minutes")
.groupBy(window($"posttime", "10 minutes", "5 minutes"),$"id")
.agg(collect_list(struct("value", "posttime")).as("data"))
.withColumn("data", user-defined-function("data"))
Как определить пользовательскую функцию для ввода столбца и вывода столбца с другим типом данных?
// How to tranform the Column (Type: A) to output Column (Type: B)
def user-defined-function(columnName: String): Column = {
val x = Column(columnName).cast(List<struct>)
val ptime = if(x.posttime < y.posttime) x.posttime else y.posttime
val value = (x.value.toInt + y.value.toInt).toString
return new Column(struct(value, ptime))
}