AI 赋能边缘计算:在 Kuiper 中运行 TensorFlow Lite 模型
EMQX Kuiper 是一款边缘轻量级物联网数据分析/流软件,可在各种资源受限的物联网设备上运行。
TensorFlow Lite 是一组帮助开发人员在移动端、嵌入式和物联网设备上运行 TensorFlow 模型的工具,它使得设备上的机器学习预测具有低延迟和较小的二进制容量。
通过集成 Kuiper 和 TensorFlow Lite,用户可以通过包含预先构建的 TensorFlow 模型的 AI 分析流中的数据。 在本教程中,我们将引导您构建一个 kuiper 插件,通过预先训练的图像识别 TensorFlow 模型,标记边缘设备生成的流图片(二进制数据)。
条件准备
如需运行 TensorFlow Lite 解释器,我们需要一个经过训练的模型。本篇教程将不介绍如何训练和涵盖这个模型,您可以通过查看 tflite converter 了解详情。我们既可以训练一个新的模型,也可以在线选择一个。在本教程中,我们将使用 mattn/go tflite 的 label image 模型。该 repo 为 tflite C API 创建了 golang 绑定。我们还将使用它来实现我们的插件。
开发插件
为了集成 Kuiper 和 TensorFlow Lite,我们将开发一个定制的 Kuiper 函数插件,供 Kuiper 规则使用。例如,我们将创建 LabelImage
函数,其输入是表示图像的二进制类型数据,输出是表示图像标签的字符串。例如,如果输入图像中有孔雀,LabelImage(col)
将输出“孔雀”。
要开发函数插件,我们需要:
- 创建插件 go 文件。例如,在 kuiper 源代码中,创建 plugins/functions/labelImage/labelImage.go 文件。
- 创建一个实现 api.函数接口 的 struct。
- 导出 struct。
实现的关键是 Exec 函数。伪代码如下:
func (f *labelImage) Exec(args []interface{}, ctx api.FunctionContext) (interface{}, bool) {
//... 初始化和验证
// 解码输入图像
img, _, err := image.Decode(bytes.NewReader(arg[0]))
if err != nil {
return err, false
}
var outerErr error
f.once.Do(func() {
// 加载标签、tflite模型并初始化tflite解释器
})
// 对输入图像运行解释器
// 返回可能性最大的标签
return result, true
}
此外还需要注意插件的导出。该函数是无状态的,因此我们将仅导出一个 struct 实例。所有使用此函数的规则都会共享一个实例,以避免创建实例和加载模型的开销。模型和标签路径将在实例化时指定。
var LabelImage = labelImage{
modelPath: "labelImage/mobilenet_quant_v1_224.tflite",
labelPath: "labelImage/labels.txt",
}
查阅 本教程 以获得创建 Kuiper 插件的详细步骤。请参阅 labelImage.go 以获取完整的源代码。
构建并安装插件
要使用该插件,我们需要在运行 Kuiper 的环境中对其进行构建,然后将其安装在 Kuiper 中。
通过预构建的 zip 安装
如果使用基于 debian 的带有 1.1.1 或 1.1.1-slim标签的 Kuiper docker 镜像,我们可以安装预构建的 labelImage插件。例如,要在 docker image emqx/kuiper:1.1.2-slim 中安装 Kuiper 1.1.2 插件,则预构建的 zip 文件位于 https://www.emqx.com/zh/downloads/kuiper-plugins/v1.1.2/debian/functions/labelImage_amd64.zip\。按如下所示运行 rest命令以进行安装。
POST http://{{kuiperHost:kuiperRestPort}}/plugins/functions
Content-Type: application/json
{"name":"labelImage", "file": "https://www.emqx.com/zh/downloads/kuiper-plugins/v1.1.2/debian/functions/labelImage_amd64.zip"}
手动构建
- 如果您不使用官方的 Kuiper docker 镜像运行 Kuiper,由于 golang 插件的限制,预构建的 labelImage 插件将不适用。您需要手动构建插件。手动创建插件 zip 文件有3个步骤:
- 构建 TensorFlowLite C API。
- 构建 labelImage 插件。
- 将插件与安装脚本打包在一起。
构建 TensorFlowLite C API
有一个来自 tensorflow repo 的关于构建C API的非常简单的 说明 。 我们将在本节中逐步详细展开。 请注意,该插件仅针对 TensorFlow v2.2.0-rc3 进行测试,因此我们将以此版本为基础进行构建。 以 ubuntu 为例,以下是构建步骤:
安装 Python 3.
将 requirements.txt 复制到您指定位置。 安装所需的 python 库:
pip3 install -r requirements.txt
。 requirements 来自相应 TensorFlow 版本的tensorflow/tensorflow/tools/pip_package/setup.py
。安装 TensorFlow 的构建工具 Bazel。
克隆 tesorflow repo,通过
git checkout v2.2.0-rc3 -b mybranch
命令切换到所需的分支。生成目标 .so 文件,输出将位于 ./bazel-bin 中。 将两个 so 文件复制到 tensorflow/lib 文件夹中。
$ cd $tensorflowSrc $ bazel build --config monolithic -c opt //tensorflow/lite:libtensorflowlite.so $ bazel build --config monolithic -c opt //tensorflow/lite/c:libtensorflowlite_c.so $ mkdir lib $ cp bazel-bin/tensorflow/lite/libtensorflowlite.so lib $ cp bazel-bin/tensorflow/lite/c/libtensorflowlite_c.so lib
安装 so 文件。
- 更新 ldconfig 文件
sudo vi / etc / ld.so.conf.d / tflite.conf
。 - 将路径
{{tensorflowPath}}/lib
添加到 tflite.conf,然后保存并退出。 - 运行 ldconfig:
sudo ldconfig
。 - 检查安装结果:
ldconfig -p | grep libtensorflow
。 确保列出了两个so文件。
- 更新 ldconfig 文件
构建 labelImage 插件
确保已克隆 Kuiper github repo。 插件源文件位于 plugins/functions/labelImage/labelImage.go 中。 在构建插件之前,导出 tensorflow repo 和构建库的路径。
$ cd {{kuiperRepoPath}}
$ export CGO_CFLAGS=-I/root/tensorflow
$ export CGO_LDFLAGS=-L/root/tensorflow/lib
$ go build -trimpath --buildmode=plugin -o plugins/functions/LabelImage.so plugins/functions/labelImage/*.go
通过这些命令,插件将构建到 plugins/functions/LabelImage.so 中。出于开发目的,您可以重新启动 Kuiper 以自动加载此插件并进行测试。测试完成后,我们应该将其打包为一个 zip 文件,该文件可供 Kuiper 插件安装API 使用,以便可以在其他计算机(例如生产环境)中使用。
打包插件
将 plugins/functions/labelImage 目录中的所有文件和目录与构建的 LabelImage.so 一起打包到一个 zip 文件中。 zip文件的文件结构应类似于:
- etc
- labels.txt
- mobilenet_quant_v1_224.tflite
- lib
- libtensorflowlite.so
- libtensorflowlite_c.so
- install.sh
- LabelImage.so
- tflite.conf
将打包的插件安装到目标系统,如 通过预构建 zip 安装 所示。
运行插件
插件安装后,我们就可以在规则中使用它了。 我们将创建一个规则用于接收来自 MQTT 主题的图像字节数据,并通过 tflite 模型标记该图像。
定义流
通过 Kuiper rest API 定义流。 我们创建一个名为 tfdemo 的流,其格式为二进制,主题为 tfdemo。
POST http://{{host}}/streams
Content-Type: application/json
{"sql":"CREATE STREAM tfdemo () WITH (DATASOURCE=\"tfdemo\", FORMAT=\"BINARY\")"}
定义规则
通过 Kuiper rest API 定义规则。 我们将创建一个名为 ruleTf 的规则。 我们只是从 tfdemo 流中读取图像,然后对其运行自定义函数 labelImage。 返回结果将是 AI 识别的图像的标签。
POST http://{{host}}/rules
Content-Type: application/json
{
"id": "ruleTf",
"sql": "SELECT labelImage(self) FROM tfdemo",
"actions": [
{
"log": {}
}
]
}
输入数据
在这里,我们创建了一个 go 程序,用于将图像数据发送到 tfdemo 主题以便由规则进行处理。
package main
import (
"fmt"
mqtt "github.com/eclipse/paho.mqtt.golang"
"io/ioutil"
"time"
)
func main(){
const TOPIC = "tfdemo"
images := []string{
"peacock.png",
"frog.jpg",
// 其他你需要的图像
}
opts := mqtt.NewClientOptions().AddBroker("tcp://yourownhost:1883")
client := mqtt.NewClient(opts)
if token := client.Connect(); token.Wait() && token.Error() != nil {
panic(token.Error())
}
for _, image := range images {
fmt.Println("Publishing " + image);
payload, err := ioutil.ReadFile(image)
if err != nil{
fmt.Println(err)
continue
}
if token := client.Publish(TOPIC, 0, false, payload); token.Wait() && token.Error() != nil {
fmt.Println(token.Error())
} else {
fmt.Println("Published " + image);
}
time.Sleep(1 * time.Second)
}
client.Disconnect(0)
}
运行 pub.go,它将开始将图像输入 tfdemo 主题。
检查结果
因为我们的规则定义只有一个目标:log,所以结果将被写入日志文件。 我们用 peacock.png 和 frog.png 两个图像填充流。 检查日志文件,我们会发现:
time="2021-02-05 16:23:29" level=info msg="sink result for rule ruleTf: [{\"labelImage\":\"peacock\"}]" file="sinks/log_sink.go:16" rule=ruleTf
time="2021-02-05 16:23:30" level=info msg="sink result for rule ruleTf: [{\"labelImage\":\"bullfrog\"}]" file="sinks/log_sink.go:16" rule=ruleTf
图像标记正确。
结论
通过本篇教程,您可以构建自定义的 Kuiper 插件,以使用预先训练好的 TensorFlow Lite 模型。如果需要使用其他模型,则只需按照规定步骤创建另一个函数即可。请注意,如果在同一环境中运行,构建的 TensorFlow C API 可以在所有函数之间共享。希望本文能帮助您更好地在边缘计算中利用 AI,让 AI 为您的边缘设备赋能。