Вызов AnalysisExUDUDF () внутри withColumn () - PullRequest
0 голосов
/ 15 мая 2018

Этим утром мы обновили версию Spark с 2.2.0 до 2.3.0, и я столкнулся с довольно странной проблемой.

У меня есть UDF (), вычисляющая расстояние между 2 точками

private static UDF4<Double, Double, Double, Double, Double> calcDistance =
        (UDF4<Double, Double, Double, Double, Double>) (lat, lon, meanLat, meanLon) ->
                GeoUtils.calculateDistance(lat, lon, meanLat, meanLon);

Регистрация UDF

spark.udf().register("calcDistance", calcDistance, DataTypes.DoubleType);

И у меня есть кадр данных следующей структуры (этот DF является результатом объединения 2 DFs в поле hpan)

root
|-- hpan: string (nullable = true)
|-- atmid: string (nullable = true)
|-- reqamt: long (nullable = true)
|-- mcc_code: string (nullable = true)
|-- utime: string (nullable = true)
|-- udate: string (nullable = true)
|-- address_city: string (nullable = true)
|-- latitude: double (nullable = true)
|-- longitude: double (nullable = true)
|-- gmt_msk_offset: integer (nullable = true)
|-- utimeWithTZ: timestamp (nullable = true)
|-- weekDay: integer (nullable = true)
|-- location_type: string (nullable = true)
|-- mean_lat: double (nullable = true)
|-- mean_lon: double (nullable = true)

Итак, я хочу добавить столбец с расстоянием между (lat, lon) и (mean_lat, mean_lon);

svWithCoordsTzAndDistancesDF.withColumn("distance",
    callUDF("calcDistance",col("latitude"), col("longitude"), 
            col("mean_lat"), col("mean_lon")));

Хорошо работает на Spark 2.2, но начинает выходить из строя на v2.3 Исключение составляет

Исключение в потоке "main" org.apache.spark.sql.AnalysisException: Разрешенные атрибуты (Mean_lon), Mean_lat, «Longitude», «широта» отсутствуют из gmt_msk_offset # 147, utime # 3, долготы # 146, адрес
ss_city # 141, udate # 29, # 371 mean_lon, WEEKDAY # 230, # reqamt 4L, широта # 145, # 369 mean_lat, LOCATION_TYPE # 243, hpan # 1, # 218 utimeWithTZ, mcc_code # 14, # 9 atmid в проекте оператора [hpan # 1, atmid # 9, reqamt # 4L, mcc_code # 14, utime # 3, udate # 29, address_city # 141, широта # 145, долгота # 146, gmt_msk_offset # 147, utimeWithTZ # 218, weekDay # 230, location_type # 243, mean_lat # 369, mean_lon # 371, 'calcDist
ance ('широта,' долгота, 'mean_lat,' mean_lon) AS расстояние # 509]. Атрибуты с тем же именем появляются в операции: mean_lon, mean_lat, долгота, широта. Пожалуйста, проверьте, если право
атрибут (ы). ;;

Я пытался добавить псевдонимы в столбцы внутри UDF () следующим образом

svWithCoordsTzAndDistancesDF.withColumn("distance",
        callUDF("calcDistance",col("latitude").as("a"), col("longitude").as("b"), col("mean_lat").as("c"), col("mean_lon").as("d")));

Или оберните эти столбцы в последовательности scala

svWithCoordsTzAndDistancesDF.withColumn("distance",
        callUDF("calcDistance",JavaConverters.collectionAsScalaIterableConverter(Arrays.asList
                (col("latitude"), col("longitude"), col("mean_lat"), col("mean_lon")))
                .asScala()
                .toSeq()));

Ни одна из этих попыток не решает проблему.

Может быть, кто-то знает способ решения этой проблемы?

Поток преобразований такой

ParentDF -> childDF1(as parentDF.groupBy().agg(mean())), childDF2(parentDF.filter('condition')) -> svWithCoordsTzAndDistancesDF (join childDF1 and childDF2). 

Я думаю, что проблема может быть в плане выполнения, построенном для этого потока ...

1 Ответ

0 голосов
/ 16 мая 2018

Это какая-то магия.Когда я указываю столбец данных и добавляю select("*") - все работает.Если кто-то может это объяснить - я буду очень благодарен

df = df.select("*")
       .withColumn("distance", callUDF("calcDistance",
                df.col("latitude"),
                df.col("longitude"),
                df.col("mean_lat"),
                df.col("mean_lon")))
      .toDF();
...