У меня есть поток твитов в текстовом формате (TwitterStream) и поток настроений для каждого твита (SentimentStream). SentimentStream подписывается на TwitterStream, выполняет анализ настроений и публикует новое сообщение с результатом и порядковым номером TwitterStream.
Я пытаюсь объединить эти два потока, где SentimentStream.seq равен порядковому номеруиз твитов. У меня проблема в том, что я не могу получить «дескриптор» порядкового номера из TwitterStream.
Я пытался найти способ получить «метаданные» события, которые могли бы дать некоторое представление опозиция / порядковый номер события.
@App:name('SentimentJoin')
@App:description('Joins the RAW Tweets with the sentient for that tweet')
@sink(type = 'log',
@map(type = 'json'))
define stream AggregateStream (tweet string, sentiment string);
@source(type = 'nats', destination = "tweet-sentiment", bootstrap.servers = "nats://0.0.0.0:4222", cluster.id = "test-cluster",
@map(type = 'json'))
define stream SentimentStream (twitter_handle string, lib string, seq int, value string, confidence double);
@source(type = 'nats', destination = "iPhone", bootstrap.servers = "nats://0.0.0.0:4222", cluster.id = "test-cluster",
@map(type = 'text', fail.on.missing.attribute = 'true', regex.A='(.|\n)*', @attributes(tweet = 'A')))
define stream TwitterStream (tweet string);
-- https://docs.wso2.com/display/CEP400/SiddhiQL+Guide+3.0#SiddhiQLGuide3.0-Joins
@info(name = 'JoinOnSequenceNumber')
from every S=SentimentStream, T=TwitterStream(S.seq)
select T.tweet as tweet, S.value as sentiment
insert into AggregateStream;
Любая помощь будет оценена.