Преобразование элемента массива после разбиения - PullRequest
2 голосов
/ 04 ноября 2019

У меня есть спарк DF с 1 столбцом массива col1

+--------------------------+
|COL1                      |                                                                                    
|                          |
+---------------------------
|[101:10001:A, 102:10002:B]|
|[201:20001:B, 202:20002:A]|                                        

Для всех элементов в массиве Я делю на : ихотите выбрать первую часть (например, 101), если последняя часть разделения равна A, а в противном случае - нет.

Ожидаемый результат:

+--------------------------+
|COL2                      |                                                                                    
|                          |
+---------------------------
|[101, None]               |
|[None, 202]               |                                        

Код:

expr = """
     transform(COL1,
     e -> when(split(e,':')[2] == 'A', split(e, ':')[0]).otherwise(None)
     )
    """
df = df.withColumn("COL2", expr(expr))

Я получил ParseException

\nextraneous input '(' expecting {')', ','}(line 3, pos 70)\n\n== SQL ==\n\n         transform(COL1,\n         e -> when(split(e,':')[2] == 'A', split(e, ':')[0]).otherwise(None)\n----------------------------------------------------------------------^^^\n         )\n        \n"
Traceback (most recent call last):
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/sql/functions.py", line 675, in expr
    return Column(sc._jvm.functions.expr(str))
  File "/usr/lib/spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 1257, in __call__
    answer, self.gateway_client, self.target_id, self.name)
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/sql/utils.py", line 73, in deco
    raise ParseException(s.split(': ', 1)[1], stackTrace)

1 Ответ

2 голосов
/ 04 ноября 2019

Внутри функции expr() вам нужно использовать синтаксис SQL, а также вы должны переименовать переменную expr, которая перезаписывает функцию API expr и делает ее строкой. (используйте NULL в SQL, если вам не нужна буквальная строка None):

from pyspark.sql.functions import expr

expr1 = """ 
    transform(COL1, 
        e -> IF(split(e,':')[2] = 'A', split(e,':')[0], "None") 
    )
"""       

df.withColumn("COL2", expr(expr1)).show() 
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...