Кафка-стрим: Как переназначить поток, выбрав ключ из списка значений - PullRequest
0 голосов
/ 06 ноября 2019

У меня есть объект A:

public class A {
  String id;
  List<String> otherIds;
  SomeOtherObject object;
}

У меня есть поток kafka, который выглядит следующим образом:

KStream<Integer, A> inputStream

Мне нужно повторно набрать inputStream поток так, чтобы это было сейчас:

KStream<String, A> newStream

Где ключевой частью newStream является otherId из A.otherIds .

Для примера

Let's say, A is like : { id:1, otherIds:[ "ab","bc","ca"],OtherObject: obj1}.

And inputStream if like <1,A>,

Then the newStream should have:
<"ab",A>
<"bc", A>
<"ca",A>

Грубо говоря, чтобы понять, что я пытаюсь, это:

 KStream<String,  A> newStream =
        inputStream
                .map((key,val) ->
                                val.getOtherIds().stream().forEach(e->
                        KeyValue.pair(e,val))
                );

Есть ли способ сделать это (Rekeying, выбравключ из списка значений)?

Ответы [ 2 ]

1 голос
/ 06 ноября 2019

Поскольку вы хотите разделить одну запись на несколько, вы должны использовать flatMap() вместо map(). map() - это операция 1: 1, а flatMap() - 1: n.

. Для возврата нескольких пар KeyValue ваша flatMap() может, например, return a List<KevValue> (любая другая *). 1011 * тип тоже подойдет)

0 голосов
/ 06 ноября 2019

По предложению @Matthias я сделал это следующим образом (это сработало):

KStream<String,  A> newStream =
        inputStream
                .flatMap((key,A)->{

                    return A.getOtherIdsList().stream().map( id ->{
                        return KeyValue.pair(id, A);

                            })
                            .collect(Collectors.toSet());
                 });

Спасибо

...