Преобразовать столбец с флагами int в массив строк в pyspark - PullRequest
0 голосов
/ 28 апреля 2020

У меня есть фрейм данных со столбцом, называемым «черты», который представляет собой целое число, состоящее из нескольких флагов.

Мне нужно преобразовать этот столбец в список строк (для elasti c индексация поиска). Преобразование выглядит следующим образом.

TRAIT_0 = 0
TRAIT_1 = 1
TRAIT_2 = 2
def flag_to_list(flag: int) -> List[str]:
    trait_list = []
    if flag & (1 << TRAIT_0):
        trait_list.append("TRAIT_0")
    elif flag & (1 << TRAIT_1):
        trait_list.append("TRAIT_1")
    elif flag & (1 << TRAIT_2):
        trait_list.append("TRAIT_2")

    return trait_list

Какой самый эффективный способ выполнить это преобразование в pyspark? Я видел много примеров того, как выполнять конкатенацию и разбиение строк, но не такую ​​операцию, как эта.

Использование pyspark vesion 2.4.5

Ввод json выглядит следующим образом: { "name": "John Doe", "traits": 5 } Вывод json должен выглядеть так: { "name": "John Doe", "traits": ["TRAIT_0", "TRAIT_2"] }

Ответы [ 2 ]

1 голос
/ 29 апреля 2020

IIU C, вы можете попробовать встроенные функции Spark SQL: (1) использовать conv + split для преобразования целых чисел (base-10) -> двоичное (base-2) -> string -> массив строк (обратный), (2) на основе значений 0 или 1 и индексов их массивов в filter и transform массив в соответствующий массив именованных признаков:

from pyspark.sql.functions import expr

df = spark.createDataFrame([("name1", 5),("name2", 1),("name3", 0),("name4", 12)], ['name', 'traits'])
#DataFrame[name: string, traits: bigint]

traits = [ "Traits_{}".format(i) for i in range(8) ]
traits_array = "array({})".format(",".join("'{}'".format(e) for e in traits))
# array('Traits_0','Traits_1','Traits_2','Traits_3','Traits_4','Traits_5','Traits_6','Traits_7')

sql_expr = """

    filter(
      transform(
        /* convert int -> binary -> string -> array of strings, and then reverse the array */
        reverse(split(string(conv(traits,10,2)),'(?!$)')),
        /* take the corresponding items from the traits_array when value > 0, else NULL */
        (x,i) -> {}[IF(x='1',i,NULL)]
      ),
      /* filter out NULL items from the array */
      y -> y is not NULL
    ) AS trait_list

""".format(traits_array)
# filter(
#   transform(
#     reverse(split(string(conv(traits,10,2)),'(?!$)')),
#     (x,i) -> array('Traits_0','Traits_1','Traits_2','Traits_3','Traits_4','Traits_5','Traits_6','Traits_7')[IF(x='1',i,NULL)]
#   ),
#   y -> y is not NULL
# )

df.withColumn("traits_list", expr(sql_expr)).show(truncate=False)
+-----+------+--------------------+
|name |traits|traits_list         |
+-----+------+--------------------+
|name1|5     |[Traits_0, Traits_2]|
|name2|1     |[Traits_0]          |
|name3|0     |[]                  |
|name4|12    |[Traits_2, Traits_3]|
+-----+------+--------------------+

Ниже приведен результат после выполнения reverse(split(string(conv(traits,10,2)),'(?!$)')), обратите внимание, что шаблон split (?!$) используется, чтобы избежать NULL, отображаемого как последний элемент массива .

df.selectExpr("*", "reverse(split(string(conv(traits,10,2)),'(?!$)')) as t1").show()
+-----+------+------------+
| name|traits|          t1|
+-----+------+------------+
|name1|     5|   [1, 0, 1]|
|name2|     1|         [1]|
|name3|     0|         [0]|
|name4|    12|[0, 0, 1, 1]|
+-----+------+------------+
1 голос
/ 28 апреля 2020

Мы можем определить UDF, чтобы обернуть вашу функцию, а затем вызвать ее. Вот пример кода:

from typing import List
from pyspark.sql.types import ArrayType, StringType

TRAIT_0 = 0
TRAIT_1 = 1
TRAIT_2 = 2


def flag_to_list(flag: int) -> List[str]:
    trait_list = []
    if flag & (1 << TRAIT_0):
        trait_list.append("TRAIT_0")
    elif flag & (1 << TRAIT_1):
        trait_list.append("TRAIT_1")
    elif flag & (1 << TRAIT_2):
        trait_list.append("TRAIT_2")
    return trait_list


flag_to_list_udf = udf(lambda x: None if x is None else flag_to_list(x),
                       ArrayType(StringType()))

# Create dummy data to test
data = [
    { "name": "John Doe", "traits": 5 },
    { "name": "Jane Doe", "traits": 2 },
    { "name": "Jane Roe", "traits": 0 },
    { "name": "John Roe", "traits": 6 },
]
df = spark.createDataFrame(data, 'name STRING, traits INT')
df.show()
# +--------+------+
# |    name|traits|
# +--------+------+
# |John Doe|     5|
# |Jane Doe|     2|
# |Jane Roe|     0|
# |John Roe|     6|
# +--------+------+

df = df.withColumn('traits_processed', flag_to_list_udf(df['traits']))
df.show()
# +--------+------+----------------+
# |    name|traits|traits_processed|
# +--------+------+----------------+
# |John Doe|     5|       [TRAIT_0]|
# |Jane Doe|     2|       [TRAIT_1]|
# |Jane Roe|     0|              []|
# |John Roe|     6|       [TRAIT_1]|
# +--------+------+----------------+

Если вы не хотите создавать новый столбец, вы можете заменить traits_processed на traits.

...