Ingest IoT Data from MQTT Brokers into OCI Oracle Streaming Service using the OCI Kafka Connect Harness and Oracle Kubernetes Engine

Franz Kafka: a prominent early 20th-century German-language novelist and short-story writer, after whom the term Kafkaesque is named

Characteristic or reminiscent of the oppressive or nightmarish qualities of Franz Kafka’s fictional world

We are more interested in this Kafka: Apache Kafka, a very handy platform for doing a lot of things with data, which can also be Kafkaesque

Taking away the complexity of managing the underlying infrastructure of a messaging system without needing to refactor code!!

This post is a tutorial to help you set up an unopinionated, horizontally scalable IoT data ingestion system with very little infrastructure management overhead, using OCI Streaming Service, the OCI Kafka Connect Harness, and Oracle Kubernetes Engine.

Some Context

Benefit 1:

Oracle Streaming Service with the Kafka Connect Harness lets developers move to a fully managed service without having to refactor their code.

Benefit 2:

Running Kafka Connect on a system like Oracle Kubernetes Engine, instead of on a standalone VM/JVM, allows the Kafka Connect workers to scale horizontally and on demand alongside Oracle Streaming Service.

Nah, it’s pretty simple to set up

Photo by Adrian Swancar on Unsplash

Components of this system

1. Oracle Streaming Service
2. Kafka Connect Harness for Oracle Streaming Service
3. Oracle Kubernetes Engine
4. MQTT Source Connectors
5. Oracle Cloud Infrastructure Object Storage

Overview Block Diagram

Scalability constructs

Oracle Streaming Service - The Broker
------------------------

More Topics => More Streams
Higher Volumes => More Partitions

Oracle Kubernetes Engine - Connector + Kafka Connect
---------------------------

Horizontal Pod Autoscaler
Vertical Pod Autoscaler
Node Pool Scaling
Non-Uniform Node Pools

Purpose of Each Component

1. Oracle Streaming Service 
Scalable, HA, Managed, Cloud Message Broker
2. Kafka Connect Harness
Storage, Config, Offset, SASL Auth Constructs
3. Oracle Kubernetes Engine
Scalable, Managed, Container Infra for Connect Workers
4. MQTT Source Connectors
Connectors to connect to MQTT Brokers for Ingestion
5. Oracle Cloud Object Storage
Long Term Storage of IoT Sensor Data

For the Impatient

Single Partition, Single Topic of Oracle Streaming Service

The stats
The Console Fetch Messages
1) Generate 100,000 messages
2) Gap between each message: 1 ms
3) Use 16 threads to generate the count

python3 mqtt-gen.py 100000 1 16
**************************RESULT****************************
Thread7=454.07msg/sec
Thread13=453.64msg/sec
Thread4=452.84msg/sec
Thread11=452.67msg/sec
Thread9=452.61msg/sec
Thread12=452.35msg/sec
Thread2=452.3msg/sec
Thread15=452.22msg/sec
Thread14=452.22msg/sec
Thread1=452.11msg/sec
Thread3=452.11msg/sec
Thread16=452.0msg/sec
Thread5=451.82msg/sec
Thread6=451.8msg/sec
Thread8=451.78msg/sec
Thread10=451.71msg/sec
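
The source of mqtt-gen.py is not shown in this post, so the following is only a minimal sketch of how such a generator could be structured. It assumes the three arguments mean total messages, gap in milliseconds and thread count, and that the total is split evenly across threads. The function name `run_generator` and the `publish` callable are hypothetical. In a real run, `publish` could be the publish method of an MQTT client such as paho-mqtt.

```python
import json
import random
import threading
import time


def run_generator(total_messages, gap_ms, thread_count, publish):
    """Send messages from several threads and report msg/sec per thread.

    publish is any callable taking (topic, payload). It stands in for a
    real MQTT client's publish method (an assumption, not the post's code).
    """
    # Assumption: the total is split evenly; any remainder is dropped.
    per_thread = total_messages // thread_count
    rates = {}
    lock = threading.Lock()

    def worker(name):
        start = time.perf_counter()
        for seq in range(per_thread):
            payload = json.dumps({"seq": seq, "value": random.randint(0, 100)})
            publish("sensors/" + name, payload)
            time.sleep(gap_ms / 1000.0)  # gap between consecutive messages
        elapsed = time.perf_counter() - start
        rate = per_thread / elapsed if elapsed > 0 else float("inf")
        with lock:
            rates[name] = round(rate, 2)

    threads = [
        threading.Thread(target=worker, args=("Thread%d" % (i + 1),))
        for i in range(thread_count)
    ]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return rates
```

Calling `run_generator(100000, 1, 16, publish)` would mirror the command line above. The rates it reports depend on the machine and the broker, so they will not necessarily match the figures listed here.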

Every field listed under these keys will receive randomized data

Sensor Data
{
  "sensors": {
    "Sensor 1": {
      "c1": 12, "c2": "test1", "c3": "incr.payload"
    },
    "Sensor 2": {
      "c1": 12, "c2": "test2"
    },
    "Sensor 3": {
      "c1": 12, "c2": "test3"
    },
    "Sensor 4": {
      "c1": 12, "c2": "test4"
    },
    "Sensor 5": {
      "c1": 12, "c2": "test5"
    }
  }
}
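
As an illustration of randomizing every field while keeping the keys, here is a small sketch that walks a template like the one above and replaces each leaf value with random data of the same type. The function name `randomize` and the value ranges (integers 0 to 100, 8-letter strings) are assumptions for illustration; the post does not show how its generator fills the fields.

```python
import random
import string


def randomize(template, rng=random):
    """Return a copy of template with the same keys and random leaf values."""
    if isinstance(template, dict):
        return {key: randomize(value, rng) for key, value in template.items()}
    # bool is checked before int because bool is a subclass of int.
    if isinstance(template, bool):
        return rng.choice([True, False])
    if isinstance(template, int):
        return rng.randint(0, 100)
    if isinstance(template, float):
        return rng.uniform(0.0, 100.0)
    if isinstance(template, str):
        return "".join(rng.choice(string.ascii_lowercase) for _ in range(8))
    return template  # leave any other type untouched


template = {"sensors": {"Sensor 1": {"c1": 12, "c2": "test1"}}}
message = randomize(template)
```

Passing a seeded `random.Random` instance as `rng` makes the output repeatable, which can help when comparing runs.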
Step 1:
-------
Burger Menu --> Analytics --> Streaming

Step 2:
-------
Click on Streams -> Click on Stream Pools
Enter Stream Pool Name
Retention Period - <24-168 hours>
Partitions - <the number of partitions depends on the required throughput>
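
To make the link between throughput and partition count concrete, here is a rough sizing sketch. The function name and the default write limit of 1 MB per second per partition are assumptions for illustration; check the current OCI Streaming service limits before relying on the number.

```python
import math


def estimate_partitions(messages_per_sec, avg_message_bytes,
                        write_mb_per_partition=1.0):
    """Estimate how many partitions a given write throughput needs.

    write_mb_per_partition is an assumed per-partition write limit.
    """
    bytes_per_sec = messages_per_sec * avg_message_bytes
    limit_bytes = write_mb_per_partition * 1024 * 1024
    # Round up, and never return fewer than one partition.
    return max(1, math.ceil(bytes_per_sec / limit_bytes))


# Hypothetical workload: 5,000 messages per second of 1 KiB each.
partitions = estimate_partitions(5000, 1024)
```

This only accounts for write volume. Read throughput and request-rate limits may call for more partitions than this estimate.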

Kafka Connect Configurations

- Topic OCID
- Offset OCID
- Config OCID

Kafka Auth Settings in Stream Pool Kafka Connection Settings

- Username 
- Auth Mechanism
- Security Protocol

User Settings in OCI

Generate an Auth Token
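
The values collected above end up in the Kafka Connect worker configuration. The sketch below shows one way to assemble them, assuming the console reports SASL_SSL as the security protocol and PLAIN as the auth mechanism; copy whatever your Stream Pool actually shows. The function names and the `storage_topics` dictionary are hypothetical, and the topic names should be taken verbatim from the Kafka Connect Configuration in the console.

```python
def build_worker_properties(bootstrap_servers, username, auth_token,
                            storage_topics,
                            security_protocol="SASL_SSL",
                            sasl_mechanism="PLAIN"):
    """Assemble Kafka Connect worker properties as a dict.

    Assumes the auth token contains no double quotes.
    """
    jaas = (
        "org.apache.kafka.common.security.plain.PlainLoginModule required "
        'username="%s" password="%s";' % (username, auth_token)
    )
    props = {"bootstrap.servers": bootstrap_servers}
    # Kafka Connect needs the security settings for the worker itself and,
    # with producer./consumer. prefixes, for the clients used by connectors.
    for prefix in ("", "producer.", "consumer."):
        props[prefix + "security.protocol"] = security_protocol
        props[prefix + "sasl.mechanism"] = sasl_mechanism
        props[prefix + "sasl.jaas.config"] = jaas
    props["config.storage.topic"] = storage_topics["config"]
    props["offset.storage.topic"] = storage_topics["offset"]
    props["status.storage.topic"] = storage_topics["status"]
    return props


def render_properties(props):
    """Render the dict in key=value form, one property per line."""
    return "\n".join("%s=%s" % (key, value) for key, value in props.items())
```

Keeping the auth token out of source control (for example in a Kubernetes Secret rather than a plain ConfigMap) is worth considering, since it is effectively a password.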

What Next

Step 1: Set up the Kafka Connect Docker Container
Step 2: Download Stream-Reactor MQTT Open Source Connector Binaries
Step 3: Write the Kafka Connect Properties as k8s ConfigMap
Step 4: Write the Kafka Connect as a k8s Deployment YAML
Step 5: Set up the MQTT Source Connector Properties using REST Proxy
Step 6: Start Ingesting Data

Set up the Kafka Connect Docker Container

We will use Stream Reactor’s open-source MQTT binaries. The project has quite a few interesting source and sink connectors.

# Download and unpack the connector
wget https://github.com/lensesio/stream-reactor/releases/download/1.2.3/kafka-connect-mqtt-1.2.3-2.1.0-all.tar.gz
mkdir -p kafka-connect-mqtt
tar -C kafka-connect-mqtt -xvf kafka-connect-mqtt-1.2.3-2.1.0-all.tar.gz
# Start a shell in the Kafka Connect container
docker run -it confluentinc/cp-kafka-connect /bin/bash
# Inside the container: create the plugin directory
mkdir -p /etc/kafka/plugins/libs
# Open another Shell 
docker ps
# Get Running Container
cd kafka-connect-mqtt
# Copy Connector JAR File
docker cp kafka-connect-mqtt-1.2.3-2.1.0-all.jar <Container-Name>:/etc/kafka/plugins/libs/kafka-connect-mqtt-1.2.3-2.1.0-all.jar
# Commit Container Locally after Copy
docker commit <Container-Name> <Your-Docker-Repo-Name>/<New-Image-Name>:<Tag>
# Push it to your Repo
docker push <Your-Docker-Repo-Name>/<New-Image-Name>:<Tag>

Write the K8s Deployment & Config Map

This post assumes you have a working Oracle Kubernetes Engine setup. If you don’t, here’s how you can go from zero to OKE in a matter of minutes.

A few button clicks and a few minutes are all it takes to get a fully functional, HA, managed, multi-master OKE cluster.
https://gist.github.com/vamsiramakrishnan/e1140596ec645aa63b2d5217269f1006
https://gist.github.com/vamsiramakrishnan/43736ef39bfc5d432a107e6020ec61fe
# Create a Directory 
mkdir -p kafka-connect-k8s
cd kafka-connect-k8s
# Copy Text from the Gist Above
vi k8s-deployment-service.yaml
vi k8s-configmap.yaml
# Apply the manifests in this directory
kubectl apply -f .
# Check the created resources (for example with kubectl get all)
NAME                                                READY   STATUS    RESTARTS   AGE
pod/oss-kafka-connect-deployment-7478588cd9-9mrxr 1/1 Running 0 6h9m
pod/oss-kafka-connect-deployment-7478588cd9-dv2bg 1/1 Running 0 6h9m
pod/oss-kafka-connect-deployment-7478588cd9-vvs2s 1/1 Running 0 6h9m

NAME TYPE CLUSTER-IP EXTERNAL-IP PORT(S) AGE
service/kubernetes ClusterIP 10.96.0.1 <none> 443/TCP 23h
service/oss-kafka-connect-service LoadBalancer 10.96.214.110 x.x.x.x 80:31436/TCP 20h

NAME READY UP-TO-DATE AVAILABLE AGE
deployment.apps/oss-kafka-connect-deployment 3/3 3 3 6h9m

NAME DESIRED CURRENT READY AGE
replicaset.apps/oss-kafka-connect-deployment-7478588cd9 3 3 3 6h9m

kubectl logs --follow <POD-ID>
---------------------------------
STDIN RESPONSE
---------------------------------
[2020-01-01 03:25:52,074] INFO [Worker clientId=connect-1, groupId=sesym2019-mqtt-connect] Starting connectors and tasks using config offset 3062258120705 (org.apache.kafka.connect.runtime.distributed.DistributedHerder:1000)
[2020-01-01 03:25:52,075] INFO [Worker clientId=connect-1, groupId=sesym2019-mqtt-connect] Finished starting connectors and tasks (org.apache.kafka.connect.runtime.distributed.DistributedHerder:1021)
kubectl get services -o wide
NAME TYPE CLUSTER-IP EXTERNAL-IP PORT(S) AGE SELECTOR
kubernetes ClusterIP 10.96.0.1 <none> 443/TCP 25h <none>
oss-kafka-connect-service LoadBalancer 10.96.214.110 <Public-IP> 80:31436/TCP 22h app=oss-kafka-connect
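
Once the service has a public IP, one way to confirm that the workers picked up the connector JAR is to ask the Kafka Connect REST API for its installed plugins. The sketch below does that with the standard library only. The class name in `MQTT_SOURCE_CLASS` is an assumption based on the Stream Reactor package naming for this release, so verify it against the response you actually get.

```python
import json
import urllib.request

# Assumed connector class name; verify against your own plugin listing.
MQTT_SOURCE_CLASS = (
    "com.datamountaineer.streamreactor.connect.mqtt.source.MqttSourceConnector"
)


def fetch_plugins(base_url, timeout=10):
    """GET /connector-plugins from a Kafka Connect worker and parse the JSON."""
    url = base_url.rstrip("/") + "/connector-plugins"
    with urllib.request.urlopen(url, timeout=timeout) as response:
        return json.loads(response.read().decode("utf-8"))


def has_plugin(plugins, class_name):
    """Return True if any entry in the plugin listing has the given class."""
    return any(plugin.get("class") == class_name for plugin in plugins)
```

For example, `has_plugin(fetch_plugins("http://<Load-Balancer-Public-IP>"), MQTT_SOURCE_CLASS)` should be true if the image built earlier is the one running in the pods.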

Time to Configure the MQTT Connector

The Connect CLI is a simple way to manage all your Kafka connectors that run on Kubernetes. To download and configure it:

# Make a Directory
mkdir -p connect-cli
cd connect-cli
# Download the CLI
wget https://github.com/lensesio/kafka-connect-tools/releases/download/v1.0.6/connect-cli
# Make the CLI executable
chmod +x connect-cli
# Prepare Environment
export KAFKA_CONNECT_REST="http://<Load-Balancer-Public-IP>"
# Save the connector properties gist as mqtt-source.properties

The CLI accepts STDIN Text as property objects to create the connector

./connect-cli create mqtt-source < mqtt-source.properties
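
For readers who prefer to call the Kafka Connect REST API directly, the sketch below turns a properties file into the JSON body that a POST to /connectors expects (a name plus a config map). The helper names are hypothetical, and the property keys in the sample are assumptions based on the Stream Reactor MQTT connector's naming; check the connector documentation for the exact keys and fill in the placeholders yourself.

```python
import json


def parse_properties(text):
    """Parse simple key=value lines, skipping blanks and # comments."""
    config = {}
    for line in text.splitlines():
        line = line.strip()
        if not line or line.startswith("#"):
            continue
        # Split on the first "=" only, since values may contain "=".
        key, _, value = line.partition("=")
        config[key.strip()] = value.strip()
    return config


def to_connector_request(text):
    """Build the body for POST /connectors from properties text."""
    config = parse_properties(text)
    return {"name": config.get("name"), "config": config}


# Sample properties with assumed keys and placeholder values.
SAMPLE = """
name=mqtt-source
tasks.max=1
connect.mqtt.hosts=tcp://<mqtt-broker-host>:1883
connect.mqtt.kcql=INSERT INTO <stream-name> SELECT * FROM <mqtt-topic>
"""

body = json.dumps(to_connector_request(SAMPLE))
```

The resulting `body` can be sent with any HTTP client using a `Content-Type: application/json` header to `$KAFKA_CONNECT_REST/connectors`.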
