Java – cascade – merge 2 aggregations
•
Java
I have the following problem whicj I try to solve with cascading: I have structured record CSV files: O, a, F, I, C
I need to summarize the records by O and F and add the I and C of each group
For example:
100,200,300,5,1
100,6,2
101,201,301,20,5
101,21,6
Output should be:
100,11,3
101,41,11
I can't understand how to merge every instance I own (can I aggregate two fields at the same time?)
Do you have any ideas?
Yosi
public class CascMain {
public static void main(String[] args){
Scheme sourceScheme = new TextLine(new Fields("line"));
Tap source = new Lfs(sourceScheme,"/tmp/casc/group.csv");
Scheme sinkScheme = new TextDelimited(new Fields("o","a","f","ti","tc"),",");
Tap sink = new Lfs(sinkScheme,"/tmp/casc/output/",SinkMode.REPLACE);
Pipe assembly = new Pipe("agg-pipe");
Function function = new RegexSplitter(new Fields("o","i","c"),");
assembly = new Each(assembly,new Fields("line"),function);
Pipe groupAssembly = new GroupBy("group",assembly,new Fields("o","f"));
Sum impSum = new Sum(new Fields("ti"));
Pipe i = new Every(groupAssembly,new Fields("i"),impSum);
Sum clickSum = new Sum(new Fields("tc"));
Pipe c = new Every(groupAssembly,new Fields("c"),clickSum);
// WHAT SHOULD I DO HERE
Properties properties = new Properties();
FlowConnector.setApplicationJarClass(properties,CascMain.class);
FlowConnector flowConnector = new FlowConnector(properties);
Flow flow = flowConnector.connect("agg",source,sink,assembly);
flow.complete();
}
}
Solution
Aggregate multiple fields simultaneously using aggregateby:
SumBy impSum = new SumBy(new Fields("i"),new Fields("ti"),long.class);
SumBy clickSum = new SumBy(new Fields("c"),new Fields("tc"),long.class);
assembly = new AggregateBy("totals",Pipe.pipes(assembly),"f"),2,impSum,clickSum);
The content of this article comes from the network collection of netizens. It is used as a learning reference. The copyright belongs to the original author.
THE END
二维码
