打开APP
userphoto
未登录

开通VIP,畅享免费电子书等14项超值服

开通VIP
MQTT协议之生产者消费者实现

上一节我们讲解了mosquitto的安装和使用,下面我们使用手机测试一下。将AndroidPushNotificationsDemomosquitto的服务器IP和port修改为对应的地址,再讲apk安装到手机中,通过命令行发送消息到手机。apk demo中订阅的地址为tokudu/yzq124.

# mosquitto_pub -h 192.168.36.102 -p 1883 -q 0 -t tokudu/yzq124 -m  "MQTT 测试文本"

手机收到推送消息:


下面我们使用java实现生产者和消费者(MQTT使用wmqtt.jar,需导入java项目中):

1.消息处理handler,实现MqttSimpleCallback,也可以对高级回调接口实现MqttAdvancedCallback

  1. package cn.smartslim.mqtt.demo.wmqtt;  
  2.   
  3. import com.ibm.mqtt.MqttSimpleCallback;  
  4.   
  5. //简单回调函数,处理server接收到的主题消息   
  6. public class SimpleCallbackHandler implements MqttSimpleCallback {  
  7.   
  8.     /**  
  9.      * 当客户机和broker意外断开时触发 可以再此处理重新订阅  
  10.      */  
  11.     public void connectionLost() throws Exception {  
  12.          System.out.println("客户机和broker已经断开");    
  13.     }  
  14.   
  15.     public void publishArrived(String topicName, byte[] payload, int Qos, boolean retained) throws Exception {  
  16.          System.out.println("订阅主题: " + topicName);    
  17.          System.out.println("消息数据: " + new String(payload));    
  18.          System.out.println("消息级别(0,1,2): " + Qos);    
  19.          System.out.println("是否是实时发送的消息(false=实时,true=服务器上保留的最后消息): "    
  20.                  + retained);    
  21.     }    
  22. }  
2.生产者(发布消息):

  1. package cn.smartslim.mqtt.demo.wmqtt;  
  2.   
  3. import com.ibm.mqtt.MqttClient;  
  4. import com.ibm.mqtt.MqttException;  
  5.   
  6. //生产者  可以发送与接收消息   
  7. public class Producer {  
  8.     private final static String CONNECTION_STRING = "tcp://192.168.36.102:1883";    
  9.     private final static boolean CLEAN_START = true;    
  10.     private final static short KEEP_ALIVE = 30;// 低耗网络,但是又需要及时获取数据,心跳30s    
  11.     private final static String CLIENT_ID = "producer";// 客户端标识    
  12.     private final static int[] QOS_VALUES = { 0, 2};// 对应主题的消息级别    
  13.     private final static String[] TOPICS = { "consumer/topic","tokudu/yzq124"};    
  14.       
  15.     private static Producer instance = new Producer();    
  16.       
  17.     private MqttClient mqttClient;    
  18.     /**  
  19.      * 返回实例对象  
  20.      *   
  21.      * @return  
  22.      */    
  23.     public static Producer getInstance() {    
  24.         return instance;    
  25.     }    
  26.     /**  
  27.      * 重新连接服务  
  28.      */    
  29.     private void connect() throws MqttException {    
  30.         System.out.println("connect to mqtt broker.");    
  31.         mqttClient = new MqttClient(CONNECTION_STRING);    
  32.         System.out.println("***********register Simple Handler***********");    
  33.         SimpleCallbackHandler simpleCallbackHandler = new SimpleCallbackHandler();    
  34.         mqttClient.registerSimpleHandler(simpleCallbackHandler);// 注册接收消息方法  可以做为消费者  
  35.         mqttClient.connect(CLIENT_ID, CLEAN_START, KEEP_ALIVE);    
  36.         System.out.println("***********subscribe receiver topics***********");    
  37.         mqttClient.subscribe(TOPICS, QOS_VALUES);// 订阅接主题    
  38.     
  39.         System.out.println("***********CLIENT_ID:" + CLIENT_ID);    
  40.         /**  
  41.          * 完成订阅后,可以增加心跳,保持网络通畅,也可以发布自己的消息  
  42.          */    
  43.         mqttClient.publish(CLIENT_ID+"/keepalive", "keepalive".getBytes(), QOS_VALUES[0],    
  44.                 true);// 增加心跳,保持网络通畅    
  45.     }    
  46.       
  47.     /**  
  48.      * 发送消息  
  49.      *   
  50.      * @param clientId  
  51.      * @param messageId  
  52.      */    
  53.     public void sendMessage(String clientId, String message) {    
  54.         try {    
  55.             if (mqttClient == null || !mqttClient.isConnected()) {    
  56.                 connect();    
  57.             }    
  58.             System.out.println("send message to " + clientId + ", message is "    
  59.                     + message);    
  60.             // 发布自己的消息    
  61.             mqttClient.publish(clientId, message.getBytes(),    
  62.                     0, false);    
  63.         } catch (MqttException e) {    
  64.              System.out.println(e.getCause());    
  65.             e.printStackTrace();    
  66.         }    
  67.     }    
  68.       
  69.     public static void main(String[] args) {    
  70.       // new Producer().sendMessage("tokudu/yzq124", "java程序发的消息");    
  71.        new Producer().sendMessage("consumer/topic", "java程序发的消息");    
  72.     }    
  73. }  
3.消费者(订阅消息):

  1. package cn.smartslim.mqtt.demo.wmqtt;  
  2.   
  3. import com.ibm.mqtt.MqttClient;  
  4. import com.ibm.mqtt.MqttException;  
  5.   
  6. //消费者  
  7. public class Consumer {  
  8.     private final static String CONNECTION_STRING = "tcp://192.168.36.102:1883";    
  9.     private final static boolean CLEAN_START = true;    
  10.     private final static short KEEP_ALIVE = 30;// 低耗网络,但是又需要及时获取数据,心跳30s    
  11.     private final static String CLIENT_ID = "consumer";// 客户端标识    
  12.     private final static int[] QOS_VALUES = { 0};// 对应主题的消息级别    
  13.     private final static String[] TOPICS = { "consumer/topic"};    
  14.       
  15.     /**  
  16.      * 重新连接服务  
  17.      */    
  18.     public void connect() throws MqttException {    
  19.         System.out.println("connect to mqtt broker.");    
  20.         MqttClient mqttClient = new MqttClient(CONNECTION_STRING);    
  21.         System.out.println("***********register Simple Handler***********");    
  22.         mqttClient.registerSimpleHandler(new MyMqttAdvancedCallback());// 注册接收消息方法  可以做为消费者  
  23.         mqttClient.connect(CLIENT_ID, CLEAN_START, KEEP_ALIVE);    
  24.         System.out.println("***********subscribe receiver topics***********");    
  25.         mqttClient.subscribe(TOPICS, QOS_VALUES);// 订阅接主题    
  26.     
  27.         System.out.println("***********CLIENT_ID:" + CLIENT_ID);    
  28.         /**  
  29.          * 完成订阅后,可以增加心跳,保持网络通畅,也可以发布自己的消息  
  30.          */    
  31.         mqttClient.publish(CLIENT_ID+"/keepalive", "keepalive".getBytes(), QOS_VALUES[0],    
  32.                 true);// 增加心跳,保持网络通畅    
  33.     }    
  34.       
  35.     public static void main(String[] args) throws MqttException {    
  36.         new Consumer().connect();  
  37.     }    
  38. }  
运行一下,看一下手机和消费者是否收到消息吧。。

说明:

MQTT 成功连接之后,就可以发布消息了。应用程序通过 MQTT 客户对象发布消息。发布消息的方法签名是 int publish(String, MqttPayload, byte, Boolean)。这四个参数详细解释如下:
  String:主题参数的类型是字符串,代理用此字符串来针对订阅者的兴趣(使用之前描述的订阅主题语法指定)匹配发布。
  MqttPayload:第二个参数是一个 MqttPayload 对象。这个 MqttPayload 对象包含应用程序数据和任何此发布的协议头。此外,还提供了一个偏移量以便应用程序能够决定数据在 MqttPayload 中从何处开始。这实现了对底层字节数组的访问,而无需在数据写到网络后创建额外的副本。提供这种访问是为了在对象构成后和进行传输之前直接在载荷内操纵数据。
  Byte:第三个参数,是此发布的服务质量(Quality of Service,QoS)。QoS 有三级:0、1 或 2:
  QoS 为 0 表明发布程序和代理都只尝试消息的一次传递,不会采取任何额外的步骤,亦不会超越 TCP/IP 去确保消息的传递。这一级有时又称作 “发后即忘”,原因是向目的地发送消息后并不验证消息的接收。
  QoS 为 1 表明消息会被确保传递到代理;但消息可能会传递多次。
  QoS 为 2 则告知 MQTT 传递消息且只传递一次。
  QoS 级别增加会导致额外的处理器和网络开销。QoS 的设置会影响到消息传递解决方案的整体可扩展性。而且还会增加客户机存储未传递消息的负担。因而,在为每个所发布的消息选择合适的 QoS 级别时要格外注意。总的来说,若非迫切的传递保证所需,尽量使用较低级别的 QoS。随消息提供的 QoS 值指定了在客户机和代理之间发布消息的服务质量。此外,该值还决定了代理用来向其订阅者发布消息的最大 QoS 级别。
  订阅者可以基于每个主题指定消息传递的最大 QoS,所以若消息在 QoS 2 发布,此消息有可能不是在该级别发布给其订阅者。订阅者可以请求为它所收到的消息提供降级 QoS。也许您会觉得消息的端到端 QoS 不由发布程序控制多少有些奇怪,但这样做会提高消息使用者的灵活性。当发布后的消息发送给订阅者时,代理会以订阅过程中由此订阅者指定的最高的 QoS 级别或是以此发布消息的 QoS 级别(如果此级别更低)传递此消息。例如,以 QoS 2 向订阅者发布的消息,若订阅者指定此主题的级别为 QoS 1,那么主题就会以 QoS 1 传递。对于同样级别的标题,若向相同的订阅者发布 QoS 0 级别的消息,那么此消息就会以 QoS 0 级别发送给订阅者。
  Boolean:第四个参数是 Boolean 标志,该标志表明它是否是保留发布。保留发布存在于代理之内,作为针对给定主题收到的最后一个消息。保留发布让后续订阅者可以在订阅之后立即收到关于某主题的最新消息,即使是他们在此消息发布之后才连接。对于启动之后即刻填充某个显示应用程序并随后用对此信息的后续更改更新它,这种方式十分有效。如果此标志设为 false,只有当前订阅到该主题的订阅者可以收到消息。清单 2 中所示的示例使用的是一个非保留发布。
  这种发布方法会返回一个整型消息 ID。它可以与已注册的 MqttAdvancedCallback 方法结合使用来检查消息是否已由代理接收。

本站仅提供存储服务,所有内容均由用户发布,如发现有害或侵权内容,请点击举报
打开APP,阅读全文并永久保存 查看更多类似文章
猜你喜欢
类似文章
【热】打开小程序,算一算2024你的财运
采用MQTT协议实现Android消息推送
MQTT moquette 的 Blocking API 发布消息服务端使用
构建Android Push Notification Service服务端及客户端[含代码] - 48 views
mqtt协议 学习笔记
MQTT协议的简单介绍和服务器的安装
如何在Arduino开发板上使用基本MQTT
更多类似文章 >>
生活服务
热点新闻
分享 收藏 导长图 关注 下载文章
绑定账号成功
后续可登录账号畅享VIP特权!
如果VIP功能使用有故障,
可点击这里联系客服!

联系客服