Как читать с помощью Spark, постоянно обновляющего каталог HDFS и разделяющего вывод на несколько файлов HDFS на основе строки (строки)? - PullRequest
0 голосов
/ 22 октября 2018

Проработанный сценарий -> каталог HDFS, который «заполняется» новыми данными журнала о нескольких типах операций с банковскими счетами.Каждая строка представляет случайный тип активности, а каждая строка (строка) содержит текст «ActivityType = <TheTypeHere>».

В Spark-Scala, какой наилучший подход для чтения входного файла / файлов в HDFSкаталог и вывод нескольких файлов HDFS, где каждый ActivityType записывается в свой новый файл?

Ответы [ 3 ]

0 голосов
/ 22 октября 2018

Вы можете сделать что-то подобное, используя RDD, при этом я предполагаю, что у вас есть файлы переменной длины, а затем конвертируется в DF:

val rdd = sc.textFile("/FileStore/tables/activity.txt")
val rdd2 = rdd.map(_.split(","))
          .keyBy(_(0))
val rdd3 = rdd2.map(x => (x._1, x._2.mkString(",")))
val df = rdd3.toDF("K", "V")  
//df.show(false)

df.write.partitionBy("K").text("SO_QUESTION")

Ввод:

ActivityType=<ACT_001>,34,56,67,89,90
ActivityType=<ACT_002>,A,1,2
ActivityType=<ACT_003>,ABC

Я получаю тогда каквыведите 3 файла, в данном случае 1 для каждой записи.Немного трудно показать, как это было в Databricks.

Вы можете настроить свой выходной формат, местоположение и т. Д. PartitionBy - это ключ.

0 голосов
/ 22 октября 2018

Адаптированный первый ответ к утверждению:

Местоположение строки «ключ» является случайным в родительской строке, единственное, что гарантируется, это то, что она содержит эту подстроку, вв этом случае «ActivityType», за которым следует некоторое значение.

Вопрос действительно об этом.Здесь идет:

// SO Question
val rdd = sc.textFile("/FileStore/tables/activitySO.txt")  
val rdd2 = rdd.map(x => (x.slice (x.indexOfSlice("ActivityType=<")+14, x.indexOfSlice(">", (x.indexOfSlice("ActivityType=<")+14))), x))
val df = rdd2.toDF("K", "V")
df.write.partitionBy("K").text("SO_QUESTION2")

Ввод:

ActivityType=<ACT_001>,34,56,67,89,90
3,4,4,ActivityType=<ACT_002>,A,1,2
ABC,ActivityType=<ACT_0033>
DEF,ActivityType=<ACT_0033>

Вывод - 3 файла, в которых ключ, например, не ActivityType =, а ACT_001 и т. Д. Данные ключа не удаляются,это все еще там в Последовательности.Вы можете изменить это, если хотите, а также расположение и формат вывода.

0 голосов
/ 22 октября 2018

Вы можете использовать MultipleOutputFormat для этого. Преобразовать rdd в пары ключ-значение, чтобы ActivityType был ключом. Spark создаст разные файлы для разных ключей. Вы можете решить, основываясь на ключе, где размещать файлы и каковы их имена..

...