Native Consumer/Producer Explanation
- kafka-streams uses
sinek
as underlying kafka client (although it ships its own wrapper) - sinek comes with 2 kafka-clients
kafka-node
JS andnode-librdkafka
C++ - just like kafka-connect kafka-streams was built to support both clients as of version 3.0.0
- in kafka-streams the
KafkaFactory
will take care of instantiating the correct client for you automatically - depending on the configuration that you pass toKafkaStreams
- when the
noptions
field is set the native client will be used, if it is not the plain (default) configuration will be used for the javascript client - the installation of the native client might run during installation of kafka-streams but it is optional, a failed installation wont affect you, when using the JS client, although it might prolong the installation process
- the native client requires additional depdencies, such as
librdkafka
for example - you can find more details on usage and installation here
Why would I care about the native client?
- the kafka-streams API stays the same
- you will be able to produce & consume faster (by a magnitude compared to the JS client)
- you can tweak the consumer and producer setup better full list of config params
- you get access to features like SSL, SASL, and Kerberos
- if tweaked correctly your process will consume less memory
Installation
Debian/Ubuntu
sudo apt install librdkafka-dev libsasl2-dev
rm -rf node_modules
yarn
# node-rdkafka is installed as optional dependency
MacOS
brew install librdkafka
brew install openssl
rm -rf node_modules
yarn
# node-rdkafka is installed as optional dependency
# If you have a ssl problem with an error like: `Invalid value for configuration property "security.protocol"`
# Add to your shell profile:
export CPPFLAGS=-I/usr/local/opt/openssl/include
export LDFLAGS=-L/usr/local/opt/openssl/lib
# and redo the installation.
Windows
- is not supported currently (might be available soon)
Use
- make sure to follow the installation steps first
- the only thing left is to change your configuration object
- looking for SSL, SASL or Kerberos examples? go here
Configuration Example
const config = {
"noptions": {
"metadata.broker.list": "localhost:9092",
"group.id": "kafka-streams-test-native",
"client.id": "kafka-streams-test-name-native",
"event_cb": true,
"compression.codec": "snappy",
"api.version.request": true,
"socket.keepalive.enable": true,
"socket.blocking.max.ms": 100,
"enable.auto.commit": false,
"auto.commit.interval.ms": 100,
"heartbeat.interval.ms": 250,
"retry.backoff.ms": 250,
"fetch.min.bytes": 100,
"fetch.message.max.bytes": 2 * 1024 * 1024,
"queued.min.messages": 100,
"fetch.error.backoff.ms": 100,
"queued.max.messages.kbytes": 50,
"fetch.wait.max.ms": 1000,
"queue.buffering.max.ms": 1000,
"batch.num.messages": 10000
},
"tconf": {
"auto.offset.reset": "earliest",
"request.required.acks": 1
},
"batchOptions": {
"batchSize": 5,
"commitEveryNBatch": 1,
"concurrency": 1,
"commitSync": false,
"noBatchCommits": false
}
};