Классическая топология подсчета слов в Storm с использованием 2 очередей RabbitMQ - PullRequest
1 голос
/ 29 июня 2019

Мне нужно написать простую топологию подсчета слов в Java и Storm.В частности, у меня есть внешний источник данных, генерирующий строку CSV (через запятую), такую ​​как

Даниэль, 0.5654, 144543, пользователь, 899898, комментарий ,,,

Этистроки вставляются в очередь RabbitMQ, называемую «вход».Этот источник данных работает хорошо, и я вижу строки в очереди.


Теперь я изменил классическую топологию, добавив RabbitMQSpout.Цель состоит в том, чтобы подсчитать количество слов для первого поля каждой строки CSV и опубликовать результаты в новую очередь с именем «output».Проблема в том, что я не вижу кортежей внутри новой очереди, но топология была отправлена ​​и RUNNING .

Итак, суммируя:

  1. внешние данныеисточник помещает элементы в input queue
  2. RabbitMQSpout принимает элементы из input очереди и вставляет их в топологию
  3. classic word-счетная топология выполнена
  4. последний болт помещает результаты в output queue

Проблема: я могу видеть элементы в input очереди, но ничегов вывод , даже если я использовал тот же метод для отправки элемента в очередь во внешнем источнике данных (и это работает) и в RabbitMQExporter (не работает ...)

Некоторый код ниже


RabbitMQSpout

public class RabbitMQSpout extends BaseRichSpout {

    public static final String DATA = "data";

    private SpoutOutputCollector _collector;
    private RabbitMQManager rabbitMQManager;

    @Override
    public void open(Map map, TopologyContext topologyContext, SpoutOutputCollector spoutOutputCollector) {
        _collector = _collector;
        rabbitMQManager = new RabbitMQManager("localhost", "rabbitmq", "rabbitmq", "test");
    }

    @Override
    public void nextTuple() {
        Utils.sleep(1000);

        String data = rabbitMQManager.receive("input");

        if (data != null) {
            _collector.emit(new Values(data));
        }
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        outputFieldsDeclarer.declare(new Fields(DATA));
    }
}


SplitBolt

public class SplitBolt extends BaseRichBolt {

    private OutputCollector _collector;

    public SplitSentenceBolt() { }

    @Override
    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
        this._collector = collector;
        this.SPACE = Pattern.compile(",");
    }

    @Override
    public void execute(Tuple input) {
        String sentence = input.getStringByField(RabbitMQSpout.DATA);
        String[] words = sentence.split(",");

        if (words.length > 0)
            _collector.emit(new Values(words[0]));
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word"));
    }

    @Override
    public Map<String, Object> getComponentConfiguration() {
        return null;
    }

}

WordCountBolt

public class WordCountBolt extends BaseBasicBolt {

    Map<String, Integer> counts = new HashMap<String, Integer>();

    @Override
    public void execute(Tuple tuple, BasicOutputCollector collector) {
        String word = tuple.getString(0);
        Integer count = counts.get(word);
        if (count == null)
            count = 0;
        count++;
        counts.put(word, count);
        System.out.println(count);
        collector.emit(new Values(word, count));
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word", "count"));
    }
}

RabbitMQExporterBolt

    public RabbitMQExporterBolt(String rabbitMqHost, String rabbitMqUsername, String rabbitMqPassword,
                                String defaultQueue) {
        super();
        this.rabbitMqHost = rabbitMqHost;
        this.rabbitMqUsername = rabbitMqUsername;
        this.rabbitMqPassword = rabbitMqPassword;
        this.defaultQueue = defaultQueue;
    }

    @Override
    public void prepare(@SuppressWarnings("rawtypes") Map map, TopologyContext topologyContext, OutputCollector outputCollector) {

        this.collector=outputCollector;
        this.rabbitmq = new RabbitMQManager(rabbitMqHost, rabbitMqUsername, rabbitMqPassword, defaultQueue);

    }

    @Override
    public void execute(Tuple tuple) {

        String word = tuple.getString(0);
        Integer count = tuple.getInteger(1);

        String output = word + " " + count;
        rabbitmq.send(output);
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        outputFieldsDeclarer.declare(new Fields("word"));
    }

}

Топология

public class WordCountTopology {

    private static final String RABBITMQ_HOST = "rabbitmq";
    private static final String RABBITMQ_USER = "rabbitmq";
    private static final String RABBITMQ_PASS = "rabbitmq";
    private static final String RABBITMQ_QUEUE = "output";

    public static void main(String[] args) throws Exception {

        TopologyBuilder builder = new TopologyBuilder();

        builder.setSpout("spout", new RabbitMQSpout(), 1);

        builder.setBolt("split", new SplitSentenceBolt(), 1)
                .shuffleGrouping("spout");

        builder.setBolt("count", new WordCountBolt(), 1)
                .fieldsGrouping("split", new Fields("word"));

        Config conf = new Config();
        conf.setDebug(true);

        if (args != null && args.length > 0) {

            builder.setBolt("exporter",
                    new RabbitMQExporterBolt(
                            RABBITMQ_HOST, RABBITMQ_USER,
                            RABBITMQ_PASS, RABBITMQ_QUEUE ),
                    1)
               .shuffleGrouping("count");

            conf.setNumWorkers(3);

            StormSubmitter.submitTopologyWithProgressBar(args[0], conf, builder.createTopology());

        } else {

            conf.setMaxTaskParallelism(3);
            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology("word-count", conf, builder.createTopology());
            Thread.sleep(10000);
            cluster.shutdown();

        }
    }



}

RabbitMQManager

public class RabbitMQManager {

    private String host;
    private String username;
    private String password;
    private ConnectionFactory factory;
    private Connection connection;

    private String defaultQueue;

    public RabbitMQManager(String host, String username, String password, String queue) {
        super();
        this.host = host;
        this.username = username;
        this.password = password;

        this.factory = null;
        this.connection = null;
        this.defaultQueue = queue;

        this.initialize();
        this.initializeQueue(defaultQueue);

    }

    private void initializeQueue(String queue){

        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost(host);
        factory.setUsername(username);
        factory.setPassword(password);
        Connection connection;
        try {
            connection = factory.newConnection();
            Channel channel = connection.createChannel();

            boolean durable = false;
            boolean exclusive = false; 
            boolean autoDelete = false;

            channel.queueDeclare(queue, durable, exclusive, autoDelete, null);

            channel.close();
            connection.close();

        } catch (IOException | TimeoutException e) {
            e.printStackTrace();
        }

    }

    private void initialize(){

        factory = new ConnectionFactory();
        factory.setHost(host);
        factory.setUsername(username);
        factory.setPassword(password);
        try {

            connection = factory.newConnection();

        } catch (IOException | TimeoutException e) {
            e.printStackTrace();
        }
    }

    public void terminate(){

        if (connection != null && connection.isOpen()){
            try {
                connection.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }

    }

    private boolean reopenConnectionIfNeeded(){

        try {

            if (connection == null){
                connection = factory.newConnection();
                return true;
            }

            if (!connection.isOpen()){
                connection = factory.newConnection();
            }

        } catch (IOException | TimeoutException e) {
            e.printStackTrace();
            return false;
        }

        return true;

    }

    public boolean send(String message){
        return this.send(defaultQueue, message);
    }

    public boolean send(String queue, String message){

        try {

            reopenConnectionIfNeeded();
            Channel channel = connection.createChannel();
            channel.basicPublish("", queue, null, message.getBytes());
            channel.close();

            return true;

        } catch (IOException | TimeoutException e) {
            e.printStackTrace();
        }

        return false;

    }

    public String receive(String queue) {

        try {
            reopenConnectionIfNeeded();
            Channel channel = connection.createChannel();
            Consumer consumer = new DefaultConsumer(channel);
            return channel.basicConsume(queue, true, consumer);
        } catch (Exception e) {
            e.printStackTrace();
            return null;
        }
    }
}
...