Разделение входного файла журнала в фреймворке Pyspark - PullRequest
0 голосов
/ 08 июля 2020

У меня есть файл журнала, который мне нужно разделить с помощью Pyspark Dataframe. Ниже мой образец файла журнала.

20/06/25 12:19:33 INFO datasources.FileScanRDD: Reading File path: hdfs://bpaiddev/dev/data/warehouse/clean/falcon/ukc/masked_data/parquet/FRAUD_CUSTOMER_INFORMATION/rcd_crt_dttm_yyyymmdd=20200523/part-0042-ed52abc2w.c000.snapp.parquet, range:0-27899, partition values :[20200523]
20/06/25 12:19:34 INFO executor.EXECUTOR: Finished task 18.0 in stage 0.0 (TID 18),18994 bytes result sent to driver

Из образца журнала вы можете увидеть, что первая строка содержит больше деталей по сравнению со второй строкой . Мне нужно Timestamp, Status ,Message,Range,Value столбцов для первой строки, для второй строки я могу иметь только Timestamp,Status,Message столбцов.

Как применить функции регулярного выражения к таким данным? Пожалуйста, помогите мне решить эту проблему. Большое спасибо!

Ожидаемый результат:

    +-----------------+------+--------------------+--------------+--------------------+
    |         time_val|status|         log_message|         range|               value|
    +-----------------+------+--------------------+--------------+--------------------+
    |20/06/25 12:19:33|  INFO|datasources.FileS...| range:0-27899| partition values...|
    |20/06/25 12:19:34|  INFO|executor.EXECUTORd..|              |                    |
    +-----------------+------+--------------------+--------------+--------------------+

1 Ответ

0 голосов
/ 08 июля 2020

Вы можете сначала загрузить Dataframe с помощью Timestamp, 'Status' и всего остального как String.

input_df=spark.createDataFrame(sc.textFile("log_lines.log").map(lambda x : tuple([x[0:17], x[18:22], x[23:]])), ["time_val","status","message"])

+-----------------+------+--------------------+
|         time_val|status|             message|
+-----------------+------+--------------------+
|20/06/25 12:19:33|  INFO|datasources.FileS...|
|20/06/25 12:19:34|  INFO|executor.EXECUTOR...|
+-----------------+------+--------------------+

Теперь вы сначала обрабатываете строки с Message,Range,Value, как показано ниже,

input_df.filter(F.col("message").startswith("datasources.FileScanRDD")).withColumn("log_message", F.split(F.col("message"), ",")[0]).withColumn("range", F.split(F.col("message"), ",")[1]).withColumn("value", F.split(F.col("message"), ",")[2])..drop("message").drop("message").show()


+-----------------+------+--------------------+--------------+--------------------+
|         time_val|status|         log_message|         range|               value|
+-----------------+------+--------------------+--------------+--------------------+
|20/06/25 12:19:33|  INFO|datasources.FileS...| range:0-27899| partition values...|
+-----------------+------+--------------------+--------------+--------------------+

Затем вы можете обработать другую строку, в которой только что есть сообщение,

input_df.filter(~(F.col("message").startswith("executor"))).show()

+-----------------+------+--------------------+
|         time_val|status|             message|
+-----------------+------+--------------------+
|20/06/25 12:19:33|  INFO|datasources.FileS...|
+-----------------+------+--------------------+
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...