Spark структурированное и Dstream приложение пишет дубликаты - PullRequest
0 голосов
/ 14 февраля 2019

Мы пытаемся написать искровое потоковое приложение, которое будет писать в hdfs.Однако всякий раз, когда мы пишем файлы, появляется много дубликатов.Это поведение с или без сбоя приложения, используя kill.А также для Dstream и структурированного API.Источник - тема кафки.Поведение каталога контрольных точек звучит очень случайно.Я не сталкивался с очень важной информацией по этому вопросу.

Вопрос в том, может ли каталог контрольных точек обеспечивать поведение только один раз?

scala version: 2.11.8
spark version:  2.3.1.3.0.1.0-187
kafka version :  2.11-1.1.0
zookeeper version :  3.4.8-1 
HDP : 3.1

Любая помощь приветствуется.Спасибо, Гаутама

object sparkStructuredDownloading {
    val kafka_brokers="kfk01.*.com:9092,kfk02.*.com:9092,kfk03.*.com:9092"
    def main(args: Array[String]): Unit = {
        var topic = args(0).trim().toString()
        new downloadingAnalysis(kafka_brokers ,topic).process()
        }

}

class downloadingAnalysis(brokers: String,topic: String) {

    def process(): Unit = {
        //  try{
        val spark = SparkSession.builder()
        .appName("sparkStructuredDownloading")
        // .appName("kafka_duplicate")
        .getOrCreate()
        spark.conf.set("spark.streaming.stopGracefullyOnShutdown", "true")

        println("Application Started")
        import spark.implicits._
        import scala.concurrent.duration._
        import org.apache.spark.sql.streaming.{OutputMode, Trigger}
        import org.apache.spark.sql.streaming.Trigger

        val inputDf = spark.readStream
        .format("kafka")
        .option("kafka.bootstrap.servers", brokers)
        .option("subscribe", topic)
        .option("startingOffsets", "latest")
        //.option("kafka.group.id", "testduplicate")
        .load()
        val personJsonDf = inputDf.selectExpr("CAST(value AS STRING)") //Converting binary to text
        println("READ STREAM INITIATED")

        import scala.concurrent.duration._
        import org.apache.spark.sql.streaming.{OutputMode, Trigger}
        import org.apache.spark.sql.streaming.Trigger

        import spark.implicits._
        val filteredDF= personJsonDf.filter(line=> new ParseLogs().validateLogLine(line.get(0).toString()))

        spark.sqlContext.udf.register("parseLogLine", (logLine: String) => {
        val df1 = filteredDF.selectExpr("parseLogLine(value) as result")
        println(df1.schema)
        println("WRITE STREAM INITIATED")
        val checkpoint_loc="/warehouse/test_duplicate/download/checkpoint1"
        val kafkaOutput = result.writeStream
        .outputMode("append")
        .format("orc")
        .option("path", "/warehouse/test_duplicate/download/data1")
        .option("maxRecordsPerFile", 10)
        .trigger(Trigger.ProcessingTime("10 seconds"))
        .start()
        .awaitTermination() 

}

...