Источником является CSV-файл:
id,sale,date
1,100,201901
1,105,201902
1,107,201904
1,108,201905
2,10,201901
2,11,201902
2,12,201904
2,13,201905
Речь идет о некоторых продажах продуктов, 1,100,201901
означает от начала до даты 201901,100 продуктов с идентификатором 1, которые были проданы.1,105,201902
означает, что с начала до даты 201902,105 товаров с идентификатором 1 было продано. Так, во втором месяце 2019 года было продано только 5 товаров.
что я ожидаюдобавьте к нему столбец, используя apache spark, который обозначает, сколько товаров было продано в текущем месяце.Ожидаемый результат:
id,sale,date,inc
1,100,201901,0
1,105,201902,5
1,107,201904,2
1,108,201905,1
2,10,201901,1
2,11,201902,1
2,12,201904,1
2,13,201905,1
В реальном случае это пакетные задания.
Я устал использовать соединение (код ниже), я не уверен, стоит ли использовать накопительный пакетили куб, или аккумулятор.
Если мы выполняем пакетное задание каждый месяц, кажется, что все в порядке, проблема в том, что если через месяц мы забудем запустить пакетное задание, мы запустим его в следующем месяце.
Например, последняя строка кода покажет:
| id|sale| date|saleInc|
+---+----+------+-------+
| 1|2000|201901| null|
| 1|2005|201902| 5|
| 1|2007|201903| 7|
+---+----+------+-------+
, но на самом деле 201903 saleInc должно быть 2, а не 7, это должно быть 2007 - 2005
, но не 2007-2000
Это просто мой код, вы не можете зависеть от него, вы можете использовать другой способ.
package increamental.test
import org.apache.spark.sql.SparkSession
import org.slf4j.LoggerFactory
import org.apache.spark.sql._
import org.apache.spark.sql.types._
import org.apache.spark.sql.functions._
import org.apache.spark.rdd.RDD
import org.apache.spark.SparkConf
//import com.qydata.stock.db._//a02z10 av1049 1yue29
import scala.reflect.api.materializeTypeTag
import org.apache.spark.SparkContext
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.internal.SQLConf.SHUFFLE_PARTITIONS
import scala.xml.dtd.Scanner
object D20190123 {
def main(args: Array[String]){
var sparkConf = new SparkConf().setMaster("local[1]")//.set("spark.default.parallelism","1").set("spark.streaming.blockInterval", "1").set("spark.shuffle.sort.bypassMergeThreshold", "1").set("spark.executor.cores", "1") .set("spark.executor.cores", "1")
// .set("spark.cores.max", "1")
val builder = SparkSession.builder().config(sparkConf)//.enableHiveSupport()
val ss = builder.getOrCreate()
import ss.implicits._
ss.sessionState.conf.setConf(SHUFFLE_PARTITIONS, 1)
var sc = ss.sparkContext
sc.setLogLevel("error");
var hive=Seq.empty[( Int,Int,String,Int)].toDF("id","sale","date","saleInc")
println("====hive"); hive.show()
val mongo1=Seq((1,2000,"201901")).toDF("id","sale","date");
println("====mongo1"); mongo1.show()
val newOfMongo1= mongo1.where('date>197001)
println("====newOfMongo1"); newOfMongo1.show()
val saleInHive1=hive.groupBy("id").agg('id,max('sale) as "mx").select($"id" as "hid",'mx)
println("====saleInHive1");saleInHive1.show()
val hiveAppend1=newOfMongo1.join(saleInHive1,'id==='hid,"left").withColumn("saleInc", 'sale-'mx)
.select("id","sale","date","saleInc")
println("====hiveAppend1");hiveAppend1.show()
hive=hive.union(hiveAppend1)
println("====hive"); hive.show()
/* second batch may be missed
*
// var hive=mongo1.select('id, 'sale,lit(0) as 'saleInc)//Seq((1,2000,0)).toDF("id","sale","saleInc")
val mongo2=Seq((1,2000,"201901"),(1,2005,"201902")).toDF("id","sale","date")
println("====mongo2"); mongo2.show()
val newOfMongo2= mongo2.where('date>201901)
println("====newOfMongo2"); newOfMongo2.show()
val saleInHive2=hive.groupBy("id").agg('id,max('sale) as "mx").select($"id" as "hid",'mx)
println("====saleInHive2");saleInHive2.show()
val hiveAppend2=newOfMongo2.join(saleInHive2,'id==='hid,"left").withColumn("saleInc", 'sale-'mx)
.select("id","sale","date","saleInc")
println("====hiveAppend2");hiveAppend2.show()
hive=hive.union(hiveAppend2)
println("====hive"); hive.show()
*/
val mongo3=Seq((1,2000,"201901"),(1,2005,"201902"),(1,2007,"201903")).toDF("id","sale","date")
println("====mongo3"); mongo3.show()
val newOfMongo3= mongo3.where('date>201901)//02
println("====newOfMongo3"); newOfMongo3.show()
val saleInHive3=hive.groupBy("id").agg('id,max('sale) as "mx").select($"id" as "hid",'mx)
println("====saleInHive3"); saleInHive3.show()
val hiveAppend3=newOfMongo3.join(saleInHive3,'id==='hid,"left").withColumn("saleInc", 'sale-'mx)
.select("id","sale","date","saleInc")
println("====hiveAppend3");hiveAppend3.show()
hive=hive.union(hiveAppend3)
println("====hive");hive.show()
}
}