Kafka: Difference between revisions
Jump to navigation
Jump to search
(14 intermediate revisions by the same user not shown) | |||
Line 2: | Line 2: | ||
akhq Apache Kafka HQ. web-based user interface and management tool for Apache Kafka clusters. | akhq Apache Kafka HQ. web-based user interface and management tool for Apache Kafka clusters. | ||
strimzi Apache Kafka on Kubernetes | strimzi Apache Kafka on Kubernetes | ||
cdc change data capture | |||
=using kafka client= | |||
==List topics== | |||
kubectl exec -it -n kafka kafka-cluster-kafka-0 -- bin/kafka-topics.sh --bootstrap-server kafka-cluster-kafka-bootstrap:9092 --list | |||
==Describe topic== | |||
kubectl exec -it -n kafka kafka-cluster-kafka-0 -- bin/kafka-topics.sh --bootstrap-server kafka-cluster-kafka-bootstrap:9092 --describe --topic <topic> | |||
=Read topic= | |||
export TOPIC=<topic> ; kubectl exec -it -n kafka kafka-cluster-kafka-0 -- bin/kafka-console-consumer.sh --bootstrap-server kafka-cluster-kafka-bootstrap:9092 --topic ${TOPIC} --property print.key=true --max-messages 10 --from-beginning --timeout-ms 10000 | |||
==Describe all topics== | |||
kubectl exec -i -n kafka kafka-cluster-kafka-0 -- bin/kafka-topics.sh --bootstrap-server kafka-cluster-kafka-bootstrap:9092 --list | while read i ; do echo '*' $i ; kubectl exec -i -n kafka kafka-cluster-kafka-0 -- bin/kafka-topics.sh --bootstrap-server kafka-cluster-kafka-bootstrap:9092 --describe --topic $i </dev/null ; done | |||
=consumergroups= | |||
List consumer groups | |||
kubectl exec -it -n kafka kafka-cluster-kafka-0 -- bin/kafka-consumer-groups.sh --bootstrap-server kafka-cluster-kafka-bootstrap:9092 --list | |||
=quick overview= | =quick overview= | ||
kubectl api-resources | grep -i kafka | awk '{print $1}' | while read i ; do echo '*' $i ; kubectl get $i -A ; done | kubectl api-resources | grep -i kafka | awk '{print $1}' | while read i ; do echo '*' $i ; kubectl get $i -A ; done | ||
=which users exit= | =which users exit= | ||
kubectl get kafkausers -n kafka | kubectl get kafkausers -n kafka | ||
Line 14: | Line 30: | ||
=Kafka version= | =Kafka version= | ||
kubectl exec -it -n kafka kafka-cluster-kafka-0 -- bin/kafka-topics.sh --bootstrap-server kafka-cluster-kafka-bootstrap:9092 --version | kubectl exec -it -n kafka kafka-cluster-kafka-0 -- bin/kafka-topics.sh --bootstrap-server kafka-cluster-kafka-bootstrap:9092 --version | ||
=bootstrap server internally= | |||
kubectl get kafka kafka-cluster -o=jsonpath='{.status.listeners[?(@.type=="tls")].bootstrapServers}{"\n"}' | |||
=bootstrap sever external= | |||
kubectl get kafka kafka-cluster -o=jsonpath='{.status.listeners[?(@.type=="external")].bootstrapServers}{"\n"}' | |||
=produce to topic= | |||
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic <topic> | |||
Produce to topic with key. | |||
kubectl exec -it -n kafka kafka-cluster-kafka-0 -- bash | |||
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic <topic> --property "parse.key=true" --property "key.separator=:" | |||
>emp_info: {"emp_id":100,"first_name":"Keshav","last_name":"Lodhi","designation":"DataEngineer"} | |||
CTRL + c | |||
kubectl exec -it -n kafka kafka-cluster-kafka-2 -- bin/kafka-console-consumer.sh --bootstrap-server kafka-cluster-kafka-bootstrap:9092 --topic customer-blacklistmail-dotnet-v1 --property print.key=true --max-messages 10 --from-beginning | |||
=read topic= | |||
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic FedoraTopic --from-beginning | |||
=zookeeper= | |||
Who is the leader. | |||
for i in 0 1 2 ; do echo kafka-cluster-zookeeper-$i $(kubectl exec -it -n kafka kafka-cluster-zookeeper-$i -- bash -c 'echo stat | nc 127.0.0.1 12181' | grep Mode:) ; done | |||
Get various info from zookeeper. | |||
kubectl exec -it -n kafka kafka-cluster-zookeeper-0 -- bash -c 'echo envi | nc 127.0.0.1 12181' | |||
The following commands are availablew. | |||
conf Print details about serving configuration. | |||
cons List full connection/session details for all clients connected to this server. Includes information on numbers of packets | |||
received/sent, session id, operation latencies, last operation performed, etc... | |||
crst Reset connection/session statistics for all connections. | |||
dump Lists the outstanding sessions and ephemeral nodes. This only works on the leader. | |||
envi Print details about serving environment | |||
ruok respond with imok if it is running. | |||
srst Reset server statistics. | |||
srvr Lists full details for the server. | |||
stat Lists brief details for the server and connected clients. | |||
wchs Lists brief information on watches for the server. | |||
wchc Lists detailed information on watches for the server, by session | |||
wchp Lists detailed information on watches for the server, by path. | |||
mntr Outputs a list of variables that could be used for monitoring the health of the cluster. | |||
=kafka-exporter= | |||
Get stats from kafka-exporter. | |||
kubectl -it exec deployment/kafka-cluster-kafka-exporter -- curl -s localhost:9404/metrics | |||
=connectors= | |||
These are the actual plugins that Kafka Connect uses to interface with external systems. Connectors can be either sources (pulling data into Kafka) or sinks (pushing data from Kafka). (io.debezium.connector.postgresql.PostgresConnector) | |||
=Kafka Connect= | |||
Is a framework and set of tools for connecting Kafka with external systems such as databases, key-value stores, search indexes, and file systems. |
Latest revision as of 11:40, 6 September 2024
What does it mean:
akhq Apache Kafka HQ. web-based user interface and management tool for Apache Kafka clusters. strimzi Apache Kafka on Kubernetes cdc change data capture
using kafka client
List topics
kubectl exec -it -n kafka kafka-cluster-kafka-0 -- bin/kafka-topics.sh --bootstrap-server kafka-cluster-kafka-bootstrap:9092 --list
Describe topic
kubectl exec -it -n kafka kafka-cluster-kafka-0 -- bin/kafka-topics.sh --bootstrap-server kafka-cluster-kafka-bootstrap:9092 --describe --topic <topic>
Read topic
export TOPIC=<topic> ; kubectl exec -it -n kafka kafka-cluster-kafka-0 -- bin/kafka-console-consumer.sh --bootstrap-server kafka-cluster-kafka-bootstrap:9092 --topic ${TOPIC} --property print.key=true --max-messages 10 --from-beginning --timeout-ms 10000
Describe all topics
kubectl exec -i -n kafka kafka-cluster-kafka-0 -- bin/kafka-topics.sh --bootstrap-server kafka-cluster-kafka-bootstrap:9092 --list | while read i ; do echo '*' $i ; kubectl exec -i -n kafka kafka-cluster-kafka-0 -- bin/kafka-topics.sh --bootstrap-server kafka-cluster-kafka-bootstrap:9092 --describe --topic $i </dev/null ; done
consumergroups
List consumer groups
kubectl exec -it -n kafka kafka-cluster-kafka-0 -- bin/kafka-consumer-groups.sh --bootstrap-server kafka-cluster-kafka-bootstrap:9092 --list
quick overview
kubectl api-resources | grep -i kafka | awk '{print $1}' | while read i ; do echo '*' $i ; kubectl get $i -A ; done
which users exit
kubectl get kafkausers -n kafka
which password does user have
kubectl get secret ifs -o json | jq -r .data.password | base64 -d
list topics
kubectl get kafkatopics -o wide
Kafka version
kubectl exec -it -n kafka kafka-cluster-kafka-0 -- bin/kafka-topics.sh --bootstrap-server kafka-cluster-kafka-bootstrap:9092 --version
bootstrap server internally
kubectl get kafka kafka-cluster -o=jsonpath='{.status.listeners[?(@.type=="tls")].bootstrapServers}{"\n"}'
bootstrap sever external
kubectl get kafka kafka-cluster -o=jsonpath='{.status.listeners[?(@.type=="external")].bootstrapServers}{"\n"}'
produce to topic
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic <topic>
Produce to topic with key.
kubectl exec -it -n kafka kafka-cluster-kafka-0 -- bash bin/kafka-console-producer.sh --broker-list localhost:9092 --topic <topic> --property "parse.key=true" --property "key.separator=:" >emp_info: {"emp_id":100,"first_name":"Keshav","last_name":"Lodhi","designation":"DataEngineer"} CTRL + c kubectl exec -it -n kafka kafka-cluster-kafka-2 -- bin/kafka-console-consumer.sh --bootstrap-server kafka-cluster-kafka-bootstrap:9092 --topic customer-blacklistmail-dotnet-v1 --property print.key=true --max-messages 10 --from-beginning
read topic
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic FedoraTopic --from-beginning
zookeeper
Who is the leader.
for i in 0 1 2 ; do echo kafka-cluster-zookeeper-$i $(kubectl exec -it -n kafka kafka-cluster-zookeeper-$i -- bash -c 'echo stat | nc 127.0.0.1 12181' | grep Mode:) ; done
Get various info from zookeeper.
kubectl exec -it -n kafka kafka-cluster-zookeeper-0 -- bash -c 'echo envi | nc 127.0.0.1 12181'
The following commands are availablew.
conf Print details about serving configuration. cons List full connection/session details for all clients connected to this server. Includes information on numbers of packets received/sent, session id, operation latencies, last operation performed, etc... crst Reset connection/session statistics for all connections. dump Lists the outstanding sessions and ephemeral nodes. This only works on the leader. envi Print details about serving environment ruok respond with imok if it is running. srst Reset server statistics. srvr Lists full details for the server. stat Lists brief details for the server and connected clients. wchs Lists brief information on watches for the server. wchc Lists detailed information on watches for the server, by session wchp Lists detailed information on watches for the server, by path. mntr Outputs a list of variables that could be used for monitoring the health of the cluster.
kafka-exporter
Get stats from kafka-exporter.
kubectl -it exec deployment/kafka-cluster-kafka-exporter -- curl -s localhost:9404/metrics
connectors
These are the actual plugins that Kafka Connect uses to interface with external systems. Connectors can be either sources (pulling data into Kafka) or sinks (pushing data from Kafka). (io.debezium.connector.postgresql.PostgresConnector)
Kafka Connect
Is a framework and set of tools for connecting Kafka with external systems such as databases, key-value stores, search indexes, and file systems.