Ingest IoT Data from MQTT Brokers into OCI with Oracle Streaming Service, the OCI Kafka Connect Harness, and Oracle Kubernetes Engine

Vamsi Ramakrishnan
7 min read · Jan 1, 2020


Franz Kafka

A German-language novelist and short-story writer (1883–1924) whose work gave rise to the term Kafkaesque

Kafkaesque

Characteristic or reminiscent of the oppressive or nightmarish qualities of Franz Kafka’s fictional world

We are more interested in the other Kafka: a very handy platform for doing a lot of things with data, which can nonetheless feel kafkaesque to operate.

What problems are we solving

Taking away the complexity of managing the underlying infrastructure of a messaging system, without needing to refactor code!

This post is a tutorial to help you set up an unopinionated, horizontally scalable IoT data ingestion system with very little infrastructure management overhead, using OCI Streaming Service, the OCI Kafka Connect Harness, and Oracle Kubernetes Engine.

Some Context

Benefit 1:

Oracle Streaming Service + the Kafka Connect harness let developers move to a fully managed service without having to refactor their code.

Benefit 2:

Running Kafka Connect on a system like Oracle Kubernetes Engine instead of a VM/JVM lets the Connect workers scale horizontally and on demand alongside Oracle Streaming Service.

Nah, it’s pretty simple to set up



Components of this system

1. Oracle Streaming Service
2. Kafka Connect Harness for Oracle Streaming Service
3. Oracle Kubernetes Engine
4. MQTT Source Connectors
5. Oracle Cloud Infrastructure Object Storage

A Visual Representation

[Figure: overview block diagram of the ingestion pipeline]

Scalability constructs

Oracle Streaming Service - The Broker
------------------------

More Topics => More Streams
Higher Volumes => More Partitions
Oracle Kubernetes Engine - Connector + Kafka Connect
---------------------------

Horizontal Pod Autoscaler (a minimal sketch follows this list)
Vertical Pod Autoscaler
Node Pool Scaling
Non-Uniform Node Pools
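For instance, the Connect workers can be scaled on CPU with a Horizontal Pod Autoscaler. This is a minimal sketch, assuming the Deployment name used later in this post:

apiVersion: autoscaling/v1
kind: HorizontalPodAutoscaler
metadata:
  name: oss-kafka-connect-hpa
spec:
  scaleTargetRef:
    apiVersion: apps/v1
    kind: Deployment
    name: oss-kafka-connect-deployment
  minReplicas: 3
  maxReplicas: 10
  targetCPUUtilizationPercentage: 70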

Purpose of Each Component

1. Oracle Streaming Service 
Scalable, HA, Managed, Cloud Message Broker
2. Kafka Connect Harness
Storage, Config, Offset, SASL Auth Constructs
3. Oracle Kubernetes Engine
Scalable, Managed, Container Infra for Connect Workers
4. MQTT Source Connectors
Source connectors that subscribe to MQTT brokers for ingestion
5. Oracle Cloud Object Storage
Long Term Storage of IoT Sensor Data

For the Impatient

Show me the Results Already

Single Partition, Single Topic of Oracle Streaming Service

[Figure: throughput stats for the load test below]

Here’s how it looks on the Console

[Screenshot: the Console's Fetch Messages view]

How Did I Generate Load

1) Generate 100,000 messages
2) Leave a 1 ms gap between messages
3) Use 16 threads to generate the load
python3 mqtt-gen.py 100000 1 16
**************************RESULT****************************
Thread7=454.07msg/sec
Thread13=453.64msg/sec
Thread4=452.84msg/sec
Thread11=452.67msg/sec
Thread9=452.61msg/sec
Thread12=452.35msg/sec
Thread2=452.3msg/sec
Thread15=452.22msg/sec
Thread14=452.22msg/sec
Thread1=452.11msg/sec
Thread3=452.11msg/sec
Thread16=452.0msg/sec
Thread5=451.82msg/sec
Thread6=451.8msg/sec
Thread8=451.78msg/sec
Thread10=451.71msg/sec

Message Format

Every field under these keys receives randomized data

Sensor Data
{
  "sensors": {
    "Sensor 1": { "c1": 12, "c2": "test1", "c3": "incr.payload" },
    "Sensor 2": { "c1": 12, "c2": "test2" },
    "Sensor 3": { "c1": 12, "c2": "test3" },
    "Sensor 4": { "c1": 12, "c2": "test4" },
    "Sensor 5": { "c1": 12, "c2": "test5" }
  }
}

Use the HiveMQ public broker as the MQTT broker

Use a Python generator (mqtt-gen.py, sketched below) to create the messages

Use the MQTT Source Connector to read from the public broker
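mqtt-gen.py itself isn't reproduced in this post. A minimal sketch of such a multi-threaded generator, assuming the paho-mqtt client library and an illustrative topic name, could look like this:

import json, random, sys, threading, time
import paho.mqtt.client as mqtt

BROKER, PORT = "broker.hivemq.com", 1883
TOPIC = "sesym2019/sensors"  # illustrative topic name, not from the original gist

def publish_loop(thread_id, count, gap_ms):
    client = mqtt.Client()
    client.connect(BROKER, PORT)
    client.loop_start()
    start = time.time()
    for _ in range(count):
        # Randomized payload matching the message format shown above
        payload = {"sensors": {f"Sensor {i}": {"c1": random.randint(0, 100),
                                               "c2": f"test{i}"}
                               for i in range(1, 6)}}
        client.publish(TOPIC, json.dumps(payload))
        time.sleep(gap_ms / 1000.0)
    print(f"Thread{thread_id}={count / (time.time() - start):.2f}msg/sec")
    client.loop_stop()

if __name__ == "__main__":
    # Usage: python3 mqtt-gen.py <total-messages> <gap-ms> <threads>
    total, gap_ms, threads = int(sys.argv[1]), int(sys.argv[2]), int(sys.argv[3])
    workers = [threading.Thread(target=publish_loop,
                                args=(i + 1, total // threads, gap_ms))
               for i in range(threads)]
    for w in workers: w.start()
    for w in workers: w.join()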

How to set up Oracle Streaming Service

Step 1:
-------
Burger Menu --> Analytics --> Streaming
Step 2:
-------
Click on Streams -> Click on Stream Pools
Enter Stream Pool Name
Retention Period - <24-168 hours>
Partitions - <each partition supports a fixed throughput, roughly 1 MB/s in and 2 MB/s out, so size by expected volume>

(If you prefer the CLI to console clicks, see the sketch after these steps.)
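A roughly equivalent OCI CLI call, as a sketch with placeholder names and OCIDs:

oci streaming admin stream create \
  --name iot-ingest-stream \
  --partitions 1 \
  --stream-pool-id <stream-pool-ocid> \
  --retention-in-hours 24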

Keep these Details handy

Kafka Connect Configurations

- Topic OCID
- Offset OCID
- Config OCID

Kafka Auth Settings in Stream Pool Kafka Connection Settings

- Username 
- Auth Mechanism
- Security Protocol

User Settings in OCI

Generate an Auth Token
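These details map directly onto the Kafka Connect worker properties. A minimal sketch, assuming OSS's Kafka compatibility endpoint and auth-token-based SASL/PLAIN; region, tenancy, and OCIDs are placeholders, and mapping the "Topic OCID" to status storage is my assumption:

bootstrap.servers=cell-1.streaming.<region>.oci.oraclecloud.com:9092
security.protocol=SASL_SSL
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
  username="<tenancy-name>/<oci-username>/<stream-pool-ocid>" \
  password="<auth-token>";
# The three harness OCIDs become Connect's internal storage topics
config.storage.topic=<Config OCID>
offset.storage.topic=<Offset OCID>
status.storage.topic=<Topic OCID>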

What Next

Step 1: Set up the Kafka Connect Docker container
Step 2: Download the Stream Reactor MQTT open-source connector binaries
Step 3: Write the Kafka Connect properties as a k8s ConfigMap
Step 4: Write the Kafka Connect deployment as a k8s Deployment YAML
Step 5: Set up the MQTT source connector properties via the Connect REST interface
Step 6: Start ingesting data

Set Up the Kafka Connect Docker Container

1 : Download and Extract Kafka Connect MQTT Binaries

We will use Stream Reactor's open-sourced MQTT binaries; they have quite a few interesting source and sink connectors.

wget https://github.com/lensesio/stream-reactor/releases/download/1.2.3/kafka-connect-mqtt-1.2.3-2.1.0-all.tar.gz
mkdir -p kafka-connect-mqtt
tar -C kafka-connect-mqtt -xvf kafka-connect-mqtt-1.2.3-2.1.0-all.tar.gz

2: Bash into Docker Container and Prepare it

docker run -it confluentinc/cp-kafka-connect /bin/bash
# Inside the container: create a directory for the connector plugin
mkdir -p /etc/kafka/plugins/libs

3: Copy Binaries into Docker Container and Commit it

# Open another shell
docker ps
# Note the running container's name
cd kafka-connect-mqtt
# Copy the connector JAR file into the container
docker cp kafka-connect-mqtt-1.2.3-2.1.0-all.jar <Container-Name>:/etc/kafka/plugins/libs/kafka-connect-mqtt-1.2.3-2.1.0-all.jar
# Commit Container Locally after Copy
docker commit <Container-Name> <Your-Docker-Repo-Name>/<New-Image-Name>:<Tag>
# Push it to your Repo
docker push <Your-Docker-Repo-Name>/<New-Image-Name>:<Tag>

Write the K8s Deployment & Config Map

This post assumes you have a working Oracle Kubernetes Engine setup, but if you don't, here's how you can go from zero to OKE in a matter of minutes.

A few button clicks and a few minutes are all it takes to get a fully functional, HA, managed, multi-master OKE cluster.

1. ConfigMap

https://gist.github.com/vamsiramakrishnan/e1140596ec645aa63b2d5217269f1006
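The gist isn't reproduced here; structurally, the ConfigMap wraps the worker properties sketched earlier, along these lines (names are illustrative, not the exact gist contents):

apiVersion: v1
kind: ConfigMap
metadata:
  name: oss-kafka-connect-config
data:
  connect-distributed.properties: |
    # Worker properties from the OSS setup section go here, plus:
    group.id=sesym2019-mqtt-connect
    key.converter=org.apache.kafka.connect.json.JsonConverter
    value.converter=org.apache.kafka.connect.json.JsonConverter
    plugin.path=/etc/kafka/plugins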

2. Deployment, Service & Volume Mount

https://gist.github.com/vamsiramakrishnan/43736ef39bfc5d432a107e6020ec61fe
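Again as a sketch rather than the exact gist: the Deployment runs the custom image built above, mounts the ConfigMap, and exposes Connect's REST port (the LoadBalancer Service is omitted for brevity):

apiVersion: apps/v1
kind: Deployment
metadata:
  name: oss-kafka-connect-deployment
spec:
  replicas: 3
  selector:
    matchLabels:
      app: oss-kafka-connect
  template:
    metadata:
      labels:
        app: oss-kafka-connect
    spec:
      containers:
      - name: kafka-connect
        image: <Your-Docker-Repo-Name>/<New-Image-Name>:<Tag>
        ports:
        - containerPort: 8083  # Kafka Connect REST interface
        volumeMounts:
        - name: connect-config
          mountPath: /etc/kafka-connect
      volumes:
      - name: connect-config
        configMap:
          name: oss-kafka-connect-config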

3. Deploy & Verify

# Create a Directory 
mkdir -p kafka-connect-k8s
cd kafka-connect-k8s
# Copy Text from the Gist Above
vi k8s-deployment-service.yaml
vi k8s-configmap.yaml
# Apply both manifests
kubectl apply -f .

4. Wait until pods are running

NAME                                                READY   STATUS    RESTARTS   AGE
pod/oss-kafka-connect-deployment-7478588cd9-9mrxr 1/1 Running 0 6h9m
pod/oss-kafka-connect-deployment-7478588cd9-dv2bg 1/1 Running 0 6h9m
pod/oss-kafka-connect-deployment-7478588cd9-vvs2s 1/1 Running 0 6h9m

NAME TYPE CLUSTER-IP EXTERNAL-IP PORT(S) AGE
service/kubernetes ClusterIP 10.96.0.1 <none> 443/TCP 23h
service/oss-kafka-connect-service LoadBalancer 10.96.214.110 x.x.x.x 80:31436/TCP 20h

NAME READY UP-TO-DATE AVAILABLE AGE
deployment.apps/oss-kafka-connect-deployment 3/3 3 3 6h9m

NAME DESIRED CURRENT READY AGE
replicaset.apps/oss-kafka-connect-deployment-7478588cd9 3 3 3 6h9m

5. The logs of one of the pods show:

kubectl logs --follow <POD-ID>
---------------------------------
STDOUT RESPONSE
---------------------------------
[2020-01-01 03:25:52,074] INFO [Worker clientId=connect-1, groupId=sesym2019-mqtt-connect] Starting connectors and tasks using config offset 3062258120705 (org.apache.kafka.connect.runtime.distributed.DistributedHerder:1000)
[2020-01-01 03:25:52,075] INFO [Worker clientId=connect-1, groupId=sesym2019-mqtt-connect] Finished starting connectors and tasks (org.apache.kafka.connect.runtime.distributed.DistributedHerder:1021)

6. Get the Public IP of the Service

kubectl get services -o wide
NAME TYPE CLUSTER-IP EXTERNAL-IP PORT(S) AGE SELECTOR
kubernetes ClusterIP 10.96.0.1 <none> 443/TCP 25h <none>
oss-kafka-connect-service LoadBalancer 10.96.214.110 <Public-IP> 80:31436/TCP 22h app=oss-kafka-connect

Time to Configure the MQTT Connector

1. Download the Connect CLI

The Connect CLI is a simple way to manage all your Kafka connectors running on Kubernetes. To download:

# Make a directory
mkdir -p connect-cli
cd connect-cli
# Download the CLI
wget https://github.com/lensesio/kafka-connect-tools/releases/download/v1.0.6/connect-cli
# Make the downloaded binary executable
chmod +x connect-cli
# Prepare the environment
export KAFKA_CONNECT_REST="http://<Load-Balancer-Public-IP>"

2. Create the Connector Config

Save the connector properties as mqtt-source.properties. The original gist isn't shown here; an illustrative sketch follows.
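As a sketch for the Stream Reactor MQTT source connector, with the stream name and MQTT topic as placeholders:

name=mqtt-source
connector.class=com.datamountaineer.streamreactor.connect.mqtt.source.MqttSourceConnector
tasks.max=1
connect.mqtt.hosts=tcp://broker.hivemq.com:1883
connect.mqtt.service.quality=1
connect.mqtt.kcql=INSERT INTO <your-stream-name> SELECT * FROM sesym2019/sensors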

3. Use the CLI to create a connector

The CLI reads properties text on STDIN and creates the connector:

./connect-cli create mqtt-source < mqtt-source.properties

4. Check that the connector is working
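The original post trails off here. Two checks that should work with this setup, assuming the same CLI and Kafka Connect's standard REST API:

# List active connectors via the CLI
./connect-cli ps
# Query connector status through the Connect REST interface
curl $KAFKA_CONNECT_REST/connectors/mqtt-source/status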

Written by Vamsi Ramakrishnan

I work for Google. All views expressed in this publication are my own. Google Cloud | ex-Oracle | https://goo.gl/aykaPB