Как извлечь предыдущее и следующее предложение строки в искре? - PullRequest
0 голосов
/ 18 октября 2018

Я анализирую файл журнала для анализа воздействия на клиента с помощью Apache spark.У меня есть файл журнала, который содержит отметку времени в одной строке, сведения о клиенте в другой строке и ошибку, вызванную в другой строке, я хочу вывод в одном файле, который объединит все извлеченные записи в одну строку.Вот мой файл журнала ниже:

2018-10-15 05:24:00.102 ERROR 7 --- [DefaultMessageListenerContainer-2] c.l.p.a.c.event.listener.MQListener      : The ABC/CDE object received for the xyz event was not valid. e_id=11111111, s_id=111, e_name=ABC

com.xyz.abc.pqr.exception.PNotVException: The r received from C was invalid/lacks mandatory fields. S_id: 123, P_Id: 123456789, R_Number: 12345678
    at com.xyz.abc.pqr.mprofile.CPServiceImpl.lambda$bPByC$1(CPServiceImpl.java:240)
    at java.util.ArrayList.forEach(ArrayList.java:1249)
    rContainer.doInvokeListener(AbstractMessageListenerContainer.java:721)
    at org.springframework.jms.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:681)
    at org.springframework.jms.listener.AbstractMessageListenerContainer.doExecuteListener(AbstractMessageListenerContainer.java:651)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.IllegalArgumentException: Invalid D because cm: null and pk: null were missing.
    at com.xyz.abc.pqr.mp.DD.resolveDetailsFromCDE(DD.java:151)
    at com.xyz.abc.pqr.mp.DD.<init>(DD.java:35)
    at java.util.ArrayList.forEach(ArrayList.java:1249)

2018-10-15 05:24:25.136 ERROR 7 --- [DefaultMessageListenerContainer-1] c.l.p.a.c.event.listener.MQListener      : The ABC/CDE object received for the C1 event was not valid. entity_id=2222222, s_id=3333, event_name=CDE

com.xyz.abc.pqr.PNotVException: The r received from C was invalid/lacks mandatory fields. S_id: 123, P_Id: 123456789, R_Number: 12345678
    at com.xyz.abc.pqr.mp.CSImpl.lambda$buildABCByCo$1(CSImpl.java:240)
    at java.util.ArrayList.forEach(ArrayList.java:1249)
    at com.xyz.abc.pqr.event.handler.DHandler.handle(CDEEventHandler.java:45)
    at sun.reflect.GMA.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.springframework.aop.support.AopUtils.invokeJoinpointUsingReflection(AopUtils.java:333)
    at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.doInvoke(InvocableHandlerMethod.java:197)
    at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.invoke(InvocableHandlerMethod.java:115)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.NullPointerException: null

1 Ответ

0 голосов
/ 18 октября 2018

Вы можете использовать API DataFrame, чтобы сделать это несколькими способами.Вот один

import org.apache.spark.sql.functions._

val rd = sc.textFile("/FileStore/tables/log.txt").zipWithIndex.map{case (r, i) => Row(r, i)}
val schema = StructType(StructField("logs", StringType, false) :: StructField("id", LongType, false) :: Nil)
val df = spark.sqlContext.createDataFrame(rd, schema)
df.show

+--------------------+---+
|                logs| id|
+--------------------+---+
|2018-10-15 05:24:...|  0|
|                    |  1|
|com.xyz.abc.pqr.e...|  2|
|    at com.xyz.ab...|  3|
|    at java.util....|  4|
|    rContainer.do...|  5|
|    at org.spring...|  6|
|    at org.spring...|  7|
|    at java.lang....|  8|
|Caused by: java.l...|  9|
|    at com.xyz.ab...| 10|
|    at com.xyz.ab...| 11|
|    at com.xyz.ab...| 12|
|    at java.util....| 13|
|                    | 14|
|2018-10-15 05:24:...| 15|
|                    | 16|
|com.xyz.abc.pqr.P...| 17|

val df1 = df.filter($"logs".contains("c.l.p.a.c.event.listener.MQListener")).withColumn("logs",regexp_replace($"logs","ERROR.*","")).sort("id")
df1.show

+--------------------+---+
|                logs| id|
+--------------------+---+
|2018-10-15 05:24:...|  0|
|2018-10-15 05:24:...| 15|
+--------------------+---+

val df2 = df.filter($"logs".contains("PrescriptionNotValidException:")).withColumn("logs",regexp_replace($"logs","(.*?)mandatory fields.","")).sort("id")
df2.show

+--------------------+---+
|                logs| id|
+--------------------+---+
| StoreId: 123, Co...|  2|
| StoreId: 234, Co...| 17|
+--------------------+---+

val df3 = df.filter($"logs".contains("Caused by: java.lang.")).sort("id")
df3.show
df1.select("logs").collect.toSeq.zip(df2.select("logs").collect.toSeq).zip(df3.select("logs").collect.toSeq)

+--------------------+---+
|                logs| id|
+--------------------+---+
|Caused by: java.l...|  9|
|Caused by: java.l...| 28|
+--------------------+---+

df3: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [logs: string, id: bigint]
res71: Seq[((org.apache.spark.sql.Row, org.apache.spark.sql.Row), org.apache.spark.sql.Row)] = ArrayBuffer((([2018-10-15 05:24:00.102 ],[ StoreId: 123, Co Patient Id: 123456789, Rx Number: 12345678]),[Caused by: java.lang.IllegalArgumentException: Invalid Dispense Object because compound: null and pack: null were missing.]), (([2018-10-15 05:24:25.136 ],[ StoreId: 234, Co Patient Id: 999999, Rx Number: 45555]),[Caused by: java.lang.NullPointerException: null]))
...