У меня проблемы с функцией UDF в моем потоковом приложении Kafka. Каждый раз, когда вызывается функция UDF, на входе отображается только значение None вместо допустимого значения столбца. Затем возникла ошибка TypeError, потому что приложение ожидает str, а не None.
Определение функции UDF:
@udf(returnType=StringType())
def get_asn(ip_addr):
from fm_kafka2parquet.asn_lookup import AsnLookup
result = AsnLookup\
.get_instance(ASN_DB_PATH)\
.get().lookup(ip_addr)[0] # first record from tuple is ASN number
if result is None:
return "n/a"
return result
Вызов функции UDF:
# data frame for netflow reading
df = spark \
.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", CONFIG_KAFKA_BOOTSTRAP) \
.option("subscribe", CONFIG_KAFKA_TOPIC) \
.option("startingOffsets", "latest") \
.load() \
.selectExpr("CAST(value AS STRING)") \
.withColumn("net", from_json("value", Structures.get_ipfix_structure())) \
.select("net.*")
# remove ipfix prefix in case of ipfixv1 collector
temp_list = []
for c in df.columns:
new_name = c.replace('ipfix.', '')
temp_list.append(new_name)
df = df.toDF(*temp_list)
# enrichment
edf = df \
.withColumn("sourceAS", get_asn('sourceIPv4Address')) \
.withColumn("destinationAS", get_asn('destinationIPv4Address'))
Все заканчивается ошибкой, которая вызывается библиотекой pyasn, используемой функцией UDF get_asn:
TypeError: search_best() argument 1 must be str, not None