Trident OpaqueKafkaSpout: transactions are not committed - PullRequest
23 November 2018

I'm using storm-kafka-client to subscribe my topology to a Kafka topic. Since I need exactly-once semantics, I'm using the opaque spout. However, the first transaction is never committed: it is kept in the state and then replayed periodically. If I add more data to Kafka, that data is not saved to the state either, because the previous transaction has not been committed. The state I'm using is also a Kafka state.
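For context on why a replayed transaction matters: Trident's opaque transactional state stores, next to each value, the txid of the last batch that updated it and the value before that update, so a batch replayed with the same txid (possibly with different contents, which an opaque spout allows) is applied on top of the previous value rather than the current one. The sketch below is a simplified, self-contained model of that bookkeeping for a single counter; the class `OpaqueCounter` is hypothetical and only mirrors the idea behind Trident's `OpaqueValue`, it is not Storm's API.

```java
// Hypothetical, simplified model of opaque-transactional state bookkeeping.
// Not Storm's API; it only illustrates how a replayed txid is handled.
public final class OpaqueCounter {
    private Long currTxid = null; // txid of the last batch applied
    private long prev = 0;        // value before the last batch was applied
    private long curr = 0;        // value after the last batch was applied

    // Applies a batch's delta. A replay of the last txid recomputes from
    // `prev` (overwriting the earlier attempt) instead of adding on top of it.
    public void apply(long txid, long delta) {
        if (currTxid != null && currTxid == txid) {
            curr = prev + delta;
        } else {
            prev = curr;          // the current value becomes the base
            curr = curr + delta;
            currTxid = txid;
        }
    }

    public long value() {
        return curr;
    }

    public static void main(String[] args) {
        OpaqueCounter c = new OpaqueCounter();
        c.apply(1, 5);                  // batch 1 counts 5 tuples
        c.apply(2, 3);                  // batch 2 counts 3 tuples
        c.apply(2, 4);                  // batch 2 replayed, now with 4 tuples
        System.out.println(c.value());  // prints 9
    }
}
```

Because the replay of txid 2 replaces its earlier contribution instead of adding to it, the result stays correct even though batch 2 was processed twice.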

I'm using Storm 1.2.1 and Kafka 1.0.1, in case it matters.

Edit: this is the topology I'm using:

Config conf = new Config();
conf.setNumWorkers(NUM_WORKERS);
conf.setDebug(true);
conf.setMaxSpoutPending(MAX_SPOUT_PENDING);

// Opaque Trident Kafka spout reading KAFKA_TOPIC (Avro values via the schema registry)
KafkaSpoutConfig.Builder<String, String> spoutConfig = KafkaSpoutConfig.builder(KAFKA_BROKERS, KAFKA_TOPIC);
spoutConfig.setProp(SchemaRegistryClient.Configuration.SCHEMA_REGISTRY_URL.name(), SCHEMA_REGISTRY_URL);
spoutConfig.setProp(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
spoutConfig.setProp(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class);
spoutConfig.setProcessingGuarantee(KafkaSpoutConfig.ProcessingGuarantee.AT_LEAST_ONCE);
spoutConfig.setFirstPollOffsetStrategy(KafkaSpoutConfig.FirstPollOffsetStrategy.UNCOMMITTED_EARLIEST);
spoutConfig.setOffsetCommitPeriodMs(5000);
KafkaTridentSpoutOpaque<String, String> kafkaSpout = new KafkaTridentSpoutOpaque<>(spoutConfig.build());

TridentTopology topology = new TridentTopology();

// Producer settings for the Kafka state (output side)
Properties props = new Properties();
props.put("bootstrap.servers", KAFKA_BROKERS);
props.put("acks", "1");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");


// Kafka state: the "parsed_value" field is written to topic "test" as both key and message
TridentKafkaStateFactory stateFactory = new TridentKafkaStateFactory()
        .withProducerProperties(props)
        .withKafkaTopicSelector(new DefaultTopicSelector("test"))
        .withTridentTupleToKafkaMapper(new FieldNameBasedTupleToKafkaMapper("parsed_value", "parsed_value"));

// Read from Kafka, parse each record into "parsed_value", then write it to the Kafka state
Stream tramasRedis = topology.newStream("KAFKA_TRAMAS", kafkaSpout).parallelismHint(KAFKA_SPOUT_PARALELLISM)
        .each(new Fields("key", "value"), new ReadRedisMessage(), new Fields("parsed_value"));
tramasRedis.partitionPersist(stateFactory, new Fields("parsed_value"), new TridentKafkaStateUpdater());

// Note: this LocalCluster is never used; the topology is submitted with StormSubmitter below
LocalCluster lc = new LocalCluster();
StormSubmitter.submitTopology(TOPOLOGY_NAME, conf, topology.build());

And this is what I get in the logs, in debug mode, when a message arrives from Kafka:

2018-11-26 08:20:16.166 o.a.s.d.task Thread-10-spout-KAFKA_TRAMAS-executor[6 6] [INFO] Emitting: spout-KAFKA_TRAMAS s1 [13:2, redis-tramas-fnet, 1, 0, null, {"original": "test1"}]
2018-11-26 08:20:16.167 o.a.s.d.executor Thread-10-spout-KAFKA_TRAMAS-executor[6 6] [INFO] TRANSFERING tuple [dest: 5 tuple: source: spout-KAFKA_TRAMAS:6, stream: s1, id: {}, [13:2, redis-tramas-fnet, 1, 0, null, {"original": "test1"}]]
2018-11-26 08:20:16.168 o.a.s.d.task Thread-10-spout-KAFKA_TRAMAS-executor[6 6] [INFO] Emitting direct: 5; spout-KAFKA_TRAMAS $coord-bg0 [13:2, 1]
2018-11-26 08:20:16.168 o.a.s.d.executor Thread-10-spout-KAFKA_TRAMAS-executor[6 6] [INFO] TRANSFERING tuple [dest: 5 tuple: source: spout-KAFKA_TRAMAS:6, stream: $coord-bg0, id: {-7985297489562004428=1604111899640219309}, [13:2, 1]]
2018-11-26 08:20:16.169 o.a.s.d.task Thread-10-spout-KAFKA_TRAMAS-executor[6 6] [INFO] Emitting: spout-KAFKA_TRAMAS __ack_ack [-7985297489562004428 3421159403193702831]
2018-11-26 08:20:16.169 o.a.s.d.executor Thread-10-spout-KAFKA_TRAMAS-executor[6 6] [INFO] TRANSFERING tuple [dest: 3 tuple: source: spout-KAFKA_TRAMAS:6, stream: __ack_ack, id: {}, [-7985297489562004428 3421159403193702831]]
2018-11-26 08:20:16.169 o.a.s.d.executor Thread-14-b-0-executor[5 5] [INFO] Processing received message FOR 5 TUPLE: source: spout-KAFKA_TRAMAS:6, stream: s1, id: {}, [13:2, redis-tramas-fnet, 1, 0, null, {"original": "test1"}]
2018-11-26 08:20:16.169 o.a.s.d.executor Thread-10-spout-KAFKA_TRAMAS-executor[6 6] [INFO] BOLT ack TASK: 6 TIME: -1 TUPLE: source: $spoutcoord-spout-KAFKA_TRAMAS:2, stream: $batch, id: {-7985297489562004428=4123209423349964546}, [13:2, [{partition=3, topic=redis-tramas-fnet}, {partition=2, topic=redis-tramas-fnet}, {partition=1, topic=redis-tramas-fnet}, {partition=0, topic=redis-tramas-fnet}, {partition=7, topic=redis-tramas-fnet}, {partition=6, topic=redis-tramas-fnet}, {partition=5, topic=redis-tramas-fnet}, {partition=4, topic=redis-tramas-fnet}, {partition=11, topic=redis-tramas-fnet}, {partition=10, topic=redis-tramas-fnet}, {partition=9, topic=redis-tramas-fnet}, {partition=8, topic=redis-tramas-fnet}, {partition=15, topic=redis-tramas-fnet}, {partition=14, topic=redis-tramas-fnet}, {partition=13, topic=redis-tramas-fnet}, {partition=12, topic=redis-tramas-fnet}]]
2018-11-26 08:20:16.169 o.a.s.d.executor Thread-10-spout-KAFKA_TRAMAS-executor[6 6] [INFO] Execute done TUPLE source: $spoutcoord-spout-KAFKA_TRAMAS:2, stream: $batch, id: {-7985297489562004428=4123209423349964546}, [13:2, [{partition=3, topic=redis-tramas-fnet}, {partition=2, topic=redis-tramas-fnet}, {partition=1, topic=redis-tramas-fnet}, {partition=0, topic=redis-tramas-fnet}, {partition=7, topic=redis-tramas-fnet}, {partition=6, topic=redis-tramas-fnet}, {partition=5, topic=redis-tramas-fnet}, {partition=4, topic=redis-tramas-fnet}, {partition=11, topic=redis-tramas-fnet}, {partition=10, topic=redis-tramas-fnet}, {partition=9, topic=redis-tramas-fnet}, {partition=8, topic=redis-tramas-fnet}, {partition=15, topic=redis-tramas-fnet}, {partition=14, topic=redis-tramas-fnet}, {partition=13, topic=redis-tramas-fnet}, {partition=12, topic=redis-tramas-fnet}]] TASK: 6 DELTA: -1
2018-11-26 08:20:16.169 o.a.s.d.executor Thread-10-spout-KAFKA_TRAMAS-executor[6 6] [INFO] Processing received message FOR 6 TUPLE: source: $spoutcoord-spout-KAFKA_TRAMAS:2, stream: $batch, id: {-1505053285028712763=8306088157808961249}, [14:2, [{partition=3, topic=redis-tramas-fnet}, {partition=2, topic=redis-tramas-fnet}, {partition=1, topic=redis-tramas-fnet}, {partition=0, topic=redis-tramas-fnet}, {partition=7, topic=redis-tramas-fnet}, {partition=6, topic=redis-tramas-fnet}, {partition=5, topic=redis-tramas-fnet}, {partition=4, topic=redis-tramas-fnet}, {partition=11, topic=redis-tramas-fnet}, {partition=10, topic=redis-tramas-fnet}, {partition=9, topic=redis-tramas-fnet}, {partition=8, topic=redis-tramas-fnet}, {partition=15, topic=redis-tramas-fnet}, {partition=14, topic=redis-tramas-fnet}, {partition=13, topic=redis-tramas-fnet}, {partition=12, topic=redis-tramas-fnet}]]
2018-11-26 08:20:16.171 o.a.s.d.executor Thread-6-__acker-executor[3 3] [INFO] Processing received message FOR 3 TUPLE: source: spout-KAFKA_TRAMAS:6, stream: __ack_ack, id: {}, [-7985297489562004428 3421159403193702831]
2018-11-26 08:20:16.171 o.a.s.d.executor Thread-6-__acker-executor[3 3] [INFO] BOLT ack TASK: 3 TIME: -1 TUPLE: source: spout-KAFKA_TRAMAS:6, stream: __ack_ack, id: {}, [-7985297489562004428 3421159403193702831]
2018-11-26 08:20:16.171 o.a.s.d.executor Thread-6-__acker-executor[3 3] [INFO] Execute done TUPLE source: spout-KAFKA_TRAMAS:6, stream: __ack_ack, id: {}, [-7985297489562004428 3421159403193702831] TASK: 3 DELTA: -1
2018-11-26 08:20:16.175 o.a.s.d.executor Thread-14-b-0-executor[5 5] [INFO] BOLT ack TASK: 5 TIME: 6 TUPLE: source: spout-KAFKA_TRAMAS:6, stream: s1, id: {}, [13:2, redis-tramas-fnet, 1, 0, null, {"original": "test1"}]
2018-11-26 08:20:16.175 o.a.s.d.executor Thread-14-b-0-executor[5 5] [INFO] Execute done TUPLE source: spout-KAFKA_TRAMAS:6, stream: s1, id: {}, [13:2, redis-tramas-fnet, 1, 0, null, {"original": "test1"}] TASK: 5 DELTA: -1
2018-11-26 08:20:16.175 o.a.s.d.executor Thread-14-b-0-executor[5 5] [INFO] Processing received message FOR 5 TUPLE: source: spout-KAFKA_TRAMAS:6, stream: $coord-bg0, id: {-7985297489562004428=1604111899640219309}, [13:2, 1]
2018-11-26 08:20:16.175 o.a.s.d.task Thread-14-b-0-executor[5 5] [INFO] Emitting: b-0 __ack_ack [-7985297489562004428 1604111899640219309]
2018-11-26 08:20:16.176 o.a.s.d.executor Thread-14-b-0-executor[5 5] [INFO] TRANSFERING tuple [dest: 3 tuple: source: b-0:5, stream: __ack_ack, id: {}, [-7985297489562004428 1604111899640219309]]
2018-11-26 08:20:16.176 o.a.s.d.executor Thread-14-b-0-executor[5 5] [INFO] BOLT ack TASK: 5 TIME: -1 TUPLE: source: spout-KAFKA_TRAMAS:6, stream: $coord-bg0, id: {-7985297489562004428=1604111899640219309}, [13:2, 1]
2018-11-26 08:20:16.176 o.a.s.d.executor Thread-14-b-0-executor[5 5] [INFO] Execute done TUPLE source: spout-KAFKA_TRAMAS:6, stream: $coord-bg0, id: {-7985297489562004428=1604111899640219309}, [13:2, 1] TASK: 5 DELTA: -1
2018-11-26 08:20:16.177 o.a.s.d.executor Thread-6-__acker-executor[3 3] [INFO] Processing received message FOR 3 TUPLE: source: b-0:5, stream: __ack_ack, id: {}, [-7985297489562004428 1604111899640219309]
2018-11-26 08:20:16.178 o.a.s.d.task Thread-6-__acker-executor[3 3] [INFO] Emitting direct: 1; __acker __ack_ack [-7985297489562004428 37198]
2018-11-26 08:20:16.178 o.a.s.d.executor Thread-6-__acker-executor[3 3] [INFO] TRANSFERING tuple [dest: 1 tuple: source: __acker:3, stream: __ack_ack, id: {}, [-7985297489562004428 37198]]
2018-11-26 08:20:16.178 o.a.s.d.executor Thread-6-__acker-executor[3 3] [INFO] BOLT ack TASK: 3 TIME: -1 TUPLE: source: b-0:5, stream: __ack_ack, id: {}, [-7985297489562004428 1604111899640219309]
2018-11-26 08:20:16.178 o.a.s.d.executor Thread-6-__acker-executor[3 3] [INFO] Execute done TUPLE source: b-0:5, stream: __ack_ack, id: {}, [-7985297489562004428 1604111899640219309] TASK: 3 DELTA: -1
2018-11-26 08:20:16.181 o.a.s.d.executor Thread-8-$mastercoord-bg0-executor[1 1] [INFO] Processing received message FOR 1 TUPLE: source: __acker:3, stream: __ack_ack, id: {}, [-7985297489562004428 37198]
2018-11-26 08:20:16.181 o.a.s.d.executor Thread-8-$mastercoord-bg0-executor[1 1] [INFO] SPOUT Acking message -7985297489562004428 13:2
2018-11-26 08:20:16.561 o.a.s.d.task Thread-8-$mastercoord-bg0-executor[1 1] [INFO] Emitting: $mastercoord-bg0 $batch [88:1]
2018-11-26 08:20:16.562 o.a.s.d.executor Thread-8-$mastercoord-bg0-executor[1 1] [INFO] TRANSFERING tuple [dest: 2 tuple: source: $mastercoord-bg0:1, stream: $batch, id: {-5934215212694827955=1827497614943510587}, [88:1]]
2018-11-26 08:20:16.562 o.a.s.d.task Thread-8-$mastercoord-bg0-executor[1 1] [INFO] Emitting: $mastercoord-bg0 __ack_init [-5934215212694827955 1827497614943510587 1]
2018-11-26 08:20:16.562 o.a.s.d.executor Thread-8-$mastercoord-bg0-executor[1 1] [INFO] TRANSFERING tuple [dest: 3 tuple: source: $mastercoord-bg0:1, stream: __ack_init, id: {}, [-5934215212694827955 1827497614943510587 1]]
2018-11-26 08:20:16.563 o.a.s.d.executor Thread-4-$spoutcoord-spout-KAFKA_TRAMAS-executor[2 2] [INFO] Processing received message FOR 2 TUPLE: source: $mastercoord-bg0:1, stream: $batch, id: {-5934215212694827955=1827497614943510587}, [88:1]
2018-11-26 08:20:16.565 o.a.s.d.executor Thread-6-__acker-executor[3 3] [INFO] Processing received message FOR 3 TUPLE: source: $mastercoord-bg0:1, stream: __ack_init, id: {}, [-5934215212694827955 1827497614943510587 1]
2018-11-26 08:20:16.565 o.a.s.d.executor Thread-6-__acker-executor[3 3] [INFO] BOLT ack TASK: 3 TIME: -1 TUPLE: source: $mastercoord-bg0:1, stream: __ack_init, id: {}, [-5934215212694827955 1827497614943510587 1]
2018-11-26 08:20:16.565 o.a.s.d.executor Thread-6-__acker-executor[3 3] [INFO] Execute done TUPLE source: $mastercoord-bg0:1, stream: __ack_init, id: {}, [-5934215212694827955 1827497614943510587 1] TASK: 3 DELTA: -1
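The `__ack_init` / `__ack_ack` traffic in the log above follows Storm's XOR-based acking: for each root tuple id the acker keeps a 64-bit value into which it XORs every edge id it is told about. Each edge id is XORed in once when it is created and once when it is acked, so the value returns to zero exactly when the whole tuple tree has been processed, at which point the acker notifies the emitting task (here `$mastercoord-bg0`, which then logs `SPOUT Acking message ... 13:2`). The sketch below is a simplified, hypothetical model of that scheme; `AckerModel` and its methods are illustrative and not Storm's acker implementation.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified model of XOR-based acking; not Storm's Acker class.
public final class AckerModel {
    // Running XOR of all edge ids seen so far, per root (spout) tuple id.
    private final Map<Long, Long> pending = new HashMap<>();

    // Called when the spout emits a tuple tree: XOR in the first edge id.
    public void init(long rootId, long edgeId) {
        pending.merge(rootId, edgeId, (a, b) -> a ^ b);
    }

    // Called when a bolt acks an edge; `xoredEdges` is the acked edge id XORed
    // with the ids of any child edges it anchored. Returns true once the tree
    // for rootId is complete (the running value is back to zero).
    public boolean ack(long rootId, long xoredEdges) {
        long value = pending.merge(rootId, xoredEdges, (a, b) -> a ^ b);
        if (value == 0L) {
            pending.remove(rootId);
            return true;
        }
        return false;
    }

    public static void main(String[] args) {
        long root = 42L;
        long e1 = 0x1234L;
        long e2 = 0x5678L;
        AckerModel acker = new AckerModel();
        acker.init(root, e1);                                // spout emits edge e1
        boolean afterFirst = acker.ack(root, e1 ^ e2);       // bolt acks e1, anchors child e2
        boolean afterSecond = acker.ack(root, e2);           // downstream bolt acks e2
        System.out.println(afterFirst + " " + afterSecond);  // prints "false true"
    }
}
```

If any edge is never acked, the value never returns to zero and the root tuple eventually fails on timeout instead.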

With debug logging enabled on the MasterBatchCoordinator, I periodically get these errors:

2018-11-28 10:57:44.614 o.a.s.t.t.MasterBatchCoordinator Thread-8-$mastercoord-bg0-executor[1 1] [DEBUG] Fail. [tx_attempt = 18:4], [tx_status = null], [MasterBatchCoordinator{_states=[org.apache.storm.trident.topology.state.TransactionalState@6168a033], _activeTx={3=3:3 <PROCESSING>, 4=4:3 <PROCESSING>, 5=5:3 <PROCESSING>, 6=6:3 <PROCESSING>}, _attemptIds={3=3, 4=3, 5=3, 6=3, 7=3, 8=3, 9=3, 10=3, 11=3, 12=3, 13=3, 14=4, 15=4, 16=3, 17=3, 18=4, 19=7, 20=7, 21=7, 22=7, 23=7, 24=7, 25=7, 26=7, 27=7, 28=7, 29=7, 30=7, 31=7, 32=7, 33=7, 34=7, 35=7, 36=7, 37=7, 38=7, 39=7, 40=7, 41=7, 42=7, 43=6, 44=6, 45=5, 46=5, 47=5, 48=5, 49=5, 50=6, 51=5, 52=5, 53=5, 54=5, 55=5, 56=5, 57=5, 58=5, 59=5, 60=5, 61=5, 62=5, 63=5, 64=5, 65=5, 66=5, 67=5, 68=4, 69=4, 70=4, 71=4, 72=4, 73=4, 74=4, 75=4, 76=3, 77=3, 78=1, 79=1, 80=1, 81=1, 82=1, 83=1, 84=2, 85=1, 86=1, 87=1, 88=1, 89=1, 90=1, 91=1, 92=1, 93=1, 94=1, 95=1, 96=1, 97=1, 98=1, 99=1, 100=1, 101=0, 102=0}, _collector=org.apache.storm.spout.SpoutOutputCollector@1f58e2fd, _currTransaction=3, _maxTransactionActive=100, _coordinators=[org.apache.storm.trident.spout.OpaquePartitionedTridentSpoutExecutor$Coordinator@6faffb26], _managedSpoutIds=[KAFKA_TRAMAS], _spouts=[org.apache.storm.trident.spout.OpaquePartitionedTridentSpoutExecutor@59e6917f], _throttler=org.apache.storm.utils.WindowedTimeThrottler@7d2bdb48, _active=true}]
2018-11-28 10:57:44.616 o.a.s.d.executor Thread-8-$mastercoord-bg0-executor[1 1] [INFO] SPOUT Failing -2574568553084235376: {:stream "$batch", :values [#object[org.apache.storm.trident.topology.TransactionAttempt 0x4fce1854 "19:6"]]} REASON: TIMEOUT MSG-ID: 19:6

Apparently, the failing batches are empty. If the topic already contains data when the topology starts, it works, because there are no empty batches before it. However, once the first batch (the one with data) has been committed, the empty batches start failing, and new Kafka data is not processed because there are uncommitted transactions. If the topic is empty, the topology fails constantly and never writes anything to Kafka.
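The blocking described above is consistent with how Trident orders commits: several batches can be in flight at once (the MasterBatchCoordinator log shows `_maxTransactionActive=100`), but commits are issued strictly in txid order starting from `_currTransaction`, so a single transaction that keeps failing and being retried holds back the commit of every later one. The following is a minimal, hypothetical model of that ordering rule; `CommitQueue` and `markProcessed` are illustrative names, not part of Storm.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeSet;

// Hypothetical model of Trident-style in-order commits; not Storm's MasterBatchCoordinator.
public final class CommitQueue {
    private long nextToCommit;                          // lowest txid not yet committed
    private final TreeSet<Long> done = new TreeSet<>(); // processed but not yet committed
    private final List<Long> committed = new ArrayList<>();

    public CommitQueue(long firstTxid) {
        this.nextToCommit = firstTxid;
    }

    // Marks a batch as processed, then commits every txid that is no longer
    // blocked. A failed batch is never marked here; it is retried with the same
    // txid, and until it succeeds every later txid stays uncommitted.
    public void markProcessed(long txid) {
        done.add(txid);
        while (done.remove(nextToCommit)) {
            committed.add(nextToCommit);
            nextToCommit++;
        }
    }

    public List<Long> committed() {
        return committed;
    }

    public static void main(String[] args) {
        CommitQueue q = new CommitQueue(1);
        q.markProcessed(2);                // txid 1 keeps failing, so 2 and 3 wait
        q.markProcessed(3);
        System.out.println(q.committed()); // prints []
        q.markProcessed(1);                // txid 1 finally succeeds
        System.out.println(q.committed()); // prints [1, 2, 3]
    }
}
```

In this model, an empty batch that times out behaves exactly like a batch with data: as long as it is not processed successfully, nothing after it can be committed.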

...