Я новичок в Spark и не совсем понимаю, как это спросить (какие термины использовать и т. Д.), Поэтому вот картина того, что я концептуально пытаюсь выполнить:
У меня есть много небольших, отдельных .txt файлов "книги" (например, файлы с разделителями строк с отметкой времени и значениями атрибутов в то время).
I 'мне бы хотелось:
Считать каждый файл "книги" в отдельные кадры данных (читается: НЕ объединяется в один большой кадр данных);
Выполнить некоторые базовые вычисления для каждого отдельного фрейма данных, что приведет к ряду новых значений данных;а затем
Объединить все отдельные строки результатов в конечный объект и сохранить его на диске в файле с разделителями строк.
Кажетсякак почти каждый ответ, который я нахожу (когда прибегаю к поиску связанных терминов), касается загрузки нескольких файлов в один RDD или DataFrame, но я нашел этот код Scala:
val data = sc.wholeTextFiles("HDFS_PATH")
val files = data.map { case (filename, content) => filename}
def doSomething(file: String) = {
println (file);
// your logic of processing a single file comes here
val logData = sc.textFile(file);
val numAs = logData.filter(line => line.contains("a")).count();
println("Lines with a: %s".format(numAs));
// save rdd of single file processed data to hdfs comes here
}
files.collect.foreach( filename => {
doSomething(filename)
})
... но:
A.Я не могу сказать, распараллеливает ли это операцию чтения / анализа, и
B.Я не думаю, что это позволяет объединить результаты в один объект.
Любое направление или рекомендации очень ценятся!
Обновление
ЭтоКажется, что то, что я пытаюсь сделать (запустить сценарий для нескольких файлов параллельно, а затем объединить результаты) может потребоваться что-то вроде пулы потоков (?).
Для ясности, вотпример вычисления, которое я хотел бы выполнить для DataFrame, созданного чтением в файле «бухгалтерской книги»:
from dateutil.relativedelta import relativedelta
from datetime import datetime
from pyspark.sql.functions import to_timestamp
# Read "ledger file"
df = spark.read.json("/path/to/ledger-filename.txt")
# Convert string ==> timestamp & sort
df = (df.withColumn("timestamp", to_timestamp(df.timestamp, 'yyyy-MM-dd HH:mm:ss'))).sort('timestamp')
columns_with_age = ("location", "status")
columns_without_age = ("wh_id")
# Get the most-recent values (from the last row of the df)
row_count = df.count()
last_row = df.collect()[row_count-1]
# Create an empty "final row" dictionary
final_row = {}
# For each column for which we want to calculate an age value ...
for c in columns_with_age:
# Initialize loop values
target_value = last_row.__getitem__(c)
final_row[c] = target_value
timestamp_at_lookback = last_row.__getitem__("timestamp")
look_back = 1
different = False
while not different:
previous_row = df.collect()[row_count - 1 - look_back]
if previous_row.__getitem__(c) == target_value:
timestamp_at_lookback = previous_row.__getitem__("timestamp")
look_back += 1
else:
different = True
# At this point, a difference has been found, so calculate the age
final_row["days_in_{}".format(c)] = relativedelta(datetime.now(), timestamp_at_lookback).days
Таким образом, такая книга, как эта:
+---------+------+-------------------+-----+
| location|status| timestamp|wh_id|
+---------+------+-------------------+-----+
| PUTAWAY| I|2019-04-01 03:14:00| 20|
|PICKABLE1| X|2019-04-01 04:24:00| 20|
|PICKABLE2| X|2019-04-01 05:33:00| 20|
|PICKABLE2| A|2019-04-01 06:42:00| 20|
| HOTPICK| A|2019-04-10 05:51:00| 20|
| ICEXCEPT| A|2019-04-10 07:04:00| 20|
| ICEXCEPT| X|2019-04-11 09:28:00| 20|
+---------+------+-------------------+-----+
уменьшитк (при условии, что расчет был выполнен 2019-04-14):
{ '_id': 'ledger-filename', 'location': 'ICEXCEPT', 'days_in_location': 4, 'status': 'X', 'days_in_status': 3, 'wh_id': 20 }