Как можно применить внешнюю функцию для искры потоковой передачи данных - PullRequest
0 голосов
/ 28 апреля 2020

Я хочу прочитать потоковые данные из скользящего окна в искре, Я хочу применить другую внешнюю функцию к потоковым данным, Когда я применяю разностную функцию, но она не применяется, Я хочу применить вечную функцию к потоковым данным

Пример У меня есть внешняя функция

scaler1 = MinMaxScaler(feature_range=(-1, 1))
def difference(dataset, interval=1):
    diff = list()
    for i in range(interval, len(dataset)):
            value = dataset[i] - dataset[i - interval]
            diff.append(value)
    return Series(diff)

, когда я применяю эту функцию к потоковым данным, есть какой-либо результат, отображается

  c=list()

ssc = StreamingContext(scc, 1)

activeUsers = [[120.92187299645627],
               [121.84247351449525],
               [122.87717906432528],
               [ 123.07419758947418],
               [ 124.83203764216505],
               [123.278584495919],
               [123.04382133819664],
               [120.92187299645627],
               [121.84247351449525],
               [122.87717906432528],
               [ 123.07419758947418],
               [ 124.83203764216505],
               [123.278584495919],
               [123.04382133819664],
               [120.92187299645627],
               [121.84247351449525],
               [122.87717906432528],
               [ 123.07419758947418],
               [ 124.83203764216505],
               [123.278584495919],
               [123.04382133819664]]

rddQueue = []
for datum in activeUsers:
    rddQueue += [ssc.sparkContext.parallelize(datum)]    
inputStream = ssc.queueStream(rddQueue)
k=inputStream.window(5, 1).map(lambda x: [x])
s=k.map(difference)
s.pprint()


ssc.start()
time.sleep(1) 
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...