Добавьте счетчик временного окна к apache источнику данных искры в scala - PullRequest
0 голосов
/ 30 января 2020

У меня есть следующий набор данных в файле CSV tshark.csv в каталоге ресурсов. Он содержит некоторую информацию о пакете TCP, которую я перехватил с помощью tShark

+-----------+----------+--------+-------------------+
|        src|       dst|protocol|               time|
+-----------+----------+--------+-------------------+
|128.82.18.7|10.128.0.3|  ip:tcp|2020-01-29 04:01:49|
|128.82.18.7|10.128.0.3|  ip:tcp|2020-01-29 04:01:49|
|128.82.18.7|10.128.0.3|  ip:ssl|2020-01-29 04:01:49|
|128.82.18.7|10.128.0.3|  ip:tcp|2020-01-29 04:08:09|
|128.82.18.7|10.128.0.3|  ip:tcp|2020-01-29 04:08:09|
|128.82.18.6|10.128.0.3|  ip:tcp|2020-01-29 04:08:09|
|128.82.18.6|10.128.0.3|  ip:tcp|2020-01-29 04:08:14|
|128.82.18.6|10.128.0.3|  ip:tcp|2020-01-29 04:08:14|
|128.82.18.5|10.128.0.3|  ip:tcp|2020-01-29 04:08:14|
|128.82.18.5|10.128.0.3|  ip:tcp|2020-01-29 04:08:14|
|128.82.18.7|10.128.0.3|  ip:tcp|2020-01-29 04:08:14|
|128.82.18.5|10.128.0.3|  ip:tcp|2020-01-29 04:08:14|
|128.82.18.5|10.128.0.3|  ip:tcp|2020-01-29 04:08:14|
|128.82.18.8|10.128.0.3|  ip:tcp|2020-01-29 04:08:14|
|128.82.18.8|10.128.0.3|  ip:tcp|2020-01-29 04:08:14|
|128.82.18.8|10.128.0.3|  ip:tcp|2020-01-29 04:30:20|
+-----------+----------+--------+-------------------+

Я подготовил набор данных с отметкой времени Unix, как показано ниже

  val schema = StructType(
    StructField("src", StringType, nullable = true) ::
      StructField("dst", StringType, nullable = true) ::
      StructField("protocol", StringType, nullable = true) ::
      StructField("time", TimestampType, nullable = true) ::
      Nil
  )

  // read tshark.csv file to DataFrame
  val df = spark.read.format("csv")
    .option("header", value = true)
    .option("delimiter", ",")
    .option("mode", "DROPMALFORMED")
    .option("timestampFormat", "MMM dd yyyy HH:mm:ss")
    .schema(schema)
    .load(getClass.getResource("/tshark.csv").getPath)
    .cache()

  val dft = df.withColumn("uxt", unix_timestamp($"time"))
  fDf.show()

Это добавит отметку времени Unix поле в dataframe

+-----------+----------+--------+-------------------+----------+
|        src|       dst|protocol|               time| timestamp|
+-----------+----------+--------+-------------------+----------+
|128.82.18.7|10.128.0.3|  ip:tcp|2020-01-29 04:01:49|1580288509|
|128.82.18.7|10.128.0.3|  ip:tcp|2020-01-29 04:01:49|1580288509|
|128.82.18.7|10.128.0.3|  ip:ssl|2020-01-29 04:01:49|1580288509|
|128.82.18.7|10.128.0.3|  ip:tcp|2020-01-29 04:08:09|1580288889|
|128.82.18.7|10.128.0.3|  ip:tcp|2020-01-29 04:08:09|1580288889|
|128.82.18.6|10.128.0.3|  ip:tcp|2020-01-29 04:08:09|1580288889|
|128.82.18.6|10.128.0.3|  ip:tcp|2020-01-29 04:08:14|1580288894|
|128.82.18.6|10.128.0.3|  ip:tcp|2020-01-29 04:08:14|1580288894|
|128.82.18.5|10.128.0.3|  ip:tcp|2020-01-29 04:08:14|1580288894|
|128.82.18.5|10.128.0.3|  ip:tcp|2020-01-29 04:08:14|1580288894|
|128.82.18.7|10.128.0.3|  ip:tcp|2020-01-29 04:08:14|1580288894|
|128.82.18.5|10.128.0.3|  ip:tcp|2020-01-29 04:08:14|1580288894|
|128.82.18.5|10.128.0.3|  ip:tcp|2020-01-29 04:08:14|1580288894|
|128.82.18.8|10.128.0.3|  ip:tcp|2020-01-29 04:08:14|1580288894|
|128.82.18.8|10.128.0.3|  ip:tcp|2020-01-29 04:08:14|1580288894|
|128.82.18.8|10.128.0.3|  ip:tcp|2020-01-29 04:30:20|1580290220|
+-----------+----------+--------+-------------------+----------+

Я хочу подсчитывать отсутствие пакетов, полученных от каждого устройства через каждые 10 минут, и добавляет это значение в каждую строку в наборе данных. Например, я хочу получить вывод, как показано ниже. Здесь с устройства 128.82.18.5 он проверяет 4 пакетов за последние 10 минут. Итак, я добавил 4 в строку отсчета. С устройства 128.82.18.6 оно проверяет 3 пакетов за последние 10 минут и т.д. c. С устройства 128.82.18.7 он проверяет 6 пакетов за последние 10 минут и т.д. c.

+-----------+----------+--------+-------------------+----------+---------+
|        src|       dst|protocol|               time| timestamp|  count  |
+-----------+----------+--------+-------------------+----------+---------+
|128.82.18.7|10.128.0.3|  ip:tcp|2020-01-29 04:01:49|1580288509|    6    |
|128.82.18.7|10.128.0.3|  ip:tcp|2020-01-29 04:01:49|1580288509|    6    |
|128.82.18.7|10.128.0.3|  ip:ssl|2020-01-29 04:01:49|1580288509|    6    |
|128.82.18.7|10.128.0.3|  ip:tcp|2020-01-29 04:08:09|1580288889|    6    |
|128.82.18.7|10.128.0.3|  ip:tcp|2020-01-29 04:08:09|1580288889|    6    |
|128.82.18.7|10.128.0.3|  ip:tcp|2020-01-29 04:08:14|1580288894|    6    |
|128.82.18.6|10.128.0.3|  ip:tcp|2020-01-29 04:08:09|1580288889|    3    |
|128.82.18.6|10.128.0.3|  ip:tcp|2020-01-29 04:08:14|1580288894|    3    |
|128.82.18.6|10.128.0.3|  ip:tcp|2020-01-29 04:08:14|1580288894|    3    |
|128.82.18.5|10.128.0.3|  ip:tcp|2020-01-29 04:08:14|1580288894|    4    |
|128.82.18.5|10.128.0.3|  ip:tcp|2020-01-29 04:08:14|1580288894|    4    |
|128.82.18.5|10.128.0.3|  ip:tcp|2020-01-29 04:08:14|1580288894|    4    |
|128.82.18.5|10.128.0.3|  ip:tcp|2020-01-29 04:08:14|1580288894|    4    |
|128.82.18.8|10.128.0.3|  ip:tcp|2020-01-29 04:08:14|1580288894|    2    |
|128.82.18.8|10.128.0.3|  ip:tcp|2020-01-29 04:08:14|1580288894|    2    |
|128.82.18.8|10.128.0.3|  ip:tcp|2020-01-29 04:30:20|1580290220|    1    |
+-----------+----------+--------+-------------------+----------+---------+

Я пытался сделать это с помощью функции org.apache.spark.sql.expressions.Window с rangeBetween(-600, 0). -600 означает 10 минут в unix отметке времени. Но это не сработало.

  val w = Window
    .partitionBy("src")
    .orderBy("timestamp")
    .rangeBetween(-600, 0)
  val dfc = dft.withColumn("count", count("src").over(w))
  dfc.show()

Ниже приведен вывод, который не является ожидаемым.

+-----------+----------+--------+-------------------+----------+-----+
|        src|       dst|protocol|               time| timestamp|count|
+-----------+----------+--------+-------------------+----------+-----+
|128.82.18.8|10.128.0.3|  ip:tcp|2020-01-29 04:08:14|1580288894|    2|
|128.82.18.8|10.128.0.3|  ip:tcp|2020-01-29 04:08:14|1580288894|    2|
|128.82.18.8|10.128.0.3|  ip:tcp|2020-01-29 04:30:20|1580290220|    1|
|128.82.18.5|10.128.0.3|  ip:tcp|2020-01-29 04:08:14|1580288894|    4|
|128.82.18.5|10.128.0.3|  ip:tcp|2020-01-29 04:08:14|1580288894|    4|
|128.82.18.5|10.128.0.3|  ip:tcp|2020-01-29 04:08:14|1580288894|    4|
|128.82.18.5|10.128.0.3|  ip:tcp|2020-01-29 04:08:14|1580288894|    4|
|128.82.18.7|10.128.0.3|  ip:tcp|2020-01-29 04:01:49|1580288509|    3|
|128.82.18.7|10.128.0.3|  ip:tcp|2020-01-29 04:01:49|1580288509|    3|
|128.82.18.7|10.128.0.3|  ip:ssl|2020-01-29 04:01:49|1580288509|    3|
|128.82.18.7|10.128.0.3|  ip:tcp|2020-01-29 04:08:09|1580288889|    5|
|128.82.18.7|10.128.0.3|  ip:tcp|2020-01-29 04:08:09|1580288889|    5|
|128.82.18.7|10.128.0.3|  ip:tcp|2020-01-29 04:08:14|1580288894|    6|
|128.82.18.6|10.128.0.3|  ip:tcp|2020-01-29 04:08:09|1580288889|    1|
|128.82.18.6|10.128.0.3|  ip:tcp|2020-01-29 04:08:14|1580288894|    3|
|128.82.18.6|10.128.0.3|  ip:tcp|2020-01-29 04:08:14|1580288894|    3|
+-----------+----------+--------+-------------------+----------+-----+
...