YamlConfigurationLoader не может загрузить конфигурацию cassandra при сокращении задания - PullRequest
0 голосов
/ 17 октября 2018

Я пытаюсь запустить задачу сокращения цикла, которая записывает вывод в таблицу в cassandra.Мое сокращение задания и конфигурация задания выглядят следующим образом:

public static class RankReducer extends Reducer<IntWritable, WidgetHits, ByteBuffer, List<Mutation>> {
    private static MultipleOutputs<ByteBuffer, List<Mutation>> output;

    public void reduce(IntWritable key, Iterable<WidgetHits> values, Context context)
            throws IOException, InterruptedException {
        output = new MultipleOutputs<ByteBuffer, List<Mutation>>(context);

        ArrayList<WidgetHits> ranking = new ArrayList<WidgetHits>();
        for (WidgetHits val : values) {
            for(int i = 0; i < 10; i++) {
                if(i == ranking.size() || val.getHits() > ranking.get(i).getHits()) {
                    ranking.add(i, new WidgetHits(val.getWidget(), val.getHits()));
                    break;
                }
            }
        }

        for(int i = 0; i < ranking.size() && i < 10; i++) {
            List<ByteBuffer> rankByteList = new ArrayList<ByteBuffer>();
            rankByteList.add(ByteBufferUtil.bytes(i + 1));

            ByteBuffer airportBytes = ByteBufferUtil.bytes(ranking.get(i).getWidget());

            output.write(tableName, airportBytes, rankByteList);
        }
    }

    private ByteBuffer bytes(String val) {
        return ByteBufferUtil.bytes(val.toString());
    }
}

Конфигурация задания:

Job rankJob = Job.getInstance(conf, "Widget Ranking Ranker");
    rankJob.setJarByClass(WidgetRanking.class);
    rankJob.setMapperClass(RankMapper.class);
    rankJob.setReducerClass(RankReducer.class);
    rankJob.setInputFormatClass(SequenceFileInputFormat.class);
    rankJob.setMapOutputKeyClass(IntWritable.class);
    rankJob.setMapOutputValueClass(WidgetHits.class);
    rankJob.setOutputKeyClass(ByteBuffer.class);
    rankJob.setOutputValueClass(List.class);
    rankJob.setOutputFormatClass(CqlBulkOutputFormat.class);
    FileInputFormat.addInputPath(rankJob, new Path("temp/" + outputCode + "/"));

    ConfigHelper.setOutputRpcPort(rankJob.getConfiguration(), "9160");

    ConfigHelper.setOutputInitialAddress(rankJob.getConfiguration(), "localhost");

    ConfigHelper.setOutputColumnFamily(rankJob.getConfiguration(), "widgetspace", "widgetRanking");
    ConfigHelper.setOutputPartitioner(rankJob.getConfiguration(), "Murmur3Partitioner");

Приведенный выше код все еще плохо протестирован и, вероятно, содержит ошибки.

IЯ запускаю это на одной машине в псевдораспределенном режиме с намерением развернуть его на реальном кластере позже.HDFS и Yarn активируются в соответствии с инструкциями здесь .

Ошибка при запуске:

org.apache.cassandra.exceptions.ConfigurationException: Expecting URI in variable: [cassandra.config]. Found[cassandra.yaml]. Please prefix the file with [file:///] for local files and [file://<server>/] for remote files. If you are executing this from an external tool, it needs to set Config.setClientMode(true) to avoid loading configuration.
at org.apache.cassandra.config.YamlConfigurationLoader.getStorageConfigURL(YamlConfigurationLoader.java:80)

Несколько очевидных вещей, которые я пытался вставить в свой собственный код:

System.setProperty("cassandra.config", "file:///home/[user]/apache-cassandra-3.9/conf/cassandra.yaml");

Config.setClientMode(true);

rankJob.getConfiguration().set("cassandra.config", "file:///home/[user]/apache-cassandra-3.9/conf/cassandra.yaml");

Но, похоже, они ничего не делают.Тем не менее говорит "Found [cassandra.yaml]".

В настоящее время работает Hadoop 2.9.1 и cassandra 3.9 (я получаю nullpointerexception для объекта конфигурации, который не загружается в cassandra 3.11.3, поэтому в основном та же ошибкано менее четко описано в выходных данных)

Нужно ли указывать путь конфигурации кассандры в другом месте?

...