Обрабатывать каждый раздел и каждую строку в каждом разделе по одному - PullRequest
0 голосов
/ 23 сентября 2019

Вопрос:

У меня есть ниже 2 фреймов данных, хранящихся в массиве.Данные уже разделены по SECURITY_ID.

Dataframe 1 (DF1):
+-------------+----------+----------+--------+---------+---------+
| ACC_SECURITY|ACCOUNT_NO|LONG_IND|SHORT_IND|SECURITY_ID|QUANTITY|
+-------------+----------+--------+---------+-----------+--------+
|9161530335G71|  91615303|1111    |     1000|      35G71|  -20000|
|9161530435G71|  91615304|2222    |     2000|      35G71|   -2883|
|9161530235G71|  91615302|3333    |     3000|      35G71|    2000|
|9211530135G71|  92115301|4444    |     4000|      35G71|    8003|
+-------------+----------+--------+---------+-----------+--------+

Dataframe 2 (DF2):
+-------------+----------+----------+--------+---------+---------+
| ACC_SECURITY|ACCOUNT_NO|LONG_IND|SHORT_IND|SECURITY_ID|QUANTITY|
+-------------+----------+--------+---------+-----------+--------+
|3FA34789290X2|  3FA34789|5555    |     5000|      290X2|  -20000|
|32934789290X2|  32934789|6666    |     6000|      290X2|   -2883|
|00000019290X2|  00000019|7777    |     7000|      290X2|    2000|
|3S534789290X2|  3S534789|8888    |     8000|      290X2|    8003|
+-------------+----------+--------+---------+-----------+--------+

Пробная версия:

Как мне обрабатывать каждый фрейм данных отдельно, а под каждым фреймом данных я хочу обрабатывать по одной строке за раз.Я попробовал следующее

def methodA(d1: DataFrame): Unit {
    val securityIds = d1.select("SECURITY_ID").distinct.collect.flatMap(_.toSeq)
    val bySecurityArray = securityIds.map(securityIds => d1.where($"SECURITY_ID" <=> securityIds))

    for(i <- 0 until bySecurityArray.length) {
        allocOneDF = bySecurityArray(i).toDF()
        print("Number of partitions: " + allocProcessDF.rdd.getNumPartitions)
        methodB(allocProcessDF)
    }
} 

def methodA(d1: DataFrame): Unit {
    import org.apache.spark.api.java.function.ForeachPartitionFunction
    df.foreachPartition(ds => {

    //Tried below while and also foreach... its same result.
    //Option 1  
    while (ds.hasNext) {
        allocProcess(ds.next())
    }

    //Option 2
    ds.foreach(row => allocProcess(row))

    })

}

Я пытался обработать - используя foreachpartition на каждом Dataframe, поступающем из bySecurityArray - затем обработать каждую строку из результирующего набора данных (после foreachpartition), используя foreach

Но ясм. только первый исполняемый фрейм данных (SECURITY_ID = 35G71), но не второй фрейм данных (290X2).

Получено ошибок:

19/09/23 08:57:38 ERROR util.Utils: Exception encountered
java.io.StreamCorruptedException: invalid type code: 30
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1601)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2287)
    at java.io.ObjectInputStream.defaultReadObject(ObjectInputStream.java:561)
    at org.apache.spark.rdd.ParallelCollectionPartition$$anonfun$readObject$1.apply$mcV$sp(ParallelCollectionRDD.scala:74)
    at org.apache.spark.rdd.ParallelCollectionPartition$$anonfun$readObject$1.apply(ParallelCollectionRDD.scala:70)
    at org.apache.spark.rdd.ParallelCollectionPartition$$anonfun$readObject$1.apply(ParallelCollectionRDD.scala:70)
    at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1371)
    at org.apache.spark.rdd.ParallelCollectionPartition.readObject(ParallelCollectionRDD.scala:70)
    at sun.reflect.GeneratedMethodAccessor35.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1170)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2178)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2069)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2287)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2211)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2069)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:431)
    at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:75)
    at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:114)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:376)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
19/09/23 08:57:38 ERROR util.Utils: Exception encountered

19/09/23 08:57:38 WARN scheduler.TaskSetManager: Lost task 3.0 in stage 218.0 (TID 10452, CANTSHARE_URL, executor 6): java.io.StreamCorruptedException: invalid type code: 30
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1601)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2287)
    at java.io.ObjectInputStream.defaultReadObject(ObjectInputStream.java:561)
    at java.lang.Thread.run(Thread.java:748)

19/09/23 08:57:38 INFO scheduler.DAGScheduler: ShuffleMapStage 218 (run at ThreadPoolExecutor.java:1149) failed in 0.120 s due to Job aborted due to stage failure: Task 9 in stage 218.0 failed 4 times, most recent failure: Lost task 9.3 in stage 218.0 (TID 10466, CANTSHARE_URL, executor 6): java.io.StreamCorruptedException: invalid type code: 30
    at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:75)
    at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:114)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:376)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)

1 Ответ

0 голосов
/ 24 сентября 2019

Spark не сохраняет порядок, так как данные распределены по разделу, в пределах порядка раздела все еще не гарантируется, так как может быть несколько задач.Для получения логического порядка coalesce (1) с последующей сортировкой (cols: *) можно применить операцию над Datafame, чтобы получить новый Datafame / Dataset, отсортированный по указанным столбцам, все в порядке возрастания.

def methodA(d1: DataFrame): Unit = {
val securityIds = d1.select("SECURITY_ID").distinct.collect.flatMap(_.toSeq)
val bySecurityArray = securityIds.map(securityId => d1.where(d1("SECURITY_ID") === securityId))

for (i <- 0 until bySecurityArray.length) {
  val allocOneDF = bySecurityArray(i).toDF()
  print("Number of partitions: " + allocOneDF.rdd.getNumPartitions)
  methodB(allocOneDF)
 }
}

def methodB(df: DataFrame): Unit = {
df.coalesce(1).sort("LONG_IND", "SHORT_IND").foreach(row => {
  println(row)
  //allocProcess(row)
 })
}
...