KafkaStream не закрывается правильно - PullRequest
0 голосов
/ 14 ноября 2018

Я закрываю KafkaStream, когда мне нужно, основываясь на определенном условии:

Закрытие:

if(kafkaStream == null) {
            logger.info("KafkaStream already closed?");
        } else {
            boolean closed = kafkaStream.close(10L, TimeUnit.SECONDS);
            if(closed) {
                kafkaStream = null;
                logger.info("KafkaStream closed");
            } else {
                logger.info("KafkaStream could not closed");
            }
        }

Запуск:

if(kafkaStream == null) {
            logger.info("KafkaStream is starting");
            kafkaStream = KafkaManager.getInstance().getStream(this.getConfigFilePath(),
                    this,
                    this.getTopic()
            );
            kafkaStream.start();
            logger.info("KafkaStream is started");
        }

В моей реализацииПроцессор, process(String key, byte[] value) все еще вызывается при успешном закрытии потока:

public abstract class BaseKafkaProcessor implements Processor<String, byte[]> {
    private static Logger logger = LogManager.getLogger(BaseKafkaProcessor.class);
    private ProcessorContext context;


    private ProcessorContext getContext() {
        return context;
    }

    @Override
    public void init(ProcessorContext context) {
        this.context = context;
        this.context.schedule(1000);
    }


    @Override
    public void process(String key, byte[] value) {
        try {
            String topic = key.split("-")[0];
            byte[] uncompressed = GzipCompressionUtil.uncompress(value);
            String json = new String(uncompressed, "UTF-8");
            processRecord(topic, json);
            this.getContext().commit();
        } catch (Exception e) {
            logger.error("Error processing json", e);
        }
    }

    protected abstract void processRecord(String topic, String json);

    @Override
    public void punctuate(long timestamp) {
        this.getContext().commit();
    }

    @Override
    public void close() {
        this.getContext().commit();
    }
}

Моя конфигурация для KafkaStreams:

application.id=dv_ws_in_app_activity_dev4
bootstrap.servers=VLXH1
auto.offset.reset=latest
num.stream.threads=1
key.serde=org.apache.kafka.common.serialization.Serdes$StringSerde
value.serde=org.apache.kafka.common.serialization.Serdes$ByteArraySerde
poll.ms = 100
commit.interval.ms=1000
state.dir=../../temp/kafka-state-dir

Это клиентское приложение использует версию 0.11.0.1 Кафки.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...