Форматирование структурированного потока Kafka в pyspark с использованием именованного регулярного выражения - PullRequest
1 голос
/ 05 мая 2020

Я пытаюсь извлечь несколько значений столбцов из существующего столбца в потоковом фрейме данных pyspark.

Я читаю поток, используя

stream_dataframe = spark_session.readStream \
        .format("kafka") \
        .option("kafka.bootstrap.servers", broker) \
        .option("subscribe", topic) \
        .option("startingOffsets", "earliest") \
        .load()

В настоящее время я разделяю строку на столбец value и применив к нему схему,

assert sdf.isStreaming == True, "DataFrame doesn't receive streaming data"
# split attributes to nested array in one Column
col = split(sdf[col_name], split_str)
# now expand col to multiple top-level columns
for idx, field in enumerate(schema):
    sdf = sdf.withColumn(field.name, col.getItem(idx).cast(field.dataType))
return sdf

Я хотел использовать именованное регулярное выражение вместо указанного выше.

Я попытался использовать приведенный ниже код,

host_pattern = r'(^\S+\.[\S+\.]+\S+)\s'
ts_pattern = r'\[(\d{2}\/\w{3}\/\d{4}\:\d{2}\:\d{2}\:\d{2} (\+|\-)\d{4})\]'
method = r'\s([A-Z]{3,7})\s'
# url = r'\s((\/((\w+\/*\?*.(\w))+)\s))'
url = r'\s(\/[a-zA-Z0-9\/\S]+)'
protocol = r'\s([A-Z]+\/\d\.\d)\s'
status_pattern_size = r'\s(\d{3})\s(\d+)\s'
uuid_pattern = r'(([A-Za-z0-9\-]+)$)|(([0-9a-f]{32})$)'
df = df.selectExpr(regexp_extract('value', host_pattern, 1).alias('host'),
                   regexp_extract('value', ts_pattern, 1).alias('time'),
                   regexp_extract('value', method, 1).alias('http_method'),
                   regexp_extract('value', url, 1).alias('request_uri'),
                   regexp_extract('value', protocol, 1).alias('http_protocol'),
                   regexp_extract('value', status_pattern_size, 1).cast('integer').alias('response_status'),
                   regexp_extract('value', status_pattern_size, 2).cast('integer').alias('response_time'),
                   regexp_extract('value', uuid_pattern, 1).alias('instance_id'))

Это вызывает у меня сообщение об ошибке: Column is not iterable.

Я хотел использовать вместо этого следующее регулярное выражение имени, так как приведенное выше приведет к нескольким вызовам regexp_extract,

(?P<host>\S+)\s+\S+\s+(?P<user>\S+)\s+\[(?P<time>.*?)\]\s+(?P<http_method>\S+)\s+(?P<request_uri>\S+)\s+(?P<http_protocol>\S+)\s+(?P<response_status>\S+)\s+(?P<reponse_time>\S+)\s+(?P<instance_id>\S+)

для извлечения значения для соответствующих столбцов. Возможно ли это сделать в потоковом фрейме данных pyspark?

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...