Can a double-typed counter be used for Storm HBase state / Trident mapper?
0 votes
28 August 2018

The word count example uses Long as the counter type for the Storm state / Trident stream / topology. I tested the example OK, then changed the data type from Long to Double. The result is strange: only the row key "the" is written to HBase OK, and the other words are not counted. Whenever I scan the table in the hbase shell, only the count for "the" changes; the other words have no result at all.
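For context, as far as I know HBase's built-in counters only operate on cells encoded as 8-byte longs, and the client increment API takes only a long delta (there is no float/double variant). A minimal sketch with the plain HBase client, using the table and column names from my code below (connection settings omitted):

import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;

public class LongCounterSketch {
    public static void main(String[] args) throws Exception {
        try (Connection conn = ConnectionFactory.createConnection();
             Table table = conn.getTable(TableName.valueOf("test_HbaseWordCountTridentTopolopgy"))) {
            // incrementColumnValue takes only a long delta; the target cell must
            // already hold an 8-byte long value, otherwise the increment fails.
            long n = table.incrementColumnValue(Bytes.toBytes("the"),
                    Bytes.toBytes("INFO"), Bytes.toBytes("count"), 1L);
            System.out.println("count = " + n);
        }
    }
}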

Here is my code:

// Imports assume Storm 1.x with the storm-hbase module. Split and PrintFunction
// are helper classes copied from the Storm / storm-hbase examples; their imports
// are omitted because the package depends on which example they came from.
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.StormSubmitter;
import org.apache.storm.generated.StormTopology;
import org.apache.storm.hbase.bolt.mapper.HBaseProjectionCriteria;
import org.apache.storm.hbase.bolt.mapper.HBaseValueMapper;
import org.apache.storm.hbase.trident.mapper.SimpleTridentHBaseMapper;
import org.apache.storm.hbase.trident.mapper.TridentHBaseMapper;
import org.apache.storm.hbase.trident.state.HBaseQuery;
import org.apache.storm.hbase.trident.state.HBaseState;
import org.apache.storm.hbase.trident.state.HBaseStateFactory;
import org.apache.storm.hbase.trident.state.HBaseUpdater;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.trident.Stream;
import org.apache.storm.trident.TridentState;
import org.apache.storm.trident.TridentTopology;
import org.apache.storm.trident.operation.BaseFunction;
import org.apache.storm.trident.operation.TridentCollector;
import org.apache.storm.trident.operation.builtin.Sum;
import org.apache.storm.trident.state.StateFactory;
import org.apache.storm.trident.testing.FixedBatchSpout;
import org.apache.storm.trident.tuple.TridentTuple;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.ITuple;
import org.apache.storm.tuple.Values;

public class HbaseWordCountTridentTopolopgyAggedSumDouble {
    public static class WordCountValueMapper implements HBaseValueMapper {
        @Override
        public List<Values> toValues(ITuple tuple, Result result) throws Exception {
            List<Values> values = new ArrayList<Values>();
            Cell[] cells = result.rawCells();
            for(Cell cell : cells) {

                String colName = Bytes.toString(CellUtil.cloneQualifier(cell));
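                // NOTE: Bytes.toFloat reads the first 4 bytes of the cell as a
                // float; cells maintained by HBase counter increments are 8-byte
                // longs, so this decode has to match how the value was written.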
                Values value = new Values(colName, Bytes.toFloat(CellUtil.cloneValue(cell)));
                values.add(value);
            }
            return values;
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("columnName","columnValue"));
        }

    }

    public static class One extends BaseFunction {

        public One() {
        }

        public void execute(TridentTuple tuple, TridentCollector collector) {
            // Emits a constant float for every input word (despite the class
            // name "One", the constant is 0.333333f); the word itself is unused.
            collector.emit(new Values(0.333333f));

        }
    }

    public static StormTopology buildTopology() {
        Fields fields = new Fields("word", "count");

        FixedBatchSpout spout2Split = new FixedBatchSpout(new Fields("sentence"), 3, new Values("the cow jumped over the moon"),
                new Values("the man went to the store and bought some candy"), new Values("four score and seven years ago"),
                new Values("how many apples can you eat"), new Values("to be or not to be the person"));
        spout2Split.setCycle(true);

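        // "count" is declared as a counter field below; as far as I understand,
        // storm-hbase applies counter fields as HBase counter increments, and
        // HBase counters are stored as 8-byte longs.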
        TridentHBaseMapper tridentHBaseMapper = new SimpleTridentHBaseMapper()
                .withColumnFamily("INFO")
                //.withColumnFamily("info")
                .withColumnFields(new Fields("word"))
                .withCounterFields(new Fields("count"))
                .withRowKeyField("word");

        HBaseValueMapper rowToStormValueMapper = new WordCountValueMapper();

        HBaseProjectionCriteria projectionCriteria = new HBaseProjectionCriteria();
        projectionCriteria.addColumn(new HBaseProjectionCriteria.ColumnMetaData("INFO", "count"));

        HBaseState.Options options = new HBaseState.Options()
                //.withConfigKey(hbaseRoot)
                //.withConfigKey("hbase.conf")
                .withDurability(Durability.SYNC_WAL)
                .withMapper(tridentHBaseMapper)
                .withProjectionCriteria(projectionCriteria)
                .withRowToStormValueMapper(rowToStormValueMapper)
                //.withTableName("WordCount");
                .withTableName("test_HbaseWordCountTridentTopolopgy");

        StateFactory factory = new HBaseStateFactory(options);

        TridentTopology topology = new TridentTopology();

        Stream stream =
                topology.newStream("spout2Split", spout2Split)
                        .each(new Fields("sentence"), new Split(), new Fields("word"))
                        .each(new Fields("word"), new One(), new Fields("one"))
                        .groupBy(new Fields("word"))
                        .aggregate(new Fields("one"), new Sum(), new Fields("count"))
                ;
        stream.partitionPersist(factory, fields, new HBaseUpdater(), new Fields());
        TridentState state = topology.newStaticState(factory);

        stream = stream.stateQuery(state, new Fields("word"), new HBaseQuery(), new Fields("columnName","columnValue"));
        stream.each(new Fields("word","columnValue"), new PrintFunction(), new Fields());
        return topology.build();
    }

    public static void main(String[] args) throws Exception{
        Map<String, Object> hbConf = new HashMap<String, Object>();
        hbConf.put("hbase.rootdir", "/opt/cloudera/parcels/CDH/lib/hbase");
        hbConf.put("hbase.zookeeper.quorum", "beta-hbase02:2181,beta-hbase03:2181,beta-hbase04:2181");
        Config conf = new Config();
        conf.setMaxSpoutPending(5);
        boolean checkLocal = Arrays.stream(args).anyMatch(arg -> arg.equals("local"));
        if (checkLocal) {
            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology("HbaseWordCountTridentTopolopgy", conf, buildTopology());
            Thread.sleep(60 * 1000);
            cluster.shutdown();
        } else { // if (args.length == 2) {
            conf.setNumWorkers(3);
            StormSubmitter.submitTopology("hbase-word-count-trident", conf, buildTopology());
        }
    }

}
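To check what is actually stored, I can also dump the raw bytes of the INFO:count cell with the plain HBase client (a minimal sketch; same table as above, default connection settings):

import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;

public class InspectCountCell {
    public static void main(String[] args) throws Exception {
        try (Connection conn = ConnectionFactory.createConnection();
             Table table = conn.getTable(TableName.valueOf("test_HbaseWordCountTridentTopolopgy"))) {
            Result r = table.get(new Get(Bytes.toBytes("the")));
            byte[] v = r.getValue(Bytes.toBytes("INFO"), Bytes.toBytes("count"));
            // 8 bytes decodes as a long counter; any other length means the
            // cell was not written as an HBase counter.
            System.out.println("len = " + (v == null ? 0 : v.length));
            if (v != null && v.length == 8) {
                System.out.println("as long = " + Bytes.toLong(v));
            }
        }
    }
}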

...