打开APP
userphoto
未登录

开通VIP,畅享免费电子书等14项超值服

开通VIP
windows中直接使用kafka和zookeeper,以及zk的图像化工具的使用

文章目录

zookeeper图形化连接工具ZkInspector

我花了一天时间用java的swing库写了一个图形化连接zookeeper的工具,起名叫ZkInspector。为了大家直接上手使用,首先我会介绍如何在windows上直接使用kafka和zookeeper,然后再介绍ZkInspector的使用。

视频演示:

演示windows平台启动和使用kafka和zookeeper

视频地址:https://www.bilibili.com/video/BV1QC4y1W7zv/

下载kafka

由于kafka内置zookeeper,所以只需要下载kafka即可。

我们在http://archive.apache.org/dist/kafka/中选择所需要下载的kafka版本。

由于我的电脑安装的scala版本为2.11.8,所以我选择了一个支持scala2.11的最高kafka版本进行测试:

http://archive.apache.org/dist/kafka/2.4.1/kafka_2.11-2.4.1.tgz

然后解压下载好的包,比如我解压到了D:\jdk中,则配置环境变量KAFKA_HOME=D:\jdk\kafka_2.11-2.4.1

然后将%KAFKA_HOME%\bin\windows加入Path环境变量。

注意:kafka虽然使用scala开发,但本地未安装scala也依然可以使用。

本文涉及的所有软件的下载地址为:

链接:https://pan.baidu.com/s/124d4aeVYZb8_Yb7OkOIijg
提取码:no0y

启动并访问zookeeper

进入D:\jdk\kafka_2.11-2.4.1目录并启动zk:

zookeeper-server-start config\zookeeper.properties

启动zookeeper后,我们就可以访问zookeeper了。

图形化访问zookeeper的工具

为了大家访问方便,我专门花了一天时间用java开发了一个zookeeper的图像化界面的访问工具:

虽然不是很完善,但基本够用,而且非常方便。

源码暂时不给大家,除非具有java开发能力并想要改进本软件的人比较多,有个别想要改进本软件的可以与我私聊。

当然技术还不错的,想要弄本软件的源码也可以直接反编译,我虽然可以对源码进行混淆加密增加反编译的难度,但是想想还是算了,一个小程序而已。当然我更建议,想通过反编译获取源码的朋友,不如把精力花在如何改进这个软件上面,后期我可能会将源码上传到github上。

ZkInspector启动方法

最通用的办法,进入ZkInspector.jar所在的目录,执行:

javaw -jar ZkInspector.jar

这种方法执行会残留cmd黑屏界面,但如果你对jar包的默认打开程序是javaw -jar命令,则可以直接双击运行jar包。凡是通过jdk官网exe安装程序安装的java都可以直接双击打开。

但是我个人就不是,我是直接解压已经安装好的jdk,然后配置环境变量后就直接使用。

如果你也是这种情况,但也想要实现直接双击打开jar包,可以参考下面的方法:

首先,随便找个jar文件右键单击点打开方式–>默认打开方式,点预览,选择%JAVA_HOME%\bin下的javaw.exe文件点确定,让注册表中先生成相应的项和值。

然后修改刚才生成的注册表项(regedit命令打开注册表):

修改HKEY_CLASSES_ROOT\Applications\javaw.exe\shell\open\command的默认值

由
"D:\jdk\jdk1.8.0\bin\javaw.exe" "%1"
改为
"D:\jdk\jdk1.8.0\bin\javaw.exe" -jar "%1"

D:\jdk\jdk1.8.0跟换为你的jdk的配置。

ZkInspector的使用

只需点击一下就可以连接zookeeper了:

随便建了点数据就是这种效果:

启动并访问kafka

也需要先进入kafka的目录D:\jdk\kafka_2.11-2.4.1中:

kafka-server-start config\server.properties

启动完成后:

zookeeper上的变化:

测试kafka

再启动一个cmd进行测试。

创建主题:

kafka-topics --create --zookeeper localhost:2181 --topic test --partitions 3 --replication-factor 1
kafka-topics --create --zookeeper localhost:2181 --topic hello --partitions 2 --replication-factor 1
kafka-topics --create --zookeeper localhost:2181 --topic haha --partitions 1 --replication-factor 1

查看主题:

kafka-topics --describe --zookeeper localhost:2181 --topic test

列出主题:

kafka-topics --list --zookeeper localhost:2181

删除主题:

kafka-topics --delete --zookeeper localhost:2181 --topic haha

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-s8TS2ugZ-1588782580319)(http://qiniuimg.xiaoxiaoming.xyz/imgs/image-20200506131950841.png?imageslim)]

提示删除命令只是把主题标记为删除状态,要想彻底删除,需要在config/server.proporties中配置delete.topic.enable=true,不过windows上这个配置似乎不生效。

删除命令的本质仅仅只是在admin节点下面增加了被删除节点的记录:

删除主题后出现问题

然后,kafka服务端程序挂掉,并爆出如下错误:

java.nio.file.AccessDeniedException: D:\tmp\kafka-logs\haha-0 -> D:\tmp\kafka-lo
gs\haha-0.88f2623a9f47483eb65ffd9ba1ce0ab1-delete
        at sun.nio.fs.WindowsException.translateToIOException(WindowsException.j
ava:83)
        at sun.nio.fs.WindowsException.rethrowAsIOException(WindowsException.jav
a:97)
        at sun.nio.fs.WindowsFileCopy.move(WindowsFileCopy.java:387)
        at sun.nio.fs.WindowsFileSystemProvider.move(WindowsFileSystemProvider.j
ava:287)
        at java.nio.file.Files.move(Files.java:1395)
        at org.apache.kafka.common.utils.Utils.atomicMoveWithFallback(Utils.java
:795)
        at kafka.log.Log$$anonfun$renameDir$1.apply$mcV$sp(Log.scala:982)
        at kafka.log.Log$$anonfun$renameDir$1.apply(Log.scala:980)
        at kafka.log.Log$$anonfun$renameDir$1.apply(Log.scala:980)
        at kafka.log.Log.maybeHandleIOException(Log.scala:2335)
        at kafka.log.Log.renameDir(Log.scala:980)
        at kafka.log.LogManager.asyncDelete(LogManager.scala:925)
        at kafka.cluster.Partition$$anonfun$delete$1.apply(Partition.scala:479)
        at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:253)
        at kafka.utils.CoreUtils$.inWriteLock(CoreUtils.scala:261)
        at kafka.cluster.Partition.delete(Partition.scala:470)
        at kafka.server.ReplicaManager.stopReplica(ReplicaManager.scala:360)
        at kafka.server.ReplicaManager$$anonfun$stopReplicas$2.apply(ReplicaMana
ger.scala:404)
        at kafka.server.ReplicaManager$$anonfun$stopReplicas$2.apply(ReplicaMana
ger.scala:402)
        at scala.collection.immutable.Set$Set1.foreach(Set.scala:94)
        at kafka.server.ReplicaManager.stopReplicas(ReplicaManager.scala:402)
        at kafka.server.KafkaApis.handleStopReplicaRequest(KafkaApis.scala:236)
        at kafka.server.KafkaApis.handle(KafkaApis.scala:132)
        at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:70)
        at java.lang.Thread.run(Thread.java:745)
        Suppressed: java.nio.file.AccessDeniedException: D:\tmp\kafka-logs\haha-
0 -> D:\tmp\kafka-logs\haha-0.88f2623a9f47483eb65ffd9ba1ce0ab1-delete
                at sun.nio.fs.WindowsException.translateToIOException(WindowsExc
eption.java:83)
                at sun.nio.fs.WindowsException.rethrowAsIOException(WindowsExcep
tion.java:97)
                at sun.nio.fs.WindowsFileCopy.move(WindowsFileCopy.java:301)
                at sun.nio.fs.WindowsFileSystemProvider.move(WindowsFileSystemPr
ovider.java:287)
                at java.nio.file.Files.move(Files.java:1395)
                at org.apache.kafka.common.utils.Utils.atomicMoveWithFallback(Ut
ils.java:792)
                ... 19 more
[2020-05-06 13:19:20,312] INFO [ReplicaManager broker=0] Stopping serving replic
as in dir D:\tmp\kafka-logs (kafka.server.ReplicaManager)
[2020-05-06 13:19:20,318] INFO [ReplicaFetcherManager on broker 0] Removed fetch
er for partitions Set(test-0, hello-1, test-1, hello-0, test-2) (kafka.server.Re
plicaFetcherManager)
[2020-05-06 13:19:20,319] INFO [ReplicaAlterLogDirsManager on broker 0] Removed
fetcher for partitions Set(test-0, hello-1, test-1, hello-0, test-2) (kafka.serv
er.ReplicaAlterLogDirsManager)
[2020-05-06 13:19:20,337] INFO [ReplicaManager broker=0] Broker 0 stopped fetche
r for partitions test-0,hello-1,test-1,hello-0,test-2 and stopped moving logs fo
r partitions  because they are in the failed log directory D:\tmp\kafka-logs. (k
afka.server.ReplicaManager)
[2020-05-06 13:19:20,338] INFO Stopping serving logs in dir D:\tmp\kafka-logs (k
afka.log.LogManager)
[2020-05-06 13:19:20,345] ERROR Shutdown broker because all log dirs in D:\tmp\k
afka-logs have failed (kafka.log.LogManager)
[2020-05-06 13:19:20,348] INFO [ReplicaFetcherManager on broker 0] Removed fetch
er for partitions Set(haha-0) (kafka.server.ReplicaFetcherManager)

大概意思就是kafka没有权限将D:\tmp\kafka-logs\haha-0重命名为D:\tmp\kafka-logs\haha-0.88f2623a9f47483eb65ffd9ba1ce0ab1-delete。再次启动kafka服务端,依然会报同样的错误,无法启动,除非删除zookeeper上的admin/delete_topics/haha节点,取消对它的删除标记。

虽然kafka服务端已经挂了,但kafka客户端还可以通过zookeeper获取一些数据信息:

但我们的目的就是删除这个topic,先直接删除zookeeper上关于haha的节点,按照Ctrl键进行选择:

删除之后,kafka服务已经可以顺利的重新启动了。

然后再手工删除D:\tmp\kafka-logs\haha-0文件夹,就彻底这个主题下的所有数据了。

注意:如果该topic有多个分区则需要删除多个文件夹。

此时我在%KAFKA_HOME%config\server.properties添加

delete.topic.enable=true

后再次重启kafka服务端。

测试数据的写入和读取

生产者

kafka-console-producer --broker-list localhost:9092 --topic hello

消费者

kafka-console-consumer --bootstrap-server localhost:9092 --topic hello  --from-beginning

指定消费者组:

kafka-console-consumer --bootstrap-server localhost:9092 --topic hello --group group1  --from-beginning

查看消费进度:

kafka-consumer-groups --bootstrap-server localhost:9092 --describe --group group1

D:\jdk\kafka_2.11-2.4.1>kafka-consumer-groups--bootstrap-serverlocalhost:9092
--describe--groupgroup1

GROUPTOPICPARTITIONCURRENT-OFFSETLOG-END-OFFSETLAGCONSUMER-IDHOSTCLIENT-ID
group1hello0220consumer-group1-1-86024135-f662-4e63-8654-1184a315587c/192.168.40.1consumer-group1-1
group1hello1330consumer-group1-1-86024135-f662-4e63-8654-1184a315587c/192.168.40.1consumer-group1-1

查看所有的消费组:

kafka-consumer-groups --bootstrap-server localhost:9092 --list

测试生产能力:

kafka-producer-perf-test --topic test --num-records 1000000 --record-size 1000 --throughput  -1 --producer-props bootstrap.servers=localhost:9092 acks=-1

主要输出5项指标,发送消息总数,每秒发送消息数(records/sec),每秒发送消息量(MB/sec),平均延迟和最大延迟。

最后的输出表示总共发出1000000条数据,kafka 的平均吞吐量是每秒发送25058条消息,23.9 MB/秒 ,平均延时为1248.44ms,最大延时为3443.00ms,1015ms发送50%的消息,2330ms发送95%的消息,3171ms发送了99%的消息,3375ms发送了99.9%的消息。

测试消费能力:

kafka-consumer-perf-test --broker-list localhost:9092 --topic test --fetch-size 1048576 --messages 500000 --threads 3

start.time, end.time, data.consumed.in.MB, MB.sec, data.consumed.in.nMsg, nMsg.s
ec, rebalance.time.ms, fetch.time.ms, fetch.MB.sec, fetch.nMsg.sec
2020-05-06 14:46:55:323, 2020-05-06 14:47:13:000, 477.0164, 26.9851, 500188, 282
95.9778, 1588747616111, -1588747598434, -0.0000, -0.0003

具体参数解释:

  • data.consumed.in.MB:总消费大小(MB)
  • MB.sec:平均每秒消费(MB/s)
  • data.consumed.in.nMsg:总消费条数(条)
  • nMsg.sec:平均每秒消费(条)
  • rebalance.time.ms:再平衡时间(ms)
  • fetch.time.ms:fetch平均时间(ms)
  • fetch.MB.sec:fatch平均大小(MB/s)
  • fetch.nMsg.sec:fatch平均条数(条)

表示8秒内消费了477MB的消息,吞吐量为26.9851MB/s,共消费500188条消息,每秒28295.9778条。

kafka中的配置文件

参数主要分为以下三类:

broker配置文件参数
http://kafka.apache.org/documentation/#brokerconfigs
其中有一部分是topic级别的:即可以被topic创建时覆盖,参看:
http://kafka.apache.org/documentation/#topicconfigs
生产者配置文件参数
http://kafka.apache.org/documentation.html#producerconfigs
消费者配置文件参数
http://kafka.apache.org/documentation.html#consumerconfigs

本站仅提供存储服务,所有内容均由用户发布,如发现有害或侵权内容,请点击举报
打开APP,阅读全文并永久保存 查看更多类似文章
猜你喜欢
类似文章
【热】打开小程序,算一算2024你的财运
建哥手把手系列之大数据环境搭建
windows运行kafka_2.11-2.0.0+zookeeper-3.4.12
当我说要做大数据工程师时他们都笑我,直到三个月后……
大数据需要用到的知识
2018大数据培训学习路线图(详细完整版)
kafka 集群安装与安装测试
更多类似文章 >>
生活服务
热点新闻
分享 收藏 导长图 关注 下载文章
绑定账号成功
后续可登录账号畅享VIP特权!
如果VIP功能使用有故障,
可点击这里联系客服!

联系客服