У меня есть DataFrame df
со следующей структурой:
root
|-- author: array (nullable = true)
| |-- element: integer (containsNull = true)
|-- client: array (nullable = true)
| |-- element: integer (containsNull = true)
|-- outbound_link: array (nullable = true)
| |-- element: string (containsNull = true)
|-- url: string (nullable = true)
Я запускаю этот код:
val sourceField = "outbound_link" // set automatically
val targetField = "url" // set automatically
val nodeId = "client" // set automatically
val result = df.as("df1").join(df.as("df2"),
$"df1."+sourceField === $"df2."+targetField
).groupBy(
($"df1."+nodeId).as("nodeId_1"),
($"df2."+nodeId).as("nodeId_2")
)
.agg(
count("*") as "value", max($"df1."+timestampField) as "timestamp"
)
.toDF("source", "target", "value", "timestamp")
Но я получаю ошибку:
Exception in thread "main" org.apache.spark.sql.AnalysisException: syntax error in attribute name: df1.;
По какой-то причине переменные sourceField
и targetField
не видны внутри операции join
. Эти переменные не являются пустыми и содержат имена полей. Я должен использовать переменные, потому что я определяю их автоматически на предыдущем шаге кода.