Разделить RDD [String] на RDD [кортежи] - PullRequest
0 голосов
/ 30 апреля 2019

Я новичок в Scala и RDD. Я использую Scala на Spark 2.4. У меня есть RDD [String] с такими строками:

(a, b, c, d, ...)

Я бы хотел разделить эту строку в каждой коме, чтобы получить RDD[(String, String, String, ...)].

Решения, подобные следующим, очевидно, невозможны в отношении количества элементов.

rdd.map(x => (x.split(",")(0), x.split(",")(1), x.split(",")(2)))

Может быть, есть способ автоматизировать это? Все работает будет хорошо.

Несмотря на мои усилия, у меня пока нет решения моей проблемы,

Большое спасибо!

Ответы [ 3 ]

3 голосов
/ 30 апреля 2019

Одним из решений является написать функцию отображения:

def parse(s: String) = s.split(",") match {
    case Array(a,b,c) => (a,b,c)
}

parse("x,x,x") // (x,x,x)

Вы можете написать более общее решение, используя бесформенное:

def toTuple[H <: HList](s: String)(implicit ft: FromTraversable[H], t: Tupler[H]) = s.split(",").toHList[H].get.tupled

тогда вы можете использовать его напрямую:

toTuple[String :: String :: String :: HNil]("x,x,x") // (x,x,x)
toTuple[String :: String :: HNil]("x,x") // (x,x)

или исправьте, затем введите и используйте его:

def parse3(s: String) = toTuple[String :: String :: String :: HNil](s)

parse3("x,x,x") // (x,x,x)
3 голосов
/ 30 апреля 2019

Если количество элементов фиксировано, вы можете сделать что-то вроде:

val tuples =
  rdd
    .map(line => line.replaceAll("[\\(\\)]", "").split(","))
    .collect {
      case Array(col1, col2, ..., coln) => (col1, col2, ..., coln)
    }
// tuples: RDD[(String, String, ..., String)]
2 голосов
/ 01 мая 2019

Обратите внимание, что максимальный размер кортежа ограничен 22, поэтому перечислить их все будет не так долго ...

Кстати, в книге Искра в действии , на странице 110 записано:

Нет элегантного способа преобразования массива в кортеж, поэтому вы должны прибегнуть к этому уродливому выражению:

scala> val itPostsRDD = itPostsSplit.map(x => (x(0), x(1), x(2), x(3), x(4), x(5), x(6), x(7), x(8), x(9), x(10), x(11), x(12))
itPostsRDD: org.apache.spark.rdd.RDD[(String, String, ...
...