...
Code Block |
---|
$ kubectl get pods
NAME                                                 READY   STATUS    RESTARTS   AGE
confluent-kafka-cp-control-center-5cf9477c94-6j2tz   1/1     Running   7          49m
confluent-kafka-cp-kafka-0                           2/2     Running   0          49m
confluent-kafka-cp-kafka-1                           2/2     Running   0          38m
confluent-kafka-cp-kafka-2                           2/2     Running   0          38m
confluent-kafka-cp-kafka-connect-7646bbff9-hhdqq     2/2     Running   3          49m
confluent-kafka-cp-kafka-rest-57588d5cb5-4sfnp       2/2     Running   6          49m
confluent-kafka-cp-ksql-server-5bdd6b999c-djrj2      2/2     Running   5          49m
confluent-kafka-cp-schema-registry-8db5569b8-2qgm9   2/2     Running   6          49m
confluent-kafka-cp-zookeeper-0                       2/2     Running   0          49m
confluent-kafka-cp-zookeeper-1                       2/2     Running   0          38m
confluent-kafka-cp-zookeeper-2                       2/2     Running   0          38m |
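A pod listing like the one above can also be checked mechanically. The following is a minimal sketch, assuming the default `kubectl get pods` column order (NAME, READY, STATUS, ...). `not_ready_pods` is a hypothetical helper name, not part of kubectl, and the sample input is illustrative.

```shell
# Hypothetical helper: reads `kubectl get pods` output on stdin and prints
# the name of every pod that is not fully ready (READY n/m with n != m)
# or whose STATUS is not "Running".
not_ready_pods() {
  awk 'NR > 1 {
    split($2, ready, "/")
    if (ready[1] != ready[2] || $3 != "Running") print $1
  }'
}

# Sample input for illustration; against a live cluster, pipe
# `kubectl get pods` into the function instead.
sample='NAME READY STATUS RESTARTS AGE
confluent-kafka-cp-kafka-0 2/2 Running 0 49m
confluent-kafka-cp-zookeeper-0 0/2 ContainerCreating 0 2m17s'

printf '%s\n' "$sample" | not_ready_pods
```

With cluster access this becomes `kubectl get pods | not_ready_pods`; empty output means every pod is ready. `kubectl wait --for=condition=Ready pod --all --timeout=300s` is a built-in alternative that blocks until the pods are ready.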
Kafka Configuration
https://docs.confluent.io/platform/current/installation/configuration/broker-configs.html#
Sample values.yaml
Code Block |
---|
cp-zookeeper:
  prometheus:
    jmx:
      enabled: false
  securityContext:
    runAsUser: 1000
cp-kafka:
  nodeport:
    enabled: true
    servicePort: 19092
    firstListenerPort: 31090
  persistence:
    enabled: true
    size: 10Gi
  configurationOverrides:
    "log.retention.bytes": "7516192768"
    "delete.topic.enable": "true"
    "message.max.bytes": "1048588"
    "advertised.listeners": |-
      EXTERNAL://localhost:$((31090 + ${KAFKA_BROKER_ID}))
  prometheus:
    jmx:
      enabled: false
  securityContext:
    runAsUser: 1000
cp-schema-registry:
  prometheus:
    jmx:
      enabled: false
cp-kafka-rest:
  prometheus:
    jmx:
      enabled: false
cp-ksql-server:
  prometheus:
    jmx:
      enabled: false
cp-control-center:
  prometheus:
    jmx:
      enabled: false |
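The `advertised.listeners` override relies on shell arithmetic, so each broker advertises its own external port. The sketch below reproduces that arithmetic outside the cluster. It assumes the expression is evaluated by a shell in each broker container at startup and that there are three brokers with IDs 0 to 2. `advertised_listener` is a hypothetical helper name.

```shell
# Value copied from firstListenerPort in the sample values.yaml.
first_listener_port=31090

# Mirrors the arithmetic in "advertised.listeners" for one broker ID.
advertised_listener() {
  echo "EXTERNAL://localhost:$((first_listener_port + $1))"
}

# Assumption: three brokers with IDs 0, 1 and 2.
for broker_id in 0 1 2; do
  advertised_listener "$broker_id"
done

# log.retention.bytes from the sample, converted from bytes to GiB.
echo "$((7516192768 / 1024 / 1024 / 1024)) GiB"
```

Under these assumptions the brokers advertise ports 31090, 31091 and 31092, and the retention limit works out to 7 GiB per partition.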
Connectors
Functions
These calls can all be made from inside the kafka-connect pod.
Log in to the kafka-connect pod
Code Block |
---|
kubectl exec -it kafka-cp-kafka-connect-<ID> -c cp-kafka-connect-server -- bash |
List Connectors
Code Block |
---|
$ curl -s -X GET -H "Content-Type: application/json" http://kafka-cp-kafka-connect:8083/connectors
["azure-sink-connector"] |
Enable a Connector
Code Block |
---|
$ curl -s -X POST -H "Content-Type: application/json" --data '{
    "name": "azure-sink-connector",
    "config": {
      "connector.class": "io.kafka.connect.log.anlaytics.sink.LogAnalyticsSinkConnector",
      "topics": "john-test",
      "key.converter": "org.apache.kafka.connect.storage.StringConverter",
      "value.converter": "org.apache.kafka.connect.json.JsonConverter",
      "value.converter.schemas.enable": "false",
      "workspace.id": "7e0d2c8e-a46c-4fd9-b274-4b07f0ba555c",
      "workspace.key": "y3n6lvRaKhDIaV6UuGn6+nuh/BoRsQI0fy9S13ZdrL/w56LUOuqrRK3ajAAnxjo8W4PAzxId0V09bJWxmtrNLA=="
    }
  }' http://kafka-cp-kafka-connect:8083/connectors
{"name":"azure-sink-connector","config":{"connector.class":"io.kafka.connect.log.anlaytics.sink.LogAnalyticsSinkConnector","topics":"john-test","key.converter":"org.apache.kafka.connect.storage.StringConverter","value.converter":"org.apache.kafka.connect.json.JsonConverter","value.converter.schemas.enable":"false","workspace.id":"7e0d2c8e-a46c-4fd9-b274-4b07f0ba555c","workspace.key":"y3n6lvRaKhDIaV6UuGn6+nuh/BoRsQI0fy9S13ZdrL/w56LUOuqrRK3ajAAnxjo8W4PAzxId0V09bJWxmtrNLA==","name":"azure-sink-connector"},"tasks":[],"type":"sink"} |
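The connector definition can also be generated rather than typed inline, which keeps the workspace credentials out of the command line. This is a sketch only: `connector_payload` is a hypothetical helper, and it assumes `WORKSPACE_ID` and `WORKSPACE_KEY` are provided through the environment and contain no double quotes or backslashes.

```shell
# Hypothetical helper: prints the connector definition as JSON.
# Assumption: WORKSPACE_ID and WORKSPACE_KEY are set in the environment
# and contain no double quotes or backslashes.
connector_payload() {
  cat <<EOF
{
  "name": "azure-sink-connector",
  "config": {
    "connector.class": "io.kafka.connect.log.anlaytics.sink.LogAnalyticsSinkConnector",
    "topics": "john-test",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": "false",
    "workspace.id": "${WORKSPACE_ID}",
    "workspace.key": "${WORKSPACE_KEY}"
  }
}
EOF
}

# Placeholder values for illustration only.
WORKSPACE_ID="<workspace-id>" WORKSPACE_KEY="<workspace-key>" connector_payload
```

From inside the kafka-connect pod, the output can be piped straight into the same endpoint: `connector_payload | curl -s -X POST -H "Content-Type: application/json" --data @- http://kafka-cp-kafka-connect:8083/connectors`.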
Get connector details
Code Block |
---|
$ curl -s -X GET -H "Content-Type: application/json" http://kafka-cp-kafka-connect:8083/connectors/azure-sink-connector
{
  "name": "azure-sink-connector",
  "config": {
    "connector.class": "io.kafka.connect.log.anlaytics.sink.LogAnalyticsSinkConnector",
    "workspace.id": "7e0d2c8e-a46c-4fd9-b274-4b07f0ba555c",
    "topics": "john-test",
    "value.converter.schemas.enable": "false",
    "name": "azure-sink-connector",
    "workspace.key": "y3n6lvRaKhDIaV6UuGn6+nuh/BoRsQI0fy9S13ZdrL/w56LUOuqrRK3ajAAnxjo8W4PAzxId0V09bJWxmtrNLA==",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter"
  },
  "tasks": [
    {
      "connector": "azure-sink-connector",
      "task": 0
    }
  ],
  "type": "sink"
} |
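The details call returns the configuration but not whether the connector is healthy. The Kafka Connect REST API also exposes `/connectors/<name>/status` and `/connectors/<name>/restart`. The sketch below wraps them in small functions. The function names and the `CONNECT_URL` variable are hypothetical, and the base URL is the one used in the calls above.

```shell
# Base URL taken from the calls above; override via CONNECT_URL if needed.
CONNECT_URL="${CONNECT_URL:-http://kafka-cp-kafka-connect:8083}"

# Hypothetical helpers wrapping the Kafka Connect REST API.
connector_url()     { echo "${CONNECT_URL}/connectors/$1"; }
connector_status()  { curl -s -X GET  "$(connector_url "$1")/status"; }
connector_restart() { curl -s -X POST "$(connector_url "$1")/restart"; }

# Print the URL that connector_status would query (no network access needed).
echo "$(connector_url azure-sink-connector)/status"
```

Inside the kafka-connect pod, `connector_status azure-sink-connector` reports the state of the connector and of each task. A task reported as `FAILED` would be a reason to try `connector_restart azure-sink-connector`.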
Delete a Connector
Code Block |
---|
curl -s -X DELETE -H "Content-Type: application/json" http://kafka-cp-kafka-connect:8083/connectors/azure-sink-connector |
Create a Topic
Code Block |
---|
kafka-topics --zookeeper kafka-cp-zookeeper-headless:2181 --topic john-test --create --partitions 1 --replication-factor 1 --if-not-exists |
Send a Message to a Topic
Code Block |
---|
echo '{"test": 213}'| kafka-console-producer --broker-list kafka-cp-kafka-headless:9092 --topic john-test |
Consume a Message from a Topic
Code Block |
---|
kafka-console-consumer --bootstrap-server kafka-cp-kafka-headless:9092 --topic john-test --from-beginning --timeout-ms 2000 --max-messages 1 |
Debugging
Kafka Connect
If Kafka Connect is running, its log should contain the following line:
Code Block |
---|
[2021-11-16 15:24:32,204] INFO Kafka Connect started (org.apache.kafka.connect.runtime.Connect) |
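The check for the startup line can be scripted. This is a minimal sketch: `connect_started` is a hypothetical helper that reads log text on stdin, and the sample line is the one shown above.

```shell
# Hypothetical helper: succeeds if the log on stdin contains the startup line.
connect_started() {
  grep -q 'INFO Kafka Connect started'
}

# Sample log line copied from above; a real check would read the pod log.
sample_log='[2021-11-16 15:24:32,204] INFO Kafka Connect started (org.apache.kafka.connect.runtime.Connect)'

if printf '%s\n' "$sample_log" | connect_started; then
  echo "Kafka Connect is up"
else
  echo "Kafka Connect has not started yet"
fi
```

Against a live pod, the log can be piped in with `kubectl logs kafka-cp-kafka-connect-<ID> -c cp-kafka-connect-server | connect_started`.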
Kafka Broker Details (Bitnami)
Folders
Kafka Home Folder: /opt/bitnami/kafka
Code Block |
---|
$ ls -l
total 48
-rw-rw-r-- 1 root root 14520 Sep 21 09:58 LICENSE
-rw-rw-r-- 1 root root 953 Sep 21 09:58 NOTICE
drwxrwxr-x 1 root root 4096 Sep 21 10:06 bin
drwxrwxr-x 1 root root 4096 Oct 5 03:34 config
drwxrwxr-x 1 root root 4096 Sep 21 10:06 libs
drwxrwxr-x 1 root root 4096 Sep 21 10:06 licenses
drwxrwxr-x 1 root root 4096 Oct 6 17:14 logs
drwxrwxr-x 1 root root 4096 Sep 21 10:06 site-docs |
Commands
Code Block |
---|
## Creating new Topics
./kafka-topics.sh \
--zookeeper zookeeper:2181 \
--create \
--topic kafka.learning.tweets \
--partitions 1 \
--replication-factor 1
./kafka-topics.sh \
--zookeeper zookeeper:2181 \
--create \
--topic kafka.learning.alerts \
--partitions 1 \
--replication-factor 1
## Listing Topics
./kafka-topics.sh \
--zookeeper zookeeper:2181 \
--list
## Getting details about a Topic
./kafka-topics.sh \
--zookeeper zookeeper:2181 \
--describe
## Publishing Messages to Topics
./kafka-console-producer.sh \
--bootstrap-server localhost:29092 \
--topic kafka.learning.tweets
## Consuming Messages from Topics
./kafka-console-consumer.sh \
--bootstrap-server localhost:29092 \
--topic kafka.learning.tweets \
--from-beginning
## Deleting Topics
./kafka-topics.sh \
--zookeeper zookeeper:2181 \
--delete \
--topic kafka.learning.alerts
## Create a Topic with multiple partitions
./kafka-topics.sh \
--zookeeper zookeeper:2181 \
--create \
--topic kafka.learning.orders \
--partitions 3 \
--replication-factor 1
## Check topic partitioning
./kafka-topics.sh \
--zookeeper zookeeper:2181 \
--topic kafka.learning.orders \
--describe
## Publishing Messages to Topics with keys
./kafka-console-producer.sh \
--bootstrap-server localhost:29092 \
--property "parse.key=true" \
--property "key.separator=:" \
--topic kafka.learning.orders
## Consume messages using a consumer group
./kafka-console-consumer.sh \
--bootstrap-server localhost:29092 \
--topic kafka.learning.orders \
--group test-consumer-group \
--property print.key=true \
--property key.separator=" = " \
--from-beginning
## Check current status of offsets
./kafka-consumer-groups.sh \
--bootstrap-server localhost:29092 \
--describe \
--all-groups
## Creating the Topic
./kafka-topics.sh \
--zookeeper zookeeper:2181 \
--create \
--topic kafka.usecase.students \
--partitions 2 \
--replication-factor 1
## Describe the Topic
./kafka-topics.sh \
--zookeeper zookeeper:2181 \
--topic kafka.usecase.students \
--describe
## Publish to the Topic
./kafka-console-producer.sh \
--bootstrap-server localhost:29092 \
--property "parse.key=true" \
--property "key.separator=:" \
--topic kafka.usecase.students
## Consume Message from the Topic
./kafka-console-consumer.sh \
--bootstrap-server localhost:29092 \
--topic kafka.usecase.students \
--group usecase-consumer-group \
--property print.key=true \
--property key.separator=" = " \
--from-beginning |
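The keyed producer commands above (`parse.key=true`, `key.separator=:`) expect each input line in the form `key:value`. The sketch below shows how such a line breaks down, assuming the console producer splits at the first occurrence of the separator. `split_record` and the sample records are hypothetical.

```shell
# Hypothetical helper: splits a producer input line at the first separator.
# $1 is the input line, $2 the separator (defaults to ":").
split_record() {
  line="$1"
  sep="${2:-:}"
  key="${line%%"$sep"*}"    # everything before the first separator
  value="${line#*"$sep"}"   # everything after the first separator
  echo "key=${key} value=${value}"
}

# Sample records for illustration only.
split_record '1001:{"item": "keyboard"}'
split_record '1002:{"item": "mouse"}'
```

With the default partitioner, records that share a key go to the same partition, which is what the `--describe` and consumer group commands above let you observe on the three-partition topic.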
REST API
Pushing Messages to a Topic
Publish a Message
Code Block |
---|
curl -X POST -H "Content-Type: application/vnd.kafka.json.v2+json" \
--data ' { "records": [ { "value": {"log":"value"} }]}' \
"http://localhost:8082/topics/ncyd_test_in" |
Code Block |
---|
{"offsets":[{"partition":0,"offset":0,"error_code":null,"error":null}],"key_schema_id":null,"value_schema_id":null} |
Consuming Messages from a Topic
Create a Consumer
Code Block |
---|
curl -X POST -H "Content-Type: application/vnd.kafka.v2+json" \
--data '{"name": "test_consumer", "format": "json", "auto.offset.reset": "earliest"}' \
http://localhost:8082/consumers/test_consumer |
Code Block |
---|
{"instance_id":"test_consumer","base_uri":"http://ckaf-rest-0.ckaf-rest-headless.default.svc.cluster.local:8082/consumers/test_consumer/instances/test_consumer"} |
Subscribe to a Topic
Code Block |
---|
curl -X POST -H "Content-Type: application/vnd.kafka.v2+json" \
--data '{"topics":["ncyd_test_in"]}' \
http://localhost:8082/consumers/test_consumer/instances/test_consumer/subscription |
Consume the Messages
Code Block |
---|
curl -X GET -H "Accept: application/vnd.kafka.json.v2+json" \
http://localhost:8082/consumers/test_consumer/instances/test_consumer/records |
Code Block |
---|
[{"topic":"ncyd_test_in","key":null,"value":{"log":"value"},"partition":0,"offset":0}] |
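A consumer instance created through the REST Proxy stays registered until it is deleted or times out, so it is worth removing it once the messages have been read. The sketch below uses the v2 `DELETE /consumers/<group>/instances/<instance>` endpoint. The function names and the `REST_URL` variable are hypothetical, and the base URL is the one used in the calls above.

```shell
# Base URL taken from the calls above; override via REST_URL if needed.
REST_URL="${REST_URL:-http://localhost:8082}"

# Hypothetical helpers: $1 is the consumer group, $2 the instance name.
consumer_instance_url() {
  echo "${REST_URL}/consumers/$1/instances/$2"
}

delete_consumer() {
  curl -s -X DELETE -H "Content-Type: application/vnd.kafka.v2+json" \
    "$(consumer_instance_url "$1" "$2")"
}

# Print the request that delete_consumer would send (no network access needed).
echo "DELETE $(consumer_instance_url test_consumer test_consumer)"
```

For the consumer created above, the cleanup call is `delete_consumer test_consumer test_consumer`.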
...
References