Искристая структурированная потоковая передача, объединяющая совокупный информационный кадр в информационный кадр - PullRequest
0 голосов
/ 29 мая 2018

У меня есть потоковый фрейм данных, который в какой-то момент может выглядеть следующим образом:

+--------------------+--------------------+
|               owner|              fruits|
+--------------------+--------------------+
|Brian                | apple|
Brian                | pear |
Brian                | date|
Brian                | avocado|
Bob                | avocado|
Bob                | apple|
........
+--------------------+--------------------+

Я выполнил groupBy, agg collect_list, чтобы очистить вещи.

val myFarmDF = farmDF.withWatermark("timeStamp", "1 seconds").groupBy("fruits").agg(collect_list(col("fruits")) as "fruitsA")

вывододин ряд для каждого владельца и массив каждого фрукта.Теперь я хотел бы присоединить этот очищенный массив к исходному потоковому фрейму данных, отбрасывая col фруктов и просто имея колонку fruitsA

val joinedDF = farmDF.join(myFarmDF, "owner").drop("fruits")

, это, кажется, работает в моей голове, но спарк, похоже, не согласен.

Я получаю

Failure when resolving conflicting references in Join:
'Join Inner
...
+- AnalysisBarrier
      +- Aggregate [name#17], [name#17, collect_list(fruits#61, 0, 0) AS fruitA#142]

Когда я превращаю все в статический фрейм данных, он работает просто отлично.Разве это невозможно в потоковом контексте?

1 Ответ

0 голосов
/ 05 июня 2018

Вы пытались переименовать имя столбца?Есть похожая проблема https://issues.apache.org/jira/browse/SPARK-19860

...