Этим утром мы обновили версию Spark с 2.2.0 до 2.3.0, и я столкнулся с довольно странной проблемой.
У меня есть UDF (), вычисляющая расстояние между 2 точками
private static UDF4<Double, Double, Double, Double, Double> calcDistance =
(UDF4<Double, Double, Double, Double, Double>) (lat, lon, meanLat, meanLon) ->
GeoUtils.calculateDistance(lat, lon, meanLat, meanLon);
Регистрация UDF
spark.udf().register("calcDistance", calcDistance, DataTypes.DoubleType);
И у меня есть кадр данных следующей структуры (этот DF является результатом объединения 2 DFs в поле hpan
)
root
|-- hpan: string (nullable = true)
|-- atmid: string (nullable = true)
|-- reqamt: long (nullable = true)
|-- mcc_code: string (nullable = true)
|-- utime: string (nullable = true)
|-- udate: string (nullable = true)
|-- address_city: string (nullable = true)
|-- latitude: double (nullable = true)
|-- longitude: double (nullable = true)
|-- gmt_msk_offset: integer (nullable = true)
|-- utimeWithTZ: timestamp (nullable = true)
|-- weekDay: integer (nullable = true)
|-- location_type: string (nullable = true)
|-- mean_lat: double (nullable = true)
|-- mean_lon: double (nullable = true)
Итак, я хочу добавить столбец с расстоянием между (lat, lon) и (mean_lat, mean_lon);
svWithCoordsTzAndDistancesDF.withColumn("distance",
callUDF("calcDistance",col("latitude"), col("longitude"),
col("mean_lat"), col("mean_lon")));
Хорошо работает на Spark 2.2, но начинает выходить из строя на v2.3
Исключение составляет
Исключение в потоке "main" org.apache.spark.sql.AnalysisException:
Разрешенные атрибуты (Mean_lon), Mean_lat, «Longitude», «широта» отсутствуют
из gmt_msk_offset # 147, utime # 3, долготы # 146, адрес
ss_city # 141, udate # 29, # 371 mean_lon, WEEKDAY # 230, # reqamt 4L, широта # 145, # 369 mean_lat, LOCATION_TYPE # 243, hpan # 1, # 218 utimeWithTZ, mcc_code # 14, # 9 atmid
в проекте оператора [hpan # 1, atmid # 9, reqamt # 4L,
mcc_code # 14, utime # 3, udate # 29, address_city # 141, широта # 145,
долгота # 146, gmt_msk_offset # 147, utimeWithTZ # 218, weekDay # 230,
location_type # 243, mean_lat # 369, mean_lon # 371, 'calcDist
ance ('широта,' долгота, 'mean_lat,' mean_lon) AS расстояние # 509].
Атрибуты с тем же именем появляются в операции:
mean_lon, mean_lat, долгота, широта. Пожалуйста, проверьте, если право
атрибут (ы). ;;
Я пытался добавить псевдонимы в столбцы внутри UDF () следующим образом
svWithCoordsTzAndDistancesDF.withColumn("distance",
callUDF("calcDistance",col("latitude").as("a"), col("longitude").as("b"), col("mean_lat").as("c"), col("mean_lon").as("d")));
Или оберните эти столбцы в последовательности scala
svWithCoordsTzAndDistancesDF.withColumn("distance",
callUDF("calcDistance",JavaConverters.collectionAsScalaIterableConverter(Arrays.asList
(col("latitude"), col("longitude"), col("mean_lat"), col("mean_lon")))
.asScala()
.toSeq()));
Ни одна из этих попыток не решает проблему.
Может быть, кто-то знает способ решения этой проблемы?
Поток преобразований такой
ParentDF -> childDF1(as parentDF.groupBy().agg(mean())), childDF2(parentDF.filter('condition')) -> svWithCoordsTzAndDistancesDF (join childDF1 and childDF2).
Я думаю, что проблема может быть в плане выполнения, построенном для этого потока ...