номер свечи минус тот же номер во время предварительного просмотра - PullRequest
0 голосов
/ 14 февраля 2019

Источником является CSV-файл:

id,sale,date
1,100,201901
1,105,201902
1,107,201904
1,108,201905
2,10,201901
2,11,201902
2,12,201904
2,13,201905

Речь идет о некоторых продажах продуктов, 1,100,201901 означает от начала до даты 201901,100 продуктов с идентификатором 1, которые были проданы.1,105,201902 означает, что с начала до даты 201902,105 товаров с идентификатором 1 было продано. Так, во втором месяце 2019 года было продано только 5 товаров.

что я ожидаюдобавьте к нему столбец, используя apache spark, который обозначает, сколько товаров было продано в текущем месяце.Ожидаемый результат:

id,sale,date,inc
1,100,201901,0
1,105,201902,5
1,107,201904,2
1,108,201905,1
2,10,201901,1
2,11,201902,1
2,12,201904,1
2,13,201905,1

В реальном случае это пакетные задания.

Я устал использовать соединение (код ниже), я не уверен, стоит ли использовать накопительный пакетили куб, или аккумулятор.

Если мы выполняем пакетное задание каждый месяц, кажется, что все в порядке, проблема в том, что если через месяц мы забудем запустить пакетное задание, мы запустим его в следующем месяце.

Например, последняя строка кода покажет:

| id|sale|  date|saleInc|
+---+----+------+-------+
|  1|2000|201901|   null|
|  1|2005|201902|      5|
|  1|2007|201903|      7|
+---+----+------+-------+

, но на самом деле 201903 saleInc должно быть 2, а не 7, это должно быть 2007 - 2005, но не 2007-2000

Это просто мой код, вы не можете зависеть от него, вы можете использовать другой способ.

package increamental.test
import org.apache.spark.sql.SparkSession
import org.slf4j.LoggerFactory
import org.apache.spark.sql._
import org.apache.spark.sql.types._
import org.apache.spark.sql.functions._
import org.apache.spark.rdd.RDD
import org.apache.spark.SparkConf
//import com.qydata.stock.db._//a02z10 av1049 1yue29
import scala.reflect.api.materializeTypeTag
import org.apache.spark.SparkContext
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.internal.SQLConf.SHUFFLE_PARTITIONS
import scala.xml.dtd.Scanner
object D20190123 {
     def main(args: Array[String]){

       var sparkConf = new SparkConf().setMaster("local[1]")//.set("spark.default.parallelism","1").set("spark.streaming.blockInterval", "1").set("spark.shuffle.sort.bypassMergeThreshold", "1").set("spark.executor.cores", "1") .set("spark.executor.cores", "1")
           // .set("spark.cores.max", "1")
       val builder =  SparkSession.builder().config(sparkConf)//.enableHiveSupport()  
       val ss =  builder.getOrCreate()  
       import ss.implicits._
       ss.sessionState.conf.setConf(SHUFFLE_PARTITIONS, 1)
       var sc = ss.sparkContext
       sc.setLogLevel("error");



      var hive=Seq.empty[( Int,Int,String,Int)].toDF("id","sale","date","saleInc")
      println("====hive"); hive.show()

       val mongo1=Seq((1,2000,"201901")).toDF("id","sale","date");
       println("====mongo1"); mongo1.show()
       val newOfMongo1= mongo1.where('date>197001)
       println("====newOfMongo1"); newOfMongo1.show()
       val saleInHive1=hive.groupBy("id").agg('id,max('sale) as "mx").select($"id" as "hid",'mx)
       println("====saleInHive1");saleInHive1.show()
       val hiveAppend1=newOfMongo1.join(saleInHive1,'id==='hid,"left").withColumn("saleInc", 'sale-'mx)
       .select("id","sale","date","saleInc")
       println("====hiveAppend1");hiveAppend1.show()
       hive=hive.union(hiveAppend1)
       println("====hive"); hive.show()

      /* second batch may be missed
       * 
//       var hive=mongo1.select('id, 'sale,lit(0) as 'saleInc)//Seq((1,2000,0)).toDF("id","sale","saleInc")
       val mongo2=Seq((1,2000,"201901"),(1,2005,"201902")).toDF("id","sale","date")
       println("====mongo2"); mongo2.show()
       val newOfMongo2= mongo2.where('date>201901)
       println("====newOfMongo2"); newOfMongo2.show()
       val saleInHive2=hive.groupBy("id").agg('id,max('sale) as "mx").select($"id" as "hid",'mx)
       println("====saleInHive2");saleInHive2.show()
       val hiveAppend2=newOfMongo2.join(saleInHive2,'id==='hid,"left").withColumn("saleInc", 'sale-'mx)
       .select("id","sale","date","saleInc")
       println("====hiveAppend2");hiveAppend2.show()
       hive=hive.union(hiveAppend2)
       println("====hive"); hive.show()
       */

       val mongo3=Seq((1,2000,"201901"),(1,2005,"201902"),(1,2007,"201903")).toDF("id","sale","date")
       println("====mongo3"); mongo3.show()
       val newOfMongo3= mongo3.where('date>201901)//02
       println("====newOfMongo3"); newOfMongo3.show()
       val saleInHive3=hive.groupBy("id").agg('id,max('sale) as "mx").select($"id" as "hid",'mx)
       println("====saleInHive3"); saleInHive3.show()
       val hiveAppend3=newOfMongo3.join(saleInHive3,'id==='hid,"left").withColumn("saleInc", 'sale-'mx)
       .select("id","sale","date","saleInc")
       println("====hiveAppend3");hiveAppend3.show()
       hive=hive.union(hiveAppend3)

       println("====hive");hive.show()

     }
}

1 Ответ

0 голосов
/ 14 февраля 2019

Итак, вы хотите lag

import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions.Window

df.withColumn( 
  "inc",
   $"sale" - lag($"sale", 1).over(Window.partitionBy($"id").orderBy($"date")))
...