Вы объявляете как переменную уровня класса с именем publisher, так и переменную внутри putMessageOnGCP
, называемую publisher. Вы можете использовать только переменную уровня класса и заполнить ее, когда она равна нулю:
@Slf4j
public class GCPMessagePublisher {
private static final String PROJECT_ID = "myprojectId";
Publisher publisher = null;
public void putMessageOnGCP(String message) throws Exception
{
log.info("The outgoing message to GCP PUBSUB is : "+message);
// topic id, eg. "my-topic"
String topicId = "topic_name";
int messageCount = 10;
ProjectTopicName topicName = ProjectTopicName.of(PROJECT_ID, topicId);
List<ApiFuture<String>> futures = new ArrayList<>();
try {
if (publisher == null) {
GoogleCredentials credentials = GoogleCredentials.fromStream(
new FileInputStream("gcp credential here ........"));
publisher = publisher.newBuilder(topicName).setCredentialsProvider(FixedCredentialsProvider.create(credentials)).build();
}
for (int i = 0; i < messageCount; i++) {
// convert message to bytes
ByteString data = ByteString.copyFromUtf8(message);
PubsubMessage pubsubMessage = PubsubMessage.newBuilder()
.setData(data)
.build();
// Schedule a message to be published. Messages are automatically batched.
ApiFuture<String> future = publisher.publish(pubsubMessage);
futures.add(future);
}
} catch (Exception e) {
System.err.println("Could not publish messages: " + e);
} finally {
// Wait on any pending requests
List<String> messageIds = ApiFutures.allAsList(futures).get();
for (String messageId : messageIds) {
System.out.println(messageId);
}
}
}
}
Обратите внимание, что не будет вызывать завершение работы в конце putMessageOnGCP
, так как это приведет к тому, что издатель не будет использоваться. Если вам нужно закрыть издателя в какой-то момент, вам потребуется другой метод, который вы бы вызвали в нужное время выключения.