Исключение: ваше требование не выполнено: можно извлечь только скалярный векторизованный udf или sql batch udf 'Pyspark - PullRequest
0 голосов
/ 20 января 2020

Я пытаюсь реализовать pandas_udf в pyspark, где pandas_udf вернет тип структуры с 4 полями, и он будет расширен до 4 разных столбцов. Однако, когда я попытался вызвать эту функцию, я получил исключение, из-за которого не удалось выполнить требование: можно извлечь только скалярный векторизованный udf или sql batch udf '

def add_vars(self, df, index_dict_var):
        """

        :return:
        """

        def fill_var_udf(var_dict, index):
            try:
                val = var_dict.value[index]
                return val
            except Exception as e:
                print(index)
                print(var_dict)
                raise e

        def fill_var(var_dict):
            return pandas_udf(lambda index: fill_var_udf(var_dict, index),
                              returnType=StructType([StructField("t0", StringType()),
                                                     StructField("t1", StringType()),
                                                     StructField("t2", StringType()),
                                                     StructField("t3", StringType())]),
                              functionType=PandasUDFType.GROUPED_MAP)

        df = df.withColumn("var", fill_var(index_dict_var)(col('var_index')))
        for i in range(len(self.var_columns)):
            df = df.withColumn(self.var_columns[i], col('var')["t" + str(i)])
        df = df.drop('var_index')
        df.show()
        return df

Здесь var_dict - это словарь, содержащий тип int как ключ и список строки в качестве значения, всякий раз, когда я вызываю эту функцию, я получаю: Исключение: не удалось выполнить требование: можно извлечь только скалярный векторизованный udf или sql пакетный udf 'Pyspark Примечание. Я использую pandas_udf вместо udf для производительности усиление, как указано здесь: https://databricks.com/blog/2017/10/30/introducing-vectorized-udfs-for-pyspark.html

Traceback:

Traceback (most recent call last):
  File "<path to file>", line 463, in steps
    self.run_process_for_all_segments()
  File "<path to file>", line 400, in run_process_for_all_segments
    data = self.get_final_data(segment_id)
  File "<path to file>", line 351, in get_final_data
    df1, df2, df3, df4, df5 = self.sample_segments(segment_id)
  File "<path to file>", line 300, in sample_segments
    data = self.add_variants(df4, self.index_dict_var)
  File "<path to file>", line 248, in add_variants
    df.show()
  File "<virtual_env path>/lib/python2.7/site-packages/pyspark/python/lib/pyspark.zip/pyspark/sql/dataframe.py", line 350, in show
  print(self._jdf.showString(n, 20, vertical))
  File "<path to virtual env>/lib/python2.7/site-packages/pyspark/python/lib/py4j-0.10.6-src.zip/py4j/java_gateway.py", line 1160, in __call__
    answer, self.gateway_client, self.target_id, self.name)
  File "<path to virtual env>/lib/python2.7/site-packages/pyspark/python/lib/pyspark.zip/pyspark/sql/utils.py", line 79, in deco
    raise IllegalArgumentException(s.split(': ', 1)[1], stackTrace)
IllegalArgumentException: u'requirement failed: Can only extract scalar vectorized udf or sql batch udf'
Traceback (most recent call last):
  File "<path to file>", line 520, in <module>
    obj1.steps()
  File "<path to file>", line 471, in steps
    raise Exception(e)
Exception: u'requirement failed: Can only extract scalar vectorized udf or sql batch udf'
...