РЕДАКТИРОВАТЬ: обновить образцы кода для соответствия последней версии балерины (от V0.990.0 и выше).
Вы можете
- Создать новыйtopic
Если вы отправляете данные, используя Kafka producer
, они будут публиковать данные по этой конкретной теме, а если тема недоступна, она будет создавать тему и публиковать.Считайте, что вы хотите опубликовать в теме test
от производителя.Вы можете создать конечную точку производителя с именем sampleProducer
и отправить данные в определенную тему, используя функцию send()
.
kafka:SimpleProducer sampleProducer = new ({
bootstrapServers: "localhost:9090",
acks: "all",
});
string topic = "test";
string msg = "Your Message";
byte[] messageToPublish = msg.toByteArray("UTF-8");
sampleProducer->send(messageToPublish, topic);`
Теперь, если существует тема с названием test
, доступнадля брокера Kafka, размещенного на localhost:9090
, он опубликует сообщение в теме или создаст тему, если ее не существует.
Подписаться на новую тему
Вы можете использовать Kafka:SimpleConsumer.subscribe()
для подписки на тему.
listener kafka:SimpleConsumer sampleConsumer = new ({
bootstrapServers: "localhost:9090",
groupId: "test-consumers",
autoCommit: false
});
string topic = "test";
string[] topics = [topic];
sampleConsumer->subscribe(topics);
Обратите вниманиечто subscribe()
принимает string[]
в качестве входного параметра, следовательно, вы должны передать ему string[]
.
Существуют и другие функции, такие как subscribeToPattern()
, subscribeWithPartitionRebalance()
, которые также можно использоватьЧтобы подписать потребителя на тему, вы можете найти больше о них в Документация API .
Но для перечисления доступных тем вам нужно получить список тем от самого зоопарка.Но вы можете получить список тем, на которые в данный момент подписан конкретный потребитель, используя ballerina.
string[] subscribedTopics;
var result = sampleConsumer->getSubscription();
if (result is error) {
// Your logic for handling the error
} else {
subscribedTopics = result;
}
Обязательно обработайте здесь ошибку, так как getSubscription()
может вернуть либоstring[]
или error
.Охранник типа балерины может помочь вам.