Объединение, агрегирование и выбор определенных столбцов в Apache Spark - PullRequest
0 голосов
/ 19 декабря 2018

Продукты и соответствующие продажи загружаются из CSV-файлов правильно, например:

    Dataset<Row> dfProducts = sparkSession.read()
            .option("mode", "DROPMALFORMED")
            .option("header", "true")
            .option("inferSchema", "true")
            .option("charset", "UTF-8")
            .csv(new ClassPathResource("products.csv").getURL().getPath());
    Dataset<Row> dfSaledetails = sparkSession.read()
            .option("mode", "DROPMALFORMED")
            .option("header", "true")
            .option("inferSchema", "true")
            .option("charset", "UTF-8")
            .csv(new ClassPathResource("saledetails.csv").getURL().getPath());

У продукта есть столбцы (product_id, product_name, ...).В продажах есть столбцы (product_id, amount, ...)

Что мне нужно сделать, так это соединить два набора данных на основе общего столбца (product_id), сгруппировать по product_id, столбец суммы amount, а затемвыбрать / отобразить только определенные столбцы (product_name и результат суммирования)

Следующая моя попытка

    Dataset<Row> dfSalesTotals = dfSaledetails
            .join(dfProducts, dfSaledetails.col("product_id").equalTo(dfProducts.col("product_id")))
            .groupBy(dfSaledetails.col("product_id"))
            .agg(sum(dfSaledetails.col("amount")).alias("total_amount"))
            .select(dfProducts.col("product_name"), col("total_amount"));
    dfSalesTotals.show();

, которая выдает следующую ошибку

Причина:org.apache.spark.sql.AnalysisException: решенные атрибуты отсутствуют product_name # 215 в product_id # 272, total_amount # 499 в операторе! Project [product_name # 215, total_amount # 499]. ;;! Project [product_name # 215, total_amount # 499] + - Aggregate [product_id # 272], [product_id # 272, sum (amount # 277) AS total_amount # 499] + - Join Inner, (product_id # 272 = product_id # 212): - Отношение [sale_detail_auto_id # 266, sale_auto_id # 267, sale_id # 268, agent_id # 269, sale_detail_id # 270, inventory_id # 271, product_id # 272, unit_cost # 273, unit_price # 274, НДС # 275, количество # 276, количество # 276, количество #277, promo_id # 278, скидка # 279] csv + - отношение [product_id # 212, user_group_id_super_owner # 213, product_category # 214, product_name # 215, product_type # 216, product_code # 217, код дистрибьютора # 218, product_units # 219, product_unit, product_manufacturer # 221, product_distributor # 222, create_date # 223, UPDATE_DATE # 224, кубовые # 225, product_weight # 226, carton_size # 227, product_listStatus # 228, active_status # 229, distributor_type # 230, BUNDLE_TYPE # 231, barcode_type # 232, product_family_id# 233] csv

1 Ответ

0 голосов
/ 19 декабря 2018

Если вы хотите сохранить product_name, оно должно быть либо в groupBy

.groupBy(
  dfSaledetails.col("product_id"),
  col("product_name")))

, либо в agg

.agg(
  sum(dfSaledetails.col("amount")).alias("total_amount"), 
  first(col("product_name")).alias("product_name"))
...