Pyspark: эффективный способ подсчета различных сигналов «On» на основе данных высокой фиксированной частоты - PullRequest
0 голосов
/ 17 февраля 2020

Я использую Pyspark для создания конвейеров данных из источника данных с высокой фиксированной частотой. То есть сигналы сообщают о своих состояниях с заданной частотой, а не когда они меняют состояния. Я знаю, как делать то, что мне нужно, но я чувствую, что есть более эффективный способ сделать это. В настоящее время я работаю над рабочим процессом, в котором мне нужно определить количество различных значений «Вкл» из источника. Данные имеют следующую структуру:

Signal_Value   Timestamp
    Off            1
    Off            2
    On             3
    On             4
    On             5
    Off            6
    Off            7
    On             8
    On             9
    Off            10

Таким образом, в этом случае количество различных сигналов «Вкл.» Равно 2, хотя всего имеется 5 значений «Вкл.» (Я определяю различные сигналы. здесь, как группы On's). Прямо сейчас мой процесс состоит в том, чтобы выбрать данные, упорядочить их по метке времени, а затем выполнить итерацию по всему списку сигналов и добавлять 1 к счетной переменной каждый раз, когда l oop сталкивается с ситуацией, когда текущий сигнал "Вкл" и предыдущий сигнал выключен. Кажется неэффективным l oop по всему списку. Есть ли лучший способ сделать это?

Для более подробной информации, мой код выглядит примерно так:

df = spark.sql("select id, signal_value, timestamp from database.table")

# Following functions used at the bottom for map() & reduceByKey() functions

def key_value_map(x):
     key = x[0]
     value = [tuple((x[1],x[2]))]
     return (key,value)

def reduce(x):
     return x+y # combining tuple-lists

def time_order(x):
     id = x[0]
     signal_list = x[1] # has structure: (signal_value, timestamp)
     sorted_signal_list = sorted(signal_list, key=lambda x: x[1])
     return (id, sorted_signal_list)

def metrics_map(x):
     id = x[0]
     signal_list = x[1] # has structure: (signal_value, timestamp)
     count = 0
     for i in range(1, len(signal_list)):
          value = signal_list[i][0] # 0th element is signal_value
          previous_value = signal_list[i-1][0]
          if value == "On" and previous_value  == "Off":
               count += 1
     return (id, count)

mapped = df.rdd.map(key_value_map)
reduced = mapped.reduceByKey(reduce)
mapped2 = reduced.map(time_order)
mapped3 = mapped2.map(metrics_map)
result = mapped3.collect()
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...