Java – grouping in a simple aggregation Storm topology

I'm trying to write a topology that does the following:

1. A spout that subscribes to a Twitter feed (filtered by keywords).
2. An aggregation bolt that collects a number of tweets (say n) into a collection and sends the collection to the printer bolt.
3. A simple bolt that prints the whole collection to the console at once.

In reality, I want to do more with the collection.

I tested it locally and it seemed to work. However, I'm not sure whether I set the grouping on the bolts correctly, or whether it will work properly when deployed on an actual Storm cluster. I would appreciate it if someone could review this topology and point out any errors, changes or improvements.

Thank you.

This is my topology:

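TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("spout", new TwitterFilterSpout("pittsburgh"));
// shuffle grouping: each tweet goes to an arbitrary aggregator task
builder.setBolt("sampleaggregate", new SampleAggregatorBolt())
        .shuffleGrouping("spout");
builder.setBolt("printaggreator", new PrinterBolt())
        .shuffleGrouping("sampleaggregate");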
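To see what a shuffle grouping implies once the aggregator runs with more than one task, here is a hypothetical, Storm-free model in plain Java. It simplifies shuffle grouping to a uniform random choice of task per tuple (Storm's real implementation balances load more evenly, but likewise ignores tuple content); the class and method names are illustrative assumptions, not Storm API.

```java
import java.util.Arrays;
import java.util.Random;

/**
 * Hypothetical model (not Storm code) of how a shuffle grouping spreads
 * tuples over the tasks of the downstream bolt.
 */
class ShuffleGroupingSketch {

    /** Returns how many of tupleCount tuples each of numTasks tasks receives. */
    static int[] distribute(int tupleCount, int numTasks, long seed) {
        Random random = new Random(seed);
        int[] received = new int[numTasks];
        for (int i = 0; i < tupleCount; i++) {
            // the target task is chosen without looking at the tuple's content
            received[random.nextInt(numTasks)]++;
        }
        return received;
    }

    public static void main(String[] args) {
        // parallelism hint 1: the single aggregator task sees every tweet
        System.out.println(Arrays.toString(distribute(100, 1, 42L))); // prints [100]
        // parallelism hint 3: each aggregator task sees only part of the stream
        System.out.println(Arrays.toString(distribute(100, 3, 42L)));
    }
}
```

With one task, every tuple lands in the same batch; with three, each task builds its batches from its own share of the stream, which is harmless as long as the batches need no particular tweets grouped together.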

Aggregator bolt

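// Storm 1.0+ package names; releases before 1.0 used backtype.storm.* instead
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.logging.Logger;

import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.IRichBolt;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;

import twitter4j.Status; // assuming the spout emits twitter4j Status objects

public class SampleAggregatorBolt implements IRichBolt {

    protected OutputCollector collector;
    protected Tuple currentTuple;
    protected Logger log;
    /**
     * Holds the messages in the bolt till you are ready to send them out
     */
    protected List<Status> statusCache;

    @Override
    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;

        log = Logger.getLogger(getClass().getName());
        statusCache = new ArrayList<Status>();
    }

    @Override
    public void execute(Tuple tuple) {
        currentTuple = tuple;

        Object value = tuple.getValue(0);
        if (value instanceof Status) {
            // add it to the status cache
            statusCache.add((Status) value);

            // check the size of the status cache and pass it to the next stage if you have enough messages to emit
            if (statusCache.size() > 10) {
                // emit a copy, then start a fresh batch; otherwise every later tuple
                // would re-emit the same ever-growing list
                collector.emit(new Values(new ArrayList<Status>(statusCache)));
                statusCache.clear();
            }
        } else {
            log.warning("Ignoring a tuple that does not carry a Status: " + tuple);
        }
        // ack every input, including ignored ones, so the spout does not replay them
        collector.ack(tuple);
    }

    @Override
    public void cleanup() {
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("tweets"));
    }

    @Override
    public Map<String, Object> getComponentConfiguration() {
        return null; // no component-specific configuration
    }

    protected void setupNonSerializableAttributes() {
    }
}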
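The batching step can be sketched on its own, without Storm. `BatchBuffer` is a hypothetical helper, not part of Storm or of the question's code: it collects items, hands back a copy once `batchSize` items have been collected, and clears itself. That is the behavior an aggregator needs so that each batch is emitted once, rather than one ever-growing list being emitted again on every new tweet.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/**
 * Hypothetical, Storm-independent helper: collect items, hand out a batch,
 * start over.
 */
class BatchBuffer<T> {

    private final int batchSize;
    private final List<T> pending = new ArrayList<>();

    BatchBuffer(int batchSize) {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive");
        }
        this.batchSize = batchSize;
    }

    /** Adds one item; returns a full batch once batchSize items are collected, else an empty list. */
    List<T> add(T item) {
        pending.add(item);
        if (pending.size() < batchSize) {
            return Collections.emptyList();
        }
        // copy before clearing, so the returned batch is not altered by later adds
        List<T> batch = new ArrayList<>(pending);
        pending.clear();
        return batch;
    }

    /** Number of items waiting for the next batch. */
    int pendingCount() {
        return pending.size();
    }

    public static void main(String[] args) {
        BatchBuffer<String> buffer = new BatchBuffer<>(3);
        for (int i = 1; i <= 7; i++) {
            List<String> batch = buffer.add("tweet-" + i);
            if (!batch.isEmpty()) {
                System.out.println("emit " + batch);
            }
        }
        System.out.println("still pending: " + buffer.pendingCount());
    }
}
```

Inside a bolt's `execute`, a non-empty result would be passed to `collector.emit(new Values(batch))`. Note that the question's `> 10` check produces batches of 11 tweets, while this sketch emits exactly `batchSize` items per batch.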

Printer bolt

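import java.util.List;

import org.apache.storm.topology.BasicOutputCollector;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseBasicBolt;
import org.apache.storm.tuple.Tuple;

public class PrinterBolt extends BaseBasicBolt {

    @Override
    public void execute(Tuple tuple, BasicOutputCollector collector) {
        // the aggregator emits a single field, "tweets", holding the whole batch;
        // tuple.size() would only report the number of fields (1)
        List<?> tweets = (List<?>) tuple.getValueByField("tweets");
        System.out.println(tweets.size() + " " + tweets);
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer ofd) {
        // terminal bolt: emits nothing, so no output fields are declared
    }
}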

Solution

From what I can see, it looks good. The devil is in the details, though. I don't know what your aggregator bolt actually does with the tweets, but if it makes any assumptions about which values reach it, you should consider an appropriate fields grouping. This may not make much difference while you use the default parallelism hint of 1, but if you decide to scale out with multiple aggregator bolt instances, those implicit logical assumptions may require a non-random grouping.
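A fields grouping routes each tuple by a hash of the chosen fields, so tuples with equal values always reach the same task. The plain-Java model below illustrates only that property. It assumes a hypothetical `keyword` value per tweet (the question's spout emits just the `Status`) and uses `String.hashCode()` with `Math.floorMod`, which is not necessarily the exact hash Storm computes. In the topology this would correspond to something like `.fieldsGrouping("spout", new Fields("keyword"))`, or `.globalGrouping("spout")` if every tweet must reach a single aggregator task.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/**
 * Hypothetical model (not Storm's code) of a fields grouping: the task is
 * derived from a hash of the grouping field, so equal keys share a task.
 */
class FieldsGroupingSketch {

    /** Picks a task index in [0, numTasks) from the key's hash code. */
    static int chooseTask(String key, int numTasks) {
        // floorMod keeps the index non-negative even for negative hash codes
        return Math.floorMod(key.hashCode(), numTasks);
    }

    /** Groups keys by the task they would be routed to. */
    static Map<Integer, List<String>> route(List<String> keys, int numTasks) {
        Map<Integer, List<String>> byTask = new TreeMap<>();
        for (String key : keys) {
            byTask.computeIfAbsent(chooseTask(key, numTasks), t -> new ArrayList<>()).add(key);
        }
        return byTask;
    }

    public static void main(String[] args) {
        List<String> keywords = Arrays.asList("pittsburgh", "steelers", "pittsburgh", "penguins", "steelers");
        // every occurrence of the same keyword ends up in the same task's list
        System.out.println(route(keywords, 3));
    }
}
```

Unlike the shuffle case, an aggregator task here can rely on seeing every tuple for its keys, which is the kind of logical assumption the answer warns about.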
