<?xml version="1.0"?>
<feed xmlns="http://www.w3.org/2005/Atom" xml:lang="en-GB">
	<id>https://halfface.se/wiki/index.php?action=history&amp;feed=atom&amp;title=Kafka</id>
	<title>Kafka - Revision history</title>
	<link rel="self" type="application/atom+xml" href="https://halfface.se/wiki/index.php?action=history&amp;feed=atom&amp;title=Kafka"/>
	<link rel="alternate" type="text/html" href="https://halfface.se/wiki/index.php?title=Kafka&amp;action=history"/>
	<updated>2026-04-19T07:28:22Z</updated>
	<subtitle>Revision history for this page on the wiki</subtitle>
	<generator>MediaWiki 1.43.1</generator>
	<entry>
		<id>https://halfface.se/wiki/index.php?title=Kafka&amp;diff=16239&amp;oldid=prev</id>
		<title>Ekaanbj: /* Kafka Connect */</title>
		<link rel="alternate" type="text/html" href="https://halfface.se/wiki/index.php?title=Kafka&amp;diff=16239&amp;oldid=prev"/>
		<updated>2025-08-26T12:43:00Z</updated>

		<summary type="html">&lt;p&gt;&lt;span class=&quot;autocomment&quot;&gt;Kafka Connect&lt;/span&gt;&lt;/p&gt;
&lt;p&gt;&lt;b&gt;New page&lt;/b&gt;&lt;/p&gt;&lt;div&gt;=What does it mean:=&lt;br /&gt;
 akhq             Apache Kafka HQ. web-based user interface and management tool for Apache Kafka clusters.&lt;br /&gt;
 broker           managing messages. Receive and deliver.&lt;br /&gt;
 strimzi          Apache Kafka on Kubernetes&lt;br /&gt;
 cdc              change data capture&lt;br /&gt;
&lt;br /&gt;
=using kafka client=&lt;br /&gt;
==List topics==&lt;br /&gt;
 kubectl exec -it -n kafka kafka-cluster-kafka-0 -- bin/kafka-topics.sh --bootstrap-server kafka-cluster-kafka-bootstrap:9092 --list&lt;br /&gt;
==Describe topic==&lt;br /&gt;
 kubectl exec -it -n kafka $(oc get kafka --no-headers -o custom-columns=:.metadata.name)-kafka-0 -- bin/kafka-topics.sh --bootstrap-server $(oc get kafka -o json | jq -r &amp;#039;.items[0].status.listeners[].bootstrapServers&amp;#039; | grep 9092) --describe --topic &amp;lt;topic&amp;gt;&lt;br /&gt;
 kubectl exec -it -n kafka $(oc get kafka --no-headers -o custom-columns=:.metadata.name)-kafka-0 -- bin/kafka-configs.sh --bootstrap-server $(oc get kafka -o json | jq -r &amp;#039;.items[0].status.listeners[].bootstrapServers&amp;#039; | grep 9092)  --describe --entity-type topics --entity-name &amp;lt;topic&amp;gt;&lt;br /&gt;
&lt;br /&gt;
=Read topic=&lt;br /&gt;
 export TOPIC=&amp;lt;topic&amp;gt; ; kubectl exec -it -n kafka kafka-cluster-kafka-0 -- bin/kafka-console-consumer.sh --bootstrap-server kafka-cluster-kafka-bootstrap:9092 --topic ${TOPIC} --property print.key=true --max-messages 10 --from-beginning --timeout-ms 10000&lt;br /&gt;
==Describe all topics==&lt;br /&gt;
 kubectl exec -i -n kafka kafka-cluster-kafka-0 -- bin/kafka-topics.sh --bootstrap-server kafka-cluster-kafka-bootstrap:9092 --list | while read i ; do echo &amp;#039;*&amp;#039; $i ; kubectl exec -i -n kafka kafka-cluster-kafka-0 -- bin/kafka-topics.sh --bootstrap-server kafka-cluster-kafka-bootstrap:9092 --describe --topic $i &amp;lt;/dev/null ; done&lt;br /&gt;
&lt;br /&gt;
=consumergroups=&lt;br /&gt;
List consumer groups&lt;br /&gt;
 kubectl exec -it -n kafka kafka-cluster-kafka-0 -- bin/kafka-consumer-groups.sh --bootstrap-server kafka-cluster-kafka-bootstrap:9092 --list&lt;br /&gt;
&lt;br /&gt;
=quick overview=&lt;br /&gt;
 kubectl api-resources | grep -i kafka | awk &amp;#039;{print $1}&amp;#039; | while read i ; do echo &amp;#039;*&amp;#039; $i ; kubectl get $i -A ; done&lt;br /&gt;
&lt;br /&gt;
=which users exist=&lt;br /&gt;
 kubectl get kafkausers -n kafka&lt;br /&gt;
&lt;br /&gt;
=which password does user have=&lt;br /&gt;
 kubectl get secret ifs -o json | jq -r .data.password | base64 -d&lt;br /&gt;
=list topics=&lt;br /&gt;
 kubectl get kafkatopics -o wide&lt;br /&gt;
=Kafka version=&lt;br /&gt;
 kubectl exec -it -n kafka $(oc get kafka --no-headers -o custom-columns=:.metadata.name)-kafka-0 -- bin/kafka-topics.sh --bootstrap-server $(oc get kafka -o json | jq -r &amp;#039;.items[0].status.listeners[].bootstrapServers&amp;#039; | grep 9092) --version&lt;br /&gt;
&lt;br /&gt;
=bootstrap server internally=&lt;br /&gt;
 kubectl get kafka kafka-cluster -o=jsonpath=&amp;#039;{.status.listeners[?(@.type==&amp;quot;tls&amp;quot;)].bootstrapServers}{&amp;quot;\n&amp;quot;}&amp;#039;&lt;br /&gt;
=bootstrap server external=&lt;br /&gt;
 kubectl get kafka kafka-cluster -o=jsonpath=&amp;#039;{.status.listeners[?(@.type==&amp;quot;external&amp;quot;)].bootstrapServers}{&amp;quot;\n&amp;quot;}&amp;#039;&lt;br /&gt;
=produce to topic=&lt;br /&gt;
 bin/kafka-console-producer.sh --broker-list localhost:9092 --topic &amp;lt;topic&amp;gt;&lt;br /&gt;
Produce to topic with key.&lt;br /&gt;
 kubectl exec -it -n kafka kafka-cluster-kafka-0 -- bash&lt;br /&gt;
 bin/kafka-console-producer.sh --broker-list localhost:9092 --topic &amp;lt;topic&amp;gt; --property &amp;quot;parse.key=true&amp;quot; --property &amp;quot;key.separator=:&amp;quot;&lt;br /&gt;
 &amp;gt;emp_info: {&amp;quot;emp_id&amp;quot;:100,&amp;quot;first_name&amp;quot;:&amp;quot;Keshav&amp;quot;,&amp;quot;last_name&amp;quot;:&amp;quot;Lodhi&amp;quot;,&amp;quot;designation&amp;quot;:&amp;quot;DataEngineer&amp;quot;}&lt;br /&gt;
 CTRL + c&lt;br /&gt;
 kubectl exec -it -n kafka kafka-cluster-kafka-2 -- bin/kafka-console-consumer.sh --bootstrap-server kafka-cluster-kafka-bootstrap:9092 --topic customer-blacklistmail-dotnet-v1 --property print.key=true --max-messages 10 --from-beginning&lt;br /&gt;
&lt;br /&gt;
=read topic=&lt;br /&gt;
 bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic FedoraTopic --from-beginning&lt;br /&gt;
=zookeeper=&lt;br /&gt;
Who is the leader.&lt;br /&gt;
 for i in 0 1 2  ; do echo kafka-cluster-zookeeper-$i $(kubectl exec -it -n kafka kafka-cluster-zookeeper-$i -- bash -c &amp;#039;echo stat | nc 127.0.0.1 12181&amp;#039; | grep Mode:) ; done&lt;br /&gt;
Get various info from zookeeper.&lt;br /&gt;
 kubectl exec -it -n kafka kafka-cluster-zookeeper-0 -- bash -c &amp;#039;echo envi | nc 127.0.0.1 12181&amp;#039;&lt;br /&gt;
Query all zookeepers&lt;br /&gt;
 for i in 0 1 2 ; do echo &amp;#039;*&amp;#039; $i ; kubectl exec -it $(oc get kafka --no-headers -o custom-columns=:.metadata.name)-zookeeper-$i -n kafka -- bash -c &amp;#039;echo conf | nc 127.0.0.1 12181&amp;#039; ; echo ; done&lt;br /&gt;
The following commands are available.&lt;br /&gt;
 conf Print details about serving configuration.&lt;br /&gt;
 cons List full connection/session details for all clients connected to this server. Includes information on numbers of packets &lt;br /&gt;
 received/sent, session id, operation latencies, last operation performed, etc...&lt;br /&gt;
 crst Reset connection/session statistics for all connections.&lt;br /&gt;
 dump Lists the outstanding sessions and ephemeral nodes. This only works on the leader.&lt;br /&gt;
 envi Print details about serving environment&lt;br /&gt;
 ruok respond with imok if it is running.&lt;br /&gt;
 srst Reset server statistics.&lt;br /&gt;
 srvr Lists full details for the server.&lt;br /&gt;
 stat Lists brief details for the server and connected clients.&lt;br /&gt;
 wchs Lists brief information on watches for the server.&lt;br /&gt;
 wchc Lists detailed information on watches for the server, by session&lt;br /&gt;
 wchp Lists detailed information on watches for the server, by path.&lt;br /&gt;
 mntr Outputs a list of variables that could be used for monitoring the health of the cluster.&lt;br /&gt;
Zookeeper shell&lt;br /&gt;
 kubectl exec -it $(oc get kafka --no-headers -o custom-columns=:.metadata.name)-zookeeper-0 -n kafka -- bin/zookeeper-shell.sh 127.0.0.1:12181&lt;br /&gt;
&lt;br /&gt;
=kafka-exporter=&lt;br /&gt;
Get stats from kafka-exporter.&lt;br /&gt;
 kubectl -it exec deployment/kafka-cluster-kafka-exporter -- curl -s localhost:9404/metrics&lt;br /&gt;
=connectors=&lt;br /&gt;
These are the actual plugins that Kafka Connect uses to interface with external systems. Connectors can be either sources (pulling data into Kafka) or sinks (pushing data from Kafka). (io.debezium.connector.postgresql.PostgresConnector)&lt;br /&gt;
&lt;br /&gt;
=Kafka Connect=&lt;br /&gt;
Is a framework and set of tools for connecting Kafka with external systems such as databases, key-value stores, search indexes, and file systems.&lt;br /&gt;
==&lt;br /&gt;
=Test kafka client=&lt;br /&gt;
&amp;lt;pre&amp;gt;&lt;br /&gt;
# Get username password&lt;br /&gt;
oc get secrets -l strimzi.io/kind=KafkaUser -o custom-columns=NAME:.metadata.name,PASSWORD:.data.password --no-headers | head -1 | while read USERNAME PASSWORD ; do echo export USERNAME=$USERNAME PASSWORD=$(base64 -d &amp;lt;&amp;lt;&amp;lt; &amp;quot;${PASSWORD}&amp;quot;) ; done&lt;br /&gt;
# Start bash in kafka pod.&lt;br /&gt;
kafka_bash ()&lt;br /&gt;
{&lt;br /&gt;
    BOOTSTRAP_SERVERS=$(kubectl get kafka -o=jsonpath=&amp;#039;{.items[0].status.listeners[?(@.name==&amp;quot;plain&amp;quot;)].bootstrapServers}{&amp;quot;\n&amp;quot;}&amp;#039;);&lt;br /&gt;
    if [ ! &amp;quot;${BOOTSTRAP_SERVERS}&amp;quot; ]; then&lt;br /&gt;
        echo kafka not available.;&lt;br /&gt;
        return 1;&lt;br /&gt;
    fi;&lt;br /&gt;
    oc exec -n kafka $(oc get pods -l strimzi.io/component-type=kafka --no-headers -o custom-columns=:.metadata.name --field-selector &amp;quot;status.phase=Running&amp;quot; | head -1) -it -- bash -c &amp;quot;export BOOTSTRAP_SERVERS=${BOOTSTRAP_SERVERS}; export PATH=\$PATH:/opt/kafka/bin ; exec bash&amp;quot;&lt;br /&gt;
}&lt;br /&gt;
kafka_bash&lt;br /&gt;
# In pod paste reply from command above. export USERNAME=...&lt;br /&gt;
# In pod. Get p12 password.&lt;br /&gt;
export CERTS_STORE_PASSWORD=$(ls /proc/[0-9]*/environ | while read i ; do cat $i 2&amp;gt;/dev/null | tr &amp;#039;\000&amp;#039; &amp;#039;\n&amp;#039; ; done | sed -n &amp;#039;s/^CERTS_STORE_PASSWORD=//p&amp;#039; | uniq)&lt;br /&gt;
# Get bootstrap url&lt;br /&gt;
export BOOTSTRAP_SERVERS_TLS=$(openssl s_client -connect localhost:9094 &amp;lt; /dev/null 2&amp;gt; /dev/null | openssl x509 -inform pem -text 2&amp;gt;&amp;amp;1 | grep -A1 &amp;quot; Subject Alternative Name: &amp;quot; | tail -1 | sed &amp;#039;s/ //g;s/:/\n/g;s/DNS//g;s/,//g&amp;#039;| awk &amp;#039;{ print length, $0 }&amp;#039; | sort -n | cut -d&amp;quot; &amp;quot; -f2- | grep -v ^$ | while read i ; do ANSWER=$(nc -zv -w1 $i 9094 2&amp;gt;&amp;amp;1 | cat -v | xargs) ; if [[ &amp;quot;${ANSWER}&amp;quot; == *&amp;quot;Connected&amp;quot;* ]]; then echo $i:9094 ; break ; fi ; done)&lt;br /&gt;
# in pod generate config&lt;br /&gt;
cat &amp;lt;&amp;lt; EOF &amp;gt; /tmp/kafka_login.properties&lt;br /&gt;
security.protocol=SASL_SSL&lt;br /&gt;
sasl.mechanism=SCRAM-SHA-512&lt;br /&gt;
sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username=&amp;quot;${USERNAME}&amp;quot; password=&amp;quot;${PASSWORD}&amp;quot;;&lt;br /&gt;
ssl.truststore.location=/tmp/kafka/cluster.truststore.p12&lt;br /&gt;
ssl.truststore.password=${CERTS_STORE_PASSWORD}&lt;br /&gt;
ssl.truststore.type=PKCS12&lt;br /&gt;
EOF&lt;br /&gt;
# In pod. Test connectivity.&lt;br /&gt;
bin/kafka-topics.sh --list --bootstrap-server ${BOOTSTRAP_SERVERS_TLS} --command-config /tmp/kafka_login.properties&lt;br /&gt;
...&lt;br /&gt;
# test from other computer&lt;br /&gt;
# Get external Loadbalancer&lt;br /&gt;
echo export BOOTSTRAP_SERVERS_TLS=$(oc get service kafka-cluster-kafka-external-bootstrap -o jsonpath=&amp;#039;{.status.loadBalancer.ingress[0].hostname}&amp;#039;):9094&lt;br /&gt;
# Copy connection file and certificates.&lt;br /&gt;
oc cp $(oc get pods -l strimzi.io/component-type=kafka --no-headers -o custom-columns=:.metadata.name --field-selector &amp;quot;status.phase=Running&amp;quot; | head -1):/tmp/kafka_login.properties kafka_login.properties&lt;br /&gt;
oc cp $(oc get pods -l strimzi.io/component-type=kafka --no-headers -o custom-columns=:.metadata.name --field-selector &amp;quot;status.phase=Running&amp;quot; | head -1):/tmp/kafka/cluster.truststore.p12 cluster.truststore.p12&lt;br /&gt;
scp &amp;lt;bastion&amp;gt;:/tmp&lt;br /&gt;
# Login to bastion&lt;br /&gt;
# Download client.&lt;br /&gt;
(cd /tmp/ ; curl https://dlcdn.apache.org/kafka/4.0.0/kafka_2.13-4.0.0.tgz -O)&lt;br /&gt;
# Update config to where files are.&lt;br /&gt;
sed -i &amp;#039;s%ssl.truststore.location=/tmp/kafka/cluster.truststore.p12%ssl.truststore.location=/tmp/cluster.truststore.p12%g&amp;#039; /tmp/kafka_login.properties&lt;br /&gt;
# Test connection. You should get all kafka queues&lt;br /&gt;
kafka*/bin/kafka-topics.sh --list --bootstrap-server ${BOOTSTRAP_SERVERS_TLS} --command-config /tmp/kafka_login.properties&lt;br /&gt;
&amp;lt;/pre&amp;gt;&lt;/div&gt;</summary>
		<author><name>Ekaanbj</name></author>
	</entry>
</feed>