как использовать HyperLogLogPlus в pyspark - PullRequest
0 голосов
/ 18 октября 2019

Я пытаюсь использовать приблизительное число, отличное в pyspark, используя реализацию HyperLogLogPlus. Но если я попробую это:

from py4j.java_gateway import java_import
df = spark.createDataFrame([("4G_band1800", 12.0, 18.0, "TRUE"),
                            ("4G_band1800", 12.0, 18.0, "FALSE"),
                            ("4G_band1801", np.nan, 18.0, "TRUE"),
                            ("4G_band1801", None, 18.0, "TRUE")],
                            ("band", "A3", "A5", "status"),3)
java_import(sc._gateway.jvm, "com.clearspring.analytics.stream.cardinality.HyperLogLogPlus")

hp = sc._gateway.jvm.HyperLogLogPlus(4, 16)


def mapper(x):
    if x:
        hp.offer(x)
        # do something else
    return hp

def reducer(hp1, hp2):
    hp1.addAll(hp2)
    return hp1

a = df.rdd.mapPartitions(mapper).reduce(reducer)
a.cardinality()

Я получу эту ошибку:

py4j.protocol.Py4JError: An error occurred while calling o39.__getstate__. Trace:
py4j.Py4JException: Method __getstate__([]) does not exist
    at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:318)
    at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:326)
    at py4j.Gateway.invoke(Gateway.java:274)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:238)
    at java.lang.Thread.run(Thread.java:748)

После некоторого анализа кажется, что эта ошибка связана с тем, что pickle не может сериализовать hp,Я попытался pyspark.sql.functions.approx_count_distinct(col, rsd=None), но это обрабатывает столбцы данных, в то время как мне нужно что-то, что может работать внутри отображения. Есть ли в любом случае, что мы можем напрямую использовать Java или класс Scala внутри метода карты? Спасибо!

...