Подход к исправлению данных столбца assembly_id и имя_сборки в версии 2.4.4 - PullRequest
2 голосов
/ 01 ноября 2019

Я работал над задачей очистки данных в spark 2.4.4, но застрял в следующих двух задачах (упомянуто в разделе вопросов) . Ниже приведены данные и вопросы:

1. Смонтируйте данные и прочитайте файл паркета в кадре данных

partFitmentRawDF = sqlContext.read.parquet("/mnt/blob/devdatasciencesto/pga-parts-forecast/raw/parts-fits/")

2. Пример данных

display(partFitmentRawDF)
Itemno   Assembly_id    Assembly_name
0450056   44011         OIL PUMP ASSEMBLY - A01EA09CA (4999202399920239A06)
0450056   135502        OIL PUMP ASSEMBLY - A02EA09CA/CB/CC (4999202399920239A06)
0450056   37884         OIL PUMP ASSEMBLY - A01EA05CA (4999202399920239A06)

Я сделал другую обработку, чтобы данные выглядели как выше, но я застрял в следующих задачах

3.Вопрос

a. Если мы посмотрим на строку 2 и столбец Assembly_name, то есть три идентификатора A02EA09CA/CB/CC, но они были объединены. Не могли бы вы предложить, как это сделать A02EA09 A02EA09CB A02EA09CC. По сути, все части должны иметь отдельный идентификатор, соединенный с одним пробелом между ними. Другой пример той же проблемы - изменить DRIVE TRAIN, TRANSMISSION (6 SPEED) - V08AB26/GB26/LB26 ALL OPTIONS (49VICTRANS08) на DRIVE TRAIN, TRANSMISSION (6 SPEED) - V08AB26 V08GB26 V08LB26 ALL OPTIONS (49VICTRANS08) или SUSPENSION (7043244) - S09PR6HSL/PS6HSL/HEL (49SNOWSHOCKFRONT7043244SB) на SUSPENSION (7043244) - S09PR6HSL S09PS6HSL S09PS6HEL (49SNOWSHOCKFRONT7043244SB).

b. Перевернуть несколько строк в assemble_id and assembly_name column, принадлежащих одномуitemno в одну строку и удалите повторяющиеся слова.

Таким образом, следующий набор данных

Itemno   Assembly_id    Assembly_name
0450056   44011         OIL PUMP ASSEMBLY - A01EA09CA (4999202399920239A06)
0450056   135502        OIL PUMP ASSEMBLY - A02EA09CA/CB/CC (4999202399920239A06)
0450056   37884         OIL PUMP ASSEMBLY - A01EA05CA (4999202399920239A06)

приведет к указанному ниже окончательному набору данных (в котором нет знаков препинания, более одного пробела междуслова и повторяющиеся слова)

Itemno   Assembly_id            Assembly_name
0450056  44011 135502 37884     OIL PUMP ASSEMBLY A01EA09CA 4999202399920239A06 A02EA09CA A02EA09CB A02EA09CC 4999202399920239A06

Не могли бы вы помочь мне в этом? Заранее благодарим за любезную помощь!

Проблемы после тестирования решения JXC

1. Проблема конкатенации

Если начальный набор данных имеет следующий вид

itemno  fits_assembly_id    fits_assembly_name
1322660 35459               DRIVE TRAIN, CLUTCH, PRIMARY - S09PR6HSL/PS6HSL/HEL (49SNOWDRIVECLUTCH09600TRG)

После step-3 , выполняется следующее

+-------------------------------------------------------------------------------------------+
|temp1                                                                                      |
+-------------------------------------------------------------------------------------------+
|[DRIVE, TRAIN, CLUTCH, PRIMARY, S09PR6HSL, S09PS6HSL, S09PS6HEL, 49SNOWDRIVECLUTCH09600TRG]|

Первоначально это было S09PR6HSL/PS6HSL/HEL, поэтому оно должно измениться на S09PR6HSL S09PS6HSL S09PR6HEL, но это S09PR6HSL S09PS6HSL S09PS6HEL. 3-й пункт должен быть S09PR6HEL, но это S09PS6HEL. Я полагаю, что эта часть должна быть объединена с самой первой строки и добавлена ​​ко всем остальным после /.

2. Замена подчеркивания: Это новое, потому что я только что заметил несколько строк с этим. Иногда строка после / имеет подчеркивание. В этом случае буква (из первой строки) из той же позиции, что и подчеркивание, должна заменить _ во второй или более поздней строке из самой первой строки. Например, если данные DRIVE TRAIN, CLUTCH, PRIMARY - S09PR6HSL/PS_HSL/H_L (49SNOWDRIVECLUTCH09600TRG), они должны измениться на DRIVE TRAIN, CLUTCH, PRIMARY - S09PR6HSL S09PS6HSL S09PR6HSL (49SNOWDRIVECLUTCH09600TRG). Здесь In /PS_HSL/ _ заменено значением 6, поскольку PS_HSL соответствует PR6HSL, поэтому замените _ на 6 и добавьте S09, чтобы сделать его полным идентификатором, равным S09PS6HSL. В основном, взять данные из первой строки и добавить их к более поздним строкам (после /) , если длина не совпадает, чтобы сделать его полным идентификатором. Если есть _, возьмите данные из той же позиции, что и _ из первой строки, и замените их в последующих строках идентификатора.

3. Отдельные подстроки, объединенные / исохранить остаток строки в новом столбце, добавленном к тому же кадру данных

Например:

enter image description here

Ошибка после шага 0:

enter image description here

1 Ответ

2 голосов
/ 02 ноября 2019

Хороший вопрос, в идеале я бы пошел с udf , чтобы упростить задачу, но поскольку эта задача является хорошим примером использования функций высшего порядка Spark SQL ... Может быть, немного многословно, так чтоЯ разделил это на 4 шага. Дайте мне знать, если это работает, и любые вопросы приветствуются:

Шаг 1: преобразовать строку в массив строк

разделить строку по шаблону (?:(?!/)\p{Punct}|\s)+')), которыйявляется последовательной пунктуацией (кроме /) или пробелами, а затем отфильтровывает пустые позиции (в начале / в конце). Временный столбец temp1 используется для сохранения промежуточных столбцов.

from pyspark.sql.functions import split, expr

df1 = df.withColumn('temp1', split('Assembly_name', r'(?:(?!/)\p{Punct}|\s)+')) \
        .withColumn('temp1', expr("filter(temp1, x -> x <> '')"))

df1.select('temp1').show(truncate=False)
+-------------------------------------------------------------------------------------+
|temp1                                                                                |
+-------------------------------------------------------------------------------------+
|[OIL, PUMP, ASSEMBLY, A01EA09CA, 4999202399920239A06]                                |
|[OIL, PUMP, ASSEMBLY, A02EA09CA/CB/CC, 4999202399920239A06]                          |
|[OIL, PUMP, ASSEMBLY, A01EA05CA, 4999202399920239A06]                                |
|[DRIVE, TRAIN, TRANSMISSION, 6, SPEED, V08AB26/GB26/LB26, ALL, OPTIONS, 49VICTRANS08]|
|[SUSPENSION, 7043244, S09PR6HSL/PS6HSL/HEL, 49SNOWSHOCKFRONT7043244SB]               |
+-------------------------------------------------------------------------------------+

Шаг 2: преобразование temp1 в массив массивов

разделение элементов массива снова с использованием /, поэтомучто все part-id в их собственном элементе массива

df2 = df1.withColumn('temp1', expr("transform(temp1, x -> split(x, '/'))"))
df2.select('temp1').show(truncate=False)
+----------------------------------------------------------------------------------------------------------+
|temp1                                                                                                     |
+----------------------------------------------------------------------------------------------------------+
|[[OIL], [PUMP], [ASSEMBLY], [A01EA09CA], [4999202399920239A06]]                                           |
|[[OIL], [PUMP], [ASSEMBLY], [A02EA09CA, CB, CC], [4999202399920239A06]]                                   |
|[[OIL], [PUMP], [ASSEMBLY], [A01EA05CA], [4999202399920239A06]]                                           |
|[[DRIVE], [TRAIN], [TRANSMISSION], [6], [SPEED], [V08AB26, GB26, LB26], [ALL], [OPTIONS], [49VICTRANS08]] |
|[[SUSPENSION], [7043244], [S09PR6HSL, PS6HSL, HEL], [49SNOWSHOCKFRONT7043244SB]]                          |
+----------------------------------------------------------------------------------------------------------+

Шаг-3: используйте агрегат для сброса идентификаторов part *

Функция агрегата будет работать на внутренних массивах:

df3 = df2.withColumn('temp1', expr("""

       flatten(
         transform(temp1, x ->
           transform(sequence(1, size(x)), i ->
             aggregate(
                 sequence(1, i)
               , x[0]
               , (acc,j) -> concat(substr(acc, 1, length(x[0])-length(x[j-1])), x[j-1])
             )
           )
         )
       )

    """))

df3.select('temp1').show(truncate=False)
+----------------------------------------------------------------------------------------------+
|temp1                                                                                         |
+----------------------------------------------------------------------------------------------+
|[OIL, PUMP, ASSEMBLY, A01EA09CA, 4999202399920239A06]                                         |
|[OIL, PUMP, ASSEMBLY, A02EA09CA, A02EA09CB, A02EA09CC, 4999202399920239A06]                   |
|[OIL, PUMP, ASSEMBLY, A01EA05CA, 4999202399920239A06]                                         |
|[DRIVE, TRAIN, TRANSMISSION, 6, SPEED, V08AB26, V08GB26, V08LB26, ALL, OPTIONS, 49VICTRANS08] |
|[SUSPENSION, 7043244, S09PR6HSL, S09PS6HSL, S09PS6HEL, 49SNOWSHOCKFRONT7043244SB]             |
+----------------------------------------------------------------------------------------------+

Где:

  • transform(temp1, x -> func1(x)): перебирать каждый элемент в массиве temp1 для запуска func1 (x) , x - это внутренний массив (массив строк)
  • func1(x), упомянутый выше, является другой функцией преобразования, которая выполняет итерацию последовательности (1, размер (x)) и выполните func2 (i) для каждого i :

    transform(sequence(1, size(x)), i -> func2(i))
    
  • func2(i), упомянутого выше, является агрегатной функцией, которая выполняет итерацию попоследовательность (1, i) с начальным значением x [0] и накапливать / уменьшать с помощью функции:

    (acc,j) -> concat(substr(acc, 1, length(acc)-length(x[j-1])), x[j-1])
    

    Примечание: substr() позицияНа основе 1 и array-indexing на 0, поэтому нам нужно x [j-1] для ссылки на текущий элемент массива в приведенной выше функции уменьшения / агрегирования

  • наконец, запуститеflatten() для объединения внутренних массивов

    На этом шаге выполняется что-то вроде следующего псевдо-цикла:

    for x in temp1:
      for i in range(1, size(x)+1):
        acc = x[0]
        for j in range(1,i+1):
          acc = concat(substr(acc, 1, length(acc)-length(x[j-1])), x[j-1])
    

Шаг 4: объединение и удаление дубликатов

df4 = df3.groupby('Itemno').agg(
      expr("concat_ws(' ', array_distinct(flatten(collect_list(temp1)))) AS Assembly_names")
    , expr("concat_ws(' ', collect_set(Assembly_id)) as Assembly_ids")
  )

Где:

  • использовать collect_list () , чтобы получить массив массивов (temp1, который является массивом строк)
  • используйте flatten () для преобразования вышеуказанного в массив строк
  • используйте array_distinct () для удаления дубликатов
  • используйте concat_ws () для преобразования вышеуказанного массива в строку

    df4.select('Assembly_names').show(truncate=False)
    +---------------------------------------------------------------------------------------+
    |Assembly_names                                                                         |
    +---------------------------------------------------------------------------------------+
    |OIL PUMP ASSEMBLY A01EA09CA 4999202399920239A06 A02EA09CA A02EA09CB A02EA09CC A01EA05CA|
    |SUSPENSION 7043244 S09PR6HSL S09PS6HSL S09PS6HEL 49SNOWSHOCKFRONT7043244SB             |
    |DRIVE TRAIN TRANSMISSION 6 SPEED V08AB26 V08GB26 V08LB26 ALL OPTIONS 49VICTRANS08      |
    +---------------------------------------------------------------------------------------+
    

ОБНОВЛЕНИЕ:

Первый из них легко исправить, этоГораздо проще, чем существующий (не нужно агрегировать). Для второго, следующее решение требует перебирать строку char по char, что может быть медленным. Если это так, мы можем проверить с помощью udf.

Ниже приведены изменения:

  • Шаг-1: Просто добавьте подчеркивание, чтобы исключитьиз знаков препинания: (обратите внимание, если подчеркивание отображается в других местах текста, возможно, сначала нужно запустить regexp_replace, чтобы очистить их)

    df1 = df.withColumn('temp1', split('Assembly_name', r'(?:(?![/_])\p{Punct}|\s)+')) \ 
            .withColumn('temp1', expr("filter(temp1, x -> x <> '')"))
    
  • Шаг 2: разбить массив массива дальше на массив массивов массивов, самый внутренний массив разделил строку на символы. переверните самый внутренний массив, чтобы его было легко сравнивать.

    df2 = df1.withColumn('temp1', expr("transform(temp1, x -> split(x, '/'))")) \
             .withColumn('temp1', expr("transform(temp1, x -> transform(x, y -> reverse(split(y, ''))) )"))
    
  • Шаг-3: Используйте transform () вместо aggregate () для сброса пар-идентификаторов. мы проверяем y [i] (элемент самого внутреннего массива), если он имеет значение NULL или подчеркивание, а затем заменяем его соответствующим элементом из x [0] [i] . затем мы инвертируем массив и используем concat_ws ('' ..) для преобразования его обратно в строку.

    df3 = df2.withColumn('temp1', expr("""
    
       flatten(
         transform(temp1, x ->
           transform(x, y ->
             concat_ws('', 
               reverse(
                 transform(sequence(0, size(x[0])-1), i -> IF(y[i] is NULL or y[i] == '_', x[0][i], y[i]))
               )
             )
           )
         ) 
       ) 
    
    """))
    

Ниже приведен результат из приведенного выше

df3.select('temp1').show(truncate=False)                                                                           
+---------------------------------------------------------------------------------------------+
|temp1                                                                                        |
+---------------------------------------------------------------------------------------------+
|[OIL, PUMP, ASSEMBLY, A01EA09CA, 4999202399920239A06]                                        |
|[OIL, PUMP, ASSEMBLY, A02EA09CA, A02EA09CB, A02EA09CC, 4999202399920239A06]                  |
|[OIL, PUMP, ASSEMBLY, A01EA05CA, 4999202399920239A06]                                        |
|[DRIVE, TRAIN, TRANSMISSION, 6, SPEED, V08AB26, V08GB26, V08LB26, ALL, OPTIONS, 49VICTRANS08]|
|[SUSPENSION, 7043244, S09PR6HSL, S09PS6HSL, S09PR6HEL, 49SNOWSHOCKFRONT7043244SB]            |
|[DRIVE, TRAIN, CLUTCH, PRIMARY, S09PR6HSL, S09PS6HSL, S09PR6HSL, 49SNOWDRIVECLUTCH09600TRG]  |
|[DRIVE, TRAIN, CLUTCH, PRIMARY, S09PR6HSL, S09PS6HSL, S09PR6HSL, 49SNOWDRIVECLUTCH09600TRG]  |
+---------------------------------------------------------------------------------------------+

Поле перед обработкой:

df.select('Assembly_name').show(truncate=False)                                                                    
+----------------------------------------------------------------------------------+
|Assembly_name                                                                     |
+----------------------------------------------------------------------------------+
|OIL PUMP ASSEMBLY - A01EA09CA (4999202399920239A06)                               |
|OIL PUMP ASSEMBLY - A02EA09CA/CB/CC (4999202399920239A06)                         |
|OIL PUMP ASSEMBLY - A01EA05CA (4999202399920239A06)                               |
|DRIVE TRAIN, TRANSMISSION (6 SPEED) - V08AB26/GB26/LB26 ALL OPTIONS (49VICTRANS08)|
|SUSPENSION (7043244) - S09PR6HSL/PS6HSL/HEL (49SNOWSHOCKFRONT7043244SB)           |
|DRIVE TRAIN, CLUTCH, PRIMARY - S09PR6HSL/PS_HSL/H_L (49SNOWDRIVECLUTCH09600TRG)   |
|DRIVE TRAIN, CLUTCH, PRIMARY - S09PR6HSL/_S__SL/H_L (49SNOWDRIVECLUTCH09600TRG)   |
+----------------------------------------------------------------------------------+
  • Шаг-4 : без изменений.

ОБНОВЛЕНИЕ-2 добавлено Шаг-0:

Шаг-0: для предварительной обработки столбца Assembly_name, используйте regexp_replace + split , чтобы разделить модели # в новый столбец иудалить его из исходного столбца Assembly_name:

from pyspark.sql.functions import regexp_replace, split

df0 = df.withColumn('new_col', split(regexp_replace('Assembly_name', r'^(.*)-\s*(\S+)(.*)$', '$1$3\0$2'),'\0')) \
    .selectExpr(
        'Itemno'
      , 'Assembly_id'
      , "coalesce(new_col[0], Assembly_name) as Assembly_name"
      , "coalesce(new_col[1], '') as models"
)

df0.show(truncate=False)
+-------+-----------+---------------------------------------------------------------+--------------------+
|Itemno |Assembly_id|Assembly_name                                                  |models              |
+-------+-----------+---------------------------------------------------------------+--------------------+
|0450056|44011      |OIL PUMP ASSEMBLY  (4999202399920239A06)                       |A01EA09CA           |
|0450056|135502     |OIL PUMP ASSEMBLY  (4999202399920239A06)                       |A02EA09CA/CB/CC     |
|0450056|37884      |OIL PUMP ASSEMBLY  (4999202399920239A06)                       |A01EA05CA           |
|0450067|12345      |DRIVE TRAIN, TRANSMISSION (6 SPEED)  ALL OPTIONS (49VICTRANS08)|V08AB26/GB26/LB26   |
|0450068|1000       |SUSPENSION (7043244)  (49SNOWSHOCKFRONT7043244SB)              |S09PR6HSL/PS6HSL/HEL|
|0450066|12345      |DRIVE TRAIN, CLUTCH, PRIMARY  (49SNOWDRIVECLUTCH09600TRG)      |S09PR6HSL/PS_HSL/H_L|
|0450069|12346      |DRIVE TRAIN, CLUTCH, PRIMARY (49SNOWDRIVECLUTCH09600TRG)       |                    |
+-------+-----------+---------------------------------------------------------------+--------------------+

Затем вы можете обработать Assembly_name, используя RegexTokenier и StopwordsRemover, models - это упрощенная версия текущей публикации, которую вы можете пропустить Шаг-1, но обратите внимание на глубину массивов.

(Примечание: удалено S09PR6HSL/_S__SL/H_L из последней записи для тестирования)

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...