打开APP
userphoto
未登录

开通VIP,畅享免费电子书等14项超值服

开通VIP
在GPRS模块(SIM800C)和STM32芯片上实现MQTT协议 | TsonTec:测量解决方案提供者

最近真是的好一个劲的折腾,算是完全搞明白了如何在STM32上实现MQTT协议了。

目录 [显示]

一、本教程中说明的内容

先说说本文化的适用范围吧:

一、使用的芯片是STM32F103C8T6,但是并没有任何与平台相关的代码,应该在所有STM32芯片中都是可以用的。

二、本文使用的是SIM800C模块,驱动是用C++实现的,基于ARM mbed平台写的。但是从原理上来讲,C和C++差别不大,本文的代码经过修改也可以直接用于其他平台的使用。

三、本文数据传输使用的是“透传模式”,对于所有的透传模块,本文都有很大的参考意义。

二、MQTT的使用

首先,推荐一个MQTT的库:Paho,这个库支持非常多的平台,当然也包括了嵌入式平台:GitHub – paho.mqtt.embedded-c。将该库中的MQTTPacket文件夹下载下来,MQTTPacket文件夹下面主要有三个文件夹,我们使用的文件主要集中在src文件夹和samples文件夹中。

src文件夹中存放着MQTT核心功能的代码,而samples中存放着三个例子:pub00sub1、pub0sub1_nc、qos0pub和网络驱动(transport.c和transport.h)。

由于三个驱动都有一个main函数,所以无法同时存在,本文中只使用了pub0sub,所以将此文件夹内容精减到只有pub0sub1.c、transport.c、transport.h三个文件。

当然,在实现使用时可能会改变目录结构,使目录结构更加清楚,可以根据自己的喜好来进行更改,并不影响使用。

将transport.h的内容精减到以下内容:

1
2
3
4
int transport_sendPacketBuffer(unsigned char* buf, int buflen);
int transport_getdata(unsigned char* buf, int count);
int transport_open(char* host, int port);
int transport_close();

主要的工作有:

1、为了方便表示,删除了版权信息,有实际使用时请保留。

2、没有使用pub0sub1_nc这个例子,所以将transport_getdatanb方法去除。

3、透传模块中使用不到socket,所以将与socket相关的参数去掉。

这些方法实现的主要功能是:

1、transport_open的作用是初始化模块连网的信息、transport_close作用是关闭链接。

2、transport_sendPacketBuffer用于发送数据、transport_getdata用于接收数据。

然后用transport.c来实现transport.h中声明的4个函数。

三、在ARM mbed中使用MQTT

首先说句题外话,自我感觉mbed是一个非常不错的平台,很大程度上提高了代码的可重用性。但也有一个问题,就是其支持是以开发板为单位的,所以并不是对每一种芯片的支持都很好。

首先介绍一个例子,HelloMQTT – a mercurial repository | mbed。但这个例子其中有很多不完善的地方,而且该例子使用的网络驱动也不是GPRS模块。

如果要用不同的连网方式,那么就写一个驱动,驱动中至少要包含以下两个方法:

1
2
int read(unsigned char* buffer, int len, int timeout);
int write(unsigned char* buffer, int len, int timeout);

这两个方法会在MQTTClient中自动调用,timeout表示毫秒。返回值为读或写的字节数。

对此,我写了驱动程序:

MQTTGRPSEthernet.h

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
#pragma once
#if !defined(MQTTGPRSETHERNET_H)
#define MQTTGPRSETHERNET_H
#define DEFAULT_GPRS_TIMEOUT 6000000  //6s
#define SERIAL_BUFFER_SIZE 256
#include "mbed.h"
class MQTTGPRSEthernet
{
public:
MQTTGPRSEthernet(PinName tx, PinName rx, int baudrate = 115200);
~MQTTGPRSEthernet();
bool initNet(const char* apn, const char* userName = "", const char* passWord = "", int timeout = DEFAULT_GPRS_TIMEOUT, bool isReconnect = false);
bool connect(char* hostname, int port, int timeout = DEFAULT_GPRS_TIMEOUT);
int read_line(char* buffer, int timeout = DEFAULT_GPRS_TIMEOUT);
int read(unsigned char* buffer, int len, int timeout = DEFAULT_GPRS_TIMEOUT);
int write(unsigned char* buffer, int len, int timeout = DEFAULT_GPRS_TIMEOUT);
bool disconnect();
private:
bool initNet();
Serial eth;
bool command(const char* cmd, const char* ack = "");
bool connected = false;
bool initialized = false;
char* localIP;
const char *_apn;
const char *_passWord;
const char *_userName;
};
#endif

MQTTGRPSEthernet.cpp

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
#include "MQTTGPRSEthernet.h"
#include "StringHelper.h"
#ifdef DEBUG
Serial pc(PB_10, PB_11, 115200);
#define LOG(args...)    pc.printf(args)
#define LOGC(args...)   pc.putc(args)
#else
#define LOG(args...)    (args)
#define LOGC(args...)   (args)
#endif // DEBUG
//还需要增加一个表示已经初始化成功的变量
//后续可能要增加表示6条连接的东东,如果host,port,连接状态等
MQTTGPRSEthernet::MQTTGPRSEthernet(PinName tx, PinName rx, int baudrate)
: eth(tx, rx, baudrate){}
bool MQTTGPRSEthernet::initNet(const char* apn, const char* userName, const char* passWord, int timeout, bool isReconnect)
{
_apn = apn;
_userName = userName;
_passWord = passWord;
command("ATE0");
wait_ms(800);
command("AT+CIPMUX=0");
wait_ms(800);
//检查 GPRS 附着状态
while (!command("AT+CGATT?\r\n", "+CGATT: 1"))
{
LOG("GPRS NOT ATTACHED!\r\n");
wait_ms(1200);
eth.printf("+++");
wait_ms(1200);
command("AT");
command("AT+CIPSHUT\r\n");
}
//Select multiple connection
//单链路模式
//command("AT+CIPMUX=0");
//wait_ms(800);
//透传模式
command("AT+CIPMODE=1");
wait_ms(800);
command("AT+CIPCCFG=5,2,1024,1,0,1460,50");
wait_ms(800);
// Set APN
command(StringHelper::Format("AT+CSTT=\"%s\",\"%s\",\"%s\"", apn, userName, passWord));
wait_ms(800);
LOG(StringHelper::Format("AT+CSTT=\"%s\",\"%s\",\"%s\"\r\n", apn, userName, passWord));
uint32_t start = us_ticker_read();
do
{
// Brings up wireless connection
//建立无线链路(GPRS 或者 CSD)
bool flag = command("AT+CIICR", "OK");
// Get local IP address
eth.printf("AT+CIFSR\r\n");
char ip_addr_buf[32];
if (read_line(ip_addr_buf) <= 0) {
//LOG("failed to join network\r\n");
initialized = false;
}
else if (StringHelper::CheckIP(ip_addr_buf))
{
LOG("IP ADDRESS:");
LOG(ip_addr_buf);
LOG("\r\n");
localIP = ip_addr_buf;
initialized = true;
break;
}
else
{
initialized = false;
}
if (flag == false && initialized == false && isReconnect == false)
{
//表明此时建立无线链路返回error,得不到IP地址
//此时应使用AT+CIPSHUT关闭PDP上下文后再重新进行连接
//防止在数据状态
wait_ms(1200);
eth.printf("+++");
wait_ms(1200);
command("AT");
command("AT+CIPSHUT\r\n");
initialized = false;
return initNet(_apn, _userName, _passWord, timeout, true);
}
} while (us_ticker_read() - start < timeout);
//command("AT+CRPRXGET=0");
return initialized;
}
bool MQTTGPRSEthernet::initNet()
{
return initNet(_apn, _userName, _passWord);
}
bool MQTTGPRSEthernet::connect(char* hostname, int port, int timeout)
{
uint32_t start = us_ticker_read();
do
{
if (initialized == false)
initNet();
else
break;
} while (us_ticker_read() - start < timeout);
start = us_ticker_read();
do
{
//AT+CIPSTART=0,”TCP”,”116.228.221.51”,”8500”
char strPort[10];
itoa(port, strPort, 10);
char response[64] = { 0, };
int connectStart = us_ticker_read();
LOG(StringHelper::Format("AT+CIPSTART=TCP,%s,%s\r\n", hostname, strPort));
eth.printf(StringHelper::Format("AT+CIPSTART=TCP,%s,%s\r\n", hostname, strPort));
do
{
read_line(response);
if (strstr(response, "CONNECT") != NULL)
{
connected = true;
return connected;
}
} while (us_ticker_read() - connectStart < timeout);
} while (us_ticker_read() - start < timeout);
connected = false;
return connected;
}
//向SIM800C发送命令
//cmd:发送的命令字符串(不需要添加回车了),当cmd<0XFF的时候,发送数字(比如发送0X1A),大于的时候发送字符串.
//ack:期待的应答结果,如果为空,则表示不需要等待应答,对于那些并不关心返回结果的直接返回true
//此函数只能对应立即进行读取的情况。
//返回值:0,发送成功(得到了期待的应答结果)
//       1,发送失败
bool MQTTGPRSEthernet::command(const char* cmd, const char* ack)
{
char response[64] = { 0, };
if (StringHelper::EndWith(cmd, "\r\n"))
{
LOG(cmd);
eth.printf(cmd);
}
else if (StringHelper::EndWith(cmd, "\r"))
{
LOG(StringHelper::Add(cmd, "\n"));
eth.printf(StringHelper::Add(cmd, "\n"));
}
else
{
LOG(StringHelper::Add(cmd, "\r\n"));
eth.printf(StringHelper::Add(cmd, "\r\n"));
}
read_line(response);
if (strstr(response, ack) != NULL) {
return true;
}
return false;
}
int MQTTGPRSEthernet::read_line(char* buffer, int timeout)
{
int bytes = 0;
uint32_t start = us_ticker_read();
while (true) {
if (eth.readable()) {
char ch = eth.getc();
if ((ch == '\n' || ch == '\r') && bytes == 0)
{
//此时说明以\r\n开头,在无回显的情况下经常会出现
//此时忽略空行\r\n
continue;
}
if (ch == '\n') {
if (bytes > 0 && buffer[bytes - 1] == '\r') {
//表明读取到\r\n,此行结束
bytes--;
}
if (bytes > 0) {
buffer[bytes] = '\0';
return bytes;
}
}
else {
buffer[bytes] = ch;
bytes++;
}
}
else {
if ((uint32_t)(us_ticker_read() - start) > timeout) {
return bytes;
}
}
}
//此时表示读到最后一个字节还没有读到\n
//有两种情况,一种是len不够长,另一种是恰好读完,此时数组的末尾没有\0
return bytes;
}
int MQTTGPRSEthernet::read(unsigned char* buffer, int len, int timeout)
{
int bytes = 0;
uint32_t start = us_ticker_read();
while (bytes < len) {
if (eth.readable()) {
char ch = eth.getc();
buffer[bytes] = ch;
bytes++;
}
else {
if ((uint32_t)(us_ticker_read() - start) > timeout) {
return bytes;
}
}
}
//此时表示读到最后一个字节还没有读到\n
//有两种情况,一种是len不够长,另一种是恰好读完,此时数组的末尾没有\0
return bytes;
}
int MQTTGPRSEthernet::write(unsigned char* buffer, int len, int timeout)
{
uint32_t start = us_ticker_read();
while (!connected)
{
if ((us_ticker_read() - start) > timeout)
{
return -1;
}
initNet();
}
for (size_t i = 0; i < len; i++)
{
LOGC(buffer[i]);
eth.putc(buffer[i]);
}
return len;
}
bool MQTTGPRSEthernet::disconnect()
{
wait_ms(1200);
eth.printf("+++");
wait_ms(1200);
eth.printf("AT\r\n");
eth.printf("AT+CIPSHUT\r\n");
connected = false;
return true;
}
MQTTGPRSEthernet::~MQTTGPRSEthernet()
{
if (connected)
{
disconnect();
}
}

主程序

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
#include "mbed.h"
#define APN         "uninet"
#define USERNAME    NULL
#define PASSWORD    NULL
#define INIT_TIMES  5            //初始化次数
#ifdef DEBUG
Serial pc2(PB_10, PB_11, 115200);
#define LOG(args...)    pc2.printf(args)
#else
#define LOG(args...)    (args)
#endif // DEBUG
#include "MQTTClient.h"
#include "MQTTmbed.h"
#include "MQTTGPRSEthernet.h"
#include "StringHelper.h"
int arrivedcount = 0;
void messageArrived(MQTT::MessageData& md)
{
MQTT::Message &message = md.message;
LOG("Message arrived: qos %d, retained %d, dup %d, packetid %d\n", message.qos, message.retained, message.dup, message.id);
LOG("Payload %.*s\n", message.payloadlen, (char*)message.payload);
++arrivedcount;
}
int main(int argc, char* argv[])
{
MQTTGPRSEthernet ethernet(PB_6, PB_7);
for (int i = 0; i < INIT_TIMES; i++)
{
if (ethernet.initNet(APN, USERNAME, PASSWORD))
break;
else
LOG("SIM CARD INIT FAILED!\r\n");
}
MQTT::Client<MQTTGPRSEthernet, Countdown> client = MQTT::Client<MQTTGPRSEthernet, Countdown>(ethernet);
char* hostname = "iot.eclipse.org";
int port = 1883;
for (int i = 0; i < INIT_TIMES; i++)
{
if (ethernet.connect(hostname, port))
break;
else
LOG("CAN NOT CONNECT TO SERVER!\r\n");
}
char* topic = "tson-topic-18669888635";
MQTTPacket_connectData data = MQTTPacket_connectData_initializer;      
data.MQTTVersion = 3;
data.clientID.cstring = "tson-client-18669888635";
int rc;
for (int i = 0; i < INIT_TIMES; i++)
{
if ((rc = client.connect(data)) != 0)
//无法连接到MQTT服务器
LOG("CAN NOT CONNECT TO MQTT SERVER!\r\n");
else
break;
}
if ((rc = client.subscribe(topic, MQTT::QOS0, messageArrived)) != 0)
//无法订阅
LOG("CAN NOT SUBSCRIBE THE TOPIC!\r\n");
MQTT::Message message;
   // QoS 0
char buf[100];
sprintf(buf, "Hello World!  QoS 0 message from tson\r\n");
message.qos = MQTT::QOS0;
message.retained = false;
message.dup = false;
message.payload = (void*)buf;
message.payloadlen = strlen(buf) + 1;
rc = client.publish(topic, message);
while (arrivedcount == 0)
client.yield(100000);
// QoS 1
sprintf(buf, "Hello World!  QoS 1 message from tson\n");
message.qos = MQTT::QOS1;
message.payloadlen = strlen(buf) + 1;
rc = client.publish(topic, message);
while (arrivedcount == 1)
client.yield(100000);
// QoS 2
sprintf(buf, "Hello World!  QoS 2 message from tson\n");
message.qos = MQTT::QOS2;
message.payloadlen = strlen(buf) + 1;
rc = client.publish(topic, message);
while (arrivedcount == 2)
client.yield(100000);
if ((rc = client.unsubscribe(topic)) != 0)
printf("rc from unsubscribe was %d\n", rc);
if ((rc = client.disconnect()) != 0)
printf("rc from disconnect was %d\n", rc);
ethernet.disconnect();
return 0;
}

client的yield函数中用调用messageArrived函数,之前的示例给出的是100,可能是由于间太短的缘故,总是调用不了回调函数,所以我将其改的非常大,便于调试。实际使用时可以使用1000。

四、总结

其实paho embeded-c用起来还是挺方便的,但是代码的重要性不高,所以移植起来往往会让人无从下手。但深放研究就会发现其实使用起来是非常简单的。

—————————–可爱的分割线——————————————————

有路过的吃瓜群众说想要StringHelper这个类,当时这个类用处不大,里面多数功能都可以用std::string中的功能来实现。不过自己刚从C转过来自己并不太了解,所以自己又写了一个,功能并不是很完善。也分享给大家:StringHelper

 

原创文章,转载请注明: 转载自TsonTec:测量解决方案提供者

本文链接地址: 在GPRS模块(SIM800C)和STM32芯片上实现MQTT协议

相关主题

  • 2017-05-28 阿里云IoT套件中MQTT协议的使用 (0)
    今天,阿里云给我发来邮件,说我申请的IoT套件已经可以申请了,虽然我不记得自己什么时候申请过了,但请我试用就试用一下呗。我看这个阿里云的这个东东有两个节点,一 […]
  • 2017-05-12 通过Paho客户端接入OneNet (1)
    首先说本文想要说明一个什么问题:OneNet平台支持MQTT协议,但给的资料非常有限。而Paho是一个开源的,MQTT的各种服务器、客户端的集成。本文要做的就 […]
  • 2016-02-07 使用代理(有验证)连接TcpClient (2)
    先分享代码给大家:static TcpClient connectViaHTTPProxy( string targetHost, […]
  • 2016-03-01 Git使用心得之在线管理 (0)
    本文是在Git使用心得之Git与GitHub的关系和Git使用心得之本地管理两篇文章的基础上进行的,有不明白的可以参照上面两篇文章。 首先给 […]
  • 2016-03-01 Git使用心得之本地管理 (2)
    如果对Git没有完整的概念,大家可以参考:Git使用心得之Git与GitHub的关系 首先把所有工具都安装上去: 1、Git:h […]
  • 2017-05-15 VisualGDB项目的移植问题 (0)
    很多人可能还不知道VisualGDB是什么东东,我给大家普及一下(自己的理解,不对勿喷):1、VisualGDB是一个Visual […]
本站仅提供存储服务,所有内容均由用户发布,如发现有害或侵权内容,请点击举报
打开APP,阅读全文并永久保存 查看更多类似文章
猜你喜欢
类似文章
【热】打开小程序,算一算2024你的财运
Mosquitto源码分析(一)mosquitto简介
事件处理循环(uloop.c/h)
How to implement a timer event handler callba...
.NET实现模拟MQTT通信(业务逻辑以及关键实现)
CNetClient
H264视频编码成MP4文件
更多类似文章 >>
生活服务
热点新闻
分享 收藏 导长图 关注 下载文章
绑定账号成功
后续可登录账号畅享VIP特权!
如果VIP功能使用有故障,
可点击这里联系客服!

联系客服