Как обновить поток ответом из другого потока, где тип приемника "http-response" - PullRequest
2 голосов
/ 01 октября 2019

Я пытаюсь обогатить мой входной поток дополнительным атрибутом, который заполняется через приемник ответов "http-response".

Я пытался использовать "join" с атрибутом окна и с ключевым словом "each", чтобыобъедините два потока и вставьте полученный объединенный поток в другой поток, чтобы обогатить его.

Атрибуты окна (window.time (1 сек) или window.length (1)) и ключевое слово «каждый» хорошо работают, когдавходящие события происходят с регулярным интервалом в 1 секунду или более.

Когда (например, 10 или 100) события отправляются одновременно (в течение секунды). Тогда результат слияния не соответствует ожидаемым.

Тот, у кого атрибут "window" (объединение)

**

from EventInputStreamOne#window.time(1 sec) as i
        join EventInputStreamTwo as s
        on i.variable2 == s.variable2
select i.variable1 as variable1, i.variable2 as variable2, s.variable2 as variable2
insert into EventOutputStream;

**

Тот, что с ключевым словом «каждый»

**

from every e1=EventInputStream,e2=EventResponseStream
select e1.variable1 as variable1, e1.variable2 as variable2, e2.variable3 as variable3
insert into EventOutputStream;

**

Есть ли лучший способ объединить два потока для обновлениятретий поток?

Ответы [ 2 ]

1 голос
/ 02 октября 2019

Чтобы получить исходные атрибуты запроса, вы можете использовать пользовательское сопоставление следующим образом:

@source(type='http-call-response', sink.id='source-1'
       @map(type='json',@attributes(name='name', id='id', volume='trp:volume', price='trp:price')))
define stream responseStream(name String, id int, headers String, volume long, price float);

Здесь к атрибутам запроса можно получить доступ с помощью trp:attributeName, в этом примере только имя взято из ответа,цена и объем от запроса.

0 голосов
/ 11 октября 2019

Синтаксис в вашем подходе к каждому ключевому слову не совсем правильный. Вы пробовали что-то вроде этого:

from every (e1 = event1) -> e2=event2[e1.variable == e2.variable]
select e1.variable1, e2.variable1, e2.variable2
insert into outputEvent;

Этот документ может помочь.

...