Избегайте записи файлов для пустых разделов в Spark Streaming - PullRequest
0 голосов
/ 29 ноября 2018

У меня есть задание Spark Streaming, которое считывает данные из разделов kafka ( один исполнитель на раздел ).
Мне нужно сохранить преобразованные значения в HDFS, но нужно избегать создания пустых файлов.
Я пытался использовать isEmpty, но это не помогает, когда не все разделы пусты.

Перераспределение PS не является приемлемым решением из-за ухудшения производительности.

1 Ответ

0 голосов
/ 30 ноября 2018

Код работает только для PairRDD.

Код для текста:

  val conf = ssc.sparkContext.hadoopConfiguration
  conf.setClass("mapreduce.output.lazyoutputformat.outputformat",
    classOf[TextOutputFormat[Text, NullWritable]]
    classOf[OutputFormat[Text, NullWritable]])

  kafkaRdd.map(_.value -> NullWritable.get)
    .saveAsNewAPIHadoopFile(basePath,
      classOf[Text],
      classOf[NullWritable],
      classOf[LazyOutputFormat[Text, NullWritable]],
      conf)

Код для avro:

  val avro: RDD[(AvroKey[MyEvent], NullWritable)]) = ....
  val conf = ssc.sparkContext.hadoopConfiguration

  conf.set("avro.schema.output.key", MyEvent.SCHEMA$.toString)
  conf.setClass("mapreduce.output.lazyoutputformat.outputformat",
    classOf[AvroKeyOutputFormat[MyEvent]],
    classOf[OutputFormat[AvroKey[MyEvent], NullWritable]])

  avro.saveAsNewAPIHadoopFile(basePath,
    classOf[AvroKey[MyEvent]],
    classOf[NullWritable],
    classOf[LazyOutputFormat[AvroKey[MyEvent], NullWritable]],
    conf)

...