Как сравнить определенную часть одного (IP-адрес) с другим IP-адресом в другом столбце в pyspark RDD python без использования collect () и цикла for - PullRequest
0 голосов
/ 08 июля 2019

У меня есть два списка IP-адресов, которые расположены в отдельных текстовых файлах. Я хочу сравнить эти два набора данных, взяв первые три байта из них.

Например:

a='123.43.54.231'
b='123.43.54.50'

, так как первые три байта являются взаимными между a и b, тогда я хочу получить полный a (123.43.54.231).

, поскольку я имею дело с СДР, следует избегать collect(), поскольку это большой набор данных. На самом деле, я написал правильный код, который делает то, что я хочу. Однако то, что я сделал, содержало collect(), что делает процесс невероятно медленным.

Python_3.7.3

from pyspark import SparkContext, SparkConf

  if __name__ == "__main__":
  conf = SparkConf().setAppName("Big_Data_Project").setMaster("local[*]")
  sc = SparkContext(conf = conf)

  Ip_1= sc.textFile("Ip_1.txt")

#Ip_1='''123.34.405.123 153.74.61.65 43.34.65.123 ...... '''
#Ip_2='''123.34.321.143 153.74.61.43 43.34.65.112 ...... '''

  Ip_2= sc.textFile("Ip_2.txt")

  y=[]
  def func():

      for i in Ip_1.collect():
          for x in Ip_2.collect():
              d=i[:i.rfind(".")]
              h=x[:x.rfind(".")]
              if d==h:
                  y.append(i)
              else:
                  pass
      return y
  Wanted_Ip=sc.parallelize(func())
  Wanted_Ip.repartition(1).saveAsTextFile("My Ip List")

как я объяснил, я хочу получить full ip_adress Ip_1, которые совпадают с первыми тремя байтами RDD Ip_2, которые

153.74.61.65
43.34.65.123

Я ищу решение, которое не включает collect().

1 Ответ

0 голосов
/ 08 июля 2019

Вам просто нужно сгенерировать ключ для присоединения, а затем выполнить соединение:

gen_key = lambda x : (x.rsplit('.', 1)[0], x)
Ip_1 = Ip_1.map(gen_key)
Ip_2 = Ip_2.map(gen_key)
common_ip = Ip_1.join(Ip_2)

common_ip - это rdd, где каждая строка представляет собой пару (ключ, значение) с:

  • ключ = 3-значный IP
  • значение = пара IP-адресов от Ip_1 и Ip_2
common_ip.collect()                                                                                                                                                                                                                           

[('123.43.54', ('123.43.54.231', '123.43.54.50'))]

Если вам просто нужны IP-адреса Ip_1, вы можете сделать:

common_ip.map(lambda x : x[1][0]).repartition(1).saveAsTextFile("My Ip List")
...