Объедините два списка с одним другим элементом - PullRequest
0 голосов
/ 27 сентября 2019

Я новичок в Scala и Spark, и я не знаю, как это сделать.

Я предварительно обработал файл CSV, в результате чего появился СДР, содержащий списки в этом формате:

List("2014-01-01T23:56:06.0", NaN, 1, NaN)
List("2014-01-01T23:56:06.0", NaN, NaN, 2)

Все списки имеют одинаковое количество элементов.

Я хочу объединить списки, имеющие один и тот же первый элемент (отметку времени).Например, я хочу, чтобы эти два примера списков создавали только один список со следующими значениями:

List("2014-01-01T23:56:06.0", NaN, 1, 2)

Спасибо за вашу помощь:)

Ответы [ 2 ]

1 голос
/ 27 сентября 2019
    # Below can help you in achieving your target

    val input_rdd1 = spark.sparkContext.parallelize(List(("2014-01-01T23:56:06.0", "NaN", "1", "NaN")))
    val input_rdd2 = spark.sparkContext.parallelize(List(("2014-01-01T23:56:06.0", "NaN", "NaN", "2")))
    //added one more row for your data
    val input_rdd3 = spark.sparkContext.parallelize(List(("2014-01-01T23:56:06.0", "2", "NaN", "NaN")))
    val input_df1 = input_rdd1.toDF("col1", "col2", "col3", "col4")
    val input_df2 = input_rdd2.toDF("col1", "col2", "col3", "col4")
    val input_df3 = input_rdd3.toDF("col1", "col2", "col3", "col4")

    val output_df = input_df1.union(input_df2).union(input_df3).groupBy($"col1").agg(min($"col2").as("col2"), min($"col3").as("col3"), min($"col4").as("col4"))

    output_df.show

output:
+--------------------+----+----+----+
|                col1|col2|col3|col4|
+--------------------+----+----+----+
|2014-01-01T23:56:...|   2|   1|   2|
+--------------------+----+----+----+
0 голосов
/ 27 сентября 2019

Если значения хвоста массива являются двойными, их можно реализовать следующим образом (как рекомендует sachav):

val original = sparkContext.parallelize(
  Seq(
    List("2014-01-01T23:56:06.0", NaN, 1.0, NaN),
    List("2014-01-01T23:56:06.0", NaN, NaN, 2.0)
  )
)

val result = original
  .map(v => v.head -> v.tail)
  .reduceByKey(
    (acc, curr) => acc.zip(curr).map({ case (left, right) => if (left.asInstanceOf[Double].isNaN) right else left }))
  .map(v => v._1 :: v._2)

result.foreach(println)

Вывод:

List(2014-01-01T23:56:06.0, NaN, 1.0, 2.0)
...