У меня есть следующий набор данных в файле CSV tshark.csv
в каталоге ресурсов. Он содержит некоторую информацию о пакете TCP, которую я перехватил с помощью tShark
+-----------+----------+--------+-------------------+
| src| dst|protocol| time|
+-----------+----------+--------+-------------------+
|128.82.18.7|10.128.0.3| ip:tcp|2020-01-29 04:01:49|
|128.82.18.7|10.128.0.3| ip:tcp|2020-01-29 04:01:49|
|128.82.18.7|10.128.0.3| ip:ssl|2020-01-29 04:01:49|
|128.82.18.7|10.128.0.3| ip:tcp|2020-01-29 04:08:09|
|128.82.18.7|10.128.0.3| ip:tcp|2020-01-29 04:08:09|
|128.82.18.6|10.128.0.3| ip:tcp|2020-01-29 04:08:09|
|128.82.18.6|10.128.0.3| ip:tcp|2020-01-29 04:08:14|
|128.82.18.6|10.128.0.3| ip:tcp|2020-01-29 04:08:14|
|128.82.18.5|10.128.0.3| ip:tcp|2020-01-29 04:08:14|
|128.82.18.5|10.128.0.3| ip:tcp|2020-01-29 04:08:14|
|128.82.18.7|10.128.0.3| ip:tcp|2020-01-29 04:08:14|
|128.82.18.5|10.128.0.3| ip:tcp|2020-01-29 04:08:14|
|128.82.18.5|10.128.0.3| ip:tcp|2020-01-29 04:08:14|
|128.82.18.8|10.128.0.3| ip:tcp|2020-01-29 04:08:14|
|128.82.18.8|10.128.0.3| ip:tcp|2020-01-29 04:08:14|
|128.82.18.8|10.128.0.3| ip:tcp|2020-01-29 04:30:20|
+-----------+----------+--------+-------------------+
Я подготовил набор данных с отметкой времени Unix, как показано ниже
val schema = StructType(
StructField("src", StringType, nullable = true) ::
StructField("dst", StringType, nullable = true) ::
StructField("protocol", StringType, nullable = true) ::
StructField("time", TimestampType, nullable = true) ::
Nil
)
// read tshark.csv file to DataFrame
val df = spark.read.format("csv")
.option("header", value = true)
.option("delimiter", ",")
.option("mode", "DROPMALFORMED")
.option("timestampFormat", "MMM dd yyyy HH:mm:ss")
.schema(schema)
.load(getClass.getResource("/tshark.csv").getPath)
.cache()
val dft = df.withColumn("uxt", unix_timestamp($"time"))
fDf.show()
Это добавит отметку времени Unix поле в dataframe
+-----------+----------+--------+-------------------+----------+
| src| dst|protocol| time| timestamp|
+-----------+----------+--------+-------------------+----------+
|128.82.18.7|10.128.0.3| ip:tcp|2020-01-29 04:01:49|1580288509|
|128.82.18.7|10.128.0.3| ip:tcp|2020-01-29 04:01:49|1580288509|
|128.82.18.7|10.128.0.3| ip:ssl|2020-01-29 04:01:49|1580288509|
|128.82.18.7|10.128.0.3| ip:tcp|2020-01-29 04:08:09|1580288889|
|128.82.18.7|10.128.0.3| ip:tcp|2020-01-29 04:08:09|1580288889|
|128.82.18.6|10.128.0.3| ip:tcp|2020-01-29 04:08:09|1580288889|
|128.82.18.6|10.128.0.3| ip:tcp|2020-01-29 04:08:14|1580288894|
|128.82.18.6|10.128.0.3| ip:tcp|2020-01-29 04:08:14|1580288894|
|128.82.18.5|10.128.0.3| ip:tcp|2020-01-29 04:08:14|1580288894|
|128.82.18.5|10.128.0.3| ip:tcp|2020-01-29 04:08:14|1580288894|
|128.82.18.7|10.128.0.3| ip:tcp|2020-01-29 04:08:14|1580288894|
|128.82.18.5|10.128.0.3| ip:tcp|2020-01-29 04:08:14|1580288894|
|128.82.18.5|10.128.0.3| ip:tcp|2020-01-29 04:08:14|1580288894|
|128.82.18.8|10.128.0.3| ip:tcp|2020-01-29 04:08:14|1580288894|
|128.82.18.8|10.128.0.3| ip:tcp|2020-01-29 04:08:14|1580288894|
|128.82.18.8|10.128.0.3| ip:tcp|2020-01-29 04:30:20|1580290220|
+-----------+----------+--------+-------------------+----------+
Я хочу подсчитывать отсутствие пакетов, полученных от каждого устройства через каждые 10 минут, и добавляет это значение в каждую строку в наборе данных. Например, я хочу получить вывод, как показано ниже. Здесь с устройства 128.82.18.5
он проверяет 4
пакетов за последние 10 минут. Итак, я добавил 4 в строку отсчета. С устройства 128.82.18.6
оно проверяет 3
пакетов за последние 10 минут и т.д. c. С устройства 128.82.18.7
он проверяет 6
пакетов за последние 10 минут и т.д. c.
+-----------+----------+--------+-------------------+----------+---------+
| src| dst|protocol| time| timestamp| count |
+-----------+----------+--------+-------------------+----------+---------+
|128.82.18.7|10.128.0.3| ip:tcp|2020-01-29 04:01:49|1580288509| 6 |
|128.82.18.7|10.128.0.3| ip:tcp|2020-01-29 04:01:49|1580288509| 6 |
|128.82.18.7|10.128.0.3| ip:ssl|2020-01-29 04:01:49|1580288509| 6 |
|128.82.18.7|10.128.0.3| ip:tcp|2020-01-29 04:08:09|1580288889| 6 |
|128.82.18.7|10.128.0.3| ip:tcp|2020-01-29 04:08:09|1580288889| 6 |
|128.82.18.7|10.128.0.3| ip:tcp|2020-01-29 04:08:14|1580288894| 6 |
|128.82.18.6|10.128.0.3| ip:tcp|2020-01-29 04:08:09|1580288889| 3 |
|128.82.18.6|10.128.0.3| ip:tcp|2020-01-29 04:08:14|1580288894| 3 |
|128.82.18.6|10.128.0.3| ip:tcp|2020-01-29 04:08:14|1580288894| 3 |
|128.82.18.5|10.128.0.3| ip:tcp|2020-01-29 04:08:14|1580288894| 4 |
|128.82.18.5|10.128.0.3| ip:tcp|2020-01-29 04:08:14|1580288894| 4 |
|128.82.18.5|10.128.0.3| ip:tcp|2020-01-29 04:08:14|1580288894| 4 |
|128.82.18.5|10.128.0.3| ip:tcp|2020-01-29 04:08:14|1580288894| 4 |
|128.82.18.8|10.128.0.3| ip:tcp|2020-01-29 04:08:14|1580288894| 2 |
|128.82.18.8|10.128.0.3| ip:tcp|2020-01-29 04:08:14|1580288894| 2 |
|128.82.18.8|10.128.0.3| ip:tcp|2020-01-29 04:30:20|1580290220| 1 |
+-----------+----------+--------+-------------------+----------+---------+
Я пытался сделать это с помощью функции org.apache.spark.sql.expressions.Window
с rangeBetween(-600, 0)
. -600
означает 10
минут в unix отметке времени. Но это не сработало.
val w = Window
.partitionBy("src")
.orderBy("timestamp")
.rangeBetween(-600, 0)
val dfc = dft.withColumn("count", count("src").over(w))
dfc.show()
Ниже приведен вывод, который не является ожидаемым.
+-----------+----------+--------+-------------------+----------+-----+
| src| dst|protocol| time| timestamp|count|
+-----------+----------+--------+-------------------+----------+-----+
|128.82.18.8|10.128.0.3| ip:tcp|2020-01-29 04:08:14|1580288894| 2|
|128.82.18.8|10.128.0.3| ip:tcp|2020-01-29 04:08:14|1580288894| 2|
|128.82.18.8|10.128.0.3| ip:tcp|2020-01-29 04:30:20|1580290220| 1|
|128.82.18.5|10.128.0.3| ip:tcp|2020-01-29 04:08:14|1580288894| 4|
|128.82.18.5|10.128.0.3| ip:tcp|2020-01-29 04:08:14|1580288894| 4|
|128.82.18.5|10.128.0.3| ip:tcp|2020-01-29 04:08:14|1580288894| 4|
|128.82.18.5|10.128.0.3| ip:tcp|2020-01-29 04:08:14|1580288894| 4|
|128.82.18.7|10.128.0.3| ip:tcp|2020-01-29 04:01:49|1580288509| 3|
|128.82.18.7|10.128.0.3| ip:tcp|2020-01-29 04:01:49|1580288509| 3|
|128.82.18.7|10.128.0.3| ip:ssl|2020-01-29 04:01:49|1580288509| 3|
|128.82.18.7|10.128.0.3| ip:tcp|2020-01-29 04:08:09|1580288889| 5|
|128.82.18.7|10.128.0.3| ip:tcp|2020-01-29 04:08:09|1580288889| 5|
|128.82.18.7|10.128.0.3| ip:tcp|2020-01-29 04:08:14|1580288894| 6|
|128.82.18.6|10.128.0.3| ip:tcp|2020-01-29 04:08:09|1580288889| 1|
|128.82.18.6|10.128.0.3| ip:tcp|2020-01-29 04:08:14|1580288894| 3|
|128.82.18.6|10.128.0.3| ip:tcp|2020-01-29 04:08:14|1580288894| 3|
+-----------+----------+--------+-------------------+----------+-----+