Потоковое в паркет файлы не устраивает Flink 1.6.1 - PullRequest
0 голосов
/ 04 октября 2018

Я очень новичок, чтобы моргать (и паркет / хэдуп в этом отношении), поэтому я наверняка делаю что-то действительно глупое.Я пытаюсь создать приемник, который будет выгружать мой источник данных в файл паркета.

Мой код выглядит следующим образом:

val streamEnv = StreamExecutionEnvironment.getExecutionEnvironment
streamEnv.setParallelism(1);
streamEnv.enableCheckpointing(100, CheckpointingMode.EXACTLY_ONCE);
val sink = StreamingFileSink.forBulkFormat(outputPath, ParquetAvroWriters.forReflectRecord(classOf[MyClass])).build()
testSource.addSink(sink)

К сожалению, я не получаю исключение, которое я былранее, но это все еще не генерирует правильный вывод.В настоящее время я получаю один файл .part-xxx с 4B данных в нем.В этом потоке содержится около 20 000 записей, поэтому это кажется неправильным.

Прежде чем я начал писать этот вопрос, я получал исключение для метода not found из ParquetAvroWriters.java в строке 84. Этот код выглядит следующим образом:

    return AvroParquetWriter.<T>builder(out)
            .withSchema(schema)
            .withDataModel(dataModel)
            .build();

Сигнатура метода AvroParquetWriter:

  public static <T> Builder<T> builder(Path file)

Но параметр во время вызова ParquetAvroWriters.java это StreamOutputFile, следовательно, нет ошибки метода.

IИспользую ссылку 1.6.1 и parquet-hadoop / parquet-avro 1.10.0.Как именно я должен настроить вещи, чтобы написать файл паркета?Это очень расстраивает - я даже не могу найти пример, который компилируется.Любая помощь будет принята с благодарностью!

Ответы [ 2 ]

0 голосов
/ 18 ноября 2018

Прочитав комментарии людей, я создал проект с тем же кодом (похожим), но вы можете скомпилировать и выполнить.

object CustomSource {

 case class TextOut(data:String )

 def generateRandomStringSource(out: SourceContext[TextOut]) = {
   val lines = Array("how are you", "you are how", " i am fine")
   while (true) {
    val index = Random.nextInt(3)
    Thread.sleep(200)
    out.collect(TextOut(lines(index)))
  }
}


def main(args: Array[String]) {

  val streamEnv = 
  StreamExecutionEnvironment.getExecutionEnvironment

  streamEnv.setParallelism(1)
  streamEnv.enableCheckpointing(10000, 
  CheckpointingMode.EXACTLY_ONCE)
  val sink = StreamingFileSink.forBulkFormat(new 
    Path("file:///tmp/test2"),
   ParquetAvroWriters.forReflectRecord(classOf[TextOut])).build()

  val customSource = streamEnv.addSource(generateRandomStringSource 
  _)

  customSource.print()

   customSource.addSink(sink)




   streamEnv.execute()

 }

}

Я создалпроект, чтобы показать, как работает, и минимальные вещи (jar, ect), что это необходимо.

Это ссылка: https://github.com/jose1003/flinkparquet

BR

Хосе

0 голосов
/ 06 октября 2018

Flink's StreamingFileSink при использовании массового формата автоматически использует OnCheckpointRollingPolicy.Это означает, что результаты материализуются только тогда, когда контрольная точка завершается.Это необходимо для обеспечения ровно одной гарантии обработки.

Я предполагаю, что вы используете CollectionSource в качестве тестового ввода и что обработка этого ввода занимает меньше, чем указанное 100ms.Следовательно, ни одна контрольная точка не может быть завершена, а также результаты не будут записаны.Flink не будет вызывать контрольную точку, когда вход полностью исчерпан.Таким образом, все события после последней завершенной контрольной точки не будут видны.

Попробуйте уменьшить интервал контрольной точки, увеличить количество элементов в вашем CollectionSource или написать свой собственный TestingSource extends SourceFunction, который выполняется по крайней мере какпока один интервал контрольной точки (например, во сне).Таким образом, Flink должен иметь возможность завершить контрольную точку и, таким образом, записать результаты в указанный каталог.

...