Я новичок в pyspark, и пока немного сложно понять, как он работает, особенно когда вы используете библиотеки, такие как pandas.Но, похоже, это путь для больших данных.
Для моей текущей работы ETL у меня есть следующие элементы:
Это мой rdd:
[
[
('SMSG', 'BKT'), ('SQNR', '00000004'), ('STNQ', '06'), ('TRNN', '000001'), ('SMSG', 'BKS'), ('SQNR', '00000005'), ('STNQ', '24'), ('DAIS', '171231'), ('TRNN', '000001'), ....
],
[
('SMSG', 'BKT'), ('SQNR', '00000024'), ('STNQ', '06'), ('TRNN', '000002'), ('NRID', ' '), ('TREC', '020'), ('TRNN', '000002'), ('NRID', ' '), ('TACN', '001'), ('CARF', ' '), ...
],
...
]
Данные строки представляют собой текстовый файл фиксированного размера.
Теперь я хочу сделать groupByKey для каждой ячейки списка.
конечный результат должен быть:
[
[
('SMSG_1', 'BKT'),('SMSG_2','BKS'),('SQNR_1', '00000004'),('SQNR_2', '00000005'),('STNQ_1','06'),('STNQ_2','24'),('TRNN', '000001'),()('DAIS', '171231'),...
],
[
('SMSG', 'BKT'),('SQNR', '00000024'),('STNQ','06'),('TRNN', '000002'),('NRID', ' '), ('TREC', '020'), ('TACN', '001'), ('CARF', ' '),...
],
...
]
В основном правила таковы:
1 - если ключи одинаковые и значения одинаковые, удалите дубликаты.
2 - если ключи одинаковые и значения разные, переименуйте столбцы и добавьте суффикс как «_Number», где Number можно заменить номером итерации этого ключа.
Мой код начинается сследующее:
def addBKT():
...
def prepareTrans():
...
if __name__ == '__main__':
input_folder = '/Users/admin/Documents/Training/FR20180101HOT'
rdd = sc.wholeTextFiles(input_folder).map(lambda x: x[1].split("BKT"))
rdd = rdd.flatMap(prepareTrans).map(addBKT).map(lambda x: x.split("\n")).map(hot_to_flat_file_v2)
print(rdd.take(1))
Печать дает мне (как и раньше) следующий список списков кортежей.Я беру только 1 подсписок, но полный rdd имеет около 2000 подсписков кортежей:
[
[
('SMSG', 'BKT'), ('SQNR', '00000004'), ('STNQ', '06'), ('TRNN', '000001'), ('SMSG', 'BKS'), ('SQNR', '00000005'), ('STNQ', '24'), ('DAIS', '171231'), ('TRNN', '000001'), ....
]
]
Я попытался сначала уменьшить вложенные списки следующим образом:
rdd = rdd.flatMap(lambda x:x).reduceByKey(list)
Я ожидал, чтоприведите новый список списков без дубликатов, а для кортежей с разными значениями сгруппируйте их все под одним ключом.Однако я не могу этого сделать.
В качестве второго шага я планировал преобразовать кортежи с несколькими значениями в новые пары кортежей так же, как я получил значения в сгруппированном кортеже: т.е. ('Key', ['Value1', 'Value2']) становиться ('Key_1', 'Value1'), ('Key_2', 'Value2')
Наконец, результатом всех этих преобразований является преобразование финальногоСнова в DataFrame и сохраните его в формате паркета.
Я действительно надеюсь, что кто-то делал что-то подобное в прошлом.Я потратил много времени, чтобы попытаться сделать это, но я не смог сделать это, и я не смог найти ни одного примера в Интернете.
Спасибо за вашу помощь.