I am using Spark 2.4.3 with Scala.
I receive messages from a streaming Kafka source with the following structure:
{"message": "Jan 7 17:53:48 PA-850.abc.com 1,2020/01/07 17:53:41,001801012404,TRAFFIC,drop,2304,2020/01/07 17:53:41,10.7.26.51,10.8.3.11,0.0.0.0,0.0.0.0,interzone-default,,,not-applicable,vsys1,SERVER-VLAN,VPN,ethernet1/6.18,,test-1,2020/01/07 17:53:41,0,1,45194,514,0,0,0x0,udp,deny,588,588,0,1,2020/01/07 17:53:45,0,any,0,35067255521,0x8000000000000000,10.0.0.0-10.255.255.255,10.0.0.0-10.255.255.255,0,1,0,policy-deny,0,0,0,0,,PA-850,from-policy,,,0,,0,,N/A,0,0,0,0,b804eab2-f240-467a-be97-6f8c382afd4c,0","source_host": "10.7.26.51"}
My goal is to add a new timestamp column to every row of my streaming data, holding the current timestamp, and then insert all of these rows into a Cassandra table.
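Attaching a current-timestamp column is simple in isolation. A minimal, self-contained sketch of what I mean (the events DataFrame here is a stand-in, not part of my pipeline):

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.current_timestamp

object TimestampSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("ts-sketch").getOrCreate()
    import spark.implicits._
    // Stand-in for the batch after CAST(value AS STRING): one string column.
    val events = Seq("msg-1", "msg-2").toDF("message")
    // current_timestamp() stamps every row with the query's current time.
    events.withColumn("created", current_timestamp()).show(false)
    spark.stop()
  }
}

The trouble starts when I combine this with my RDD-based parsing and the Cassandra write. Here is my code: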
package devices

import configurations._
import org.apache.spark.sql.Row
import org.apache.spark.sql.cassandra._
import org.apache.spark.sql.functions._
import scala.collection.mutable.{ListBuffer, Map}
import scala.io.Source

object PA {
  def main(args: Array[String]): Unit = {
    val spark = SparkBuilder.spark

    // Kafka source for the syslog topic.
    val df = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", configHelper.kafka_host)
      .option("subscribe", configHelper.log_topic)
      .option("startingOffsets", "earliest")
      .option("multiLine", true)
      .option("includeTimestamp", true)
      .load()
    df.printSchema()
    // Builds a device-name -> OS-version lookup from a local CSV file
    // (one "device,version" pair per line).
    def getDeviceNameOSMapping(): Map[String, String] = {
      val osmapping = scala.collection.mutable.Map[String, String]()
      val bufferedSource = Source.fromFile(configHelper.os_mapping_file)
      for (line <- bufferedSource.getLines) {
        val cols = line.split(",").map(_.trim)
        osmapping += (cols(0).toLowerCase() -> cols(1).toLowerCase())
      }
      bufferedSource.close
      osmapping
    }
    val deviceOSMapping = spark.sparkContext.broadcast(getDeviceNameOSMapping())
    val debug = true

    // Parse the Kafka value as JSON; cefFormat.cef_json is defined in the
    // configurations package (not shown) and maps the JSON keys
    // ("message", "source_host") to string columns.
    val msg = df.selectExpr("CAST(value AS STRING)")
      .withColumn("value", lower(col("value")))
      .select(from_json(col("value"), cefFormat.cef_json).as("data"))
      .select("data.*")

    import spark.sqlContext.implicits._

    // This is where I try to attach the timestamp column.
    val newDF = msg.withColumn("created", lit(current_timestamp()))
    msg.writeStream
      .foreachBatch { (batchDF, _) =>
        // Drop fluentd housekeeping lines before parsing.
        val syslogDF = batchDF.filter(!$"message".contains("invalid syslog message:"))
          .filter(!$"message".contains("fluentd worker is now stopping worker=0"))
          .filter(!$"message".contains("fluentd worker is now running worker=0"))

        // message column -> (device_name, traffic_type, field list),
        // keeping only devices present in the OS mapping.
        val syslogRDD = syslogDF.rdd
          .map(r => r.getString(0))
          .map(x => parseSysLog(x))
          .filter(x => deviceOSMapping.value.contains(x._1))

        try {
          val threat_9_0_DF = spark.sqlContext.createDataFrame(
            syslogRDD.filter(x => deviceOSMapping.value(x._1).equals("9.0") & x._2.equals("threat"))
              .map(x => Row.fromSeq(x._3)), formatPA.threat_9_0)
          if (debug)
            threat_9_0_DF.show(true)
          threat_9_0_DF.write
            .cassandraFormat(configHelper.cassandra_table_syslog, configHelper.cassandra_keyspace)
            .mode("append")
            .save
          println("threat_9_0_DF saved")
        }
        catch {
          case e: Exception =>
            println(e.getMessage)
        }

        try {
          val traffic_9_0_DF = spark.sqlContext.createDataFrame(
            syslogRDD.filter(x => deviceOSMapping.value(x._1).equals("9.0") & x._2.equals("traffic"))
              .map(x => Row.fromSeq(x._3)), formatPA.traffic_9_0)
          if (debug)
            traffic_9_0_DF.show(true)
          traffic_9_0_DF.write
            .cassandraFormat(configHelper.cassandra_table_syslog, configHelper.cassandra_keyspace)
            .mode("append")
            .save
          println("traffic_9_0_DF saved")
        }
        catch {
          case e: Exception =>
            println(e.getMessage)
        }
      }.start().awaitTermination()
    // Splits one raw syslog line into (device_name, traffic_type, fields).
    // The first CSV field ("jan 7 17:53:48 pa-850.abc.com 1") is itself
    // broken up into date_time, domain_name and future_use1.
    def parseSysLog(msg: String): (String, String, List[String]) = {
      val splitmsg = msg.split(",")
      val traffic_type = splitmsg(3)
      val temp = splitmsg(0).split(" ")
      val date_time = temp.dropRight(2).mkString(" ")
      val domain_name = temp(temp.size - 2)
      val future_use1 = temp(temp.size - 1)
      val device_name = domain_name.split("\\.")(0)
      var result = new ListBuffer[String]()
      result += date_time
      result += domain_name
      result += future_use1
      result = result ++ splitmsg.slice(1, splitmsg.size).toList
      (device_name, traffic_type, result.toList)
    }
  }
}
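To make parseSysLog's output shape concrete, here is a standalone run on a shortened version of the sample message (the parsing logic is as above, condensed slightly; the input is trimmed by hand):

object ParseSysLogDemo {
  def parseSysLog(msg: String): (String, String, List[String]) = {
    val splitmsg = msg.split(",")
    val traffic_type = splitmsg(3)
    val temp = splitmsg(0).split(" ")
    val date_time = temp.dropRight(2).mkString(" ")
    val domain_name = temp(temp.size - 2)
    val future_use1 = temp(temp.size - 1)
    val device_name = domain_name.split("\\.")(0)
    (device_name, traffic_type,
      List(date_time, domain_name, future_use1) ++ splitmsg.drop(1).toList)
  }

  def main(args: Array[String]): Unit = {
    // Already lowercased, as the pipeline applies lower(col("value")) first.
    val sample = "jan 7 17:53:48 pa-850.abc.com 1,2020/01/07 17:53:41,001801012404,traffic,drop,2304"
    val (device, trafficType, fields) = parseSysLog(sample)
    println(device)       // pa-850
    println(trafficType)  // traffic
    println(fields.size)  // 8: the 3 pieces of the first token + the other 5 CSV fields
  }
}

Run against the full sample message above (67 CSV fields), the list comes out at 69 entries: 3 + 66. Keep that number in mind for the error below. The schema definitions: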
package configurations

import org.apache.spark.sql.types.{StringType, StructType, TimestampType}

object formatPA {
  // 9.0 threat log schema: 81 columns, "created" last.
  val threat_9_0 = new StructType()
    .add("date_time", StringType)
    .add("log_source", StringType)
    .add("future_use1", StringType)
    .add("received_time", StringType)
    .add("serial_number", StringType)
    .add("traffic_type", StringType)
    .add("threat_content_type", StringType)
    .add("future_use2", StringType)
    .add("generated_time", StringType)
    .add("src_ip", StringType)
    .add("dst_ip", StringType)
    .add("src_nat", StringType)
    .add("dst_nat", StringType)
    .add("rule_name", StringType)
    .add("src_user", StringType)
    .add("dst_user", StringType)
    .add("app", StringType)
    .add("vsys", StringType)
    .add("src_zone", StringType)
    .add("dst_zone", StringType)
    .add("igr_int", StringType)
    .add("egr_int", StringType)
    .add("log_fw_profile", StringType)
    .add("future_use3", StringType)
    .add("session_id", StringType)
    .add("repeat_count", StringType)
    .add("src_port", StringType)
    .add("dst_port", StringType)
    .add("src_nat_port", StringType)
    .add("dst_nat_port", StringType)
    .add("flags", StringType)
    .add("protocol", StringType)
    .add("action", StringType)
    .add("miscellaneous", StringType)
    .add("threat_id", StringType)
    .add("category", StringType)
    .add("severity", StringType)
    .add("direction", StringType)
    .add("seq_num", StringType)
    .add("act_flag", StringType)
    .add("src_geo_location", StringType)
    .add("dst_geo_location", StringType)
    .add("future_use4", StringType)
    .add("content_type", StringType)
    .add("pcap_id", StringType)
    .add("file_digest", StringType)
    .add("apt_cloud", StringType)
    .add("url_index", StringType)
    .add("user_agent", StringType)
    .add("file_type", StringType)
    .add("x_forwarded_for", StringType)
    .add("referer", StringType)
    .add("sender", StringType)
    .add("subject", StringType)
    .add("recipient", StringType)
    .add("report_id", StringType)
    .add("dghl1", StringType)
    .add("dghl2", StringType)
    .add("dghl3", StringType)
    .add("dghl4", StringType)
    .add("vsys_name", StringType)
    .add("device_name", StringType)
    .add("future_use5", StringType)
    .add("src_vm_uuid", StringType)
    .add("dst_vm_uuid", StringType)
    .add("http_method", StringType)
    .add("tunnel_id_imsi", StringType)
    .add("monitor_tag_imei", StringType)
    .add("parent_session_id", StringType)
    .add("parent_start_time", StringType)
    .add("tunnel_type", StringType)
    .add("threat_category", StringType)
    .add("content_version", StringType)
    .add("future_use6", StringType)
    .add("sctp_association_id", StringType)
    .add("payload_protocol_id", StringType)
    .add("http_headers", StringType)
    .add("url_category_list", StringType)
    .add("uuid_for_rule", StringType)
    .add("http_2_connection", StringType)
    .add("created", TimestampType)

  // 9.0 traffic log schema: 70 columns, "created" last (field index 69).
  val traffic_9_0 = new StructType()
    .add("date_time", StringType)
    .add("log_source", StringType)
    .add("future_use1", StringType)
    .add("received_time", StringType)
    .add("serial_number", StringType)
    .add("traffic_type", StringType)
    .add("threat_content_type", StringType)
    .add("future_use2", StringType)
    .add("generated_time", StringType)
    .add("src_ip", StringType)
    .add("dst_ip", StringType)
    .add("src_nat", StringType)
    .add("dst_nat", StringType)
    .add("rule_name", StringType)
    .add("src_user", StringType)
    .add("dst_user", StringType)
    .add("app", StringType)
    .add("vsys", StringType)
    .add("src_zone", StringType)
    .add("dst_zone", StringType)
    .add("igr_int", StringType)
    .add("egr_int", StringType)
    .add("log_fw_profile", StringType)
    .add("future_use3", StringType)
    .add("session_id", StringType)
    .add("repeat_count", StringType)
    .add("src_port", StringType)
    .add("dst_port", StringType)
    .add("src_nat_port", StringType)
    .add("dst_nat_port", StringType)
    .add("flags", StringType)
    .add("protocol", StringType)
    .add("action", StringType)
    .add("bytes", StringType)
    .add("bytes_sent", StringType)
    .add("bytes_received", StringType)
    .add("packets", StringType)
    .add("start_time", StringType)
    .add("end_time", StringType)
    .add("category", StringType)
    .add("future_use4", StringType)
    .add("seq_num", StringType)
    .add("act_flag", StringType)
    .add("src_geo_location", StringType)
    .add("dst_geo_location", StringType)
    .add("future_use5", StringType)
    .add("packet_sent", StringType)
    .add("packet_received", StringType)
    .add("session_end_reason", StringType)
    .add("dghl1", StringType)
    .add("dghl2", StringType)
    .add("dghl3", StringType)
    .add("dghl4", StringType)
    .add("vsys_name", StringType)
    .add("device_name", StringType)
    .add("action_source", StringType)
    .add("src_vm_uuid", StringType)
    .add("dst_vm_uuid", StringType)
    .add("tunnel_id_imsi", StringType)
    .add("monitor_tag_imei", StringType)
    .add("parent_session_id", StringType)
    .add("parent_start_time", StringType)
    .add("tunnel_type", StringType)
    .add("sctp_association_id", StringType)
    .add("sctp_chunks", StringType)
    .add("sctp_chunks_sent", StringType)
    .add("sctp_chunks_received", StringType)
    .add("uuid_for_rule", StringType)
    .add("http_2_connection", StringType)
    .add("created", TimestampType)
}
The output of the code above is as follows:
+---------+----------+-----------+-------------+-------------+------------+-------------------+-----------+--------------+------+------+-------+-------+---------+--------+--------+---+----+--------+--------+-------+-------+--------------+-----------+----------+------------+--------+--------+------------+------------+-----+--------+------+-------------+---------+--------+--------+---------+-------+--------+----------------+----------------+-----------+------------+-------+-----------+---------+---------+----------+---------+---------------+-------+------+-------+---------+---------+-----+-----+-----+-----+---------+-----------+-----------+-----------+-----------+-----------+--------------+----------------+-----------------+-----------------+-----------+---------------+---------------+-----------+-------------------+-------------------+------------+-----------------+-------------+-----------------+-------+
|date_time|log_source|future_use1|received_time|serial_number|traffic_type|threat_content_type|future_use2|generated_time|src_ip|dst_ip|src_nat|dst_nat|rule_name|src_user|dst_user|app|vsys|src_zone|dst_zone|igr_int|egr_int|log_fw_profile|future_use3|session_id|repeat_count|src_port|dst_port|src_nat_port|dst_nat_port|flags|protocol|action|miscellaneous|threat_id|category|severity|direction|seq_num|act_flag|src_geo_location|dst_geo_location|future_use4|content_type|pcap_id|file_digest|apt_cloud|url_index|user_agent|file_type|x_forwarded_for|referer|sender|subject|recipient|report_id|dghl1|dghl2|dghl3|dghl4|vsys_name|device_name|future_use5|src_vm_uuid|dst_vm_uuid|http_method|tunnel_id_imsi|monitor_tag_imei|parent_session_id|parent_start_time|tunnel_type|threat_category|content_version|future_use6|sctp_association_id|payload_protocol_id|http_headers|url_category_list|uuid_for_rule|http_2_connection|created|
+---------+----------+-----------+-------------+-------------+------------+-------------------+-----------+--------------+------+------+-------+-------+---------+--------+--------+---+----+--------+--------+-------+-------+--------------+-----------+----------+------------+--------+--------+------------+------------+-----+--------+------+-------------+---------+--------+--------+---------+-------+--------+----------------+----------------+-----------+------------+-------+-----------+---------+---------+----------+---------+---------------+-------+------+-------+---------+---------+-----+-----+-----+-----+---------+-----------+-----------+-----------+-----------+-----------+--------------+----------------+-----------------+-----------------+-----------+---------------+---------------+-----------+-------------------+-------------------+------------+-----------------+-------------+-----------------+-------+
+---------+----------+-----------+-------------+-------------+------------+-------------------+-----------+--------------+------+------+-------+-------+---------+--------+--------+---+----+--------+--------+-------+-------+--------------+-----------+----------+------------+--------+--------+------------+------------+-----+--------+------+-------------+---------+--------+--------+---------+-------+--------+----------------+----------------+-----------+------------+-------+-----------+---------+---------+----------+---------+---------------+-------+------+-------+---------+---------+-----+-----+-----+-----+---------+-----------+-----------+-----------+-----------+-----------+--------------+----------------+-----------------+-----------------+-----------+---------------+---------------+-----------+-------------------+-------------------+------------+-----------------+-------------+-----------------+-------+
threat_9_0_DF saved
20/01/08 14:59:49 ERROR Executor: Exception in task 0.0 in stage 2.0 (TID 2)
java.lang.RuntimeException: Error while encoding: java.lang.ArrayIndexOutOfBoundsException: 69
if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt) null else staticinvoke(class org.apache.spark.sql.catalyst.util.DateTimeUtils$, TimestampType, fromJavaTimestamp, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 69, created), TimestampType), true, false) AS created#773
at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder.toRow(ExpressionEncoder.scala:292)
at org.apache.spark.sql.SparkSession$$anonfun$4.apply(SparkSession.scala:593)
at org.apache.spark.sql.SparkSession$$anonfun$4.apply(SparkSession.scala:593)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:410)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:410)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$13$$anon$1.hasNext(WholeStageCodegenExec.scala:636)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:255)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:247)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:836)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:836)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
at org.apache.spark.scheduler.Task.run(Task.scala:121)
at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.ArrayIndexOutOfBoundsException: 69
at org.apache.spark.sql.catalyst.expressions.GenericRow.get(rows.scala:174)
at org.apache.spark.sql.Row$class.isNullAt(Row.scala:191)
at org.apache.spark.sql.catalyst.expressions.GenericRow.isNullAt(rows.scala:166)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.writeFields_0_34$(Unknown Source)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(Unknown Source)
at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder.toRow(ExpressionEncoder.scala:289)
... 25 more
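As far as I can tell, the traffic write is the one blowing up: traffic_9_0 declares 70 columns with created at index 69, but every Row built by Row.fromSeq(x._3) carries only the 69 parsed strings (newDF, where I add created, is never used, and going through syslogDF.rdd would drop the column anyway). The change I have in mind, sketched against the foreachBatch code above (my own guess, not a verified fix), is to append the timestamp while building each Row:

import java.sql.Timestamp

val traffic_9_0_DF = spark.sqlContext.createDataFrame(
  syslogRDD
    .filter(x => deviceOSMapping.value(x._1).equals("9.0") && x._2.equals("traffic"))
    // 69 parsed fields :+ 1 timestamp = 70 values, matching traffic_9_0
    .map(x => Row.fromSeq(x._3 :+ new Timestamp(System.currentTimeMillis()))),
  formatPA.traffic_9_0)

Is appending the value by hand like this the right way to populate the created column on this RDD-based path (the threat branch would need the same treatment), or is there a cleaner way to keep the withColumn("created", current_timestamp()) approach?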