Как я могу преобразовать входную строку в словарь для каждой строки столбца в pyspark - PullRequest
0 голосов
/ 24 сентября 2019

У меня есть значения столбца информационного кадра, где я получаю строковый ввод, как показано ниже, где startIndex - это индекс начала каждого символа, end-индекс - это конец появления этого символа в строке, а flag - сам символ..

    +---+------------------+
    | id|    Values        |
    +---+------------------+
    |01 |  AABBBAA         |
    |02 |  SSSAAAA         |
    +---+------------------+

Теперь я хочу преобразовать строку в словарь для каждой строки, как показано ниже:

    +---+--------------------+
    | id|    Values          |
    +---+--------------------+
    |01 |  [{"startIndex":0, |
    |   |    "endIndex" : 1, | 
    |   |    "flag" : A },   |
    |   |   {"startIndex":2, |
    |   |    "endIndex" : 4, |
    |   |    "flag" : B },   |
    |   |   {"startIndex":5, |
    |   |    "endIndex" : 6, |
    |   |    "flag" : A }]   |
    |02 |  [{"startIndex":0, |
    |   |    "endIndex" : 2, |
    |   |    "flag" : S },   |
    |   |   {"startIndex":3, |
    |   |    "endIndex" : 6, |
    |   |    "flag" : A }]   |
    +---+--------------------+-

У меня есть псевдокод для создания словаря, но я не уверен, как его применить.это все строки за один раз без использования петель.Также проблема такого подхода в том, что только последний словарь в рамке перезаписывается во всех строках


        import re
        x = "aaabbbbccaa"
        xs = re.findall(r"((.)\2*)", x)
        print(xs)
        start = 0
        output = '' 
        for item in xs:
            end = start + (len(item[0])-1)
            startIndex = start
            endIndex = end
            qualityFlag = item[1]
            print(startIndex, endIndex, qualityFlag)
            start = end+

1 Ответ

1 голос
/ 25 сентября 2019

Использование udf () для обертывания логики кода и to_json () для преобразования массива структур в строку:

from pyspark.sql.functions import udf, to_json
import re

df = spark.createDataFrame([
      ('01', 'AABBBAA')
    , ('02', 'SSSAAAA')
  ] , ['id', 'Values']
)

# argument `x` is a StringType() over the udf function
# return `row` as a list of dicts
@udf('array<struct<startIndex:long,endIndex:long,flag:string>>')
def set_fields(x):
    row = []
    for m in re.finditer(r'(.)\1*', x):
        row.append({
            'startIndex': m.start()
          , 'endIndex': m.end()-1
          , 'flag': m.group(1)
        })
    return row

df.select('id', to_json(set_fields('Values')).alias('Values')).show(truncate=False)
+---+----------------------------------------------------------------------------------------------------------------------------+
|id |Values                                                                                                                      |
+---+----------------------------------------------------------------------------------------------------------------------------+
|01 |[{"startIndex":0,"endIndex":1,"flag":"A"},{"startIndex":2,"endIndex":4,"flag":"B"},{"startIndex":5,"endIndex":6,"flag":"A"}]|
|02 |[{"startIndex":0,"endIndex":2,"flag":"S"},{"startIndex":3,"endIndex":6,"flag":"A"}]                                         |
+---+----------------------------------------------------------------------------------------------------------------------------+
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...