Spark загружает коллекцию файлов в пакетном режиме и находит строку из каждого файла с дополнительной информацией от уровня файла - PullRequest
0 голосов
/ 16 ноября 2018

У меня есть коллекция файлов, указанная через запятую, например:

hdfs://user/cloudera/date=2018-01-15,hdfs://user/cloudera/date=2018-01-16,hdfs://user/cloudera/date=2018-01-17,hdfs://user/cloudera/date=2018-01-18,hdfs://user/cloudera/date=2018-01-19,hdfs://user/cloudera/date=2018-01-20,hdfs://user/cloudera/date=2018-01-21,hdfs://user/cloudera/date=2018-01-22

и я загружаю файлы с помощью Apache Spark, все сразу:

val input = sc.textFile(files)

Кроме того, у меня есть дополнительная информация, связанная с каждым файлом - уникальный идентификатор, например:

File                                     ID
--------------------------------------------------
hdfs://user/cloudera/date=2018-01-15  | 12345
hdfs://user/cloudera/date=2018-01-16  | 09245
hdfs://user/cloudera/date=2018-01-17  | 345hqw4
and so on

В качестве вывода мне нужно получить DataFrame со строками, где каждая строка будет содержать тот же идентификатор, что и идентификатор файла, из которого была прочитана эта строка.

Можно ли каким-то образом передать эту информацию Spark, чтобы иметь возможность связываться со строками?

Ответы [ 2 ]

0 голосов
/ 17 ноября 2018

Core sql подход с UDF (то же самое вы можете добиться с join, если вы представляете File -> ID mapping в Dataframe):

import org.apache.spark.sql.functions

val inputDf = sparkSession.read.text(".../src/test/resources/test")
    .withColumn("fileName", functions.input_file_name())

def withId(mapping: Map[String, String]) = functions.udf(
  (file: String) => mapping.get(file)
)

val mapping = Map(
  "file:///.../src/test/resources/test/test1.txt" -> "id1",
  "file:///.../src/test/resources/test/test2.txt" -> "id2"
)

val resutlDf = inputDf.withColumn("id", withId(mapping)(inputDf("fileName")))
resutlDf.show(false)

Результат:

+-----+---------------------------------------------+---+
|value|fileName                                     |id |
+-----+---------------------------------------------+---+
|row1 |file:///.../src/test/resources/test/test1.txt|id1|
|row11|file:///.../src/test/resources/test/test1.txt|id1|
|row2 |file:///.../src/test/resources/test/test2.txt|id2|
|row22|file:///.../src/test/resources/test/test2.txt|id2|
+-----+---------------------------------------------+---+

text1.txt:

row1
row11

text2.txt:

row2
row22
0 голосов
/ 16 ноября 2018

Это может помочь (не проверено)

// read single text file into DataFrame and add 'id' column
def readOneFile(filePath: String, fileId: String)(implicit spark: SparkSession): DataFrame = {
  val dfOriginal: DataFrame = spark.read.text(filePath)
  val dfWithIdColumn: DataFrame = dfOriginal.withColumn("id", lit(fileId))

  dfWithIdColumn
}

// read all text files into DataFrame
def readAllFiles(filePathIdsSeq: Seq[(String, String)])(implicit spark: SparkSession): DataFrame = {
  // create empty DataFrame with expected schema
  val emptyDfSchema: StructType = StructType(List(
    StructField("value", StringType, false),
    StructField("id", StringType, false)
  ))
  val emptyDf: DataFrame = spark.createDataFrame(
    rowRDD = spark.sparkContext.emptyRDD[Row],
    schema = emptyDfSchema
  )

  val unionDf: DataFrame = filePathIdsSeq.foldLeft(emptyDf) { (intermediateDf: DataFrame, filePathIdTuple: (String, String)) =>
    intermediateDf.union(readOneFile(filePathIdTuple._1, filePathIdTuple._2))
  }
  unionDf
}

Ссылки

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...