Я использую Pyspark для создания конвейеров данных из источника данных с высокой фиксированной частотой. То есть сигналы сообщают о своих состояниях с заданной частотой, а не когда они меняют состояния. Я знаю, как делать то, что мне нужно, но я чувствую, что есть более эффективный способ сделать это. В настоящее время я работаю над рабочим процессом, в котором мне нужно определить количество различных значений «Вкл» из источника. Данные имеют следующую структуру:
Signal_Value Timestamp
Off 1
Off 2
On 3
On 4
On 5
Off 6
Off 7
On 8
On 9
Off 10
Таким образом, в этом случае количество различных сигналов «Вкл.» Равно 2, хотя всего имеется 5 значений «Вкл.» (Я определяю различные сигналы. здесь, как группы On's). Прямо сейчас мой процесс состоит в том, чтобы выбрать данные, упорядочить их по метке времени, а затем выполнить итерацию по всему списку сигналов и добавлять 1 к счетной переменной каждый раз, когда l oop сталкивается с ситуацией, когда текущий сигнал "Вкл" и предыдущий сигнал выключен. Кажется неэффективным l oop по всему списку. Есть ли лучший способ сделать это?
Для более подробной информации, мой код выглядит примерно так:
df = spark.sql("select id, signal_value, timestamp from database.table")
# Following functions used at the bottom for map() & reduceByKey() functions
def key_value_map(x):
key = x[0]
value = [tuple((x[1],x[2]))]
return (key,value)
def reduce(x):
return x+y # combining tuple-lists
def time_order(x):
id = x[0]
signal_list = x[1] # has structure: (signal_value, timestamp)
sorted_signal_list = sorted(signal_list, key=lambda x: x[1])
return (id, sorted_signal_list)
def metrics_map(x):
id = x[0]
signal_list = x[1] # has structure: (signal_value, timestamp)
count = 0
for i in range(1, len(signal_list)):
value = signal_list[i][0] # 0th element is signal_value
previous_value = signal_list[i-1][0]
if value == "On" and previous_value == "Off":
count += 1
return (id, count)
mapped = df.rdd.map(key_value_map)
reduced = mapped.reduceByKey(reduce)
mapped2 = reduced.map(time_order)
mapped3 = mapped2.map(metrics_map)
result = mapped3.collect()