Как сделать некоторые команды kafka в API api балерины - PullRequest
0 голосов
/ 08 октября 2018

Можно ли добиться этого в балерине

  1. Для создания новой темы кафки в балерине
  2. Для просмотра списка доступных тем в балерине
  3. Подписаться на созданныйтема в балерине

Ответы [ 2 ]

0 голосов
/ 16 октября 2018

РЕДАКТИРОВАТЬ: обновить образцы кода для соответствия последней версии балерины (от V0.990.0 и выше).

Вы можете

  1. Создать новыйtopic

Если вы отправляете данные, используя Kafka producer, они будут публиковать данные по этой конкретной теме, а если тема недоступна, она будет создавать тему и публиковать.Считайте, что вы хотите опубликовать в теме test от производителя.Вы можете создать конечную точку производителя с именем sampleProducer и отправить данные в определенную тему, используя функцию send().

kafka:SimpleProducer sampleProducer = new ({
  bootstrapServers: "localhost:9090",
  acks: "all",
});

string topic = "test";
string msg = "Your Message";
byte[] messageToPublish = msg.toByteArray("UTF-8");
sampleProducer->send(messageToPublish, topic);`

Теперь, если существует тема с названием test, доступнадля брокера Kafka, размещенного на localhost:9090, он опубликует сообщение в теме или создаст тему, если ее не существует.

Подписаться на новую тему

Вы можете использовать Kafka:SimpleConsumer.subscribe() для подписки на тему.

listener kafka:SimpleConsumer sampleConsumer = new ({
  bootstrapServers: "localhost:9090",
  groupId: "test-consumers",
  autoCommit: false
});

string topic = "test";
string[] topics = [topic];
sampleConsumer->subscribe(topics);

Обратите вниманиечто subscribe() принимает string[] в качестве входного параметра, следовательно, вы должны передать ему string[].

Существуют и другие функции, такие как subscribeToPattern(), subscribeWithPartitionRebalance(), которые также можно использоватьЧтобы подписать потребителя на тему, вы можете найти больше о них в Документация API .

Но для перечисления доступных тем вам нужно получить список тем от самого зоопарка.Но вы можете получить список тем, на которые в данный момент подписан конкретный потребитель, используя ballerina.

string[] subscribedTopics;
var result = sampleConsumer->getSubscription();
if (result is error) {
  // Your logic for handling the error
} else {
    subscribedTopics = result;
}

Обязательно обработайте здесь ошибку, так как getSubscription() может вернуть либоstring[] или error.Охранник типа балерины может помочь вам.

0 голосов
/ 08 октября 2018

Вы можете подписаться на тему, используя следующий код:

import ballerina/log;
import wso2/kafka;
import ballerina/internal;

// Kafka consumer endpoint
endpoint kafka:SimpleConsumer consumer {
    bootstrapServers: "localhost:9092, localhost:9093",
    // Consumer group ID
    groupId: "test-group",
    // Listen from topic 'test'
    topics: ["test"],
    // Poll every 1 second
    pollingInterval:1000
};

// Kafka service that listens from the topic 'product-price'
// 'inventoryControlService' subscribed to new product price updates from
// the product admin and updates the Database.
service<kafka:Consumer> kafkaService bind consumer {
    // Triggered whenever a message added to the subscribed topic
    onMessage(kafka:ConsumerAction consumerAction, kafka:ConsumerRecord[] records) {
        // Dispatched set of Kafka records to service, We process each one by one.
        foreach entry in records {
            byte[] serializedMsg = entry.value;
            // Convert the serialized message to string message
            string msg = internal:byteArrayToString(serializedMsg, "UTF-8");
            log:printInfo("New message received from the product admin");
            // log the retrieved Kafka record
            log:printInfo("Topic: " + entry.topic + "; Received Message: " + msg);
            // Mock logic
            // Update the database with the new price for the specified product
            log:printInfo("Database updated with the new price of the product");
        }
    }
}

Это Github репо может быть весьма полезным для вас.Он содержит различные примеры для потребителей и производителей.

Что касается ваших вопросов по созданию и перечислению тем, если вам не нужно выполнять эти действия от Ballerina, вы можете сделать это из командной строки:

bin/kafka-topics.sh --zookeeper localhost:2181 --topic test --from-beginning
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor <number_of_replicas> --partitions <number_of_partitions> --topic test
...