Этот метод полностью исключает использование udf
, используя split
и slice
, но, возможно, есть способ лучше. Преимущество этого подхода заключается в том, что он напрямую использует биты, присутствующие в маске su bnet, и что он записан исключительно в PySpark
.
Контекст решения: IP-адреса могут быть разделены и замаскированы su bnet. Это означает, что 8, 16, 24, 32
сообщает вам, какие части IP имеют значение - это мотивирует деление на 8 и использование полученного столбца для разделения столбца IP-адреса ArrayType
после его отделения от исходного StringType
.
NB: pyspark.sql.functions.slice
будет работать в более новой версии PySpark >= 2.4
, некоторые старые должны использовать f.expr("slice(...)")
.
Настройка:
flows = spark.createDataFrame([
(1, "192.168.1.1", "192.168.2.1"),
(2, "192.168.2.1", "192.168.3.1"),
(3, "192.168.3.1", "192.168.1.1"),
], ['id', 'source_ip', 'destination_ip']
)
networks = spark.createDataFrame([
(1, "192.168.1.0/24", "VLAN1"),
(2, "192.168.2.0/24", "VLAN2"),
(3, "192.168.3.0/24", "VLAN3"),
], ['id', 'network', 'vlan']
)
Некоторые предварительная обработка:
networks_split = networks.select(
"*",
(f.split(f.col("network"), "/")[1] / 8).cast("int").alias("bits"),
f.split(f.split(f.col("network"), "/")[0], "\.").alias('segmented_ip')
)
networks_split.show()
+---+--------------+-----+----+----------------+
| id| network| vlan|bits| segmented_ip|
+---+--------------+-----+----+----------------+
| 1|192.168.1.0/24|VLAN1| 3|[192, 168, 1, 0]|
| 2|192.168.2.0/24|VLAN2| 3|[192, 168, 2, 0]|
| 3|192.168.3.0/24|VLAN3| 3|[192, 168, 3, 0]|
+---+--------------+-----+----+----------------+
networks_masked = networks_split.select(
"*",
f.expr("slice(segmented_ip, 1, bits)").alias("masked_bits"),
)
networks_masked.show()
+---+--------------+-----+----+----------------+-------------+
| id| network| vlan|bits| segmented_ip| masked_bits|
+---+--------------+-----+----+----------------+-------------+
| 1|192.168.1.0/24|VLAN1| 3|[192, 168, 1, 0]|[192, 168, 1]|
| 2|192.168.2.0/24|VLAN2| 3|[192, 168, 2, 0]|[192, 168, 2]|
| 3|192.168.3.0/24|VLAN3| 3|[192, 168, 3, 0]|[192, 168, 3]|
+---+--------------+-----+----+----------------+-------------+
flows_split = flows.select(
"*",
f.split(f.split(f.col("source_ip"), "/")[0], "\.").alias('segmented_source_ip'),
f.split(f.split(f.col("destination_ip"), "/")[0], "\.").alias('segmented_destination_ip')
)
flows_split.show()
+---+-----------+--------------+-------------------+------------------------+
| id| source_ip|destination_ip|segmented_source_ip|segmented_destination_ip|
+---+-----------+--------------+-------------------+------------------------+
| 1|192.168.1.1| 192.168.2.1| [192, 168, 1, 1]| [192, 168, 2, 1]|
| 2|192.168.2.1| 192.168.3.1| [192, 168, 2, 1]| [192, 168, 3, 1]|
| 3|192.168.3.1| 192.168.1.1| [192, 168, 3, 1]| [192, 168, 1, 1]|
+---+-----------+--------------+-------------------+------------------------+
Наконец, I crossJoin
и фильтрация на срезе на основе bits
моей маски, например:
flows_split.crossJoin(
networks_masked.select("vlan", "bits", "masked_bits")
).where(
f.expr("slice(segmented_source_ip, 1, bits)") == f.col("masked_bits")
).show()
+---+-----------+--------------+-------------------+------------------------+-----+----+-------------+
| id| source_ip|destination_ip|segmented_source_ip|segmented_destination_ip| vlan|bits| masked_bits|
+---+-----------+--------------+-------------------+------------------------+-----+----+-------------+
| 1|192.168.1.1| 192.168.2.1| [192, 168, 1, 1]| [192, 168, 2, 1]|VLAN1| 3|[192, 168, 1]|
| 2|192.168.2.1| 192.168.3.1| [192, 168, 2, 1]| [192, 168, 3, 1]|VLAN2| 3|[192, 168, 2]|
| 3|192.168.3.1| 192.168.1.1| [192, 168, 3, 1]| [192, 168, 1, 1]|VLAN3| 3|[192, 168, 3]|
+---+-----------+--------------+-------------------+------------------------+-----+----+-------------+
Точно такой же подход может быть выполнено для destination_ip
, например:
flows_split.crossJoin(
networks_masked.select("vlan", "bits", "masked_bits")
).where(
f.expr("slice(segmented_destination_ip, 1, bits)") == f.col("masked_bits")
).show()
+---+-----------+--------------+-------------------+------------------------+-----+----+-------------+
| id| source_ip|destination_ip|segmented_source_ip|segmented_destination_ip| vlan|bits| masked_bits|
+---+-----------+--------------+-------------------+------------------------+-----+----+-------------+
| 1|192.168.1.1| 192.168.2.1| [192, 168, 1, 1]| [192, 168, 2, 1]|VLAN2| 3|[192, 168, 2]|
| 2|192.168.2.1| 192.168.3.1| [192, 168, 2, 1]| [192, 168, 3, 1]|VLAN3| 3|[192, 168, 3]|
| 3|192.168.3.1| 192.168.1.1| [192, 168, 3, 1]| [192, 168, 1, 1]|VLAN1| 3|[192, 168, 1]|
+---+-----------+--------------+-------------------+------------------------+-----+----+-------------+
Наконец, вы либо объединяете две результирующие таблицы вместе на source_ip
и destination_ip
(поскольку у вас есть информация vlan
, прикрепленная по мере необходимости ), или вы объедините два предыдущих шага вместе и crossJoin
и filter
дважды.
Надеюсь, это поможет!