Pyspark объединяет фрейм данных на основе функции - PullRequest
1 голос
/ 29 мая 2020

У меня есть 2 фрейма данных, которые выглядят так

сети

+----------------+-------+
|    Network     | VLAN  |
+----------------+-------+
| 192.168.1.0/24 | VLAN1 |
| 192.168.2.0/24 | VLAN2 |
+----------------+-------+

потоки

+--------------+----------------+
|  source_ip   | destination_ip |
+--------------+----------------+
| 192.168.1.11 | 192.168.2.13   |
+--------------+----------------+

В идеале я хотел бы получить что-то вроде этого

+--------------+----------------+-------------+------------------+
|  source_ip   | destination_ip | source_vlan | destination_vlan |
+--------------+----------------+-------------+------------------+
| 192.168.1.11 | 192.168.2.13   | VLAN1       | VLAN2            |
+--------------+----------------+-------------+------------------+

К сожалению, фрейм данных потоков не содержит маски подсети. То, что я пробовал до сих пор без pyspark

  1. Получить отдельный список подсетей (в этом примере [24])
  2. Для каждого su bnet и source_ip вычислить сеть ipaddress.ip_network('{}/{}'.format(ip,sub), strict=False)
  3. Найдите, существует ли этот su bnet в «сетях» фрейма данных, и верните VLAN, в противном случае верните пустую строку

Я попытался сделать аналогичный подход с pyspark, но он не работает а также, как я думаю, могут быть лучшие способы сделать это?

def get_available_subnets(df):
    split_col = split(df['network'], '/')
    df = df.withColumn('sub', split_col.getItem(1))
    return df.select('sub').distinct()

def get_vlan_by_ip(ip, infoblox, subnets):
    for sub in subnets:
        net = ipaddress.ip_network('{}/{}'.format(ip,sub), strict=False)
        if net:
            search = infoblox.filter(infoblox.network == str(net))

            if not search.head(1).isEmpty():
                return search.first.vlan
    return hashlib.sha1(str.encode(ip)).hexdigest()

subnets = get_available_subnets(infoblox_networks_df).select('sub').rdd.flatMap(lambda x: x).collect()


short = flows_filtered_prepared_df.limit(1000)


partial_vlan_func = partial(get_vlan_by_ip, infoblox=infoblox_networks_df, subnets=subnets)
get_vlan_udf = udf(lambda ip: partial_vlan_func(ip), StringType())

short.select('source_ip', 'destination_ip', get_vlan_udf('source_ip').alias('source_vlan')).show()

1 Ответ

2 голосов
/ 29 мая 2020

Этот метод полностью исключает использование udf, используя split и slice, но, возможно, есть способ лучше. Преимущество этого подхода заключается в том, что он напрямую использует биты, присутствующие в маске su bnet, и что он записан исключительно в PySpark.

Контекст решения: IP-адреса могут быть разделены и замаскированы su bnet. Это означает, что 8, 16, 24, 32 сообщает вам, какие части IP имеют значение - это мотивирует деление на 8 и использование полученного столбца для разделения столбца IP-адреса ArrayType после его отделения от исходного StringType.

NB: pyspark.sql.functions.slice будет работать в более новой версии PySpark >= 2.4, некоторые старые должны использовать f.expr("slice(...)").

Настройка:

flows = spark.createDataFrame([
    (1, "192.168.1.1", "192.168.2.1"),
    (2, "192.168.2.1", "192.168.3.1"), 
    (3, "192.168.3.1", "192.168.1.1"), 
], ['id', 'source_ip', 'destination_ip'] 
)
networks = spark.createDataFrame([
    (1, "192.168.1.0/24", "VLAN1"),
    (2, "192.168.2.0/24", "VLAN2"), 
    (3, "192.168.3.0/24", "VLAN3"), 
], ['id', 'network', 'vlan'] 
)

Некоторые предварительная обработка:

networks_split = networks.select(
    "*",
    (f.split(f.col("network"), "/")[1] / 8).cast("int").alias("bits"),
    f.split(f.split(f.col("network"), "/")[0], "\.").alias('segmented_ip')
)
networks_split.show()
+---+--------------+-----+----+----------------+
| id|       network| vlan|bits|    segmented_ip|
+---+--------------+-----+----+----------------+
|  1|192.168.1.0/24|VLAN1|   3|[192, 168, 1, 0]|
|  2|192.168.2.0/24|VLAN2|   3|[192, 168, 2, 0]|
|  3|192.168.3.0/24|VLAN3|   3|[192, 168, 3, 0]|
+---+--------------+-----+----+----------------+

networks_masked = networks_split.select(
    "*",
    f.expr("slice(segmented_ip, 1, bits)").alias("masked_bits"),
)
networks_masked.show()
+---+--------------+-----+----+----------------+-------------+
| id|       network| vlan|bits|    segmented_ip|  masked_bits|
+---+--------------+-----+----+----------------+-------------+
|  1|192.168.1.0/24|VLAN1|   3|[192, 168, 1, 0]|[192, 168, 1]|
|  2|192.168.2.0/24|VLAN2|   3|[192, 168, 2, 0]|[192, 168, 2]|
|  3|192.168.3.0/24|VLAN3|   3|[192, 168, 3, 0]|[192, 168, 3]|
+---+--------------+-----+----+----------------+-------------+

flows_split = flows.select(
    "*",
    f.split(f.split(f.col("source_ip"), "/")[0], "\.").alias('segmented_source_ip'),
    f.split(f.split(f.col("destination_ip"), "/")[0], "\.").alias('segmented_destination_ip')
)
flows_split.show()
+---+-----------+--------------+-------------------+------------------------+
| id|  source_ip|destination_ip|segmented_source_ip|segmented_destination_ip|
+---+-----------+--------------+-------------------+------------------------+
|  1|192.168.1.1|   192.168.2.1|   [192, 168, 1, 1]|        [192, 168, 2, 1]|
|  2|192.168.2.1|   192.168.3.1|   [192, 168, 2, 1]|        [192, 168, 3, 1]|
|  3|192.168.3.1|   192.168.1.1|   [192, 168, 3, 1]|        [192, 168, 1, 1]|
+---+-----------+--------------+-------------------+------------------------+

Наконец, I crossJoin и фильтрация на срезе на основе bits моей маски, например:

flows_split.crossJoin(
    networks_masked.select("vlan", "bits", "masked_bits")
).where(
    f.expr("slice(segmented_source_ip, 1, bits)") == f.col("masked_bits")
).show()
+---+-----------+--------------+-------------------+------------------------+-----+----+-------------+
| id|  source_ip|destination_ip|segmented_source_ip|segmented_destination_ip| vlan|bits|  masked_bits|
+---+-----------+--------------+-------------------+------------------------+-----+----+-------------+
|  1|192.168.1.1|   192.168.2.1|   [192, 168, 1, 1]|        [192, 168, 2, 1]|VLAN1|   3|[192, 168, 1]|
|  2|192.168.2.1|   192.168.3.1|   [192, 168, 2, 1]|        [192, 168, 3, 1]|VLAN2|   3|[192, 168, 2]|
|  3|192.168.3.1|   192.168.1.1|   [192, 168, 3, 1]|        [192, 168, 1, 1]|VLAN3|   3|[192, 168, 3]|
+---+-----------+--------------+-------------------+------------------------+-----+----+-------------+

Точно такой же подход может быть выполнено для destination_ip, например:

flows_split.crossJoin(
    networks_masked.select("vlan", "bits", "masked_bits")
).where(
    f.expr("slice(segmented_destination_ip, 1, bits)") == f.col("masked_bits")
).show()
+---+-----------+--------------+-------------------+------------------------+-----+----+-------------+
| id|  source_ip|destination_ip|segmented_source_ip|segmented_destination_ip| vlan|bits|  masked_bits|
+---+-----------+--------------+-------------------+------------------------+-----+----+-------------+
|  1|192.168.1.1|   192.168.2.1|   [192, 168, 1, 1]|        [192, 168, 2, 1]|VLAN2|   3|[192, 168, 2]|
|  2|192.168.2.1|   192.168.3.1|   [192, 168, 2, 1]|        [192, 168, 3, 1]|VLAN3|   3|[192, 168, 3]|
|  3|192.168.3.1|   192.168.1.1|   [192, 168, 3, 1]|        [192, 168, 1, 1]|VLAN1|   3|[192, 168, 1]|
+---+-----------+--------------+-------------------+------------------------+-----+----+-------------+

Наконец, вы либо объединяете две результирующие таблицы вместе на source_ip и destination_ip (поскольку у вас есть информация vlan, прикрепленная по мере необходимости ), или вы объедините два предыдущих шага вместе и crossJoin и filter дважды.

Надеюсь, это поможет!

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...