У меня есть следующая проблема, которую я пытаюсь решить с помощью каскадирования: у меня есть CSV-файл записей со структурой: o, a, f, i, c
Мне нужно агрегировать записи по o, a, f и суммировать i и c по группе.
Например:
100,200,300,5,1
100,200,300,6,2
101,201,301,20,5
101,201,301,21,6
должен дать:
100,200,300,11,3
101,201,301,41,11
Я не мог понять, как объединить 2 экземпляра Every, которые у меня есть (могу ли я объединить оба поля одновременно?).
Есть идеи?
Йоси
public class CascMain {
public static void main(String[] args){
Scheme sourceScheme = new TextLine(new Fields("line"));
Tap source = new Lfs(sourceScheme, "/tmp/casc/group.csv");
Scheme sinkScheme = new TextDelimited(new Fields("o", "a", "f", "ti", "tc"), ",");
Tap sink = new Lfs(sinkScheme, "/tmp/casc/output/", SinkMode.REPLACE);
Pipe assembly = new Pipe("agg-pipe");
Function function = new RegexSplitter(new Fields("o", "a", "f", "i", "c"), ",");
assembly = new Each(assembly, new Fields("line"), function);
Pipe groupAssembly = new GroupBy("group", assembly, new Fields("o", "a", "f"));
Sum impSum = new Sum(new Fields("ti"));
Pipe i = new Every(groupAssembly, new Fields("i"), impSum);
Sum clickSum = new Sum(new Fields("tc"));
Pipe c = new Every(groupAssembly, new Fields("c"), clickSum);
// WHAT SHOULD I DO HERE
Properties properties = new Properties();
FlowConnector.setApplicationJarClass(properties, CascMain.class);
FlowConnector flowConnector = new FlowConnector(properties);
Flow flow = flowConnector.connect("agg", source, sink, assembly);
flow.complete();
}
}