Как я могу легко использовать свои собственные методы класса среди операций pyspark? - PullRequest
0 голосов
/ 29 января 2019

У меня есть класс Age, файл csv и сеанс времени выполнения pyspark

ages.csv

Name;Age
alpha;noise20noise
beta;noi 3 sE 0
gamma;n 4 oi 0 se
phi;n50ise
detla;3no5ise
kappa;No 4 i 5 sE
omega;25noIsE

, который читается практически как (после анализа столбца Age):

Name;Age
alpha;20
beta;30
gamma;40
phi;50
detla;35
kappa;45
omega;25

Определенный класс: Age age.py

import re

class Age:
    # age is a number representing the age of a person
    def __init__(self, age):
        self.age = age

    def __eq__(self, other):
        return self.age == self.__parse(other)

    def __lt__(self, other):
        return self.age < self.__parse(other)

    def __gt__(self, other):
        return self.age > self.__parse(other)

    def __le__(self, other):
        return self.age <= self.__parse(other)

    def __ge__(self, other):
        return self.age >= self.__parse(other)

    def __parse(self, age):
        return int(''.join(re.findall(r'\d', age)))

# Let's test this class
if __name__ == '__main__':
    print(Age(18) == 'noise18noise')
    print(Age(18) <= 'aka 1 fakj 8 jal')
    print(Age(18) >= 'jaa 18 ka')
    print(Age(18) < '1 kda 9')
    print(Age(18) > 'akfa 1 na 7 noise')

Output:
True
True
True
True
True

Тест сработал.Я хочу использовать его в pyspark

Запустите pyspark, прочитайте ages.csv и импортируйте Age

Using Python version 3.6.7 (default, Oct 23 2018 19:16:44)
SparkSession available as 'spark'.
>>> ages = spark.read.csv('ages.csv', sep=';', header=True)
19/01/28 14:44:18 WARN ObjectStore: Failed to get database global_temp, returning NoSuchObjectException
>>> ages.show()
+-----+------------+
| Name|         Age|
+-----+------------+
|alpha|noise20noise|
| beta|  noi 3 sE 0|
|gamma| n 4 oi 0 se|
|  phi|      n50ise|
|detla|     3no5ise|
|kappa| No 4 i 5 sE|
|omega|     25noIsE|
+-----+------------+

Теперь я хочу получить всех людей, которым 20 лет, например

>>> from age import Age
>>> ages.filter(ages.Age == Age(20)).show()

И это ошибка, которую я получаю

Traceback (most recent call last):
File "<stdin>", line 1, in <module>
File "/opt/spark-2.3.1-bin-hadoop2.7/python/pyspark/sql/column.py", line 116, in _
    njc = getattr(self._jc, name)(jc)
File "/opt/spark-2.3.1-bin-hadoop2.7/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 1248, in __call__
File "/opt/spark-2.3.1-bin-hadoop2.7/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 1218, in _build_args
File "/opt/spark-2.3.1-bin-hadoop2.7/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 1218, in <listcomp>
File "/opt/spark-2.3.1-bin-hadoop2.7/python/lib/py4j-0.10.7-src.zip/py4j/protocol.py", line 298, in get_command_part
AttributeError: 'Age' object has no attribute '_get_object_id'

Итак, моя 1-я проблема - как решить эту ошибку

Это моя первая попытка решитьэта проблема: я изменил определение class Age, чтобы расширить str следующим образом:

age.py

...
class Age(str):
    ....

Как вторая попытка:

>>> ages.filter(ages.Age == Age(20)).show()
+----+---+
|Name|Age|
+----+---+
+----+---+

Тем не менее у нас все еще есть:

>>> 'noise20noise' == Age(20)
True

Как видите, AttributeError: 'Age' object has no attribute '_get_object_id' исчезает, но не вычисляет правильный ответ, что является моей 2-й проблемой

Снова вот моя попытка: я использую пользовательские функции pyspark

>>> import pyspark.sql.functions as F
>>> import pyspark.sql.types as T
>>> eq20 = F.udf(lambda c: c == Age(20), T.BooleanType())
>>> ages.filter(eq20(ages.Age)).show()
+-----+------------+
| Name|         Age|
+-----+------------+
|alpha|noise20noise|
+-----+------------+

Теперь это работает.Но вот в чем дело: мне больше всего нравится первая идиома

>>> ages.filter(ages.Age == Age(20)).show()

, которая проще и выразительнее.Я не хочу определять такую ​​функцию, как eq20, eq21, less_than50, greater_than30, etc каждый раз

Я мог бы сделать это определение в самом классе Age, но я не знаю, как это сделать.Тем не менее, это то, что я пробовал до сих пор, используя python decorator

age.py

# other imports here
...

import pyspark.sql.functions as F
import pyspark.sql.types as T

def connect_to_pyspark(function):
    return F.udf(function, T.BooleanType())

class Age(str):
    ...

    @connect_to_pyspark
    def __eq__(self, other):
        return self.age == self.__parse(other)

    ...
    # do the same decorator for the other comparative methods

Тест снова:

>>> ages.filter(ages.Age == Age(20)).show()
+----+---+
|Name|Age|
+----+---+
+----+---+

И это неРабота.Или мой декоратор плохо написан?

Как все это решить?Достаточно ли хорошо мое решение первой проблемы?Если нет, то что нужно сделать вместо этого?Если да, то как решить вторую проблему?

1 Ответ

0 голосов
/ 29 января 2019

Получение ages.Age == Age(20) будет довольно сложным, потому что spark не соблюдает соглашения Python по реализации __eq__.Подробнее об этом позже, но если вы в порядке Age(20) == ages.Age, у вас есть несколько вариантов.ИМХО, самый простой способ сделать это - обернуть логику разбора в udf:

parse_udf = F.udf(..., T.IntegerType())
class Age:
    ...
    def __eq__(self, other: Column):
        return F.lit(self.age) == parse_udf(other)

Обратите внимание, что Age не имеет подкласса str, это просто причинит вред,Если вы хотите использовать декоратор, ваш декоратор не должен возвращать udf, он должен возвращать функцию, которая применяет udf.Вот так:

import re
import pyspark.sql.functions as F
import pyspark.sql.types as T

def connect_to_pyspark(function):
  def helper(age, other):
    myUdf = F.udf(lambda item_from_other: function(age, item_from_other), T.BooleanType())
    return myUdf(other)
  return helper

class Age:

    def __init__(self, age):
      self.age = 45

    def __parse(self, other):
      return int(''.join(re.findall(r'\d', other)))

    @connect_to_pyspark
    def __eq__(self, other):
        return self.age == self.__parse(other)

ages.withColumn("eq20", Age(20) == df.Age).show()

Подробнее о том, почему вам нужно использовать Ages(20) == ages.Age.В python, если вы делаете a == b и класс a не знает, как сравнивать с b, он должен вернуть NotImplemented, а затем python попытается b.__eq__(a), но spark никогда не вернет NotImplemented, поэтому __eq__из Age будет вызываться только в том случае, если оно указано первым в выражении: (.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...