Pyspark вложил l oop в тот же DataFrame. Как перебрать? - PullRequest
0 голосов
/ 28 марта 2020

EDIT . TL; DR: Я пытаюсь получить вложенный l oop в DataPrame pyspark. Как вы можете видеть, я хочу, чтобы вложенная l oop начиналась со строки NEXT (относительно первой l oop) в каждой итерации, чтобы сократить ненужные итерации. Используя Python, я могу использовать [row.Index + 1:] . Как правило, в простом Python я могу добиться этого с помощью следующего кода:

r=1000
joins=[]
for row in df.itertuples():
    for k in df[row.Index+1:].itertuples():
        a = np.array((row.x, row.y))
        b = np.array((k.x, k.y))
        if row.class!= k.class:
            if row.x < k.x+r:
                if np.linalg.norm(a-b) < r:
                    joins.append((row.name, k.name))
print(joins)

Как я могу добиться того же результата в Spark?

Длинный анализ проблемы : У меня есть список

people=[('john', 35, 54, 'A'), ('george', 94, 84, 'B'), ('nicolas', 7, 9, 'B'), ('tom', 86, 93, 'A'), ('jason', 62, 73, 'B'), ('bill', 15, 58, 'A'), ('william', 9, 3, 'A'), ('brad', 73, 37, 'B'), ('cosmo', 52, 67, 'B'), ('jerry', 73, 30, 'A')]

, и я преобразую его в искровой Dataframe, который я сортирую по 'y', по убыванию:

schema = StructType([
    StructField('name', StringType(), True),
    StructField('x', IntegerType(), True),
    StructField('y', IntegerType(), True),
    StructField('class', StringType(), True),
])
rdd = spark.sparkContext.parallelize(people)
df = spark.createDataFrame(rdd,schema)
df.orderBy('y',ascending=False).show()
+-------+---+---+-----+
|   name|  x|  y|class|
+-------+---+---+-----+
|    tom| 86| 93|    A|
| george| 94| 84|    B|
|  jason| 62| 73|    B|
|  cosmo| 52| 67|    B|
|   bill| 15| 58|    A|
|   john| 35| 54|    A|
|   brad| 73| 37|    B|
|  jerry| 73| 30|    A|
|nicolas|  7|  9|    B|
|william|  9|  3|    A|
+-------+---+---+-----+

Что я ищу for - это объединение имен 'class A' с именами 'class B', когда условие выполнено. Скажем, например, когда евклидово расстояние (элемент A - элемент B) <10. Я считаю, что перекрестное соединение - не лучшая идея, потому что оно может занимать много времени с большими наборами данных. </p>

Я думаю, было бы неплохо, если бы я мог повторить. псевдокод:

start with row1
if row1[class] != row2[class]:
    if row1['y'] - row2['y'] < 10:
        if Euclidean distance(row1.item - row2.item) < 10:
            join row1.name, row2.name
        end
    else break

it keeps iterating until row1['y'] - row['y'] >= 10

then a new iteration starts from row2
if row2[class] != row3[class]:
    etc etc

Таким образом, будет меньше проверок между различными строками. Что противоположно перекрестному соединению, где все проверяется на соответствие всем, что приводит к увеличению времени выполнения. Но как это можно сделать в искре? Есть идеи?

править. Требуемый результат (в любой форме):

brad, jerry

(единственная пара, в которой его элементы принадлежат разным классам и их евклидово расстояние равно 7, что меньше 10)

1 Ответ

2 голосов
/ 28 марта 2020

Если я правильно понимаю проблему, вы можете разделить фрейм данных на два фрейма данных на основе столбца class, а затем объединить их на основе указанного предложения объединения (используя внешнее соединение):

from pyspark.sql.functions import col, collect_list, struct
A_df = df.where(col('class') == 'A').withColumnRenamed('name', 'A.name')
B_df = df.where(col('class') == 'B').withColumnRenamed('name', 'B.name')

join_clause = A_df.y - B_df.y <= 10
result = A_df.join(B_df, join_clause, 'outer')

И с результирующим фреймом данных преобразуйте два столбца в один столбец списка:

result = result.withColumn(collect_list(struct(col('A.name'), col('B.name')))

Обновление

Вот реализация чего-то с использованием mapPartitions, нет присоединяется или преобразуется в DataFrame:

import math

from pyspark.sql import SparkSession


def process_data(rows):
    r = 1000
    joins = []
    for row1 in rows:
        for row2 in rows:
            if row1['class'] != row2['class']:
                if row1['x'] < row2['x'] + r:
                    if math.sqrt((row1['x'] - row2['x']) ** 2 + (row1['y'] - row2['y']) ** 2) <= r:
                        joins.append((row1['name'], row2['name']))
    return joins


spark = SparkSession \
    .builder \
    .appName("Python Spark SQL basic example") \
    .getOrCreate()

people = [('john', 35, 54, 'A'),
          ('george', 94, 84, 'B'),
          ('nicolas', 7, 9, 'B'),
          ('tom', 86, 93, 'A'),
          ('jason', 62, 73, 'B'),
          ('bill', 15, 58, 'A'),
          ('william', 9, 3, 'A'),
          ('brad', 73, 37, 'B'),
          ('cosmo', 52, 67, 'B'),
          ('jerry', 73, 30, 'A')]

fields = ('name', 'x', 'y', 'class')

data = [dict(zip(fields, person)) for person in people]

rdd = spark.sparkContext.parallelize(data)

result = rdd.mapPartitions(process_data).collect()
print(result)

Вывод:

[('tom', 'jason'), ('cosmo', 'jerry')]

Обновление 2

Добавлена ​​начальная сортировка перейдите в поле 'y', переделите, чтобы убедиться, что все данные находятся в одном разделе (чтобы можно было сравнить все записи), и изменили вложенный l oop:

import math

from pyspark.sql import SparkSession


def process_data(rows):
    r = 1000
    joins = []
    rows = list(rows)
    for i, row1 in enumerate(rows):
        for row2 in rows[i:]:
            if row1['class'] != row2['class']:
                if row1['x'] < row2['x'] + r:
                    if math.sqrt((row1['x'] - row2['x']) ** 2 + (row1['y'] - row2['y']) ** 2) < r:
                        joins.append((row1['name'], row2['name']))
    return joins


spark = SparkSession \
    .builder \
    .appName("Python Spark SQL basic example") \
    .getOrCreate()

people = [('john', 35, 54, 'A'),
          ('george', 94, 84, 'B'),
          ('nicolas', 7, 9, 'B'),
          ('tom', 86, 93, 'A'),
          ('jason', 62, 73, 'B'),
          ('bill', 15, 58, 'A'),
          ('william', 9, 3, 'A'),
          ('brad', 73, 37, 'B'),
          ('cosmo', 52, 67, 'B'),
          ('jerry', 73, 30, 'A')]

fields = ('name', 'x', 'y', 'class')

data = [dict(zip(fields, person)) for person in people]

rdd = spark.sparkContext.parallelize(data)

result = rdd.sortBy(lambda x: x['y'], ascending=False).repartition(1).mapPartitions(process_data).collect()
print(result)

Вывод:

[('william', 'nicolas'), ('william', 'brad'), ('william', 'cosmo'), ('william', 'jason'), ('william', 'george'), ('nicolas', 'jerry'), ('nicolas', 'john'), ('nicolas', 'bill'), ('nicolas', 'tom'), ('jerry', 'brad'), ('jerry', 'cosmo'), ('jerry', 'jason'), ('jerry', 'george'), ('brad', 'john'), ('brad', 'bill'), ('brad', 'tom'), ('john', 'cosmo'), ('john', 'jason'), ('john', 'george'), ('bill', 'cosmo'), ('bill', 'jason'), ('bill', 'george'), ('cosmo', 'tom'), ('jason', 'tom'), ('george', 'tom')]
...