白皮书
车云灵活数采方案:释放数据价值,加速智能创新 →

RT-Thread 使用 TLS/SSL 连接到 EMQX Cloud

Alvin Wong
2021-9-3
RT-Thread 使用 TLS/SSL 连接到 EMQX Cloud

本文将使用 RT-Thread 配合 ART-Pi 搭建 MQTT 客户端,快速接入 EMQX Cloud

EMQX Cloud 简介

EMQX Cloud 是由 EMQ 公司推出的可连接海量物联网设备,集成各类数据库及业务系统的全托管云原生 MQTT 服务。作为全球首个全托管的 MQTT 5.0 公有云服务,EMQX Cloud 提供了一站式运维代管、独有隔离环境的 MQTT 消息服务。

在万物互联的时代,EMQX Cloud 可以帮助用户快速构建面向物联网领域的行业应用,轻松实现物联网数据的采集、传输、计算和持久化。

EMQX Cloud

借助云服务商提供的基础计算设施,EMQX Cloud 面向全球数十个国家与地区提供服务,为 5G 与万物互联应用提供低成本、安全可靠的云服务。

创建和部署 EMQX Cloud

通过快速入门创建部署 EMQX Cloud,以下是创建完成的实例:

  • 基础版

EMQX Cloud 基础版

  • 专业版

EMQX Cloud 专业版

创建项目工程

本文使用 RT-Thread 官方 IDE:RT-Thread-Studio 来创建工程;

本次 Demo 使用的是 RT-Thread 官方的开发板 ART-Pi,通过板载 Wifi 模块进行联网,可以直接创建一个 art_pi_wifi 样例工程进行 MQTT 客户端的开发;

使用 RT-Thread 创建工程

工程配置和引入依赖包

  1. 进入配置页面

    RT-Thread 配置页面

    选择“More”

    RT-Thread More

  2. 启用 RTC 驱动

    启用 RTC 驱动

  3. 引入 MQTT 依赖包

    启动 TLS 需设置 MQTT 线程栈大小 ≥ 6144!

    引入 MQTT 依赖包

  4. 配置 mbedtls

    1. 选择 用户 CA 证书(单/双向认证)

    选择用户 CA 证书

    1. 选择无证书 SSL 连接(单向认证)

    选择无证书 SSL 连接

  5. 保存当前配置,IDE 会将配置更新到工程

    保存当前配置

  6. 修改宏 MEMP_NUM_NETDB2

    位于项目路径"rt-thread\components\net\lwip-2.0.2\src\include\lwip\opt.h:488"

    /**
     * MEMP_NUM_NETDB: the number of concurrently running lwip_addrinfo() calls
     * (before freeing the corresponding memory using lwip_freeaddrinfo()).
     */
    #if !defined MEMP_NUM_NETDB || defined __DOXYGEN__
    #define MEMP_NUM_NETDB                  2
    #endif
    

代码编写

  1. 打开 applications/main.c,可见 RT-Thread-Studio 已经帮我们生成好了连接 WiFi 和 LED 操作的代码

    #include <rtthread.h>
    #include <rtdevice.h>
    #include "drv_common.h"
    
    #define LED_PIN GET_PIN(I, 8)
    
    extern void wlan_autoconnect_init(void);
    
    int main(void)
    {
        rt_uint32_t count = 1;
        rt_pin_mode(LED_PIN, PIN_MODE_OUTPUT);
    
        /* init Wi-Fi auto connect feature */
        wlan_autoconnect_init();
        /* enable auto reconnect on WLAN device */
        rt_wlan_config_autoreconnect(RT_TRUE);
    
        while(count++)
        {
            rt_thread_mdelay(500);
            rt_pin_write(LED_PIN, PIN_HIGH);
            rt_thread_mdelay(500);
            rt_pin_write(LED_PIN, PIN_LOW);
        }
        return RT_EOK;
    }
    
    #include "stm32h7xx.h"
    static int vtor_config(void)
    {
        /* Vector Table Relocation in Internal QSPI_FLASH */
        SCB->VTOR = QSPI_BASE;
        return 0;
    }
    INIT_BOARD_EXPORT(vtor_config);
    
  2. 为了实现第一次启动也能自动连接WiFi,我们可以在 main() 加入连接函数;

     rt_wlan_connect(WIFI_SSID, WIFI_PASSWORD);
    
  3. 分别新建 mqtt-client.c 和 mqtt-client.h;

    static void mqtt_create(void)
    {
        /* init condata param by using MQTTPacket_connectData_initializer */
        MQTTPacket_connectData condata = MQTTPacket_connectData_initializer;
    
        static char client_id[50] = { 0 };
    
        rt_memset(&client, 0, sizeof(MQTTClient));
    
        /* config MQTT context param */
        {
            client.uri = MQTT_BROKER_URI;
    
            /* config connect param */
            memcpy(&client.condata, &condata, sizeof(condata));
            rt_snprintf(client_id, sizeof(client_id), "%s%d",MQTT_CLIENTID, rt_tick_get());
            client.condata.clientID.cstring = client_id;
            client.condata.keepAliveInterval = 60;
            client.condata.cleansession = 1;
            client.condata.username.cstring = MQTT_USERNAME;
            client.condata.password.cstring = MQTT_PASSWORD;
    
            /* config MQTT will param. */
            client.condata.willFlag = 1;
            client.condata.will.qos = MQTT_QOS;
            client.condata.will.retained = 0;
            client.condata.will.topicName.cstring = MQTT_PUBTOPIC;
            client.condata.will.message.cstring = MQTT_WILLMSG;
    
            /* malloc buffer. */
            client.buf_size = client.readbuf_size = MQTT_PUB_SUB_BUF_SIZE;
            client.buf = rt_malloc(client.buf_size);
            client.readbuf = rt_malloc(client.readbuf_size);
            if (!(client.buf && client.readbuf))
            {
                rt_kprintf("no memory for MQTT client buffer!\n");
                goto _exit;
            }
    
            /* set event callback function */
            client.connect_callback = mqtt_connect_callback;
            client.online_callback = mqtt_online_callback;
            client.offline_callback = mqtt_offline_callback;
    
            /* set subscribe table and event callback */
            client.messageHandlers[0].topicFilter = rt_strdup(MQTT_SUBTOPIC);
            client.messageHandlers[0].callback = mqtt_sub_callback;
            client.messageHandlers[0].qos = MQTT_QOS;
    
            /* set default subscribe event callback */
            client.defaultMessageHandler = mqtt_sub_callback;
        }
    
        /* run mqtt client */
        paho_mqtt_start(&client);
    
        return;
    
    _exit:
        if (client.buf)
        {
            rt_free(client.buf);
            client.buf = RT_NULL;
        }
        if (client.readbuf)
        {
            rt_free(client.readbuf);
            client.readbuf = RT_NULL;
        }
        return;
    }
    

    对应 连接/订阅/上线/离线 回调函数

    static void mqtt_connect_callback(MQTTClient *c)
    {
        LOG_D("mqtt_connect_callback!");
    }
    
    static void mqtt_sub_callback(MQTTClient *c, MessageData *msg_data)
    {
        sub_count++;
        *((char *)msg_data->message->payload + msg_data->message->payloadlen) = '\0';
        rt_kprintf("mqtt sub callback[%u]: \n topic: %.*s \n message %.*s",
                   sub_count,
                   msg_data->topicName->lenstring.len,
                   msg_data->topicName->lenstring.data,
                   msg_data->message->payloadlen,
                   (char *)msg_data->message->payload);
    }
    
    static void mqtt_online_callback(MQTTClient *c)
    {
        LOG_D("mqtt_online_callback!");
    }
    
    static void mqtt_offline_callback(MQTTClient *c)
    {
        LOG_D("mqtt_offline_callback!");
    }
    

    创建 pub 线程回调函数和 mqtt 客户端启动函数

    static void thread_pub(void *parameter)
    {
        pub_data = rt_malloc(TEST_DATA_SIZE * sizeof(char));
        if (!pub_data)
        {
            rt_kprintf("no memory for pub_data\n");
            return;
        }
    
        start_tm = time((time_t *) RT_NULL);
        rt_kprintf("test start at '%d'\r\n", start_tm);
    
        while (1)
        {
            rt_snprintf(pub_data, TEST_DATA_SIZE, "Pub EMQX message-%d", pub_count);
    
            if (!paho_mqtt_publish(&client, QOS1, MQTT_PUBTOPIC, pub_data))
            {
                ++pub_count;
            }
    
            rt_thread_delay(PUB_CYCLE_TM);
        }
    }
    
    void mqtt_client_start(void)
    {
        if (is_started)
        {
            return;
        }
    
        mqtt_create();
    
        while (!client.isconnected)
        {
            rt_kprintf("Waiting for mqtt connection...\n");
            rt_thread_delay(1000);
        }
    
        pub_thread_tid = rt_thread_create("pub_thread", thread_pub, RT_NULL, 1024, 8, 100);
        if (pub_thread_tid != RT_NULL)
        {
            rt_thread_startup(pub_thread_tid);
        }
    
        is_started = 1;
    
        return;
    }
    

    设置 MQTT 连接参数和用户名

    #define EMQX_Cloud_Professional_Version 1
    #define EMQX_Cloud_TLS_SSL 1
    
    #if !EMQX_Cloud_Professional_Version
    
    #if EMQX_Cloud_TLS_SSL
    #define MQTT_BROKER_URI         "ssl://ge06f1e1.cn-shenzhen.emqx.cloud:15455"
    #else
    #define MQTT_BROKER_URI         "tcp://ge06f1e1.cn-shenzhen.emqx.cloud:15915"
    #endif
    #else
    
    #if EMQX_Cloud_TLS_SSL
    #define MQTT_BROKER_URI         "ssl://oba9d641.emqx.cloud:8883"
    #else
    #define MQTT_BROKER_URI         "tcp://oba9d641.emqx.cloud:1883"
    #endif
    
    #endif
    
    #define MQTT_CLIENTID_PREFIX    "rtthread-mqtt"
    #define MQTT_USERNAME           "EMQX_RTT"
    #define MQTT_PASSWORD           "emqx_rtt_0813"
    #define MQTT_SUBTOPIC           "/emqx/mqtt/sub"
    #define MQTT_PUBTOPIC           "/emqx/mqtt/pub"
    #define MQTT_WILLMSG            "Goodbye!"
    #define MQTT_QOS                1
    

    main() 函数启动 MQTT client

    mqtt_client_start()
    

    也可在终端手动启动/停止

    mqtt_ctrl start
    mqtt_ctrl stop
    

    若选择用户 CA 证书验证,则将 CA 证书(双向认证还需 client.crt 和 client.key)放置到 packages/mbedtls-latest/certs 文件夹中

    CA 证书验证

    重新更新工程,会自动将证书内容复制到源文件中

    重新更新工程

    构建项目并下载到目标板上

    构建项目并下载到目标板上

    打开终端 Terminal 可以看到运行日志

    打开终端 Terminal 可以看到运行日志

使用 MQTTX 测试数据收发

客户端连接配置

单向无证书认证

单向无证书认证

单向自签名证书认证

单向自签名证书认证

双向认证

双向认证

订阅和发布

订阅和发布

查看 RT-Thread-Stdio 终端

查看 RT-Thread-Stdio 终端

数据收发正常!

完整代码

请见 MQTT-Client-Examples

免费试用 EMQX Cloud
无须绑定信用卡
开始试用 →

推荐阅读

2022-9-7Jiyong Huang
使用 Prometheus 监控 eKuiper 规则运行状态

本文介绍了 eKuiper 中的规则状态指标以及如何使用 Prometheus 监控这些状态指标,用户可以基于此进一步探索 Prometheus 的更多高级功能,更好地实现 eKuiper 的运维。