Переиндексация (добавление строк) в Pyspark в масштабе - PullRequest
0 голосов
/ 30 мая 2018

Этот вопрос касается обработки большого набора данных наблюдений во времени.Работа на более позднем этапе требует общего временного шага между наблюдениями, но на практике необработанные данные часто пропускают временные шаги.При заданном временном шаге (скажем, 1 секунда) цель этого вопроса состоит в том, чтобы в полном диапазоне, наблюдаемом в необработанных данных, добавить строки, соответствующие любым пропущенным временным шагам, с использованием Pyspark .

Я достиг этого путем:

  1. Генерация новой последовательности значений времени, используя минимальное и максимальное наблюдаемое время и предполагаемый общий шаг по времени в Python
  2. Создание нового фрейма данных Spark из этой последовательности и соединение его с необработанными данными.

Мой вопрос заключается в том, существует ли более эффективный или естественный способ решения этой проблемы в Pyspark (или, если нет, то есть лиЕсть ли какие-нибудь очевидные улучшения в моем подходе?) *

Меня особенно интересует, может ли это быть эффективно решено в Pyspark, а не в Spark с кодом на Java, как в этот вопрос .

Ниже подробно описано мое решение, а также настройка и создание воспроизводимых тестовых данных.

Мое решение

spark = SparkSession \
.builder \
.appName("Spark StackOverflow Test") \
.getOrCreate()

df = spark.read\
.options(header=True, inferSchema=True)\
.csv('test_data.csv')

# find min and max observed times after timesteps have been subsampled
df.createOrReplaceTempView('test_view')
tmin = spark.sql('select min(date) from test_view').collect()[0]['min(date)']
tmax = spark.sql('select max(date) from test_view').collect()[0]['max(date)']

# create full second-by-second index
new_date_index = takewhile(lambda x: x <= tmax,
        date_seq_generator(tmin, datetime.timedelta(seconds=1)))

# create Spark dataframe for new time index
index_schema = StructType([StructField("date", StringType())])
time_rdd = sc.parallelize([datetime.datetime.strftime(t, '%Y-%m-%d %H:%M:%S')
                       for t in new_date_index])
df_dates = spark.createDataFrame(time_rdd.map(lambda s: s.split(',')),
                                 schema=index_schema)
# cast new index type from string to timestamp
df_dates = df_dates.withColumn("date", df_dates["date"].cast(TimestampType()))

# join the spark dataframes to reindex
reindexed = df_dates.join(df,
                      how='left',
                      on= df_dates.date == df.date).select([df_dates.date, df.foo])

Настройка и создание фиктивных воспроизводимых данных

Базовая форма:

                  date       foo
0  2018-01-01 00:00:00  0.548814
1  2018-01-01 00:00:01  0.715189
2  2018-01-01 00:00:02  0.602763
3  2018-01-01 00:00:03  0.544883
4  2018-01-01 00:00:04  0.423655
5  2018-01-01 00:00:05  0.645894
6  2018-01-01 00:00:08  0.963663
...

Код:

import datetime
import pandas as pd
import numpy as np
from itertools import takewhile
from pyspark.sql.types import StructType, StructField, StringType, TimestampType
from pyspark.sql.functions import col

# set seed for data
np.random.seed(0)

def date_seq_generator(start, delta):
    """
    Generator function for time observations.

    :param start: datetime start time
    :param delta: timedelta between observations
    :returns: next time observation
    """
    current = start - delta
    while True:
        current += delta
        yield current

def to_datetime(datestring):
    """Convert datestring to correctly-formatted datetime object."""
    return datetime.datetime.strptime(datestring, '%Y-%m-%d %H:%M:%S')

# choose an arbitrary time period
start_time = to_datetime('2018-01-01 00:00:00')
end_time = to_datetime('2018-01-02 00:00:00')

# create the full time index between the start and end times
initial_times = list(takewhile(lambda x: x <= end_time,
            date_seq_generator(start_time, datetime.timedelta(seconds=1))))

# create dummy dataframe in Pandas
pd_df = pd.DataFrame({'date': initial_times,
                      'foo': np.random.uniform(size =len(initial_times))})

# emulate missing time indices
pd_df = pd_df.sample(frac=.7)

# save test data
pd_df.to_csv('test_data.csv', index=False)

1 Ответ

0 голосов
/ 19 марта 2019

Полная дата на Spark с Scala:

    import org.joda.time._
    import org.joda.time.format._
    import org.joda.time.format.DateTimeFormat
    import org.joda.time.DateTime
    import org.joda.time.Days
    import org.joda.time.Duration
    import org.apache.spark.sql.functions._
    import org.joda.time.LocalDate

      def dateComplete(dataFrameDate0: DataFrame, colName: String): DataFrame ={  
    def dayIterator(start: LocalDate, end: LocalDate) = Iterator.iterate(start)(_ plusDays 1) takeWhile (_ isBefore end)

    def dateSeries( date1 : String,date2 : String) : Array[String]= {
    val fromDate = new LocalDate(date1)
    val toDate = new LocalDate(date2)
    val series = dayIterator(fromDate,toDate).toArray
    val arr = series.map(a => a.toString())
    arr
    }
    val rangos = dataFrameDate0.agg(min($"invoicedate").as("minima_fecha"),         
    max($"invoicedate").as("maxima_fecha") )
    val serie_date = spark.sparkContext.parallelize(dateSeries( 
    rangos.select("minima_fecha", "maxima_fecha").take(1)(0)(0).toString, 
    rangos.select("minima_fecha", "maxima_fecha").take(1)(0)(1).toString )).toDF(colName)
    serie_date.join(dataFrameDate0, Seq(colName), "left")
    }

    val pivoteada=dateComplete(prod_group_day,"invoicedate").groupBy("key_product").pivot("invoicedate").agg(sum("cantidad_prod").as("cantidad"))
...