MQTT
消息队列遥测传输 (Message Queuing Telemetry Transport) 是 ISO 标准 (ISO/IEC PRF 20922) 下基于 发布(Publish)/订阅(Subscribe)
范式的消息协议。MQTT
协议是为硬件性能低下的远程设备以及网络状况糟糕的情况下而设计的,具有轻量、简单、开放和易于实现的特点,这些特点使它适用范围非常广泛,如 M2M
机器与机器通信和 IoT
物联网。
1. MQTT 概述
1.1 MQTT 网络实体
如图 1.1 所示,MQTT
协议定义了两种网络实体:消息代理 (Message Broker)
与 客户端 (Client)
。其中,消息代理
用于接收来自 客户端
的消息并转发至 目标客户端
,客户端
可以是任何运行有 MQTT
库并通过网络连接至 消息代理
的设备,客户端
既可以是 发布方
也可以是 订阅方
。
1.2 MQTT 消息
MQTT
传输的消息分为 主题 (Topic)
和 载荷 (Payload)
两部分。载荷
是消息携带的具体数据内容; 主题
是用于管理信息传输的,消息代理
接收到 发布者 (客户端)
发布
的消息后会向 订阅
此 主题
的 客户端
分发此消息,如果此 主题
没有任何 订阅
且 发布者
没有将其标记为 保留消息 (Retained)
,那么 消息代理
就会丢弃此消息。
1.3 MQTT 服务质量
MQTT
服务质量 QoS (Quality of Service)
指的是消息发布的服务质量,而不是消息接收的服务质量。MQTT
有三种消息发布服务质量:
- QoS 0:最多一次传送,即只负责发送,发送过后就不管数据的发送情况;
- QoS 1:至少一次传送,确认数据交付,需要握手2次,即发送方要接收到接收方发送的确认收到的回复消息,如图 1.2 所示;
- QoS 2:正好一次传送,确认数据交付,需要握手4次,即接收方需要收到发送方发送的确认收到接收方回复的消息,如图 1.3 所示。
2. MQTT 简单使用
Eclipse Paho 是一个 MQTT
协议的可靠开源实现,要在 ` Java 中使用
Eclipse Paho`,需要先导入依赖包,最新依赖包版本可查看 Eclipse Paho GitHub 项目主页。
<dependencies>
......
<dependency>
<!-- MQTT v3 -->
<groupId>org.eclipse.paho</groupId>
<artifactId>org.eclipse.paho.client.mqttv3</artifactId>
<version>1.2.5</version>
</dependency>
</dependencies>
新建类 MqttClientDemo.java
,实现一个 MQTT
客户端的演示程序,如下:
package org.example;
import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import java.util.Arrays;
/**
* MQTT 客户端演示
*/
public class MqttClientDemo {
/** MQTT 客户端 */
private IMqttClient mMqttClient;
private final MqttCallback mqttCallback = new MqttCallback() {
@Override
public void connectionLost(Throwable throwable) {
System.out.println("与服务器连接丢失, errorMsg = " + throwable.getMessage());
}
@Override
public void messageArrived(String topic, MqttMessage mqttMessage) {
System.out.println("收到服务器消息, topic = " + topic + ", payload = " + new String(mqttMessage.getPayload()));
}
@Override
public void deliveryComplete(IMqttDeliveryToken token) {
try {
System.out.println("消息已传递完成且已收到所有确认, topic = " + Arrays.toString(token.getTopics()) + ", payload = " + new String(token.getMessage().getPayload()));
} catch (MqttException e) {
throw new RuntimeException(e);
}
};
/**
* 创建 MQTT 客户端
* @param serverURI - 代理服务器的地址
*/
public void createClient(String serverURI) {
String clientId = MqttClient.generateClientId();
MqttClientPersistence persistence = new MemoryPersistence();
try {
// serverURI - 服务器的地址。支持两种类型的连接 tcp:// 用于 TCP 连接,ssl:// 用于受 SSL/TLS 保护的 TCP 连接。
// clientId - 连接到代理时使用的唯一客户端 ID 字符串。serverURI 参数通常与 clientId 参数一起使用以形成密钥,用于在传递消息时存储和引用消息。
// persistence - 用于存储传输中的消息的持久化类。如果为空,则使用默认的持久化机制。
mMqttClient = new MqttClient(serverURI, clientId, persistence);
System.out.println("创建 MQTT 客户端: serverURI = " + serverURI + ", clientId = " + clientId);
} catch (MqttException e) {
e.printStackTrace();
}
}
/**
* 通过用户名密码连接到 MQTT 代理服务器
* @param username - 用户名
* @param password - 密码
*/
public void connect(String username, String password) {
MqttConnectOptions options = new MqttConnectOptions();
options.setUserName(username);
options.setPassword(password.toCharArray());
// 返回如果连接丢失,客户端是否会自动尝试重新连接到服务器
options.setAutomaticReconnect(true);
// 设置客户端和服务器是否应在重新启动和重新连接时记住状态。
options.setCleanSession(true);
// 设置回调用于异步发生的事件,如收到新消息、消息发送成功、与服务器连接丢失
mMqttClient.setCallback(mqttCallback);
try {
mMqttClient.connect(options);
System.out.println("通过用户名密码连接到 MQTT 代理服务器: username = " + username + ", password = " + password);
} catch (MqttException e) {
e.printStackTrace();
}
}
/**
* 发布消息
* @param topic - 主题
* @param payload - 负载,实际使用的消息内容
* @param qos - 消息发布的服务质量,QoS0 - 0,QoS1 - 1,QoS2 - 2
* @param retain - 发布消息是否应由代理服务器保留
* @return boolean - 消息是否发布成功
*/
public boolean publish(String topic, String payload, int qos, boolean retain) {
MqttMessage mqttMessage = new MqttMessage();
mqttMessage.setPayload(payload.getBytes());
mqttMessage.setQos(qos);
mqttMessage.setRetained(retain);
try {
mMqttClient.publish(topic, mqttMessage);
System.out.println("发布消息: topic = " + topic + ", payload = " + payload);
return true;
} catch (MqttException e) {
e.printStackTrace();
return false;
}
}
/**
* 订阅一个主题
* @param topic - 要订阅的主题,可以包含通配符
* @param qos - 订阅的最高服务质量,以较低服务质量发布的消息将以发布的 QoS 接收
*/
public void subscribe(String topic, int qos) {
try {
mMqttClient.subscribe(topic, qos);
System.out.println("订阅一个主题: topic = " + topic + ", qos = " + qos);
} catch (MqttException e) {
e.printStackTrace();
}
}
/**
* 退订一个主题
* @param topic - 要退订的主题,必须与订阅中指定的主题匹配
*/
public void unsubscribe(String topic) {
try {
mMqttClient.unsubscribe(topic);
System.out.println("退订一个主题: topic = " + topic);
} catch (MqttException e) {
e.printStackTrace();
}
}
/**
* 断开与服务器的连接
*/
public void disConnect() {
try {
mMqttClient.disconnect();
System.out.println("断开与服务器的连接");
} catch (MqttException e) {
e.printStackTrace();
}
}
/**
* 重新连接服务器
*/
public void reConnect() {
try {
mMqttClient.reconnect();
System.out.println("重新连接服务器");
} catch (MqttException e) {
e.printStackTrace();
}
}
}
有了 MQTT
客户端程序后,还需要一个 MQTT
代理服务器来转发消息,本文使用 Mosquitto 来搭建 MQTT
代理服务器,Mosquitto
项目主页:https://github.com/eclipse/mosquitto。
安装并运行 Mosquitto
后则运行 MQTT
客户端程序,至少需要两个客户端,一个作为订阅者,一个作为发布者。订阅者演示程序如下所示:
MqttClientDemo client = new MqttClientDemo();
client.createClient("tcp://localhost:1883");
client.connect("user", "123456");
new Thread() {
@Override
public void run() {
super.run();
try {
Thread.sleep(5000);
client.subscribe("test", 0);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
}.start();
发布者演示程序如下所示:
MqttClientDemo client = new MqttClientDemo();
client.createClient("tcp://localhost:1883");
client.connect("user", "123456");
new Thread() {
@Override
public void run() {
super.run();
try {
Thread.sleep(5000);
client.subscribe("test", 0);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
}.start();
遵循先运行订阅者,后运行发布者的顺序,得到控制台打印结果如下:
# 订阅者演示程序打印结果
创建 MQTT 客户端: serverURI = tcp://localhost:1883, clientId = paho1174155411898200
通过用户名密码连接到 MQTT 代理服务器: username = user, password = 123456
订阅一个主题: topic = test, qos = 0
收到服务器消息, topic = test, payload = test message
# 发布者演示程序打印结果
创建 MQTT 客户端: serverURI = tcp://localhost:1883, clientId = paho1174159650496000
通过用户名密码连接到 MQTT 代理服务器: username = user1, password = 123456
发布消息: topic = test, payload = test message
消息已传递完成且已收到所有确认, topic = [test], payload = test message
3. MQTT 结构
如图 3.1 所示,MQTT
报文由 固定报头
、可变报头
、有效载荷
组成,其中所有报文都包含 固定报头
,而 可变报头
和 有效载荷
只在部分报文中出现 。本章节基于 MQTT v3.1.1
简述部分报文类型,更多报文类型或其他 MQTT
协议版本可查看官方文档。
3.1 固定报头
如图 3.2 所示,固定报头长度为 16 位,2 字节。第 1 位字节的高 4 位为 MQTT 报文类型
, 低 4 位为 标志位
,不同的 报文类型
下 标志位
有不同的含义。
MQTT
的各个 报文类型
如下表所示:
报文类型 | 类型值 | 方向 | 描述 |
---|---|---|---|
Reserved | 0 | N/A | 保留 |
CONNECT | 1 | 客户端到服务端 | 客户端请求连接服务端 |
CONNACK | 2 | 服务端到客户端 | 连接报文确认 |
PUBLISH | 3 | 双向 | 发布消息 |
PUBACK | 4 | 双向 | QoS=1 时消息发布收到确认 |
PUBREC | 5 | 双向 | QoS=2 时发布收到(保证交付第一步) |
PUBREL | 6 | 双向 | QoS=2 时发布释放(保证交付第二步) |
PUBCOMP | 7 | 双向 | QoS=2 时消息发布完成(保证交互第三步) |
SUBSCRIBE | 8 | 客户端到服务端 | 客户端订阅请求 |
SUBACK | 9 | 服务端到客户端 | 订阅请求报文确认 |
UNSUBSCRIBE | 10 | 客户端到服务端 | 客户端取消订阅请求 |
UNSUBACK | 11 | 服务端到客户端 | 取消订阅报文确认 |
PINGREQ | 12 | 客户端到服务端 | 心跳请求 |
PINGRESP | 13 | 服务端到客户端 | 心跳响应 |
DISCONNECT | 14 | 客户端到服务端 | 客户端断开连接 |
Reserved | 15 | N/A | 保留 |
MQTT
各个 报文类型
下的 标志位
含义如下表所示:
报文类型 | 报头标志 | Bit 3 | Bit 2 | Bit 1 | Bit 0 |
---|---|---|---|---|---|
CONNECT | 保留 | 0 | 0 | 0 | 0 |
CONNACK | 保留 | 0 | 0 | 0 | 0 |
PUBLISH | 用于 MQTT 3.1.1 | DUP | QoS | QoS | RETAIN |
PUBACK | 保留 | 0 | 0 | 0 | 0 |
PUBREC | 保留 | 0 | 0 | 0 | 0 |
PUBREL | 保留 | 0 | 0 | 1 | 0 |
PUBCOMP | 保留 | 0 | 0 | 0 | 0 |
SUBSCRIBE | 保留 | 0 | 0 | 1 | 0 |
SUBACK | 保留 | 0 | 0 | 0 | 0 |
UNSUBSCRIBE | 保留 | 0 | 0 | 1 | 0 |
UNSUBACK | 保留 | 0 | 0 | 0 | 0 |
PINGREQ | 保留 | 0 | 0 | 0 | 0 |
PINGRESP | 保留 | 0 | 0 | 0 | 0 |
DISCONNECT | 保留 | 0 | 0 | 0 | 0 |
其中:
- DUP:重复发布标志,此标志为
0
表示是第一次发布消息,为1
表示对消息的重新发布; - QoS:消息发布的服务质量,详见 1.3 节;
- RETAIN:发布消息保留标志,此标志为
1
时服务器必须存储客户端发布的PUBLISH
数据包及其QoS
,后面有客户端建立新订阅时必须把主题上的最后保留消息发送给订阅者。
剩余长度
字段使用一个变长度编码方案,且 剩余长度
不包括用于编码 剩余长度
字段本身的字节数。 剩余长度
最小 1
字节,最大 4
个字节,每个字节的最高位为标志位, 标志位为 1
时表示 剩余长度
字段有更多的字节,为 0
时表示 剩余长度
字段没有更多的字节。
单字节的 剩余长度
字段最大值为 0111 0000b = 0x7f = 127
,从第 2
个字节开始,每字节的单位大小为 $128^{n-1}$,例如十进制数 456
可被拆分为 72 + 3 × 128
,此时 剩余长度
字段长度为 2 字节,即 0x48 0x03
。此变长度编码方案的取值范围及其计算如下表所示。
字节数 | 最小值 | 最大值 |
---|---|---|
1 | 0x00 = 0 | 0x7f = 127 |
2 | 0x80,0x01 = 0×1 + 1×128 = 128 | 0xff, 0x7f = 127×1 + 127×128 = 16,383 |
3 | 0x80, 0x80, 0x01 = 0×1 + 0×128 + 1×128^2 = 16,384 |
0xff, 0xff, 0x7f = 127×1 + 127×128 + 127×128^2 = 2,097,151 |
4 | 0x80, 0x80, 0x80, 0x01 = 0×1 + 0×128 + 0×128^2 +1×128^3 = 2,097,152 |
0xff, 0xff, 0xff, 0x7f = 127×1 + 127×128 + 127×128^2 + 127×128^3 = 268,435,455 |
3.2 可变报头
某些 MQTT
控制报文中包含一个 可变报头
,可变报头
的内容根据 报文类型
的不同而不同。如图 3.3 所示,大部分控制报文的 可变报头
部分都包含一个两字节的 报文标识符 (Packet Identifier)
字段,报文标识符
用于保证报文的唯一性以用于报文重发,当客户端处理完此报文对应的确认操作后,这个 报文标识符
就可被重用。
SUBSCRIBE
,UNSUBSCRIBE
,PUBLISH (QoS > 0)
报文必须包含一个非零的 16 位 报文标识符
,而 PUBLISH (QoS = 0)
报文不能包含 报文标识符
。是否包含 报文标识符
的控制报文如下表所示:
报文类型 | 是否包含报文标识符 |
---|---|
CONNECT | 否 |
CONNACK | 否 |
PUBLISH | 是 (Qos > 0),Qos = 0 时不包含 |
PUBACK | 是 |
PUBREC | 是 |
PUBREL | 是 |
PUBCOMP | 是 |
SUBSCRIBE | 是 |
SUBACK | 是 |
UNSUBSCRIBE | 是 |
UNSUBACK | 是 |
PINGREQ | 否 |
PINGRESP | 否 |
DISCONNECT | 否 |
3.3 CONNECT
CONNECT
报文用于客户端请求连接到服务端,在一个网络连接上,客户端只能发送一次 CONNECT
报文;若服务端收到客户端发送的第二个 CONNECT
报文必须当作协议违规处理并断开与客户端的连接。
CONNECT 固定报头
CONNECT 可变报头
前 6 个字节为固定值,字节 7 在本文中值为 4
, 表示使用的是 MQTT v3.1.1
,字节 8 是连接标志,其中:
- 用户名标记:此标记为
0
时,有效载荷中不能包含用户名字段,此标记为1
时,有效载荷中必须包含用户名字段; - 密码标记:此标记为
0
时,有效载荷中不能包含密码字段,此标记为1
时,有效载荷中必须包含密码字段; - Retain:如果遗嘱消息被发布时需要保留,需要设置此位为
1
; - QoS:指定发布遗嘱消息时使用的服务质量等级,如果遗嘱标记为
0
,QoS
也必须为0
,如果遗嘱标记为1
,QoS
可以为0
、1
、2
; - 遗嘱标记:遗嘱标记
Will Flag
被设置为 1,表示如果连接请求被接受了,遗嘱消息Will Message
必须被存储在服务端中且与这个网络连接关联。当此网络连接关闭时,服务端必须发布这个遗嘱消息,除非服务端收到DISCONNECT
报文时删除了这个遗嘱消息; - 清理会话:清理会话标记
Clean Session
,此标记指定了会话状态的处理方式。此标记为0
时服务端必须基于当前会话的状态恢复与客户端的通信,此标记为1
时客户端和服务端必须丢弃之前的任何会话并开始一个新的会话。
CONNECT 有效载荷
CONNECT
报文的有效载荷包含一个或多个以长度为前缀的字段,可变报头中的标志决定是否包含这些字段。如果包含的话,必须按 客户端标识符
,遗嘱主题
,遗嘱消息
,用户名
,密 码
的顺序出现。图 3.5 展示了仅包含 客户端标识符
,用户名
,密 码
字段的有效载荷。
3.4 CONNACK
服务端发送 CONNACK
报文响应从客户端收到的 CONNECT
报文,服务端发送给客户端的第一个报文必须是 CONNACK
。如果客户端在一定时间内没有收到服务端的 CONNACK
报文,客户端应该关闭网络连接。
CONNACK 固定报头
CONNACK 可变报头
其中:
-
SP:当前会话标记
Session Present
。如果服务端收到清理会话标记Clean Session
为1
的连接,需要将CONNACK
报文中的SP
和连接返回码
设置为0
。如果服务端收到清理会话标记Clean Session
为0
的连接,SP
标记值取决于服务端是否已经保存了clientId
对应客户端的会话状态,如果服务端已经保存了会话状态,需要将CONNACK
报文中的SP
和连接返回码
设置为1
;如果服务端没有已保存的会话状态,需要将CONNACK
报文中的SP
和连接返回码
设置为0
。 -
连接返回码:连接返回码字段使用一个字节的无符号值,描述了对此连接的响应结果,如下表所示,如果下表中的所有连接返回码都不太合适,那么服务端必须关闭网络连接,不需要发送
CONNACK
报 文。返回码 名称 描述 0x00 连接已接受 连接已被服务端接受 0x01 连接已拒绝 服务端不支持客户端请求的 MQTT 协议级别 0x02 连接已拒绝 客户端标识符是正确的 UTF-8 编码,但服务端不允许使用 0x03 连接已拒绝 网络连接已建立,但 MQTT 服务不可用 0x04 连接已拒绝 用户名或密码的数据格式无效 0x05 连接已拒绝 客户端未被授权连接到此服务器 0x06 ~ 0xFF 保留 保留
CONNACK 有效载荷
CONNACK
报文没有有效载荷。
3.5 PUBLISH
PUBLISH
控制报文用于从客户端向服务端或者服务端向客户端传输一个消息。
PUBLISH 固定报头
其中:
- DUP:重复发布标志,此标志为
0
表示是第一次发布消息,为1
表示对消息的重新发布; - QoS:消息发布的服务质量,详见 1.3 节;
- RETAIN:发布消息保留标志,此标志为
1
时服务器必须存储客户端发布的PUBLISH
数据包及其QoS
,后面有客户端建立新订阅时必须把主题上的最后保留消息发送给订阅者。
PUBLISH 可变报头
只有当 Qos > 0
时可变报头才会包含 报文标识符 (Packet Identifier)
字段。
PUBLISH 有效载荷
PUBLISH
报文有效载荷包含将被发布的消息内容,$有效载荷的长度 = 固定报头中的剩余长度 - 可变报头的长度$, 有效载荷可为空。
3.6 SUBSCRIBE
客户端向服务端发送 SUBSCRIBE
报文用于创建一个或多个订阅。服务端收到 PUBLISH
报文后,需要将此 PUBLISH
报文转发给订阅了相应主题的客户端。
SUBSCRIBE 固定报头
SUBSCRIBE 可变报头
SUBSCRIBE 有效载荷
SUBSCRIBE
报文的有效载荷包含了一个主题过滤器列表,它们表示客户端想要订阅的主题,$有效载荷的长度 = 固定报头中的剩余长度 - 可变报头的长度$,主题过滤器列表中的每一个项结构如图 3.12 所示。
3.7 SUBACK
服务端发送 SUBACK
报文给客户端,用于确认它已收到并且正在处理 SUBSCRIBE
报文。 SUBACK
报文包含一个返回码,它们指定了 SUBSCRIBE
请求的每个订阅被授予的最大 QoS
等级。
SUBACK 固定报头
SUBACK 可变报头
SUBACK 有效载荷
有效载荷包含一个返回码列表,每个返回码对应等待确认的 SUBSCRIBE
报文中的一个主题过滤器。返回码的顺序必须和 SUBSCRIBE
报文中主题过滤器的顺序相同。返回码列表中的每一个项结构如图 3.15 所示。
3.8 PINGREQ
客户端发送 PINGREQ
报文给服务端的,用于:
- 在没有任何其它控制报文从客户端发给服务的时,告知服务端客户端还活着;
- 请求服务端发送响应确认告知客户端服务端还活着;
- 使用网络以确认网络连接没有断开。
客户端发送 PINGREQ
报文的时间间隔是在 CONNECT
请求连接报文中的可变报头中的保持连接 Keep Alive
字段。
PINGREQ 固定报头
PINGREQ 可变报头
PINGREQ
报文没有可变报头。
PINGREQ 有效载荷
PINGREQ
报文没有有效载荷。
3.9 PINGRESP
服务端发送 PINGRESP
报文响应客户端的 PINGREQ
报文,表示服务端还活着。
PINGRESP 固定报头
PINGRESP 可变报头
PINGRESP
报文没有可变报头。
PINGRESP 有效载荷
PINGRESP
报文没有有效载荷。
4. 实现 MQTT 代理
基于第 3 节简述的部分报文类型,使用 Socket
实现一个简单的 MQTT
代理服务器。读取 MQTT 固定报头
的关键代码如下:
try {
InputStream is = mSocket.getInputStream();
OutputStream os = mSocket.getOutputStream();
// ...忽略部分变量定义...
while (true) {
read = is.read(buffer, 0, 1); // 读取报文第 1 个字节
length = BitUtils.decodeRemainingLength(is); // 解码固定报头中的剩余长度字段
type = BitUtils.getHigh4bit(buffer[0]); // 读取报文第 1 个字节中的高 4 位, 即报文类型
flag = BitUtils.getLow4bit(buffer[0]); // 读取报文第 1 个字节中的低 4 位, 即报文类型标记
System.out.println("\n固定报头:类型 = " + type + ", flag = " + flag + ", length = " + length);
}
} catch (IOException e) {
e.printStackTrace();
}
由于 可变报头
的内容根据 报文类型
的不同而不同,所以需要先确认报文类型后再依据协议规范读取。本文中仅简单实现部分报文,分别为:CONNECT
、CONNACK
、PUBLISH (QoS = 0)
、SUBSCRIBE
、SUBACK
、PINGREQ
、PINGRESP
,对各个报文类型的判断和回复关键代码如下:
if (type == 1) {
// CONNECT - 客户向服务器发送的连接请求
// 回复连接确认报文 CONNACK,固定报头 2 字节 + 可变报头 2 字节
// 可变报头内容为连接确认标记 0 和连接确认码 0
os.write(new byte[] { 0x20, 0x02, 0x00, 0x00 });
os.flush();
} else if (type == 3) {
// PUBLISH - 发布报文,忽略 QoS
// qos = 0, 无需回复 PUBACK
} else if (type == 8 && flag == 2) {
// SUBSCRIBE - 订阅主题
// 回复订阅主题报文 SUBACK, 总长度为 固定报头 2 字节 + 可变报头 2 字节 + 1字节 * 订阅主题过滤器列表大小
byte[] subacks = new byte[ 2 + 2 + topics.size() ];
// 固定报头 2 字节
// 由于 java 中没有无符号类型,所以 0x90 超出了 byte 的取值范围,强转处理即可
subacks[0] = (byte)0x90; // 类型:SUBACK, 标记: 0
subacks[1] = (byte)(2 + topics.size()); // 剩余长度 = 可变报头 2 字节 + 主题过滤器列表大小 * 1字节, 此处忽略剩余长度大于 127 的情况
// 可变报头 2 字节 - 报文标识符 = 0x0001
subacks[2] = 0x00;
subacks[3] = 0x01;
// 有效载荷
for(int i = 4; i < subacks.length; i ++) {
// 简单处理,统一设置回复主题过滤器 Qos = 0
subacks[i] = 0x00;
}
// 回复 SUBACK
os.write(subacks);
os.flush();
} else if (type == 12) {
// PINGREQ – 心跳请求
System.out.println("======== PINGREQ 心跳请求 ========");
// 回复心跳请求报文 PINGRESP
os.write(new byte[] { (byte)0xD0, 0x00 });
}
剩下的步骤就是根据不同的 报文类型
的相应规范读取 可变报头
和 有效载荷
,MQTT
代理服务器完整代码如下:
MqttBrokerDemo.java
package org.example;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.ServerSocket;
import java.net.Socket;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
/**
* MQTT 代理服务器演示
*/
public class MqttBrokerDemo {
private static final int port = 1883;
private HashMap<Socket, String> clientMap = new HashMap<Socket, String>();
private HashMap<String, ArrayList<Socket>> topics = new HashMap<String, ArrayList<Socket>>();
public static void main(String[] args) {
new MqttBrokerDemo().run();
}
public void run() {
try {
ServerSocket serverSocket = new ServerSocket(port);
System.out.println("MQTT Broker started on port " + port);
while (true) {
Socket clientSocket = serverSocket.accept();
System.out.println("\n有新连接 getInetAddress:" + clientSocket.getInetAddress());
new MqttBrokerServer(clientSocket, new CallBack() {
/**
* 客户端连接请求回调
* @param socket - 当前客户端 Socket 对象
* @param clientId - 客户端标识符
*/
@Override
public void connect(Socket socket, String clientId) {
clientMap.put(socket, clientId);
}
/**
* 客户端发布请求回调
* @param socket - 当前客户端 Socket 对象
* @param topic - 主题
* @param bytes - publish 数据包,用于转发给目标客户端
*/
@Override
public void publish(Socket socket, String topic, byte[] bytes) {
ArrayList<Socket> array = topics.get(topic);
if (array == null) return;
try {
for (Socket temp : array) {
OutputStream os = temp.getOutputStream();
os.write(bytes);
os.flush();
}
} catch (IOException e) {
e.printStackTrace();
}
}
/**
* 客户端订阅请求回调
* @param socket - 当前客户端 Socket 对象
* @param topicList - 主题过滤器列表
*/
@Override
public void subscribe(Socket socket, ArrayList<String> topicList) {
if (topicList == null) return;
for (String topic : topicList) {
ArrayList<Socket> sockets = topics.get(topic);
if (sockets == null) {
sockets = new ArrayList<Socket>();
sockets.add(socket);
topics.put(topic, sockets);
} else {
sockets.add(socket);
topics.put(topic, sockets);
}
}
}
}).start();
}
} catch (IOException e) {
throw new RuntimeException(e);
}
}
static class MqttBrokerServer extends Thread {
protected Socket mSocket;
protected CallBack mCallBack;
public MqttBrokerServer(Socket socket, CallBack callBack) {
this.mSocket = socket;
this.mCallBack = callBack;
}
@Override
public void run() {
super.run();
try {
InputStream is = mSocket.getInputStream();
OutputStream os = mSocket.getOutputStream();
byte[] buffer = new byte[1024];
int type = 0;
int flag = 0;
int length = 0;
int read = 0;
long skipLen = 0;
while (true) {
read = is.read(buffer, 0 ,1);
length = BitUtils.decodeRemainingLength(is);
type = BitUtils.getHigh4bit(buffer[0]);
flag = BitUtils.getLow4bit(buffer[0]);
System.out.println("\n固定报头:类型 = " + type + ", flag = "+ flag +", length = " + length);
if (type == 1) {
// CONNECT - 客户向服务器发送的连接请求
// 读取 CONNECT 报文的可变报头
// 读取协议名长度
read = is.read(buffer, 0 ,2);
int protocolLen = BitUtils.getIntBy2ByteData(buffer[0], buffer[1]);
// 读取协议名
read = is.read(buffer, 0 , protocolLen);
String protocolName = new String(Arrays.copyOfRange(buffer, 0, protocolLen), StandardCharsets.UTF_8);
// 读取部分连接标记
skipLen = is.skip(1);
read = is.read(buffer, 0 ,1);
int userNameFlag = BitUtils.getBit(buffer[0], 7); // 用户名标志
int pwdFlag = BitUtils.getBit(buffer[0], 6); // 密码标志
// 读取心跳连接间隔
read = is.read(buffer, 0 ,2);
int heartbeat = BitUtils.getIntBy2ByteData(buffer[0], buffer[1]);
// 读取 clientId
read = is.read(buffer, 0, 2);
int clientIdLen = BitUtils.getIntBy2ByteData(buffer[0], buffer[1]);
read = is.read(buffer, 0 , clientIdLen);
String clientId = new String(Arrays.copyOfRange(buffer, 0, clientIdLen), StandardCharsets.UTF_8);
// 读取 userName
read = is.read(buffer, 0, 2);
int userNameLen = BitUtils.getIntBy2ByteData(buffer[0], buffer[1]);
read = is.read(buffer, 0 , userNameLen);
String userName = new String(Arrays.copyOfRange(buffer, 0, userNameLen), StandardCharsets.UTF_8);
// 读取 password
read = is.read(buffer, 0, 2);
int passwordLen = BitUtils.getIntBy2ByteData(buffer[0], buffer[1]);
read = is.read(buffer, 0 , passwordLen);
String password = new String(Arrays.copyOfRange(buffer, 0, passwordLen), StandardCharsets.UTF_8);
System.out.println("======== CONNECT 连接请求 ========");
System.out.println("protocolLen = " + protocolLen + ", protocolName = " + protocolName + ", userNameFlag = " + userNameFlag + ", pwdFlag = " + pwdFlag + ", heartbeat = " + heartbeat + "s");
System.out.println("clientId = " + clientId + ", userName = " + userName + ", password = " + password);
mCallBack.connect(mSocket, clientId);
// 回复连接确认报文 CONNACK,固定报头 2 字节 + 可变报头 2 字节
// 可变报头内容为连接确认标记 0 和连接确认码 0
os.write(new byte[] { 0x20, 0x02, 0x00, 0x00 });
os.flush();
} else if (type == 3) {
// PUBLISH - 发布报文,忽略 QoS
ArrayList<Byte> byteList = new ArrayList<>();
byteList.add(buffer[0]);
for (byte b : BitUtils.encodeRemainingLength(length)) {
byteList.add(b);
}
int messageLength = length;
// 读取 topic
read = is.read(buffer, 0, 2);
int topicLen = BitUtils.getIntBy2ByteData(buffer[0], buffer[1]);
byteList.add(buffer[0]);
byteList.add(buffer[1]);
read = is.read(buffer, 0 , topicLen);
byte[] topicByte = Arrays.copyOfRange(buffer, 0, topicLen);
String topic = new String(topicByte, StandardCharsets.UTF_8);
for (byte b : topicByte) { byteList.add(b); }
// Qos = 0, 无报文标识符
// 读取 message
messageLength = messageLength - 2 - topicLen;
read = is.read(buffer, 0 , messageLength);
byte[] msgByte = Arrays.copyOfRange(buffer, 0, messageLength);
String message = new String(msgByte, StandardCharsets.UTF_8);
for (byte b : msgByte) { byteList.add(b); }
System.out.println("======== PUBLISH 发布请求 ========");
System.out.println("topic = " + topic + ", message = " + message);
byte[] bytes = new byte[byteList.size()];
for(int i = 0; i < bytes.length; i ++) {
bytes[i] = byteList.get(i);
}
mCallBack.publish(mSocket, topic, bytes);
// qos = 0, 无需回复 PUBACK
} else if (type == 8 && flag == 2) {
// SUBSCRIBE - 订阅主题
int messageLength = length;
// 跳过可变报头 - 报文标识符
skipLen = is.skip(2);
messageLength -= skipLen;
// 读取 topic
ArrayList<String> topics = new ArrayList<String>();
while(messageLength > 0) {
read = is.read(buffer, 0 ,2);
int topicLen = BitUtils.getIntBy2ByteData(buffer[0], buffer[1]);
read = is.read(buffer, 0 , topicLen);
String topic = new String(Arrays.copyOfRange(buffer, 0, topicLen), StandardCharsets.UTF_8);
topics.add(topic);
// Qos 暂时忽略
read = is.read(buffer, 0 , 1);
messageLength -= (topicLen + 3);
}
System.out.println("======== SUBSCRIBE 订阅请求 ========");
System.out.println("topics = " + Arrays.toString(topics.toArray()));
mCallBack.subscribe(mSocket, topics);
// 回复订阅主题报文 SUBACK, 总长度为 固定报头 2 字节 + 可变报头 2 字节 + 1字节 * 订阅主题过滤器列表大小
byte[] subacks = new byte[ 2 + 2 + topics.size() ];
// 固定报头 2 字节
// 由于 java 中没有无符号类型,所以 0x90 超出了 byte 的取值范围,强转处理即可
subacks[0] = (byte)0x90; // 类型:SUBACK, 标记: 0
subacks[1] = (byte)(2 + topics.size()); // 剩余长度 = 可变报头 2 字节 + 主题过滤器列表大小 * 1字节, 此处忽略剩余长度大于 127 的情况
// 可变报头 2 字节 - 报文标识符 = 0x0001
subacks[2] = 0x00;
subacks[3] = 0x01;
// 有效载荷
for(int i = 4; i < subacks.length; i ++) {
// 简单处理,统一设置回复主题过滤器 Qos = 0
subacks[i] = 0x00;
}
// 回复 SUBACK
os.write(subacks);
os.flush();
} else if (type == 12) {
// PINGREQ – 心跳请求
System.out.println("======== PINGREQ 心跳请求 ========");
// 回复心跳请求报文 PINGRESP
os.write(new byte[] { (byte)0xD0, 0x00 });
}
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
interface CallBack {
public void connect(Socket socket, String clientId);
public void publish(Socket socket, String topic, byte[] bytes);
public void subscribe(Socket socket, ArrayList<String> topicList);
}
}
BitUtils.java
package org.example;
import java.io.IOException;
import java.io.InputStream;
public class BitUtils {
/**
* 解码 MQTT 固定报头剩余长度
* @param is - InputStream
* @return int
*/
public static int decodeRemainingLength(InputStream is) throws IOException {
byte[] buffer = new byte[1];
int length = 0;
int multiplier = 1;
int max = 128 * 128 * 128;
int read = 0;
do {
read = is.read(buffer, 0 ,1);
if (read == -1) break;
length += (buffer[0] & 0x7f) * multiplier;
multiplier *= 128;
if (multiplier > max) break;
} while ((buffer[0] & 0x80) != 0);
return length;
}
/**
* 编码 MQTT 固定报头剩余长度为 byte 数组
* @param length - 固定报头剩余长度
* @return byte[]
*/
public static byte[] encodeRemainingLength(int length) {
byte[] temp = new byte[4];
int index = 0;
int number = length;
do {
temp[index] = (byte)(number % 128);
number /= 128;
if (number > 0) {
// 向最高位添加标志 1
temp[index] = (byte)(temp[index] | 128);
}
index ++;
} while (number > 0);
byte[] bytes = new byte[index];
System.arraycopy(temp, 0, bytes, 0, bytes.length);
return bytes;
}
/**
* 获取 Byte 数据的高 4 位
* @param data - byte 数据
* @return int
*/
public static int getHigh4bit(byte data) {
return (data & 0xF0) >> 4;
}
/**
* 获取 byte 数据的低 4 位
* @param data - byte 数据
* @return int
*/
public static int getLow4bit(byte data) {
return data & 0x0F;
}
/**
* 通过字节数据中的某一二进制位
* @param data - 字节数据
* @param index - 第几个二进制位, 从 0 开始
* @return int
*/
public static int getBit(byte data, int index) {
return ((1 << index) & data) >> index;
}
/**
* 通过高字节和低字节还原数据值
* @param high - 据高字节
* @param low - 数据低字节
* @return int
*/
public static int getIntBy2ByteData(byte high, byte low) {
return ((high & 0x00FF) << 8) | low;
}
}
代理服务器运行后,遵循先运行订阅者,后运行发布者的顺序运行第 2 章节的 MQTT
客户端的演示程序,得到控制台打印结果如下:
# MqttBrokerDemo.java
MQTT Broker started on port 1883
有新连接 getInetAddress:/127.0.0.1
固定报头:类型 = 1, flag = 0, length = 46
======== CONNECT 连接请求 ========
protocolLen = 4, protocolName = MQTT, userNameFlag = 1, pwdFlag = 1, heartbeat = 60s
clientId = paho1226108144974700, userName = user, password = 123456
有新连接 getInetAddress:/127.0.0.1
固定报头:类型 = 1, flag = 0, length = 47
======== CONNECT 连接请求 ========
protocolLen = 4, protocolName = MQTT, userNameFlag = 1, pwdFlag = 1, heartbeat = 60s
clientId = paho1226112802010100, userName = user1, password = 123456
固定报头:类型 = 8, flag = 2, length = 9
======== SUBSCRIBE 订阅请求 ========
topics = [test]
固定报头:类型 = 3, flag = 0, length = 18
======== PUBLISH 发布请求 ========
topic = test, message = test message
# MqttClientDemo.java 订阅者
创建 MQTT 客户端: serverURI = tcp://localhost:1883, clientId = paho1226108144974700
通过用户名密码连接到 MQTT 代理服务器: username = user, password = 123456
订阅一个主题: topic = test, qos = 0
收到服务器消息, topic = test, payload = test message
# MqttClientDemo.java 发布者
创建 MQTT 客户端: serverURI = tcp://localhost:1883, clientId = paho1226112802010100
通过用户名密码连接到 MQTT 代理服务器: username = user1, password = 123456
消息已传递完成且已收到所有确认, topic = [test], payload = test message
发布消息: topic = test, payload = test message