Я пытаюсь запустить задачу сокращения цикла, которая записывает вывод в таблицу в cassandra.Мое сокращение задания и конфигурация задания выглядят следующим образом:
public static class RankReducer extends Reducer<IntWritable, WidgetHits, ByteBuffer, List<Mutation>> {
private static MultipleOutputs<ByteBuffer, List<Mutation>> output;
public void reduce(IntWritable key, Iterable<WidgetHits> values, Context context)
throws IOException, InterruptedException {
output = new MultipleOutputs<ByteBuffer, List<Mutation>>(context);
ArrayList<WidgetHits> ranking = new ArrayList<WidgetHits>();
for (WidgetHits val : values) {
for(int i = 0; i < 10; i++) {
if(i == ranking.size() || val.getHits() > ranking.get(i).getHits()) {
ranking.add(i, new WidgetHits(val.getWidget(), val.getHits()));
break;
}
}
}
for(int i = 0; i < ranking.size() && i < 10; i++) {
List<ByteBuffer> rankByteList = new ArrayList<ByteBuffer>();
rankByteList.add(ByteBufferUtil.bytes(i + 1));
ByteBuffer airportBytes = ByteBufferUtil.bytes(ranking.get(i).getWidget());
output.write(tableName, airportBytes, rankByteList);
}
}
private ByteBuffer bytes(String val) {
return ByteBufferUtil.bytes(val.toString());
}
}
Конфигурация задания:
Job rankJob = Job.getInstance(conf, "Widget Ranking Ranker");
rankJob.setJarByClass(WidgetRanking.class);
rankJob.setMapperClass(RankMapper.class);
rankJob.setReducerClass(RankReducer.class);
rankJob.setInputFormatClass(SequenceFileInputFormat.class);
rankJob.setMapOutputKeyClass(IntWritable.class);
rankJob.setMapOutputValueClass(WidgetHits.class);
rankJob.setOutputKeyClass(ByteBuffer.class);
rankJob.setOutputValueClass(List.class);
rankJob.setOutputFormatClass(CqlBulkOutputFormat.class);
FileInputFormat.addInputPath(rankJob, new Path("temp/" + outputCode + "/"));
ConfigHelper.setOutputRpcPort(rankJob.getConfiguration(), "9160");
ConfigHelper.setOutputInitialAddress(rankJob.getConfiguration(), "localhost");
ConfigHelper.setOutputColumnFamily(rankJob.getConfiguration(), "widgetspace", "widgetRanking");
ConfigHelper.setOutputPartitioner(rankJob.getConfiguration(), "Murmur3Partitioner");
Приведенный выше код все еще плохо протестирован и, вероятно, содержит ошибки.
IЯ запускаю это на одной машине в псевдораспределенном режиме с намерением развернуть его на реальном кластере позже.HDFS и Yarn активируются в соответствии с инструкциями здесь .
Ошибка при запуске:
org.apache.cassandra.exceptions.ConfigurationException: Expecting URI in variable: [cassandra.config]. Found[cassandra.yaml]. Please prefix the file with [file:///] for local files and [file://<server>/] for remote files. If you are executing this from an external tool, it needs to set Config.setClientMode(true) to avoid loading configuration.
at org.apache.cassandra.config.YamlConfigurationLoader.getStorageConfigURL(YamlConfigurationLoader.java:80)
Несколько очевидных вещей, которые я пытался вставить в свой собственный код:
System.setProperty("cassandra.config", "file:///home/[user]/apache-cassandra-3.9/conf/cassandra.yaml");
Config.setClientMode(true);
rankJob.getConfiguration().set("cassandra.config", "file:///home/[user]/apache-cassandra-3.9/conf/cassandra.yaml");
Но, похоже, они ничего не делают.Тем не менее говорит "Found [cassandra.yaml]".
В настоящее время работает Hadoop 2.9.1 и cassandra 3.9 (я получаю nullpointerexception для объекта конфигурации, который не загружается в cassandra 3.11.3, поэтому в основном та же ошибкано менее четко описано в выходных данных)
Нужно ли указывать путь конфигурации кассандры в другом месте?