Этот вопрос касается обработки большого набора данных наблюдений во времени.Работа на более позднем этапе требует общего временного шага между наблюдениями, но на практике необработанные данные часто пропускают временные шаги.При заданном временном шаге (скажем, 1 секунда) цель этого вопроса состоит в том, чтобы в полном диапазоне, наблюдаемом в необработанных данных, добавить строки, соответствующие любым пропущенным временным шагам, с использованием Pyspark .
Я достиг этого путем:
- Генерация новой последовательности значений времени, используя минимальное и максимальное наблюдаемое время и предполагаемый общий шаг по времени в Python
- Создание нового фрейма данных Spark из этой последовательности и соединение его с необработанными данными.
Мой вопрос заключается в том, существует ли более эффективный или естественный способ решения этой проблемы в Pyspark (или, если нет, то есть лиЕсть ли какие-нибудь очевидные улучшения в моем подходе?) *
Меня особенно интересует, может ли это быть эффективно решено в Pyspark, а не в Spark с кодом на Java, как в этот вопрос .
Ниже подробно описано мое решение, а также настройка и создание воспроизводимых тестовых данных.
Мое решение
spark = SparkSession \
.builder \
.appName("Spark StackOverflow Test") \
.getOrCreate()
df = spark.read\
.options(header=True, inferSchema=True)\
.csv('test_data.csv')
# find min and max observed times after timesteps have been subsampled
df.createOrReplaceTempView('test_view')
tmin = spark.sql('select min(date) from test_view').collect()[0]['min(date)']
tmax = spark.sql('select max(date) from test_view').collect()[0]['max(date)']
# create full second-by-second index
new_date_index = takewhile(lambda x: x <= tmax,
date_seq_generator(tmin, datetime.timedelta(seconds=1)))
# create Spark dataframe for new time index
index_schema = StructType([StructField("date", StringType())])
time_rdd = sc.parallelize([datetime.datetime.strftime(t, '%Y-%m-%d %H:%M:%S')
for t in new_date_index])
df_dates = spark.createDataFrame(time_rdd.map(lambda s: s.split(',')),
schema=index_schema)
# cast new index type from string to timestamp
df_dates = df_dates.withColumn("date", df_dates["date"].cast(TimestampType()))
# join the spark dataframes to reindex
reindexed = df_dates.join(df,
how='left',
on= df_dates.date == df.date).select([df_dates.date, df.foo])
Настройка и создание фиктивных воспроизводимых данных
Базовая форма:
date foo
0 2018-01-01 00:00:00 0.548814
1 2018-01-01 00:00:01 0.715189
2 2018-01-01 00:00:02 0.602763
3 2018-01-01 00:00:03 0.544883
4 2018-01-01 00:00:04 0.423655
5 2018-01-01 00:00:05 0.645894
6 2018-01-01 00:00:08 0.963663
...
Код:
import datetime
import pandas as pd
import numpy as np
from itertools import takewhile
from pyspark.sql.types import StructType, StructField, StringType, TimestampType
from pyspark.sql.functions import col
# set seed for data
np.random.seed(0)
def date_seq_generator(start, delta):
"""
Generator function for time observations.
:param start: datetime start time
:param delta: timedelta between observations
:returns: next time observation
"""
current = start - delta
while True:
current += delta
yield current
def to_datetime(datestring):
"""Convert datestring to correctly-formatted datetime object."""
return datetime.datetime.strptime(datestring, '%Y-%m-%d %H:%M:%S')
# choose an arbitrary time period
start_time = to_datetime('2018-01-01 00:00:00')
end_time = to_datetime('2018-01-02 00:00:00')
# create the full time index between the start and end times
initial_times = list(takewhile(lambda x: x <= end_time,
date_seq_generator(start_time, datetime.timedelta(seconds=1))))
# create dummy dataframe in Pandas
pd_df = pd.DataFrame({'date': initial_times,
'foo': np.random.uniform(size =len(initial_times))})
# emulate missing time indices
pd_df = pd_df.sample(frac=.7)
# save test data
pd_df.to_csv('test_data.csv', index=False)