Продукты и соответствующие продажи загружаются из CSV-файлов правильно, например:
Dataset<Row> dfProducts = sparkSession.read()
.option("mode", "DROPMALFORMED")
.option("header", "true")
.option("inferSchema", "true")
.option("charset", "UTF-8")
.csv(new ClassPathResource("products.csv").getURL().getPath());
Dataset<Row> dfSaledetails = sparkSession.read()
.option("mode", "DROPMALFORMED")
.option("header", "true")
.option("inferSchema", "true")
.option("charset", "UTF-8")
.csv(new ClassPathResource("saledetails.csv").getURL().getPath());
У продукта есть столбцы (product_id, product_name, ...).В продажах есть столбцы (product_id, amount, ...)
Что мне нужно сделать, так это соединить два набора данных на основе общего столбца (product_id)
, сгруппировать по product_id
, столбец суммы amount
, а затемвыбрать / отобразить только определенные столбцы (product_name и результат суммирования)
Следующая моя попытка
Dataset<Row> dfSalesTotals = dfSaledetails
.join(dfProducts, dfSaledetails.col("product_id").equalTo(dfProducts.col("product_id")))
.groupBy(dfSaledetails.col("product_id"))
.agg(sum(dfSaledetails.col("amount")).alias("total_amount"))
.select(dfProducts.col("product_name"), col("total_amount"));
dfSalesTotals.show();
, которая выдает следующую ошибку
Причина:org.apache.spark.sql.AnalysisException: решенные атрибуты отсутствуют product_name # 215 в product_id # 272, total_amount # 499 в операторе! Project [product_name # 215, total_amount # 499]. ;;! Project [product_name # 215, total_amount # 499] + - Aggregate [product_id # 272], [product_id # 272, sum (amount # 277) AS total_amount # 499] + - Join Inner, (product_id # 272 = product_id # 212): - Отношение [sale_detail_auto_id # 266, sale_auto_id # 267, sale_id # 268, agent_id # 269, sale_detail_id # 270, inventory_id # 271, product_id # 272, unit_cost # 273, unit_price # 274, НДС # 275, количество # 276, количество # 276, количество #277, promo_id # 278, скидка # 279] csv + - отношение [product_id # 212, user_group_id_super_owner # 213, product_category # 214, product_name # 215, product_type # 216, product_code # 217, код дистрибьютора # 218, product_units # 219, product_unit, product_manufacturer # 221, product_distributor # 222, create_date # 223, UPDATE_DATE # 224, кубовые # 225, product_weight # 226, carton_size # 227, product_listStatus # 228, active_status # 229, distributor_type # 230, BUNDLE_TYPE # 231, barcode_type # 232, product_family_id# 233] csv