To do this, first disable the GCP autoconfiguration for Pub/Sub:

    @SpringBootApplication(exclude = {
        GcpPubSubAutoConfiguration.class,
        GcpPubSubReactiveAutoConfiguration.class
    })
    public class PubsubApplication {

        public static void main(String[] args) {
            SpringApplication.run(PubsubApplication.class, args);
        }
    }

Then create a configuration for the first project:

    @Configuration
    public class Project1Config {

        private static final Logger LOGGER = LoggerFactory.getLogger(Project1Config.class);

        // Project ID used by every Pub/Sub bean of project 1.
        @Bean(name = "project1_IdProvider")
        public GcpProjectIdProvider project1_IdProvider() {
            return new DefaultGcpProjectIdProvider() {
                @Override
                public String getProjectId() {
                    return "YOURPROJECTID";
                }
            };
        }

        // Service account credentials for project 1, read from the classpath.
        @Bean(name = "project1_credentialsProvider")
        public CredentialsProvider project1_credentialsProvider() throws IOException {
            return new CredentialsProvider() {
                @Override
                public Credentials getCredentials() throws IOException {
                    return ServiceAccountCredentials.fromStream(
                        new ClassPathResource("YOURCREDENTIALS").getInputStream());
                }
            };
        }

        @Bean("project1_pubSubSubscriberTemplate")
        public PubSubSubscriberTemplate pubSubSubscriberTemplate(
                @Qualifier("project1_subscriberFactory") SubscriberFactory subscriberFactory) {
            return new PubSubSubscriberTemplate(subscriberFactory);
        }

        // Publisher factory bound to project 1's ID and credentials.
        @Bean("project1_publisherFactory")
        public DefaultPublisherFactory publisherFactory(
                @Qualifier("project1_IdProvider") GcpProjectIdProvider projectIdProvider,
                @Qualifier("project1_credentialsProvider") CredentialsProvider credentialsProvider) {
            final DefaultPublisherFactory defaultPublisherFactory = new DefaultPublisherFactory(projectIdProvider);
            defaultPublisherFactory.setCredentialsProvider(credentialsProvider);
            return defaultPublisherFactory;
        }

        // Subscriber factory bound to project 1's ID and credentials.
        @Bean("project1_subscriberFactory")
        public DefaultSubscriberFactory subscriberFactory(
                @Qualifier("project1_IdProvider") GcpProjectIdProvider projectIdProvider,
                @Qualifier("project1_credentialsProvider") CredentialsProvider credentialsProvider) {
            final DefaultSubscriberFactory defaultSubscriberFactory = new DefaultSubscriberFactory(projectIdProvider);
            defaultSubscriberFactory.setCredentialsProvider(credentialsProvider);
            return defaultSubscriberFactory;
        }

        // Channel that receives messages pulled from the project 1 subscription.
        @Bean(name = "project1_pubsubInputChannel")
        public MessageChannel pubsubInputChannel() {
            return new DirectChannel();
        }

        @Bean(name = "project1_pubSubTemplate")
        public PubSubTemplate project1_PubSubTemplate(
                @Qualifier("project1_publisherFactory") PublisherFactory publisherFactory,
                @Qualifier("project1_subscriberFactory") SubscriberFactory subscriberFactory,
                @Qualifier("project1_credentialsProvider") CredentialsProvider credentialsProvider) {
            if (publisherFactory instanceof DefaultPublisherFactory) {
                ((DefaultPublisherFactory) publisherFactory).setCredentialsProvider(credentialsProvider);
            }
            return new PubSubTemplate(publisherFactory, subscriberFactory);
        }

        // Inbound adapter: forwards subscription messages to the input channel; acks are manual.
        @Bean(name = "project1_messageChannelAdapter")
        public PubSubInboundChannelAdapter messageChannelAdapter(
                @Qualifier("project1_pubsubInputChannel") MessageChannel inputChannel,
                @Qualifier("project1_pubSubTemplate") PubSubTemplate pubSubTemplate) {
            PubSubInboundChannelAdapter adapter =
                new PubSubInboundChannelAdapter(pubSubTemplate, "YOURSUBSCRIPTIONNAME");
            adapter.setOutputChannel(inputChannel);
            adapter.setAckMode(AckMode.MANUAL);
            return adapter;
        }
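
        // Logs each incoming message and acknowledges it manually (the adapter uses AckMode.MANUAL).
        @Bean("project1_messageReceiver")
        @ServiceActivator(inputChannel = "project1_pubsubInputChannel")
        public MessageHandler messageReceiver() {
            return message -> {
                LOGGER.info("Message arrived! Payload: {}",
                    new String((byte[]) message.getPayload(), java.nio.charset.StandardCharsets.UTF_8));
                LOGGER.info("Message headers {}", message.getHeaders());
                BasicAcknowledgeablePubsubMessage originalMessage =
                    message
                        .getHeaders()
                        .get(GcpPubSubHeaders.ORIGINAL_MESSAGE, BasicAcknowledgeablePubsubMessage.class);
                // The header lookup returns null if the message did not come from the Pub/Sub adapter.
                if (originalMessage != null) {
                    originalMessage.ack();
                }
            };
        }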

        // Publishes every message sent to project1_pubsubOutputChannel to the project 1 topic.
        @Bean("project1_messageSender")
        @ServiceActivator(inputChannel = "project1_pubsubOutputChannel")
        public MessageHandler messageSender(
                @Qualifier("project1_pubSubTemplate") PubSubTemplate pubsubTemplate) {
            return new PubSubMessageHandler(pubsubTemplate, "YOURTOPICNAME");
        }
    }

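The message receiver casts the payload to byte[], which fails with a ClassCastException if the payload ever arrives as another type (for instance, if the adapter's payload type is changed). A minimal sketch of a more tolerant decoder follows; PayloadText is a hypothetical helper name, not part of Spring Cloud GCP, and it assumes the payload text is UTF-8.

```java
import java.nio.charset.StandardCharsets;

/** Hypothetical helper (not a Spring Cloud GCP class): renders a message payload as text. */
final class PayloadText {

    private PayloadText() {
    }

    /** Decodes byte[] payloads as UTF-8; any other type falls back to String.valueOf. */
    static String of(Object payload) {
        if (payload instanceof byte[]) {
            return new String((byte[]) payload, StandardCharsets.UTF_8);
        }
        return String.valueOf(payload);
    }
}
```

Inside the handler this could be used as LOGGER.info("Payload: {}", PayloadText.of(message.getPayload())).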
Next, create a configuration for the second project:

    @Configuration
    public class Project2Config {

        private static final Logger LOGGER = LoggerFactory.getLogger(Project2Config.class);

        // Project ID used by every Pub/Sub bean of project 2.
        @Bean(name = "project2_IdProvider")
        public DefaultGcpProjectIdProvider project2_IdProvider() {
            return new DefaultGcpProjectIdProvider() {
                @Override
                public String getProjectId() {
                    return "project-id-lksjfkalsdjfkl";
                }
            };
        }

        // Service account credentials for project 2, read from the classpath.
        @Bean(name = "project2_credentialsProvider")
        public CredentialsProvider project2_credentialsProvider() throws IOException {
            return new CredentialsProvider() {
                @Override
                public Credentials getCredentials() throws IOException {
                    return ServiceAccountCredentials.fromStream(
                        new ClassPathResource("project2.json").getInputStream());
                }
            };
        }

        @Bean("project2_pubSubSubscriberTemplate")
        public PubSubSubscriberTemplate pubSubSubscriberTemplate(
                @Qualifier("project2_subscriberFactory") SubscriberFactory subscriberFactory) {
            return new PubSubSubscriberTemplate(subscriberFactory);
        }

        // Publisher factory bound to project 2's ID and credentials.
        @Bean("project2_publisherFactory")
        public DefaultPublisherFactory publisherFactory(
                @Qualifier("project2_IdProvider") GcpProjectIdProvider projectIdProvider,
                @Qualifier("project2_credentialsProvider") CredentialsProvider credentialsProvider) {
            final DefaultPublisherFactory defaultPublisherFactory = new DefaultPublisherFactory(projectIdProvider);
            defaultPublisherFactory.setCredentialsProvider(credentialsProvider);
            return defaultPublisherFactory;
        }

        // Subscriber factory bound to project 2's ID and credentials.
        @Bean("project2_subscriberFactory")
        public DefaultSubscriberFactory subscriberFactory(
                @Qualifier("project2_IdProvider") GcpProjectIdProvider projectIdProvider,
                @Qualifier("project2_credentialsProvider") CredentialsProvider credentialsProvider) {
            final DefaultSubscriberFactory defaultSubscriberFactory = new DefaultSubscriberFactory(projectIdProvider);
            defaultSubscriberFactory.setCredentialsProvider(credentialsProvider);
            return defaultSubscriberFactory;
        }

        // Channel that receives messages pulled from the project 2 subscription.
        @Bean(name = "project2_pubsubInputChannel")
        public MessageChannel pubsubInputChannel() {
            return new DirectChannel();
        }

        @Bean(name = "project2_pubSubTemplate")
        public PubSubTemplate project2_PubSubTemplate(
                @Qualifier("project2_publisherFactory") PublisherFactory publisherFactory,
                @Qualifier("project2_subscriberFactory") SubscriberFactory subscriberFactory,
                @Qualifier("project2_credentialsProvider") CredentialsProvider credentialsProvider) {
            if (publisherFactory instanceof DefaultPublisherFactory) {
                ((DefaultPublisherFactory) publisherFactory).setCredentialsProvider(credentialsProvider);
            }
            return new PubSubTemplate(publisherFactory, subscriberFactory);
        }

        // Inbound adapter: forwards subscription messages to the input channel; acks are manual.
        @Bean(name = "project2_messageChannelAdapter")
        public PubSubInboundChannelAdapter messageChannelAdapter(
                @Qualifier("project2_pubsubInputChannel") MessageChannel inputChannel,
                @Qualifier("project2_pubSubTemplate") PubSubTemplate pubSubTemplate) {
            PubSubInboundChannelAdapter adapter =
                new PubSubInboundChannelAdapter(pubSubTemplate, "project2-testSubscription");
            adapter.setOutputChannel(inputChannel);
            adapter.setAckMode(AckMode.MANUAL);
            return adapter;
        }
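
        // Logs each incoming message and acknowledges it manually (the adapter uses AckMode.MANUAL).
        @Bean("project2_messageReceiver")
        @ServiceActivator(inputChannel = "project2_pubsubInputChannel")
        public MessageHandler messageReceiver() {
            return message -> {
                LOGGER.info("Message Payload: {}",
                    new String((byte[]) message.getPayload(), java.nio.charset.StandardCharsets.UTF_8));
                LOGGER.info("Message headers {}", message.getHeaders());
                BasicAcknowledgeablePubsubMessage originalMessage =
                    message
                        .getHeaders()
                        .get(GcpPubSubHeaders.ORIGINAL_MESSAGE, BasicAcknowledgeablePubsubMessage.class);
                // The header lookup returns null if the message did not come from the Pub/Sub adapter.
                if (originalMessage != null) {
                    originalMessage.ack();
                }
            };
        }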

        // Publishes every message sent to project2_pubsubOutputChannel to the project 2 topic.
        @Bean(name = "project2_messageSender")
        @ServiceActivator(inputChannel = "project2_pubsubOutputChannel")
        public MessageHandler messageSender(
                @Qualifier("project2_pubSubTemplate") PubSubTemplate pubsubTemplate) {
            return new PubSubMessageHandler(pubsubTemplate, "project2-testTopic");
        }
    }

Create an outbound gateway for project 1. The project1_pubsubOutputChannel channel is the one the messageSender service activator in Project1Config listens on:

    @Service
    @MessagingGateway(defaultRequestChannel = "project1_pubsubOutputChannel")
    public interface Project1PubsubOutboundGateway {
        void sendToPubsub(String text);
    }

Create an outbound gateway for project 2. The project2_pubsubOutputChannel channel is the one the messageSender service activator in Project2Config listens on:

    @Service
    @MessagingGateway(defaultRequestChannel = "project2_pubsubOutputChannel")
    public interface Project2PubsubOutboundGateway {
        void sendToPubsub(String text);
    }

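Each gateway names its channel with a string literal that has to match the inputChannel of the corresponding service activator exactly, so a typo only shows up at runtime. One possible way to reduce that risk, sketched here with a hypothetical PubSubChannelNames class that is not in the original project, is to keep the names in compile-time constants, which Java allows as annotation values.

```java
/** Hypothetical constants holder; the values mirror the literals used in the configuration classes. */
final class PubSubChannelNames {

    static final String PROJECT1_INPUT = "project1_pubsubInputChannel";
    static final String PROJECT1_OUTPUT = "project1_pubsubOutputChannel";
    static final String PROJECT2_INPUT = "project2_pubsubInputChannel";
    static final String PROJECT2_OUTPUT = "project2_pubsubOutputChannel";

    private PubSubChannelNames() {
    }
}
```

With this in place a gateway could declare @MessagingGateway(defaultRequestChannel = PubSubChannelNames.PROJECT1_OUTPUT), and the matching @ServiceActivator could reuse the same constant.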
Now both gateways can be used, for example from a REST controller:

    @RestController
    public class WebAppController {

        @Autowired private Project1PubsubOutboundGateway project1PubsubOutboundGateway;
        @Autowired private Project2PubsubOutboundGateway project2PubsubOutboundGateway;

        // Publishes the same message to the topics of both projects.
        @PostMapping("/publishMessage")
        public ResponseEntity<String> publishMessage(@RequestParam("message") String message) {
            project1PubsubOutboundGateway.sendToPubsub(message);
            project2PubsubOutboundGateway.sendToPubsub(message);
            return ResponseEntity.ok("OK");
        }
    }

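The controller calls the two gateways one after the other, so if the first call throws, the second project never receives the message. A minimal sketch of sending to each target independently is shown here using only the JDK; FanOutPublisher and its methods are hypothetical names introduced for illustration, not part of Spring or of the original project.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

/** Hypothetical helper: sends one message to several named senders and isolates failures. */
final class FanOutPublisher {

    // Insertion order is kept so senders are tried in the order they were registered.
    private final Map<String, Consumer<String>> senders = new LinkedHashMap<>();

    /** Registers a sender under a label, for example a gateway method reference. */
    FanOutPublisher register(String name, Consumer<String> sender) {
        senders.put(name, sender);
        return this;
    }

    /** Tries every sender and returns the labels of those that threw a RuntimeException. */
    List<String> publish(String message) {
        List<String> failed = new ArrayList<>();
        for (Map.Entry<String, Consumer<String>> entry : senders.entrySet()) {
            try {
                entry.getValue().accept(message);
            } catch (RuntimeException e) {
                failed.add(entry.getKey());
            }
        }
        return failed;
    }
}
```

In the controller the senders could be registered as register("project1", project1PubsubOutboundGateway::sendToPubsub) and likewise for project 2; the returned list then tells the caller which project, if any, should be reported or retried.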
Check the logs to confirm that messages are being sent and received.
For more details, check out the Git project: https://github.com/olgmaks/spring-gcppubsub-multiproject