Функция Apache Pulsar не может считать значение счетчика как context.getCounter()
.
Тестирование с использованием локального компьютера, одной функции и запуска тестировщика-изготовителя и потребителя из CLI.Функция читает подписку и печатает ее имя функции (из контекста), поэтому кажется, что проблема заключается только в доступе к значению счетчика.
import org.apache.pulsar.functions.api.Context;
import org.apache.pulsar.functions.api.Function;
public class App implements Function<String, String> {
@Override
public String process(String input, Context context) {
System.out.println(context.getFunctionName()); //this works
context.incrCounter("ABC",1);
Long n = context.getCounter("ABC");
System.out.println(context.getCounter("ABC")); //this doesn't
return n.toString(); //consumer on the topic doesn't get any messages
}
}
(все команды CLI на том же AWS ec2)
bin/pulsar-admin functions localrun \
--jar /home/ubuntu/population_watch/0_ingestion/pulsar-functions-0.0.1-SNAPSHOT.jar \
--className xyz.datapipeline.pulsar_functions.App \
--inputs persistent://public/default/testinput1 \
--output persistent://public/default/testoutput1 \
--name myTestFunction
(потребитель)
bin/pulsar-client consume -n 10 -s “test” persistent://public/default/testoutput1
(производитель)
bin/pulsar-client produce -m “Hello” persistent://public/default/testinput1
Функция должна увеличивать счетчик для клавиши "ABC" на 1 при каждой отправке сообщения, и последующиеgetCounter()
должно возвращать значение (т. Е. 1 ... затем 2 .... затем 3 ... с каждым сообщением, отправленным производителем).
РЕДАКТИРОВАТЬ: Завершение работы моих экземпляров AWS и повторное создание кластерарешил проблему, так что, похоже, это было связано с конфигурацией.