GCP PubSub: создание класса конфигурации для публикации сообщений sh - PullRequest
1 голос
/ 27 февраля 2020

Я пытаюсь создать отдельный класс только для подключения к GCPPUB. У меня есть следующий класс, который успешно подключается и сообщения pu sh для gcp. Но проблема в том, что каждый раз, когда он проверяет соединение из моего приложения, используя учетные данные GCP. Я хочу получить соединение один раз после того, как эти сообщения публикуются sh непрерывно? Есть ли способ, я могу отделить свой код так, чтобы получать соединение, только если существующее соединение является нулевым, и после него публикуются sh сообщения? И это правильно получить только одно соединение?

@Slf4j
public class GCPMessagePublisher {

    private static final String PROJECT_ID = "myprojectId";
    Publisher publisher = null;

    public void putMessageOnGCP(String message) throws Exception
    {
        log.info("The outgoing message to GCP PUBSUB is : "+message);
        // topic id, eg. "my-topic"
        String topicId = "topic_name";
        int messageCount = 10;
        ProjectTopicName topicName = ProjectTopicName.of(PROJECT_ID, topicId);

        List<ApiFuture<String>> futures = new ArrayList<>();

        try {
            GoogleCredentials credentials = GoogleCredentials.fromStream(
                    new FileInputStream("gcp credential here ........"));

            publisher = publisher.newBuilder(topicName).setCredentialsProvider(FixedCredentialsProvider.create(credentials)).build();

            for (int i = 0; i < messageCount; i++) {

                // convert message to bytes
                ByteString data = ByteString.copyFromUtf8(message);
                PubsubMessage pubsubMessage = PubsubMessage.newBuilder()
                        .setData(data)
                        .build();

                // Schedule a message to be published. Messages are automatically batched.
                ApiFuture<String> future = publisher.publish(pubsubMessage);
                futures.add(future);
            }
        } finally {
            // Wait on any pending requests
            List<String> messageIds = ApiFutures.allAsList(futures).get();

            for (String messageId : messageIds) {
                System.out.println(messageId);
            }

            if (publisher != null) {
                // When finished with the partypublisher, shutdown to free up resources.
                publisher.shutdown();
            }
        }
    }
}

Ответы [ 2 ]

0 голосов
/ 29 февраля 2020

Наконец, я публикую решение, которое я использовал для разделения указанного выше класса на два отдельных метода: один для чистого GCP Config, а другой просто для публикации sh сообщений в GCPPUB.

** Примечание: ** I использовал SpringBoot для разработки моего приложения, поэтому мы можем увидеть некоторые аннотации Spring

@Service
public class GCPMessagePublisher
{

   @Value("${gcp.proj.id}")
    private static String projectId;

    @Value(("${gcp.pubsub.topic.name}"))
    private static  String topicId;


    private static  Publisher publisher = null;


    public static void buildGCPConfiguration() throws Exception
    {

        ProjectTopicName topicName = ProjectTopicName.of(projectId, topicId);
        try
        {
            GoogleCredentials credentials = GoogleCredentials.fromStream(
                    new FileInputStream("gcp credential here use the file path to key.json which you downloaded from google console........"));

            publisher = publisher.newBuilder(topicName).setCredentialsProvider(FixedCredentialsProvider.create(credentials)).build();

        }catch (Exception e)
        {
            log.error("Exception Thrown while connecting gcp"+e);
        }
    }

    public void putMessageOnGCP (String message) throws Exception
    {
        log.info("Sending Message to GCP PUBSUB......"+message)
        List<ApiFuture<String>> futures = new ArrayList<>();
        if (publisher == null) {
            buildGCPConfiguration();
        }

        ByteString data = ByteString.copyFromUtf8(message);
        PubsubMessage pubsubMessage = PubsubMessage.newBuilder().setData(data).build();
        // Schedule a message to be published. Messages are automatically batched.
        ApiFuture<String> future = publisher.publish(pubsubMessage);
        futures.add(future);

    }


}
0 голосов
/ 27 февраля 2020

Вы объявляете как переменную уровня класса с именем publisher, так и переменную внутри putMessageOnGCP, называемую publisher. Вы можете использовать только переменную уровня класса и заполнить ее, когда она равна нулю:

@Slf4j
public class GCPMessagePublisher {

    private static final String PROJECT_ID = "myprojectId";
    Publisher publisher = null;

    public void putMessageOnGCP(String message) throws Exception
    {
        log.info("The outgoing message to GCP PUBSUB is : "+message);
        // topic id, eg. "my-topic"
        String topicId = "topic_name";
        int messageCount = 10;
        ProjectTopicName topicName = ProjectTopicName.of(PROJECT_ID, topicId);
        List<ApiFuture<String>> futures = new ArrayList<>();

        try {
            if (publisher == null) {
                GoogleCredentials credentials = GoogleCredentials.fromStream(
                        new FileInputStream("gcp credential here ........"));

                publisher = publisher.newBuilder(topicName).setCredentialsProvider(FixedCredentialsProvider.create(credentials)).build();
            }

            for (int i = 0; i < messageCount; i++) {

                // convert message to bytes
                ByteString data = ByteString.copyFromUtf8(message);
                PubsubMessage pubsubMessage = PubsubMessage.newBuilder()
                        .setData(data)
                        .build();

                // Schedule a message to be published. Messages are automatically batched.
                ApiFuture<String> future = publisher.publish(pubsubMessage);
                futures.add(future);
            }
        } catch (Exception e) {
            System.err.println("Could not publish messages: " + e);
        } finally {
            // Wait on any pending requests
            List<String> messageIds = ApiFutures.allAsList(futures).get();

            for (String messageId : messageIds) {
                System.out.println(messageId);
            }
        }
    }
}

Обратите внимание, что не будет вызывать завершение работы в конце putMessageOnGCP, так как это приведет к тому, что издатель не будет использоваться. Если вам нужно закрыть издателя в какой-то момент, вам потребуется другой метод, который вы бы вызвали в нужное время выключения.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...