Я пытаюсь реализовать мультитенантный микро сервис с использованием Spring Boot. Я уже реализовал веб-слой и постоянный слой. На веб-уровне я реализовал фильтр, который устанавливает идентификатор клиента в компоненте-прототипе (используя ThreadLocalTargetSource), на уровне персистентности я использовал конфигурацию мультитенантности Hibernate (схема на клиента), они работают нормально, данные сохраняются в соответствующая схема. В настоящее время я реализую то же поведение на уровне обмена сообщениями, используя библиотеку spring-kaka, и пока все работает так, как я ожидал, но я хотел бы знать, есть ли лучший способ сделать это.
Здесь мой код:
Это класс, который управляет KafkaMessageListenerContainer:
@Component
public class MessagingListenerContainer {
private final MessagingProperties messagingProperties;
private KafkaMessageListenerContainer<String, String> container;
@PostConstruct
public void init() {
ContainerProperties containerProps = new ContainerProperties(
messagingProperties.getConsumer().getTopicsAsList());
containerProps.setMessageListener(buildCustomMessageListener());
container = createContainer(containerProps);
container.start();
}
@Bean
public MessageListener<String, String> buildCustomMessageListener() {
return new CustomMessageListener();
}
private KafkaMessageListenerContainer<String, String> createContainer(
ContainerProperties containerProps) {
Map<String, Object> props = consumerProps();
…
return container;
}
private Map<String, Object> consumerProps() {
Map<String, Object> props = new HashMap<>();
…
return props;
}
@PreDestroy
public void finish() {
container.stop();
}
}
Это CustomMessageListener:
@Slf4j
public class CustomMessageListener implements MessageListener<String, String> {
@Autowired
private TenantStore tenantStore; // Prototype Bean
@Autowired
private List<ServiceListener> services;
@Override
public void onMessage(ConsumerRecord<String, String> record) {
log.info(“Tenant {} | Payload: {} | Record: {}", record.key(),
record.value(), record.toString());
tenantStore.setTenantId(record.key()); // Currently tenant is been setting as key
services.stream().forEach(sl -> sl.onMessage(record.value()));
}
}
Это тестовая служба, которая будет использовать данные сообщения и арендатор:
@Slf4j
@Service
public class ConsumerService implements ServiceListener {
private final MessagesRepository messages;
private final TenantStore tenantStore;
@Override
public void onMessage(String message) {
log.info("ConsumerService {}, tenant {}", message, tenantStore.getTenantId());
messages.save(new Message(message));
}
}
Спасибо за ваше время!