Штормовой болт выполняет больше, чем его родитель - PullRequest
1 голос
/ 20 февраля 2020

У меня есть топология, которая содержит KafkaSpout и 2 болта.

BoltParseJsonInput и метод его выполнения:

public void execute(Tuple input) {
    // TODO Auto-generated method stub
    String data = input.getString(4);
    js = new JSONObject(data);

    String userId = js.getString("userId");
    String timestamp = js.getString("timestamp");
    counter++;
    System.out.println(counter);
    collector.emit(input, new Values(userId, timestamp));
    collector.ack(input);
}

BoltInsertRedis и метод его выполнения

    public void execute(Tuple input) {
    // TODO Auto-generated method stub
    String userId = input.getStringByField("userId");
    int timestamp = 0;
    try {

        timestamp = convertTimestampToEpoch(input.getStringByField("timestamp"));
    } catch (ParseException e1) {
        // TODO Auto-generated catch block
        e1.printStackTrace();
    }

    String timestep = this.prefix + timestamp/10;
    String curTimestamp = jedis.hget(timestep, userId);
    if(curTimestamp == null || Integer.parseInt(curTimestamp) < timestamp) {
        jedis.hset(timestep, userId, Integer.toString(timestamp));
    }
    collector.ack(input);
}

BoltInsertRedis получает ввод от BoltParseJsonInput

builder.setBolt("ParseJsonInput-Bolt", new BoltParseJsonInput()).shuffleGrouping("Kafka-Spout");
    builder.setBolt("BoltRedisUserLastActive-Bolt", new BoltRedisUserLastActive()).shuffleGrouping("ParseJsonInput-Bolt");

Но когда я отправляю эту топологию в Storm, BoltInsertRedis выполняет больше, чем BoltParseJsonInput storm ui

Можете ли вы объяснить мне, что здесь проблема?

1 Ответ

0 голосов
/ 21 февраля 2020

Я обнаружил, что мой ParseJsonBolt сделал исключение в сообщении 25700, и в этот момент он продолжает повторное выполнение. Когда я сделал попытку поймать, это хорошо работает

...