передача объекта в UDF в pyspark - PullRequest
0 голосов
/ 02 ноября 2019

Мне нужно применить метод к каждой ячейке столбца в Spark DataFrame. Я использую базу данных для поиска значения ячейки. Я использую UDF, который получает базу данных в качестве входных данных, как показано ниже, но он не работает и возвращает ошибку.

from pyspark.sql.functions import udf, col
import random


asndb = pyasn.pyasn('/dbfs/mnt/geoip/ipasn.db')

def asn_mapper(ip, asndb):
  try:
    ret = asndb.lookup(ip)
    ret = ret[0]
    if ret == None:
      return '0'
    else: return str(ret)
  except:
    return '0'

def make_asn(asndb):
     return udf(lambda c: asn_mapper(c, asndb))

b= sqlContext.createDataFrame([("A", '22.33.44.55'), ("B", '11.22.11.44'), ("D", '44.32.11.44')],["Letter", "ip"])

b.withColumn("asn", make_asn(asndb)(col("ip"))).show()




/databricks/spark/python/lib/py4j-0.10.7-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
    326                 raise Py4JJavaError(
    327                     "An error occurred while calling {0}{1}{2}.\n".
--> 328                     format(target_id, ".", name), value)
    329             else:
    330                 raise Py4JError(

Py4JJavaError: An error occurred while calling o1094.showString.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 15.0 failed 4 times, most recent failure: Lost task 0.3 in stage 15.0 (TID 276, 10.65.251.77, executor 1): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/databricks/spark/python/pyspark/worker.py", line 394, in main
    func, profiler, deserializer, serializer = read_udfs(pickleSer, infile, eval_type)
  File "/databricks/spark/python/pyspark/worker.py", line 246, in read_udfs
    arg_offsets, udf = read_single_udf(pickleSer, infile, eval_type, runner_conf)
  File "/databricks/spark/python/pyspark/worker.py", line 160, in read_single_udf
    f, return_type = read_command(pickleSer, infile)
  File "/databricks/spark/python/pyspark/worker.py", line 71, in read_command
    command = serializer.loads(command.value)
  File "/databricks/spark/python/pyspark/serializers.py", line 672, in loads
    return pickle.loads(obj)
UnpicklingError: state is not a dictionary

    at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:496)
    at org.apache.spark.sql.execution.python.PythonUDFRunner$$anon$1.read(PythonUDFRunner.scala:81)

НО, если я помещу базу данных в UDF, она будет работать. Следующий код работает. Я не хочу добавлять pyasn.pyasn('/dbfs/mnt/geoip/ipasn.db') внутри UDF, это делает его очень медленным.

def asn_mapper(ip):
  asndb = pyasn.pyasn('/dbfs/mnt/geoip/ipasn.db')
  try:
    ret = asndb.lookup(ip)
    ret = ret[0]
    if ret == None:
      return '0'
    else: return str(ret)
  except:
    return '0'

def make_asn():
     return udf(lambda c: asn_mapper(c, ))

b= sqlContext.createDataFrame([("A", '22.33.44.55'), ("B", '11.22.11.44'), ("D", '44.32.11.44')],["Letter", "ip"])

b.withColumn("asn", make_asn()(col("ip"))).show()

Есть ли способ запустить первый код?

1 Ответ

0 голосов
/ 03 ноября 2019

Мне кажется, вы пытаетесь найти ip из geodb, используйте объединение двух таблиц. на ip колонке. Это должно сработать.

...