Как объединить темы с потоком Кафка - PullRequest
3 голосов
/ 15 января 2020

Поток Кафки Я пытаюсь транслировать, но у меня есть некоторые проблемы, это не работает. Для начала у меня есть три разъема, но я не могу использовать свои собственные ключи. Мне нужны ключи, чтобы присоединиться к ним, верно? Как я могу присоединиться с 2 или более ключами? Я пытаюсь повторить что-то вроде этого: выберите * из (выберите. * Из пользователей внутреннее соединение b на a.dep = b.dep и a.group = b.group) внутреннее объединение user_afy на a.id = b .id

Я хочу сохранить данные внутреннего объединения в топи c и использовать их для внешнего объединения. Это пример, который у меня есть.

Свойства соединителя:

....
connector.class=io.confluent.connect.jdbc.JdbcSourceConnector
mode=timestamp
query=select id, user, dep,tal, group,time from users
numeric.mapping=best_fit
table.types=TABLE
topic=users  
// I try use this with 1 or more fields but not worked  
transforms=createKey, extractInt  
transforms.createKey.type=org.apache.kafka.connect.transforms.ValueToKey  
transforms.createKey.fields=dep, group  
transforms.extractInt.type=org.apache.kafka.connect.transforms.ExtractField$Key  
transforms.extractInt.field=dep, group  

standalone.properties

bootstrap.servers=localhost:9092  
key.converter=org.apache.kafka.connect.json.JsonConverter  
value.converter=org.apache.kafka.connect.json.JsonConverter  
key.converter.schemas.enable=false  
value.converter.schemas.enable=false  
offset.storage.file.filename=D:/tmp/connect.offsets  
plugin.path=D:/connector/lib  

Темы:

Topic users    
{"id":"0001", "user":"Alex", "dep":"ofi", "postal":170, group="ingen",time:"xxx"}    
{"id":"0002", "user":"Emy", "dep":"lab", "postal":170, group="itn",time:"xxx"}    
{"id":"0003", "user":"Lea", "dep":"lab", "postal":170, group="itn",time:"xxx"}    
{"id":"0004", "user":"Silva", "dep":"cent", "postal":170, group="ingen",time:"xxx"}    
{"id":"0005", "user":"Foxy", "dep":"cent", "postal":170, group="ete",time:"xxx"}    

topic user_afy
{"id":"0001", name="bask"}
{"id":"0001", name="Silf"}
{"id":"0002", name="BTT"}
{"id":"0005", name="butf"}


Topic deps  
{"id_dep":"1", "dep":"ofi", "sind"="worker", "group"="ingen."}  
{"id_dep":"2", "dep":"lab", "sind"="worker", "group"="iti."}  
{"id_dep":"3", "dep":"cent", "sind"="worker", "group"="etc."} 

Мой код это пример официального сайта, но я не могу его проверить

public static void main(String[] Args) {
        Properties props = new Properties();
        props.put(......);

    final Serializer<JsonNode> jsonSerializer = new JsonSerializer();
        final Deserializer<JsonNode> jsonDeserializer = new JsonDeserializer();
        final Serde<JsonNode> jsonSerde = Serdes.serdeFrom(jsonSerializer, jsonDeserializer);
        final Consumed<String, JsonNode> consumed = Consumed.with(Serdes.String(), jsonSerde);
        StreamsBuilder builder = new StreamsBuilder();
        final KStream<String, JsonNode> left = builder.stream("user", consumed);
        KTable<String, JsonNode> right = builder.table("deps", consumed);
        KStream<String, String> joined = left.join(right,
            (leftValue, rightValue) -> "left=" + leftValue + ", right=" + rightValue,
            Joined.with(Serdes.String(), jsonSerde, jsonSerde)
        );
//Edit
       joined.foreach((k, v) -> {
          System.out.println("key="+k+ ", val=" + v);
       });

}

Вывод, как бы он показывал? Для создания новой топи c предпочтительнее ли использовать хеш-карту со значениями, которые вы хотите сохранить в формате json? Позже я буду создавать пользовательские Serdes

1 Ответ

0 голосов
/ 24 января 2020

Что вы подразумеваете под «Я не могу использовать свои собственные ключи?». В Kafka Streams вы всегда можете установить новый ключ, необходимый для вашей обработки.

Если вы хотите прочитать данные в таблицу KTable, вы не можете изменить ключ простым способом. Вам нужно прочитать topi c как KStream, установить новый ключ и преобразовать KStream в KTable (cf Kafka Streams API: KStream в KTable ).

Для нескольких последовательные объединения, вы можете просто «связать» соответствующие операции вместе.

builder.stream("topic-1").selectKey(...).to("table-topic-1");
KTable t1 = builder.table("table-topic-1");

KStream firstJoinResult = builder.stream(...).selectKey(...).join(t1, ...).

builder.stream("topic-2").selectKey(...).to("table-topic-2");
KTable t2 = builder.table("table-topic-2");

firstJoinResult.selectKey(...).join(t2, ...).to("result-topic");
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...