Efficient Persistence of IoT Streaming Data | EMQX and HStreamDB Data Integration Practice
In IoT scenarios, one commonly faces significant challenges: a vast number of devices, high data generation rates, and large accumulated data volumes. How to ingest, store, and process these massive amounts of device-generated data has therefore become a critical issue.
EMQX is a powerful MQTT message broker designed for the IoT. It can handle massive numbers of device connections in a single cluster and offers a rich set of data integration capabilities. HStreamDB is a distributed streaming database that not only efficiently stores the massive device data delivered by EMQX but also provides real-time processing and analysis capabilities. Both EMQX and HStreamDB are highly scalable and reliable. Integrating them meets the performance and stability requirements of large-scale IoT applications and also improves their real-time responsiveness.
EMQX Enterprise 4.4.15 adds support for the latest version of HStreamDB. This article provides a comprehensive guide to using the EMQX rule engine to persist data to HStreamDB, enabling storage and real-time processing of MQTT data streams.
Note: The integration steps in this article assume EMQX Enterprise 4.4.15 and HStreamDB 0.14.0 or later versions.
Connecting to the HStreamDB Cluster
The following tutorial assumes that you have a running EMQX Enterprise cluster and a running HStreamDB cluster. To set up or obtain an EMQX Enterprise cluster, refer to the EMQX Enterprise docs. To set up or obtain an HStreamDB cluster, refer to the HStreamDB docs, which explain how to get started quickly with Docker and how to deploy locally or in the cloud.
We can connect to the HStreamDB cluster with the HStream command-line client. Here we run it with Docker:
# get some help
docker run -it --rm --name some-hstream-cli --network host hstreamdb/hstream:v0.14.0 hstream --help
Here we use the `hstream stream` command to create a stream for demonstration purposes:
# create a stream via `hstream stream`: -r sets the replication factor, -b the retention (backlog duration) in seconds, here 7 days
docker run -it --rm --name some-hstream-cli --network host hstreamdb/hstream:v0.14.0 hstream stream create basic_condition_info_0 -r 3 -b $(( 7 * 24 * 60 * 60 ))
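The `-b` value above is plain shell arithmetic for seven days expressed in seconds. If you script stream creation, the Python sketch below rebuilds the same command as an argument list. Reading `-r` as the replication factor and `-b` as the retention (backlog) duration in seconds is our assumption about the 0.14 CLI, so confirm it with `hstream stream create --help`; the helper names are hypothetical.

```python
from __future__ import annotations

import shlex

SECONDS_PER_DAY = 24 * 60 * 60  # 86400 seconds


def retention_seconds(days: int) -> int:
    """Same arithmetic as the shell expression $(( 7 * 24 * 60 * 60 ))."""
    return days * SECONDS_PER_DAY


def stream_create_argv(stream: str, replicas: int, retention_days: int,
                       image: str = "hstreamdb/hstream:v0.14.0") -> list[str]:
    """Build (but do not run) the docker + `hstream stream create` command above."""
    return [
        "docker", "run", "-it", "--rm", "--name", "some-hstream-cli",
        "--network", "host", image,
        "hstream", "stream", "create", stream,
        "-r", str(replicas),                            # replication factor (assumed meaning)
        "-b", str(retention_seconds(retention_days)),   # retention in seconds (assumed meaning)
    ]


if __name__ == "__main__":
    argv = stream_create_argv("basic_condition_info_0", replicas=3, retention_days=7)
    print(shlex.join(argv))
    # To execute it for real: subprocess.run(argv, check=True)
```

For a 7-day retention the `-b` value works out to 604800 seconds.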
Then connect to the interactive HStream SQL Shell:
docker run -it --rm --name some-hstream-cli --network host hstreamdb/hstream:v0.14.0 hstream sql --service-url "<<YOUR-SERVICE-URL>>"
# and fill --tls-ca, --tls-key, --tls-cert if you are using TLS or HStreamDB cloud
Once the connection to the cluster is established, we will see:
__ _________________ _________ __ ___
/ / / / ___/_ __/ __ \/ ____/ | / |/ /
/ /_/ /\__ \ / / / /_/ / __/ / /| | / /|_/ /
/ __ /___/ // / / _, _/ /___/ ___ |/ / / /
/_/ /_//____//_/ /_/ |_/_____/_/ |_/_/ /_/
Command
:h To show these help info
:q To exit command line interface
:help [sql_operation] To show full usage of sql statement
SQL STATEMENTS:
To create a simplest stream:
CREATE STREAM stream_name;
To create a query select all fields from a stream:
SELECT * FROM stream_name EMIT CHANGES;
To insert values to a stream:
INSERT INTO stream_name (field1, field2) VALUES (1, 2);
After entering the HStream SQL Shell, we can run `show streams;` to view information about existing streams:
> show streams;
+------------------------+---------+----------------+-------------+
| Stream Name            | Replica | Retention Time | Shard Count |
+------------------------+---------+----------------+-------------+
| basic_condition_info_0 | 1       | 0 seconds      | 1           |
+------------------------+---------+----------------+-------------+
Creating an HStreamDB Resource
Before using the EMQX rule engine to persist data to HStreamDB, you need to create an HStreamDB resource.
To do this, open the EMQX Dashboard, click Rules Engine -> Resources -> Create, select HStreamDB Resources, enter the resource address, and fill in the required options. The available options are:
Name | Description | Type | Required | Default Value |
---|---|---|---|---|
HStream Server | Address of the HStreamDB server | String | Yes | http://127.0.0.1:6570 |
Pool Size | Size of the HStreamDB connection pool | Number | Yes | 8 |
gRPC Timeout | Timeout for gRPC calls to the HStreamDB server (ms) | Number | No | 5000 |
Enable SSL | Whether to connect over SSL | Boolean | No | false |
When SSL is enabled, additional options appear in which you can paste or upload the SSL configuration, such as certificate and key files.
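If you generate resource settings from scripts or keep them under version control, it can help to mirror the table's defaults and units in code. The dataclass below is a hypothetical helper, not an EMQX API: the field names are ours, the checks are ordinary sanity checks rather than EMQX's own validation rules, and the SSL fields are omitted.

```python
from dataclasses import dataclass
from urllib.parse import urlparse


@dataclass
class HStreamResourceOptions:
    """Hypothetical mirror of the resource options table (not an EMQX API)."""
    server: str = "http://127.0.0.1:6570"  # HStream Server, required
    pool_size: int = 8                     # Pool Size, required
    grpc_timeout_ms: int = 5000            # gRPC Timeout in milliseconds, optional
    enable_ssl: bool = False               # Enable SSL, optional

    def problems(self) -> list:
        """Return human-readable problems; an empty list means the values look sane."""
        issues = []
        url = urlparse(self.server)
        if not url.hostname or url.port is None:
            issues.append(f"server needs host and port: {self.server!r}")
        if self.pool_size < 1:
            issues.append("pool_size must be at least 1")
        if self.grpc_timeout_ms <= 0:
            issues.append("grpc_timeout_ms must be positive")
        return issues


if __name__ == "__main__":
    print(HStreamResourceOptions().problems())  # []
    print(HStreamResourceOptions(server="127.0.0.1", pool_size=0).problems())
```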
Creating a Rule
Click Rules Engine -> Rule -> Create.
Write the rule SQL and add an action. In the action's string templates, you can reference variables output by the SQL.
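The string templates mentioned above substitute fields selected by the rule SQL into action parameters, for example the PartitionKey of the HStreamDB action. EMQX 4.x writes these placeholders as `${name}`; the Python sketch below only illustrates the substitution idea and is not EMQX's implementation. Rendering a name that is missing from the SQL output as an empty string is an assumption of this sketch, and the sample row is hypothetical.

```python
from __future__ import annotations

import re

# Matches ${name} placeholders, the template syntax used by EMQX 4.x actions.
PLACEHOLDER = re.compile(r"\$\{([^}]+)\}")


def render(template: str, row: dict) -> str:
    """Substitute each ${name} with the value of `name` from a rule's SQL output row.

    Assumption for this sketch: names missing from the row render as an empty string.
    """
    def lookup(match: re.Match) -> str:
        value = row.get(match.group(1).strip())
        return "" if value is None else str(value)

    return PLACEHOLDER.sub(lookup, template)


if __name__ == "__main__":
    # Hypothetical SQL output, e.g. from: SELECT clientid, payload.submitter AS submitter FROM "t/#"
    row = {"clientid": "sensor-01", "submitter": "admin-07"}
    print(render("${submitter}", row))      # admin-07
    print(render("site-${clientid}", row))  # site-sensor-01
```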
Note that the SQL rules in this document are for demonstration only; in practice, write the SQL according to your business requirements.
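Each rule selects the messages it processes with an MQTT topic filter in its FROM clause, for example `SELECT * FROM "t/#"` (the topic is hypothetical). The sketch below implements the standard MQTT 3.1.1 wildcard semantics, written from the MQTT specification rather than taken from EMQX's code, to show which published topics such a rule would see: `+` matches exactly one level, `#` matches the parent level and everything below it, and filters starting with a wildcard do not match `$`-prefixed topics.

```python
def topic_matches(topic_filter: str, topic: str) -> bool:
    """Return True if `topic` matches the MQTT topic filter `topic_filter`."""
    # Filters that start with a wildcard never match topics beginning with '$' (e.g. $SYS).
    if topic.startswith("$") and topic_filter[:1] in ("+", "#"):
        return False
    filter_levels = topic_filter.split("/")
    topic_levels = topic.split("/")
    for i, level in enumerate(filter_levels):
        if level == "#":
            # '#' is only valid as the last level; it also matches the parent level.
            return i == len(filter_levels) - 1
        if i >= len(topic_levels):
            return False  # the filter has more levels than the topic
        if level != "+" and level != topic_levels[i]:
            return False  # literal level mismatch ('+' accepts any single level)
    return len(filter_levels) == len(topic_levels)


if __name__ == "__main__":
    for topic in ["t", "t/room1", "t/room1/temp", "x/room1", "$SYS/brokers"]:
        print(topic, topic_matches("t/#", topic))
```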
Click Add Action and select Data Persistence to save the data to HStreamDB. Select the resource created in the previous step and fill in the parameters. The available parameters are:
Name | Description | Type | Required | Default Value |
---|---|---|---|---|
Stream | Name of the target stream; placeholders are not supported | String | Yes | |
PartitionKey | Partition key of each record written; placeholders are supported | String | No | default PartitionKey |
gRPC Flush Period | gRPC Flush Period (ms) | Number | No | 10000 |
Enable Batch Append | Whether to append records in batches | Boolean | No | true |
Max Batch Append Count | Maximum number of records in one batch append | Number | No | 100 |
Max Batch Interval | Maximum interval allowed between two successive batch requests (ms) | Number | No | 500 |
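The batching parameters trade the number of requests against latency. The Python sketch below models one plausible reading of Max Batch Append Count and Max Batch Interval; the class and its flush rules are hypothetical and only meant to make the two thresholds concrete, and gRPC Flush Period is not modelled.

```python
from dataclasses import dataclass, field
from typing import Any, List


@dataclass
class BatchAppender:
    """Toy model of 'Max Batch Append Count' and 'Max Batch Interval'.

    Assumed semantics (illustrative only, not EMQX's implementation): buffered
    records are sent as one batch once max_count of them have accumulated, and
    a periodic tick() sends whatever is buffered once max_interval_ms has passed
    since the previous batch was sent.
    """
    max_count: int = 100        # Max Batch Append Count
    max_interval_ms: int = 500  # Max Batch Interval (ms)
    pending: List[Any] = field(default_factory=list)
    sent: List[List[Any]] = field(default_factory=list)  # stands in for append requests
    last_send_ms: int = 0

    def _send(self, now_ms: int) -> None:
        self.sent.append(self.pending)
        self.pending = []
        self.last_send_ms = now_ms

    def append(self, record: Any, now_ms: int) -> None:
        self.pending.append(record)
        if len(self.pending) >= self.max_count:
            self._send(now_ms)  # count threshold reached

    def tick(self, now_ms: int) -> None:
        if self.pending and now_ms - self.last_send_ms >= self.max_interval_ms:
            self._send(now_ms)  # interval threshold reached


if __name__ == "__main__":
    buf = BatchAppender(max_count=3, max_interval_ms=500)
    for t in range(7):
        buf.append(t, now_ms=t)
    buf.tick(now_ms=100)  # only 95 ms since the last batch: nothing is sent
    buf.tick(now_ms=505)  # 500 ms since the last batch: record 6 is sent
    print(buf.sent)       # [[0, 1, 2], [3, 4, 5], [6]]
```

With Enable Batch Append turned off, each record would be written on its own, which in this model corresponds to `max_count=1`. Larger batches mean fewer requests per record, at the cost (in this model) of up to `max_interval_ms` of extra delay for a partially filled batch.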
Click Confirm to create the rule.
Getting Real-Time Data Updates from the HStream SQL Shell
Data persisted to HStreamDB by the EMQX rule engine can be read in real time: a query in the HStream SQL Shell picks up new records as they are written to the stream. Once the data is in HStreamDB, any consumer can read it. This article uses the HStream SQL Shell as a simple consumer, but you are free to write a consumer application in your preferred programming language and SDK.
docker run -it --rm --name some-hstream-cli --network host hstreamdb/hstream:v0.14.0 hstream sql
> select * from basic_condition_info_0 emit changes;
At this point the query prints nothing, because the EMQX rule engine has not yet written any data to HStreamDB. Once data is written, you will see it appear in the HStream SQL Shell in real time. Note that an HStream SQL query currently prints only results produced after the query has started, so a query created after EMQX has stopped writing to HStreamDB may show no results. For more on streams and SQL in HStreamDB, refer to the HStreamDB SQL documentation.
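The behaviour described above, where a query only sees records written after it starts, can be pictured as reading an append-only log from its current end. The Python sketch below is a toy model of that idea under this assumption; the class is hypothetical and says nothing about how HStreamDB stores or serves data internally.

```python
from typing import Any, List, Tuple


class ToyStream:
    """Append-only log in which each new query starts reading at the current end.

    This only mimics the behaviour described above; it is not how HStreamDB
    stores or serves data internally.
    """

    def __init__(self) -> None:
        self.log: List[Any] = []

    def append(self, record: Any) -> None:
        self.log.append(record)  # a writer such as the EMQX action

    def start_query(self) -> int:
        return len(self.log)  # a new query begins at the end of the log

    def poll(self, offset: int) -> Tuple[List[Any], int]:
        """Return records written since `offset` and the offset to poll from next."""
        return self.log[offset:], len(self.log)


if __name__ == "__main__":
    s = ToyStream()
    s.append({"temperature": 26.0})  # written before the query starts: never shown
    q = s.start_query()
    s.append({"temperature": 27.0})
    new, q = s.poll(q)
    print(new)                       # [{'temperature': 27.0}]
    late = s.start_query()           # query created after writes have stopped
    print(s.poll(late)[0])           # []
```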
Writing Data to EMQX
Now use MQTTX, a state-of-the-art desktop MQTT client, to connect to EMQX and send a message.
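MQTTX only needs a topic and a payload. The Python sketch below builds a JSON payload with the field names that appear in the HStream SQL Shell output later in this article; the topic name and the helper function are our own illustration, so make sure your rule's FROM clause matches whatever topic you actually publish to. Paste the printed payload into MQTTX, or publish it with any MQTT client library.

```python
import json

# Hypothetical topic: the article does not state which topic the messages use.
TOPIC = "t/basic_condition_info"


def make_payload(submitter: str, current_people: float, people_in_line: float,
                 temperature: float, healthy: bool = True) -> str:
    """Serialize one reading with the same field names as the records shown later."""
    return json.dumps({
        "current-number-of-people": current_people,
        "device-health": healthy,
        "number-of-people-in-line": people_in_line,
        "submitter": submitter,
        "temperature": temperature,
    })


if __name__ == "__main__":
    print(TOPIC)
    print(make_payload("admin-07", 247.0, 14.0, 27.0))
```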
Obtaining the Running Status of the Rule Engine from the EMQX Dashboard
We can check the rule metrics in the EMQX Dashboard:
If the rule metrics look normal (messages are matched and the actions succeed), EMQX is persisting data to HStreamDB. Once data is written successfully, real-time updates can be seen in the HStream SQL Shell:
> select * from basic_condition_info_0 emit changes;
{"current-number-of-people":247.0,"device-health":true,"number-of-people-in-line":14.0,"submitter":"admin-07","temperature":27.0}
{"current-number-of-people":220.0,"device-health":true,"number-of-people-in-line":13.0,"submitter":"admin-07","temperature":27.2}
{"current-number-of-people":135.0,"device-health":true,"number-of-people-in-line":2.0,"submitter":"admin-01","temperature":26.9}
{"current-number-of-people":137.0,"device-health":true,"number-of-people-in-line":0.0,"submitter":"admin-01","temperature":26.9}
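Once records arrive, a consumer application can process them further. In a real application the records would come from an HStreamDB SDK subscription in your language of choice; to stay self-contained, the Python sketch below parses the four lines printed above and computes a few example statistics, whose choice is ours and purely illustrative.

```python
import json
from collections import Counter
from statistics import mean

# The four records printed by the query above, one JSON object per line.
OUTPUT = """\
{"current-number-of-people":247.0,"device-health":true,"number-of-people-in-line":14.0,"submitter":"admin-07","temperature":27.0}
{"current-number-of-people":220.0,"device-health":true,"number-of-people-in-line":13.0,"submitter":"admin-07","temperature":27.2}
{"current-number-of-people":135.0,"device-health":true,"number-of-people-in-line":2.0,"submitter":"admin-01","temperature":26.9}
{"current-number-of-people":137.0,"device-health":true,"number-of-people-in-line":0.0,"submitter":"admin-01","temperature":26.9}
"""


def summarize(lines):
    """Aggregate parsed readings the way a downstream consumer application might."""
    rows = [json.loads(line) for line in lines if line.strip()]
    return {
        "count": len(rows),
        "mean_temperature": mean(r["temperature"] for r in rows),
        "max_in_line": max(r["number-of-people-in-line"] for r in rows),
        "all_healthy": all(r["device-health"] for r in rows),
        "per_submitter": Counter(r["submitter"] for r in rows),
    }


if __name__ == "__main__":
    print(summarize(OUTPUT.splitlines()))
```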
Conclusion
This completes the main workflow for persisting data from the EMQX rule engine to HStreamDB.
Once the data collected by EMQX is stored in HStreamDB, it can be processed and analyzed in real time to support upper-layer applications such as AI and big data, further uncovering the value of the data. As the first cloud-native database designed for streaming data, HStreamDB combined with EMQX provides a one-stop solution for storing and processing massive IoT data in real time, simplifying the IoT application data stack and accelerating enterprise IoT application development.