Извлечение нескольких столбцов из столбца в PySpark DataFrame с использованием именованного регулярного выражения - PullRequest
0 голосов
/ 02 ноября 2018

Предположим, у меня есть DataFrame df в pySpark следующей формы:

| id | type | description                                                  |
|  1 | "A"  | "Date: 2018/01/01\nDescr: This is a test des\ncription\n     |
|  2 | "B"  | "Date: 2018/01/02\nDescr: Another test descr\niption\n       |
|  3 | "A"  | "Date: 2018/01/03\nWarning: This is a warnin\ng, watch out\n |

, который, конечно, является фиктивным набором, но будет достаточным для этого примера.

Я сделал регулярное выражение с именованными группами, которые можно использовать для извлечения соответствующей информации из поля описания, что-то вроде:

^(?:(?:Date: (?P<DATE>.+?)\n)|(?:Descr: (?P<DESCR>.+?)\n)|(?:Warning: (?P<WARNING>.+?)\n)+$

опять же, фиктивное регулярное выражение, само регулярное регулярное выражение несколько сложнее, но цель состоит в том, чтобы охватить три возможные группы:

| DATE       | DESCR                        | WARNING                        |
| 2018/01/01 | This is a test des\ncription | None                           |
| 2018/01/02 | Another test descr\niption   | None                           |
| 2018/01/03 | None                         | This is a warnin\ng, watch out |

Теперь я хотел бы добавить столбцы, являющиеся результатом соответствия регулярному выражению, к исходному фрейму данных (т. Е. Объединить две фиктивные таблицы в этом вопросе в одну).

Я пробовал несколько способов сделать это, но ни один из них еще не привел к полному решению. Я попробовал вот что:

def extract_fields(string):
   patt = <ABOVE_PATTERN>
   result = re.match(patt, string, re.DOTALL).groupdict()
   # Actually, a slight work-around is needed to overcome the None problem when 
   #   no match can be made, I'm using pandas' .str.extract for this now
   return result

df.rdd.map(lambda x: extract_fields(x.description))

Это даст вторую таблицу, но я не вижу способа объединить это с исходными столбцами из df. Я попытался создать новый Row(), но затем я столкнулся с проблемами с упорядочением столбцов (и тем, что я не могу жестко закодировать имена столбцов, которые будут добавлены группами регулярных выражений), которые необходимы в Row() -конструктор, в результате чего в кадре данных все столбцы перемешаны. Как мне добиться того, чего я хочу, то есть один DataFrame с шестью столбцами: id, type, description, DATE, DESCR и WARNING?

Примечание . На самом деле поле описания - это не одно поле, а несколько столбцов. Используя concat_ws, я объединил эти столбцы в новые столбцы description с полями описания, разделенными \n, но, возможно, это можно включить более подходящим способом.

1 Ответ

0 голосов
/ 05 ноября 2018

Я думаю, что вы можете использовать функции Pandas для этого случая. Сначала я конвертирую df в rdd, чтобы разделить поле описания. Я тяну Pandas DF, а затем создаю Spark DF с помощью Pandas DF. Работает независимо от номера столбца в поле описания

>>> import pandas as pd
>>> import re
>>> 
>>> df.show(truncate=False)
+---+----+-----------------------------------------------------------+
|id |type|description                                                |
+---+----+-----------------------------------------------------------+
|1  |A   |Date: 2018/01/01\nDescr: This is a test des\ncription\n    |
|2  |B   |Date: 2018/01/02\nDescr: Another test desc\niption\n       |
|3  |A   |Date: 2018/01/03\nWarning: This is a warnin\ng, watch out\n|
+---+----+-----------------------------------------------------------+

>>> #convert df to rdd
>>> rdd = df.rdd.map(list)
>>> rdd.first()
[1, 'A', 'Date: 2018/01/01\\nDescr: This is a test des\\ncription\\n']
>>> 
>>> #split description field
>>> rddSplit = rdd.map(lambda x: (x[0],x[1],re.split('\n(?=[A-Z])', x[2].encode().decode('unicode_escape'))))
>>> rddSplit.first()
(1, 'A', ['Date: 2018/01/01', 'Descr: This is a test des\ncription\n'])
>>> 
>>> #create empty Pandas df
>>> df1 = pd.DataFrame()
>>> 
>>> #insert rows
>>> for rdd in rddSplit.collect():
...     a = {i.split(':')[0].strip():i.split(':')[1].strip('\n').replace('\n','\\n').strip() for i in rdd[2]}
...     a['id'] = rdd[0]
...     a['type'] = rdd[1]
...     df2 = pd.DataFrame([a], columns=a.keys())
...     df1 = pd.concat([df1, df2])
... 
>>> df1
         Date                         Descr                         Warning  id type
0  2018/01/01  This is a test des\ncription                             NaN   1    A
0  2018/01/02     Another test desc\niption                             NaN   2    B
0  2018/01/03                           NaN  This is a warnin\ng, watch out   3    A
>>>
>>> #create spark df
>>> df3 = spark.createDataFrame(df1.fillna('')).replace('',None)
>>> df3.show(truncate=False)
+----------+----------------------------+------------------------------+---+----+
|Date      |Descr                       |Warning                       |id |type|
+----------+----------------------------+------------------------------+---+----+
|2018/01/01|This is a test des\ncription|null                          |1  |A   |
|2018/01/02|Another test desc\niption   |null                          |2  |B   |
|2018/01/03|null                        |This is a warnin\ng, watch out|3  |A   |
+----------+----------------------------+------------------------------+---+----+
...