How do I add a current_timestamp() column to a streaming DataFrame?
0 votes
/ January 7, 2020

I am using Spark 2.4.3 and Scala.

I am receiving messages from a streaming Kafka source with the following structure:

{"message": "Jan 7 17:53:48 PA-850.abc.com 1,2020/01/07 17:53:41,001801012404,TRAFFIC,drop,2304,2020/01/07 17:53:41,10.7.26.51,10.8.3.11,0.0.0.0,0.0.0.0,interzone-default,,,not-applicable,vsys1,SERVER-VLAN,VPN,ethernet1/6.18,,test-1,2020/01/07 17:53:41,0,1,45194,514,0,0,0x0,udp,deny,588,588,0,1,2020/01/07 17:53:45,0,any,0,35067255521,0x8000000000000000,10.0.0.0-10.255.255.255,10.0.0.0-10.255.255.255,0,1,0,policy-deny,0,0,0,0,,PA-850,from-policy,,,0,,0,,N/A,0,0,0,0,b804eab2-f240-467a-be97-6f8c382afd4c,0","source_host": "10.7.26.51"}

My goal is to add a new timestamp column holding the current timestamp to every row of my streaming data. I then need to insert all of these rows into a Cassandra table.

package devices

import configurations._
import org.apache.spark.sql.Row
import org.apache.spark.sql.functions.{col, from_json, lower, split}
import org.apache.spark.sql.cassandra._
import scala.collection.mutable.{ListBuffer, Map}
import scala.io.Source
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.{StringType,TimestampType}
import org.apache.spark.sql.functions.to_timestamp
import org.apache.spark.sql.functions.unix_timestamp

object PA {

  def main(args: Array[String]): Unit = {

    val spark = SparkBuilder.spark

    val df = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", configHelper.kafka_host)
      .option("subscribe", configHelper.log_topic)
      .option("startingOffsets", "earliest")
      .option("multiLine", true)
      .option("includeTimestamp",true)
      .load()

    df.printSchema()  

    def getDeviceNameOSMapping(): Map[String, String] = {
      val osmapping = scala.collection.mutable.Map[String, String]()
      val bufferedSource = Source.fromFile(configHelper.os_mapping_file)
      for (line <- bufferedSource.getLines) {
        val cols = line.split(",").map(_.trim)
        osmapping += (cols(0).toLowerCase() -> cols(1).toLowerCase())
      }
      bufferedSource.close
      osmapping
    }

    val deviceOSMapping = spark.sparkContext.broadcast(getDeviceNameOSMapping())

    val debug = true

    val msg = df.selectExpr("CAST(value AS STRING)")
      .withColumn("value", lower(col("value")))
      .select(from_json(col("value"), cefFormat.cef_json).as("data"))
      .select("data.*")

    import spark.sqlContext.implicits._  

    val newDF = msg.withColumn("created", lit(current_timestamp()))

    msg.writeStream
      .foreachBatch { (batchDF, _) =>

        val syslogDF = batchDF.filter(!$"message".contains("invalid syslog message:"))
          .filter(!$"message".contains("fluentd worker is now stopping worker=0"))
          .filter(!$"message".contains("fluentd worker is now running worker=0"))

        val syslogRDD = syslogDF.rdd.map(r => r.getString(0))
          .map(x => parseSysLog(x))
          .filter(x => deviceOSMapping.value.contains(x._1))
        try {
          val threat_9_0_DF = spark.sqlContext.createDataFrame(
            syslogRDD.filter(x => deviceOSMapping.value(x._1).equals("9.0") && x._2.equals("threat"))
              .map(x => Row.fromSeq(x._3)), formatPA.threat_9_0)

          if (debug)
            threat_9_0_DF.show(true)

          threat_9_0_DF.write
            .cassandraFormat(configHelper.cassandra_table_syslog, configHelper.cassandra_keyspace)
            .mode("append")
            .save

          println("threat_9_0_DF saved")
        }
        catch {
          case e: Exception =>
            println(e.getMessage)
        }
        try {
          val traffic_9_0_DF = spark.sqlContext.createDataFrame(
            syslogRDD.filter(x => deviceOSMapping.value(x._1).equals("9.0") && x._2.equals("traffic"))
              .map(x => Row.fromSeq(x._3)), formatPA.traffic_9_0)

          if (debug)
            traffic_9_0_DF.show(true)

          traffic_9_0_DF.write
            .cassandraFormat(configHelper.cassandra_table_syslog, configHelper.cassandra_keyspace)
            .mode("append")
            .save

          println("traffic_9_0_DF saved")
        }
        catch {
          case e: Exception =>
            println(e.getMessage)
        }

      }.start().awaitTermination()

    def parseSysLog(msg: String): (String, String, List[String]) = {

      //println("PRINTING MESSAGES")
      //println(msg)
      val splitmsg = msg.split(",")
      val traffic_type = splitmsg(3)
      val temp = splitmsg(0).split(" ")
      val date_time = temp.dropRight(2).mkString(" ")
      val domain_name = temp(temp.size - 2)
      val future_use1 = temp(temp.size - 1)
      val device_name = domain_name.split("\\.")(0)
      var result = new ListBuffer[String]()
      //result+=temp2
      result += date_time
      result += domain_name
      result += future_use1
      result = result ++ splitmsg.slice(1, splitmsg.size).toList
      (device_name, traffic_type, result.toList)
    }
  }
}

package configurations

import org.apache.spark.sql.types.{StringType, StructType, TimestampType, DateType}

object formatPA {

  val threat_9_0 = new StructType()
            .add("date_time",StringType)
            .add("log_source",StringType)
            .add("future_use1",StringType)
            .add("received_time",StringType)
            .add("serial_number",StringType)
            .add("traffic_type",StringType)
            .add("threat_content_type",StringType)
            .add("future_use2",StringType)
            .add("generated_time",StringType)
            .add("src_ip",StringType)
            .add("dst_ip",StringType)
            .add("src_nat",StringType)
            .add("dst_nat",StringType)
            .add("rule_name",StringType)
            .add("src_user",StringType)
            .add("dst_user",StringType)
            .add("app",StringType)
            .add("vsys",StringType)
            .add("src_zone",StringType)
            .add("dst_zone",StringType)
            .add("igr_int",StringType)
            .add("egr_int",StringType)
            .add("log_fw_profile",StringType)
            .add("future_use3",StringType)
            .add("session_id",StringType)
            .add("repeat_count",StringType)
            .add("src_port",StringType)
            .add("dst_port",StringType)
            .add("src_nat_port",StringType)
            .add("dst_nat_port",StringType)
            .add("flags",StringType)
            .add("protocol",StringType)
            .add("action",StringType)
            .add("miscellaneous",StringType)
            .add("threat_id",StringType)
            .add("category",StringType)
            .add("severity",StringType)
            .add("direction",StringType)
            .add("seq_num",StringType)
            .add("act_flag",StringType)
            .add("src_geo_location",StringType)
            .add("dst_geo_location",StringType)
            .add("future_use4",StringType)
            .add("content_type",StringType)
            .add("pcap_id",StringType)
            .add("file_digest",StringType)
            .add("apt_cloud",StringType)
            .add("url_index",StringType)
            .add("user_agent",StringType)
            .add("file_type",StringType)
            .add("x_forwarded_for",StringType)
            .add("referer",StringType)
            .add("sender",StringType)
            .add("subject",StringType)
            .add("recipient",StringType)
            .add("report_id",StringType)
            .add("dghl1",StringType)
            .add("dghl2",StringType)
            .add("dghl3",StringType)
            .add("dghl4",StringType)
            .add("vsys_name",StringType)
            .add("device_name",StringType)
            .add("future_use5",StringType)
            .add("src_vm_uuid",StringType)
            .add("dst_vm_uuid",StringType)
            .add("http_method",StringType)
            .add("tunnel_id_imsi",StringType)
            .add("monitor_tag_imei",StringType)
            .add("parent_session_id",StringType)
            .add("parent_start_time",StringType)
            .add("tunnel_type",StringType)
            .add("threat_category",StringType)
            .add("content_version",StringType)
            .add("future_use6",StringType)
            .add("sctp_association_id",StringType)
            .add("payload_protocol_id",StringType)
            .add("http_headers",StringType)
            .add("url_category_list",StringType)
            .add("uuid_for_rule",StringType)
            .add("http_2_connection",StringType)
            .add("created",TimestampType)

  val traffic_9_0 = new StructType()
            .add("date_time",StringType)
            .add("log_source",StringType)
            .add("future_use1",StringType)
            .add("received_time",StringType)
            .add("serial_number",StringType)
            .add("traffic_type",StringType)
            .add("threat_content_type",StringType)
            .add("future_use2",StringType)
            .add("generated_time",StringType)
            .add("src_ip",StringType)
            .add("dst_ip",StringType)
            .add("src_nat",StringType)
            .add("dst_nat",StringType)
            .add("rule_name",StringType)
            .add("src_user",StringType)
            .add("dst_user",StringType)
            .add("app",StringType)
            .add("vsys",StringType)
            .add("src_zone",StringType)
            .add("dst_zone",StringType)
            .add("igr_int",StringType)
            .add("egr_int",StringType)
            .add("log_fw_profile",StringType)
            .add("future_use3",StringType)
            .add("session_id",StringType)
            .add("repeat_count",StringType)
            .add("src_port",StringType)
            .add("dst_port",StringType)
            .add("src_nat_port",StringType)
            .add("dst_nat_port",StringType)
            .add("flags",StringType)
            .add("protocol",StringType)
            .add("action",StringType)
            .add("bytes",StringType)
            .add("bytes_sent",StringType)
            .add("bytes_received",StringType)
            .add("packets",StringType)
            .add("start_time",StringType)
            .add("end_time",StringType)
            .add("category",StringType)
            .add("future_use4",StringType)
            .add("seq_num",StringType)
            .add("act_flag",StringType)
            .add("src_geo_location",StringType)
            .add("dst_geo_location",StringType)
            .add("future_use5",StringType)
            .add("packet_sent",StringType)
            .add("packet_received",StringType)
            .add("session_end_reason",StringType)
            .add("dghl1",StringType)
            .add("dghl2",StringType)
            .add("dghl3",StringType)
            .add("dghl4",StringType)
            .add("vsys_name",StringType)
            .add("device_name",StringType)
            .add("action_source",StringType)
            .add("src_vm_uuid",StringType)
            .add("dst_vm_uuid",StringType)
            .add("tunnel_id_imsi",StringType)
            .add("monitor_tag_imei",StringType)
            .add("parent_session_id",StringType)
            .add("parent_start_time",StringType)
            .add("tunnel_type",StringType)
            .add("sctp_association_id",StringType)
            .add("sctp_chunks",StringType)
            .add("sctp_chunks_sent",StringType)
            .add("sctp_chunks_received",StringType)
            .add("uuid_for_rule",StringType)
            .add("http_2_connection",StringType)
            .add("created",TimestampType)    
}

The output of the code above looks like this:

+---------+----------+-----------+-------------+-------------+------------+-------------------+-----------+--------------+------+------+-------+-------+---------+--------+--------+---+----+--------+--------+-------+-------+--------------+-----------+----------+------------+--------+--------+------------+------------+-----+--------+------+-------------+---------+--------+--------+---------+-------+--------+----------------+----------------+-----------+------------+-------+-----------+---------+---------+----------+---------+---------------+-------+------+-------+---------+---------+-----+-----+-----+-----+---------+-----------+-----------+-----------+-----------+-----------+--------------+----------------+-----------------+-----------------+-----------+---------------+---------------+-----------+-------------------+-------------------+------------+-----------------+-------------+-----------------+-------+
                |date_time|log_source|future_use1|received_time|serial_number|traffic_type|threat_content_type|future_use2|generated_time|src_ip|dst_ip|src_nat|dst_nat|rule_name|src_user|dst_user|app|vsys|src_zone|dst_zone|igr_int|egr_int|log_fw_profile|future_use3|session_id|repeat_count|src_port|dst_port|src_nat_port|dst_nat_port|flags|protocol|action|miscellaneous|threat_id|category|severity|direction|seq_num|act_flag|src_geo_location|dst_geo_location|future_use4|content_type|pcap_id|file_digest|apt_cloud|url_index|user_agent|file_type|x_forwarded_for|referer|sender|subject|recipient|report_id|dghl1|dghl2|dghl3|dghl4|vsys_name|device_name|future_use5|src_vm_uuid|dst_vm_uuid|http_method|tunnel_id_imsi|monitor_tag_imei|parent_session_id|parent_start_time|tunnel_type|threat_category|content_version|future_use6|sctp_association_id|payload_protocol_id|http_headers|url_category_list|uuid_for_rule|http_2_connection|created|
+---------+----------+-----------+-------------+-------------+------------+-------------------+-----------+--------------+------+------+-------+-------+---------+--------+--------+---+----+--------+--------+-------+-------+--------------+-----------+----------+------------+--------+--------+------------+------------+-----+--------+------+-------------+---------+--------+--------+---------+-------+--------+----------------+----------------+-----------+------------+-------+-----------+---------+---------+----------+---------+---------------+-------+------+-------+---------+---------+-----+-----+-----+-----+---------+-----------+-----------+-----------+-----------+-----------+--------------+----------------+-----------------+-----------------+-----------+---------------+---------------+-----------+-------------------+-------------------+------------+-----------------+-------------+-----------------+-------+
                +---------+----------+-----------+-------------+-------------+------------+-------------------+-----------+--------------+------+------+-------+-------+---------+--------+--------+---+----+--------+--------+-------+-------+--------------+-----------+----------+------------+--------+--------+------------+------------+-----+--------+------+-------------+---------+--------+--------+---------+-------+--------+----------------+----------------+-----------+------------+-------+-----------+---------+---------+----------+---------+---------------+-------+------+-------+---------+---------+-----+-----+-----+-----+---------+-----------+-----------+-----------+-----------+-----------+--------------+----------------+-----------------+-----------------+-----------+---------------+---------------+-----------+-------------------+-------------------+------------+-----------------+-------------+-----------------+-------+

                threat_9_0_DF saved
            20/01/08 14:59:49 ERROR Executor: Exception in task 0.0 in stage 2.0 (TID 2)
                java.lang.RuntimeException: Error while encoding: java.lang.ArrayIndexOutOfBoundsException: 69
                if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt) null else staticinvoke(class org.apache.spark.sql.catalyst.util.DateTimeUtils$, TimestampType, fromJavaTimestamp, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 69, created), TimestampType), true, false) AS created#773
                    at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder.toRow(ExpressionEncoder.scala:292)
                    at org.apache.spark.sql.SparkSession$$anonfun$4.apply(SparkSession.scala:593)
                    at org.apache.spark.sql.SparkSession$$anonfun$4.apply(SparkSession.scala:593)
                    at scala.collection.Iterator$$anon$11.next(Iterator.scala:410)
                    at scala.collection.Iterator$$anon$11.next(Iterator.scala:410)
                    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
                    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
                    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$13$$anon$1.hasNext(WholeStageCodegenExec.scala:636)
                    at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:255)
                    at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:247)
                    at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:836)
                    at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:836)
                    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
                    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
                    at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
                    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
                    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
                    at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
                    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
                    at org.apache.spark.scheduler.Task.run(Task.scala:121)
                    at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
                    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
                    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
                    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
                    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
                    at java.lang.Thread.run(Thread.java:748)
                Caused by: java.lang.ArrayIndexOutOfBoundsException: 69
                    at org.apache.spark.sql.catalyst.expressions.GenericRow.get(rows.scala:174)
                    at org.apache.spark.sql.Row$class.isNullAt(Row.scala:191)
                    at org.apache.spark.sql.catalyst.expressions.GenericRow.isNullAt(rows.scala:166)
                    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.writeFields_0_34$(Unknown Source)
                    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(Unknown Source)
                    at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder.toRow(ExpressionEncoder.scala:289)
                    ... 25 more

1 Answer

0 votes
/ January 8, 2020

It looks like it does not really matter that the messages are in JSON format, does it?


Let's then take a sample dataset with any schema and add a timestamp column.

val messages = spark.range(3)
scala> messages.printSchema
root
 |-- id: long (nullable = false)

val withTs = messages.withColumn("timestamp", current_timestamp())
scala> withTs.printSchema
root
 |-- id: long (nullable = false)
 |-- timestamp: timestamp (nullable = false)

That gives you a dataset with a timestamp column.
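
The same call works on a streaming DataFrame. Here is a minimal sketch using Spark's built-in rate test source (the column name created is just illustrative):

import org.apache.spark.sql.functions.current_timestamp

// Minimal sketch, assuming Spark 2.4.x: the "rate" source is a built-in test
// source that emits rows continuously; withColumn with current_timestamp()
// behaves on a streaming DataFrame just like on a batch one.
val streamingWithTs = spark.readStream
  .format("rate")
  .load()
  .withColumn("created", current_timestamp())

streamingWithTs.printSchema
// the schema now contains an extra `created` column of timestamp type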


The following line in your code should work, too (you don't need lit).

val xDF = thDF.withColumn("created", lit(current_timestamp()))   //This does not get cast to TimestampType

What do you mean by "This does not get cast to TimestampType"? How do you check that? Perhaps you are confusing TimestampType in Spark with the timestamp type in Cassandra? The Spark Cassandra Connector should take care of that mapping (see the write sketch after the check below).

Let's give it a try:

val litTs = spark.range(3).withColumn("ts", lit(current_timestamp))
scala> litTs.printSchema
root
 |-- id: long (nullable = false)
 |-- ts: timestamp (nullable = false)

import org.apache.spark.sql.types._
val dataType = litTs.schema("ts").dataType
assert(dataType.isInstanceOf[TimestampType])

scala> println(dataType)
TimestampType
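
To close the loop on the Cassandra side, here is a hedged sketch of writing such a DataFrame with the Spark Cassandra Connector. The keyspace and table names are placeholders (not your configHelper values), and the target table is assumed to already exist with columns matching id and ts:

import org.apache.spark.sql.cassandra._

// Hedged sketch: Spark's TimestampType maps to Cassandra's `timestamp` type on
// write, so no explicit cast should be needed. "sample_keyspace" and
// "sample_table" are placeholders for your own keyspace/table.
litTs.write
  .cassandraFormat("sample_table", "sample_keyspace")
  .mode("append")
  .save()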