Мы пытаемся написать искровое потоковое приложение, которое будет писать в hdfs.Однако всякий раз, когда мы пишем файлы, появляется много дубликатов.Это поведение с или без сбоя приложения, используя kill.А также для Dstream и структурированного API.Источник - тема кафки.Поведение каталога контрольных точек звучит очень случайно.Я не сталкивался с очень важной информацией по этому вопросу.
Вопрос в том, может ли каталог контрольных точек обеспечивать поведение только один раз?
scala version: 2.11.8
spark version: 2.3.1.3.0.1.0-187
kafka version : 2.11-1.1.0
zookeeper version : 3.4.8-1
HDP : 3.1
Любая помощь приветствуется.Спасибо, Гаутама
object sparkStructuredDownloading {
val kafka_brokers="kfk01.*.com:9092,kfk02.*.com:9092,kfk03.*.com:9092"
def main(args: Array[String]): Unit = {
var topic = args(0).trim().toString()
new downloadingAnalysis(kafka_brokers ,topic).process()
}
}
class downloadingAnalysis(brokers: String,topic: String) {
def process(): Unit = {
// try{
val spark = SparkSession.builder()
.appName("sparkStructuredDownloading")
// .appName("kafka_duplicate")
.getOrCreate()
spark.conf.set("spark.streaming.stopGracefullyOnShutdown", "true")
println("Application Started")
import spark.implicits._
import scala.concurrent.duration._
import org.apache.spark.sql.streaming.{OutputMode, Trigger}
import org.apache.spark.sql.streaming.Trigger
val inputDf = spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", brokers)
.option("subscribe", topic)
.option("startingOffsets", "latest")
//.option("kafka.group.id", "testduplicate")
.load()
val personJsonDf = inputDf.selectExpr("CAST(value AS STRING)") //Converting binary to text
println("READ STREAM INITIATED")
import scala.concurrent.duration._
import org.apache.spark.sql.streaming.{OutputMode, Trigger}
import org.apache.spark.sql.streaming.Trigger
import spark.implicits._
val filteredDF= personJsonDf.filter(line=> new ParseLogs().validateLogLine(line.get(0).toString()))
spark.sqlContext.udf.register("parseLogLine", (logLine: String) => {
val df1 = filteredDF.selectExpr("parseLogLine(value) as result")
println(df1.schema)
println("WRITE STREAM INITIATED")
val checkpoint_loc="/warehouse/test_duplicate/download/checkpoint1"
val kafkaOutput = result.writeStream
.outputMode("append")
.format("orc")
.option("path", "/warehouse/test_duplicate/download/data1")
.option("maxRecordsPerFile", 10)
.trigger(Trigger.ProcessingTime("10 seconds"))
.start()
.awaitTermination()
}