Как обрабатывать это параллельно на кластере, используя MapFunction и ReduceFunction of spark- java api? - PullRequest
0 голосов
/ 23 апреля 2020

Я использую spark- sql -2.4.1v с java8.

Необходимо выполнить расчет с использованием group by для различных условий с использованием java api, то есть с использованием MapFunction и ReduceFunction.

Сценарий:

Получите исходные данные по приведенной выборке, как показано ниже

+--------+--------------+-----------+-------------+---------+------+
| country|generated_date|industry_id|industry_name|  revenue| state|
+--------+--------------+-----------+-------------+---------+------+
|Country1|    2020-03-01|    Indus_1| Indus_1_Name| 12789979|State1|
|Country1|    2019-06-01|    Indus_1| Indus_1_Name| 56189008|State1|
|Country1|    2019-03-01|    Indus_1| Indus_1_Name| 12789979|State1|
|Country1|    2020-03-01|    Indus_2| Indus_2_Name| 21789933|State2|
|Country1|    2018-03-01|    Indus_2| Indus_2_Name|300789933|State2|
|Country1|    2019-03-01|    Indus_3| Indus_3_Name| 27989978|State3|
|Country1|    2017-06-01|    Indus_3| Indus_3_Name| 56189008|State3|
|Country1|    2017-03-01|    Indus_3| Indus_3_Name| 30014633|State3|
|Country2|    2020-03-01|    Indus_4| Indus_4_Name| 41789978|State1|
|Country2|    2018-03-01|    Indus_4| Indus_4_Name| 56189008|State1|
|Country3|    2019-03-01|    Indus_5| Indus_5_Name| 37899790|State3|
|Country3|    2018-03-01|    Indus_5| Indus_5_Name| 56189008|State3|
|Country3|    2017-03-01|    Indus_5| Indus_5_Name| 67789978|State3|
|Country1|    2020-03-01|    Indus_6| Indus_6_Name| 12789979|State1|
|Country1|    2020-06-01|    Indus_6| Indus_6_Name| 37899790|State1|
|Country1|    2018-03-01|    Indus_6| Indus_6_Name| 56189008|State1|
|Country3|    2020-03-01|    Indus_7| Indus_7_Name| 26689900|State1|
|Country3|    2020-12-01|    Indus_7| Indus_7_Name|212359979|State1|
|Country3|    2019-03-01|    Indus_7| Indus_7_Name| 12789979|State1|
|Country1|    2018-03-01|    Indus_8| Indus_8_Name|212359979|State2|
+--------+--------------+-----------+-------------+---------+------+

Необходимо рассчитать различные вычисления, например, avg (доход) ) для каждой данной группы для заданных дат, способных сделать это, но не масштабировать их в спарк-кластере.

Для того же, что я делаю ниже, но это вовсе не масштабирование ... следовательно, понял, что мне нужно использовать MapFunction и ReduceFunction java .. не знаете, как это сделать?

//Will get dates to for which I need to calculate , this provided by external source 
        List<String> datesToCalculate = Arrays.asList("2019-03-01","2020-06-01","2018-09-01");

        //Will get groups  to calculate , this provided by external source ..will keep changing
        //Have around 100s of groups.
        List<String> groupsToCalculate = Arrays.asList("Country","Country-State");

        //For each data given need to calculate avg(revenue) for each given group 
        //for those given each date of datesToCalculate for those records whose are later than given date.
        //i.e. 

        //Now I am doing some thing like this..but it is not scaling

        datesToCalculate.stream().forEach( cal_date -> {

            Dataset<IndustryRevenue> calc_ds = ds.where(col("generated_date").gt(lit(cal_date)));

            //this keep changing for each cal_date
            Dataset<Row> final_ds = calc_ds
                                      .withColumn("calc_date", to_date(lit(cal_date)).cast(DataTypes.DateType));

            //for each group it calcuate separate set
            groupsToCalculate.stream().forEach( group -> {

                String tempViewName = new String("view_" + cal_date + "_" + group);

                final_ds.createOrReplaceTempView(tempViewName);

                String query = "select "  
                                  + " avg(revenue) as mean, "
                                  + "from " + tempViewName                      
                                  + " group by " + group;

                System.out.println("query : " + query);
                Dataset<Row> resultDs  = spark.sql(query);

                Dataset<Row> finalResultDs  =  resultDs
                                 .withColumn("calc_date", to_date(lit(cal_date)).cast(DataTypes.DateType))
                                 .withColumn("group", to_date(lit(group)).cast(DataTypes.DateType));


                //Writing to each group for each date is taking hell lot of time.
                // For each record it is save at a time
                // want to move out unioning all finalResultDs and write in batches
                finalResultDs
                   .write().format("parquet")
                   .mode("append")
                   .save("/tmp/"+ tempViewName);

                spark.catalog().dropTempView(tempViewName);

            });

        });

Из-за циклов for для обработки нескольких миллионов записей требуется более 20 часов. так как избежать циклов и заставить его работать быстро.

Вот пример кода

https://github.com/BdLearnerr/Java-mapReduce/blob/master/MapReduceScalingProblem.java

Ожидаемый результат:

+--------------+----------------+--------------+
| group-name   |   group-value  |         mean |
+--------------+----------------+--------------+
|country-state |Country1-State1 | 2.53448845E7 |
|country-state |Country3-State3 |   6.7789978E7|
|country-state |Country1-State2 | 1.919319606E8|
|country-state |Country4-State1 |    9.789979E7|
|country-state |Country1-State3 |   2.9339748E7|
|country-state |Country3-State1 |     2.66899E7|
|country-state |Country2-State1 |   4.1789978E7|
|country       |Country4        |    9.789979E7|
|country       |Country1        |   8.5696311E7|
|country       |Country3        |   4.7239939E7|
|country       |Country2        |   4.1789978E7|
+--------------+----------------+--------------+

1 Ответ

1 голос
/ 26 апреля 2020

Вот то, что я считаю частью решения, касающегося ваших неотложных вопросов, но я оставляю аспекты для вас, чтобы заполнить их Есть и другие подходы, но это мой быстрый вывод из того, что я понимаю. Успех. Нет foreach req'd. Возможно, я неправильно понял, что вам нужно. Извините, если это так. При таком подходе вы можете подумать о .cache.

// Assuming constant names in terms of country names are spelled similarly and consistently
// Not clear if by date or for selected dates. If selected dates then use another list 
// This approach will scale due to JOIN and AGG and no foreach, etc.
// Spark will fuse the code together if it can, but there are shuffles

// This is for Country, State. You can apply the approach to just Country and then UNION the 2 DF's with common names and definitions. Try it out
// NB: You make a custom grouping by concatenating the Country & State or you can leave as is, and for 2nd query you can just fill in country and put a blank value into the State.
// I leave that up to you.

import spark.implicits._

import org.apache.spark.sql.functions._
val dfC = Seq(("USA", "Ohio"), ("NZ", "Otago")).toDF("sCountry", "sState") // Your search criteria at Country / State level, cannot so simple .isin - why?

val d = List("23-10-2001", "12-12-2003") // or Array

val dfS = Seq(
             ("USA", "Ohio", "23-10-2001", 2),
             ("USA", "Ohio", "23-10-2001", 2),
             ("USA", "Ohio", "23-10-2011", 2),
             ("USA", "Texas", "23-10-2001", 2),
             ("USA", "Virgina", "23-10-2001", 10),
             ("USA", "Virgina", "23-10-2001", 6),
             ("USA", "vanDiemensLand", "23-10-2001", 26),
             ("NL", "vanDiemensLand", "23-10-2001", 16),
             ("UK", "Middlesex", "23-10-2001", 3)
             ).toDF("country", "state", "date", "some_val") 
dfS.show(false)

// 1. For Country & State 
// JOIN acts as a filter as is inner join and alleviates the .isin for multiple cols i.e. Country||State
val df1 = dfS.join(dfC, (dfS("country") === dfC("sCountry")) && (dfS("state") === dfC("sState"))).drop("sCountry").drop("sState")
df1.show(false)

val df2 = df1.filter($"date".isin(d:_*)).groupBy("country", "state").avg("some_val") 
df2.show(false)

// 2. For Country only
... to fill in by you
...

// 3. UNION df2 & df3
...

// 4. Save with partitioning.
...