Я общался между своими облачными сервисами. Я отправлял человека из одного сервиса и получал от другого сервиса, и он работал без какого-либо сериализатора или десериализатора.
Мой код такой:
@EnableBinding({ PersonStream.class })
public class StreamConfiguration {
}
public interface PersonStream {
String OUTPUT = "person-topic-out";
String INPUT = "person-topic-in";
@Input(INPUT)
SubscribableChannel inboundPerson();
@Output(OUTPUT)
MessageChannel outboundPerson();
}
@Service
public class PersonProducer {
@Autowired
private PersonStream personStream;
@Scheduled(fixedRate = 2000, initialDelay = 10000)
public void publishPerson() {
MessageChannel messageChannel = personStream.outboundPerson();
Person person = new Person("Omer", "Celik");
messageChannel.send(
MessageBuilder.withPayload(person)
.build());
}
}
@Service
public class PersonListener {
@StreamListener(value = PersonStream.INPUT)
private void personBulkReceiver(Person person) {
System.out.println(person.getName());
}
}
spring:
cloud:
stream:
kafka:
binders:
defaultKafka:
type: kafka
environment:
spring:
cloud:
stream:
kafka:
binder:
brokers: localhost:9092
bindings:
person-topic-in:
binder: defaultKafka
destination: person-topic
contentType: application/person
group : omercelik
person-topic-out:
binder: defaultKafka
destination: person-topic
contentType: application/json
После этого мне нужно было использовать данные в пакетном режиме. Но когда я использую как пакет, типы данных будут List. Я решил эту проблему, написав десериализатор и типы данных будут List. Однако таким образом я должен написать десериализатор для всех данных. Есть ли универсальный десериализатор c? Как мне написать дерисализатор generi c? Я не использую авро схему. Жду ваших предложений ...
@Service
public class PersonListener {
@StreamListener(value = PersonStream.INPUT)
private void personBulkReceiver(List<Person> person) {
System.out.println(person.get(0).getName());
System.out.println("personBulkReceiver : " + person.size());
}
}
public class PersonDeserializer implements Deserializer<Person> {
@Override
public Person deserialize(String s, byte[] bytes) {
ObjectMapper objectMapper = new ObjectMapper();
try {
Person p = objectMapper.readValue(bytes, Person.class);
return p;
} catch (IOException e) {
e.printStackTrace();
}
return null;
}
}
spring:
cloud:
stream:
kafka:
binders:
defaultKafka:
type: kafka
environment:
spring:
cloud:
stream:
kafka:
binder:
brokers: localhost:9092
bulkKafka:
type: kafka
environment:
spring:
cloud:
stream:
kafka:
binder:
brokers: localhost:9092
configuration:
max.poll.records: 1500
fetch.min.bytes: 1000000
fetch.max.wait.ms: 10000
value.deserializer: tr.cloud.stream.examples.PersonDeserializer
bindings:
person-topic-in:
binder: bulkKafka
destination: person-topic
contentType: application/person
group : omercelik
consumer:
batch-mode: true
person-topic-out:
binder: defaultKafka
destination: person-topic
contentType: application/json
Коды: https://github.com/omercelikceng/spring-cloud-stream-batch-consumer