Bolts
首先看一下这个拓扑中的标准bolt:
01 | public class UserSplitterBolt implements IBasicBolt{ |
02 | private static final long serialVersionUID = 1L; |
05 | public void declareOutputFields(OutputFieldsDeclarer declarer) { |
06 | declarer.declareStream( "users" , new Fields( "txid" , "tweet_id" , "user" )); |
10 | public Map<String, Object> getComponentConfiguration() { |
15 | public void prepare(Map stormConf, TopologyContext context) {} |
18 | public void execute(Tuple input, BasicOutputCollector collector) { |
19 | String tweet = input.getStringByField( "tweet" ); |
20 | String tweetId = input.getStringByField( "tweet_id" ); |
21 | StringTokenizer strTok = new StringTokenizer(tweet, " " ); |
22 | HashSet<String> users = new HashSet<String>(); |
24 | while (strTok.hasMoreTokens()) { |
25 | String user = strTok.nextToken(); |
28 | if (user.startsWith( "@" ) && !users.contains(user)) { |
29 | collector.emit( "users" , new Values(tx, tweetId, user)); |
36 | public void cleanup(){} |
正如本章前面提到的,UserSplitterBolt接收元组,解析tweet文本,分发以@开头的单词——Twitter用户名。HashtagSplitterBolt的实现也非常相似。
01 | public class HashtagSplitterBolt implements IBasicBolt{ |
02 | private static final long serialVersionUID = 1L; |
05 | public void declareOutputFields(OutputFieldsDeclarer declarer) { |
06 | declarer.declareStream( "hashtags" , new Fields( "txid" , "tweet_id" , "hashtag" )); |
10 | public Map<String, Object> getComponentConfiguration() { |
15 | public void prepare(Map stormConf, TopologyContext context) {} |
18 | public void execute(Tuple input, BasicOutputCollector collector) { |
19 | String tweet = input.getStringByField( "tweet" ); |
20 | String tweetId = input.getStringByField( "tweet_id" ); |
21 | StringTokenizer strTok = new StringTokenizer(tweet, " " ); |
22 | TransactionAttempt tx = (TransactionAttempt)input.getValueByField( "txid" ); |
23 | HashSet<String> words = new HashSet<String>(); |
25 | while (strTok.hasMoreTokens()) { |
26 | String word = strTok.nextToken(); |
28 | if (word.startsWith( "#" ) && !words.contains(word)){ |
29 | collector.emit( "hashtags" , new Values(tx, tweetId, word)); |
36 | public void cleanup(){} |
现在看看UserHashTagJoinBolt的实现。首先要注意的是它是一个BaseBatchBolt。这意味着,execute方法会操作接收到的元组,但是不会分发新的元组。批次完成时,Storm会调用finishBatch方法。
01 | public void execute(Tuple tuple) { |
02 | String source = tuple.getSourceStreamId(); |
03 | String tweetId = tuple.getStringByField( "tweet_id" ); |
05 | if ( "hashtags" .equals(source)) { |
06 | String hashtag = tuple.getStringByField( "hashtag" ); |
07 | add(tweetHashtags, tweetId, hashtag); |
08 | } else if ( "users" .equals(source)) { |
09 | String user = tuple.getStringByField( "user" ); |
10 | add(userTweets, user, tweetId); |
既然要结合tweet中提到的用户为出现的所有话题计数,就需要联接(join)前面的bolts创建的两个数据流组。这件事要以批次为单位进行,在批次处理完成时,调用finishBatch方法。
02 | public void finishBatch() { |
03 | for (String user:userTweets.keySet()){ |
04 | Set<String> tweets = getUserTweets(user); |
05 | HashMap<String, Integer> hashtagsCounter = new HashMap<String, Integer>(); |
06 | for (String tweet:tweets){ |
07 | Set<String> hashtags=getTweetHashtags(tweet); |
09 | for (String hashtag:hashtags){ |
10 | Integer count=hashtagsCounter.get(hashtag); |
11 | if (count== null ){count= 0 ;} |
13 | hashtagsCounter.put(hashtag,count); |
17 | for (String hashtag:hashtagsCounter.keySet()){ |
18 | int count=hashtagsCounter.get(hashtag); |
19 | collector.emit( new Values(id,user,hashtag,count)); |
这个方法计算每对用户-话题出现的次数,并为之生成和分发元组。
你可以在GitHub上找到并下载完整代码。(译者注:https://github.com/storm-book/examples-ch08-transactional-topologies这个仓库里没有代码,谁知道哪里有代码麻烦说一声。)
提交者bolts
我们已经学习了批次是如何通过协调器和分发器在拓扑中传递的。在拓扑中,这些批次中的元组以并行的、没有特定次序的方式被处理。
提交者bolts是一类特殊的批处理bolts,它们实现了ICommitter接口,或者通过TransactionalTopologyBuilder调用setCommitterBolt设置成了提交者bolt。它们与其它的批处理bolts最大的不同在于,提交者bolts的finishBatch方法在提交就绪时执行。这一点发生在之前所有事务都已成功提交之后。另外,finishBatch方法是顺序执行的。因此,如果同时有事务ID1和事务ID2两个事务在执行,只有在ID1没有任何差错地执行了finishBatch方法之后,ID2才会执行该方法。
下面是这个类的实现
01 | public class RedisCommiterCommiterBolt extends BaseTransactionalBolt implements ICommitter { |
02 | public static final String LAST_COMMITED_TRANSACTION_FIELD = "LAST_COMMIT" ; |
03 | TransactionAttempt id; |
04 | BatchOutputCollector collector; |
08 | public void prepare(Map conf, TopologyContext context, |
09 | BatchOutputCollector collector, TransactionAttempt id) { |
11 | this .collector = collector; |
12 | this .jedis = new Jedis( "localhost" ); |
15 | HashMap<String, Long> hashtags = new HashMap<String,Long>(); |
16 | HashMap<String, Long> users = new HashMap<String, Long>(); |
17 | HashMap<String, Long> usersHashtags = new HashMap<String, Long>(); |
19 | private void count(HashMap<String, Long> map, String key, int count) { |
20 | Long value = map.get(key); |
21 | if (value == null ){value = ( long ) 0 ;} |
27 | public void execute(Tuple tuple) { |
28 | String origin = tuple. getSourceComponent(); |
29 | if ( "users-splitter" .equals(origin)) { |
30 | String user = tuple.getStringByField( "user" ); |
31 | count(users, user, 1 ); |
32 | } else if ( "hashtag-splitter" .equals(origin)) { |
33 | String hashtag = tuple.getStringByField( "hashtag" ); |
34 | count(hashtags, hashtag, 1 ); |
35 | } else if ( "user-hashtag-merger" .equals(origin)) { |
36 | String hashtag = tuple.getStringByField( "hashtag" ); |
37 | String user = tuple.getStringByField( "user" ); |
38 | String key = user + ":" + hashtag; |
39 | Integer count = tuple.getIntegerByField( "count" ); |
40 | count(usersHashtags, key, count); |
45 | public void finishBatch() { |
46 | String lastCommitedTransaction = jedis.get(LAST_COMMITED_TRANSACTION_FIELD); |
47 | String currentTransaction = "" +id.getTransactionId(); |
49 | if (currentTransaction.equals(lastCommitedTransaction)) { return ;} |
51 | Transaction multi = jedis.multi(); |
53 | multi.set(LAST_COMMITED_TRANSACTION_FIELD, currentTransaction); |
55 | Set<String> keys = hashtags.keySet(); |
56 | for (String hashtag : keys) { |
57 | Long count = hashtags.get(hashtag); |
58 | multi.hincrBy( "hashtags" , hashtag, count); |
61 | keys = users.keySet(); |
62 | for (String user : keys) { |
63 | Long count =users.get(user); |
64 | multi.hincrBy( "users" ,user,count); |
67 | keys = usersHashtags.keySet(); |
68 | for (String key : keys) { |
69 | Long count = usersHashtags.get(key); |
70 | multi.hincrBy( "users_hashtags" , key, count); |
77 | public void declareOutputFields(OutputFieldsDeclarer declarer) {} |
这个实现很简单,但是在finishBatch有一个细节。
2 | multi.set(LAST_COMMITED_TRANSACTION_FIELD, currentTransaction); |
在这里向数据库保存最后提交的事务ID。为什么要这样做?记住,如果事务失败了,Storm会按需要重播这个事务任意多次。如果不检查这个事务是否已经处理过,就会重复计数,事务拓扑也就没有用了。所以请记住:保存最后提交的事务ID,并在提交前检查它。
分区的事务Spouts
对一个spout来说,从一个分区集合中读取批次是很常见的。接着这个例子,你可能有很多redis数据库,而tweets可能会分别保存在这些redis数据库里。通过实现IPartitionedTransactionalSpout,Storm提供了一些工具用来管理每个分区的状态并保证重播的能力。
下面我们修改TweetsTransactionalSpout,使它可以处理数据分区。
首先,继承BasePartitionedTransactionalSpout,它实现了IPartitionedTransactionalSpout。
1 | public class TweetsPartitionedTransactionalSpout extends |
2 | BasePartitionedTransactionalSpout<TransactionMetadata> { |
然后告诉Storm谁是你的协调器。
01 | public static class TweetsPartitionedTransactionalCoordinator implements Coordinator { |
03 | public int numPartitions() { |
08 | public boolean isReady() { |
13 | public void close() {} |
在这个例子里,协调器很简单。numPartitions方法,告诉Storm一共有多少分区。而且你要注意,不要返回任何元数据。对于IPartitionedTransactionalSpout,元数据由分发器直接管理。
下面是分发器的实现:
01 | public static class TweetsPartitionedTransactionalEmitter |
02 | implements Emitter<TransactionMetadata> { |
03 | PartitionedRQ rq = new PartitionedRQ(); |
06 | public TransactionMetadata emitPartitionBatchNew(TransactionAttempt tx, |
07 | BatchOutputCollector collector, int partition, |
08 | TransactionMetadata lastPartitionMeta) { |
11 | if (lastPartitionMeta == null ) { |
12 | nextRead = rq.getNextRead(partition); |
14 | nextRead = lastPartitionMeta.from + lastPartitionMeta.quantity; |
15 | rq.setNextRead(partition, nextRead); |
18 | long quantity = rq.getAvailableToRead(partition, nextRead); |
19 | quantity = quantity > MAX_TRANSACTION_SIZE ? MAX_TRANSACTION_SIZE : quantity; |
20 | TransactionMetadata metadata = new TransactionMetadata(nextRead, ( int )quantity); |
22 | emitPartitionBatch(tx, collector, partition, metadata); |
27 | public void emitPartitionBatch(TransactionAttempt tx, BatchOutputCollector collector, |
28 | int partition, TransactionMetadata partitionMeta) { |
29 | if (partitionMeta.quantity <= 0 ){ |
33 | List<String> messages = rq.getMessages(partition, partitionMeta.from, |
34 | partitionMeta.quantity); |
36 | long tweetId = partitionMeta.from; |
37 | for (String msg : messages) { |
38 | collector.emit( new Values(tx, "" +tweetId, msg)); |
44 | public void close() {} |
这里有两个重要的方法:emitPartitionBatchNew和emitPartitionBatch。emitPartitionBatchNew方法从Storm接收分区参数,该参数决定应该从哪个分区读取批次。在这个方法中,决定获取哪些tweets,生成相应的元数据对象,调用emitPartitionBatch,返回元数据对象,并且元数据对象会在方法返回时立即保存到Zookeeper。
Storm会为每一个分区发送相同的事务ID,表示一个事务贯穿了所有数据分区。通过emitPartitionBatch读取分区中的tweets,并向拓扑分发批次。如果批次处理失败了,Storm将会调用emitPartitionBatch利用保存下来的元数据重复这个批次。
NOTE: 完整的源码请见:https://github.com/storm-book/examples-ch08-transactional-topologies(译者注:原文如此,实际上这个仓库里什么也没有)
模糊的事务性拓扑
到目前为止,你可能已经学会了如何让拥有相同事务ID的批次在出错时重播。但是在有些场景下这样做可能就不太合适了。这时会发生什么呢?
事实证明,你仍然可以实现在语义上精确的事务,不过这需要更多的开发工作,你要记录由Storm重复的事务之前的状态。既然能在不同时刻为相同的事务ID得到不同的元组,你就需要把事务重置到之前的状态,并从那里继续。
比如说,如果你为收到的所有tweets计数,你已数到5,而最后的事务ID是321,这时你又多数了8个。你要维护以下三个值——previousCount=5,currentCount=13,以及lastTransactionId=321。假设事务ID321又分发了一次,而你又得到了4个元组,而不是之前的8个,提交器会探测到这是相同的事务ID,它将会把结果重置到previousCount的值5,并在此基础上加4,然后更新currentCount为9。
另外,在之前的一个事务被取消时,每个并行处理的事务都要被取消。这是为了确保你没有丢失任何数据。
你的spout可以实现IOpaquePartitionedTransactionalSpout,而且正如你看到的,协调器和分发器也很简单。
01 | public static class TweetsOpaquePartitionedTransactionalSpoutCoordinator implements IOpaquePartitionedTransactionalSpout.Coordinator { |
03 | public boolean isReady() { |
08 | public static class TweetsOpaquePartitionedTransactionalSpoutEmitter |
09 | implements IOpaquePartitionedTransactionalSpout.Emitter<TransactionMetadata> { |
10 | PartitionedRQ rq = new PartitionedRQ(); |
13 | public TransactionMetadata emitPartitionBatch(TransactionAttempt tx, |
14 | BatchOutputCollector collector, int partition, |
15 | TransactionMetadata lastPartitionMeta) { |
18 | if (lastPartitionMeta == null ) { |
19 | nextRead = rq.getNextRead(partition); |
21 | nextRead = lastPartitionMeta.from + lastPartitionMeta.quantity; |
22 | rq.setNextRead(partition, nextRead); |
25 | long quantity = rq.getAvailableToRead(partition, nextRead); |
26 | quantity = quantity > MAX_TRANSACTION_SIZE ? MAX_TRANSACTION_SIZE : quantity; |
27 | TransactionMetadata metadata = new TransactionMetadata(nextRead, ( int )quantity); |
28 | emitMessages(tx, collector, partition, metadata); |
32 | private void emitMessages(TransactionAttempt tx, BatchOutputCollector collector, |
33 | int partition, TransactionMetadata partitionMeta) { |
34 | if (partitionMeta.quantity <= 0 ){ return ;} |
36 | List<String> messages = rq.getMessages(partition, partitionMeta.from, partitionMeta.quantity); |
37 | long tweetId = partitionMeta.from; |
38 | for (String msg : messages) { |
39 | collector.emit( new Values(tx, "" +tweetId, msg)); |
45 | public int numPartitions() { |
50 | public void close() {} |
最有趣的方法是emitPartitionBatch,它获取之前提交的元数据。你要用它生成批次。这个批次不需要与之前的那个一致,你可能根本无法创建完全一样的批次。剩余的工作由提交器bolts借助之前的状态完成。
本站仅提供存储服务,所有内容均由用户发布,如发现有害或侵权内容,请
点击举报。