Разделить строку в Dataframe с помощью Scala на Spark - PullRequest
0 голосов
/ 04 марта 2019

У меня есть лог-файл, который содержит более 100 столбцов.Из которых мне понадобились только два столбца: _raw и _time, поэтому я создал загруженный файл журнала как «csv» DF.

Шаг 1:

scala> val log = spark.read.format("csv").option("inferSchema", "true").option("header", "true").load("soa_prod_diag_10_jan.csv")
log: org.apache.spark.sql.DataFrame = [ARRAffinity: string, CoordinatorNonSecureURL: string ... 126 more fields]

Шаг 2: Я зарегистрировал DF как временную таблицу log.createOrReplaceTempView("logs")

Шаг 3: Я извлек два обязательных столбца '_raw'и '_time'

scala> val sqlDF = spark.sql("select _raw, _time from logs")
sqlDF: org.apache.spark.sql.DataFrame = [_raw: string, _time: string]

scala> sqlDF.show(1, false)
+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-----+
|_raw                                                                                                                                                                                                                                                                                                                                                                                                |_time|
+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-----+
|[2019-01-10T23:59:59.998-06:00] [xx_yyy_zz_sss_ra10] [ERROR] [OSB-473003] [oracle.osb.statistics.statistics] [tid: [ACTIVE].ExecuteThread: '28' for queue: 'weblogic.kernel.Default (self-tuning)'] [userId: <anonymous>] [ecid: 92b39a8b-8234-4d19-9ac7-4908dc79c5ed-0000bd0b,0] [partition-name: DOMAIN] [tenant-name: GLOBAL] Aggregation Server Not Available. Failed to get remote aggregator[[|null |
+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-----+
only showing top 1 row

Мое требование:

Мне нужно разбить строку в столбце '_raw', чтобы получить [2019-01-10T23: 59: 59.998-06: 00] [xx_yyy_zz_sss_ra10] [ОШИБКА] [OSB-473003] [oracle.osb.statistics.statistics] [ecid: 92b39a8b-8234-4d19-9ac7-4908dc79c5ed-0000bd0b] с именами столбцов a, b, c, d, e, fсоответственно

Также удалите все нулевые значения как из '_raw', так и из '_time'

Ваши ответы приветствуются:)

1 Ответ

0 голосов
/ 04 марта 2019

Вы можете разделить функцию и разделить _raw по пробелам.Это вернет массив, а затем вы можете извлечь значения из этого массива.Вы также можете использовать функцию regexp_extract для извлечения значений из сообщений журнала.Оба способа показаны ниже.Я надеюсь, что это полезно.

//Creating Test Data
val df = Seq("[2019-01-10T23:59:59.998-06:00] [xx_yyy_zz_sss_ra10] [ERROR] [OSB-473003] [oracle.osb.statistics.statistics] [tid: [ACTIVE].ExecuteThread: '28' for queue: 'weblogic.kernel.Default (self-tuning)'] [userId: <anonymous>] [ecid: 92b39a8b-8234-4d19-9ac7-4908dc79c5ed-0000bd0b,0] [partition-name: DOMAIN] [tenant-name: GLOBAL] Aggregation Server Not Available. Failed to get remote aggregator[[")
  .toDF("_raw")

val splitDF = df.withColumn("split_raw_arr", split($"_raw", " "))
  .withColumn("A", $"split_raw_arr"(0))
  .withColumn("B", $"split_raw_arr"(1))
  .withColumn("C", $"split_raw_arr"(2))
  .withColumn("D", $"split_raw_arr"(3))
  .withColumn("E", $"split_raw_arr"(4))
  .drop("_raw", "split_raw_arr")

splitDF.show(false)

+-------------------------------+--------------------+-------+------------+----------------------------------+
|A                              |B                   |C      |D           |E                                 |
+-------------------------------+--------------------+-------+------------+----------------------------------+
|[2019-01-10T23:59:59.998-06:00]|[xx_yyy_zz_sss_ra10]|[ERROR]|[OSB-473003]|[oracle.osb.statistics.statistics]|
+-------------------------------+--------------------+-------+------------+----------------------------------+

val extractedDF = df
  .withColumn("a", regexp_extract($"_raw", "\\[(.*?)\\]",1))
  .withColumn("b", regexp_extract($"_raw", "\\[(.*?)\\] \\[(.*?)\\]",2))
  .withColumn("c", regexp_extract($"_raw", "\\[(.*?)\\] \\[(.*?)\\] \\[(.*?)\\]",3))
  .withColumn("d", regexp_extract($"_raw", "\\[(.*?)\\] \\[(.*?)\\] \\[(.*?)\\] \\[(.*?)\\]",4))
  .withColumn("e", regexp_extract($"_raw", "\\[(.*?)\\] \\[(.*?)\\] \\[(.*?)\\] \\[(.*?)\\] \\[(.*?)\\]",5))
  .withColumn("f", regexp_extract($"_raw", "(?<=ecid: )(.*?)(?=,)",1))
  .drop("_raw")

+-----------------------------+------------------+-----+----------+--------------------------------+---------------------------------------------+
|a                            |b                 |c    |d         |e                               |f                                            |
+-----------------------------+------------------+-----+----------+--------------------------------+---------------------------------------------+
|2019-01-10T23:59:59.998-06:00|xx_yyy_zz_sss_ra10|ERROR|OSB-473003|oracle.osb.statistics.statistics|92b39a8b-8234-4d19-9ac7-4908dc79c5ed-0000bd0b|
+-----------------------------+------------------+-----+----------+--------------------------------+---------------------------------------------+
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...