Я использую Scala и Apache Flink для создания ETL, который периодически читает все файлы в каталоге в моей локальной файловой системе и записывает результат обработки каждого файла в один выходной файл в другом каталоге.
Итак, пример этого будет:
/dir/to/input/files/file1
/dir/to/intput/files/fil2
/dir/to/input/files/file3
и вывод ETL будет точно:
/dir/to/output/files/file1
/dir/to/output/files/file2
/dir/to/output/files/file3
Я пробовал различные подходы, включая сокращение параллельной обработки до единицы при записи в приемник данных, но я все еще не могу достичь требуемого результата.
Это мой текущий код:
val path = "/path/to/input/files/"
val format = new TextInputFormat(new Path(path))
val socketStream = env.readFile(format, path, FileProcessingMode.PROCESS_CONTINUOUSLY, 10)
val wordsStream = socketStream.flatMap(value => value.split(",")).map(value => WordWithCount(value,1))
val keyValuePair = wordsStream.keyBy(_.word)
val countPair = keyValuePair.sum("count")
countPair.print()
countPair.writeAsText("/path/to/output/directory/"+
DateTime.now().getHourOfDay.toString
+
DateTime.now().getMinuteOfHour.toString
+
DateTime.now().getSecondOfMinute.toString
, FileSystem.WriteMode.NO_OVERWRITE)
// The first write method I trid:
val sink = new BucketingSink[WordWithCount]("/path/to/output/directory/")
sink.setBucketer(new DateTimeBucketer[WordWithCount]("yyyy-MM-dd--HHmm"))
// The second write method I trid:
val sink3 = new BucketingSink[WordWithCount]("/path/to/output/directory/")
sink3.setUseTruncate(false)
sink3.setBucketer(new DateTimeBucketer("yyyy-MM-dd--HHmm"))
sink3.setWriter(new StringWriter[WordWithCount])
sink3.setBatchSize(3)
sink3.setPendingPrefix("file-")
sink3.setPendingSuffix(".txt")
Оба метода записи не дают желаемого результата.
Может кто-нибудь с опытом работы с Apache Flink подскажет мне подход к написанию.