Seamlessly Integrating EMQX Cloud with Confluent Cloud: A Step-by-Step Tutorial
Table of Contents
Introduction
Confluent Cloud is a managed cloud service offered by Confluent, the company behind Apache Kafka. This service enables easy deployment, operation, and scaling of Apache Kafka clusters in the cloud. With Confluent Cloud, you can focus on building event-driven applications without worrying about managing the underlying infrastructure. It boasts features such as automatic scaling, data replication, and seamless integration with other components of the Confluent Platform.
Recently, EMQX Cloud, a fully managed MQTT cloud service, has announced its integration support for Confluent Cloud. Organizations can now harness both EMQX Cloud for MQTT and Confluent Cloud for Kafka, enhancing MQTT communication and Kafka deployments for critical applications. This powerful combination allows for smooth integration between IoT devices that use MQTT and Kafka's stream processing capabilities, thus unlocking the full potential of IoT infrastructure for scalable and future-proof solutions.
In this tutorial, we'll show you step-by-step how to integrate Confluent Cloud with EMQX Cloud in four detailed steps.
Step 1: Set up a Confluent Cloud Cluster
To begin using Confluent Cloud, visit confluent.io/get-started/ and create an account. Upon signing up, you will receive a $400 credit to use within the first 30 days.
1. Create a Confluent Cloud cluster
Once you have completed the sign-up process, proceed to create your first Confluent Cloud cluster. Select the plan that best suits your needs and follow the step-by-step instructions provided by Confluent Cloud. For this example, the default settings should suffice.
2. Generate an API Key
After creating your first Confluent Cloud cluster, navigate to the API Keys section within the cluster overview and select ‘Create Key’. Generate an API key with global access and store the generated key in a safe place. This API key is crucial for authenticating your EMQX Cloud deployment with your Confluent Cloud cluster, enabling seamless data integration between the two.
3. Define a Topic
We then need to create a topic where we want to store all the data produced by our MQTT devices. In the navigation menu, select ‘Topics’ and then create a topic using the default settings. For this tutorial, we named the topic emqx
. It’s not necessary to create a schema for this example.
After creating your topic, navigate to the ‘Messages’ tab to monitor any incoming messages. Ensure that the browser tab remains open so that you can check for new messages at a later time.
Your Confluent Cloud cluster is now set up and ready for data ingestion.
Step 2: Set up an EMQX Cloud Deployment
Register for an EMQX account to access a 14-day free trial of a Dedicated deployment. No credit card is required.
1. Create a Dedicated Deployment
Log in to the Cloud Console and click the ‘New Deployment’ button to begin creating a new deployment. Select the ‘Dedicated’ plan to deploy a ‘Professional’ deployment.
For this tutorial, select ‘Professional’, choose the N.Virginia region with a specification for 1,000 sessions, and then click the 'Deploy' button.
2. Add a Credential for the MQTT Connection
In the Cloud Console, navigate to ‘Authentication & ACL’ from the left menu, and then click Authentication in the submenu. Click the ‘Add’ button on the right and provide a username and password for the MQTT connection. For this example, we will use "emqx" as the username and "public" as the password for the MQTT client connection.
3. Enable NAT Gateway
Before setting up data integration, we need to enable the NAT gateway. By default, the MQTT broker is deployed in a VPC, which cannot send data to other services over the public network.
There are two methods to enable external data transfer:
- Enable the NAT Gateway: this allows the broker to send data through the gateway.
- Set Up VPC Peering: This method is contingent on whether the target cloud service supports VPC peering.
In this tutorial, we will opt for the first method. On the deployment overview page, navigate to the ‘NAT Gateway’ tab located at the bottom and enable the NAT Gateway service by clicking ‘Subscribe Now’.
With these steps, your MQTT broker is now operational and ready for use. Let’s now proceed to Step 3.
Step 3: Set up EMQX Cloud Data Integration with Confluent Cloud
EMQX Cloud provides over 40 native data integrations. Previously, Kafka resources were used to connect data to Confluent. Our new customized integration makes connecting to Confluent more streamlined.
1. Create a Confluent Resource
Go to the Data Integrations page and select ‘Confluent’.
On the Confluent cluster settings page enter the required information in the ‘Endpoints’ section for the ‘Bootstrap Server’. Input the key and secret generated in the ‘Create API Key’ step into the ‘Key’ and ‘Secret’ fields. Click ‘Test’ to verify the connection to the Confluent server.
After passing the test, click the ‘New’ button. A confirmation message will appear indicating that the resource has been successfully created. Under ‘Configured Resources’, you will see the newly created Confluent resource.
2. Create a Rule
Create a new rule by entering the following SQL statement in the SQL input field. This rule will process messages from the temp_hum/emqx
topic, enriching the JSON object with ‘client_id’, ‘topic’, and ‘timestamp’ information.
up_timestamp
: the time when the message is reportedclient_id
: the ID of the client that publishes the messagetemp
: the temperature data in the message payloadHum
: the humidity data in the message payload
SELECT
timestamp as up_timestamp,
clientid as client_id,
payload.temp as temp,
payload.hum as hum
FROM
"temp_hum/emqx"
Test the SQL rule by entering the test payload, topic, and client information, then click ‘SQL Test’. The results displayed below will indicate whether the SQL test was successful.
3. Add an Action
Click ‘Next’ to add an action to the rule. Utilize the Kafka topic created in Step 1 along with the message template provided.
# kafka topic
emqx
# kafka message template
{"up_timestamp": ${up_timestamp}, "client_id": ${client_id}, "temp": ${temp}, "hum": ${hum}}
With these steps, you have successfully integrated Confluent Cloud and EMQX Cloud. The temperature data transmitted to EMQX Cloud should now be consistently relayed to the Confluent Cloud topic.
Let’s proceed to the final step to ensure everything is working as expected.
Step 4: Verify with MQTTX
To publish messages, you can use any MQTT client or SDK. In this tutorial, we’ll utilize MQTTX, a comprehensive MQTT client tool offered by EMQ.
1. Connect MQTTX
In MQTTX, click ‘New Connection’ and complete the connection form:
- Name: Enter a connection name of your choice.
- Host: This is the MQTT broker connection address, available on the EMQX Cloud overview page.
- Port: The MQTT broker connection port, also found on the EMQX Cloud overview page.
- Username/Password: Use the username and password specified in the EMQX Cloud Authentication settings.
2. Publish MQTT Messages to EMQX Cloud
- Set the payload format to 'JSON'.
- Use
temp_hum/emqx
as the topic (the one set in the rule). - JSON body:
{
"temp": 19,
"hum": 106
}
Click the ‘Send’ button on the right. You can change the temperature value and send additional data to EMQX Cloud.
3. Check Rule Status in EMQX Cloud
The data sent to EMQX Cloud should be automatically processed by the rule engine and transferred to Confluent Cloud, which can be verified in the EMQX Cloud Data Integration dashboard.
4. Check the Data in Confluent Topic
Examine the data within the Confluent Console.
Conclusion
The integration of EMQX Cloud with Confluent’s new service unites the capabilities of real-time data and event streaming technology. This seamless connection between the two platforms allows businesses to efficiently collect, forward, and process data, unlocking valuable insights and propelling digital transformation initiatives forward. The partnership between EMQX Cloud and Confluent marks a significant step towards building a comprehensive ecosystem for managing and leveraging IoT data in the cloud.