Как заполнить столбец значением, взятым из (не смежной) предыдущей строки без естественного ключа разделения, используя Spark Scala DataFrame - PullRequest
0 голосов
/ 24 марта 2020

Вход:

val input = Seq(
     |   ("1 February"),
     |   ("n"),
     |   ("c"),
     |   ("b"),
     |   ("2 February"),
     |   ("h"),
     |   ("w"),
     |   ("e"),
     |   ("3 February"),
     |   ("y"),
     |   ("s"),
     |   ("j")
     | ).toDF("lines")

input.show

Вход выглядит следующим образом:

+----------+
|     lines|
+----------+
|1 February|
|         n|
|         c|
|         b|
|2 February|
|         h|
|         w|
|         e|
|3 February|
|         y|
|         s|
|         j|
+----------+

Требуемый вывод:

val output = Seq(
     |   ("1 February", "n"),
     |   ("1 February", "c"),
     |   ("1 February", "b"),
     |   ("2 February", "h"),
     |   ("2 February", "w"),
     |   ("2 February", "e"),
     |   ("3 February", "y"),
     |   ("3 February", "s"),
     |   ("3 February", "j")
     | ).toDF("date", "lines")

output.show

Требуемый вывод выглядит следующим образом:

+----------+-----+
|      date|lines|
+----------+-----+
|1 February|    n|
|1 February|    c|
|1 February|    b|
|2 February|    h|
|2 February|    w|
|2 February|    e|
|3 February|    y|
|3 February|    s|
|3 February|    j|
+----------+-----+

Я думаю об использовании функции задержки в scala Spark dataframe, но на самом деле не смог разобраться после нескольких часов. У кого-нибудь есть идея? Большое спасибо.

Ответы [ 3 ]

1 голос
/ 24 марта 2020

Масштаб записей и другие непонятные вещи.

Имхо, это пример плохой подачи, делающей жизнь более трудной и сложной, и если в масштабе ситуация такова, что соединение большой таблицы с менее большой таблицей присоединиться, что-то Спарк не так хорош в. Я не уверен в лучшей стратегии разделения, поскольку есть проблемы, но если у вас есть большие данные, вы можете разделить их по месяцам (, годам). Или даже по неделям и добавить в выходной магазин.

Вам нужно будет объединить поля, но вот решение:

import spark.implicits._
import org.apache.spark.sql.functions._
import org.apache.spark.sql.functions.split
import org.apache.spark.sql.functions.broadcast
import org.apache.spark.sql.functions.{lead, lag}
import org.apache.spark.sql.expressions.Window

// Assume sorted input limited to a year? Unclear from the question. Volumes also not clear. Added some extra input.

val df = Seq(
    ("1 February"),
    ("n"),
    ("c"),
    ("b"),
    ("2 February"),
    ("h"),
    ("w"),
    ("e"),
    ("3 February"),
    ("y"),
    ("s"),
    ("j"),
    ("1 March"),
    ("c"),
    ("b"),
    ("x")
   ).toDF("line")

// Fill the other months in accordingly.
// Years aspect? Not considered.

val m = Map("january" -> 1,"february" -> 2,"march" -> 3, "april" -> 4)
val dfM = m.toSeq.toDF("month", "monthNum")
// monthNum not used in hindsight, but left in.

// Add a non-consecutive Sequence Nr. This not an issue as we use range checking later on, not a previous value of -1 relative to a Sequence Nr to check directly against.
// Partition position retained. The other option is .zipWithIndex with rdd conversion and back again to df. 
// monotonically_increasing_id is non-deterministic but it should be OK here as far as I can see.

val df2 = df.withColumn("idx", monotonically_increasing_id())

val df3 = df2.withColumn("_tmp", split($"line", " "))
             .select($"idx",
                 $"_tmp".getItem(0).as("col1"),
                 $"_tmp".getItem(1).as("col2")
                    )
            .drop("_tmp")

val df4 = (df3.join(broadcast(dfM), lower(df3.col("col2")) === dfM.col("month")).drop("month")) 

val w = org.apache.spark.sql.expressions.Window.orderBy("idx")  
val df5 = df4.withColumn("idxNextDate", (lead("idx", 1, 999999999999L).over(w)-1)).toDF("idx1","dte","mth", "mthNum", "idxNextDate")

// Not sure about performance, quite tricky, get large table / large table situation which has no real performance solution with Spark except for brute force parallelism.
// Could filter out the non-Null stuff.
val df6 = df3.filter($"col2".isNull)
val df7 = df6.join(df5, ($"idx" >= $"idx1") && ($"idx" <= $"idxNextDate"))
             .select($"dte", $"mth", $"col1".alias("lines"))
df7.show(false)

возвращает:

+---+--------+-----+
|dte|mth     |lines|
+---+--------+-----+
|1  |February|n    |
|1  |February|c    |
|1  |February|b    |
|2  |February|h    |
|2  |February|w    |
|2  |February|e    |
|3  |February|y    |
|3  |February|s    |
|3  |February|j    |
|1  |March   |c    |
|1  |March   |b    |
|1  |March   |x    |
+---+--------+-----+

Я подумаю о разбиении, но это кажется трудным В этом алгоритме определиться с подходящей стратегией. Я не уверен, что это элегантно возможно разделить, это хороший пример обработки, который не вписывается в стиль обработки Spark. Я могу быть исправлен, хотя.

ОБНОВЛЕНИЕ Не удается найти подходящее разбиение. Поэтому необходимо обрабатывать небольшие наборы.

В этом случае рекомендуется адаптировать канал так, чтобы сделать вещи тривиальными, а не заставлять людей работать для решения такой проблемы.

ОБНОВЛЕНИЕ 2 Найден творческий параллельный подход, нужно подумать, публиковать или нет.

0 голосов
/ 31 марта 2020

Предыдущий ответ, который я разместил, был правильным, но менее распространенным. Из .explain () мы могли наблюдать Exchange SinglePartition.

Это решение работает параллельно - в физическом плане не найдено ни единого раздела. В масштабе это выглядит хорошо, но .explain становится трудно читать. Это сложный приказ, если вы вообще не хотите об этом думать, так что, возможно, не всем.

Из-за очевидных преобладающих обстоятельств я мог справиться с этим. Это также мое последнее усилие здесь по Data Wrangling, поскольку оно не стоит потенциальных 15 + 10 баллов. Тем не менее, в моем другом ответе также говорится, что подача должна быть улучшена, что будет моим профессиональным советом. Следовательно, это решение, основанное на свободном времени, но оно выделяет несколько моментов и недостатков. Предполагается, что разбиение диапазона является надежным, проверив с помощью .glom, что это действительно так.

Слишком сложно объяснить, посмотреть комментарии и выводы.

Код запущен на ноутбуке:

// May be academic. If low volume of data then my other solution can suffice, this is a distributed+ solution.
// Need 2 repartitions, interesting concept that turns out to be true.   
// Runs in parallel but the .explain hard to interprete.
// By extracting twice carefully from parent RDD, hoping that partition awareness is retained and that whole partiton moved to other executor with other partition so as to avoid hashing.
// Not certain if range Partitioning needs to check the ranges.
// Some educational coding aspects left in.

import org.apache.spark.sql.functions._
import org.apache.spark.sql.{Encoder, Encoders}
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.split
import org.apache.spark.sql.functions.broadcast
import org.apache.spark.sql.functions.{lead, lag}
import spark.implicits._

// Gen example data via DF, can come from files, ordering in those files assumed. I.e. no need to sort.
val df = Seq(
 ("1 February"), ("n"), ("c"), ("b"), 
 ("2 February"), ("hh"), ("www"), ("e"), 
 ("3 February"), ("y"), ("s"), ("j"),
 ("1 March"), ("c"), ("b"), ("x"),
 ("1 March"), ("c"), ("b"), ("x"),
 ("2 March"), ("c"), ("b"), ("x"),
 ("3 March"), ("c"), ("b"), ("x"), ("y"), ("z")
 ).toDF("line")

// Define Case Classes to avoid Row aspects on df --> rdd --> to DF which I always must look up again.
case class X(line: String)
case class Xtra(key: Long, line: String)

// Add the Seq Num using zipWithIndex.
val rdd = df.as[X].rdd.zipWithIndex().map{case (v,k) => (k,v)}
val ds = rdd.toDF("key", "line").as[Xtra]

// An idea that was not totally utilized but left in.
val m = Map("january" -> 1,"february" -> 2,"march" -> 3, "april" -> 4)
val dfMth = m.toSeq.toDF("monthM", "monthNum")

// Num partitions for processing. 
// Apply Range Partititioning to allow subsequent distributed processing as opposed to Exchange SinglePartition.

val n = 5 // Can be altered, set here. This value drives parallel processing.
sqlContext.setConf("spark.sql.shuffle.partitions", "5")

// Now records the partition the data is in. The issues is that there is no actual partitioning key and that the data of relevance can be in the next partition
// as well (at most one is the assumption made!), but we wish to have partitioned processing.
val rdd2 = ds.repartitionByRange(n, $"key")
   .rdd 
   .mapPartitionsWithIndex((index, iter) => {
      iter.map(x => (index, x ))   
    })
val df2 = rdd2.toDF("part", "line_data").cache

// Separate data into separate fields. 
val df3 = df2.withColumn("_tmp", split($"line_data.line", " "))
             .select($"part", $"line_data.key".as("key"),
                     $"_tmp".getItem(0).as("val"),
                     $"_tmp".getItem(1).as("month")
                    )
             .drop("_tmp")

// Data with partitioning and seq numbering and some tidy up of fields.
val df4 = df3.withColumn("val",   when(col("month").isNull, expr("substring(val,2,length(val)-2)")).otherwise(expr("substring(val,2,length(val))")))        
             .withColumn("month", expr("substring(month,1,length(month)-1)"))
val df44 = df4.withColumn("partX", $"part")

// Get first N rows per partition, N = 4 here, just an example.
import org.apache.spark.sql.catalyst.encoders.RowEncoder
implicit val encoder = RowEncoder(df4.schema)
val df100 = df4.mapPartitions(iterator => iterator.take(4))
val df100U = df100.withColumn("partX", $"part" - 1)  
// Add 1st N rows of next partition to prior partition, except for last partition. Can be set higher if you think more data from previous entry spills over.
val dfC = df44.union(df100U)

// Repartition so that parallellization processing gives correct answer. Assuming Catalyst is partition aware. 
// Any other way means lose peformance. This is the parent RDD so we hope that Catalyst is smart enoygh.
val dfD = dfC.repartitionByRange(n, $"partX").cache() // Used twice.

// Get those values with a month entry that are the break points and get the adjacent values for comparison in next step.
val df55 = dfD.join(broadcast(dfMth), lower(dfD.col("month")) === dfMth.col("monthM")).drop("monthM").drop("monthNum")
@transient val w = org.apache.spark.sql.expressions.Window.partitionBy("partX").orderBy("key")  
val df555 = df55.withColumn("idxNextDate", (lead("key", 1, 999999999999L).over(w)-1)).toDF("part1", "key1", "val1", "month1", "partX", "idxNextDate").cache() 
// LKP table to be compared against with same range partition.

// Get the 'value' entries, not the month, date aspects.
val df66 = dfD.filter( $"month".isNull && $"partX" > -1)

// Make the calculation - in parallel, how well is not clear from DAG.
val dfZ = df66.join( df555, (df66("partX") === df555("partX")) 
                             && ($"key" geq $"key1") && ($"key" <= $"idxNextDate")
                             && ($"part1" === df555("partX")) 
                            && (df555("partX") =!= -1) )
              .select($"val1".alias("date"), $"month1".alias("month"), $"val".alias("lines"), $"key", df555("partX").alias("PX")).orderBy("PX", "key")//.explain()
dfZ.show(100)

Как всегда, для обеспечения безопасности при параллельной обработке необходимо выполнить сортировку, что видно из результатов - здесь не показано. Но это зависит от варианта использования. Процесс сортировки не может увидеть, что данные на самом деле уже отсортированы из-за перераспределения и отрисовки из того же родительского RDD и узких преобразований. Я все это проверил.

== Physical Plan ==
Sort [PX#2022 ASC NULLS FIRST, key#1758L ASC NULLS FIRST], true, 0 
... 

возвращает:

+----+--------+-----+---+---+
|date|   month|lines|key| PX|
+----+--------+-----+---+---+
|   1|February|    n|  1|  0|
|   1|February|    c|  2|  0|
|   1|February|    b|  3|  0|
|   2|February|   hh|  5|  0|
|   2|February|  www|  6|  0|
|   2|February|    e|  7|  0|
|   3|February|    y|  9|  1|
|   3|February|    s| 10|  1|
|   3|February|    j| 11|  1|
|   1|   March|    c| 13|  2|
|   1|   March|    b| 14|  2|
|   1|   March|    x| 15|  2|
|   1|   March|    c| 17|  2|
|   1|   March|    b| 18|  2|
|   1|   March|    x| 19|  2|
|   2|   March|    c| 21|  3|
|   2|   March|    b| 22|  3|
|   2|   March|    x| 23|  3|
|   3|   March|    c| 25|  4|
|   3|   March|    b| 26|  4|
|   3|   March|    x| 27|  4|
|   3|   March|    y| 28|  4|
|   3|   March|    z| 29|  4|
+----+--------+-----+---+---+

Вам не нужна сортировка, но вопрос немного неясен, если сортировка на самом деле требуется. Я думаю, улучшить подачу.

0 голосов
/ 27 марта 2020
val input1 = input.withColumn("RowId", monotonicallyIncreasingId).withColumn("Date", col("lines"))
val len = udf((str: String) => str.size)
val rowIds = input1.filter(len(col("lines")) < 13 && len(col("lines")) > 4).select("RowId").collect.map(_(0)).map(_.toString.toInt)
val rowIdSlided = (rowIds :+ input.count).sliding(2).toList
val output = rowIdSlided.foldLeft(input1)((acc, rowIdList) => {acc.withColumn("Date", when((col("RowId") > rowIdList(0) && col("RowId") < rowIdList(1)), ({val date = input1.filter(col("RowId") === rowIdList(0)).select("lines").collect.map(_(0)).map(_.toString); date(0)})).otherwise(col("Date")))}).drop("RowId")

Это должно дать вывод:

+----------+----------+
|     lines|      Date|
+----------+----------+
|1 February|1 February|
|         n|1 February|
|         c|1 February|
|         b|1 February|
|2 February|2 February|
|         h|2 February|
|         w|2 February|
|         e|2 February|
|3 February|3 February|
|         y|3 February|
|         s|3 February|
|         j|3 February|
+----------+----------+
...