探索 EMQX 的可扩展性:EMQX 5.x 的插件机制解析
EMQX 具有卓越的可扩展性、出色的吞吐量和极低的延迟,同时还配备一个直观且现代的 Dashboard 与这些特性和功能相辅相成,使其成为许多物联网开发者的首选。
今天我们则要深入探讨 EMQX 另一个同样关键但可能被忽视的方面:可扩展性。我们将深入了解插件机制——这是 EMQX 从诞生之初就具备的一个特性,经过多个版本的迭代优化后,它 EMQX 5.x 版本中变得更加完善。
在本文中,我们将探讨:
- EMQX 插件的核心:要想深入探索插件的世界,首先要了解它们的基本原理。
- 构建插件:我们将用一个实际的例子,逐步演示如何为 EMQX 5.x 定制一个插件。
- 安装和运行插件:了解如何在一个运行中的 EMQX 集群中无缝地集成和运行插件。
- 操作技巧:不借助 EMQX Dashboard 用户界面或 CLI,直接从文件中安装插件的方法。
- 调试技巧:缩短编译、部署和测试周期的策略。
EMQX 插件揭秘
EMQX 插件本质上是 Erlang 应用程序。它们的工作方式是向 EMQX 的核心应用程序 emqx
注册(Hook)特定的回调函数。这些回调函数会在某些事件发生前后被触发。
需要明白的是,回调函数并不只是用于第三方插件的集成。实际上,EMQX 的许多内置功能(比如认证、授权和数据集成)都是基于这些回调函数来实现的。在 5.x 版本之前,这些功能被称为原生插件。
Hook 回调函数的实质
在 EMQX 的世界里,每个客户端都对应一个独立的 Erlang 进程。这个进程负责处理连接、会话或消息的各种生命周期事件,并触发相应的 Hook 回调函数。这种架构设计使得数百万客户端能够并发运行。这也意味着一个回调函数会被同时执行,因此,建议在设计回调时避免使用竞争资源。
Hook 回调函数的注册
当一个插件应用启动时,它会通过 EMQX 的 API 在指定的 Hook 点注册回调函数。这些注册信息会保存在一个 ets
表(名为 emqx_hooks
)中。
但是,这种关联并不是永久的。当插件应用关闭时,应该及时注销回调函数。
三大支柱:Hook 点
一般来说,有三类事件会触发回调函数:
- 客户端事件:它们涉及到 MQTT 连接生命周期的各个阶段。比如,当收到一个 CONNECT 包时,就会激活与
client.connect
Hook 点相对应的回调函数。 - 会话事件:它们包括 MQTT 会话生命周期从开始到结束的各个阶段。
- 消息事件:它们涵盖 MQTT 消息传递的全过程,例如当一个 PUBLISH 包到达或者向客户端发送一个 PUBACK 时。
截至目前,我们向开发者们提供了 20 个不同的 Hook 点。
一个简单的示例:创建我们的第一个插件
在这个示例中,我们将尝试创建一个插件,它可以用一个特殊的规则来增强 EMQX 的访问控制:客户端只能订阅与模式 msg/{{user-id}}/whatever
相匹配的主题,其中 {{user-id}}
是从 MQTT 客户端 ID 中截取出来的。
在我们开始之前,有必要提一下,EMQX 有很多内置的访问控制(授权)解决方案,因此通常没有必要再开发一个插件。
例如,如果需求是允许客户端订阅以其客户端 ID 为前缀的任意主题,那么内置的基于文件的 ACL 规则就可以满足,只需在 acl.conf
的开头添加 {allow, all, subscribe, ["msg/${clientid}/#"]}.
即可。
但是,在这个示例中,我们需要从客户端 ID 的一部分(而不是整个客户端 ID)中截取 user-id,这是目前内置功能无法实现的。
准备:安装 Erlang/OTP
本文基于 EMQX 5.1 版本,该版本已在 Erlang/OTP 25 上正式发布。可以在这里阅读更多相关信息:https://github.com/emqx/emqx/blob/master/README.md#build-from-source
您也可以在更新版本的 Erlang/OTP 上编译 EMQX,但如果想把插件加载到 EMQX 的官方发行包中,那么就不能用更新的 Erlang/OTP 来构建插件。
第 1 步:从模板生成框架代码
EMQX 提供了一个 rebar3 模板,可以用来生成插件的框架代码。具体步骤如下。
在
~/.config/rebar3/templates(模板)
目录安装插件$ mkdir -p ~/.config/rebar3/templates $ cd ~/.config/rebar3/templates $ git clone https://github.com/emqx/emqx-plugin-template.git
生成框架模块
$ cd /path/to/my/project $ rebar3 new emqx-plugin emqx_simple_acl
生成后的插件项目结构如下:
tree emqx_simple_acl/
emqx_simple_acl/
├── erlang_ls.config
├── get-rebar3
├── LICENSE
├── Makefile
├── priv
│ └── config.hocon
├── README.md
├── rebar.config
├── rebar.lock
└── src
├── emqx_simple_acl_app.erl
├── emqx_simple_acl.app.src
├── emqx_simple_acl_cli.erl
├── emqx_simple_acl.erl
└── emqx_simple_acl_sup.erl
第 2 步:修改生成的代码
框架代码包含了所有可用的 Hook 点,并在每个回调函数中输出调试信息。由于我们只需要使用 client.subscribe
Hook 点,所以可以直接删除其他多余的代码。
删除后,模块 emqx_simple_acl.erl
的内容如下:
-module(emqx_simple_acl).
-include_lib("emqx/include/emqx.hrl").
-include_lib("emqx/include/emqx_hooks.hrl").
-include_lib("emqx/include/logger.hrl").
-export([ load/1
, unload/0
]).
-export([ on_client_subscribe/4 ]).
load(Env) ->
hook('client.subscribe', {?MODULE, on_client_subscribe, [Env]}).
unload() ->
unhook('client.subscribe', {?MODULE, on_client_subscribe}).
on_client_subscribe(#{clientid := ClientId}, _Properties, TopicFilters, _Env) ->
io:format("Client(~s) will subscribe: ~p~n", [ClientId, TopicFilters]),
{ok, TopicFilters}.
hook(HookPoint, MFA) ->
emqx_hooks:add(HookPoint, MFA, _Property = ?HP_HIGHEST).
unhook(HookPoint, MFA) ->
emqx_hooks:del(HookPoint, MFA).
除了修改代码,还需要更新以下位置的描述文本:
src/emqx_simple_acl.app.src
文件中的description
字段。这个字段用来描述 Erlang 应用程序的功能和特点。README.md
文件的内容。这个文件清晰地说明了插件的作用、用法和维护方式等信息。rebar.confg
文件中的emqx_plugrel
部分。这部分提供了插件的程序包信息,这些信息会在 CLI 输出和仪表板上显示给用户。下面是一个示例:
%% 插件的其他信息
{emqx_plugrel,
[ {authors, ["Zaiming (stone) Shi"]}
, {builder,
[ {name, "zmstone"}
, {contact, "contact@emqx.com"}
, {website, "http://emqx.com"}
]}
, {repo, "https://github.com/zmstone/emqx_simple_acl"}
, {functionality, ["Demo"]}
, {compatibility,
[ {emqx, "~> 5.0"}
]}
, {description, "My simple ACL."}
]
}.
第 3 步:实现规则
我们需要修改的是 on_client_subscribe
回调函数,它在 EMQX 接受订阅并在系统中注册它们之前执行。
我们要实现的功能可以概括为:
- Parse
ClientId
(which is a binary string), to get user ID. - Filter the
TopicFilters
list, drop the ones that do not match the patternmsg/{{user-id}}/#
- 从
ClientId
(二进制字符串)中提取用户 ID。 - 筛选
TopicFilters
列表,删除不符合msg/{{user-id}}/#
模式的项。
假设客户端 ID 的格式是 {{region}}-{{type}}-{{user-id}}
,我们需要截取由破折号分隔的字符串的最后一部分。具体实现如下:
on_client_subscribe(#{clientid := ClientId}, _Properties, Subscriptions, _Env) ->
io:format("Client(~s) will subscribe: ~0p~n", [ClientId, topics(Subscriptions)]),
case parse_client_id_for_user_id(ClientId) of
{ok, UserId} ->
Allowed = lists:filter(fun(S) -> is_valid_subscription(UserId, S) end, Subscriptions),
io:format("Client(~s) is allowed to subscribe: ~0p~n", [ClientId, topics(Allowed)]),
{ok, Allowed};
{error, invalid_clientid} ->
io:format("Client(~s) is not allowed to subscribe to any topics~n", [ClientId]),
%% 此处返回空列表表示没有订阅任何主题
{ok, []}
end.
%% 获取模式 {{region}}-{type}}-{user-id}} 的客户端 ID,
%% 并返回 {{user-id}}。
%% 如果客户端 ID 与此模式不匹配,
%% 我们就认为它不是一个有效的客户端,不允许它订阅任何主题。
parse_client_id_for_user_id(ClientId) ->
case binary:split(ClientId, <<"-">>, [global]) of
[_Region, _Type, UserId] when UserId =/= <<>> ->
{ok, UserId};
_ ->
{error, invalid_clientid}
end.
%% 检查主题是否以 "msg/{{userid}}/" 开头
is_valid_subscription(UserId, {Topic, _SubOpts}) ->
Size = size(UserId),
case Topic of
<<"msg/", UserId:Size/binary, "/", _/binary>> ->
true;
_ ->
false
end.
%% 订阅是一个包含 {主题、订阅选项} 的列表。
topics(Subs) ->
lists:map(fun({T, _SubOpts}) -> T end, Subs).
第 4 步:构建插件
执行命令 make rel
即可生成一个插件包,文件名为 _build/default/emqx_plugrel/emqx_simple_acl-1.0.0.tar.gz
。
安装和运行插件
您可以运行 emqx ctl plugins
命令来管理插件,但是一个更直接的方法是在仪表板用户界面中管理它。
我们可以用下列命令启动 EMQX:docker run --name emqx -it --rm -p 18083:18083 -p 1883:1883 emqx/emqx:5.1.5
然后在端口 18083 打开仪表板,用 admin
public
登录(第一次登录时,需要修改密码)。您应该能在“管理”分组下找到“插件”菜单。点击右上角的“+ 安装插件”按钮,就会跳转到一个上传页面。
安装完成后,应该能在插件列表中看到它,它的初始状态是“未激活”。
在我们启动它之前,让我们先检查一下。点击它的名称,就能看到这个插件的所有信息。
描述文本是从 README.md
和 rebar.config
文件中获取的。
现在我们可以点击“启动”按钮来启动它,会看到插件变成了“激活”状态。
我们也可以用 CLI 来检查它:docker exec -it emqx bash -c 'emqx ctl plugins list'
[
{
"running_status" : "running",
"repo" : "https://github.com/zmstone/emqx_simple_acl",
"rel_vsn" : "1.0.0",
"rel_apps" : [
"emqx_simple_acl-0.1.0",
"map_sets-1.1.0"
],
"name" : "emqx_simple_acl",
"metadata_vsn" : "0.1.0",
"git_ref" : "unknown",
"functionality" : [
"Demo"
],
"description" : "My simple ACL.",
"date" : "2023-08-30",
"config_status" : "enabled",
"compatibility" : {
"emqx" : "~> 5.0"
},
"built_on_otp_release" : "25",
"builder" : {
"website" : "http://emqx.com",
"name" : "zmstone",
"contact" : "contact@emqx.com"
},
"authors" : [
"Zaiming (Stone) Shi"
]
}
]
验证插件
首先,我们来看看插件是否已经成功地在 emqx_hooks
表中注册了回调函数。
通过连接到 EMQX 的远程控制台,我们可以像下面这样查看所有的 Hook:
$ docker exec -it emqx bash -c 'emqx remote_console'
Erlang/OTP 25 [erts-13.2.2] [emqx] [64-bit] [smp:20:20] [ds:20:20:10] [async-threads:1] [jit:ns]
Restricted Eshell V13.2.2 (abort with ^G)
v5.1.5-build.3(emqx@172.17.0.2)1> rr(emqx_hooks).
[callback,hook]
v5.1.5-build.3(emqx@172.17.0.2)2> ets:tab2list(emqx_hooks).
[...
#hook{name = 'client.subscribe',
callbacks = [#callback{action = {emqx_simple_acl,on_client_subscribe,
[[]]},
filter = undefined,priority = 1000}]},
...
]
接下来,我们尝试连接一个 MQTT 客户端来测试插件是否按照预期运行。
我们将在测试中使用 MQTTX 命令行工具。
如果我们使用客户端 ID region1-type1-user1
连接,然后订阅 msg/user1/#
,如下所示:
mqttx sub -h localhost -p 1883 -i region1-type1-user1 -t msg/user1/#
我们应该能够在仪表板上看到该订阅,如下图所示:
同时,在 docker 控制台上也可以看到调试输出,如下所示:
Client(region1-type1-user1) will subscribe: [<<"msg/user1/#">>]
Client(region1-type1-user1) is allowed to subscribe: [<<"msg/user1/#">>]
如果我们使用一个错误的主题模式测试,比如 $ mqttx sub -h localhost -p 1883 -i region1-type1-user1 -t msg/userX/0
,我们将看到如下日志。
Client(region1-type1-user1) will subscribe: [<<"msg/userX/0">>]
Client(region1-type1-user1) is allowed to subscribe: []
如果我们使用一个不符合 {{region}}-{{type}}-{{user-id}}
模式的客户端 ID 连接,比如 $ mqttx sub -h localhost -p 1883 -i user1 -t msg/user1/0
,那么客户端将无法订阅任何主题。
Client(user1) will subscribe: [<<"msg/user1/0">>]
Client(user1) is not allowed to subscribe to any topics
操作技巧
我们已经了解到 EMQX 把插件识别为 .tar.gz
格式的压缩包。您可以通过仪表板界面或 CLI 来查看这些插件的状态(是否激活、停用等)。但是,对于喜欢自动化的系统管理员,还有一些不明确的地方:
- EMQX 把上传的压缩包保存在哪里?
- EMQX 如何管理每个插件的状态?
一旦我们弄清楚这些问题,自动化过程就会变得更加顺畅。
插件包解压
正如许多人所见,EMQX v5 插件实际上是 .tar.gz
格式的文件,其内容与普通的 Erlang 应用程序没有太大区别。EMQX 将这个压缩包解压到插件目录中。例如:
docker exec -it emqx bash -c 'ls /opt/emqx/plugins/emqx_simple_acl-1.0.0/'
README.md emqx_simple_acl-0.1.0 map_sets-1.1.0 release.json
插件状态存储
通过 docker exec -it emqx bash -c 'cat /opt/emqx/data/configs/cluster.hocon'
查看集群的配置文件,内容如下:
plugins {
install_dir = plugins
states = [
{
enable = true
name_vsn = emqx_simple_acl-1.0.0
}
]
}
每个安装的插件都会把自己的状态保存在 plugins.states
数组中。这种数组格式保证了插件的加载和初始化顺序是一致的。
要在 EMQX 节点启动之前预设插件的状态,可以通过与 etc/emqx.conf
文件结合来实现状态存储。对于 Docker 用户而言,这表示可以从主机挂载,或者使用 Kubernetes 的 ConfigMap。但是,请注意,当在 Docker 中操作时,数据目录充当 Docker 卷。务必确保 cluster.hocon
文件对 EMQX 是可写的,以允许存储实时的、集群同步的配置更改。因此,请避免直接挂载 cluster.hocon
文件。
调试技巧
如果您需要频繁地重新构建程序包、上传并重启插件,那么使用 EMQX 进行集成测试就会很麻烦。
这时,EMQX 的热补丁机制就派上用场了。
修改了 emqx_simple_acl.erl
模块后,使用 make
命令可以重新编译代码。新编译的代码会被保存到 _build/default/lib/emqx_simple_acl/ebin/emqx_simple_acl.beam
。
可以使用下面的命令,无需重启 EMQX 就可以加载新的 beam 文件:
docker cp _build/default/lib/emqx_simple_acl/ebin/emqx_simple_acl.beam emqx:/opt/emqx/plugins/emqx_simple_acl-1.0.0/emqx_simple_acl-0.1.0/ebin/
docker exec -it emqx bash -c 'emqx eval "c:lm()."'
成功更新 beam 文件后,应该显示 [{module,emqx_simple_acl}]
。
结语
EMQX 是一个强大的平台,它不仅拥有丰富的内置功能,还能通过其灵活的插件系统进行定制化扩展。本文向读者介绍了 EMQX 插件的基本概念,从插件的核心原理到如何亲自动手开发一个适用于 EMQX 5.x 的插件。我们还探讨了在 EMQX 环境中集成和使用插件的便捷方法。对于那些追求高效工作方式的用户,我们的操作技巧提供了一些绕过传统 UI 或 CLI 的方式,而我们的调试策略旨在缩短开发周期。无论您是资深开发者还是 EMQX 新手,这些信息都将帮助您充分发挥 EMQX 插件生态系统的优势。
示例代码已发布到 GitHub 仓库:zmstone/emqx_simple_acl
您可以在 EMQX 官方文档的 v5 插件部分找到更简明的示例。
关于 Hook 的更详细信息,请浏览:https://docs.emqx.com/en/enterprise/v5.1/extensions/hooks.html。