Разделить столбец на основе указанной позиции - PullRequest
0 голосов
/ 17 мая 2018

Предположим, что мой фрейм данных такой, как показано ниже, и я хочу эффективно разделить Col1 на основе позиции.

df = sc.parallelize([['sdbsajkdbnasjdh'],['sdahasdbasjda']]).toDF(['Col1'])

+---------------+
|           Col1|
+---------------+
|sdbsajkdbnasjdh|
|  sdahasdbasjda|
+---------------+

pos = [(1,2),(3,5),(7,10)]

Например, на основе списка позиций я хочу получить набор результатов, как показано ниже:

d|sa|dbn
d|ha|bas

Я хочу эффективный способ разделения данных.

Мое текущее решение такое же, как показано ниже, но оно не работает, если я предоставлю список адресов дольше (если в списке 10 записей) с EofError.

udf1 = udf(lambda x:  "|".join(str(x) for x in [x[j[0]:j[1]] for j in pos]),StringType())
final_df = df.withColumn("Split",udf1('Col1'))

Ответы [ 3 ]

0 голосов
/ 17 мая 2018

Возможно, перебрав строку только один раз, вы сделаете это более эффективным.

(Предполагая текущую и правильную структуру "pos"):

from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

df = sc.parallelize([['sdbsajkdbngdfgdfgdfgdfgdgdfgdgdfgdgdgsfhhfghfghdfghdfasjdh'],\
                     ['sdahasdbsdfgsdfhhsfghhkghmaeserewyuouip,mfhdfbdfgbdfgjdtsagasjda']]).toDF(['Col1'])
pos = [(1,2),(3,5),(7,10),(11,15),(20,21),(22,24),(24,26),(26,30),(35,38),(38,42),(42,43),(45,50)]

flat_pos = [x for y in pos for x in y]

def split_function(row):
    return_list = []
    start_copy = False
    flat_pos_index = 0
    temp_list = []
    max_pos = flat_pos[-1]
    i = 0
    while i < len(row):
        if i > max_pos:
            break
        elif i == flat_pos[flat_pos_index]: 
            if flat_pos_index % 2 == 0:
                start_copy = True
            else:
                start_copy = False
                return_list.append("".join(temp_list))
                temp_list = []
            flat_pos_index += 1
        if start_copy:
            temp_list.append(row[i])
        if flat_pos_index + 1 < len(flat_pos) and flat_pos[flat_pos_index] == flat_pos[flat_pos_index + 1]:
                flat_pos_index += 1
                start_copy = False
                return_list.append("".join(temp_list))
                temp_list = []
        else:
            i += 1

    return "|".join(return_list)


udf2 = udf(split_function ,StringType())
final_df = df.withColumn("Split",udf2('Col1'))

final_df.collect()

[Строка (Стлб1 = u'sdbsajkdbngdfgdfgdfgdfgdgdfgdgdfgdgdgsfhhfghfghdfghdfasjdh», Разделить = u'd | са | ДБН | dfgd | д | г | г | fgdg | D | F | г | ghdfg '), Строка (Стлб1 = u'sdahasdbsdfgsdfhhsfghhkghmaeserewyuouip, mfhdfbdfgbdfgjdtsagasjda», Split = u'd | га | BSD | GSDF | ч | к | ч | aese | о | р | ч | bdfgb ')]

0 голосов
/ 17 мая 2018

Вам не нужно использовать udf для этого.

Вместо этого вы можете использовать понимание списка над кортежами в сочетании с pyspark.sql.functions.array и pyspark.sql.functions.substring, чтобы получить нужные подстроки.

Обратите внимание, что первый аргумент substring() обрабатывает начало строки как индекс 1, поэтому мы передаем start+1.Второй аргумент - длина строки, поэтому я передаю (stop-start):

import pyspark.sql.functions as f
df.withColumn(
    'Split',
    f.array(
        [
            f.substring(
                str=f.col('Col1'),
                pos=start+1,
                len=(stop-start)
            ) 
            for start, stop in pos
        ]
    )
).show()
#+---------------+------------+
#|           Col1|       Split|
#+---------------+------------+
#|sdbsajkdbnasjdh|[d, sa, dbn]|
#|  sdahasdbasjda|[d, ha, bas]|
#+---------------+------------+

Чтобы объединить их вместе с помощью "|", оберните вызов в array() с помощью pyspark.sql.functions.concat_ws:

df.withColumn(
    'Split',
    f.concat_ws(
        "|",
        f.array(
            [
                f.substring(f.col('Col1'), start+1, (stop-start)) 
                for start, stop in pos
            ]
        )
    )
).show()
#+---------------+--------+
#|           Col1|   Split|
#+---------------+--------+
#|sdbsajkdbnasjdh|d|sa|dbn|
#|  sdahasdbasjda|d|ha|bas|
#+---------------+--------+

Использование функций DataFrame будет быстрее, чем использование udf.

0 голосов
/ 17 мая 2018

Мы можем использовать substr () из функций pyspark, чтобы получить отдельные столбцы для каждой подстроки. Затем UDF для составления строк для соединения столбцов. Попробовал приведенный ниже код,

from pyspark.sql import functions as F

udf1 = F.udf(lambda x : '|'.join(x))

df = df.withColumn('Col2',udf1(F.struct([df.Col1.substr(j[0]+1,j[1]-j[0]) for j in pos])))
+---------------+--------+
|           Col1|    Col2|
+---------------+--------+
|sdbsajkdbnasjdh|d|sa|dbn|
|  sdahasdbasjda|d|ha|bas|
+---------------+--------+

Поскольку substr () принимает startpos и length, мы вычисляем длину. Надеюсь, это поможет.!

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...