Соединение LeftOuter не работает для потоковой передачи с искрой - PullRequest
0 голосов
/ 04 февраля 2019

Я пытаюсь выполнить левое внешнее соединение для потоковой структурированной искры, но строки, данные которых недоступны в правом Df, сбрасываются.Вот мой код:

Основной класс:

object LeftOuterTesting {

def main (args: Array [String]): Unit = {logger.setStreamingLogLevels ()

val spark = SparkSession.builder()
  .appName("leftOuter")
  .master("local[4]")
  .config("spark.debug.maxToStringFields", 50)
  .getOrCreate()

val leftSchema = new StructType()
  .add("left", ArrayType(
    new StructType()
      .add("name", StringType)
      .add("id", StringType)
      .add("time_stamp", StringType)
  ))

val rightSchema = new StructType()
  .add("right", ArrayType(
    new StructType()
      .add("right_name", StringType)
      .add("id", StringType)
      .add("timestamp", StringType)
  ))


var leftStream = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "left")
  .option("failOnDataLoss", "true")
  .load()

var rightStream = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "right")
  .option("failOnDataLoss", "true")
  .load()

import spark.implicits._

leftStream = leftStream.select($"value" cast "string" as "json")
  .select(from_json($"json", leftSchema) as "left").select("left.*")
  .select(explode(col("left")).as("list")).select("list.*")
  .withColumnRenamed("time_stamp", "temp_time_stamp")
  .withColumn("time_stamp", $"temp_time_stamp".cast("timestamp"))
  .drop("temp_time_stamp")
  .withWatermark("time_stamp", "60 seconds")

rightStream = rightStream.select($"value" cast "string" as "json")
  .select(from_json($"json", rightSchema) as "right").select("right.*")
  .select(explode(col("right")).as("list")).select("list.*")
  .withColumnRenamed("timestamp", "temp_timestamp")
  .withColumn("timestamp", $"temp_timestamp".cast("timestamp"))
  .drop("temp_timestamp")
  .withWatermark("timestamp", "60 seconds")

leftStream.printSchema()
rightStream.printSchema()

val joinedStream = leftStream.join(rightStream,
  expr(
    """
                                                          name = right_name AND
                                                          timestamp >= time_stamp AND
                                                          timestamp <= time_stamp + interval 1 minutes
                                                          """), "left_outer")


joinedStream.writeStream
  .outputMode("append")
  .option("truncate", false)
  .format("console")
  .start()
  .awaitTermination()

}}

Кажется, что выполняется внутреннее соединение независимо от предоставленного типа соединения.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...