Я использую storm + kafka + protobuf для создания своей системы потоковых процессов.
Проблема в том, что KafkaTridentSpoutOpaque неоднократно потребляет последнее сообщение.Мне нужен только один потребитель для каждого сообщения в kafka.
Ниже приведены некоторые детали:
Зависимость Java
storm-kafka-client1.2.2
Штурмовик 1.2.2
kafka_2.10 0.10.2.0
Компонент
kafka_2.12-2.0.0
apache-storm-1.2.2
Создание кода экземпляра KafkaTridentSpoutOpaque
protected static KafkaSpoutConfig<String, byte[]> newKafkaSpoutConfig(String bootstrapServers, String topic) {
KafkaSpoutConfig.Builder<String, byte[]> builder = new KafkaSpoutConfig.Builder<>(bootstrapServers, topic);
return builder.setProp(ConsumerConfig.GROUP_ID_CONFIG, "stormKafkaSpoutGroup")
.setProp(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000")
.setProp(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer")
.setProp(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer")
.setRecordTranslator(new JustValueFunc(), new Fields("str"))
.setFirstPollOffsetStrategy(UNCOMMITTED_EARLIEST)
.setProcessingGuarantee(AT_MOST_ONCE)
.build();
}
private static KafkaTridentSpoutOpaque<String, byte[]> newKafkaTridentSpoutOpaque(KafkaSpoutConfig<String, byte[]> spoutConfig) {
return new KafkaTridentSpoutOpaque<>(spoutConfig);
}
private static class JustValueFunc implements Func<ConsumerRecord<String, byte[]>, List<Object>>, Serializable {
@Override
public List<Object> apply(ConsumerRecord<String, byte[]> record) {
Values res = null;
try {
res = new Values(PbMiddlewareTransfer.Record.parseFrom(record.value()));
} catch (InvalidProtocolBufferException e) {
e.printStackTrace();
}
return res;
}
}
Вот мой код топологии
public static void main(String[] args) throws Exception {
StormTopology topology = getTridentTopology();
Config conf = new Config();
conf.setNumWorkers(20);
conf.setMaxSpoutPending(5000);
StormSubmitter.submitTopology("storm-kafka-client-spout-test", conf, topology);
}
public static StormTopology getTridentTopology() {
final TridentTopology tridentTopology = new TridentTopology();
KafkaSpoutConfig<String, byte[]> spoutConfig = newKafkaSpoutConfig("192.168.0.202:9092", "test-2");
ITridentDataSource spout = newKafkaTridentSpoutOpaque(spoutConfig);
final Stream spoutStream = tridentTopology.newStream("spout", spout).parallelismHint(1);
spoutStream.each(spoutStream.getOutputFields(), new Debug("##### fastest driver"));
return tridentTopology.build();
}
Выходной журнал
./6702/worker.log:2018-11-19 20:19:12.418 STDIO Thread-9-b-0-executor[23 23] [INFO] <Mon Nov 19 20:19:12 CST 2018> DEBUG(##### fastest driver): [metricID: 1
./6702/worker.log:2018-11-19 20:19:25.908 STDIO Thread-9-b-0-executor[23 23] [INFO] <Mon Nov 19 20:19:25 CST 2018> DEBUG(##### fastest driver): [metricID: 1
./6702/worker.log:2018-11-19 20:20:01.997 STDIO Thread-9-b-0-executor[23 23] [INFO] <Mon Nov 19 20:20:01 CST 2018> DEBUG(##### fastest driver): [metricID: 1
./6702/worker.log:2018-11-19 20:20:30.591 STDIO Thread-9-b-0-executor[23 23] [INFO] <Mon Nov 19 20:20:30 CST 2018> DEBUG(##### fastest driver): [metricID: 1
./6702/worker.log:2018-11-19 20:20:42.960 STDIO Thread-9-b-0-executor[23 23] [INFO] <Mon Nov 19 20:20:42 CST 2018> DEBUG(##### fastest driver): [metricID: 1
./6702/worker.log:2018-11-19 20:20:44.477 STDIO Thread-9-b-0-executor[23 23] [INFO] <Mon Nov 19 20:20:44 CST 2018> DEBUG(##### fastest driver): [metricID: 1
./6702/worker.log:2018-11-19 20:20:47.501 STDIO Thread-9-b-0-executor[23 23] [INFO] <Mon Nov 19 20:20:47 CST 2018> DEBUG(##### fastest driver): [metricID: 1
./6702/worker.log:2018-11-19 20:20:48.516 STDIO Thread-9-b-0-executor[23 23] [INFO] <Mon Nov 19 20:20:48 CST 2018> DEBUG(##### fastest driver): [metricID: 1
./6702/worker.log:2018-11-19 20:20:54.072 STDIO Thread-9-b-0-executor[23 23] [INFO] <Mon Nov 19 20:20:54 CST 2018> DEBUG(##### fastest driver): [metricID: 1
./6702/worker.log:2018-11-19 20:22:01.171 STDIO Thread-9-b-0-executor[23 23] [INFO] <Mon Nov 19 20:22:01 CST 2018> DEBUG(##### fastest driver): [metricID: 1
./6702/worker.log:2018-11-19 20:22:27.380 STDIO Thread-9-b-0-executor[23 23] [INFO] <Mon Nov 19 20:22:27 CST 2018> DEBUG(##### fastest driver): [metricID: 1
./6702/worker.log:2018-11-19 20:23:03.992 STDIO Thread-9-b-0-executor[23 23] [INFO] <Mon Nov 19 20:23:03 CST 2018> DEBUG(##### fastest driver): [metricID: 1
./6702/worker.log:2018-11-19 20:23:14.893 STDIO Thread-9-b-0-executor[23 23] [INFO] <Mon Nov 19 20:23:14 CST 2018> DEBUG(##### fastest driver): [metricID: 1
./6702/worker.log:2018-11-19 20:23:20.955 STDIO Thread-9-b-0-executor[23 23] [INFO] <Mon Nov 19 20:23:20 CST 2018> DEBUG(##### fastest driver): [metricID: 1
./6702/worker.log:2018-11-19 20:23:25.495 STDIO Thread-9-b-0-executor[23 23] [INFO] <Mon Nov 19 20:23:25 CST 2018> DEBUG(##### fastest driver): [metricID: 1
./6702/worker.log:2018-11-19 20:23:47.978 STDIO Thread-9-b-0-executor[23 23] [INFO] <Mon Nov 19 20:23:47 CST 2018> DEBUG(##### fastest driver): [metricID: 1
./6702/worker.log:2018-11-19 20:23:56.440 STDIO Thread-9-b-0-executor[23 23] [INFO] <Mon Nov 19 20:23:56 CST 2018> DEBUG(##### fastest driver): [metricID: 1
./6702/worker.log:2018-11-19 20:24:33.534 STDIO Thread-9-b-0-executor[23 23] [INFO] <Mon Nov 19 20:24:33 CST 2018> DEBUG(##### fastest driver): [metricID: 1
./6702/worker.log:2018-11-19 20:27:35.588 STDIO Thread-9-b-0-executor[23 23] [INFO] <Mon Nov 19 20:27:35 CST 2018> DEBUG(##### fastest driver): [metricID: 1
./6702/worker.log:2018-11-19 20:28:23.784 STDIO Thread-9-b-0-executor[23 23] [INFO] <Mon Nov 19 20:28:23 CST 2018> DEBUG(##### fastest driver): [metricID: 1
./6702/worker.log:2018-11-19 20:28:48.155 STDIO Thread-9-b-0-executor[23 23] [INFO] <Mon Nov 19 20:28:48 CST 2018> DEBUG(##### fastest driver): [metricID: 1
./6702/worker.log:2018-11-19 20:29:12.218 STDIO Thread-9-b-0-executor[23 23] [INFO] <Mon Nov 19 20:29:12 CST 2018> DEBUG(##### fastest driver): [metricID: 1
./6702/worker.log:2018-11-19 20:31:15.597 STDIO Thread-9-b-0-executor[23 23] [INFO] <Mon Nov 19 20:31:15 CST 2018> DEBUG(##### fastest driver): [metricID: 1
./6702/worker.log:2018-11-19 20:31:30.720 STDIO Thread-9-b-0-executor[23 23] [INFO] <Mon Nov 19 20:31:30 CST 2018> DEBUG(##### fastest driver): [metricID: 1
./6702/worker.log:2018-11-19 20:33:07.871 STDIO Thread-9-b-0-executor[23 23] [INFO] <Mon Nov 19 20:33:07 CST 2018> DEBUG(##### fastest driver): [metricID: 1
./6702/worker.log:2018-11-19 20:33:27.889 STDIO Thread-9-b-0-executor[23 23] [INFO] <Mon Nov 19 20:33:27 CST 2018> DEBUG(##### fastest driver): [metricID: 1
./6702/worker.log:2018-11-19 20:34:34.126 STDIO Thread-9-b-0-executor[23 23] [INFO] <Mon Nov 19 20:34:34 CST 2018> DEBUG(##### fastest driver): [metricID: 1
./6702/worker.log:2018-11-19 20:35:36.615 STDIO Thread-9-b-0-executor[23 23] [INFO] <Mon Nov 19 20:35:36 CST 2018> DEBUG(##### fastest driver): [metricID: 1
./6702/worker.log:2018-11-19 20:39:31.282 STDIO Thread-9-b-0-executor[23 23] [INFO] <Mon Nov 19 20:39:31 CST 2018> DEBUG(##### fastest driver): [metricID: 1
./6702/worker.log:2018-11-19 20:40:15.364 STDIO Thread-9-b-0-executor[23 23] [INFO] <Mon Nov 19 20:40:15 CST 2018> DEBUG(##### fastest driver): [metricID: 1
./6702/worker.log:2018-11-19 20:41:15.565 STDIO Thread-9-b-0-executor[23 23] [INFO] <Mon Nov 19 20:41:15 CST 2018> DEBUG(##### fastest driver): [metricID: 1
./6702/worker.log:2018-11-19 20:41:16.570 STDIO Thread-9-b-0-executor[23 23] [INFO] <Mon Nov 19 20:41:16 CST 2018> DEBUG(##### fastest driver): [metricID: 1
./6702/worker.log:2018-11-19 20:41:54.130 STDIO Thread-9-b-0-executor[23 23] [INFO] <Mon Nov 19 20:41:54 CST 2018> DEBUG(##### fastest driver): [metricID: 1
./6702/worker.log:2018-11-19 20:43:30.303 STDIO Thread-9-b-0-executor[23 23] [INFO] <Mon Nov 19 20:43:30 CST 2018> DEBUG(##### fastest driver): [metricID: 1
./6702/worker.log:2018-11-19 20:44:26.049 STDIO Thread-9-b-0-executor[23 23] [INFO] <Mon Nov 19 20:44:26 CST 2018> DEBUG(##### fastest driver): [metricID: 1
./6702/worker.log:2018-11-19 20:52:43.618 STDIO Thread-9-b-0-executor[23 23] [INFO] <Mon Nov 19 20:52:43 CST 2018> DEBUG(##### fastest driver): [metricID: 1
./6702/worker.log:2018-11-19 20:54:01.904 STDIO Thread-9-b-0-executor[23 23] [INFO] <Mon Nov 19 20:54:01 CST 2018> DEBUG(##### fastest driver): [metricID: 1
./6702/worker.log:2018-11-19 20:55:13.448 STDIO Thread-9-b-0-executor[23 23] [INFO] <Mon Nov 19 20:55:13 CST 2018> DEBUG(##### fastest driver): [metricID: 1
./6702/worker.log:2018-11-19 20:59:15.220 STDIO Thread-9-b-0-executor[23 23] [INFO] <Mon Nov 19 20:59:15 CST 2018> DEBUG(##### fastest driver): [metricID: 1
Я просто создаю одно сообщение в Кафке, и оно должно быть только одноодин выход, но на самом деле их много.И это будет повторяться примерно каждые 45 минут.
Любая помощь приветствуется.
Спасибо.