Ingest IoT Data from MQTT Brokers into OCI Streaming Service with the Kafka Connect Harness and Oracle Kubernetes Engine

Franz Kafka
A prominent early 20th-century German-language novelist and short story writer, after whom the term Kafkaesque originated
Kafkaesque
Characteristic or reminiscent of the oppressive or nightmarish qualities of Franz Kafka’s fictional world
We are more interested in this Kafka: a very handy platform for doing a lot of things with data, but one that can also be kafkaesque to run yourself

What problems are we solving?
Taking away the complexity of managing the underlying infrastructure of a messaging system, without needing to refactor code.
This post is a tutorial to help you set up an unopinionated, horizontally scalable IoT data ingestion system with very little infrastructure management overhead, using OCI Streaming Service, the OCI Kafka Connect Harness, and Oracle Kubernetes Engine.

Benefit 1:
Oracle Streaming Service + the Kafka Connect Harness gives developers a path to a fully managed service without having to refactor their code.
Benefit 2:
Running Kafka Connect on a system like Oracle Kubernetes Engine instead of a single VM/JVM allows the Kafka Connect workers to scale horizontally and on demand alongside Oracle Streaming Service.
Nah, it’s pretty simple to set up
Components of this system
1. Oracle Streaming Service
2. Kafka Connect Harness for Oracle Streaming Service
3. Oracle Kubernetes Engine
4. MQTT Source Connectors
5. Oracle Cloud Infrastructure Object Storage
A Visual Representation

Scalability constructs
Oracle Streaming Service - The Broker
------------------------
More Topics => More Streams
Higher Volumes => More Partitions

Oracle Kubernetes Engine - Connector + Kafka Connect
---------------------------
Horizontal Pod Autoscaler
Vertical Pod Autoscaler
Node Pool Scaling
Non-Uniform Node Pools
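For example, a Horizontal Pod Autoscaler can grow the Connect worker pool under load. Below is a minimal sketch; the deployment name matches the one used later in this post, while the replica bounds and CPU threshold are illustrative assumptions:

apiVersion: autoscaling/v1
kind: HorizontalPodAutoscaler
metadata:
  name: oss-kafka-connect-hpa
spec:
  scaleTargetRef:
    apiVersion: apps/v1
    kind: Deployment
    name: oss-kafka-connect-deployment
  minReplicas: 3
  maxReplicas: 10
  targetCPUUtilizationPercentage: 70  # illustrative threshold; requires CPU requests on the pod spec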
Purpose of Each Component
1. Oracle Streaming Service
Scalable, HA, Managed, Cloud Message Broker
2. Kafka Connect Harness
Storage, Config, Offset, SASL Auth Constructs
3. Oracle Kubernetes Engine
Scalable, Managed, Container Infra for Connect Workers
4. MQTT Source Connectors
Connectors to connect to MQTT Brokers for Ingestion
5. Oracle Cloud Object Storage
Long Term Storage of IoT Sensor Data
For the Impatient
Show me the Results Already
Single Partition, Single Topic of Oracle Streaming Service

Here’s how it looks on the Console

How Did I Generate Load
1) Generate 100,000 messages
2) 1 ms gap between each message
3) Use 16 threads to generate the load

python3 mqtt-gen.py 100000 1 16

**************************RESULT****************************
Thread7=454.07msg/sec
Thread13=453.64msg/sec
Thread4=452.84msg/sec
Thread11=452.67msg/sec
Thread9=452.61msg/sec
Thread12=452.35msg/sec
Thread2=452.3msg/sec
Thread15=452.22msg/sec
Thread14=452.22msg/sec
Thread1=452.11msg/sec
Thread3=452.11msg/sec
Thread16=452.0msg/sec
Thread5=451.82msg/sec
Thread6=451.8msg/sec
Thread8=451.78msg/sec
Thread10=451.71msg/sec
Message Format
Every field with these keys will receive randomized data
Sensor Data
"sensors": {
"Sensor 1": {
"c1": 12,"c2": "test1","c3":"incr.payload"
},
"Sensor 2": {
"c1": 12,"c2": "test2"
},
"Sensor 3": {
"c1": 12,"c2": "test3"
},
"Sensor 4": {
"c1": 12,"c2": "test4"
},
"Sensor 5": {
"c1": 12,"c2": "test5"
}
}
}
Use the HiveMQ Public Broker as the MQTT Broker

Use a Python Generator to Create the Messages (a sketch follows below)
Use the MQTT Source Connector to read from the public broker
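The actual mqtt-gen.py isn't reproduced in this post; here is a minimal sketch of what such a generator could look like, using the paho-mqtt client. The argument order (message count, gap in ms, thread count) follows the invocation shown earlier; the MQTT topic name and payload shape are assumptions based on the sample message above.

# mqtt-gen.py - minimal sketch of a threaded MQTT load generator (not the original script)
import json
import sys
import threading
import time

import paho.mqtt.client as mqtt  # pip install "paho-mqtt<2"

BROKER = "broker.hivemq.com"  # HiveMQ public broker
TOPIC = "iot/sensors/demo"    # hypothetical MQTT topic

def worker(thread_id, count, gap_ms):
    client = mqtt.Client()
    client.connect(BROKER, 1883)
    client.loop_start()
    start = time.time()
    for i in range(count):
        # Payload matching the sample message format above
        payload = {"sensors": {f"Sensor {n}": {"c1": i, "c2": f"test{n}"} for n in range(1, 6)}}
        client.publish(TOPIC, json.dumps(payload))
        time.sleep(gap_ms / 1000.0)
    print(f"Thread{thread_id}={count / (time.time() - start):.2f}msg/sec")
    client.loop_stop()

if __name__ == "__main__":
    total, gap_ms, threads = int(sys.argv[1]), int(sys.argv[2]), int(sys.argv[3])
    pool = [threading.Thread(target=worker, args=(t + 1, total // threads, gap_ms))
            for t in range(threads)]
    for t in pool:
        t.start()
    for t in pool:
        t.join()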
How to set up Oracle Streaming Service
Step 1:
-------
Burger Menu --> Analytics --> Streaming

Step 2:
-------
Click on Streams -> Click on Stream Pools
Enter Stream Pool Name
Retention Period - <24-168 Hours>
Partitions - <Number of partitions is a function of throughput>
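If you prefer the CLI to the console, a stream can also be created with the OCI CLI. A sketch, where the stream name, partition count, and compartment OCID are placeholders (flags may vary by CLI version):

oci streaming admin stream create \
  --name iot-ingest \
  --partitions 1 \
  --compartment-id ocid1.compartment.oc1..<unique-id>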

Keep these Details handy
Kafka Connect Configurations
- Topic OCID
- Offset OCID
- Config OCID
Kafka Auth Settings (from the Stream Pool's Kafka Connection Settings)
- Username
- Auth Mechanism
- Security Protocol
User Settings in OCI
Generate an Auth Token
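To show where these details end up, here is a sketch of the Kafka Connect distributed worker properties for the harness. The region endpoint, tenancy/user names, OCIDs, and auth token are placeholders for the settings above; mapping the "Topic OCID" to status.storage.topic is an assumption:

bootstrap.servers=streaming.<region>.oci.oraclecloud.com:9092
group.id=sesym2019-mqtt-connect
security.protocol=SASL_SSL
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="<tenancy>/<username>/<stream-pool-ocid>" password="<auth-token>";
# One OCID per Kafka Connect internal storage topic, provided by the harness
config.storage.topic=<config-ocid>
offset.storage.topic=<offset-ocid>
status.storage.topic=<topic-ocid>
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
plugin.path=/etc/kafka/plugins

Depending on your Connect version, you may also need to repeat the SASL settings under the producer. and consumer. prefixes.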
What Next
Step 1: Set up the Kafka Connect Docker Container
Step 2: Download the Stream-Reactor MQTT Open Source Connector Binaries
Step 3: Write the Kafka Connect Properties as a k8s ConfigMap
Step 4: Write the Kafka Connect Deployment as a k8s YAML
Step 5: Set up the MQTT Source Connector Properties using the Connect REST Interface
Step 6: Start Ingesting Data
Setup the Kafka Connect Docker Container
1 : Download and Extract Kafka Connect MQTT Binaries
We will use Stream Reactor's open-sourced MQTT binaries; they have quite a few interesting Source and Sink Connectors.
wget https://github.com/lensesio/stream-reactor/releases/download/1.2.3/kafka-connect-mqtt-1.2.3-2.1.0-all.tar.gz
mkdir -p kafka-connect-mqtt
tar -C kafka-connect-mqtt -xvf kafka-connect-mqtt-1.2.3-2.1.0-all.tar.gz
2: Bash into Docker Container and Prepare it
docker run -it confluentinc/cp-kafka-connect /bin/bash
mkdir -p /etc/kafka/plugins/libs
3: Copy Binaries into Docker Container and Commit it
# Open another Shell
docker ps # Get Running Container
cd kafka-connect-mqtt
# Copy Connector JAR File
docker cp kafka-connect-mqtt-1.2.3-2.1.0-all.jar <Container-Name>:/etc/kafka/plugins/libs/kafka-connect-mqtt-1.2.3-2.1.0-all.jar
# Commit Container Locally after Copy
docker commit <Container-Name> <Your-Docker-Repo-Name>/<New-Image-Name>:<Tag>
# Push it to your Repo
docker push <Your-Docker-Repo-Name>/<New-Image-Name>:<Tag>
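An alternative to the interactive bash-and-commit flow is a Dockerfile, which makes the image build reproducible. A minimal sketch, assuming the JAR was extracted into kafka-connect-mqtt/ as above:

FROM confluentinc/cp-kafka-connect
RUN mkdir -p /etc/kafka/plugins/libs
COPY kafka-connect-mqtt/kafka-connect-mqtt-1.2.3-2.1.0-all.jar /etc/kafka/plugins/libs/

Build it with docker build -t <Your-Docker-Repo-Name>/<New-Image-Name>:<Tag> . and push with the same docker push as above.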
Write the K8s Deployment & Config Map
This post assumes you have a working Oracle Kubernetes Engine setup, but if you don't, here's how you can go from zero to OKE in a matter of minutes.

A few button clicks and a few minutes is all it takes to get a fully functional HA, Managed, Multi-Master Kubernetes cluster.

OKE
-------
1. ConfigMap
2. Deployment, Service & Volume Mount
3. Deploy & Verify
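The original gists aren't embedded here; below is a minimal sketch of both files, using the names visible in the kubectl output later in this post (oss-kafka-connect-deployment, oss-kafka-connect-service, selector app=oss-kafka-connect, 3 replicas, LoadBalancer port 80 to the Connect REST port 8083). The command override pointing connect-distributed at the mounted properties file is an assumption; the stock cp-kafka-connect image is otherwise configured through CONNECT_* environment variables.

# k8s-configmap.yaml (sketch)
apiVersion: v1
kind: ConfigMap
metadata:
  name: oss-kafka-connect-config
data:
  connect-distributed.properties: |
    # paste the worker properties from the earlier sketch here

# k8s-deployment-service.yaml (sketch)
apiVersion: apps/v1
kind: Deployment
metadata:
  name: oss-kafka-connect-deployment
spec:
  replicas: 3
  selector:
    matchLabels:
      app: oss-kafka-connect
  template:
    metadata:
      labels:
        app: oss-kafka-connect
    spec:
      containers:
      - name: kafka-connect
        image: <Your-Docker-Repo-Name>/<New-Image-Name>:<Tag>
        command: ["connect-distributed", "/etc/kafka-connect/connect-distributed.properties"]
        ports:
        - containerPort: 8083  # Kafka Connect REST port
        volumeMounts:
        - name: connect-config
          mountPath: /etc/kafka-connect
      volumes:
      - name: connect-config
        configMap:
          name: oss-kafka-connect-config
---
apiVersion: v1
kind: Service
metadata:
  name: oss-kafka-connect-service
spec:
  type: LoadBalancer
  selector:
    app: oss-kafka-connect
  ports:
  - port: 80
    targetPort: 8083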
# Create a Directory
mkdir -p kafka-connect-k8s
cd kafka-connect-k8s
# Copy Text from the Manifests Above
vi k8s-deployment-service.yaml
vi k8s-configmap.yaml
# Create a Separate Namespace
kubectl apply -f .
4. Wait until pods are running
NAME READY STATUS RESTARTS AGE
pod/oss-kafka-connect-deployment-7478588cd9-9mrxr 1/1 Running 0 6h9m
pod/oss-kafka-connect-deployment-7478588cd9-dv2bg 1/1 Running 0 6h9m
pod/oss-kafka-connect-deployment-7478588cd9-vvs2s 1/1 Running 0 6h9m
NAME TYPE CLUSTER-IP EXTERNAL-IP PORT(S) AGE
service/kubernetes ClusterIP 10.96.0.1 <none> 443/TCP 23h
service/oss-kafka-connect-service LoadBalancer 10.96.214.110 x.x.x.x 80:31436/TCP 20h
NAME READY UP-TO-DATE AVAILABLE AGE
deployment.apps/oss-kafka-connect-deployment 3/3 3 3 6h9m
NAME DESIRED CURRENT READY AGE
replicaset.apps/oss-kafka-connect-deployment-7478588cd9 3 3 3 6h9m
5. The logs of one of the pods show
kubectl logs --follow <POD-ID>
---------------------------------
STDOUT RESPONSE
---------------------------------
[2020-01-01 03:25:52,074] INFO [Worker clientId=connect-1, groupId=sesym2019-mqtt-connect] Starting connectors and tasks using config offset 3062258120705 (org.apache.kafka.connect.runtime.distributed.DistributedHerder:1000)
[2020-01-01 03:25:52,075] INFO [Worker clientId=connect-1, groupId=sesym2019-mqtt-connect] Finished starting connectors and tasks (org.apache.kafka.connect.runtime.distributed.DistributedHerder:1021)
6. Get the Public IP of the Service
kubectl get services -o wide
NAME TYPE CLUSTER-IP EXTERNAL-IP PORT(S) AGE SELECTOR
kubernetes ClusterIP 10.96.0.1 <none> 443/TCP 25h <none>
oss-kafka-connect-service LoadBalancer 10.96.214.110 <Public-IP> 80:31436/TCP 22h app=oss-kafka-connect
Time to Configure the MQTT Connector
1. Download the Connect CLI
The Connect CLI is a simple way to manage all the Kafka connectors running on Kubernetes, via the Connect REST endpoint. To download:
# Make a Directory
mkdir -p connect-cli
cd connect-cli
# Download the CLI and make it executable
wget https://github.com/lensesio/kafka-connect-tools/releases/download/v1.0.6/connect-cli
chmod +x connect-cli
# Prepare Environment
export KAFKA_CONNECT_REST="http://<Load-Balancer-Public-IP>"
2. Create the Connector Config
Save the properties below as mqtt-source.properties
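The original gist isn't embedded here; this sketch shows what a Stream Reactor MQTT source config could look like. The connector class and connect.mqtt.* keys come from the Stream Reactor documentation, while the stream name, MQTT topic, and client id are placeholder assumptions:

name=mqtt-source
connector.class=com.datamountaineer.streamreactor.connect.mqtt.source.MqttSourceConnector
tasks.max=1
connect.mqtt.hosts=tcp://broker.hivemq.com:1883
connect.mqtt.service.quality=1
connect.mqtt.client.id=oss-mqtt-source-1
connect.mqtt.kcql=INSERT INTO <your-stream-name> SELECT * FROM <mqtt-topic>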
3. Use the CLI to create a connector
The CLI reads the properties from STDIN to create the connector
./connect-cli create mqtt-source < mqtt-source.properties
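If you'd rather skip the CLI, the standard Kafka Connect REST API exposed through the load balancer does the same job; the JSON body mirrors the properties file (placeholders as above):

curl -s -X POST -H "Content-Type: application/json" \
  http://<Load-Balancer-Public-IP>/connectors \
  -d '{
    "name": "mqtt-source",
    "config": {
      "connector.class": "com.datamountaineer.streamreactor.connect.mqtt.source.MqttSourceConnector",
      "tasks.max": "1",
      "connect.mqtt.hosts": "tcp://broker.hivemq.com:1883",
      "connect.mqtt.kcql": "INSERT INTO <your-stream-name> SELECT * FROM <mqtt-topic>"
    }
  }'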