Вы можете сначала загрузить Dataframe с помощью Timestamp
, 'Status' и всего остального как String
.
input_df=spark.createDataFrame(sc.textFile("log_lines.log").map(lambda x : tuple([x[0:17], x[18:22], x[23:]])), ["time_val","status","message"])
+-----------------+------+--------------------+
| time_val|status| message|
+-----------------+------+--------------------+
|20/06/25 12:19:33| INFO|datasources.FileS...|
|20/06/25 12:19:34| INFO|executor.EXECUTOR...|
+-----------------+------+--------------------+
Теперь вы сначала обрабатываете строки с Message,Range,Value
, как показано ниже,
input_df.filter(F.col("message").startswith("datasources.FileScanRDD")).withColumn("log_message", F.split(F.col("message"), ",")[0]).withColumn("range", F.split(F.col("message"), ",")[1]).withColumn("value", F.split(F.col("message"), ",")[2])..drop("message").drop("message").show()
+-----------------+------+--------------------+--------------+--------------------+
| time_val|status| log_message| range| value|
+-----------------+------+--------------------+--------------+--------------------+
|20/06/25 12:19:33| INFO|datasources.FileS...| range:0-27899| partition values...|
+-----------------+------+--------------------+--------------+--------------------+
Затем вы можете обработать другую строку, в которой только что есть сообщение,
input_df.filter(~(F.col("message").startswith("executor"))).show()
+-----------------+------+--------------------+
| time_val|status| message|
+-----------------+------+--------------------+
|20/06/25 12:19:33| INFO|datasources.FileS...|
+-----------------+------+--------------------+