Хороший вопрос, в идеале я бы пошел с udf , чтобы упростить задачу, но поскольку эта задача является хорошим примером использования функций высшего порядка Spark SQL ... Может быть, немного многословно, так чтоЯ разделил это на 4 шага. Дайте мне знать, если это работает, и любые вопросы приветствуются:
Шаг 1: преобразовать строку в массив строк
разделить строку по шаблону (?:(?!/)\p{Punct}|\s)+'))
, которыйявляется последовательной пунктуацией (кроме /
) или пробелами, а затем отфильтровывает пустые позиции (в начале / в конце). Временный столбец temp1
используется для сохранения промежуточных столбцов.
from pyspark.sql.functions import split, expr
df1 = df.withColumn('temp1', split('Assembly_name', r'(?:(?!/)\p{Punct}|\s)+')) \
.withColumn('temp1', expr("filter(temp1, x -> x <> '')"))
df1.select('temp1').show(truncate=False)
+-------------------------------------------------------------------------------------+
|temp1 |
+-------------------------------------------------------------------------------------+
|[OIL, PUMP, ASSEMBLY, A01EA09CA, 4999202399920239A06] |
|[OIL, PUMP, ASSEMBLY, A02EA09CA/CB/CC, 4999202399920239A06] |
|[OIL, PUMP, ASSEMBLY, A01EA05CA, 4999202399920239A06] |
|[DRIVE, TRAIN, TRANSMISSION, 6, SPEED, V08AB26/GB26/LB26, ALL, OPTIONS, 49VICTRANS08]|
|[SUSPENSION, 7043244, S09PR6HSL/PS6HSL/HEL, 49SNOWSHOCKFRONT7043244SB] |
+-------------------------------------------------------------------------------------+
Шаг 2: преобразование temp1 в массив массивов
разделение элементов массива снова с использованием /
, поэтомучто все part-id в их собственном элементе массива
df2 = df1.withColumn('temp1', expr("transform(temp1, x -> split(x, '/'))"))
df2.select('temp1').show(truncate=False)
+----------------------------------------------------------------------------------------------------------+
|temp1 |
+----------------------------------------------------------------------------------------------------------+
|[[OIL], [PUMP], [ASSEMBLY], [A01EA09CA], [4999202399920239A06]] |
|[[OIL], [PUMP], [ASSEMBLY], [A02EA09CA, CB, CC], [4999202399920239A06]] |
|[[OIL], [PUMP], [ASSEMBLY], [A01EA05CA], [4999202399920239A06]] |
|[[DRIVE], [TRAIN], [TRANSMISSION], [6], [SPEED], [V08AB26, GB26, LB26], [ALL], [OPTIONS], [49VICTRANS08]] |
|[[SUSPENSION], [7043244], [S09PR6HSL, PS6HSL, HEL], [49SNOWSHOCKFRONT7043244SB]] |
+----------------------------------------------------------------------------------------------------------+
Шаг-3: используйте агрегат для сброса идентификаторов part *
Функция агрегата будет работать на внутренних массивах:
df3 = df2.withColumn('temp1', expr("""
flatten(
transform(temp1, x ->
transform(sequence(1, size(x)), i ->
aggregate(
sequence(1, i)
, x[0]
, (acc,j) -> concat(substr(acc, 1, length(x[0])-length(x[j-1])), x[j-1])
)
)
)
)
"""))
df3.select('temp1').show(truncate=False)
+----------------------------------------------------------------------------------------------+
|temp1 |
+----------------------------------------------------------------------------------------------+
|[OIL, PUMP, ASSEMBLY, A01EA09CA, 4999202399920239A06] |
|[OIL, PUMP, ASSEMBLY, A02EA09CA, A02EA09CB, A02EA09CC, 4999202399920239A06] |
|[OIL, PUMP, ASSEMBLY, A01EA05CA, 4999202399920239A06] |
|[DRIVE, TRAIN, TRANSMISSION, 6, SPEED, V08AB26, V08GB26, V08LB26, ALL, OPTIONS, 49VICTRANS08] |
|[SUSPENSION, 7043244, S09PR6HSL, S09PS6HSL, S09PS6HEL, 49SNOWSHOCKFRONT7043244SB] |
+----------------------------------------------------------------------------------------------+
Где:
transform(temp1, x -> func1(x))
: перебирать каждый элемент в массиве temp1 для запуска func1 (x) , x - это внутренний массив (массив строк) func1(x)
, упомянутый выше, является другой функцией преобразования, которая выполняет итерацию последовательности (1, размер (x)) и выполните func2 (i) для каждого i :
transform(sequence(1, size(x)), i -> func2(i))
func2(i)
, упомянутого выше, является агрегатной функцией, которая выполняет итерацию попоследовательность (1, i) с начальным значением x [0] и накапливать / уменьшать с помощью функции:
(acc,j) -> concat(substr(acc, 1, length(acc)-length(x[j-1])), x[j-1])
Примечание: substr()
позицияНа основе 1 и array-indexing
на 0, поэтому нам нужно x [j-1] для ссылки на текущий элемент массива в приведенной выше функции уменьшения / агрегирования
наконец, запуститеflatten()
для объединения внутренних массивов
На этом шаге выполняется что-то вроде следующего псевдо-цикла:
for x in temp1:
for i in range(1, size(x)+1):
acc = x[0]
for j in range(1,i+1):
acc = concat(substr(acc, 1, length(acc)-length(x[j-1])), x[j-1])
Шаг 4: объединение и удаление дубликатов
df4 = df3.groupby('Itemno').agg(
expr("concat_ws(' ', array_distinct(flatten(collect_list(temp1)))) AS Assembly_names")
, expr("concat_ws(' ', collect_set(Assembly_id)) as Assembly_ids")
)
Где:
- использовать collect_list () , чтобы получить массив массивов (temp1, который является массивом строк)
- используйте flatten () для преобразования вышеуказанного в массив строк
- используйте array_distinct () для удаления дубликатов
используйте concat_ws () для преобразования вышеуказанного массива в строку
df4.select('Assembly_names').show(truncate=False)
+---------------------------------------------------------------------------------------+
|Assembly_names |
+---------------------------------------------------------------------------------------+
|OIL PUMP ASSEMBLY A01EA09CA 4999202399920239A06 A02EA09CA A02EA09CB A02EA09CC A01EA05CA|
|SUSPENSION 7043244 S09PR6HSL S09PS6HSL S09PS6HEL 49SNOWSHOCKFRONT7043244SB |
|DRIVE TRAIN TRANSMISSION 6 SPEED V08AB26 V08GB26 V08LB26 ALL OPTIONS 49VICTRANS08 |
+---------------------------------------------------------------------------------------+
ОБНОВЛЕНИЕ:
Первый из них легко исправить, этоГораздо проще, чем существующий (не нужно агрегировать). Для второго, следующее решение требует перебирать строку char по char, что может быть медленным. Если это так, мы можем проверить с помощью udf.
Ниже приведены изменения:
Шаг-1: Просто добавьте подчеркивание, чтобы исключитьиз знаков препинания: (обратите внимание, если подчеркивание отображается в других местах текста, возможно, сначала нужно запустить regexp_replace, чтобы очистить их)
df1 = df.withColumn('temp1', split('Assembly_name', r'(?:(?![/_])\p{Punct}|\s)+')) \
.withColumn('temp1', expr("filter(temp1, x -> x <> '')"))
Шаг 2: разбить массив массива дальше на массив массивов массивов, самый внутренний массив разделил строку на символы. переверните самый внутренний массив, чтобы его было легко сравнивать.
df2 = df1.withColumn('temp1', expr("transform(temp1, x -> split(x, '/'))")) \
.withColumn('temp1', expr("transform(temp1, x -> transform(x, y -> reverse(split(y, ''))) )"))
Шаг-3: Используйте transform () вместо aggregate () для сброса пар-идентификаторов. мы проверяем y [i] (элемент самого внутреннего массива), если он имеет значение NULL или подчеркивание, а затем заменяем его соответствующим элементом из x [0] [i] . затем мы инвертируем массив и используем concat_ws ('' ..) для преобразования его обратно в строку.
df3 = df2.withColumn('temp1', expr("""
flatten(
transform(temp1, x ->
transform(x, y ->
concat_ws('',
reverse(
transform(sequence(0, size(x[0])-1), i -> IF(y[i] is NULL or y[i] == '_', x[0][i], y[i]))
)
)
)
)
)
"""))
Ниже приведен результат из приведенного выше
df3.select('temp1').show(truncate=False)
+---------------------------------------------------------------------------------------------+
|temp1 |
+---------------------------------------------------------------------------------------------+
|[OIL, PUMP, ASSEMBLY, A01EA09CA, 4999202399920239A06] |
|[OIL, PUMP, ASSEMBLY, A02EA09CA, A02EA09CB, A02EA09CC, 4999202399920239A06] |
|[OIL, PUMP, ASSEMBLY, A01EA05CA, 4999202399920239A06] |
|[DRIVE, TRAIN, TRANSMISSION, 6, SPEED, V08AB26, V08GB26, V08LB26, ALL, OPTIONS, 49VICTRANS08]|
|[SUSPENSION, 7043244, S09PR6HSL, S09PS6HSL, S09PR6HEL, 49SNOWSHOCKFRONT7043244SB] |
|[DRIVE, TRAIN, CLUTCH, PRIMARY, S09PR6HSL, S09PS6HSL, S09PR6HSL, 49SNOWDRIVECLUTCH09600TRG] |
|[DRIVE, TRAIN, CLUTCH, PRIMARY, S09PR6HSL, S09PS6HSL, S09PR6HSL, 49SNOWDRIVECLUTCH09600TRG] |
+---------------------------------------------------------------------------------------------+
Поле перед обработкой:
df.select('Assembly_name').show(truncate=False)
+----------------------------------------------------------------------------------+
|Assembly_name |
+----------------------------------------------------------------------------------+
|OIL PUMP ASSEMBLY - A01EA09CA (4999202399920239A06) |
|OIL PUMP ASSEMBLY - A02EA09CA/CB/CC (4999202399920239A06) |
|OIL PUMP ASSEMBLY - A01EA05CA (4999202399920239A06) |
|DRIVE TRAIN, TRANSMISSION (6 SPEED) - V08AB26/GB26/LB26 ALL OPTIONS (49VICTRANS08)|
|SUSPENSION (7043244) - S09PR6HSL/PS6HSL/HEL (49SNOWSHOCKFRONT7043244SB) |
|DRIVE TRAIN, CLUTCH, PRIMARY - S09PR6HSL/PS_HSL/H_L (49SNOWDRIVECLUTCH09600TRG) |
|DRIVE TRAIN, CLUTCH, PRIMARY - S09PR6HSL/_S__SL/H_L (49SNOWDRIVECLUTCH09600TRG) |
+----------------------------------------------------------------------------------+
ОБНОВЛЕНИЕ-2 добавлено Шаг-0:
Шаг-0: для предварительной обработки столбца Assembly_name
, используйте regexp_replace + split , чтобы разделить модели # в новый столбец иудалить его из исходного столбца Assembly_name
:
from pyspark.sql.functions import regexp_replace, split
df0 = df.withColumn('new_col', split(regexp_replace('Assembly_name', r'^(.*)-\s*(\S+)(.*)$', '$1$3\0$2'),'\0')) \
.selectExpr(
'Itemno'
, 'Assembly_id'
, "coalesce(new_col[0], Assembly_name) as Assembly_name"
, "coalesce(new_col[1], '') as models"
)
df0.show(truncate=False)
+-------+-----------+---------------------------------------------------------------+--------------------+
|Itemno |Assembly_id|Assembly_name |models |
+-------+-----------+---------------------------------------------------------------+--------------------+
|0450056|44011 |OIL PUMP ASSEMBLY (4999202399920239A06) |A01EA09CA |
|0450056|135502 |OIL PUMP ASSEMBLY (4999202399920239A06) |A02EA09CA/CB/CC |
|0450056|37884 |OIL PUMP ASSEMBLY (4999202399920239A06) |A01EA05CA |
|0450067|12345 |DRIVE TRAIN, TRANSMISSION (6 SPEED) ALL OPTIONS (49VICTRANS08)|V08AB26/GB26/LB26 |
|0450068|1000 |SUSPENSION (7043244) (49SNOWSHOCKFRONT7043244SB) |S09PR6HSL/PS6HSL/HEL|
|0450066|12345 |DRIVE TRAIN, CLUTCH, PRIMARY (49SNOWDRIVECLUTCH09600TRG) |S09PR6HSL/PS_HSL/H_L|
|0450069|12346 |DRIVE TRAIN, CLUTCH, PRIMARY (49SNOWDRIVECLUTCH09600TRG) | |
+-------+-----------+---------------------------------------------------------------+--------------------+
Затем вы можете обработать Assembly_name
, используя RegexTokenier и StopwordsRemover, models
- это упрощенная версия текущей публикации, которую вы можете пропустить Шаг-1, но обратите внимание на глубину массивов.
(Примечание: удалено S09PR6HSL/_S__SL/H_L
из последней записи для тестирования)