<!-- Storm core API. Declared with "provided" scope, so Maven does not bundle it into the built artifact. -->
<dependency>
  <groupId>org.apache.storm</groupId>
  <artifactId>storm-core</artifactId>
  <version>1.0.2</version>
  <scope>provided</scope>
</dependency>

<!-- Storm/Kafka integration module, same version as storm-core. -->
<dependency>
  <groupId>org.apache.storm</groupId>
  <artifactId>storm-kafka</artifactId>
  <version>1.0.2</version>
</dependency>

<!-- Kafka library built for Scala 2.10. -->
<dependency>
  <groupId>org.apache.kafka</groupId>
  <artifactId>kafka_2.10</artifactId>
  <version>0.8.2.0</version>
</dependency>
public class JsonBolt extends BaseRichBolt { private static final Logger LOG = LoggerFactory .getLogger(JsonBolt.class); private Fields fields; private OutputCollector collector; public JsonBolt() { this.fields = new Fields('hostIp', 'instanceName', 'className', 'methodName', 'createTime', 'callTime', 'errorCode'); } @Override public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) { this.collector = collector; } @Override public void execute(Tuple tuple) { String spanDataJson = tuple.getString(0); LOG.info('source data:{}', spanDataJson); Map<String, Object> map = (Map<String, Object>) JSONValue .parse(spanDataJson); Values values = new Values(); for (int i = 0, size = this.fields.size(); i < size; i ) { values.add(map.get(this.fields.get(i))); } this.collector.emit(tuple, values); this.collector.ack(tuple); } @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(this.fields); }}
public class MyTopology { private static final String TOPOLOGY_NAME = 'SPAN-DATA-TOPOLOGY'; private static final String KAFKA_SPOUT_ID = 'kafka-stream'; private static final String JsonProject_BOLT_ID = 'jsonProject-bolt'; public static void main(String[] args) throws Exception { String zks = '132.122.252.51:2181'; String topic = 'span-data-topic'; String zkRoot = '/kafka-storm'; BrokerHosts brokerHosts = new ZkHosts(zks); SpoutConfig spoutConf = new SpoutConfig(brokerHosts, topic, zkRoot, KAFKA_SPOUT_ID); spoutConf.scheme = new SchemeAsMultiScheme(new StringScheme()); spoutConf.zkServers = Arrays.asList(new String[] { '132.122.252.51' }); spoutConf.zkPort = 2181; JsonBolt jsonBolt = new JsonBolt(); TopologyBuilder builder = new TopologyBuilder(); builder.setSpout(KAFKA_SPOUT_ID, new KafkaSpout(spoutConf)); builder.setBolt(JsonProject_BOLT_ID, jsonBolt).shuffleGrouping( KAFKA_SPOUT_ID); Config config = new Config(); config.setNumWorkers(1); if (args.length == 0) { LocalCluster cluster = new LocalCluster(); cluster.submitTopology(TOPOLOGY_NAME, config, builder.createTopology()); Utils.waitForSeconds(100); cluster.killTopology(TOPOLOGY_NAME); cluster.shutdown(); } else { StormSubmitter.submitTopology(args[0], config, builder.createTopology()); } }}
========广告时间========
公众号的菜单已分为“分布式”、“机器学习”、“深度学习”、“NLP”、“Java深度”、“Java并发核心”、“JDK源码”、“Tomcat内核”等，可能有一款适合你的胃口。
鄙人的新书《Tomcat内核设计剖析》已经在京东销售了，有需要的朋友可以购买。感谢各位朋友。
=========================
欢迎关注:
联系客服