Генерация одного выходного файла для каждого обработанного входного файла в Apach Flink - PullRequest
0 голосов
/ 17 сентября 2018

Я использую Scala и Apache Flink для создания ETL, который периодически читает все файлы в каталоге в моей локальной файловой системе и записывает результат обработки каждого файла в один выходной файл в другом каталоге.

Итак, пример этого будет:

/dir/to/input/files/file1
/dir/to/intput/files/fil2
/dir/to/input/files/file3

и вывод ETL будет точно:

/dir/to/output/files/file1
/dir/to/output/files/file2
/dir/to/output/files/file3

Я пробовал различные подходы, включая сокращение параллельной обработки до единицы при записи в приемник данных, но я все еще не могу достичь требуемого результата.

Это мой текущий код:

   val path = "/path/to/input/files/"
   val format = new TextInputFormat(new Path(path))
   val socketStream = env.readFile(format, path, FileProcessingMode.PROCESS_CONTINUOUSLY, 10)


   val wordsStream = socketStream.flatMap(value => value.split(",")).map(value => WordWithCount(value,1))

   val keyValuePair = wordsStream.keyBy(_.word)

   val countPair = keyValuePair.sum("count")

   countPair.print()

   countPair.writeAsText("/path/to/output/directory/"+
     DateTime.now().getHourOfDay.toString
     +
     DateTime.now().getMinuteOfHour.toString
     +
     DateTime.now().getSecondOfMinute.toString
     , FileSystem.WriteMode.NO_OVERWRITE)

// The first write method I trid:

   val sink = new BucketingSink[WordWithCount]("/path/to/output/directory/")
   sink.setBucketer(new DateTimeBucketer[WordWithCount]("yyyy-MM-dd--HHmm"))

// The second write method I trid:

   val sink3 = new BucketingSink[WordWithCount]("/path/to/output/directory/")
   sink3.setUseTruncate(false)
   sink3.setBucketer(new DateTimeBucketer("yyyy-MM-dd--HHmm"))
   sink3.setWriter(new StringWriter[WordWithCount])
   sink3.setBatchSize(3)
   sink3.setPendingPrefix("file-")
   sink3.setPendingSuffix(".txt")

Оба метода записи не дают желаемого результата.

Может кто-нибудь с опытом работы с Apache Flink подскажет мне подход к написанию.

1 Ответ

0 голосов
/ 18 сентября 2018

Я решил эту проблему, импортировав следующие зависимости для запуска на локальном компьютере:

  • Hadoop-AWS-2.7.3.jar
  • AWS-ява-СДК-s3-1.11.183.jar
  • AWS-ява-СДК-ядро-1.11.183.jar
  • AWS-ява-СДК-км-1.11.183.jar
  • ДЖЕКСОН-аннотаций-2.6.7.jar
  • ДЖЕКСОН-ядро-2.6.7.jar
  • ДЖЕКСОН-DataBind-2.6.7.jar
  • Joda-времени 2.8.1.jar
  • httpcore-4.4.4.jar
  • HttpClient-4.5.3.jar

Вы можете просмотреть его на:

https://ci.apache.org/projects/flink/flink-docs-stable/ops/deployment/aws.html

Раздел «Обеспечение зависимости файловой системы S3»

...