Я пытаюсь использовать приблизительное число, отличное в pyspark, используя реализацию HyperLogLogPlus. Но если я попробую это:
from py4j.java_gateway import java_import
df = spark.createDataFrame([("4G_band1800", 12.0, 18.0, "TRUE"),
("4G_band1800", 12.0, 18.0, "FALSE"),
("4G_band1801", np.nan, 18.0, "TRUE"),
("4G_band1801", None, 18.0, "TRUE")],
("band", "A3", "A5", "status"),3)
java_import(sc._gateway.jvm, "com.clearspring.analytics.stream.cardinality.HyperLogLogPlus")
hp = sc._gateway.jvm.HyperLogLogPlus(4, 16)
def mapper(x):
if x:
hp.offer(x)
# do something else
return hp
def reducer(hp1, hp2):
hp1.addAll(hp2)
return hp1
a = df.rdd.mapPartitions(mapper).reduce(reducer)
a.cardinality()
Я получу эту ошибку:
py4j.protocol.Py4JError: An error occurred while calling o39.__getstate__. Trace:
py4j.Py4JException: Method __getstate__([]) does not exist
at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:318)
at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:326)
at py4j.Gateway.invoke(Gateway.java:274)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:238)
at java.lang.Thread.run(Thread.java:748)
После некоторого анализа кажется, что эта ошибка связана с тем, что pickle не может сериализовать hp
,Я попытался pyspark.sql.functions.approx_count_distinct(col, rsd=None)
, но это обрабатывает столбцы данных, в то время как мне нужно что-то, что может работать внутри отображения. Есть ли в любом случае, что мы можем напрямую использовать Java или класс Scala внутри метода карты? Спасибо!