С помощью Pyspark я хотел бы присоединиться / объединить, если IP-адрес в кадре данных A находится в диапазоне IP-сети или достигает того же IP-адреса в кадре данных B.
Фрейм данных A содержит только IP-адреса, а другой имеет IP-адреса или IP-адреса с CIDR. Вот пример.
Dataframe A
+---------------+
| ip_address|
+---------------+
| 192.0.2.2|
| 164.42.155.5|
| 52.95.245.0|
| 66.42.224.235|
| ...|
+---------------+
Dataframe B
+---------------+
| ip_address|
+---------------+
| 123.122.213.34|
| 41.32.241.2|
| 66.42.224.235|
| 192.0.2.0/23|
| ...|
+---------------+
тогда ожидаемый результат будет примерно ниже
+---------------+--------+
| ip_address| is_in_b|
+---------------+--------+
| 192.0.2.2| true| -> This is in the same network range as 192.0.2.0/23
| 164.42.155.5| false|
| 52.95.245.0| false|
| 66.42.224.235| true| -> This is in B
| ...| ...|
+---------------+--------+
Идея, которую я сначала хотел попробовать - использовать udf, сравнивающий один за другим, и проверять диапазон IP-адресов при появлении CIDR, но кажется, что udf не поддерживает несколько фреймов данных. Я также попытался преобразовать DF B в список, а затем сравнить. Однако это очень неэффективно и занимает много времени, так как номер строки A * номер строки B превышает 100 миллионов. Есть ли эффективное решение?
Отредактировано:
Для более подробной информации я использовал следующий код для проверки без pyspark и использования любой библиотеки.
def cidr_to_netmask(c):
cidr = int(c)
mask = (0xffffffff >> (32 - cidr)) << (32 - cidr)
return (str((0xff000000 & mask) >> 24) + '.' + str((0x00ff0000 & mask) >> 16) + '.' + str((0x0000ff00 & mask) >> 8) + '.' + str((0x000000ff & mask)))
def ip_to_numeric(ip):
ip_num = 0
for i, octet in enumerate(ip.split('.')):
ip_num += int(octet) << (24 - (8 * i))
return ip_num
def is_in_ip_network(ip, network_addr):
if len(network_addr.split('/')) < 2:
return ip == network_addr.split('/')[0]
else:
network_ip, cidr = network_addr.split('/')
subnet = cidr_to_netmask(cidr)
return (ip_to_numeric(ip) & ip_to_numeric(subnet)) == (ip_to_numeric(network_ip) & ip_to_numeric(subnet))