Тестирование функций Apache Pulsar во встроенной автономной среде. - PullRequest
0 голосов
/ 09 июня 2019

Для тестирования мне удалось запустить встроенный автономный сервер и клиент.Я также могу отправлять и получать сообщения.Однако я действительно хочу (интеграция) тестировать функции (реализуя org.apache.pulsar.functions.api.Function).Как я могу зарегистрировать функции во встроенной настройке?

package kic.data.stream.pulsar

import groovy.util.logging.Log
import org.apache.pulsar.PulsarStandalone
import org.apache.pulsar.PulsarStandaloneBuilder
import org.apache.pulsar.broker.PulsarService
import org.apache.pulsar.broker.ServiceConfiguration
import org.apache.pulsar.client.api.Consumer
import org.apache.pulsar.client.api.ConsumerEventListener
import org.apache.pulsar.client.api.Message
import org.apache.pulsar.client.api.Producer
import org.apache.pulsar.client.api.PulsarClient
import org.apache.pulsar.client.api.Schema
import org.apache.pulsar.client.api.SubscriptionType
import spock.lang.Specification

import java.util.concurrent.TimeUnit

@Log
class PulsarEmbeddedTest extends Specification {

    static final String TOPIC = "hello";
    static final int NUM_OF_MESSAGES = 100;
    static PulsarStandalone standalone
    static PulsarService pulsarService

    def setupSpec() {
        def configFile = new File(ClassLoader.getSystemResource("broker.conf").toURI()).getAbsolutePath()
        def conf = new ServiceConfiguration(clusterName: "test-cluster", zookeeperServers: "localhost:2184")
        log.info("${PulsarStandalone.properties}")
        standalone = PulsarStandaloneBuilder.instance()
                                            .withConfig(conf)
                                            .withNoStreamStorage(true)
                                            .build()
        standalone.configFile = configFile
        standalone.start()
        pulsarService = new PulsarService(conf)
    }

    def test() {
        PulsarClient client = PulsarClient.builder()
                .serviceUrl(pulsarService.brokerServiceUrl)
                .build()

        Producer<String> producer = client.newProducer(Schema.STRING)
                .topic(TOPIC)
                .enableBatching(false)
                .create()

        Consumer<String> consumer = client.newConsumer(Schema.STRING)
                .topic(TOPIC)
                //.subscriptionInitialPosition()
                .subscriptionName("test-subs-1")
                .ackTimeout(10, TimeUnit.SECONDS)
                .subscriptionType(SubscriptionType.Exclusive)
                .messageListener(Mesa)
                .subscribe()



        for (int i = 1; i <= NUM_OF_MESSAGES; ++i) {
            producer.send("Hello_" + i)
        }


        Message<String> message
        for (int i = 1; i <= NUM_OF_MESSAGES; ++i) {
            // This calls blocks until a message is available.
            message = consumer.receive(1, TimeUnit.SECONDS)
            //log.info("Message received : ${message.getValue()}")
            println("Message received : ${message.messageId}:${message.value}")

            consumer.acknowledge(message)
        }

        producer.close()
        consumer.close()
        client.close()

        expect:
        1==1

    }

    def cleanupSpec() {
        standalone.close()
    }

}

1 Ответ

1 голос
/ 09 июня 2019

Вы должны иметь возможность создавать функции Pulsar через API-интерфейс Pulsar Admin так же, как и для обычного кластера Pulsar, например,

PulsarAdmin pulsarAdmin = PulsarAdmin.builder().serviceHttpUrl("http://localhost:8080").build();

FunctionConfig functionConfig = new FunctionConfig();
functionConfig.setName("exclamation");
functionConfig.setInputs(Collections.singleton("input"));
functionConfig.setClassName(ExclamationFunction.class.getName());
functionConfig.setRuntime(FunctionConfig.Runtime.JAVA);
functionConfig.setOutput("output");
functionConfig.setJar("/tmp/my-jar.jar")

pulsarAdmin.functions().createFunction(functionConfig, functionConfig.getJar());

В проекте Apache Pulsar также имеется довольно много интеграционных тестов для тестирования функций Pulsar. Существуют настоящие интеграционные тесты, основанные на докере, и есть тесты на интеграцию с одним процессом. Вот пример одиночного процесса «интеграционных» тестов, на который вы можете сослаться:

https://github.com/apache/pulsar/blob/master/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionE2ETest.java

...