Агрегирование окон по многим столбцам в Spark - PullRequest
1 голос
/ 04 февраля 2020

Не удается выполнить агрегацию по многим столбцам в Pyspark. Существуют сотни логических столбцов, показывающих текущее состояние системы, и строка добавляется каждую секунду. Цель состоит в том, чтобы преобразовать эти данные, чтобы показать количество изменений состояния для каждого окна продолжительностью 10 секунд.

Я планировал сделать это в два этапа: сначала XOR логическое значение со значением предыдущей строки, затем вторая сумма по 10 секундное окно. Вот примерный код, который я придумал:

import pyspark
from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession, Window, Row
from pyspark.sql import types as T, functions as F

from datetime import datetime, timedelta
from random import random
import time

sc = pyspark.SparkContext(conf=pyspark.SparkConf().setMaster('local[*]'))
spark = SparkSession(sc)

# create dataframe
num_of_cols = 50
df = spark.createDataFrame(
    [(datetime.now() + timedelta(0, i), *[round(random()) for _ in range(num_of_cols)]) for i in range(10000)],
    ['Time', *[f"M{m+1}" for m in range(num_of_cols)]])
cols = set(df.columns) - set(['Time'])

# Generate changes
data_window = Window.partitionBy(F.minute('Time')).orderBy('Time')
# data_window = Window.orderBy('Time')
df = df.select('Time', *[F.col(m).bitwiseXOR(F.lag(m, 1).over(data_window)).alias(m) for m in cols])

df = df.groupBy(F.window('Time', '10 seconds')) \
    .agg(*[F.sum(m).alias(m) for m in cols]) \
    .withColumn('start_time', F.col('window')['start']) \
    .drop('window')

df.orderBy('start_time').show(20, False)

# Keep UI open
time.sleep(60*60)

С data_window, разделенным по минутам, Spark генерирует 52 этапа, каждый из которых зависит от последнего. Увеличение num_of_cols также увеличивает количество этапов. Мне кажется, это должно быть смущающе распараллеливаемой проблемой. Сравните каждую строку с последней, а затем агрегируйте по 10 секунд. Удаление раздела data_window позволяет выполнять его за один этап, но при этом все данные на одном разделе вызываются для его достижения.

Почему этапы зависят друг от друга, есть ли лучший способ записи это улучшить распараллеливание? Я думаю, что было бы возможно сделать несколько агрегаций по одному и тому же окну одновременно. В конечном счете, для этого потребуется масштабировать до сотен столбцов. Есть ли какие-то хитрости для повышения производительности в этот момент?

1 Ответ

0 голосов
/ 06 февраля 2020

Основываясь на полезном ответе Георга, я придумал следующее:

import pandas as pd
import pyspark
from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession, Window
from pyspark.sql import types as T, functions as F

from datetime import datetime, timedelta
from random import random
import time
import pprint


sc = pyspark.SparkContext(conf=pyspark.SparkConf().setMaster('local[*]'))
spark = SparkSession(sc)


@F.pandas_udf(T.ArrayType(T.IntegerType()), F.PandasUDFType.GROUPED_AGG)
def pandas_xor(v):
    values = v.values
    if len(values) == 1:
        return values[0] * False
    elif len(values) == 2:
        return values[0] ^ values[1]
    else:
        raise RuntimeError('Too many values given to pandas_xor: {}'.format(values))


# create dataframe
num_of_cols = 50
df = spark.createDataFrame(
    [(datetime.now() + timedelta(0, i), *[round(random()) for _ in range(num_of_cols)]) for i in range(100000)],
    ['Time', *[f"M{m+1}" for m in range(num_of_cols)]])
cols = set(df.columns) - set(['Time'])

df = df.select('Time', F.array(*cols).alias('data'))

# XOR
data_window = Window.partitionBy(F.minute('Time')).orderBy('Time').rowsBetween(Window.currentRow, 1)
# data_window = Window.orderBy('Time')
df = df.select('Time', pandas_xor(df.data).over(data_window).alias('data'))

df = df.groupBy(F.window('Time', '10 seconds')) \
    .agg(*[F.sum(F.element_at('data', i + 1)).alias(m) for i, m in enumerate(cols)]) \
    .withColumn('start_time', F.col('window')['start']) \
    .drop('window')

df.orderBy('start_time').show(20, False)

# Keep UI open
time.sleep(60*60)

Со следующими инструкциями, чтобы запустить его с Spark 3.0.0preview2

  1. Загрузить Spark 3.0.0

    mkdir contrib
    wget -O contrib/spark-3.0.0-preview2.tgz 'https://www.apache.org/dyn/mirrors/mirrors.cgi?action=download&filename=spark/spark-3.0.0-preview2/spark-3.0.0-preview2-bin-hadoop2.7.tgz'
    tar -C contrib -xf contrib/spark-3.0.0-preview2.tgz
    rm contrib/spark-3.0.0-preview2.tgz
    
  2. В первой оболочке настройте среду для использования Pyspark 3.0.0

    export SPARK_HOME="$(pwd)/contrib/spark-3.0.0-preview2-bin-hadoop2.7"
    export PYTHONPATH="$SPARK_HOME/python/lib/pyspark.zip:$SPARK_HOME/python/lib/py4j-0.10.8.1-src.zip"
    
  3. Начните с задание pyspark

    time python3 so-example.py
    

    Просмотр локального веб-интерфейса Spark Run по адресу http://localhost: 4040

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...