Установить сходство объединить с помощью Spark - PullRequest
0 голосов
/ 10 октября 2018

У меня есть два текстовых файла, каждая строка в форме (id, последовательность чисел).У меня также есть пороговое значение.

Файл 1 выглядит следующим образом, где в первой строке 0 - это идентификатор, а остальные - это последовательность чисел.

0 1 4 5 6
1 2 3 6
2 4 5 6

Аналогичнофайл 2 со следующим содержанием.

0 1 4 6
1 2 5 6
2 3 5

Я должен найти те строки, у которых значение сходства больше или равно пороговому значению.Значение сходства можно рассчитать как пересечение двух линий, деленное на объединение двух линий.Например, строка id-0 файла file1 имеет seq 1,4,5,6, а строка id-0 файла file2 имеет seq 1,4,6.У них есть размер пересечения = 3 и размер объединения = 4. Поэтому их сходство будет 3/4 = 0,75, что превышает пороговое значение.

Я написал код Python для этой задачи и пытаюсь преобразовать его вScala.

    with open("file1.txt") as f1:
        content1 = f1.readlines()
    content1 = [x.strip() for x in content1]

    with open("file2.txt") as f2:
        content2 = f2.readlines()
    content2 = [x.strip() for x in content2]

    threshold = 0.5
    final_index_list_with_similarity = []

    for i in range(len(content1)):
        for j in range(len(content2)):
            index_content1 = content1[i][0]
            index_content2 = content2[j][0]
            s = set(content1[i][1:])
            t = set(content2[j][1:])
            intersect = s & t
            intersect_size = len(intersect) - 1
            union_size = len(s) + len(t) - intersect_size - 2 #substracting two because I am getting two extra space.
            similarity = intersect_size/union_size
            if similarity >= threshold:
                final_index_list_with_similarity.append((index_content1, index_content2, similarity))
    print(final_index_list_with_similarity)

Вывод: [('0', '0', 0.75), ('1', '1', 0.5), ('2', '0', 0.5), ('2', '1', 0.5)]

То, что я до сих пор пробовал в scala, выглядит примерно так:

val inputFile1 = args(0)
val inputFile2 = args(1)
val threshold = args(2).toDouble
val ouputFolder = args(3)

val conf = new SparkConf().setAppName("SetSimJoin").setMaster("local")
val sc = new SparkContext(conf)

val lines1 = sc.textFile(inputFile1).flatMap(line => line.split("\n"))
val lines2 = sc.textFile(inputFile2).flatMap(line => line.split("\n"))

val elements1 = lines1.map { x => x.drop(x.split(" ")(0).length.toInt + 1) }.flatMap { x => x.split(" ") }.map { x => (x, 1) }.reduceByKey(_+_)
val elements2 = lines2.map { x => x.drop(x.split(" ")(0).length.toInt + 1) }.flatMap { x => x.split(" ") }.map { x => (x, 1) }.reduceByKey(_+_)

Это дает мне частотукаждого числа во всем файле.

Любая помощь или руководство будет высоко ценится.

1 Ответ

0 голосов
/ 10 октября 2018

Оба файла можно объединить как RDD, а затем применить формулу: «размер пересечения / размер объединения»:

val lines1 = sparkContext.textFile("inputFile1")
val lines2 = sparkContext.textFile("inputFile2")

val rdd1 = lines1.map(_.split(" ")).map(v => (v(0), v.tail))
val rdd2 = lines2.map(_.split(" ")).map(v => (v(0), v.tail))

val result = rdd1.join(rdd2).map(r => (
  r._1,
  r._2._1.intersect(r._2._2).size * 1.0 /
    r._2._1.union(r._2._2).distinct.size
)
)

result.foreach(println)

Вывод:

(1,0.5)
(0,0.75)
(2,0.25)
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...