Connect PLCnext Control via MQTT to Apache Kafka

Technical Background

Kafka

Apache Kafka is a framework for data ingestion, storage, processing, and redistribution that is widely deployed at companies all over the world. Kafka's official website offers more information about its concepts and how to deploy it. One of its key features is the large number of existing connectors to other applications and communication protocols such as MQTT.

MQTT

MQTT is a lightweight, TCP-based messaging protocol that is often used for IoT communication due to its robustness and small footprint. Details about the OASIS standard MQTT can be found on its website.

A Makers Blog article describes how to cross-compile mosquitto, an MQTT implementation from Eclipse, for PLCnext. Alternatively, the PLCnext Store offers ready-to-use MQTT apps.

Requirements

  • MQTT client on the PLCnext (see the previous section for implementation hints)
  • PLCnext controller connected to a PC/VM
  • MQTT broker on the PC/VM (e.g., mosquitto)
  • Kafka instance on the PC/VM (see Kafka's quickstart guide)

Setup

The following picture shows an overview of the setup we are going to implement to ingest data from the PLCnext Control into Kafka. While it is possible to use Confluent's MQTT Proxy for their version of Kafka (2), we will focus on the more generic solution (1). It comprises an MQTT broker, to which the client connects and publishes messages, and a connector that subscribes to a topic at the broker, processes the messages, and forwards them to Kafka.

[Figure: overview of the PLCnext-to-Kafka setup]

Creating the Connector

In this tutorial, our connector is based on the evokly/kafka-connect-mqtt repository on GitHub, which is licensed under the MIT License (see the repository for detailed license information). First, we download and extract the repository. Since the latest repository version dates from the end of 2016, we update the build.gradle file by replacing the old dependencies with newer versions:

ext { kafkaVersion = '2.6.0' }
...
dependencies {
    testCompile group: 'junit', name: 'junit', version: '4.13' 
    compile "org.apache.kafka:connect-api:$kafkaVersion"
    compile 'org.eclipse.paho:org.eclipse.paho.client.mqttv3:1.2.5' 
    compile 'org.bouncycastle:bcprov-jdk15on:1.67' 
    compile 'org.bouncycastle:bcpkix-jdk15on:1.67' 
    compile 'org.bouncycastle:bcpg-jdk15on:1.67' 
    compile 'commons-io:commons-io:2.8.0' 
    compile 'org.slf4j:slf4j-api:1.7.30' 
    testCompile 'org.slf4j:slf4j-simple:1.7.30'
}

In this example, we will send plain String messages to Kafka. Therefore, we have to edit the Java class DumbProcessor.java, the default message processor, in the folder /kafka-connect-mqtt-master/src/main/java/com/evokly/kafka/connect/mqtt:

@Override
public SourceRecord[] getRecords(String kafkaTopic) {
    // Forward the MQTT payload as a plain String; no schemas are attached.
    return new SourceRecord[]{new SourceRecord(
            null,                 // sourcePartition
            null,                 // sourceOffset
            kafkaTopic,           // topic
            null,                 // partition
            null,                 // keySchema
            mTopic,               // key: the MQTT topic
            null,                 // valueSchema
            mMessage.toString(),  // value: the MQTT payload as String
            Long.valueOf(123L))}; // timestamp: fixed placeholder value
}
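To make the field mapping more tangible, here is a minimal stand-alone sketch that does not use the Kafka Connect API. The class RecordSketch and its method fromMqtt are hypothetical names introduced only for illustration. The sketch assumes that the payload is UTF-8 encoded text and takes the timestamp as a parameter, so that a caller could pass the current time instead of a fixed value.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical stand-in for a Kafka SourceRecord; not part of the connector.
class RecordSketch {
    final String kafkaTopic; // destination topic in Kafka
    final String key;        // MQTT topic the message was published to
    final String value;      // MQTT payload decoded as text
    final long timestamp;    // milliseconds since the epoch

    RecordSketch(String kafkaTopic, String key, String value, long timestamp) {
        this.kafkaTopic = kafkaTopic;
        this.key = key;
        this.value = value;
        this.timestamp = timestamp;
    }

    // Assumption: the MQTT payload is UTF-8 encoded text.
    static RecordSketch fromMqtt(String kafkaTopic, String mqttTopic,
                                 byte[] payload, long timestampMs) {
        String text = new String(payload, StandardCharsets.UTF_8);
        return new RecordSketch(kafkaTopic, mqttTopic, text, timestampMs);
    }

    public static void main(String[] args) {
        byte[] payload = "test123".getBytes(StandardCharsets.UTF_8);
        // Use the current time rather than a fixed placeholder.
        RecordSketch r = fromMqtt("test_in", "test/1", payload,
                                  System.currentTimeMillis());
        System.out.println(r.key + " -> " + r.value + " @ " + r.timestamp);
    }
}
```

The key carries the MQTT topic, so a Kafka consumer can still tell which MQTT topic a value came from after all messages have been merged into one Kafka topic.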

Afterwards, we build a Java Archive (JAR) file that contains the dependencies with ./gradlew clean jar. The output JAR kafka-connect-mqtt-1.1-SNAPSHOT.jar can be found in the folder /kafka-connect-mqtt-master/build/libs; we copy it to the libs directory of Kafka.

Furthermore, we have to create a configuration file for the connector, mqtt.properties, in Kafka's config folder. The file has the following content:

name=mqtt
connector.class=com.evokly.kafka.connect.mqtt.MqttSourceConnector
tasks.max=1

# converters for plain String messages without schemas
key.converter = org.apache.kafka.connect.storage.StringConverter
value.converter = org.apache.kafka.connect.storage.StringConverter

key.converter.schemas.enable=false
value.converter.schemas.enable=false

# Kafka destination topic for the MQTT messages
kafka.topic=test_in
mqtt.client_id=mqtt-kafka-123

mqtt.clean_session=true
mqtt.connection_timeout=30
mqtt.keep_alive_interval=60

# address of the MQTT broker
mqtt.server_uris=tcp://172.17.0.1:1883
# MQTT topic filter from which the messages are collected
mqtt.topic=test/#

# if we want to use our own processor class
#message_processor_class=com.evokly.kafka.connect.mqtt.sample.OwnProcessor
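One detail is worth checking in a file like this: in the Java properties format, # starts a comment only at the beginning of a line, so explanatory text belongs on a line of its own. The following minimal sketch assumes that the file is parsed with java.util.Properties. The class InlineCommentCheck and its method readValue are hypothetical names used for illustration.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;

// Hypothetical helper showing how java.util.Properties treats a trailing '#'.
class InlineCommentCheck {
    // Parses the given properties text and returns the value stored under key.
    static String readValue(String propertiesText, String key) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(propertiesText));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return props.getProperty(key);
    }

    public static void main(String[] args) {
        String inline = "kafka.topic=test_in   # destination topic\n";
        String separate = "# destination topic\nkafka.topic=test_in\n";
        // Brackets make stray whitespace and comment text visible.
        System.out.println("[" + readValue(inline, "kafka.topic") + "]");
        System.out.println("[" + readValue(separate, "kafka.topic") + "]");
    }
}
```

With the trailing comment, the parsed value contains the comment text in addition to test_in, which would give the connector an unintended topic name or broker address.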

Local Test

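Now we can test our connector locally. Go to Kafka's directory and run the following commands in order. ZooKeeper, the Kafka broker, the connector, and the console consumer each run in the foreground, so start each of them in its own terminal: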

# start ZooKeeper
bin/zookeeper-server-start.sh config/zookeeper.properties

# start Kafka:
bin/kafka-server-start.sh config/server.properties

# start an MQTT-Broker (here a mosquitto docker container)
sudo docker run -d --name mosquitto -p 1883:1883 eclipse-mosquitto 

# start the MQTT-Kafka connector
bin/connect-standalone.sh config/connect-standalone.properties config/mqtt.properties

# start a Kafka console consumer
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test_in --from-beginning --property print.value=true --property print.timestamp=true

# publish an MQTT message
mosquitto_pub -h 172.17.0.1 -p 1883 -t test/1 -m test123

The message shows up in the console consumer.
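The message reaches Kafka because its topic test/1 matches the topic filter test/# configured under mqtt.topic. The matching rule can be sketched as follows. TopicFilterSketch is a hypothetical helper written for illustration and is not part of mosquitto or the connector. It covers the two MQTT wildcards, + for exactly one level and # for all remaining levels, and ignores special cases such as topics beginning with $.

```java
// Hypothetical helper that mimics MQTT topic filter matching.
class TopicFilterSketch {
    static boolean matches(String filter, String topic) {
        // Keep trailing empty levels, since "a/" and "a" are different topics.
        String[] f = filter.split("/", -1);
        String[] t = topic.split("/", -1);
        for (int i = 0; i < f.length; i++) {
            if (f[i].equals("#")) {
                // '#' is only valid as the last level; it matches the
                // parent level and everything below it.
                return i == f.length - 1;
            }
            if (i >= t.length) {
                return false; // the topic has fewer levels than the filter
            }
            if (!f[i].equals("+") && !f[i].equals(t[i])) {
                return false; // literal level differs
            }
        }
        // Without '#', filter and topic need the same number of levels.
        return f.length == t.length;
    }

    public static void main(String[] args) {
        System.out.println(matches("test/#", "test/1"));   // forwarded to Kafka
        System.out.println(matches("test/#", "other/1"));  // ignored
    }
}
```

A message published to a topic outside the test/ hierarchy would therefore not be picked up by the connector with this configuration.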