Как эффективно извлечь несколько столбцов из одного столбца RDD? - PullRequest
3 голосов
/ 15 мая 2019

У меня есть файл с 20+ столбцами, из которых я хотел бы извлечь несколько.До сих пор у меня есть следующий код.Я уверен, что есть умный способ сделать это, но не в состоянии заставить это работать успешно.Есть идеи?

mvnmdata имеет тип RDD [String]

val strpcols = mvnmdata.map(x => x.split('|')).map(x => (x(0),x(1),x(5),x(6),x(7),x(8),x(9),x(10),x(11),x(12),x(13),x(14),x(15),x(16),x(17),x(18),x(19),x(20),x(21),x(22),x(23) ))```

Ответы [ 2 ]

0 голосов
/ 17 мая 2019

Это решение предоставляет удобный и легко читаемый способ управления именами и индексами столбцов. Он основан на карте, которая определяет отношение столбца к имени / индексу. Карта также поможет нам обрабатывать как индекс извлеченного столбца, так и его имя.

import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{StringType, StructType, StructField}

val rdd = spark.sparkContext.parallelize(Seq(
"1|500|400|300",
"1|34|67|89",
"2|10|20|56",
"3|2|5|56",
"3|1|8|22"))

val dictColums = Map("c0" -> 0, "c2" -> 2)

val schema = StructType(dictColums.keys.toSeq.map(StructField(_, StringType, true)))

val mappedRDD = rdd.map{line => line.split('|')}
                    .map{
                      cols => Row.fromSeq(dictColums.values.toSeq.map{cols(_)})
                    }

val df = spark.createDataFrame(mappedRDD, schema).show

//output
+---+---+
| c0| c2|
+---+---+
|  1|400|
|  1| 67|
|  2| 20|
|  3|  5|
|  3|  8|
+---+---+
  • Сначала мы объявим dictColums, в этом примере мы извлечем столбцы "c0" -> 0 и "c2" -> 2
  • Далее создадим схему из ключей карты
  • Одна карта (которая у вас уже есть) разделит линию на |, вторая создаст Row, содержащую значения, которые соответствуют каждому элементу dictColums.values

UPDATE:

Вы также можете создать функцию из вышеперечисленных функций, чтобы иметь возможность многократно использовать ее:

import org.apache.spark.sql.DataFrame

def stringRddToDataFrame(colsMapping: Map[String, Int], rdd: RDD[String]) : DataFrame = {
  val schema = StructType(colsMapping.keys.toSeq.map(StructField(_, StringType, true)))

  val mappedRDD = rdd.map{line => line.split('|')}
                    .map{
                      cols => Row.fromSeq(colsMapping.values.toSeq.map{cols(_)})
                    }

  spark.createDataFrame(mappedRDD, schema)
}

А затем используйте его для вашего случая:

val cols = Map("c0" -> 0, "c1" -> 1, "c5" -> 5, "c6" -> 6 ..... "c23" -> 23)

val df = stringRddToDataFrame(cols, rdd)
0 голосов
/ 15 мая 2019

Как показано ниже, если вы не хотите писать повторяющиеся x (i), вы можете обработать их в цикле.Пример 1:

val strpcols = mvnmdata.map(x => x.split('|'))
  .map(x =>{
    val xbuffer = new ArrayBuffer[String]()
    for (i <- Array(0,1,5,6...)){
      xbuffer.append(x(i))
    }
    xbuffer
  })

Если вы хотите определить индексный список только с началом и концом и числами, которые следует исключить, см. Пример 2 ниже:

scala> (1 to 10).toSet
res8: scala.collection.immutable.Set[Int] = Set(5, 10, 1, 6, 9, 2, 7, 3, 8, 4)

scala> ((1 to 10).toSet -- Set(2,9)).toArray.sortBy(row=>row)
res9: Array[Int] = Array(1, 3, 4, 5, 6, 7, 8, 10)

Последний код, который вы хотите:

  //define the function to process indexes
  def getSpecIndexes(start:Int, end:Int, removedValueSet:Set[Int]):Array[Int] = {
    ((start to end).toSet -- removedValueSet).toArray.sortBy(row=>row)
  }

  val strpcols = mvnmdata.map(x => x.split('|'))
    .map(x =>{
      val xbuffer = new ArrayBuffer[String]()
      //call the function
      for (i <- getSpecIndexes(0,100,Set(3,4,5,6))){
        xbuffer.append(x(i))
      }
      xbuffer
    })
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...