Невозможно записать данные формата json в путь при использовании структурированной потоковой передачи.При выполнении spark2-submit создаются только _spark_metadata - PullRequest
0 голосов
/ 16 мая 2019

Я записываю данные json из структурированной потоковой передачи Kafka в путь к файлу, и когда я делаю это из оболочки, я могу это сделать.Когда я скомпилировал его в jar и выполнил spark2-submit, создается только _spark_metadata, а данные не найдены

Я попытался сделать это из оболочки и смог увидеть файлы json в пути к файлу.Я компилирую программу, используя «sbt clean package», а затем пытаюсь запустить ее с помощью spark-submit, она не будет создавать никаких данных.

export SPARK_KAFKA_VERSION=0.10
spark2-submit --jars /opt/cloudera/parcels/CDH-5.13.3-1.cdh5.13.3.p0.2/jars/kafka-clients-0.9.0-kafka-2.0.2.jar,/opt/cloudera/parcels/CDH-5.13.3-1.cdh5.13.3.p0.2/jars/spark-sql-kafka-0-10_2.11-2.3.0.cloudera5-20190107.080402-22.jar,/opt/cloudera/parcels/CDH-5.13.3-1.cdh5.13.3.p0.2/jars/mongo-spark-connector_2.11-2.3.2.jar,/opt/cloudera/parcels/CDH-5.13.3-1.cdh5.13.3.p0.2/jars/bson-3.9.0.jar,/opt/cloudera/parcels/CDH-5.13.3-1.cdh5.13.3.p0.2/jars/mongo-java-driver-3.9.0.jar,/opt/cloudera/parcels/CDH-5.13.3-1.cdh5.13.3.p0.2/jars/mongodb-driver-core-3.9.0.jar,/opt/cloudera/parcels/CDH-5.13.3-1.cdh5.13.3.p0.2/jars/mongodb-driver-3.9.0.jar,/opt/cloudera/parcels/CDH-5.13.3-1.cdh5.13.3.p0.2/jars/mongodb-driver-async-3.9.0.jar,/opt/cloudera/parcels/CDH-5.13.3-1.cdh5.13.3.p0.2/jars/casbah-commons_2.11-2.8.2.jar,/opt/cloudera/parcels/CDH-5.13.3-1.cdh5.13.3.p0.2/jars/casbah-core_2.11-2.8.2.jar,/opt/cloudera/parcels/CDH-5.13.3-1.cdh5.13.3.p0.2/jars/casbah-query_2.11-2.8.2.jar --class OSB_kafkaToSpark --master yarn --deploy-mode client /home/streaming_osb_2.11-0.1.0-SNAPSHOT.jar

import com.mongodb.client.MongoCollection
import com.mongodb.spark.config.WriteConfig
import com.mongodb.spark.{MongoConnector}
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, _}
import scala.collection.JavaConverters._
import scala.collection.mutable
import org.apache.spark.sql._
import org.bson.Document
import org.apache.spark.sql.streaming.Trigger

object OSB_kafkaToSpark {

  def main(args: Array[String]): Unit = {

    val spark = SparkSession.
      builder().
      appName("OSB_kafkaToSpark").
      getOrCreate()

println("SparkSession -> "+spark)

    import spark.implicits._

    val df = spark
      .readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "bootstrap1.com:9092, bootstrap2.com:9092")
      .option("subscribe", "topictest")
      .option("failOnDataLoss", "false")
      .load()
      val dfs = df.selectExpr("CAST(value AS STRING)")

val data = dfs.withColumn("splitted", split($"value", "/"))
      .select($"splitted".getItem(4).alias("region"), $"splitted".getItem(5).alias("service"), col("value"))
      .withColumn("service_type", regexp_extract($"service", """.*(Inbound|Outbound|Outound).*""", 1))
      .withColumn("region_type", concat(
        when(col("region").isNotNull, col("region")).otherwise(lit("null")), lit(" "),
        when(col("service").isNotNull, col("service_type")).otherwise(lit("null"))))
      .withColumn("datetime", regexp_extract($"value", """\d{4}-[01]\d-[0-3]\d [0-2]\d:[0-5]\d:[0-5]\d""", 0))

    val extractedDF = data.filter(
      col("region").isNotNull &&
        col("service").isNotNull &&
        col("value").isNotNull &&
        col("service_type").isNotNull &&
        col("region_type").isNotNull &&
      col("datetime").isNotNull)
        .filter("region != ''")
        .filter("service != ''")
        .filter("value != ''")
        .filter("service_type != ''")
        .filter("region_type != ''")
        .filter("datetime != ''")

     val pathstring = "/user/spark_streaming".concat(args(0))

    val query = extractedDF.writeStream
      .format("json")
      .option("path", pathstring)
      .option("checkpointLocation", "/user/checkpoint")
      .outputMode("append")
      .trigger(Trigger.ProcessingTime("20 seconds"))
      .start()
query.stop()
  }
}

Я хотел бы сохранить потоковые данные в папку при запуске spark2-submit

1 Ответ

0 голосов
/ 23 мая 2019

Я понял ответ, и мне нужно будет использовать query.awaitTermination ()

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...