Kafka
What does it mean:
akhq Apache Kafka HQ. web-based user interface and management tool for Apache Kafka clusters. strimzi Apache Kafka on Kubernetes
using kafka client
List topics
kubectl exec -it -n kafka kafka-cluster-kafka-0 -- bin/kafka-topics.sh --bootstrap-server kafka-cluster-kafka-bootstrap:9092 --list
Describe topic
kubectl exec -it -n kafka kafka-cluster-kafka-0 -- bin/kafka-topics.sh --bootstrap-server kafka-cluster-kafka-bootstrap:9092 --describe --topic <topic>
Read topic
export TOPIC=<topic> ; kubectl exec -it -n kafka kafka-cluster-kafka-0 -- bin/kafka-console-consumer.sh --bootstrap-server kafka-cluster-kafka-bootstrap:9092 --topic ${TOPIC} --property print.key=true --max-messages 10 --from-beginning --timeout-ms 10000
Describe all topics
kubectl exec -i -n kafka kafka-cluster-kafka-0 -- bin/kafka-topics.sh --bootstrap-server kafka-cluster-kafka-bootstrap:9092 --list | while read i ; do echo '*' $i ; kubectl exec -i -n kafka kafka-cluster-kafka-0 -- bin/kafka-topics.sh --bootstrap-server kafka-cluster-kafka-bootstrap:9092 --describe --topic $i </dev/null ; done
quick overview
kubectl api-resources | grep -i kafka | awk '{print $1}' | while read i ; do echo '*' $i ; kubectl get $i -A ; done
which users exit
kubectl get kafkausers -n kafka
which password does user have
kubectl get secret ifs -o json | jq -r .data.password | base64 -d
list topics
kubectl get kafkatopics -o wide
Kafka version
kubectl exec -it -n kafka kafka-cluster-kafka-0 -- bin/kafka-topics.sh --bootstrap-server kafka-cluster-kafka-bootstrap:9092 --version
bootstrap server internally
kubectl get kafka kafka-cluster -o=jsonpath='{.status.listeners[?(@.type=="tls")].bootstrapServers}{"\n"}'
bootstrap sever external
kubectl get kafka kafka-cluster -o=jsonpath='{.status.listeners[?(@.type=="external")].bootstrapServers}{"\n"}'
produce to topic
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic <topic>
Produce to topic with key.
kubectl exec -it -n kafka kafka-cluster-kafka-0 -- bash bin/kafka-console-producer.sh --broker-list localhost:9092 --topic <topic> --property "parse.key=true" --property "key.separator=:" >emp_info: {"emp_id":100,"first_name":"Keshav","last_name":"Lodhi","designation":"DataEngineer"} CTRL + c kubectl exec -it -n kafka kafka-cluster-kafka-2 -- bin/kafka-console-consumer.sh --bootstrap-server kafka-cluster-kafka-bootstrap:9092 --topic customer-blacklistmail-dotnet-v1 --property print.key=true --max-messages 10 --from-beginning
read topic
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic FedoraTopic --from-beginning
zookeeper
Who is the leader.
for i in 0 1 2 ; do echo kafka-cluster-zookeeper-$i $(kubectl exec -it -n kafka kafka-cluster-zookeeper-$i -- bash -c 'echo stat | nc 127.0.0.1 12181' | grep Mode:) ; done
Get various info from zookeeper.
kubectl exec -it -n kafka kafka-cluster-zookeeper-0 -- bash -c 'echo envi | nc 127.0.0.1 12181'
The following commands are availablew.
conf Print details about serving configuration. cons List full connection/session details for all clients connected to this server. Includes information on numbers of packets received/sent, session id, operation latencies, last operation performed, etc... crst Reset connection/session statistics for all connections. dump Lists the outstanding sessions and ephemeral nodes. This only works on the leader. envi Print details about serving environment ruok respond with imok if it is running. srst Reset server statistics. srvr Lists full details for the server. stat Lists brief details for the server and connected clients. wchs Lists brief information on watches for the server. wchc Lists detailed information on watches for the server, by session wchp Lists detailed information on watches for the server, by path. mntr Outputs a list of variables that could be used for monitoring the health of the cluster.