打开APP
userphoto
未登录

开通VIP,畅享免费电子书等14项超值服

开通VIP
利用Kafka发送/消费消息

当使用命令行工具把基本的组件运行起来后,再使用Java client就很简单,这里是入门的第一个Java客户端程序,有很多需要深入理解的地方。

依赖配置

        <dependency>            <groupId>org.apache.kafka</groupId>            <artifactId>kafka_2.11</artifactId>            <version>0.8.2.1</version>        </dependency>
  • 1
  • 2
  • 3
  • 4
  • 5
  • 1
  • 2
  • 3
  • 4
  • 5

注意kafka的artifact_Id(比如我用的kafka_2.11)后面的版本号一定要和本机装的Scala版本一致,否则会报以下错误。

Exception in thread "main" java.lang.NoClassDefFoundError: scala/collection/GenTraversableOnce$class    at kafka.utils.Pool.<init>(Pool.scala:28)    at kafka.consumer.ZookeeperConsumerConnector.<init>(ZookeeperConsumerConnector.scala:91)    at kafka.javaapi.consumer.ZookeeperConsumerConnector.<init>(ZookeeperConsumerConnector.scala:66)    at kafka.javaapi.consumer.ZookeeperConsumerConnector.<init>(ZookeeperConsumerConnector.scala:69)    at kafka.consumer.Consumer$.createJavaConsumerConnector(ConsumerConnector.scala:105)    at kafka.consumer.Consumer.createJavaConsumerConnector(ConsumerConnector.scala)    at com.vonzhou.learn.message.KafkaConsumer.<init>(KafkaConsumer.java:23)    at com.vonzhou.learn.message.KafkaConsumer.main(KafkaConsumer.java:72)    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)    at java.lang.reflect.Method.invoke(Method.java:497)    at com.intellij.rt.execution.application.AppMain.main(AppMain.java:144)Caused by: java.lang.ClassNotFoundException: scala.collection.GenTraversableOnce$class    at java.net.URLClassLoader.findClass(URLClassLoader.java:381)    at java.lang.ClassLoader.loadClass(ClassLoader.java:424)    at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331)    at java.lang.ClassLoader.loadClass(ClassLoader.java:357)    ... 13 more
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20

Producer发送消息

public class MessageProducer {    private Producer<String,String> producer;    public static void main(String[] args) {        new MessageProducer().start();    }    public void init(){        Properties props = new Properties();        /**         * 用于自举(bootstrapping ),producer只是用它来获得元数据(topic, partition, replicas)         * 实际用户发送消息的socket会根据返回的元数据来确定         */        props.put("metadata.broker.list", "localhost:9092");        /**         * 消息的序列化类         * 默认是 kafka.serializer.DefaultEncoder, 输入时 byte[] 返回是同样的字节数组         */        props.put("serializer.class", "kafka.serializer.StringEncoder");        /**         * producer发送消息后是否等待broker的ACK,默认是0         * 1 表示等待ACK,保证消息的可靠性         */        props.put("request.required.acks", "1");        ProducerConfig config = new ProducerConfig(props);        // 泛型参数分别表示 The first is the type of the Partition key, the second the type of the message        producer = new Producer<String, String>(config);    }    public void produceMsg(){        // 构建发送的消息        long timestamp = System.currentTimeMillis();        String msg = "Msg" + timestamp;        String topic = "test";  // 确保有这个topic        System.out.println("发送消息" + msg);        String key = "Msg-Key" + timestamp;        /**         * topic: 消息的主题         * key:消息的key,同时也会作为partition的key         * message:发送的消息         */        KeyedMessage<String, String> data = new KeyedMessage<String, String>(topic, key, msg);        producer.send(data);    }    public void start() {        System.out.println("开始发送消息 ...");        Executors.newSingleThreadExecutor().execute(new Runnable() {            public void run() {                init();                while (true) {                    try {                        produceMsg();                        Thread.sleep(2000);                    } catch (Throwable e) {                        if (producer != null) {                            try {                                producer.close();                            } catch (Throwable e1) {                                System.out.println("Turn off Kafka producer error! " + e);                            }                        }                    }                }            }        });    }}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73

Consumer消费消息

public class MessageConsumer {    private ConsumerConnector consumer;    private String topic;    public static void main(String[] arg) {        new MessageConsumer().start();    }    public void init(){        // 指定 zookeeper 的地址        String zookeeper = "localhost:2181";        String topic = "test";        String groupId = "test-group";        Properties props = new Properties();        /**         * 必须的配置         */        props.put("zookeeper.connect", zookeeper);        /**         * 必须的配置, 代表该消费者所属的 consumer group         */        props.put("group.id", groupId);        /**         * 多长时间没有发送心跳信息到zookeeper就会认为其挂掉了,默认是6000         */        props.put("zookeeper.session.timeout.ms", "6000");        /**         * 可以允许zookeeper follower 比 leader慢的时长         */        props.put("zookeeper.sync.time.ms", "200");        /**         * 控制consumer offsets提交到zookeeper的频率, 默认是60 * 1000         */        props.put("auto.commit.interval.ms", "1000");        consumer = Consumer.createJavaConsumerConnector(new ConsumerConfig(props));        this.topic = topic;    }    public void consume() {        Map<String, Integer> topicCountMap = new HashMap<String, Integer>();        topicCountMap.put(topic, 1);        /**         * createMessageStreams 为每个topic创建 message stream         */        Map<String, List<KafkaStream<byte[], byte[]>>> messageStreams = consumer.createMessageStreams(topicCountMap);        KafkaStream<byte[], byte[]> stream = messageStreams.get(topic).get(0);        ConsumerIterator<byte[], byte[]> iterator = stream.iterator();        while (iterator.hasNext()) {            try {                String message = new String(iterator.next().message());                System.out.println("收到消息" + message);            } catch (Throwable e) {                System.out.println(e.getCause());            }        }    }    public void start() {        System.out.println("开始消费消息...");        Executors.newSingleThreadExecutor().execute(new Runnable() {            public void run() {                init();                while (true) {                    try {                        consume();                    } catch (Throwable e) {                        if (consumer != null) {                            try {                                consumer.shutdown();                            } catch (Throwable e1) {                                System.out.println("Turn off Kafka consumer error! " + e);                            }                        }                    }                }            }        });    }}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83

参考

配置参数说明
Kafka 0.8.2 Documentation

本站仅提供存储服务,所有内容均由用户发布,如发现有害或侵权内容,请点击举报
打开APP,阅读全文并永久保存 查看更多类似文章
猜你喜欢
类似文章
Kafka Java客户端代码示例
Kafka详解三:开发Kafka应用
漫游kafka实战篇之搭建Kafka开发环境
Apache Kafka:下一代分布式消息系统
Kafka基本原理
大数据——kafka的相关笔记
更多类似文章 >>
生活服务
热点新闻
分享 收藏 导长图 关注 下载文章
绑定账号成功
后续可登录账号畅享VIP特权!
如果VIP功能使用有故障,
可点击这里联系客服!

联系客服