打开APP
userphoto
未登录

开通VIP,畅享免费电子书等14项超值服

开通VIP
Storm入门之第8章事务性拓扑-2

Bolts

首先看一下这个拓扑中的标准bolt

01public class UserSplitterBolt implements IBasicBolt{
02    private static final long serialVersionUID = 1L;
03 
04    @Override
05    public void declareOutputFields(OutputFieldsDeclarer declarer) {
06        declarer.declareStream("users"new Fields("txid","tweet_id","user"));
07    }
08 
09    @Override
10    public Map<String, Object> getComponentConfiguration() {
11        return null;
12    }
13 
14    @Override
15    public void prepare(Map stormConf, TopologyContext context) {}
16 
17    @Override
18    public void execute(Tuple input, BasicOutputCollector collector) {
19        String tweet = input.getStringByField("tweet");
20        String tweetId = input.getStringByField("tweet_id");
21        StringTokenizer strTok = new StringTokenizer(tweet, " ");
22        HashSet<String> users = new HashSet<String>();
23 
24        while(strTok.hasMoreTokens()) {
25            String user = strTok.nextToken();
26 
27            //确保这是个真实的用户,并且在这个tweet中没有重复
28            if(user.startsWith("@") && !users.contains(user)) {
29                collector.emit("users"new Values(tx, tweetId, user));
30                users.add(user);
31            }
32        }
33    }
34 
35    @Override
36    public void cleanup(){}
37}

正如本章前面提到的,UserSplitterBolt接收元组,解析tweet文本,分发@开头的单词————tweeter用户。HashtagSplitterBolt的实现也非常相似。

01public class HashtagSplitterBolt implements IBasicBolt{
02    private static final long serialVersionUID = 1L;
03 
04    @Override
05    public void declareOutputFields(OutputFieldsDeclarer declarer) {
06        declarer.declareStream("hashtags"new Fields("txid","tweet_id","hashtag"));
07    }
08 
09    @Override
10    public Map<String, Object> getComponentConfiguration() {
11        return null;
12    }
13 
14    @Override
15    public void prepare(Map stormConf, TopologyContext context) {}
16 
17    @Oerride
18    public void execute(Tuple input, BasicOutputCollector collector) {
19        String tweet = input.getStringByField("tweet");
20        String tweetId = input.getStringByField("tweet_id");
21        StringTokenizer strTok = new StringTokenizer(tweet, " ");
22        TransactionAttempt tx = (TransactionAttempt)input.getValueByField("txid");
23        HashSet<String> words = new HashSet<String>();
24 
25        while(strTok.hasMoreTokens()) {
26            String word = strTok.nextToken();
27 
28            if(word.startsWith("#") && !words.contains(word)){
29                collector.emit("hashtags"new Values(tx, tweetId, word));
30                words.add(word);
31            }
32        }
33    }
34 
35    @Override
36    public void cleanup(){}
37}

现在看看UserHashTagJoinBolt的实现。首先要注意的是它是一个BaseBatchBolt。这意味着,execute方法会操作接收到的元组,但是不会分发新的元组。批次完成时,Storm会调用finishBatch方法。

01public void execute(Tuple tuple) {
02    String source = tuple.getSourceStreamId();
03    String tweetId = tuple.getStringByField("tweet_id");
04 
05    if("hashtags".equals(source)) {
06        String hashtag = tuple.getStringByField("hashtag");
07        add(tweetHashtags, tweetId, hashtag);
08    else if("users".equals(source)) {
09        String user = tuple.getStringByField("user");
10        add(userTweets, user, tweetId);
11    }
12}

既然要结合tweet中提到的用户为出现的所有话题计数,就需要加入前面的bolts创建的两个数据流组。这件事要以批次为单位进程,在批次处理完成时,调用finishBatch方法。

01@Override
02public void finishBatch() {
03    for(String user:userTweets.keySet()){
04        Set<String> tweets = getUserTweets(user);
05        HashMap<String, Integer> hashtagsCounter = new HashMap<String, Integer>();
06        for(String tweet:tweets){
07            Set<String> hashtags=getTweetHashtags(tweet);
08            if(hashtags!=null){
09                for(String hashtag:hashtags){
10                    Integer count=hashtagsCounter.get(hashtag);
11                    if(count==null){count=0;}
12                    count++;
13                    hashtagsCounter.put(hashtag,count);
14                }
15            }
16        }
17        for(String hashtag:hashtagsCounter.keySet()){
18            int count=hashtagsCounter.get(hashtag);
19            collector.emit(new Values(id,user,hashtag,count));
20        }
21    }
22}

这个方法计算每对用户-话题出现的次数,并为之生成和分发元组。

你可以在GitHub上找到并下载完整代码。(译者注:https://github.com/storm-book/examples-ch08-transactional-topologies这个仓库里没有代码,谁知道哪里有代码麻烦说一声。)

提交者bolts

我们已经学习了,批次通过协调器和分发器怎样在拓扑中传递。在拓扑中,这些批次中的元组以并行的,没有特定次序的方式处理。

协调者bolts是一类特殊的批处理bolts,它们实现了IComh mitter或者通过TransactionalTopologyBuilder调用setCommiterBolt设置了提交者bolt。它们与其它的批处理bolts最大的不同在于,提交者boltsfinishBatch方法在提交就绪时执行。这一点发生在之前所有事务都已成功提交之后。另外,finishBatch方法是顺序执行的。因此如果同时有事务ID1和事务ID2两个事务同时执行,只有在ID1没有任何差错的执行了finishBatch方法之后,ID2才会执行该方法。

下面是这个类的实现

01public class RedisCommiterCommiterBolt extends BaseTransactionalBolt implementsICommitter {
02    public static final String LAST_COMMITED_TRANSACTION_FIELD = "LAST_COMMIT";
03    TransactionAttempt id;
04    BatchOutputCollector collector;
05    Jedis jedis;
06 
07    @Override
08    public void prepare(Map conf, TopologyContext context,
09                        BatchOutputCollector collector, TransactionAttempt id) {
10        this.id = id;
11        this.collector = collector;
12        this.jedis = new Jedis("localhost");
13    }
14 
15    HashMap<String, Long> hashtags = new HashMap<String,Long>();
16    HashMap<String, Long> users = new HashMap<String, Long>();
17    HashMap<String, Long> usersHashtags = new HashMap<String, Long>();
18 
19    private void count(HashMap<String, Long> map, String key, int count) {
20        Long value = map.get(key);
21        if(value == null){value = (long)0;}
22        value += count;
23        map.put(key,value);
24    }
25 
26    @Override
27    public void execute(Tuple tuple) {
28        String origin = tuple. getSourceComponent();
29        if("sers-splitter".equals(origin)) {
30            String user = tuple.getStringByField("user");
31            count(users, user, 1);
32        else if("hashtag-splitter".equals(origin)) {
33            String hashtag = tuple.getStringByField("hashtag");
34            count(hashtags, hashtag, 1);
35        else if("user-hashtag-merger".quals(origin)) {
36            String hashtag = tuple.getStringByField("hashtag");
37            String user = tuple.getStringByField("user");
38            String key = user + ":" + hashtag;
39            Integer count = tuple.getIntegerByField("count");
40            count(usersHashtags, key, count);
41        }
42    }
43 
44    @Override
45    public void finishBatch() {
46        String lastCommitedTransaction = jedis.get(LAST_COMMITED_TRANSACTION_FIELD);
47        String currentTransaction = ""+id.getTransactionId();
48 
49        if(currentTransaction.equals(lastCommitedTransaction)) {return;}
50 
51        Transaction multi = jedis.multi();
52 
53        multi.set(LAST_COMMITED_TRANSACTION_FIELD, currentTransaction);
54 
55        Set<String> keys = hashtags.keySet();
56        for (String hashtag : keys) {
57            Long count = hashtags.get(hashtag);
58            multi.hincrBy("hashtags", hashtag, count);
59        }
60 
61        keys = users.keySet();
62        for (String user : keys) {
63            Long count =users.get(user);
64            multi.hincrBy("users",user,count);
65        }
66 
67        keys = usersHashtags.keySet();
68        for (String key : keys) {
69            Long count = usersHashtags.get(key);
70            multi.hincrBy("users_hashtags", key, count);
71        }
72 
73        multi.exec();
74    }
75 
76    @Override
77    public void declareOutputFields(OutputFieldsDeclarer declarer) {}
78}

这个实现很简单,但是在finishBatch有一个细节。

1...
2multi.set(LAST_COMMITED_TRANSACTION_FIELD, currentTransaction);
3...

在这里向数据库保存提交的最后一个事务ID。为什么要这样做?记住,如果事务失败了,Storm将会尽可能多的重复必要的次数。如果你不确定已经处理了这个事务,你就会多算,事务拓扑也就没有用了。所以请记住:保存最后提交的事务ID,并在提交前检查。

分区的事务Spouts
对一个spout来说,从一个分区集合中读取批次是很普通的。接着这个例子,你可能有很多redis数据库,而tweets可能会分别保存在这些redis数据库里。通过实现IPartitionedTransactionalSpout,Storm提供了一些工具用来管理每个分区的状态并保证重播的能力。
下面我们修改TweetsTransactionalSpout,使它可以处理数据分区。
首先,继承BasePartitionedTransactionalSpout,它实现了IPartitionedTransactionalSpout

1public class TweetsPartitionedTransactionalSpout extends
2       BasePartitionedTransactionalSpout<TransactionMetadata> {
3...
4}

然后告诉Storm谁是你的协调器。

01public static class TweetsPartitionedTransactionalCoordinator implements Coordinator {
02    @Override
03    public int numPartitions() {
04        return 4;
05    }
06 
07    @Override
08    public boolean isReady() {
09        return true;
10    }
11 
12    @Override
13    public void close() {}
14}

在这个例子里,协调器很简单。numPartitions方法,告诉Storm一共有多少分区。而且你要注意,不要返回任何元数据。对于IPartitionedTransactionalSpout,元数据由分发器直接管理。
下面是分发器的实现:

01public static class TweetsPartitionedTransactionalEmitter
02       implements Emitter<TransactionMetadata> {
03    PartitionedRQ rq = new ParttionedRQ();
04 
05    @Override
06    public TransactionMetadata emitPartitionBatchNew(TransactionAttempt tx,
07            BatchOutputCollector collector, int partition,
08            TransactionMetadata lastPartitioonMeta) {
09        long nextRead;
10 
11        if(lastPartitionMeta == null) {
12            nextRead = rq.getNextRead(partition);
13        }else{
14            nextRead = lastPartitionMeta.from + lastPartitionMeta.quantity;
15            rq.setNextRead(partition, nextRead); //移动游标
16        }
17 
18        long quantity = rq.getAvailableToRead(partition, nextRead);
19        quantity = quantity > MAX_TRANSACTION_SIZE ? MAX_TRANSACTION_SIZE : quantity;
20        TransactionMetadata metadata = new TransactionMetadata(nextRead, (int)quantity);
21 
22        emitPartitionBatch(tx, collector, partition, metadata);
23        return metadata;
24    }
25 
26    @Override
27    public void emitPartitionBatch(TransactionAttempt tx, BatchOutputCollector collector,
28            int partition, TransactionMetadata partitionMeta) {
29        if(partitionMeta.quantity <= 0){
30            return;
31        }
32 
33        List<String> messages = rq.getMessages(partition, partitionMeta.from,
34               partitionMeta.quantity);
35 
36        long tweetId = partitionMeta.from;
37        for (String msg : messages) {
38            collector.emit(new Values(tx, ""+tweetId, msg));
39            tweetId++;
40        }
41    }
42 
43    @Override
44    public void close() {}
45}

这里有两个重要的方法,emitPartitionBatchNew,和emitPartitionBatch。对于emitPartitionBatchNew,从Storm接收分区参数,该参数决定应该从哪个分区读取批次。在这个方法中,决定获取哪些tweets,生成相应的元数据对象,调用emitPartitionBatch,返回元数据对象,并且元数据对象会在方法返回时立即保存到zookeeper。
Storm会为每一个分区发送相同的事务ID,表示一个事务贯穿了所有数据分区。通过emitPartitionBatch读取分区中的tweets,并向拓扑分发批次。如果批次处理失败了,Storm将会调用emitPartitionBatch利用保存下来的元数据重复这个批次。

NOTE: 完整的源码请见:https://github.com/storm-book/examples-ch08-transactional-topologies(译者注:原文如此,实际上这个仓库里什么也没有)

模糊的事务性拓扑

到目前为止,你可能已经学会了如何让拥有相同事务ID的批次在出错时重播。但是在有些场景下这样做可能就不太合适了。然后会发生什么呢?

事实证明,你仍然可以实现在语义上精确的事务,不过这需要更多的开发工作,你要记录由Storm重复的事务之前的状态。既然能在不同时刻为相同的事务ID得到不同的元组,你就需要把事务重置到之前的状态,并从那里继续。

比如说,如果你为收到的所有tweets计数,你已数到5,而最后的事务ID是321,这时你多数了8个。你要维护以下三个值——previousCount=5,currentCount=13,以及lastTransactionId=321。假设事物ID321又发分了一次,而你又得到了4个元组,而不是之前的8个,提交器会探测到这是相同的事务ID,它将会把结果重置到previousCount的值5,并在此基础上加4,然后更新currentCount为9。

另外,在之前的一个事务被取消时,每个并行处理的事务都要被取消。这是为了确保你没有丢失任何数据。

你的spout可以实现IOpaquePartitionedTransactionalSpout,而且正如你看到的,协调器和分发器也很简单。

01public static class TweetsOpaquePartitionedTransactionalSpoutCoordinator implementsIOpaquePartitionedTransactionalSpout.Coordinator {
02    @Override
03    public boolean isReady() {
04        return true;
05    }
06}
07 
08public static class TweetsOpaquePartitionedTransactionalSpoutEmitter
09       implements IOpaquePartitionedTransactionalSpout.Emitter<TransactionMetadata> {
10    PartitionedRQ rq  = new PartitionedRQ();
11 
12    @Override
13    public TransactionMetadata emitPartitionBatch(TransactionAttempt tx,
14           BatchOutputCollector collector, int partion,
15           TransactionMetadata lastPartitonMeta) {
16        long nextRead;
17 
18        if(lastPartitionMeta == null) {
19            nextRead = rq.getNextRead(partition);
20        }else{
21            nextRead = lastPartitionMeta.from + lastPartitionMeta.quantity;
22            rq.setNextRead(partition, nextRead);//移动游标
23        }
24 
25        long quantity = rq.getAvailabletoRead(partition, nextRead);
26        quantity = quantity > MAX_TRANSACTION_SIZE ? MAX_TRANSACTION_SIZE : quantity;
27        TransactionMetadata metadata = new TransactionMetadata(nextRead, (int)quantity);
28        emitMessages(tx, collector, partition, metadata);
29        return metadata;
30    }
31 
32    private void emitMessage(TransactionAttempt tx, BatchOutputCollector collector,
33                 int partition, TransactionMetadata partitionMeta) {
34        if(partitionMeta.quantity <= 0){return;}
35 
36        List<String> messages = rq.getMessages(partition, partitionMeta.from, partitionMeta.quantity);
37        long tweetId = partitionMeta.from;
38        for(String msg : messages) {
39            collector.emit(new Values(tx, ""+tweetId, msg));
40            tweetId++;
41        }
42    }
43 
44    @Override
45    public int numPartitions() {
46        return 4;
47    }
48 
49    @Override
50    public void close() {}
51}

最有趣的方法是emitPartitionBatch,它获取之前提交的元数据。你要用它生成批次。这个批次不需要与之前的那个一致,你可能根本无法创建完全一样的批次。剩余的工作由提交器bolts借助之前的状态完成。


本站仅提供存储服务,所有内容均由用户发布,如发现有害或侵权内容,请点击举报
打开APP,阅读全文并永久保存 查看更多类似文章
猜你喜欢
类似文章
【热】打开小程序,算一算2024你的财运
Storm 集群搭建及编写WordCount
kafka+storm+hbase架构设计
Flink Stream Windows Join
「Flink」Flink中的时间类型
Spark 项目api功能解析和含义记录
storm定时任务【tick】
更多类似文章 >>
生活服务
热点新闻
分享 收藏 导长图 关注 下载文章
绑定账号成功
后续可登录账号畅享VIP特权!
如果VIP功能使用有故障,
可点击这里联系客服!

联系客服