MQTT 消息队列遥测传输 (Message Queuing Telemetry Transport) 是 ISO 标准 (ISO/IEC PRF 20922) 下基于 发布(Publish)/订阅(Subscribe) 范式的消息协议。MQTT 协议是为硬件性能低下的远程设备以及网络状况糟糕的情况下而设计的,具有轻量、简单、开放和易于实现的特点,这些特点使它适用范围非常广泛,如 M2M 机器与机器通信和 IoT 物联网。

1. MQTT 概述

1.1 MQTT 网络实体

如图 1.1 所示,MQTT 协议定义了两种网络实体:消息代理 (Message Broker)客户端 (Client)。其中,消息代理 用于接收来自 客户端 的消息并转发至 目标客户端客户端 可以是任何运行有 MQTT 库并通过网络连接至 消息代理 的设备,客户端 既可以是 发布方 也可以是 订阅方

图1.1

1.2 MQTT 消息

MQTT 传输的消息分为 主题 (Topic)载荷 (Payload) 两部分。载荷 是消息携带的具体数据内容; 主题 是用于管理信息传输的,消息代理 接收到 发布者 (客户端) 发布 的消息后会向 订阅主题客户端 分发此消息,如果此 主题 没有任何 订阅发布者 没有将其标记为 保留消息 (Retained),那么 消息代理 就会丢弃此消息。

1.3 MQTT 服务质量

MQTT 服务质量 QoS (Quality of Service) 指的是消息发布的服务质量,而不是消息接收的服务质量。MQTT 有三种消息发布服务质量:

  • QoS 0:最多一次传送,即只负责发送,发送过后就不管数据的发送情况;
  • QoS 1:至少一次传送,确认数据交付,需要握手2次,即发送方要接收到接收方发送的确认收到的回复消息,如图 1.2 所示;
  • QoS 2:正好一次传送,确认数据交付,需要握手4次,即接收方需要收到发送方发送的确认收到接收方回复的消息,如图 1.3 所示。

图1.2

图1.3

2. MQTT 简单使用

Eclipse Paho 是一个 MQTT 协议的可靠开源实现,要在 ` Java 中使用 Eclipse Paho`,需要先导入依赖包,最新依赖包版本可查看 Eclipse Paho GitHub 项目主页

<dependencies>
  ......
  <dependency>
    <!-- MQTT v3 -->
    <groupId>org.eclipse.paho</groupId>
    <artifactId>org.eclipse.paho.client.mqttv3</artifactId>
    <version>1.2.5</version>
  </dependency>
</dependencies>

新建类 MqttClientDemo.java,实现一个 MQTT 客户端的演示程序,如下:

package org.example;

import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import java.util.Arrays;


/**
 * MQTT 客户端演示
 */
public class MqttClientDemo {

  /** MQTT 客户端 */
  private IMqttClient mMqttClient;

  private final MqttCallback mqttCallback = new MqttCallback() {

    @Override
    public void connectionLost(Throwable throwable) {
      System.out.println("与服务器连接丢失, errorMsg = " + throwable.getMessage());
    }

    @Override
    public void messageArrived(String topic, MqttMessage mqttMessage) {
      System.out.println("收到服务器消息, topic = " + topic + ", payload = " + new String(mqttMessage.getPayload()));
    }

    @Override
    public void deliveryComplete(IMqttDeliveryToken token) {
      try {
        System.out.println("消息已传递完成且已收到所有确认, topic = " + Arrays.toString(token.getTopics()) + ", payload = " + new String(token.getMessage().getPayload()));
      } catch (MqttException e) {
        throw new RuntimeException(e);
      
    }
  };

  /**
   * 创建 MQTT 客户端
   * @param serverURI - 代理服务器的地址
   */
  public void createClient(String serverURI) {
      String clientId = MqttClient.generateClientId();
      MqttClientPersistence persistence = new MemoryPersistence();
      try {
        // serverURI - 服务器的地址。支持两种类型的连接 tcp:// 用于 TCP 连接,ssl:// 用于受 SSL/TLS 保护的 TCP 连接。
        // clientId - 连接到代理时使用的唯一客户端 ID 字符串。serverURI 参数通常与 clientId 参数一起使用以形成密钥,用于在传递消息时存储和引用消息。
        // persistence - 用于存储传输中的消息的持久化类。如果为空,则使用默认的持久化机制。
        mMqttClient = new MqttClient(serverURI, clientId, persistence);
        System.out.println("创建 MQTT 客户端: serverURI = " + serverURI + ", clientId = " + clientId);
      } catch (MqttException e) {
        e.printStackTrace();
      }
  }

  /**
   * 通过用户名密码连接到 MQTT 代理服务器
   * @param username - 用户名
   * @param password - 密码
   */
  public void connect(String username, String password) {
    MqttConnectOptions options = new MqttConnectOptions();
    options.setUserName(username);
    options.setPassword(password.toCharArray());
    // 返回如果连接丢失,客户端是否会自动尝试重新连接到服务器
    options.setAutomaticReconnect(true);
    // 设置客户端和服务器是否应在重新启动和重新连接时记住状态。
    options.setCleanSession(true);
    // 设置回调用于异步发生的事件,如收到新消息、消息发送成功、与服务器连接丢失
    mMqttClient.setCallback(mqttCallback);
    try {
      mMqttClient.connect(options);
      System.out.println("通过用户名密码连接到 MQTT 代理服务器: username = " + username + ", password = " + password);
    } catch (MqttException e) {
      e.printStackTrace();
    }
  }

  /**
   * 发布消息
   * @param topic - 主题
   * @param payload - 负载,实际使用的消息内容
   * @param qos - 消息发布的服务质量,QoS0 - 0,QoS1 - 1,QoS2 - 2
   * @param retain - 发布消息是否应由代理服务器保留
   * @return boolean - 消息是否发布成功
   */
  public boolean publish(String topic, String payload, int qos, boolean retain) {
      MqttMessage mqttMessage = new MqttMessage();
      mqttMessage.setPayload(payload.getBytes());
      mqttMessage.setQos(qos);
      mqttMessage.setRetained(retain);
      try {
        mMqttClient.publish(topic, mqttMessage);
        System.out.println("发布消息: topic = " + topic + ", payload = " + payload);
        return true;
      } catch (MqttException e) {
        e.printStackTrace();
        return false;
      }
  }

  /**
   * 订阅一个主题
   * @param topic - 要订阅的主题,可以包含通配符
   * @param qos - 订阅的最高服务质量,以较低服务质量发布的消息将以发布的 QoS 接收
   */
  public void subscribe(String topic, int qos) {
    try {
      mMqttClient.subscribe(topic, qos);
      System.out.println("订阅一个主题: topic = " + topic + ", qos = " + qos);
    } catch (MqttException e) {
      e.printStackTrace();
    }
  }

  /**
   * 退订一个主题
   * @param topic - 要退订的主题,必须与订阅中指定的主题匹配
   */
  public void unsubscribe(String topic) {
    try {
      mMqttClient.unsubscribe(topic);
      System.out.println("退订一个主题: topic = " + topic);
    } catch (MqttException e) {
      e.printStackTrace();
    }
  }

  /**
   * 断开与服务器的连接
   */
  public void disConnect() {
    try {
      mMqttClient.disconnect();
      System.out.println("断开与服务器的连接");
    } catch (MqttException e) {
      e.printStackTrace();
    }
  }

  /**
   * 重新连接服务器
   */
  public void reConnect() {
    try {
      mMqttClient.reconnect();
      System.out.println("重新连接服务器");
    } catch (MqttException e) {
      e.printStackTrace();
    }
  }
}

有了 MQTT 客户端程序后,还需要一个 MQTT 代理服务器来转发消息,本文使用 Mosquitto 来搭建 MQTT 代理服务器,Mosquitto 项目主页:https://github.com/eclipse/mosquitto

安装并运行 Mosquitto 后则运行 MQTT 客户端程序,至少需要两个客户端,一个作为订阅者,一个作为发布者。订阅者演示程序如下所示:

MqttClientDemo client = new MqttClientDemo();
client.createClient("tcp://localhost:1883");
client.connect("user", "123456");
new Thread() {
  @Override
  public void run() {
    super.run();
    try {
      Thread.sleep(5000);
      client.subscribe("test", 0);
    } catch (InterruptedException e) {
      throw new RuntimeException(e);
    }
  }
}.start();

发布者演示程序如下所示:

MqttClientDemo client = new MqttClientDemo();
client.createClient("tcp://localhost:1883");
client.connect("user", "123456");
new Thread() {
  @Override
  public void run() {
    super.run();
    try {
      Thread.sleep(5000);
      client.subscribe("test", 0);
    } catch (InterruptedException e) {
      throw new RuntimeException(e);
    }
  }
}.start();

遵循先运行订阅者,后运行发布者的顺序,得到控制台打印结果如下:

# 订阅者演示程序打印结果
创建 MQTT 客户端: serverURI = tcp://localhost:1883, clientId = paho1174155411898200
通过用户名密码连接到 MQTT 代理服务器: username = user, password = 123456
订阅一个主题: topic = test, qos = 0
收到服务器消息, topic = test, payload = test message


# 发布者演示程序打印结果
创建 MQTT 客户端: serverURI = tcp://localhost:1883, clientId = paho1174159650496000
通过用户名密码连接到 MQTT 代理服务器: username = user1, password = 123456
发布消息: topic = test, payload = test message
消息已传递完成且已收到所有确认, topic = [test], payload = test message

3. MQTT 结构

如图 3.1 所示,MQTT 报文由 固定报头可变报头有效载荷 组成,其中所有报文都包含 固定报头,而 可变报头有效载荷 只在部分报文中出现 。本章节基于 MQTT v3.1.1 简述部分报文类型,更多报文类型或其他 MQTT 协议版本可查看官方文档

图3.1

3.1 固定报头

如图 3.2 所示,固定报头长度为 16 位,2 字节。第 1 位字节的高 4 位为 MQTT 报文类型, 低 4 位为 标志位,不同的 报文类型标志位 有不同的含义。

图3.2

MQTT 的各个 报文类型 如下表所示:

报文类型 类型值 方向 描述
Reserved 0 N/A 保留
CONNECT 1 客户端到服务端 客户端请求连接服务端
CONNACK 2 服务端到客户端 连接报文确认
PUBLISH 3 双向 发布消息
PUBACK 4 双向 QoS=1 时消息发布收到确认
PUBREC 5 双向 QoS=2 时发布收到(保证交付第一步)
PUBREL 6 双向 QoS=2 时发布释放(保证交付第二步)
PUBCOMP 7 双向 QoS=2 时消息发布完成(保证交互第三步)
SUBSCRIBE 8 客户端到服务端 客户端订阅请求
SUBACK 9 服务端到客户端 订阅请求报文确认
UNSUBSCRIBE 10 客户端到服务端 客户端取消订阅请求
UNSUBACK 11 服务端到客户端 取消订阅报文确认
PINGREQ 12 客户端到服务端 心跳请求
PINGRESP 13 服务端到客户端 心跳响应
DISCONNECT 14 客户端到服务端 客户端断开连接
Reserved 15 N/A 保留

MQTT 各个 报文类型 下的 标志位 含义如下表所示:

报文类型 报头标志 Bit 3 Bit 2 Bit 1 Bit 0
CONNECT 保留 0 0 0 0
CONNACK 保留 0 0 0 0
PUBLISH 用于 MQTT 3.1.1 DUP QoS QoS RETAIN
PUBACK 保留 0 0 0 0
PUBREC 保留 0 0 0 0
PUBREL 保留 0 0 1 0
PUBCOMP 保留 0 0 0 0
SUBSCRIBE 保留 0 0 1 0
SUBACK 保留 0 0 0 0
UNSUBSCRIBE 保留 0 0 1 0
UNSUBACK 保留 0 0 0 0
PINGREQ 保留 0 0 0 0
PINGRESP 保留 0 0 0 0
DISCONNECT 保留 0 0 0 0

其中:

  • DUP:重复发布标志,此标志为 0 表示是第一次发布消息,为 1 表示对消息的重新发布;
  • QoS:消息发布的服务质量,详见 1.3 节;
  • RETAIN:发布消息保留标志,此标志为 1 时服务器必须存储客户端发布的 PUBLISH 数据包及其 QoS,后面有客户端建立新订阅时必须把主题上的最后保留消息发送给订阅者。

剩余长度 字段使用一个变长度编码方案,且 剩余长度 不包括用于编码 剩余长度 字段本身的字节数。 剩余长度 最小 1 字节,最大 4 个字节,每个字节的最高位为标志位, 标志位为 1 时表示 剩余长度 字段有更多的字节,为 0 时表示 剩余长度 字段没有更多的字节。

单字节的 剩余长度 字段最大值为 0111 0000b = 0x7f = 127,从第 2 个字节开始,每字节的单位大小为 $128^{n-1}$,例如十进制数 456 可被拆分为 72 + 3 × 128,此时 剩余长度 字段长度为 2 字节,即 0x48 0x03 。此变长度编码方案的取值范围及其计算如下表所示。

字节数 最小值 最大值
1 0x00 = 0 0x7f = 127
2 0x80,0x01 = 0×1 + 1×128 = 128 0xff, 0x7f = 127×1 + 127×128 = 16,383
3 0x80, 0x80, 0x01
= 0×1 + 0×128 + 1×128^2
= 16,384
0xff, 0xff, 0x7f
= 127×1 + 127×128 + 127×128^2
= 2,097,151
4 0x80, 0x80, 0x80, 0x01
= 0×1 + 0×128 + 0×128^2 +1×128^3
= 2,097,152
0xff, 0xff, 0xff, 0x7f
= 127×1 + 127×128 + 127×128^2 + 127×128^3
= 268,435,455

3.2 可变报头

某些 MQTT 控制报文中包含一个 可变报头可变报头的内容根据 报文类型 的不同而不同。如图 3.3 所示,大部分控制报文的 可变报头 部分都包含一个两字节的 报文标识符 (Packet Identifier) 字段,报文标识符 用于保证报文的唯一性以用于报文重发,当客户端处理完此报文对应的确认操作后,这个 报文标识符 就可被重用。

SUBSCRIBEUNSUBSCRIBEPUBLISH (QoS > 0) 报文必须包含一个非零的 16 位 报文标识符,而 PUBLISH (QoS = 0) 报文不能包含 报文标识符。是否包含 报文标识符 的控制报文如下表所示:

报文类型 是否包含报文标识符
CONNECT
CONNACK
PUBLISH 是 (Qos > 0),Qos = 0 时不包含
PUBACK
PUBREC
PUBREL
PUBCOMP
SUBSCRIBE
SUBACK
UNSUBSCRIBE
UNSUBACK
PINGREQ
PINGRESP
DISCONNECT

3.3 CONNECT

CONNECT 报文用于客户端请求连接到服务端,在一个网络连接上,客户端只能发送一次 CONNECT 报文;若服务端收到客户端发送的第二个 CONNECT 报文必须当作协议违规处理并断开与客户端的连接。

CONNECT 固定报头

图3.3

CONNECT 可变报头

图3.4

前 6 个字节为固定值,字节 7 在本文中值为 4, 表示使用的是 MQTT v3.1.1 ,字节 8 是连接标志,其中:

  • 用户名标记:此标记为 0 时,有效载荷中不能包含用户名字段,此标记为 1 时,有效载荷中必须包含用户名字段;
  • 密码标记:此标记为 0 时,有效载荷中不能包含密码字段,此标记为 1 时,有效载荷中必须包含密码字段;
  • Retain:如果遗嘱消息被发布时需要保留,需要设置此位为 1
  • QoS:指定发布遗嘱消息时使用的服务质量等级,如果遗嘱标记为 0QoS 也必须为 0,如果遗嘱标记为 1QoS 可以为 012
  • 遗嘱标记:遗嘱标记 Will Flag 被设置为 1,表示如果连接请求被接受了,遗嘱消息 Will Message 必须被存储在服务端中且与这个网络连接关联。当此网络连接关闭时,服务端必须发布这个遗嘱消息,除非服务端收到 DISCONNECT 报文时删除了这个遗嘱消息;
  • 清理会话:清理会话标记 Clean Session,此标记指定了会话状态的处理方式。此标记为 0 时服务端必须基于当前会话的状态恢复与客户端的通信,此标记为 1 时客户端和服务端必须丢弃之前的任何会话并开始一个新的会话。

CONNECT 有效载荷

CONNECT 报文的有效载荷包含一个或多个以长度为前缀的字段,可变报头中的标志决定是否包含这些字段。如果包含的话,必须按 客户端标识符遗嘱主题遗嘱消息用户名密 码 的顺序出现。图 3.5 展示了仅包含 客户端标识符用户名密 码 字段的有效载荷。

图3.5

3.4 CONNACK

服务端发送 CONNACK 报文响应从客户端收到的 CONNECT 报文,服务端发送给客户端的第一个报文必须是 CONNACK。如果客户端在一定时间内没有收到服务端的 CONNACK 报文,客户端应该关闭网络连接。

CONNACK 固定报头

图3.6

CONNACK 可变报头

图3.7

其中:

  • SP:当前会话标记 Session Present。如果服务端收到清理会话标记 Clean Session1 的连接,需要将 CONNACK 报文中的 SP连接返回码 设置为 0。如果服务端收到清理会话标记 Clean Session0 的连接,SP 标记值取决于服务端是否已经保存了 clientId 对应客户端的会话状态,如果服务端已经保存了会话状态,需要将 CONNACK 报文中的 SP连接返回码 设置为 1;如果服务端没有已保存的会话状态,需要将 CONNACK 报文中的 SP连接返回码 设置为 0

  • 连接返回码:连接返回码字段使用一个字节的无符号值,描述了对此连接的响应结果,如下表所示,如果下表中的所有连接返回码都不太合适,那么服务端必须关闭网络连接,不需要发送 CONNACK 报 文。

    返回码 名称 描述
    0x00 连接已接受 连接已被服务端接受
    0x01 连接已拒绝 服务端不支持客户端请求的 MQTT 协议级别
    0x02 连接已拒绝 客户端标识符是正确的 UTF-8 编码,但服务端不允许使用
    0x03 连接已拒绝 网络连接已建立,但 MQTT 服务不可用
    0x04 连接已拒绝 用户名或密码的数据格式无效
    0x05 连接已拒绝 客户端未被授权连接到此服务器
    0x06 ~ 0xFF 保留 保留

CONNACK 有效载荷

CONNACK 报文没有有效载荷。

3.5 PUBLISH

PUBLISH 控制报文用于从客户端向服务端或者服务端向客户端传输一个消息。

PUBLISH 固定报头

图3.8

其中:

  • DUP:重复发布标志,此标志为 0 表示是第一次发布消息,为 1 表示对消息的重新发布;
  • QoS:消息发布的服务质量,详见 1.3 节;
  • RETAIN:发布消息保留标志,此标志为 1 时服务器必须存储客户端发布的 PUBLISH 数据包及其 QoS,后面有客户端建立新订阅时必须把主题上的最后保留消息发送给订阅者。

PUBLISH 可变报头

图3.9

只有当 Qos > 0 时可变报头才会包含 报文标识符 (Packet Identifier) 字段。

PUBLISH 有效载荷

PUBLISH 报文有效载荷包含将被发布的消息内容,$有效载荷的长度 = 固定报头中的剩余长度 - 可变报头的长度$, 有效载荷可为空。

3.6 SUBSCRIBE

客户端向服务端发送 SUBSCRIBE 报文用于创建一个或多个订阅。服务端收到 PUBLISH 报文后,需要将此 PUBLISH 报文转发给订阅了相应主题的客户端。

SUBSCRIBE 固定报头

图3.10

SUBSCRIBE 可变报头

图3.11

SUBSCRIBE 有效载荷

SUBSCRIBE 报文的有效载荷包含了一个主题过滤器列表,它们表示客户端想要订阅的主题,$有效载荷的长度 = 固定报头中的剩余长度 - 可变报头的长度$,主题过滤器列表中的每一个项结构如图 3.12 所示。

图3.12

3.7 SUBACK

服务端发送 SUBACK 报文给客户端,用于确认它已收到并且正在处理 SUBSCRIBE 报文。 SUBACK 报文包含一个返回码,它们指定了 SUBSCRIBE 请求的每个订阅被授予的最大 QoS 等级。

SUBACK 固定报头

图3.13

SUBACK 可变报头

图3.14

SUBACK 有效载荷

有效载荷包含一个返回码列表,每个返回码对应等待确认的 SUBSCRIBE 报文中的一个主题过滤器。返回码的顺序必须和 SUBSCRIBE 报文中主题过滤器的顺序相同。返回码列表中的每一个项结构如图 3.15 所示。

图3.15

3.8 PINGREQ

客户端发送 PINGREQ 报文给服务端的,用于:

  1. 在没有任何其它控制报文从客户端发给服务的时,告知服务端客户端还活着;
  2. 请求服务端发送响应确认告知客户端服务端还活着;
  3. 使用网络以确认网络连接没有断开。

客户端发送 PINGREQ 报文的时间间隔是在 CONNECT 请求连接报文中的可变报头中的保持连接 Keep Alive 字段。

PINGREQ 固定报头

图3.16

PINGREQ 可变报头

PINGREQ 报文没有可变报头。

PINGREQ 有效载荷

PINGREQ 报文没有有效载荷。

3.9 PINGRESP

服务端发送 PINGRESP 报文响应客户端的 PINGREQ 报文,表示服务端还活着。

PINGRESP 固定报头

图3.17

PINGRESP 可变报头

PINGRESP 报文没有可变报头。

PINGRESP 有效载荷

PINGRESP 报文没有有效载荷。

4. 实现 MQTT 代理

基于第 3 节简述的部分报文类型,使用 Socket 实现一个简单的 MQTT 代理服务器。读取 MQTT 固定报头 的关键代码如下:

try {
  InputStream is = mSocket.getInputStream();
  OutputStream os = mSocket.getOutputStream();
  // ...忽略部分变量定义...
  while (true) {
    read = is.read(buffer, 0, 1); // 读取报文第 1 个字节
    length = BitUtils.decodeRemainingLength(is); // 解码固定报头中的剩余长度字段
    type = BitUtils.getHigh4bit(buffer[0]); // 读取报文第 1 个字节中的高 4 位, 即报文类型
    flag = BitUtils.getLow4bit(buffer[0]); // 读取报文第 1 个字节中的低 4 位, 即报文类型标记
    System.out.println("\n固定报头:类型 = " + type + ", flag = " + flag + ", length = " + length);
  }
} catch (IOException e) {
  e.printStackTrace();
}

由于 可变报头 的内容根据 报文类型 的不同而不同,所以需要先确认报文类型后再依据协议规范读取。本文中仅简单实现部分报文,分别为:CONNECTCONNACKPUBLISH (QoS = 0)SUBSCRIBESUBACKPINGREQPINGRESP,对各个报文类型的判断和回复关键代码如下:

if (type == 1) {
  // CONNECT - 客户向服务器发送的连接请求

  // 回复连接确认报文 CONNACK,固定报头 2 字节 + 可变报头 2 字节
  // 可变报头内容为连接确认标记 0 和连接确认码 0
  os.write(new byte[] { 0x20, 0x02, 0x00, 0x00 });
  os.flush();
} else if (type == 3) {
  // PUBLISH - 发布报文,忽略 QoS

  // qos = 0, 无需回复 PUBACK
} else if (type == 8 && flag == 2) {
  // SUBSCRIBE - 订阅主题

  // 回复订阅主题报文 SUBACK, 总长度为 固定报头 2 字节 + 可变报头 2 字节 + 1字节 * 订阅主题过滤器列表大小
  byte[] subacks = new byte[ 2 + 2 + topics.size() ];
  // 固定报头 2 字节
  // 由于 java 中没有无符号类型,所以 0x90 超出了 byte 的取值范围,强转处理即可
  subacks[0] = (byte)0x90; // 类型:SUBACK, 标记: 0
  subacks[1] = (byte)(2 + topics.size()); // 剩余长度 = 可变报头 2 字节 + 主题过滤器列表大小 * 1字节, 此处忽略剩余长度大于 127 的情况
  // 可变报头 2 字节 - 报文标识符 = 0x0001
  subacks[2] = 0x00;
  subacks[3] = 0x01;
  // 有效载荷
  for(int i = 4; i < subacks.length; i ++) {
    // 简单处理,统一设置回复主题过滤器 Qos = 0
    subacks[i] = 0x00;
  }
  // 回复 SUBACK
  os.write(subacks);
  os.flush();

} else if (type == 12) {
  // PINGREQ – 心跳请求
  System.out.println("======== PINGREQ 心跳请求 ========");
  // 回复心跳请求报文 PINGRESP
  os.write(new byte[] { (byte)0xD0, 0x00 });
}

剩下的步骤就是根据不同的 报文类型 的相应规范读取 可变报头 有效载荷MQTT 代理服务器完整代码如下:

MqttBrokerDemo.java

package org.example;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.ServerSocket;
import java.net.Socket;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;

/**
 * MQTT 代理服务器演示
 */
public class MqttBrokerDemo {

    private static final int port = 1883;
    private HashMap<Socket, String> clientMap = new HashMap<Socket, String>();
    private HashMap<String, ArrayList<Socket>> topics = new HashMap<String, ArrayList<Socket>>();

    public static void main(String[] args) {
        new MqttBrokerDemo().run();
    }


    public void run() {
        try {
            ServerSocket serverSocket = new ServerSocket(port);
            System.out.println("MQTT Broker started on port " + port);
            while (true) {
                Socket clientSocket = serverSocket.accept();
                System.out.println("\n有新连接 getInetAddress:" + clientSocket.getInetAddress());
                new MqttBrokerServer(clientSocket, new CallBack() {
                    /**
                     * 客户端连接请求回调
                     * @param socket - 当前客户端 Socket 对象
                     * @param clientId - 客户端标识符
                     */
                    @Override
                    public void connect(Socket socket, String clientId) {
                        clientMap.put(socket, clientId);
                    }

                    /**
                     * 客户端发布请求回调
                     * @param socket - 当前客户端 Socket 对象
                     * @param topic - 主题
                     * @param bytes - publish 数据包,用于转发给目标客户端
                     */
                    @Override
                    public void publish(Socket socket, String topic, byte[] bytes) {
                        ArrayList<Socket> array = topics.get(topic);
                        if (array == null) return;
                        try {
                            for (Socket temp : array) {
                                OutputStream os = temp.getOutputStream();
                                os.write(bytes);
                                os.flush();
                            }
                        } catch (IOException e) {
                            e.printStackTrace();
                        }
                    }

                    /**
                     * 客户端订阅请求回调
                     * @param socket - 当前客户端 Socket 对象
                     * @param topicList - 主题过滤器列表
                     */
                    @Override
                    public void subscribe(Socket socket, ArrayList<String> topicList) {
                        if (topicList == null) return;
                        for (String topic : topicList) {
                            ArrayList<Socket> sockets = topics.get(topic);
                            if (sockets == null) {
                                sockets = new ArrayList<Socket>();
                                sockets.add(socket);
                                topics.put(topic, sockets);
                            } else {
                                sockets.add(socket);
                                topics.put(topic, sockets);
                            }
                        }
                    }
                }).start();
            }
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    static class MqttBrokerServer extends Thread {

        protected Socket mSocket;
        protected CallBack mCallBack;

        public MqttBrokerServer(Socket socket, CallBack callBack) {
            this.mSocket = socket;
            this.mCallBack = callBack;
        }

        @Override
        public void run() {
            super.run();
            try {
                InputStream is = mSocket.getInputStream();
                OutputStream os = mSocket.getOutputStream();
                byte[] buffer = new byte[1024];
                int type = 0;
                int flag = 0;
                int length = 0;
                int read = 0;
                long skipLen = 0;
                while (true) {
                    read = is.read(buffer, 0 ,1);
                    length = BitUtils.decodeRemainingLength(is);
                    type = BitUtils.getHigh4bit(buffer[0]);
                    flag = BitUtils.getLow4bit(buffer[0]);
                    System.out.println("\n固定报头:类型 = " + type + ", flag = "+ flag +", length = " + length);

                    if (type == 1) {
                        // CONNECT - 客户向服务器发送的连接请求
                        // 读取 CONNECT 报文的可变报头
                        // 读取协议名长度
                        read = is.read(buffer, 0 ,2);
                        int protocolLen = BitUtils.getIntBy2ByteData(buffer[0], buffer[1]);
                        // 读取协议名
                        read = is.read(buffer, 0 , protocolLen);
                        String protocolName = new String(Arrays.copyOfRange(buffer, 0, protocolLen), StandardCharsets.UTF_8);
                        // 读取部分连接标记
                        skipLen = is.skip(1);
                        read = is.read(buffer, 0 ,1);
                        int userNameFlag = BitUtils.getBit(buffer[0], 7); // 用户名标志
                        int pwdFlag = BitUtils.getBit(buffer[0], 6); // 密码标志
                        // 读取心跳连接间隔
                        read = is.read(buffer, 0 ,2);
                        int heartbeat =  BitUtils.getIntBy2ByteData(buffer[0], buffer[1]);
                        // 读取 clientId
                        read = is.read(buffer, 0, 2);
                        int clientIdLen = BitUtils.getIntBy2ByteData(buffer[0], buffer[1]);
                        read = is.read(buffer, 0 , clientIdLen);
                        String clientId = new String(Arrays.copyOfRange(buffer, 0, clientIdLen), StandardCharsets.UTF_8);
                        // 读取 userName
                        read = is.read(buffer, 0, 2);
                        int userNameLen = BitUtils.getIntBy2ByteData(buffer[0], buffer[1]);
                        read = is.read(buffer, 0 , userNameLen);
                        String userName = new String(Arrays.copyOfRange(buffer, 0, userNameLen), StandardCharsets.UTF_8);
                        // 读取 password
                        read = is.read(buffer, 0, 2);
                        int passwordLen = BitUtils.getIntBy2ByteData(buffer[0], buffer[1]);
                        read = is.read(buffer, 0 , passwordLen);
                        String password = new String(Arrays.copyOfRange(buffer, 0, passwordLen), StandardCharsets.UTF_8);

                        System.out.println("======== CONNECT 连接请求 ========");
                        System.out.println("protocolLen = " + protocolLen + ", protocolName = " + protocolName + ", userNameFlag = " + userNameFlag + ", pwdFlag = " + pwdFlag + ", heartbeat = " + heartbeat + "s");
                        System.out.println("clientId = " + clientId + ", userName = " + userName + ", password = " + password);

                        mCallBack.connect(mSocket, clientId);

                        // 回复连接确认报文 CONNACK,固定报头 2 字节 + 可变报头 2 字节
                        // 可变报头内容为连接确认标记 0 和连接确认码 0
                        os.write(new byte[] { 0x20, 0x02, 0x00, 0x00 });
                        os.flush();

                    } else if (type == 3) {
                        // PUBLISH - 发布报文,忽略 QoS
                        ArrayList<Byte> byteList = new ArrayList<>();
                        byteList.add(buffer[0]);
                        for (byte b : BitUtils.encodeRemainingLength(length)) {
                            byteList.add(b);
                        }
                        int messageLength = length;

                        // 读取 topic
                        read = is.read(buffer, 0, 2);
                        int topicLen = BitUtils.getIntBy2ByteData(buffer[0], buffer[1]);
                        byteList.add(buffer[0]);
                        byteList.add(buffer[1]);
                        read = is.read(buffer, 0 , topicLen);
                        byte[] topicByte = Arrays.copyOfRange(buffer, 0, topicLen);
                        String topic = new String(topicByte, StandardCharsets.UTF_8);
                        for (byte b : topicByte) { byteList.add(b); }

                        // Qos = 0, 无报文标识符
                        // 读取 message
                        messageLength = messageLength - 2 - topicLen;
                        read = is.read(buffer, 0 , messageLength);
                        byte[] msgByte = Arrays.copyOfRange(buffer, 0, messageLength);
                        String message = new String(msgByte, StandardCharsets.UTF_8);
                        for (byte b : msgByte) { byteList.add(b); }

                        System.out.println("======== PUBLISH 发布请求 ========");
                        System.out.println("topic = " + topic + ", message = " + message);

                        byte[] bytes = new byte[byteList.size()];
                        for(int i = 0; i < bytes.length; i ++) {
                            bytes[i] = byteList.get(i);
                        }
                        mCallBack.publish(mSocket, topic, bytes);
                        // qos = 0, 无需回复 PUBACK

                    } else if (type == 8 && flag == 2) {
                        // SUBSCRIBE - 订阅主题
                        int messageLength = length;
                        // 跳过可变报头 - 报文标识符
                        skipLen = is.skip(2);
                        messageLength -= skipLen;

                        // 读取 topic
                        ArrayList<String> topics = new ArrayList<String>();
                        while(messageLength > 0) {
                            read = is.read(buffer, 0 ,2);
                            int topicLen = BitUtils.getIntBy2ByteData(buffer[0], buffer[1]);
                            read = is.read(buffer, 0 , topicLen);
                            String topic = new String(Arrays.copyOfRange(buffer, 0, topicLen), StandardCharsets.UTF_8);
                            topics.add(topic);
                            // Qos 暂时忽略
                            read = is.read(buffer, 0 , 1);
                            messageLength -= (topicLen + 3);
                        }
                        System.out.println("======== SUBSCRIBE 订阅请求 ========");
                        System.out.println("topics = " + Arrays.toString(topics.toArray()));

                        mCallBack.subscribe(mSocket, topics);

                        // 回复订阅主题报文 SUBACK, 总长度为 固定报头 2 字节 + 可变报头 2 字节 + 1字节 * 订阅主题过滤器列表大小
                        byte[] subacks = new byte[ 2 + 2 + topics.size() ];
                        // 固定报头 2 字节
                        // 由于 java 中没有无符号类型,所以 0x90 超出了 byte 的取值范围,强转处理即可
                        subacks[0] = (byte)0x90; // 类型:SUBACK, 标记: 0
                        subacks[1] = (byte)(2 + topics.size()); // 剩余长度 = 可变报头 2 字节 + 主题过滤器列表大小 * 1字节, 此处忽略剩余长度大于 127 的情况
                        // 可变报头 2 字节 - 报文标识符 = 0x0001
                        subacks[2] = 0x00;
                        subacks[3] = 0x01;
                        // 有效载荷
                        for(int i = 4; i < subacks.length; i ++) {
                            // 简单处理,统一设置回复主题过滤器 Qos = 0
                            subacks[i] = 0x00;
                        }
                        // 回复 SUBACK
                        os.write(subacks);
                        os.flush();

                    } else if (type == 12) {
                        // PINGREQ – 心跳请求
                        System.out.println("======== PINGREQ 心跳请求 ========");
                        // 回复心跳请求报文 PINGRESP
                        os.write(new byte[] { (byte)0xD0, 0x00 });
                    }
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

    interface CallBack {
        public void connect(Socket socket, String clientId);

        public void publish(Socket socket, String topic, byte[] bytes);

        public void subscribe(Socket socket, ArrayList<String> topicList);
    }

}

BitUtils.java

package org.example;

import java.io.IOException;
import java.io.InputStream;

public class BitUtils {

    /**
     * 解码 MQTT 固定报头剩余长度
     * @param is - InputStream
     * @return int
     */
    public static int decodeRemainingLength(InputStream is) throws IOException {
        byte[] buffer = new byte[1];
        int length = 0;
        int multiplier = 1;
        int max = 128 * 128 * 128;
        int read = 0;
        do {
            read = is.read(buffer, 0 ,1);
            if (read == -1) break;
            length += (buffer[0] & 0x7f) * multiplier;
            multiplier *= 128;
            if (multiplier > max) break;
        } while ((buffer[0] & 0x80) != 0);
        return length;
    }

    /**
     * 编码 MQTT 固定报头剩余长度为 byte 数组
     * @param length - 固定报头剩余长度
     * @return byte[]
     */
    public static byte[] encodeRemainingLength(int length) {
        byte[] temp = new byte[4];
        int index = 0;
        int number = length;
        do {
            temp[index] = (byte)(number % 128);
            number /= 128;
            if (number > 0) {
                // 向最高位添加标志 1
                temp[index] = (byte)(temp[index] | 128);
            }
            index ++;
        } while (number > 0);

        byte[] bytes = new byte[index];
        System.arraycopy(temp, 0, bytes, 0, bytes.length);
        return bytes;
    }


    /**
     * 获取 Byte 数据的高 4 位
     * @param data - byte 数据
     * @return int
     */
    public static int getHigh4bit(byte data) {
        return (data & 0xF0) >> 4;
    }

    /**
     * 获取 byte 数据的低 4 位
     * @param data - byte 数据
     * @return int
     */
    public static int getLow4bit(byte data) {
        return data & 0x0F;
    }

    /**
     * 通过字节数据中的某一二进制位
     * @param data - 字节数据
     * @param index - 第几个二进制位, 从 0 开始
     * @return int
     */
    public static int getBit(byte data, int index) {
        return ((1 << index) & data) >> index;
    }


    /**
     * 通过高字节和低字节还原数据值
     * @param high - 据高字节
     * @param low - 数据低字节
     * @return int
     */
    public static int getIntBy2ByteData(byte high, byte low) {
        return ((high & 0x00FF) << 8) | low;
    }

}

代理服务器运行后,遵循先运行订阅者,后运行发布者的顺序运行第 2 章节的 MQTT 客户端的演示程序,得到控制台打印结果如下:

# MqttBrokerDemo.java
MQTT Broker started on port 1883

有新连接 getInetAddress:/127.0.0.1

固定报头:类型 = 1, flag = 0, length = 46
======== CONNECT 连接请求 ========
protocolLen = 4, protocolName = MQTT, userNameFlag = 1, pwdFlag = 1, heartbeat = 60s
clientId = paho1226108144974700, userName = user, password = 123456

有新连接 getInetAddress:/127.0.0.1

固定报头:类型 = 1, flag = 0, length = 47
======== CONNECT 连接请求 ========
protocolLen = 4, protocolName = MQTT, userNameFlag = 1, pwdFlag = 1, heartbeat = 60s
clientId = paho1226112802010100, userName = user1, password = 123456

固定报头:类型 = 8, flag = 2, length = 9
======== SUBSCRIBE 订阅请求 ========
topics = [test]

固定报头:类型 = 3, flag = 0, length = 18
======== PUBLISH 发布请求 ========
topic = test, message = test message



# MqttClientDemo.java 订阅者
创建 MQTT 客户端: serverURI = tcp://localhost:1883, clientId = paho1226108144974700
通过用户名密码连接到 MQTT 代理服务器: username = user, password = 123456
订阅一个主题: topic = test, qos = 0
收到服务器消息, topic = test, payload = test message



# MqttClientDemo.java 发布者
创建 MQTT 客户端: serverURI = tcp://localhost:1883, clientId = paho1226112802010100
通过用户名密码连接到 MQTT 代理服务器: username = user1, password = 123456
消息已传递完成且已收到所有确认, topic = [test], payload = test message
发布消息: topic = test, payload = test message

参考