Функция агрегации с отслеживанием состояния в PySpark - PullRequest
0 голосов
/ 02 октября 2018

В PySpark я пытаюсь определить пользовательский агрегатор , который накапливает состояние .Возможно ли это в Spark 2.3?

AFAIK, теперь возможно определить пользовательский UDAF в PySpark начиная с Spark 2.3 (см. Как определить и использовать пользовательскую функцию агрегирования в Spark SQL? ), позвонив pandas_udf с ключевым словом PandasUDFType.GROUPED_AGG.Однако, учитывая, что он просто принимает функцию в качестве параметра, я не думаю, что можно переносить состояние во время агрегации.

Из Scala я вижу, что возможно агрегирование с сохранением состояния с помощью любого расширения UserDefinedAggregateFunction или org.apache.spark.sql.expressions.Aggregator, но могу ли я сделать нечто подобное только на стороне Python?

1 Ответ

0 голосов
/ 02 октября 2018

Вы можете использовать аккумулятор .

Вы можете использовать встроенную систему управления потоками искр .

пример простого аккумулятора для использования в SQL

from  pyspark.sql.types import IntegerType

# have some data
df = spark.range(10).toDF("num")

# have a table
df.createOrReplaceTempView("num_table")

# have an accumulator
accSum = sc.accumulator(0)

# have a function that accumulates
def add_acc(int_val):
  accSum.add(int_val)
  return int_val

# register function as udf
spark.udf.register("reg_addacc", add_acc, IntegerType())

# use in sql
spark.sql("SELECT sum(reg_addacc(num)) FROM num_table").show()

# get value from accumulator
print(accSum.value)

45

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...