mqtt-kafka proxy
The mqtt-kafka proxy binding adapts MQTT topic streams to Kafka topic streams. By configuring the Kafka topics that the proxy uses to route MQTT messages and session states, an mqtt server binding can allow clients to connect and proxy MQTT messages onto Kafka topics.
mqtt_kafka_proxy:
  type: mqtt-kafka
  kind: proxy
  options:
    server: mqtt-1.example.com:1883
    topics:
      sessions: mqtt-sessions
      messages: mqtt-messages
      retained: mqtt-retained
    clients:
      - place/{identity}/#
  routes:
    - when:
        - publish:
            - topic: place/+/device/#
        - subscribe:
            - topic: place/+/device/#
      with:
        messages: mqtt-devices
      exit: kafka_cache_client
  exit: kafka_cache_client
Configuration (* required)
options*
object
The mqtt-kafka specific options.
options.server
string
The server reference used by the MQTT server in Zilla. This config enables scaling of the MQTT server when running multiple Zilla instances as it uses server redirection.
options:
  server: mqtt-1.example.com:1883
options.topics*
object
The Kafka topics Zilla needs when routing MQTT messages.
options:
  topics:
    sessions: mqtt-sessions
    messages: mqtt-messages
    retained: mqtt-retained
topics.sessions*
string
A Kafka topic for storing MQTT session states.
cleanup.policy Required
A compact cleanup.policy is required.
topics.messages*
string
The default Kafka topic used for routing MQTT messages.
topics.retained*
string
A Kafka topic for storing MQTT retained messages.
cleanup.policy Recommended
A compact cleanup.policy is recommended.
options.clients
array of string
Pattern defining how to extract the client identity from the MQTT topic. Using this pattern, Zilla can ensure that all messages for the same client identity are produced to Kafka on the same topic partition.
options:
  clients:
    - place/{identity}/#
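To illustrate how a pattern such as place/{identity}/# can yield a client identity, here is a minimal Python sketch. It is not Zilla's implementation: the helper names (compile_client_pattern, extract_identity, partition_for) are hypothetical, and the CRC32 hash is only a stand-in to show that equal identities map to the same partition. The real partition choice depends on the Kafka partitioner in use.

```python
import re
import zlib
from typing import List, Optional


def compile_client_pattern(pattern: str) -> "re.Pattern[str]":
    """Translate a pattern like 'place/{identity}/#' into a regular expression."""
    regex = ""
    for index, level in enumerate(pattern.split("/")):
        sep = "/" if index > 0 else ""
        if level == "#":
            # Multi-level wildcard: matches the parent level and anything below it.
            regex += "(?:" + sep + ".*)?"
        elif level == "{identity}":
            # Named group capturing exactly one topic level.
            regex += sep + r"(?P<identity>[^/]+)"
        elif level == "+":
            # Single-level wildcard.
            regex += sep + r"[^/]*"
        else:
            regex += sep + re.escape(level)
    return re.compile(regex)


def extract_identity(patterns: List[str], topic: str) -> Optional[str]:
    """Return the identity captured by the first matching pattern, or None."""
    for pattern in patterns:
        match = compile_client_pattern(pattern).fullmatch(topic)
        if match and "identity" in match.groupdict():
            return match.group("identity")
    return None


def partition_for(identity: str, partitions: int) -> int:
    """Stand-in partitioner (assumption): a stable hash of the identity."""
    return zlib.crc32(identity.encode("utf-8")) % partitions
```

With this sketch, place/alice/device/1 and place/alice/device/2 both yield the identity alice, so both would be keyed to the same partition under any deterministic key-based partitioner.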
options.publish
object
The MQTT client publish specific options.
publish.qosMax*
enum [at_most_once, at_least_once, exactly_once] | Default: exactly_once
Highest allowed QoS level.
options:
  publish:
    qosMax: at_most_once
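The enum names correspond to the MQTT QoS levels 0, 1 and 2. The short Python sketch below shows the comparison implied by a "highest allowed" level. The function name is_publish_allowed is hypothetical, and how Zilla reacts to a publish above the limit is not covered here.

```python
# MQTT QoS levels as defined by the MQTT specification.
QOS_LEVELS = {"at_most_once": 0, "at_least_once": 1, "exactly_once": 2}


def is_publish_allowed(requested_qos: int, qos_max: str = "exactly_once") -> bool:
    """Return True when the requested QoS does not exceed the configured maximum."""
    return requested_qos <= QOS_LEVELS[qos_max]
```

For example, with qosMax: at_most_once only QoS 0 publishes pass this check, while the default of exactly_once admits all three levels.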
routes
array of object
Conditional mqtt-kafka specific routes.
routes:
  - when:
      - publish:
          - topic: place/+/device/#
      - subscribe:
          - topic: place/+/device/#
    with:
      messages: mqtt-devices
    exit: kafka_cache_client
routes[].guarded
object as map of named array of string
List of roles required by each named guard to authorize this route.
routes:
  - guarded:
      my_guard:
        - publish:clients
routes[].when
array of object
List of conditions (any match) to match this route when adapting mqtt topic streams to kafka topic streams. Read more: When a route matches
routes:
  - when:
      - publish:
          - topic: place/#
      - subscribe:
          - topic: place/#
when[].publish
array of object
Array of MQTT topic filters matching topic names for publish.
- publish:
    - topic: place/#
    - topic: subs/#
publish[].topic
string
MQTT topic filter pattern.
when[].subscribe
array of object
Array of MQTT topic filters matching topic names for subscribe.
- subscribe:
    - topic: place/#
    - topic: subs/#
subscribe[].topic
string
MQTT topic filter pattern.
routes[].exit
string
Next binding when following this route.
routes[].with*
object
Kafka parameters for matched route when adapting mqtt topic streams to kafka topic streams.
with:
  messages: mqtt-devices
with.messages
string
Kafka topic to use for the route.
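The interplay between when[].publish filters, with.messages and the default topics.messages can be sketched in Python as follows. This is an illustration under stated assumptions, not Zilla's code: topic_matches follows the standard MQTT wildcard rules (+ for one level, # for the remainder) but ignores the special handling of topics beginning with $, routes are assumed to be evaluated in declaration order, and the names BINDING and kafka_topic_for_publish are hypothetical.

```python
from typing import Any, Dict


def topic_matches(topic_filter: str, topic_name: str) -> bool:
    """Match an MQTT topic name against a topic filter with + and # wildcards."""
    filter_levels = topic_filter.split("/")
    name_levels = topic_name.split("/")
    for index, level in enumerate(filter_levels):
        if level == "#":
            return True  # matches the parent level and all remaining levels
        if index >= len(name_levels):
            return False
        if level != "+" and level != name_levels[index]:
            return False
    return len(filter_levels) == len(name_levels)


# Dictionary mirroring the example binding configuration.
BINDING: Dict[str, Any] = {
    "options": {"topics": {"messages": "mqtt-messages"}},
    "routes": [
        {
            "when": [{"publish": [{"topic": "place/+/device/#"}]}],
            "with": {"messages": "mqtt-devices"},
        }
    ],
}


def kafka_topic_for_publish(binding: Dict[str, Any], mqtt_topic: str) -> str:
    """Pick the Kafka topic for a published MQTT message."""
    for route in binding.get("routes", []):
        for condition in route.get("when", []):
            for entry in condition.get("publish", []):
                if topic_matches(entry["topic"], mqtt_topic):
                    return route["with"]["messages"]
    # No route matched: fall back to the default messages topic.
    return binding["options"]["topics"]["messages"]
```

Under these assumptions, a publish to place/a/device/1 resolves to mqtt-devices, while a publish to place/a/other falls back to mqtt-messages.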
exit
string
Default exit binding when no conditional routes are viable.
exit: echo_server
telemetry
object
Defines the desired telemetry for the binding.
telemetry.metrics
array
Telemetry metrics to track.
telemetry:
  metrics:
    - stream.*