Java – Grouping in a simple aggregation Storm topology
I'm trying to write a topology that does the following:
1. A spout that subscribes to a Twitter feed (filtered by keywords).
2. An aggregation bolt that collects a number of tweets (say, n) into a collection and sends the collection to the printer bolt.
3. A simple bolt that prints the whole collection to the console at once.
In reality, I want to do more with the collection than just print it.
I tested it locally and it appears to work. However, I don't know whether I set the groupings on the bolts correctly, or whether it will work properly when deployed on an actual Storm cluster. I would appreciate it if someone could review this topology and suggest any errors, changes, or improvements.
Thank you.
This is my topology:
```java
builder.setSpout("spout", new TwitterFilterSpout("pittsburgh"));
builder.setBolt("sampleaggregate", new SampleAggregatorBolt())
        .shuffleGrouping("spout");
builder.setBolt("printaggreator", new PrinterBolt())
        .shuffleGrouping("sampleaggregate");
```
Aggregator bolt:
```java
public class SampleAggregatorBolt implements IRichBolt {

    protected OutputCollector collector;
    protected Tuple currentTuple;
    protected Logger log;

    /**
     * Holds the messages in the bolt till you are ready to send them out
     */
    protected List<Status> statusCache;

    @Override
    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
        log = Logger.getLogger(getClass().getName());
        statusCache = new ArrayList<Status>();
    }

    @Override
    public void execute(Tuple tuple) {
        currentTuple = tuple;

        Status currentStatus = null;
        try {
            currentStatus = (Status) tuple.getValue(0);
        } catch (ClassCastException e) {
        }

        if (currentStatus != null) {
            // add it to the status cache
            statusCache.add(currentStatus);
            collector.ack(tuple);

            // check the size of the status cache and pass it to the next stage if you have enough messages to emit
            if (statusCache.size() > 10) {
                collector.emit(new Values(statusCache));
            }
        }
    }

    @Override
    public void cleanup() {
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("tweets"));
    }

    @Override
    public Map<String, Object> getComponentConfiguration() {
        return null;
    }

    protected void setupNonSerializableAttributes() {
    }
}
```
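One detail worth checking in this bolt: `statusCache` is never cleared, so once it holds more than 10 statuses, every later tuple re-emits the same, ever-growing list. The emitted `Values` also holds a reference to the very list that keeps being appended to, and in local mode tuples may be passed without serialization, so the printer could see a list that is still changing. A common pattern is to hand off the full batch and start a new list. The sketch below shows only that buffering logic in plain Java, with no Storm classes; the class name `TweetBatcher`, the batch size, and the use of `String` in place of twitter4j's `Status` are assumptions for illustration.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical stand-alone batcher. Inside the bolt, add() would be called from
// execute(), and a non-null result would be passed to collector.emit(new Values(batch)).
public class TweetBatcher<T> {
    private final int batchSize;
    private List<T> cache = new ArrayList<>();

    public TweetBatcher(int batchSize) {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive");
        }
        this.batchSize = batchSize;
    }

    /** Adds an item; returns a full, read-only batch when ready, otherwise null. */
    public List<T> add(T item) {
        cache.add(item);
        if (cache.size() >= batchSize) {
            List<T> batch = Collections.unmodifiableList(cache);
            // Start a fresh list so the batch just handed off is never mutated again.
            cache = new ArrayList<>();
            return batch;
        }
        return null;
    }

    public static void main(String[] args) {
        TweetBatcher<String> batcher = new TweetBatcher<>(3);
        for (int i = 1; i <= 7; i++) {
            List<String> batch = batcher.add("tweet-" + i);
            if (batch != null) {
                System.out.println(batch.size() + " " + batch);
            }
        }
    }
}
```

Swapping in a new list (rather than emitting `statusCache` and calling `clear()` on it) avoids mutating a list that a downstream tuple may still reference.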
Printer bolt:
```java
public class PrinterBolt extends BaseBasicBolt {

    @Override
    public void execute(Tuple tuple, BasicOutputCollector collector) {
        System.out.println(tuple.size() + " " + tuple);
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer ofd) {
    }
}
```
Solution
From what I can see, it looks good. The devil is in the details, though. I don't know exactly what your aggregator bolt will end up doing, but if it makes any assumptions about the values passed to it (for example, that related tweets arrive at the same instance), you should consider an appropriate fields grouping. This may not make much difference while you use the default parallelism hint of 1, but if you decide to scale out to multiple aggregator bolt instances, those implicit logical assumptions may require a non-random grouping.
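To make the difference concrete, here is a small plain-Java simulation (no Storm classes) of how two groupings might route tuples to, say, two aggregator tasks. Shuffle grouping picks a task without looking at the tuple (simulated here with a seeded `Random`), so tweets about the same keyword can end up in different tasks' caches; fields grouping sends every tuple with the same field value to the same task. The hash-modulo routing, the task count, and the keyword values are assumptions for illustration, not Storm's exact internals.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Random;

public class GroupingDemo {
    // Fields-grouping style (assumed hash-modulo): the same key always maps to the same task.
    static int fieldsRoute(String key, int numTasks) {
        return Math.floorMod(key.hashCode(), numTasks);
    }

    // Shuffle-grouping style: a random task, independent of the tuple's content.
    static int shuffleRoute(Random random, int numTasks) {
        return random.nextInt(numTasks);
    }

    public static void main(String[] args) {
        int numTasks = 2;
        String[] keywords = {"pittsburgh", "steelers", "pittsburgh", "penguins", "pittsburgh"};
        Random random = new Random(42);

        // One simulated cache per aggregator task, for each grouping strategy.
        List<List<String>> shuffled = new ArrayList<>();
        List<List<String>> byField = new ArrayList<>();
        for (int t = 0; t < numTasks; t++) {
            shuffled.add(new ArrayList<>());
            byField.add(new ArrayList<>());
        }

        for (String kw : keywords) {
            shuffled.get(shuffleRoute(random, numTasks)).add(kw);
            byField.get(fieldsRoute(kw, numTasks)).add(kw);
        }

        // With fields routing, all "pittsburgh" tuples share one task's cache;
        // with shuffle routing they may be split across tasks.
        for (int t = 0; t < numTasks; t++) {
            System.out.println("task " + t + " shuffle=" + shuffled.get(t) + " fields=" + byField.get(t));
        }
    }
}
```

If the aggregator only needs "any n tweets", shuffle grouping is fine even with several instances; a fields grouping matters once each batch must contain related tweets.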