PySpark UDF, только значения None на входе - PullRequest
0 голосов
/ 05 мая 2020

У меня проблемы с функцией UDF в моем потоковом приложении Kafka. Каждый раз, когда вызывается функция UDF, на входе отображается только значение None вместо допустимого значения столбца. Затем возникла ошибка TypeError, потому что приложение ожидает str, а не None.

Определение функции UDF:

@udf(returnType=StringType())
def get_asn(ip_addr):
    from fm_kafka2parquet.asn_lookup import AsnLookup

    result = AsnLookup\
        .get_instance(ASN_DB_PATH)\
        .get().lookup(ip_addr)[0]  # first record from tuple is ASN number
    if result is None:
        return "n/a"
    return result

Вызов функции UDF:

  # data frame for netflow reading
  df = spark \
      .readStream \
      .format("kafka") \
      .option("kafka.bootstrap.servers", CONFIG_KAFKA_BOOTSTRAP) \
      .option("subscribe", CONFIG_KAFKA_TOPIC) \
      .option("startingOffsets", "latest") \
      .load() \
      .selectExpr("CAST(value AS STRING)") \
      .withColumn("net", from_json("value", Structures.get_ipfix_structure())) \
      .select("net.*")

  # remove ipfix prefix in case of ipfixv1 collector
  temp_list = []
  for c in df.columns:
      new_name = c.replace('ipfix.', '')
      temp_list.append(new_name)
  df = df.toDF(*temp_list)

  # enrichment
  edf = df \
      .withColumn("sourceAS", get_asn('sourceIPv4Address')) \
      .withColumn("destinationAS", get_asn('destinationIPv4Address'))

Все заканчивается ошибкой, которая вызывается библиотекой pyasn, используемой функцией UDF get_asn:

TypeError: search_best() argument 1 must be str, not None

Ответы [ 2 ]

0 голосов
/ 07 мая 2020

Кроме того, это выглядит подозрительно.

# remove ipfix prefix in case of ipfixv1 collector
  temp_list = []
  for c in df.columns:
      new_name = c.replace('ipfix.', '')
      temp_list.append(new_name)
  df = df.toDF(*temp_list)

Вы вносите изменения в имена столбцов, а затем выбираете их, но новые имена столбцов не во фрейме данных, верно? Таким образом, он должен возвращать пустой фрейм данных.

Если вы хотите переименовать столбцы, используйте -

df = df.withColumnRenamed(c, c.replace('ipfix.', ''))

Для получения подробной информации о том, как очистить имена столбцов в pyspark, обратитесь к этому - https://www.youtube.com/watch?v=vAHPAP9Oagc&t=1s

0 голосов
/ 06 мая 2020

Попробуйте использовать, как указано ниже. .withColumn ("sourceAS", get_asn (F.col ('sourceIPv4Address'))

...