как распараллелить это в искре, используя API набора данных искры - PullRequest
0 голосов
/ 15 апреля 2020

Я использую spark- sql -2.4.1v с Java 8.

У меня есть столбцы данных, как показано ниже

val df_data = Seq(
  ("Indus_1","Indus_1_Name","Country1", "State1",12789979),
  ("Indus_2","Indus_2_Name","Country1", "State2",21789933),
  ("Indus_3","Indus_3_Name","Country1", "State3",21789978),
  ("Indus_4","Indus_4_Name","Country2", "State1",41789978),
  ("Indus_5","Indus_5_Name","Country3", "State3",27789978),
  ("Indus_6","Indus_6_Name","Country1", "State1",27899790),
  ("Indus_7","Indus_7_Name","Country3", "State1",27899790),
  ("Indus_8","Indus_8_Name","Country1", "State2",27899790),
  ("Indus_9","Indus_9_Name","Country4", "State1",27899790)
  ).toDF("industry_id","industry_name","country","state","revenue");

Учитывая приведенный ниже список входных данных :

val countryList = Seq("Country1","Country2");
val stateMap = Map("Country1" -> {"State1","State2"}, "Country2" -> {"State2","State3"});

При работе по принципу "работа на спарке" для каждой страны для каждого штата мне нужно рассчитать общий доход нескольких отраслей.

На других языках мы получаем l oop.

т.е.

for( country <- countryList ){
   for( state <- stateMap.get(country){
   // do some calculation for each state industries
   }
}

В Spark, что я понял, мы должны делать так, то есть все исполнители не были использованы для этого. так каков правильный способ справиться с этим?

Ответы [ 3 ]

1 голос
/ 15 апреля 2020

Вы можете использовать flatMapValues для создания пар ключ-значение, а затем выполнять вычисления с шагом .map.

scala> val data = Seq(("country1",Seq("state1","state2","state3")),("country2",Seq("state1","state2")))
scala> val rdd = sc.parallelize(data)
scala> val rdd2 = rdd.flatMapValues(s=>s)

scala> rdd2.foreach(println(_))
(country1,state1)
(country2,state1)
(country1,state2)
(country2,state2)
(country1,state3)

Здесь вы можете выполнять операции, я добавил # к каждому состояние

scala> rdd2.map(s=>(s._1,s._2+"#")).foreach(println(_))
(country1,state1#)
(country1,state2#)
(country1,state3#)
(country2,state1#)
(country2,state2#)
1 голос
/ 20 апреля 2020

Я добавил несколько дополнительных строк к вашим образцам данных для дифференциации агрегации Я использовал scala параллельный сбор, для каждой страны он получит состояния, а затем использует эти значения, чтобы отфильтровать данный кадр данных и затем выполнить агрегацию, и в конце он объединит все результаты обратно.

scala> val df = Seq(
     |   ("Indus_1","Indus_1_Name","Country1", "State1",12789979),
     |   ("Indus_2","Indus_2_Name","Country1", "State2",21789933),
     |   ("Indus_2","Indus_2_Name","Country1", "State2",31789933),
     |   ("Indus_3","Indus_3_Name","Country1", "State3",21789978),
     |   ("Indus_4","Indus_4_Name","Country2", "State1",41789978),
     |   ("Indus_4","Indus_4_Name","Country2", "State2",41789978),
     |   ("Indus_4","Indus_4_Name","Country2", "State2",81789978),
     |   ("Indus_4","Indus_4_Name","Country2", "State3",41789978),
     |   ("Indus_4","Indus_4_Name","Country2", "State3",51789978),
     |   ("Indus_5","Indus_5_Name","Country3", "State3",27789978),
     |   ("Indus_6","Indus_6_Name","Country1", "State1",27899790),
     |   ("Indus_7","Indus_7_Name","Country3", "State1",27899790),
     |   ("Indus_8","Indus_8_Name","Country1", "State2",27899790),
     |   ("Indus_9","Indus_9_Name","Country4", "State1",27899790)
     |   ).toDF("industry_id","industry_name","country","state","revenue")
df: org.apache.spark.sql.DataFrame = [industry_id: string, industry_name: string ... 3 more fields]

scala> val countryList = Seq("Country1","Country2","Country4","Country5");
countryList: Seq[String] = List(Country1, Country2, Country4, Country5)

scala> val stateMap = Map("Country1" -> ("State1","State2"), "Country2" -> ("State2","State3"),"Country3" -> ("State31","State32"));
stateMap: scala.collection.immutable.Map[String,(String, String)] = Map(Country1 -> (State1,State2), Country2 -> (State2,State3), Country3 -> (State31,State32))

scala>

scala> :paste
// Entering paste mode (ctrl-D to finish)

countryList
.par
.filter(cn => stateMap.exists(_._1 == cn))
.map(country => (country,stateMap(country)))
.map{data =>
    df.filter($"country" === data._1 && ($"state" === data._2._1 || $"state" === data._2._2)).groupBy("country","state","industry_name").agg(sum("revenue").as("total_revenue"))
}.reduce(_ union _).show(false)


// Exiting paste mode, now interpreting.

+--------+------+-------------+-------------+
|country |state |industry_name|total_revenue|
+--------+------+-------------+-------------+
|Country1|State2|Indus_8_Name |27899790     |
|Country1|State1|Indus_6_Name |27899790     |
|Country1|State2|Indus_2_Name |53579866     |
|Country1|State1|Indus_1_Name |12789979     |
|Country2|State3|Indus_4_Name |93579956     |
|Country2|State2|Indus_4_Name |123579956    |
+--------+------+-------------+-------------+


scala>

Редактировать - 1: разделенный код Agg на другой функциональный блок.

scala> def processDF(data:(String,(String,String)),adf:DataFrame) = adf.filter($"country" === data._1 && ($"state" === data._2._1 || $"state" === data._2._2)).groupBy("country","state","industry_name").agg(sum("revenue").as("total_revenue"))
processDF: (data: (String, (String, String)), adf: org.apache.spark.sql.DataFrame)org.apache.spark.sql.DataFrame

scala> :paste
// Entering paste mode (ctrl-D to finish)

countryList.
par
.filter(cn => stateMap.exists(_._1 == cn))
.map(country => (country,stateMap(country)))
.map(data => processDF(data,df))
.reduce(_ union _)
.show(false)


// Exiting paste mode, now interpreting.

+--------+------+-------------+-------------+
|country |state |industry_name|total_revenue|
+--------+------+-------------+-------------+
|Country1|State2|Indus_8_Name |27899790     |
|Country1|State1|Indus_6_Name |27899790     |
|Country1|State2|Indus_2_Name |53579866     |
|Country1|State1|Indus_1_Name |12789979     |
|Country2|State3|Indus_4_Name |93579956     |
|Country2|State2|Indus_4_Name |123579956    |
+--------+------+-------------+-------------+


scala>
1 голос
/ 15 апреля 2020

Это действительно зависит от того, что вы хотите сделать, если вам не нужно делить состояние между штатами (состояниями стран), то вам нужно создать свой DataFrame, который каждой строкой (страна, штат), и затем вы можете контролировать, как много строк будут обрабатываться параллельно (num разделов и num ядер).

...