Получение порядкового номера потокового события NATS в Siddhi - PullRequest
0 голосов
/ 26 октября 2019

У меня есть поток твитов в текстовом формате (TwitterStream) и поток настроений для каждого твита (SentimentStream). SentimentStream подписывается на TwitterStream, выполняет анализ настроений и публикует новое сообщение с результатом и порядковым номером TwitterStream.

Я пытаюсь объединить эти два потока, где SentimentStream.seq равен порядковому номеруиз твитов. У меня проблема в том, что я не могу получить «дескриптор» порядкового номера из TwitterStream.

Я пытался найти способ получить «метаданные» события, которые могли бы дать некоторое представление опозиция / порядковый номер события.

@App:name('SentimentJoin')
@App:description('Joins the RAW Tweets with the sentient for that tweet')

@sink(type = 'log', 
    @map(type = 'json'))
define stream AggregateStream (tweet string, sentiment string);

@source(type = 'nats', destination = "tweet-sentiment", bootstrap.servers = "nats://0.0.0.0:4222", cluster.id = "test-cluster", 
    @map(type = 'json'))
define stream SentimentStream (twitter_handle string, lib string, seq int, value string, confidence double);

@source(type = 'nats', destination = "iPhone", bootstrap.servers = "nats://0.0.0.0:4222", cluster.id = "test-cluster", 
    @map(type = 'text', fail.on.missing.attribute = 'true', regex.A='(.|\n)*', @attributes(tweet = 'A')))
define stream TwitterStream (tweet string);

-- https://docs.wso2.com/display/CEP400/SiddhiQL+Guide+3.0#SiddhiQLGuide3.0-Joins
@info(name = 'JoinOnSequenceNumber')
from every S=SentimentStream, T=TwitterStream(S.seq)
select T.tweet as tweet, S.value as sentiment
insert into AggregateStream;

Любая помощь будет оценена.

1 Ответ

1 голос
/ 28 октября 2019

Мы создали запрос на улучшение функции через https://github.com/siddhi-io/siddhi-io-nats/issues/25, чтобы обеспечить эту поддержку.

...