消息队列概述
消息队列是一种进程间通信或同一进程的不同线程间的通信方式,软件的贮列用来处理一系列的输入,通常是来自用户。
消息队列提供了异步的通信协议,消息的发送者和接收者不需要同时与消息队列交互。消息会保存在队列中,直到接收者取回它。
Producer:消息生产者,负责产生和发送消息到 Broker。
Broker:消息处理中心。负责消息存储、确认、重试等,一般其中会包含多个 queue。
Consumer:消息消费者,负责从 Broker 中获取消息,并进行相应处理。
MQ的基础特性,MQTT也有:
异步 --- 消息队列本身是异步的,它允许接收者在消息发送很长时间后再取回消息,这和大多数通信协议是不同的。
解耦 --- 消息队列减少了服务之间的耦合性,不同的服务可以通过消息队列进行通信,而不用关心彼此的实现细节,只要定义好消息的格式就行。
广播 --- 消息队列的基本功能之一是进行广播。如果没有消息队列,每当一个新的业务方接入,我们都要联调一次新接口。
流量削峰与流控 --- 当上下游系统处理能力存在差距的时候,利用消息队列做一个通用的”载体”。在下游有能力处理的时候,再进行分发与处理。
1.精简,不添加可有可无的功能。以发布/订阅(Pub/Sub)模式为中心,方便消息在传感器之间传递。
只需要创建一个Client就够了,通过Client可以调用所以的方法和属性
运用MQTT协议,设备可以很方便地连接到物联网云服务,管理设备并处理数据,最后应用到各种业务场景,如下图所示:
2.把传输量降到最低以提高传输效率。因为MQTT协议的头很小。
消息架构如下:
把低带宽、高延迟、不稳定的网络等因素考虑在内。使用的是TCP协议。
支持连续的会话控制。对于需要实时监控的设备,可以建立连续的会话。
假设数据不可知,不强求传输数据的类型与格式,保持灵活性。
3.MQTT提供层级主题:
MQTT是通过主题对消息进行分类的,本质上就是一个UTF-8的字符串,不过可以通过反斜杠表示多个层级关系。主题并不需要创建,直接使用就是了。
主题还可以通过通配符进行过滤。其中,+可以过滤一个层级,而#只能出现在主题最后表示过滤任意级别的层级。
举个例子:
注意,MQTT允许使用通配符订阅主题,但是并不允许使用通配符广播。
4.MQTT提供多个QoS选项(exact once、at least once、at most once):
为了满足不同的场景,MQTT支持三种不同级别的服务质量(Quality of Service,QoS)为不同场景提供消息可靠性:
级别0:尽力而为。消息发送者会想尽办法发送消息,但是遇到意外并不会重试。
级别1:至少一次。消息接收者如果没有知会或者知会本身丢失,消息发送者会再次发送以保证消息接收者至少会收到一次,当然可能造成重复消息。
级别2:恰好一次。保证这种语义肯定会减少并发或者增加延时,不过丢失或者重复消息是不可接受的时候,级别2是最合适的。
订阅: subscribe (topic , qos = 0 )
发布:publish(topic , payload = None , qos = 0 , retain = False )
5.MQTT代理加上了对WebSockets的支持,可以方便地实现如下场景:
6.MQTT提供了多个层次的安全特性:
应用层的安全特性设置密码:
Client.username_pw_set(username, password=None)
验证密码:
auth = {'username':“<username>”,'password':“<password>”}
7.设置了一个Will,如果客户端异常断开链接,borker会发送一个遗嘱信息
will_set (topic , payload = None , qos = 0 , retain = False )
设置要发送给经纪人的遗嘱。如果客户端在未调用disconnect()的情况下 断开连接,则代理将代表其发布消息。
一个Client异常断开连接的时候或者Server处理失败的时候,Server会把Client的will信息,当做Publish处理,Publish的topic就是Will topic,消息就是will message。
8.当执行loop时,才能对消息进行处理
loop(timeout=1.0, max_packets=1)
如果未调用它们,则不会处理传入的网络数据,并且可能无法及时发送传出的网络数据。
9.MQTT为开发者提供了大量的回调函数,实时的可以监控当前状态,可以对发生的事件进行及时的应对。
对于异常的状态信息进行返回,比如publish。
10.对于只需要建立一次连接,发送大量数据的情况,MQTT提供了simple方法供开发者调用。
single(topic, payload=None, qos=0, retain=False, hostname="localhost", port=1883, client_id="", keepalive=60, will=None, auth=None, tls=None, protocol=mqtt.MQTTv311, transport="tcp")
联系客服