Я хочу прочитать потоковые данные из скользящего окна в искре, Я хочу применить другую внешнюю функцию к потоковым данным, Когда я применяю разностную функцию, но она не применяется, Я хочу применить вечную функцию к потоковым данным
Пример У меня есть внешняя функция
scaler1 = MinMaxScaler(feature_range=(-1, 1))
def difference(dataset, interval=1):
diff = list()
for i in range(interval, len(dataset)):
value = dataset[i] - dataset[i - interval]
diff.append(value)
return Series(diff)
, когда я применяю эту функцию к потоковым данным, есть какой-либо результат, отображается
c=list()
ssc = StreamingContext(scc, 1)
activeUsers = [[120.92187299645627],
[121.84247351449525],
[122.87717906432528],
[ 123.07419758947418],
[ 124.83203764216505],
[123.278584495919],
[123.04382133819664],
[120.92187299645627],
[121.84247351449525],
[122.87717906432528],
[ 123.07419758947418],
[ 124.83203764216505],
[123.278584495919],
[123.04382133819664],
[120.92187299645627],
[121.84247351449525],
[122.87717906432528],
[ 123.07419758947418],
[ 124.83203764216505],
[123.278584495919],
[123.04382133819664]]
rddQueue = []
for datum in activeUsers:
rddQueue += [ssc.sparkContext.parallelize(datum)]
inputStream = ssc.queueStream(rddQueue)
k=inputStream.window(5, 1).map(lambda x: [x])
s=k.map(difference)
s.pprint()
ssc.start()
time.sleep(1)