Как написать десериализатор generi c с Spring Cloud Stream? (Batch-Consumer) - PullRequest
0 голосов
/ 12 января 2020

Я общался между своими облачными сервисами. Я отправлял человека из одного сервиса и получал от другого сервиса, и он работал без какого-либо сериализатора или десериализатора.

Мой код такой:

@EnableBinding({ PersonStream.class })
public class StreamConfiguration {
}

public interface PersonStream {

    String OUTPUT = "person-topic-out";
    String INPUT = "person-topic-in";

    @Input(INPUT)
    SubscribableChannel inboundPerson();

    @Output(OUTPUT)
    MessageChannel outboundPerson();
}

@Service
public class PersonProducer {

    @Autowired
    private PersonStream personStream;

    @Scheduled(fixedRate = 2000, initialDelay = 10000)
    public void publishPerson() {
        MessageChannel messageChannel = personStream.outboundPerson();
        Person person = new Person("Omer", "Celik");
        messageChannel.send(
                MessageBuilder.withPayload(person)
                        .build());
    }
}

@Service
public class PersonListener {

    @StreamListener(value = PersonStream.INPUT)
    private void personBulkReceiver(Person person) {
        System.out.println(person.getName());
    }
}

spring:
  cloud:
    stream:
      kafka:
      binders:
        defaultKafka:
          type: kafka
          environment:
            spring:
              cloud:
                stream:
                  kafka:
                    binder:
                      brokers: localhost:9092
      bindings:
        person-topic-in:
          binder: defaultKafka
          destination: person-topic
          contentType: application/person
          group : omercelik
        person-topic-out:
          binder: defaultKafka
          destination: person-topic
          contentType: application/json

После этого мне нужно было использовать данные в пакетном режиме. Но когда я использую как пакет, типы данных будут List. Я решил эту проблему, написав десериализатор и типы данных будут List. Однако таким образом я должен написать десериализатор для всех данных. Есть ли универсальный десериализатор c? Как мне написать дерисализатор generi c? Я не использую авро схему. Жду ваших предложений ...

@Service
public class PersonListener {

    @StreamListener(value = PersonStream.INPUT)
    private void personBulkReceiver(List<Person> person) {
        System.out.println(person.get(0).getName());
        System.out.println("personBulkReceiver : " + person.size());
    }
}

public class PersonDeserializer implements Deserializer<Person> {

    @Override
    public Person deserialize(String s, byte[] bytes) {
        ObjectMapper objectMapper = new ObjectMapper();
        try {
            Person p = objectMapper.readValue(bytes, Person.class);
            return p;
        } catch (IOException e) {
            e.printStackTrace();
        }
        return null;
    }
}

spring:
  cloud:
    stream:
      kafka:
      binders:
        defaultKafka:
          type: kafka
          environment:
            spring:
              cloud:
                stream:
                  kafka:
                    binder:
                      brokers: localhost:9092
        bulkKafka:
          type: kafka
          environment:
            spring:
              cloud:
                stream:
                  kafka:
                    binder:
                      brokers: localhost:9092
                      configuration:
                        max.poll.records: 1500
                        fetch.min.bytes: 1000000
                        fetch.max.wait.ms: 10000
                        value.deserializer: tr.cloud.stream.examples.PersonDeserializer
      bindings:
        person-topic-in:
          binder: bulkKafka
          destination: person-topic
          contentType: application/person
          group : omercelik
          consumer:
            batch-mode: true
        person-topic-out:
          binder: defaultKafka
          destination: person-topic
          contentType: application/json

Коды: https://github.com/omercelikceng/spring-cloud-stream-batch-consumer

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...