Apache Storm 2.1.0 local DRP C не возвращает никакого ответа, хотя кортеж хорошо испускается в коллектор последним болтом - PullRequest
0 голосов
/ 22 января 2020

У меня проблема при попытке запустить топологию DRP C, содержащую один болт, и запросить ее через локальный кластер. После отладки с IntelliJ болт действительно выполняется, но JCQueue застревает в бесконечном l oop, после этого болт был выполнен и до отправки тайм-аута на сервер.

Вот код, используемый Чтобы построить построитель топологии:

public static LinearDRPCTopologyBuilder createBuilder()
{
    var bolt = new RedisSalesLookupBolt(createRedisConfiguration(), new RedisTurnoverMapper());
    var builder = new LinearDRPCTopologyBuilder("sales");
    builder.addBolt(bolt, 1).localOrShuffleGrouping();
    return builder;
}

RedisSalesLookupBolt - это очень простая реализация IBasicBolt, выполняющая команду hget против Jedis. execute метод RedisSalesLookupBolt просто генерирует экземпляр Values, содержащий значение для двух полей, которые объявлены так:

declarer.declare(new Fields("id", "Value"));

Топология создается и запрашивается в модульном тесте, подобном этому :

Config conf = new Config();
conf.setDebug(true);
conf.setNumWorkers(1);

try(LocalDRPC drpc = new LocalDRPC())
{
       LocalCluster cluster = new LocalCluster();
       var builder = BasicRedisRPCTopology.createBuilder();
       LocalCluster.LocalTopology topo = cluster.submitTopology(
              "Sales-fetch", conf, builder.createLocalTopology(drpc));
       var result = drpc.execute("sales", "XXXXX");
       System.out.println("################ Result: " + result);
}
catch (Exception e)
{
       e.printStackTrace();
}

При чтении журналов, я уверен, что данные хорошо красные от болта и что все испускается enter image description here

Но на В конце концов, у меня есть этот отпечаток стека, аккуратно распечатанный моим методом испытаний. Конечно, никакая величина не присваивается переменной результата, и процесс никогда не достигает последних инструкций печати:

enter image description here

Есть что-то, что я могу не понять или что-то, чего мне здесь не хватает. Кажется, что JCQueue, используемый BoltExecutor для получения идентификатора, какой болт для выполнения никогда не заканчивается, хотя в локальный DRP C отправляется только один параметр и только один болт, объявленный в топологии. Я уже пытался добавить больше болтов в топологию или изменить реализацию компоновщика, которая использовалась для его создания, но безуспешно.

1 Ответ

0 голосов
/ 23 января 2020

Ну, я нашел решение, подходящее для моего варианта использования, используя Apache Storm 2.1.0.

Кажется, что вызов метода submitTopology локального кластера, как предложено в документации, не заканчивается Правильно исполнитель с версией 2.1.0, использующий LinearDRPCTopologyBuilder для построения топологии.

Если присмотреться к исходному коду, можно было понять, как применить LinearDRPCTopologyBuilder logi c к TopologyBuilder напрямую.

Вот изменение, примененное к методу createBuilder:

    public static TopologyBuilder createBuilder(Optional<ILocalDRPC> localDRPC)
    {
        var spout = localDRPC
                .map(drpc -> new DRPCSpout("sales", drpc))
                .orElse(new DRPCSpout("sales"));
        var bolt = new RedisSalesLookupBolt(createRedisConfiguration(), new RedisTurnoverMapper());
        var builder = new TopologyBuilder();
        builder.setSpout("drpc", spout);
        builder.setBolt("redisLookup", bolt, 1)
               .shuffleGrouping("drpc");
        builder.setBolt("return", new ReturnResults())
               .shuffleGrouping("redisLookup");
        return builder;
    }

А вот как выглядит модульный тест сейчас:

        Config conf = new Config();
        conf.setDebug(true);
        conf.setNumWorkers(1);

        try(LocalDRPC drpc = new LocalDRPC())
        {
            LocalCluster cluster = new LocalCluster();
            var builder = BasicRedisRPCTopology.createBuilder(Optional.of(drpc));
            cluster.submitTopology("Sales-fetch", conf, builder.createTopology());
            var result = drpc.execute("sales", "XXXXX");
            System.out.println("################ Result: " + result);
        }
        catch (Exception e)
        {
            e.printStackTrace();
        }

К сожалению, это решение не позволяет использовать все встроенные инструменты LinearDRPCTopologyBuilder и подразумевает построение всего потока топологии «вручную». Необходимо изменить поведение сопоставителя, поскольку поля не отображаются в том же порядке, что и раньше.

...