打开APP
userphoto
未登录

开通VIP,畅享免费电子书等14项超值服务

开通VIP
【Kafka】

【Kafka】- Java 客户端

撸了今年阿里、腾讯和美团的面试,我有一个重要发现.......>>

旧版本

工作流程:通过ZK集群获取Kafka集群信息,进而进行Kafka相关操作

package com.zhiwei.kafka.base;import kafka.admin.AdminUtils;import kafka.admin.RackAwareMode;import kafka.server.ConfigType;import kafka.utils.ZkUtils;import lombok.extern.slf4j.Slf4j;import org.apache.commons.collections.MapUtils;import org.apache.commons.lang3.StringUtils;import org.apache.kafka.common.security.JaasUtils;import java.util.HashMap;import java.util.List;import java.util.Map;import java.util.Properties;/** * @author 作者      ZHIWEI.YANG * @function 功能      Kafka主题客户端 * @time 时间      2018年5月9日-下午4:24:18 */@SuppressWarnings("deprecation")@Slf4jpublic class OldKafkaClient {    private String zkUrl = "localhost:2181";    private Integer sessionTimeout = 3000;    private Integer connectTimeout = 3000;    private ZkUtils zkUtils = null;    public OldKafkaClient(String zkUrl) {        if (!StringUtils.isEmpty(zkUrl)) {            this.zkUrl = zkUrl;        }    }    public static void main(String[] args) {        OldKafkaClient client = new OldKafkaClient("debian:2181");        String topicName = "mytopoc";        //创建主题        client.creatTopic(topicName, 1, 1, null);        Map<String, String> properties = new HashMap<String, String>();        // 增加topic级别属性        properties.put("min.cleanable.dirty.ratio", "0.3");        // 删除topic级别属性        //properties.remove("max.message.bytes");        client.updateTopic(topicName, properties, null);        //查询topic属性       log.info("topic:{}, 属性:{}",topicName, client.queryTopic(topicName));        //删除主题        client.deleteTopic(topicName);    }    private void getZkUtils() {        zkUtils = ZkUtils.apply(zkUrl, sessionTimeout, connectTimeout, JaasUtils.isZkSecurityEnabled());    }    /**     * 创建主题     */    public void creatTopic(String topicName, Integer partitionNum, Integer replicationFactor, Properties topicConfig) {        getZkUtils();        AdminUtils.createTopic(zkUtils, topicName, partitionNum, replicationFactor,                topicConfig == null ? 
new Properties() : topicConfig,                RackAwareMode.Enforced$.MODULE$);        zkUtils.close();    }    /**     * 删除主题     */    public void deleteTopic(String topicName) {        getZkUtils();        AdminUtils.deleteTopic(zkUtils, topicName);        zkUtils.close();    }    /**     * 查询主题     */    @SuppressWarnings("unchecked")    public Map<String, String> queryTopic(String topicName) {        getZkUtils();        Properties props = AdminUtils.fetchEntityConfig(zkUtils, ConfigType.Topic(), topicName);        Map<String, String> result = MapUtils.fixedSizeMap(props);        zkUtils.close();        return result;    }    /**     * 修改主题     */    public void updateTopic(String topicName, Map<String, String> addProperties, List<String> deleteProps) {        getZkUtils();        Properties props = AdminUtils.fetchEntityConfig(zkUtils, ConfigType.Topic(), topicName);        props.putAll(addProperties);        if (deleteProps != null) {            for (String key : deleteProps) {                props.remove(key);            }        }        AdminUtils.changeTopicConfig(zkUtils, topicName, props);        zkUtils.close();    }}

新版本

工作流程:直连Kafka Bootstrap Server, 直接操作Kafka集群

package com.zhiwei.kafka.base;import lombok.extern.slf4j.Slf4j;import org.apache.kafka.clients.admin.*;import org.apache.kafka.common.KafkaFuture;import java.util.*;import java.util.concurrent.ExecutionException;/** * @author 作者 ZHIWEI.YANG * @function 功能 Kafka主题客户端 * @time 时间 2018年5月9日-下午4:24:18 */@Slf4jpublic class NewKafkaClient {    private String host = "localhost";    private Integer port = 9092;    private AdminClient adminClient;    private short replicationFactor = 1;    private Integer numPartitions = 1;    private Map<String, Object> conf = new HashMap<String, Object>();    {        conf.put("bootstrap.servers", host + ":" + port);    }    public NewKafkaClient() {    }    public NewKafkaClient(String host, Integer port) {        this.host = host;        this.port = port;        conf.put("bootstrap.servers", host + ":" + port);        adminClient = AdminClient.create(conf);    }    public NewKafkaClient(Map<String, Object> conf) {        this.conf.putAll(conf);        adminClient = AdminClient.create(conf);    }    public static void main(String[] args) throws InterruptedException, ExecutionException {        NewKafkaClient client = new NewKafkaClient("centos", 9092);        String topicName = "mytopic";       log.info("创建topic:{}, 结果:{}",topicName, client.createSingleTopic(topicName));       log.info("topic:{}, 属性:{}", topicName, client.descriptTopic(topicName));       log.info("所有topic:{}", client.queryAllTopic());    }    /**     * 查询主题名     *     * @return     */    public List<String> queryAllTopic() {        ListTopicsResult listTopicsResult = adminClient.listTopics();        KafkaFuture<Collection<TopicListing>> kafkaFuture = listTopicsResult.listings();        Collection<TopicListing> collections;        List<String> topics = null;        try {            collections = kafkaFuture.get();            if (collections != null && collections.size() != 0) {                topics = new ArrayList<String>(collections.size());                for 
(TopicListing topicListing : collections) {                    topics.add(topicListing.name());                }            }        } catch (InterruptedException | ExecutionException e) {            e.printStackTrace();        }        return topics;    }    public boolean createSingleTopic(String name) {        NewTopic newTopic = new NewTopic(name, numPartitions, replicationFactor);        List<NewTopic> newTopics = new ArrayList<NewTopic>(1);        newTopics.add(newTopic);        return createTopics(newTopics, null);    }    public boolean createTopics(List<NewTopic> newTopics, CreateTopicsOptions createTopicsOptions) {        if (createTopicsOptions == null) {            createTopicsOptions = new CreateTopicsOptions();            createTopicsOptions.timeoutMs(1000);        }        CreateTopicsResult results = adminClient.createTopics(newTopics, createTopicsOptions);        KafkaFuture<Void> kafkaFuture = results.all();        return kafkaFuture.isDone();    }    public boolean deleteTopic(String... name) {        if (name == null || name.length == 0) {            return true;        }        List<String> topics = Arrays.asList(name);        DeleteTopicsResult deleteTopicsResult = adminClient.deleteTopics(topics);        return deleteTopicsResult.all().isDone();    }    public Map<String, TopicDescription> descriptTopic(String... topics) throws InterruptedException, ExecutionException {        DescribeTopicsResult describeTopicsResult = adminClient.describeTopics(Arrays.asList(topics));        return describeTopicsResult.all().get();    }    public void close() {        if (adminClient != null) {            adminClient.close();        }    }}
本站仅提供存储服务,所有内容均由用户发布,如发现有害或侵权内容,请点击举报
打开APP,阅读全文并永久保存 查看更多类似文章
猜你喜欢
类似文章
【热】打开小程序,算一算2024你的财运
Kafka学习整理七(producer和consumer编程实践)
Java 微信支付 发起商家转账API 2022年v3 transfer batches
spring boot2整合shiro安全框架实现前后端分离的JWT token登录验证
ZooKeeper实现生产-消费者队列
利用Flink stream从kafka中写数据到mysql
Flink自定义 Sink 函数从kafka往kudu写数据
更多类似文章 >>
生活服务
热点新闻
分享 收藏 导长图 关注 下载文章
绑定账号成功
后续可登录账号畅享VIP特权!
如果VIP功能使用有故障,
可点击这里联系客服!

联系客服