Я пытаюсь извлечь несколько значений столбцов из существующего столбца в потоковом фрейме данных pyspark.
Я читаю поток, используя
stream_dataframe = spark_session.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", broker) \
.option("subscribe", topic) \
.option("startingOffsets", "earliest") \
.load()
В настоящее время я разделяю строку на столбец value и применив к нему схему,
assert sdf.isStreaming == True, "DataFrame doesn't receive streaming data"
# split attributes to nested array in one Column
col = split(sdf[col_name], split_str)
# now expand col to multiple top-level columns
for idx, field in enumerate(schema):
sdf = sdf.withColumn(field.name, col.getItem(idx).cast(field.dataType))
return sdf
Я хотел использовать именованное регулярное выражение вместо указанного выше.
Я попытался использовать приведенный ниже код,
host_pattern = r'(^\S+\.[\S+\.]+\S+)\s'
ts_pattern = r'\[(\d{2}\/\w{3}\/\d{4}\:\d{2}\:\d{2}\:\d{2} (\+|\-)\d{4})\]'
method = r'\s([A-Z]{3,7})\s'
# url = r'\s((\/((\w+\/*\?*.(\w))+)\s))'
url = r'\s(\/[a-zA-Z0-9\/\S]+)'
protocol = r'\s([A-Z]+\/\d\.\d)\s'
status_pattern_size = r'\s(\d{3})\s(\d+)\s'
uuid_pattern = r'(([A-Za-z0-9\-]+)$)|(([0-9a-f]{32})$)'
df = df.selectExpr(regexp_extract('value', host_pattern, 1).alias('host'),
regexp_extract('value', ts_pattern, 1).alias('time'),
regexp_extract('value', method, 1).alias('http_method'),
regexp_extract('value', url, 1).alias('request_uri'),
regexp_extract('value', protocol, 1).alias('http_protocol'),
regexp_extract('value', status_pattern_size, 1).cast('integer').alias('response_status'),
regexp_extract('value', status_pattern_size, 2).cast('integer').alias('response_time'),
regexp_extract('value', uuid_pattern, 1).alias('instance_id'))
Это вызывает у меня сообщение об ошибке: Column is not iterable
.
Я хотел использовать вместо этого следующее регулярное выражение имени, так как приведенное выше приведет к нескольким вызовам regexp_extract,
(?P<host>\S+)\s+\S+\s+(?P<user>\S+)\s+\[(?P<time>.*?)\]\s+(?P<http_method>\S+)\s+(?P<request_uri>\S+)\s+(?P<http_protocol>\S+)\s+(?P<response_status>\S+)\s+(?P<reponse_time>\S+)\s+(?P<instance_id>\S+)
для извлечения значения для соответствующих столбцов. Возможно ли это сделать в потоковом фрейме данных pyspark?