lowerByKey список списков в PySpark - PullRequest
0 голосов
/ 21 января 2019

Я новичок в pyspark, и пока немного сложно понять, как он работает, особенно когда вы используете библиотеки, такие как pandas.Но, похоже, это путь для больших данных.

Для моей текущей работы ETL у меня есть следующие элементы:

Это мой rdd:

[
    [
    ('SMSG', 'BKT'), ('SQNR', '00000004'), ('STNQ', '06'), ('TRNN', '000001'), ('SMSG', 'BKS'), ('SQNR', '00000005'), ('STNQ', '24'), ('DAIS', '171231'), ('TRNN', '000001'), ....
    ], 
    [
    ('SMSG', 'BKT'), ('SQNR', '00000024'), ('STNQ', '06'), ('TRNN', '000002'), ('NRID', '  '), ('TREC', '020'), ('TRNN', '000002'), ('NRID', '  '), ('TACN', '001'), ('CARF', '          '), ... 
    ],
    ...
]

Данные строки представляют собой текстовый файл фиксированного размера.

Теперь я хочу сделать groupByKey для каждой ячейки списка.

конечный результат должен быть:

[
    [
    ('SMSG_1', 'BKT'),('SMSG_2','BKS'),('SQNR_1', '00000004'),('SQNR_2', '00000005'),('STNQ_1','06'),('STNQ_2','24'),('TRNN', '000001'),()('DAIS', '171231'),...
    ],
    [
    ('SMSG', 'BKT'),('SQNR', '00000024'),('STNQ','06'),('TRNN', '000002'),('NRID', '  '), ('TREC', '020'), ('TACN', '001'), ('CARF', '          '),...
    ],
    ...
]

В основном правила таковы:

1 - если ключи одинаковые и значения одинаковые, удалите дубликаты.

2 - если ключи одинаковые и значения разные, переименуйте столбцы и добавьте суффикс как «_Number», где Number можно заменить номером итерации этого ключа.

Мой код начинается сследующее:

def addBKT():
...
def prepareTrans():
...
if __name__ == '__main__':
    input_folder = '/Users/admin/Documents/Training/FR20180101HOT' 
    rdd = sc.wholeTextFiles(input_folder).map(lambda x: x[1].split("BKT"))
    rdd = rdd.flatMap(prepareTrans).map(addBKT).map(lambda x: x.split("\n")).map(hot_to_flat_file_v2)
    print(rdd.take(1))

Печать дает мне (как и раньше) следующий список списков кортежей.Я беру только 1 подсписок, но полный rdd имеет около 2000 подсписков кортежей:

[
    [
    ('SMSG', 'BKT'), ('SQNR', '00000004'), ('STNQ', '06'), ('TRNN', '000001'), ('SMSG', 'BKS'), ('SQNR', '00000005'), ('STNQ', '24'), ('DAIS', '171231'), ('TRNN', '000001'), ....
    ]
]

Я попытался сначала уменьшить вложенные списки следующим образом:

rdd = rdd.flatMap(lambda x:x).reduceByKey(list)

Я ожидал, чтоприведите новый список списков без дубликатов, а для кортежей с разными значениями сгруппируйте их все под одним ключом.Однако я не могу этого сделать.

В качестве второго шага я планировал преобразовать кортежи с несколькими значениями в новые пары кортежей так же, как я получил значения в сгруппированном кортеже: т.е. ('Key', ['Value1', 'Value2']) становиться ('Key_1', 'Value1'), ('Key_2', 'Value2')

Наконец, результатом всех этих преобразований является преобразование финальногоСнова в DataFrame и сохраните его в формате паркета.

Я действительно надеюсь, что кто-то делал что-то подобное в прошлом.Я потратил много времени, чтобы попытаться сделать это, но я не смог сделать это, и я не смог найти ни одного примера в Интернете.

Спасибо за вашу помощь.

Ответы [ 3 ]

0 голосов
/ 23 января 2019

Большое спасибо за ссылку, я последовал предложенному решению.Фрейм данных успешно создан, и это здорово.

    input_folder = '/Users/admin/Documents/Training/FR20180101HOT' 
    rdd_split = sc.wholeTextFiles(input_folder).map(lambda x: x[1].split("BKT"))
    rdd_trans = rdd_split.flatMap(prepareTrans).map(addBKT).map(lambda x: x.split("\n")).map(hot_to_flat_file_v2)
    #rdd_group = rdd_trans.map(lambda x : x[i] for i in range(len(x))).reduceByKey(lambda x, y: str(x) + ','+ str(y))   
    df = spark.read.options(inferSchema="true").csv(rdd_trans)
    print(df.show(1)) 

Печать показывает мне что-то вроде этого:

+--------+-------+--------+------------+--------+------+--------+----------+----...

|     _c0|    _c1|     _c2|         _c3|     _c4|   _c5|     _c6|       _c7|     _c8|   _c9|    _c10|   _c11|    _c12|   _c13|    _c14|          _c15|    _c16|                _c17|    _c18|    _c19|    _c20|              _c21|    _c22| _c23|    _c24| _c25|    _c26| _c27|    _c28| _c29|    _c30|                _c31|    _c32|   _c33|    _c34|        _c35|    _c36|  _c37|    _c38|      _c39|    _c40|      _c41|    _c42|              _c43|    _c44| _c45|    _c46|    _c47|    _c48|   _c49|    _c50|        _c51|    _c52| _c53|    _c54|               _c55|    _c56|    _c57|    _c58|          _c59|    _c60|             _c61|    _c62|    _c63|    _c64|                _c65|    _c66|   _c67|    _c68|        _c69|    _c70|  _c71|    _c72|      _c73|    _c74|      _c75|    _c76|              _c77|    _c78| _c79|    _c80|   _c81|    _c82| _c83|      _c84|        _c85|      _c86|   _c87|      _c88|        _c89|      _c90|   _c91|      _c92|        _c93|      _c94|   _c95|    _c96|    _c97|    _c98|  _c99|   _c100|   _c101|   _c102|  _c103|   _c104|       _c105|   _c106| _c107|   _c108|     _c109|   _c110|     _c111|   _c112|             _c113|   _c114|_c115|   _c116|_c117|   _c118|_c119|     _c120|       _c121|     _c122| _c123|     _c124|       _c125|     _c126| _c127|     _c128|       _c129|     _c130|  _c131|   _c132|_c133|   _c134| _c135|   _c136|   _c137|   _c138|  _c139|   _c140|       _c141|   _c142| _c143|   _c144|     _c145|   _c146|     _c147|   _c148|             _c149|   _c150|_c151|   _c152|_c153|   _c154|_c155|     _c156|       _c157|     _c158| _c159|     _c160|       _c161|     _c162|_c163|     _c164|       _c165|     _c166|  _c167|   _c168|_c169|   _c170| _c171|   _c172|   _c173|   _c174|  _c175|   _c176|       _c177|   _c178| _c179|   _c180|     _c181|   _c182|     _c183|   _c184|             _c185|   _c186|_c187|   _c188|_c189|   _c190|_c191|     _c192|       _c193|     _c194|  _c195|     _c196|       _c197|     _c198| _c199|     _c200|       _c201|     _c202|  _c203|   _c204|_c205|   _c206| _c207|   _c208|   _c209|   _c210|  _c211|   _c212|       _c213|   _c214| _c215|   _c216|     _c217|   _c218|     _c219|   _c220|             _c221|   _c222|_c223|   _c224|_c225|   _c226|_c227|     _c228|       _c229|     _c230| _c231|     _c232|       _c233|     _c234| _c235|     _c236|       _c237|     _c238|  _c239|   _c240|_c241|   _c242| _c243|   _c244|   _c245|   _c246|  _c247|   _c248|       _c249|   _c250| _c251|   _c252|     _c253|   _c254|     _c255|   _c256|             _c257|   _c258|_c259|   _c260|_c261|   _c262|_c263|     _c264|       _c265|     _c266| _c267|     _c268|       _c269|     _c270|_c271|     _c272|       _c273|     _c274|_c275|   _c276|_c277|   _c278| _c279|   _c280|   _c281|   _c282|  _c283|   _c284|       _c285|   _c286| _c287|   _c288|     _c289|   _c290|     _c291|   _c292|             _c293|   _c294|_c295|   _c296|  _c297|   _c298|     _c299|   _c300|    _c301|   _c302|_c303|   _c304|     _c305|   _c306|    _c307|   _c308|_c309|   _c310|    _c311|   _c312|_c313|   _c314|_c315|   _c316|_c317|   _c318|               _c319|   _c320|   _c321|   _c322|  _c323|   _c324|       _c325|   _c326| _c327|   _c328|     _c329|   _c330|     _c331|   _c332|             _c333|   _c334|_c335|   _c336|             _c337|   _c338|  _c339|   _c340|      _c341|   _c342|       _c343|   _c344|               _c345|   _c346|              _c347|   _c348|  _c349|   _c350|       _c351|   _c352| _c353|   _c354|     _c355|   _c356|     _c357|   _c358|             _c359|   _c360|_c361|   _c362|_c363|   _c364|_c365|   _c366|    _c367|   _c368|    _c369|   _c370|    _c371|   _c372|    _c373|   _c374|  _c375|   _c376|_c377|   _c378|    _c379|   _c380| _c381|   _c382|    _c383|   _c384|    _c385|   _c386| _c387|   _c388|  _c389|   _c390|              _c391|   _c392|               _c393|   _c394|  _c395|   _c396|         _c397|   _c398|  _c399|   _c400|       _c401|   _c402| _c403|   _c404|     _c405|   _c406|     _c407|   _c408|             _c409|   _c410|_c411|   _c412|_c413|   _c414|_c415|   _c416|    _c417|   _c418|    _c419|   _c420|    _c421|   _c422|    _c423|   _c424|  _c425|   _c426|_c427|   _c428|    _c429|   _c430| _c431|   _c432|    _c433|   _c434|    _c435|   _c436| _c437|   _c438|  _c439|   _c440|              _c441|   _c442|               _c443|   _c444|  _c445|   _c446|         _c447|   _c448|  _c449|   _c450|       _c451|   _c452| _c453|   _c454|     _c455|   _c456|     _c457|   _c458|             _c459|   _c460|_c461|   _c462|_c463|   _c464|_c465|   _c466|    _c467|   _c468|    _c469|   _c470|    _c471|   _c472|    _c473|   _c474|  _c475|   _c476|_c477|   _c478|    _c479|   _c480| _c481|   _c482|    _c483|   _c484|    _c485|   _c486| _c487|   _c488|  _c489|   _c490|              _c491|   _c492|               _c493|   _c494|  _c495|   _c496|         _c497|   _c498|  _c499|   _c500|       _c501|   _c502| _c503|   _c504|     _c505|   _c506|     _c507|   _c508|             _c509|   _c510|_c511|   _c512|_c513|   _c514|_c515|   _c516|    _c517|   _c518|    _c519|   _c520|    _c521|   _c522|    _c523|   _c524|  _c525|   _c526|_c527|   _c528|    _c529|   _c530| _c531|   _c532|    _c533|   _c534|    _c535|   _c536| _c537|   _c538|  _c539|   _c540|              _c541|   _c542|               _c543|   _c544|  _c545|   _c546|         _c547|   _c548|  _c549|   _c550|       _c551|   _c552| _c553|   _c554|     _c555|   _c556|     _c557|   _c558|             _c559|   _c560|_c561|   _c562|           _c563|   _c564|_c565|   _c566|           _c567|   _c568|           _c569|   _c570|   _c571|   _c572|_c573|   _c574|     _c575|   _c576|_c577|   _c578|_c579|   _c580|       _c581|   _c582|               _c583|   _c584|  _c585|   _c586|       _c587|   _c588| _c589|   _c590|     _c591|   _c592|     _c593|   _c594|             _c595|   _c596|_c597|   _c598|               _c599|   _c600|               _c601|   _c602|      _c603|   _c604|  _c605|   _c606|       _c607|   _c608|  _c609|   _c610|       _c611|   _c612| _c613|   _c614|     _c615|   _c616|     _c617|   _c618|             _c619|   _c620|_c621|   _c622|_c623|   _c624|               _c625|   _c626|               _c627|   _c628|  _c629|   _c630|       _c631|   _c632| _c633|   _c634|     _c635|   _c636|     _c637|   _c638|             _c639|   _c640|_c641|   _c642|_c643|   _c644|               _c645|   _c646|       _c647|   _c648|  _c649|   _c650|       _c651|   _c652| _c653|   _c654|     _c655|   _c656|     _c657|   _c658|             _c659|   _c660|_c661|   _c662|_c663|   _c664|               _c665|   _c666|       _c667|   _c668|  _c669|   _c670|       _c671|   _c672| _c673|   _c674|     _c675|   _c676|     _c677|   _c678|         _c679|   _c680|   _c681|   _c682|               _c683|   _c684|   _c685|   _c686| _c687|   _c688|     _c689|   _c690|             _c691|   _c692|     _c693|   _c694|   _c695|   _c696|_c697|   _c698|               _c699|   _c700|    _c701|
+--------+-------+--------+------------+--------+------+--------+----------+-------...

|[('SMSG'| 'BKT')| ('SQNR'| '00000004')| ('STNQ'| '06')| ('TRNN'| '000001')| ('NRID'| '  ')| ('TREC'| '020')| ('TACN'| '001')| ('CARF'| '          ')| ('CSTF'| '               ...| ('RPSI'| 'SABR')| ('ESAC'| '              ')| ('DISI'| ' ')| ('NRMI'| ' ')| ('NRCT'| ' ')| ('AREI'| ' ')| ('RESD'| '               ...| ('SMSG'| 'BKS')| ('SQNR'| '00000005')| ('STNQ'| '24')| ('DAIS'| '171231')| ('TRNN'| '000001')| ('TDNR'| '0015131574285 ')| ('CDGT'| '2')| ('CPUI'| 'FFFF')| ('CJCP'| '   ')| ('AGTN'| '20212146')| ('RFIC'| ' ')| ('TOUR'| '               ')| ('TRNC'| 'TKTT')| ('TODC'| 'CDGCDG    ')| ('PNRR'| 'IKQOWZ/AA    ')| ('TIIS'| '0000')| ('RESD'| '               ...| ('SMSG'| 'BKS')| ('SQNR'| '00000006')| ('STNQ'| '30')| ('DAIS'| '171231')| ('TRNN'| '000001')| ('TDNR'| '0015131574285 ')| ('CDGT'| '2')| ('COBL'| 225.0)| ('NTFA'| 0.0)| ('TMFT_1'| 'YR      ')| ('TMFA_1'| 300.0)| ('TMFT_2'| 'FR      ')| ('TMFA_2'| 20.81)| ('TMFT_3'| 'QX      ')| ('TMFA_3'| 27.91)| ('TDAM'| 712.92)| ('RESD'| '  ')| ('CUTP'| 'EUR2')| ('SMSG'| 'BKS')| ('SQNR'| '00000007')| ('STNQ'| '30')| ('DAIS'| '171231')| ('TRNN'| '000001')| ('TDNR'| '0015131574285 ')| ('CDGT'| '2')| ('COBL'| 0.0)| ('NTFA'| 0.0)| ('TMFT_1'| 'IZ      ')| ('TMFA_1'| 4.51)| ('TMFT_2'| 'YC      ')| ('TMFA_2'| 9.22)| ('TMFT_3'| 'XY      ')| ('TMFA_3'| 11.74)| ('TDAM'| 0.0)| ('RESD'| '  ')| ('CUTP'| 'EUR2')| ('SMSG'| 'BKS')| ('SQNR'| '00000008')| ('STNQ'| '30')| ('DAIS'| '171231')| ('TRNN'| '000001')| ('TDNR'| '0015131574285 ')| ('CDGT'| '2')| ('COBL'| 0.0)| ('NTFA'| 0.0)| ('TMFT_1'| 'XA      ')| ('TMFA_1'| 6.64)| ('TMFT_2'| 'AY      ')| ('TMFA_2'| 9.4)| ('TMFT_3'| 'WD      ')| ('TMFA_3'| 29.33)| ('TDAM'| 0.0)| ('RESD'| '  ')| ('CUTP'| 'EUR2')| ('SMSG'| 'BKS')| ('SQNR'| '00000009')| ('STNQ'| '30')| ('DAIS'| '171231')| ('TRNN'| '000001')| ('TDNR'| '0015131574285 ')| ('CDGT'| '2')| ('COBL'| 0.0)| ('NTFA'| 0.0)| ('TMFT_1'| 'EK      ')| ('TMFA_1'| 18.89)| ('TMFT_2'| 'EL      ')| ('TMFA_2'| 4.19)| ('TMFT_3'| 'HG      ')| ('TMFA_3'| 16.76)| ('TDAM'| 0.0)| ('RESD'| '  ')| ('CUTP'| 'EUR2')| ('SMSG'| 'BKS')| ('SQNR'| '00000010')| ('STNQ'| '30')| ('DAIS'| '171231')| ('TRNN'| '000001')| ('TDNR'| '0015131574285 ')| ('CDGT'| '2')| ('COBL'| 0.0)| ('NTFA'| 0.0)| ('TMFT_1'| 'JT      ')| ('TMFA_1'| 2.52)| ('TMFT_2'| 'UC      ')| ('TMFA_2'| 6.72)| ('TMFT_3'| 'QK      ')| ('TMFA_3'| 16.76)| ('TDAM'| 0.0)| ('RESD'| '  ')| ('CUTP'| 'EUR2')| ('SMSG'| 'BKS')| ('SQNR'| '00000011')| ('STNQ'| '30')| ('DAIS'| '171231')| ('TRNN'| '000001')| ('TDNR'| '0015131574285 ')| ('CDGT'| '2')| ('COBL'| 0.0)| ('NTFA'| 0.0)| ('TMFT_1'| 'XF      ')| ('TMFA_1'| 2.52)| ('TMFT_2'| 'XFCLT3  ')| ('TMFA_2'| 0.0)| ('TMFT_3'| '        ')| ('TMFA_3'| 0.0)| ('TDAM'| 0.0)| ('RESD'| '  ')| ('CUTP'| 'EUR2')| ('SMSG'| 'BKS')| ('SQNR'| '00000012')| ('STNQ'| '39')| ('DAIS'| '171231')| ('TRNN'| '000001')| ('TDNR'| '0015131574285 ')| ('CDGT'| '2')| ('STAT'| 'I  ')| ('COTP'| '      ')| ('CORT'| '00000')| ('COAM'| 0.0)| ('SPTP'| '      ')| ('SPRT'| '00000')| ('SPAM'| 0.0)| ('EFRT'| '00000')| ('EFCO'| 0.0)| ('APBC'| 0.0)| ('RDII'| ' ')| ('RESD'| '               ...| ('CUTP'| 'EUR2')| ('SMSG'| 'BKS')| ('SQNR'| '00000013')| ('STNQ'| '46')| ('DAIS'| '171231')| ('TRNN'| '000001')| ('TDNR'| '0015131574285 ')| ('CDGT'| '2')| ('ORIT'| '              ')| ('ORIL'| '   ')| ('ORID'| '       ')| ('ORIA'| '00000000')| ('ENRS'| 'NONREF/RESTRICT...| ('RESD'| '               ')| ('SMSG'| 'BKI')| ('SQNR'| '00000014')| ('STNQ'| '63')| ('DAIS'| '171231')| ('TRNN'| '000001')| ('TDNR'| '0015131574285 ')| ('CDGT'| '2')| ('SEGI'| '1')| ('STPO'| 'X')| ('NBDA'| '22APR')| ('NADA'| '22APR')| ('ORAC'| 'CDG  ')| ('DSTC'| 'MIA  ')| ('CARR'| 'AA ')| ('CABI'| ' ')| ('FTNR'| '  63 ')| ('RBKD'| 'O ')| ('FTDA'| '22APR')| ('FTDT'| '1155 ')| ('FBST'| 'OK')| ('FBAL'| '1PC')| ('FBTD'| 'OLN0DMN3       ')| ('FFRF'| '               ...| ('FCPT'| '   ')| ('RESD'| '          ')| ('SMSG'| 'BKI')| ('SQNR'| '00000015')| ('STNQ'| '63')| ('DAIS'| '171231')| ('TRNN'| '000001')| ('TDNR'| '0015131574285 ')| ('CDGT'| '2')| ('SEGI'| '2')| ('STPO'| 'O')| ('NBDA'| '22APR')| ('NADA'| '22APR')| ('ORAC'| 'MIA  ')| ('DSTC'| 'MBJ  ')| ('CARR'| 'AA ')| ('CABI'| ' ')| ('FTNR'| '1515 ')| ('RBKD'| 'O ')| ('FTDA'| '22APR')| ('FTDT'| '1801 ')| ('FBST'| 'OK')| ('FBAL'| '1PC')| ('FBTD'| 'OLN0DMN3       ')| ('FFRF'| '               ...| ('FCPT'| '   ')| ('RESD'| '          ')| ('SMSG'| 'BKI')| ('SQNR'| '00000016')| ('STNQ'| '63')| ('DAIS'| '171231')| ('TRNN'| '000001')| ('TDNR'| '0015131574285 ')| ('CDGT'| '2')| ('SEGI'| '3')| ('STPO'| 'X')| ('NBDA'| '29APR')| ('NADA'| '29APR')| ('ORAC'| 'MBJ  ')| ('DSTC'| 'CLT  ')| ('CARR'| 'AA ')| ('CABI'| ' ')| ('FTNR'| ' 844 ')| ('RBKD'| 'O ')| ('FTDA'| '29APR')| ('FTDT'| '1059 ')| ('FBST'| 'OK')| ('FBAL'| '1PC')| ('FBTD'| 'OLN0DMN3       ')| ('FFRF'| '               ...| ('FCPT'| '   ')| ('RESD'| '          ')| ('SMSG'| 'BKI')| ('SQNR'| '00000017')| ('STNQ'| '63')| ('DAIS'| '171231')| ('TRNN'| '000001')| ('TDNR'| '0015131574285 ')| ('CDGT'| '2')| ('SEGI'| '4')| ('STPO'| ' ')| ('NBDA'| '29APR')| ('NADA'| '29APR')| ('ORAC'| 'CLT  ')| ('DSTC'| 'CDG  ')| ('CARR'| 'AA ')| ('CABI'| ' ')| ('FTNR'| ' 786 ')| ('RBKD'| 'O ')| ('FTDA'| '29APR')| ('FTDT'| '1630 ')| ('FBST'| 'OK')| ('FBAL'| '1PC')| ('FBTD'| 'OLN0DMN3       ')| ('FFRF'| '               ...| ('FCPT'| '   ')| ('RESD'| '          ')| ('SMSG'| 'BAR')| ('SQNR'| '00000018')| ('STNQ'| '64')| ('DAIS'| '171231')| ('TRNN'| '000001')| ('TDNR'| '0015131574285 ')| ('CDGT'| '2')| ('FARE'| 'EUR   225.00')| ('TKMI'| '/')| ('EQFR'| '            ')| ('TOTL'| 'EUR   712.92')| ('SASI'| '0011')| ('FCMI'| '0')| ('BAID'| '      ')| ('BEOT'| ' ')| ('FCPI'| '0')| ('AENT'| '        ')| ('RESD'| '               ...| ('SMSG'| 'BAR')| ('SQNR'| '00000019')| ('STNQ'| '65')| ('DAIS'| '171231')| ('TRNN'| '000001')| ('TDNR'| '0015131574285 ')| ('CDGT'| '2')| ('PXNM'| '               ...| ('PXDA'| '               ...| ('DOBR'| '02APR68')| ('PXTP'| '   ')| ('RESD'| '        ')| ('SMSG'| 'BAR')| ('SQNR'| '00000020')| ('STNQ'| '66')| ('DAIS'| '171231')| ('TRNN'| '000001')| ('TDNR'| '0015131574285 ')| ('CDGT'| '2')| ('FPSN'| '1')| ('FPIN'| 'AA132193       ...| ('RESD'| '               ...| ('SMSG'| 'BKF')| ('SQNR'| '00000021')| ('STNQ'| '81')| ('DAIS'| '171231')| ('TRNN'| '000001')| ('TDNR'| '0015131574285 ')| ('CDGT'| '2')| ('FRCS'| '1')| ('FRCA'| 'PAR AA X/MIA AA...| ('RESD'| '        ')| ('SMSG'| 'BKF')| ('SQNR'| '00000022')| ('STNQ'| '81')| ('DAIS'| '171231')| ('TRNN'| '000001')| ('TDNR'| '0015131574285 ')| ('CDGT'| '2')| ('FRCS'| '2')| ('FRCA'| '1IZ9.22YC11.74X...| ('RESD'| '        ')| ('SMSG'| 'BKP')| ('SQNR'| '00000023')| ('STNQ'| '84')| ('DAIS'| '171231')| ('TRNN'| '000001')| ('FPTP'| 'CA        ')| ('FPAM'| 712.92)| ('FPAC'| '               ...| ('EXDA'| '    ')| ('EXPC'| '  ')| ('APLC'| '      ')| ('INVN'| '              ')| ('INVD'| '000000')| ('REMT'| 712.92)| ('CVVR'| ' ')| ('RESD'| '               ...| ('CUTP'| 'EUR2')]|
+--------+-------+--------+------------+--------+------+--------+----------+-------...

Я думаю, мне все еще нужно пройти через каждую пару столбцов, переименовать второй столбец со значением первой строкипервого столбца и, наконец, отбросьте все первые столбцы каждой пары столбцов.

Или возможно добавить дополнительные параметры в:

df = spark.read.options(inferSchema="true").csv(rdd_trans)

, чтобы получить точную правильную структуру данных?Это позволит избежать больше времени на обработку (моя цель - быть быстрее, чем в версии для панд)

Тем временем я пытался сделать:

df.write.parquet("/Users/admin/Documents/Training/FR20180101HOT.parquet")

Но получил ошибку:

Py4JJavaError: An error occurred while calling o447851.parquet.
: org.apache.spark.SparkException: Job aborted.
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:196)
    at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:159)
    ...
Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 8220.0 failed 1 times, most recent failure: Lost task 0.0 in stage 8220.0 (TID 12712, localhost, executor driver): org.apache.spark.SparkException: Task failed while writing rows.

...
Caused by: java.lang.OutOfMemoryError: GC overhead limit exceeded
...

Я не могу выложить все сообщения об ошибках из-за ограничения текста, но, похоже, это связано с проблемой памяти.

Я сделал подсчет для df:

print(df.count())

15723

, который равен числу строк в моей версии для панд (другой код Python, не использующий pyspark), поэтому он получает правильное числорядов.Однако в пандах я могу без проблем извлекать паркет.

0 голосов
/ 27 января 2019

Вы можете попробовать regexp_replace для вашего случая.Проверьте приведенный ниже пример,

df1.withColumn("c0", regexp_replace("_c0", "[()']", "")).withColumn("c1", regexp_replace("_c1", "\)", "")).show()

+----+---+---+---+
| _c0|_c1| c0| c1|
+----+---+---+---+
|('a'| 2)|  a|  2|
|('b'| 4)|  b|  4|
|('c'| 6)|  c|  6|
+----+---+---+---+

0 голосов
/ 21 января 2019

Поскольку вы новичок в Spark, вы можете не знать о Spark Dataframe.Dataframe - это продвинутая концепция по сравнению с RDD.Здесь я решил вашу проблему с помощью Pyspark Dataframe.Посмотрите на это, не стесняйтесь изучать искры Dataframe.

rdd1 = sc.parallelize([("SMSG", "BKT"), ("SMSG", "BKT"), ("SMSG", "BKS"), ('SQNR', '00000004'), ('SQNR', '00000005') ])
rddToDF = rdd1.toDF(["C1", "C2"])
+----+--------+
|  C1|      C2|
+----+--------+
|SMSG|     BKT|
|SMSG|     BKT|
|SMSG|     BKS|
|SQNR|00000004|
|SQNR|00000005|
+----+--------+

DfRmDup = rddToDF.drop_duplicates() #Removing duplicates from Dataframe
DfRmDup.show()
+----+--------+
|  C1|      C2|
+----+--------+
|SQNR|00000004|
|SMSG|     BKT|
|SQNR|00000005|
|SMSG|     BKS|
+----+--------+

rank = DfRmDup.withColumn("rank", dense_rank().over(Window.partitionBy("C1").orderBy(asc("C2"))))
rank.show()
+----+--------+----+
|  C1|      C2|rank|
+----+--------+----+
|SQNR|00000004|   1|
|SQNR|00000005|   2|
|SMSG|     BKS|   1|
|SMSG|     BKT|   2|
+----+--------+----+

rank.withColumn("C1", concat(col("C1"), lit("_"), col("rank"))).drop("rank").show()
+------+--------+
|    C1|      C2|
+------+--------+
|SQNR_1|00000004|
|SQNR_2|00000005|
|SMSG_1|     BKS|
|SMSG_2|     BKT|
+------+--------+

#Converting back to RDD
rank.withColumn("C1", concat(col("C1"), lit("_"), col("rank"))).drop("rank").rdd.map(lambda x: (x[0],x[1])).collect()

[('SQNR_1', '00000004'),
 ('SQNR_2', '00000005'),
 ('SMSG_1', 'BKS'),
 ('SMSG_2', 'BKT')]
...