pyspark: эквивалент arrays_zip в Spark 2.3 - PullRequest
0 голосов
/ 29 апреля 2020

Как написать эквивалентную функцию arrays_zip в Spark 2.3?

Исходный код из Spark 2.4

def arrays_zip(*cols):
    """
    Collection function: Returns a merged array of structs in which the N-th struct contains all
    N-th values of input arrays.

    :param cols: columns of arrays to be merged.

    >>> from pyspark.sql.functions import arrays_zip
    >>> df = spark.createDataFrame([(([1, 2, 3], [2, 3, 4]))], ['vals1', 'vals2'])
    >>> df.select(arrays_zip(df.vals1, df.vals2).alias('zipped')).collect()
    [Row(zipped=[Row(vals1=1, vals2=2), Row(vals1=2, vals2=3), Row(vals1=3, vals2=4)])]
    """
    sc = SparkContext._active_spark_context
    return Column(sc._jvm.functions.arrays_zip(_to_seq(sc, cols, _to_java_column)))

Как добиться аналога в PySpark?

Ответы [ 2 ]

0 голосов
/ 06 мая 2020

Этого можно добиться, создав пользовательскую функцию

import pyspark.sql.functions as f
import pyspark.sql.types as t

arrays_zip_ = f.udf(lambda x, y: list(zip(x, y)),  
      t.ArrayType(t.StructType([
          # Choose Datatype according to requirement
          t.StructField("first", t.IntegerType()),
          t.StructField("second", t.StringType())
  ])))

df = spark.createDataFrame([(([1, 2, 3], ['2', '3', '4']))], ['first', 'second'])

Теперь результаты отображаются с spark <= 2.3 </strong>

df.select(arrays_zip_('first', 'second').alias('zipped')).show(2,False)

+------------------------+
|zipped                  |
+------------------------+
|[[1, 2], [2, 3], [3, 4]]|
+------------------------+

И результат с Spark версия 2.4

df.select(f.arrays_zip('first', 'second').alias('zipped')).show(2,False)

+------------------------+
|zipped                  |
+------------------------+
|[[1, 2], [2, 3], [3, 4]]|
+------------------------+
0 голосов
/ 06 мая 2020

Вы можете использовать UDF для получения той же функциональности, что и arrays_zip.

from pyspark.sql import functions as F
from pyspark.sql import types as T

def zip_func(*args):
    return zip(*args)

zip_udf = F.udf(zip_func, T.ArrayType(T.IntegerType()))

. Можно использовать так же, как и arrays_zip, например:

* 1008. *
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...