Они дали мне таблицу, в которой хранятся показания датчика со схемой [TimeStamp, SensorKey, SensorValue]
.
TimeStamp Id Value
2019-01-01 00:00:47 1 66.6
2019-01-01 00:00:47 2 0.66
2019-01-01 00:00:57 1 66.7
2019-01-01 00:00:57 2 0.68
2019-01-01 00:00:57 3 166.6
2019-01-01 00:01:07 3 146.6
Обратите внимание, что он сохраняет только изменения показаний датчика с ограниченной точностью и частотой дискретизации и повторяет значение каждый час.после последнего изменения, если оно не изменилось.
Их запросы означают проверку значения датчика A (и B, и C, и D ...), когда значение датчика Z проходит это условие.И они хотят использовать Python и Spark.
Таким образом, чтобы сравнить значения разных датчиков, я получаю строки для этих клавиш датчиков и объединяю результаты в схему [TimeStamp, ValueOfA,..., значение Z].
df1 = df0.groupBy("TS").pivot("Id", listOfIds).agg(F.last("Value"))
TimeStamp Sensor1 Sensor2 Sensor3
2019-01-01 00:00:47 66.6 0.66 Null
2019-01-01 00:00:57 66.7 0.68 166.6
2019-01-01 00:01:07 Null Null 146.6
Затем я заполняю пробелы (всегда впереди, если у меня нет более старых данных для заполнения первых строк, я их отбрасываю).
window1hour = Window.orderBy('TS').rowsBetween(-360, 0)
# 360 = 1 hour / 0.1 Hz sampling rate.
df2 = df1
for sid in sensorIds:
df2 = df2\
.withColumn(sid, F.last(F.column(sid), ignorenulls=True).over(window1hour))\
.filter(F.column(sid).isNotNull())
Сравнение, столбец за столбцом, теперь тривиально.
Но по сравнению с тем же самым с pandas
это медленнее, настолько, что кажется, что я делаю что-то не так.По крайней мере, для небольших запросов.
Что происходит?И что будет, когда это большой запрос?
О малом и большом: у меня более тысячи различных датчиков и около миллиарда записей в год.Таким образом, данные определенно помещаются на одном сервере, но не в оперативной памяти.Фактически, они начнут только с одного сервера для данных, возможно, второго для второго экземпляра Spark (как многопроцессорного, так и с большим объемом памяти), и, надеюсь, они будут инвестировать в большее количество оборудования, если увидят возврат.Они начнут делать небольшие запросы день ото дня, и они хотят их быстро.Но позже они захотят выполнять запросы в течение нескольких лет, и они не должны взорваться.
Идеи / сомнения: выполняется ли предварительная обработка в одном потоке?Должен ли я сам стабилизировать распараллеливание или позволить Spark справиться с этим?Должен ли я разбивать запросы, охватывающие весь год, на многие, охватывающие целый день (но тогда зачем мне вообще Spark)?Решаю ли я небольшие запросы в пандах и большие в Spark (и могу ли я заранее установить порог)?
Какие еще улучшения можно применить?