Как избежать использования сбора в Spark RDD в Scala? - PullRequest
2 голосов
/ 27 апреля 2020

У меня есть список, и я должен создать карту для этого для дальнейшего использования, я использую RDD, но с использованием collect () работа в кластере не выполняется. Любая помощь приветствуется.

Пожалуйста, помогите. Ниже приведен пример кода из списка в rdd.collect. Я должен использовать эти данные карты дальше, но как использовать без сбора?

Этот код создает карту из данных RDD (List). Формат списка -> (asdfg / 1234 / wert, asdf)

 //List Data to create Map
 val listData = methodToGetListData(ListData).toList
//Creating RDD from above List  

  val rdd = sparkContext.makeRDD(listData)

      implicit val formats = Serialization.formats(NoTypeHints)
      val res = rdd
        .map(map => (getRPath(map._1), getAttribute(map._1), map._2))
        .groupBy(_._1)
        .map(tuple => {
          Map(
            "P_Id" -> "1234",
            "R_Time" -> "27-04-2020",
            "S_Time" -> "27-04-2020",
            "r_path" -> tuple._1,
            "S_Tag" -> "12345,
            tuple._1 -> (tuple._2.map(a => (a._2, a._3)).toMap)
          )
        })

      res.collect()
    }

1 Ответ

2 голосов
/ 29 апреля 2020

Q: как использовать без сбора?


Ответ: collect ударит .. это переместит данные на узел драйвера. если данные огромны. Никогда не делай этого.


Я точно не знаю, каков вариант использования для подготовки map, но это может быть достигнуто с использованием встроенного API-интерфейса spark т.е. collectionAccumulator ... подробно,

collectionAccumulator[scala.collection.mutable.Map[String, String]]


Позволяет Предположим, это ваш примерный фрейм данных, и вы хотите создать карту.

+-------+---------+---------------------+-------------+----------+------------+-------------+-----------+------------+----------+---------+-------------------------------+
|Item_Id|Parent_Id|object_class_instance|Received_Time|CablesName|CablesStatus|CablesHInfoID|CablesIndex|object_class|ServiceTag|Scan_Time|relation_tree                  |
+-------+---------+---------------------+-------------+----------+------------+-------------+-----------+------------+----------+---------+-------------------------------+
|-0909  |1234     |Cables-1             |23-12-2020   |LC        |Installed   |ABCD1234     |0          |Cables      |ASDF123   |12345    |Start~>HInfo->Cables->Cables-1 |
|-09091 |1234111  |Cables-11            |23-12-2022   |LC1       |Installed1  |ABCD12341    |0          |Cables1     |ASDF1231  |123451   |Start~>HInfo->Cables->Cables-11|
+-------+---------+---------------------+-------------+----------+------------+-------------+-----------+------------+----------+---------+-------------------------------+

Из этого вы хотите создать карту ( вложенная карта. Я использовал префикс с именем ключа nestedmap в вашем примере. ) затем ...

Ниже приведен полный пример, посмотрите и измените его соответствующим образом.

package examples

import org.apache.log4j.Level

object GrabMapbetweenClosure extends App {
  val logger = org.apache.log4j.Logger.getLogger("org")
  logger.setLevel(Level.WARN)


  import org.apache.spark.sql.SparkSession

  val spark = SparkSession
    .builder()
    .master("local[*]")
    .appName(this.getClass.getName)
    .getOrCreate()

  import spark.implicits._

  var mutableMapAcc = spark.sparkContext.collectionAccumulator[scala.collection.mutable.Map[String, String]]("mutableMap")

  val df = Seq(
    ("-0909", "1234", "Cables-1", "23-12-2020", "LC", "Installed", "ABCD1234"
      , "0", "Cables", "ASDF123", "12345", "Start~>HInfo->Cables->Cables-1")
    , ("-09091", "1234111", "Cables-11", "23-12-2022", "LC1", "Installed1", "ABCD12341"
      , "0", "Cables1", "ASDF1231", "123451", "Start~>HInfo->Cables->Cables-11")

  ).toDF("Item_Id", "Parent_Id", "object_class_instance", "Received_Time", "CablesName", "CablesStatus", "CablesHInfoID",
    "CablesIndex", "object_class", "ServiceTag", "Scan_Time", "relation_tree"
  )

  df.show(false)
  df.foreachPartition { partition => // for performance sake I used foreachPartition
    partition.foreach {
      record => {
        mutableMapAcc.add(scala.collection.mutable.Map(
          "Item_Id" -> record.getAs[String]("Item_Id")
          , "CablesStatus" -> record.getAs[String]("CablesStatus")
          , "CablesHInfoID" -> record.getAs[String]("CablesHInfoID")
          , "Parent_Id" -> record.getAs[String]("Parent_Id")
          , "CablesIndex" -> record.getAs[String]("CablesIndex")
          , "object_class_instance" -> record.getAs[String]("object_class_instance")
          , "Received_Time" -> record.getAs[String]("Received_Time")
          , "object_class" -> record.getAs[String]("object_class")
          , "CablesName" -> record.getAs[String]("CablesName")
          , "ServiceTag" -> record.getAs[String]("ServiceTag")
          , "Scan_Time" -> record.getAs[String]("Scan_Time")
          , "relation_tree" -> record.getAs[String]("relation_tree")

        )
        )
      }
    }
  }
  println("FinalMap : " + mutableMapAcc.value.toString)

}


Результат:

+-------+---------+---------------------+-------------+----------+------------+-------------+-----------+------------+----------+---------+-------------------------------+
|Item_Id|Parent_Id|object_class_instance|Received_Time|CablesName|CablesStatus|CablesHInfoID|CablesIndex|object_class|ServiceTag|Scan_Time|relation_tree                  |
+-------+---------+---------------------+-------------+----------+------------+-------------+-----------+------------+----------+---------+-------------------------------+
|-0909  |1234     |Cables-1             |23-12-2020   |LC        |Installed   |ABCD1234     |0          |Cables      |ASDF123   |12345    |Start~>HInfo->Cables->Cables-1 |
|-09091 |1234111  |Cables-11            |23-12-2022   |LC1       |Installed1  |ABCD12341    |0          |Cables1     |ASDF1231  |123451   |Start~>HInfo->Cables->Cables-11|
+-------+---------+---------------------+-------------+----------+------------+-------------+-----------+------------+----------+---------+-------------------------------+

FinalMap : [Map(Scan_Time -> 123451, ServiceTag -> ASDF1231, Received_Time -> 23-12-2022, object_class_instance -> Cables-11, CablesHInfoID -> ABCD12341, Parent_Id -> 1234111, Item_Id -> -09091, CablesIndex -> 0, object_class -> Cables1, relation_tree -> Start~>HInfo->Cables->Cables-11, CablesName -> LC1, CablesStatus -> Installed1), Map(Scan_Time -> 12345, ServiceTag -> ASDF123, Received_Time -> 23-12-2020, object_class_instance -> Cables-1, CablesHInfoID -> ABCD1234, Parent_Id -> 1234, Item_Id -> -0909, CablesIndex -> 0, object_class -> Cables, relation_tree -> Start~>HInfo->Cables->Cables-1, CablesName -> LC, CablesStatus -> Installed)]

Подобная проблема была решена здесь.

...