Spark: загрузка нескольких файлов, индивидуальный анализ, объединение результатов и сохранение - PullRequest
0 голосов
/ 14 апреля 2019

Я новичок в Spark и не совсем понимаю, как это спросить (какие термины использовать и т. Д.), Поэтому вот картина того, что я концептуально пытаюсь выполнить:

Conceptual need diagram

У меня есть много небольших, отдельных .txt файлов "книги" (например, файлы с разделителями строк с отметкой времени и значениями атрибутов в то время).

I 'мне бы хотелось:

  1. Считать каждый файл "книги" в отдельные кадры данных (читается: НЕ объединяется в один большой кадр данных);

  2. Выполнить некоторые базовые вычисления для каждого отдельного фрейма данных, что приведет к ряду новых значений данных;а затем

  3. Объединить все отдельные строки результатов в конечный объект и сохранить его на диске в файле с разделителями строк.

Кажетсякак почти каждый ответ, который я нахожу (когда прибегаю к поиску связанных терминов), касается загрузки нескольких файлов в один RDD или DataFrame, но я нашел этот код Scala:

val data = sc.wholeTextFiles("HDFS_PATH")
val files = data.map { case (filename, content) => filename}
def doSomething(file: String) = { 
println (file);

 // your logic of processing a single file comes here

 val logData = sc.textFile(file);
 val numAs = logData.filter(line => line.contains("a")).count();
 println("Lines with a: %s".format(numAs));

 // save rdd of single file processed data to hdfs comes here
}

files.collect.foreach( filename => {
    doSomething(filename)
})

... но:

A.Я не могу сказать, распараллеливает ли это операцию чтения / анализа, и

B.Я не думаю, что это позволяет объединить результаты в один объект.

Любое направление или рекомендации очень ценятся!

Обновление

ЭтоКажется, что то, что я пытаюсь сделать (запустить сценарий для нескольких файлов параллельно, а затем объединить результаты) может потребоваться что-то вроде пулы потоков (?).

Для ясности, вотпример вычисления, которое я хотел бы выполнить для DataFrame, созданного чтением в файле «бухгалтерской книги»:

from dateutil.relativedelta import relativedelta
from datetime import datetime
from pyspark.sql.functions import to_timestamp

# Read "ledger file"
df = spark.read.json("/path/to/ledger-filename.txt")

# Convert string ==> timestamp & sort
df = (df.withColumn("timestamp", to_timestamp(df.timestamp, 'yyyy-MM-dd HH:mm:ss'))).sort('timestamp')

columns_with_age = ("location", "status")
columns_without_age = ("wh_id")

# Get the most-recent values (from the last row of the df)
row_count = df.count()
last_row = df.collect()[row_count-1]

# Create an empty "final row" dictionary
final_row = {}

# For each column for which we want to calculate an age value ...
for c in columns_with_age:

    # Initialize loop values
    target_value = last_row.__getitem__(c)
    final_row[c] = target_value
    timestamp_at_lookback = last_row.__getitem__("timestamp")
    look_back = 1
    different = False

    while not different:
        previous_row = df.collect()[row_count - 1 - look_back]
        if previous_row.__getitem__(c) == target_value:
            timestamp_at_lookback = previous_row.__getitem__("timestamp")
            look_back += 1

        else:
            different = True

    # At this point, a difference has been found, so calculate the age
    final_row["days_in_{}".format(c)] = relativedelta(datetime.now(), timestamp_at_lookback).days

Таким образом, такая книга, как эта:

+---------+------+-------------------+-----+
| location|status|          timestamp|wh_id|
+---------+------+-------------------+-----+
|  PUTAWAY|     I|2019-04-01 03:14:00|   20|
|PICKABLE1|     X|2019-04-01 04:24:00|   20|
|PICKABLE2|     X|2019-04-01 05:33:00|   20|
|PICKABLE2|     A|2019-04-01 06:42:00|   20|
|  HOTPICK|     A|2019-04-10 05:51:00|   20|
| ICEXCEPT|     A|2019-04-10 07:04:00|   20|
| ICEXCEPT|     X|2019-04-11 09:28:00|   20|
+---------+------+-------------------+-----+

уменьшитк (при условии, что расчет был выполнен 2019-04-14):

{ '_id': 'ledger-filename', 'location': 'ICEXCEPT', 'days_in_location': 4, 'status': 'X', 'days_in_status': 3, 'wh_id': 20 }

Ответы [ 2 ]

0 голосов
/ 14 апреля 2019

Вы можете получить пути к файлам в hdfs

import  org.apache.hadoop.fs.{FileSystem,Path}

val files=FileSystem.get( sc.hadoopConfiguration ).listStatus( new Path(your_path)).map( x => x.getPath ).map(x=> "hdfs://"+x.toUri().getRawPath())

создать уникальный фрейм данных для каждого пути

val arr_df= files.map(spark.read.format("csv").option("delimeter", ",").option("header", true).load(_))

Применение фильтра или любого преобразования перед объединением в один фрейм данных

val df= arr_df.map(x=> x.where(your_filter)).reduce(_ union _)
0 голосов
/ 14 апреля 2019

Использование wholeTextFiles не рекомендуется, так как он загружает полный файл в память сразу. Если вы действительно хотите создать отдельный фрейм данных для файла, вы можете просто использовать полный путь вместо каталога. Однако это не рекомендуется и, скорее всего, приведет к плохому использованию ресурсов. Вместо этого рассмотрите возможность использования input_file_path https://spark.apache.org/docs/2.4.0/api/java/org/apache/spark/sql/functions.html#input_file_name--

Например:

spark
.read
  .textFile("path/to/files")
  .withColumn("file", input_file_name())
  .filter($"value" like "%a%")
  .groupBy($"file")
  .agg(count($"value"))
  .show(10, false)
+----------------------------+------------+
|file                        |count(value)|
+----------------------------+------------+
|path/to/files/1.txt         |2           |
|path/to/files/2.txt         |4           |
+----------------------------+------------+

, поэтому файлы могут обрабатываться индивидуально, а затем объединяться.

...