活动报名
消费电子产品的智能体升级之路 →

使用 MQTT 和 Kafka 三分钟构建车联网流数据管道

EMQX Team
2023-9-12
使用 MQTT 和 Kafka 三分钟构建车联网流数据管道

引言

在当今的物联网领域,MQTT 和 Kafka 的集成为各种应用场景提供了巨大的价值。无论是网联汽车和车载信息通信系统、智慧城市基础设施,还是工业物联网监控、物流管理,MQTT 和 Kafka 的结合都能帮助这些场景实现无缝、高效和实时的数据处理。

本文将为您演示如何高效集成 MQTT 和 Kafka。我们将模拟车辆设备及其动态车联网数据,将它们连接到 MQTT Broker,然后再将数据发送到 Apache Kafka。本文选择 EMQX 作为 MQTT Broker,它内置了 Kafka 数据集成功能,可以简化演示流程。

前提条件

  • Git
  • Docker Engine: v20.10+
  • Docker Compose: v2.20+

工作原理

MQTT 和 Kafka 集成架构如下图所示:

MQTT 和 Kafka 集成架构

这是一个既简单又有效的架构,没有使用复杂的组件。它只使用了以下 3 个基本组件:

组件名称 版本 说明
MQTTX CLI 1.9.3+ 用于模拟车辆并生成测试数据的命令行工具。
EMQX Enterprise 5.0.4+ 车辆与 Kafka 系统之间进行信息交换的 MQTT Broker。
Kafka 2.8.0+ 一个分布式流媒体平台,用于接收、存储和处理车辆数据。

除了上述基础组件外,EMQX 还具备丰富的监控功能。您可以利用以下组件来实时监控 EMQX 的性能指标和负载情况:

组件名称 版本 说明
EMQX Exporter 0.1 用于 EMQX 的 Prometheus Exporter。
Prometheus v2.44.0 开源的系统监控和警报工具包。
Grafana 9.5.1+ 用于显示和分析收集的数据的可视化平台。

现在,您已经了解了这个项目的基本架构,下面让我们开始演示吧!

5 步轻松搭建 MQTT 到 Kafka 的演示

1. 将项目克隆到本地

emqx/mqtt-to-kafka 存储库克隆到本地,并初始化子模块以启用 EMQX Exporter(可选):

git clone https://github.com/emqx/mqtt-to-kafka
cd mqtt-to-kafka

# Optional
# 可选
git submodule init
git submodule update

代码库由 3 部分组成:

  • emqx 文件夹包含 EMQX-Kafka 集成配置,用于在自动启动 EMQX 时创建规则和数据桥接。
  • emqx-exporterprometheusgrafana-provisioning 文件夹包含 EMQX 的监控配置。
  • docker-compose.yml 可编排多个组件,一键启动项目。

2. 启动 MQTTX CLI、EMQX 和 Kafka

请确保已经安装 Docker,然后在后台运行 Docker Compose,开始演示:

docker-compose up -d

现在,MQTTX CLI 模拟的 10 辆特斯拉汽车连接到 EMQX,并以每秒一次的频率向 mqttx/simulate/Tesla/{clientid} 主题报告其状态。

EMQX 会创建一条规则来接收来自特斯拉的消息。您也可以修改该规则,使用 EMQX 的内置 SQL 函数添加自定义消息处理方法:

SELECT
  payload
FROM
  "mqttx/simulate/#"

EMQX 还会创建一个数据桥接,通过以下配置将车辆数据传送到 Kafka:

  • 向 Kafka 中的 my-vehicles 主题发布消息
  • 使用每辆车的客户端 ID 作为消息 Key
  • 使用消息发布时间作为消息时间戳

EMQX kafka 配置

3. 从 EMQX 订阅车辆数据

Docker Compose 包含一个用于打印所有车辆数据的订阅者。可以使用以下命令查看数据:

$ docker logs -f mqttx
[8/4/2023] [8:56:41 AM] › topic: mqttx/simulate/tesla/mqttx_063105a2
payload: {"car_id":"WLHK53W2GSL511787","display_name":"Roslyn's Tesla","model":"S...

使用任何 MQTT 客户端都可以订阅和接收数据:

mqttx sub -t mqttx/simulate/tesla/+

4. 从 Kafka 订阅车辆数据

如果一切运行正常,EMQX 会将车辆中的数据实时传输到 Kafka 的 my-vehicles 主题中。您可以使用以下命令从 Kafka 中获取数据:

docker exec -it kafka \
   kafka-console-consumer.sh \
   --topic my-vehicles \
   --from-beginning \
   --bootstrap-server localhost:9092

您将收到类似下面的 JSON 数据:

{"vin":"EDF226K7LZTZ51222","speed":39,"odometer":68234,"soc":87,"elevation":4737,"heading":33,"accuracy":24,"power":97,"shift_state":"D","range":64,"est_battery_range":307,"gps_as_of":1681704127537,"location":{"latitude":"83.3494","longitude":"141.9851"},"timestamp":1681704127537}

该数据的灵感来自 TeslaMate,这是一个强大的自托管特斯拉数据记录器。您可以查看 MQTTX CLI 脚本,了解数据是如何生成的。

5. 查看 EMQX 指标(可选)

如果您在第一步中启用了 EMQX Exporter,它将收集所有 EMQX 的指标,包括客户端连接、消息速率、规则执行情况等等,这将为系统提供有用的信息。

要在 Grafana 仪表板中查看 EMQX 指标,请在浏览器中打开 http://localhost:3000, 使用用户名 admin 和密码 public 登录。

结语

本文介绍了如何利用 MQTT 和 Kafka 构建车联网流数据管道。我们采用 EMQX 作为 MQTT Broker,借助 EMQX 的数据集成功能将数据实时传输到 Kafka,从而实现了一个收集和处理流数据的端到端解决方案。

接下来,您可以直接将应用集成到 Kafka 中,获取车辆数据并实现它们的解耦。您还可以利用 Kafka Streams 对汽车数据进行实时流处理,进行统计分析和异常检测。处理结果还可通过 Kafka Connect 输出到其他系统。

本文提供的是构建可扩展和可靠的流数据管道的入门示例。欢迎读者基于本文继续探索 MQTT 和 Kafka 的强大融合为各个领域的实时分析、监控和决策带来的更多可能。

相关资源:

  • 访问 GitHub 链接 查看使用 Kafka 简化 MQTT 数据集成的演示。
  • 参考 EMQX 文档 了解如何从头开始配置此数据流管道。
联系 EMQ 解决方案专家
联系我们 →

推荐阅读