Как отправить паркет в кафку партиями, используя strcutured spark streaming? - PullRequest
0 голосов
/ 22 января 2019

Я читаю файлы паркета и конвертирую его в формат JSON, а затем отправляю в kafka.Вопрос в том, что он прочитал весь паркет, поэтому отправьте его в kafka один раз, но я хочу отправить данные json построчно или партиями:

object WriteParquet2Kafka {
  def main(args: Array[String]): Unit = {
    val spark: SparkSession = SparkSession
      .builder
      .master("yarn")
      .appName("Write Parquet to Kafka")
      .getOrCreate()

    import spark.implicits._
    val ds: DataFrame = spark.readStream
      .schema(parquet-schema)
      .parquet(path-to-parquet-file)


    val df: DataFrame = ds.select($"vin" as "key", to_json( struct( ds.columns.map(col(_)):_*  ) ) as "value" )
      .filter($"key" isNotNull)

    val ddf = df
      .writeStream
      .format("kafka")
      .option("topic", topics)
      .option("kafka.bootstrap.servers", "localhost:9092")
      .option("checkpointLocation", "/tmp/test")
      .trigger(Trigger.ProcessingTime("10 seconds"))
      .start()

    ddf.awaitTermination()
  }
}

Возможно ли это сделать?

1 Ответ

0 голосов
/ 26 марта 2019

Я, наконец, выясняю, как решить мой вопрос, просто добавьте option и установите подходящее число для maxFilesPerTrigger:

    val df: DataFrame = spark
      .readStream
      .option("maxFilesPerTrigger", 1)
      .schema(parquetSchema)
      .parquet(parqurtUri)

Примечание: maxFilesPerTrigger должно быть равно 1, так чтокаждая паркетная папка читается.

...