
Code Block
$ kubectl get pods
NAME                                                 READY   STATUS    RESTARTS   AGE
confluent-kafka-cp-control-center-5cf9477c94-6j2tz   1/1     Running   7          49m
confluent-kafka-cp-kafka-0                           2/2     Running   0          49m
confluent-kafka-cp-kafka-1                           2/2     Running   0          38m
confluent-kafka-cp-kafka-2                           2/2     Running   0          38m
confluent-kafka-cp-kafka-connect-7646bbff9-hhdqq     2/2     Running   3          49m
confluent-kafka-cp-kafka-rest-57588d5cb5-4sfnp       2/2     Running   6          49m
confluent-kafka-cp-ksql-server-5bdd6b999c-djrj2      2/2     Running   5          49m
confluent-kafka-cp-schema-registry-8db5569b8-2qgm9   2/2     Running   6          49m
confluent-kafka-cp-zookeeper-0                       2/2     Running   0          49m
confluent-kafka-cp-zookeeper-1                       2/2     Running   0          38m
confluent-kafka-cp-zookeeper-2                       2/2     Running   0          38m


Kafka Configuration

https://docs.confluent.io/platform/current/installation/configuration/broker-configs.html
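The keys accepted under configurationOverrides in the chart come from that broker-configs page. As a sanity check on one of the values used in the sample below, the log.retention.bytes setting of 7516192768 is exactly 7 GiB:

```shell
# 7516192768 bytes (log.retention.bytes in the sample values.yaml) is 7 GiB.
echo $((7 * 1024 * 1024 * 1024))   # 7516192768
```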


Sample values.yaml

Code Block
cp-zookeeper:
  prometheus:
    jmx:
      enabled: false
  securityContext:
    runAsUser: 1000

cp-kafka:
  nodeport:
    enabled: true
    servicePort: 19092
    firstListenerPort: 31090
  persistence:
    enabled: true
    size: 10Gi
  configurationOverrides:
    "log.retention.bytes": "7516192768"
    "delete.topic.enable": "true"
    "message.max.bytes": "1048588"
    "advertised.listeners": |-
     EXTERNAL://localhost:$((31090 + ${KAFKA_BROKER_ID}))
  prometheus:
    jmx:
      enabled: false
  securityContext:
    runAsUser: 1000

cp-schema-registry:
  prometheus:
    jmx:
      enabled: false

cp-kafka-rest:
  prometheus:
    jmx:
      enabled: false

cp-ksql-server:
  prometheus:
    jmx:
      enabled: false

cp-control-center:
  prometheus:
    jmx:
      enabled: false
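The advertised.listeners override above relies on the chart injecting KAFKA_BROKER_ID into each pod, so every broker advertises firstListenerPort plus its ordinal. A minimal sketch of that arithmetic (broker ids 0-2 match the three-broker cluster shown earlier):

```shell
# Each broker advertises EXTERNAL://localhost:<firstListenerPort + broker id>.
# firstListenerPort comes from the values.yaml above; broker ids are the
# StatefulSet ordinals.
first_listener_port=31090
for kafka_broker_id in 0 1 2; do
  echo "broker ${kafka_broker_id}: EXTERNAL://localhost:$((first_listener_port + kafka_broker_id))"
done
```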


Connectors

Functions

These calls can all be made from the kafka-connect pod.


Log in to the kafka-connect pod

Code Block
kubectl exec -it kafka-cp-kafka-connect-<ID> -c cp-kafka-connect-server -- bash


List Connectors

Code Block
$ curl -s -X GET -H "Content-Type: application/json" http://kafka-cp-kafka-connect:8083/connectors

["azure-sink-connector"]


Enable a Connector

Code Block
$ curl -s -X POST -H "Content-Type: application/json" \
    --data '{"name": "azure-sink-connector", "config": {"connector.class": "io.kafka.connect.log.anlaytics.sink.LogAnalyticsSinkConnector", "topics": "john-test", "key.converter": "org.apache.kafka.connect.storage.StringConverter", "value.converter": "org.apache.kafka.connect.json.JsonConverter", "value.converter.schemas.enable": "false", "workspace.id": "7e0d2c8e-a46c-4fd9-b274-4b07f0ba555c", "workspace.key": "y3n6lvRaKhDIaV6UuGn6+nuh/BoRsQI0fy9S13ZdrL/w56LUOuqrRK3ajAAnxjo8W4PAzxId0V09bJWxmtrNLA=="}}' \
    http://kafka-cp-kafka-connect:8083/connectors


{"name":"azure-sink-connector","config":{"connector.class":"io.kafka.connect.log.anlaytics.sink.LogAnalyticsSinkConnector","topics":"john-test","key.converter":"org.apache.kafka.connect.storage.StringConverter","value.converter":"org.apache.kafka.connect.json.JsonConverter","value.converter.schemas.enable":"false","workspace.id":"7e0d2c8e-a46c-4fd9-b274-4b07f0ba555c","workspace.key":"y3n6lvRaKhDIaV6UuGn6+nuh/BoRsQI0fy9S13ZdrL/w56LUOuqrRK3ajAAnxjo8W4PAzxId0V09bJWxmtrNLA==","name":"azure-sink-connector"},"tasks":[],"type":"sink"}
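The long inline --data string above is easy to mis-quote. A sketch of the same request with the config kept in a file instead (the file path is arbitrary, and the workspace id/key below are placeholders rather than real credentials):

```shell
# Write the connector config to a file; placeholders stand in for the
# workspace credentials shown above.
cat > /tmp/azure-sink-connector.json <<'EOF'
{
  "name": "azure-sink-connector",
  "config": {
    "connector.class": "io.kafka.connect.log.anlaytics.sink.LogAnalyticsSinkConnector",
    "topics": "john-test",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": "false",
    "workspace.id": "<WORKSPACE_ID>",
    "workspace.key": "<WORKSPACE_KEY>"
  }
}
EOF

# Post it with --data @file instead of an inline JSON string:
#   curl -s -X POST -H "Content-Type: application/json" \
#     --data @/tmp/azure-sink-connector.json \
#     http://kafka-cp-kafka-connect:8083/connectors
```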


Get connector details

Code Block
$ curl -s -X GET -H "Content-Type: application/json" http://kafka-cp-kafka-connect:8083/connectors/azure-sink-connector

{
  "name": "azure-sink-connector",
  "config": {
    "connector.class": "io.kafka.connect.log.anlaytics.sink.LogAnalyticsSinkConnector",
    "workspace.id": "7e0d2c8e-a46c-4fd9-b274-4b07f0ba555c",
    "topics": "john-test",
    "value.converter.schemas.enable": "false",
    "name": "azure-sink-connector",
    "workspace.key": "y3n6lvRaKhDIaV6UuGn6+nuh/BoRsQI0fy9S13ZdrL/w56LUOuqrRK3ajAAnxjo8W4PAzxId0V09bJWxmtrNLA==",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter"
  },
  "tasks": [
    {
      "connector": "azure-sink-connector",
      "task": 0
    }
  ],
  "type": "sink"
}


Delete a Connector

Code Block
curl -s -X DELETE -H "Content-Type: application/json" http://kafka-cp-kafka-connect:8083/connectors/azure-sink-connector


Create a Topic

Code Block
kafka-topics --zookeeper kafka-cp-zookeeper-headless:2181 --topic john-test --create --partitions 1 --replication-factor 1 --if-not-exists


Send a Message to a Topic

Code Block
echo '{"test": 213}'| kafka-console-producer --broker-list kafka-cp-kafka-headless:9092 --topic john-test


Consume a Message from a Topic

Code Block
kafka-console-consumer --bootstrap-server kafka-cp-kafka-headless:9092 --topic john-test --from-beginning --timeout-ms 2000 --max-messages 1


Debugging


Kafka Connect

If Kafka Connect is running, you should see the following log line:

Code Block
[2021-11-16 15:24:32,204] INFO Kafka Connect started (org.apache.kafka.connect.runtime.Connect)
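That line can also be checked without exec-ing into the pod by grepping the container log. The kubectl command is a sketch (pod name is a placeholder, as in the exec example earlier); the grep itself is shown against the sample line:

```shell
# On the cluster (pod name is a placeholder):
#   kubectl logs kafka-cp-kafka-connect-<ID> -c cp-kafka-connect-server \
#     | grep "Kafka Connect started"
# The same grep against the sample log line from above:
line='[2021-11-16 15:24:32,204] INFO Kafka Connect started (org.apache.kafka.connect.runtime.Connect)'
echo "$line" | grep "Kafka Connect started"
```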




Kafka Broker Details (Bitnami)

Folders

Kafka Home Folder:  /opt/bitnami/kafka

Code Block
$ ls -l

total 48
-rw-rw-r-- 1 root root 14520 Sep 21 09:58 LICENSE
-rw-rw-r-- 1 root root   953 Sep 21 09:58 NOTICE
drwxrwxr-x 1 root root  4096 Sep 21 10:06 bin
drwxrwxr-x 1 root root  4096 Oct  5 03:34 config
drwxrwxr-x 1 root root  4096 Sep 21 10:06 libs
drwxrwxr-x 1 root root  4096 Sep 21 10:06 licenses
drwxrwxr-x 1 root root  4096 Oct  6 17:14 logs
drwxrwxr-x 1 root root  4096 Sep 21 10:06 site-docs


Commands

Code Block
## Creating new Topics

        ./kafka-topics.sh \
            --zookeeper zookeeper:2181 \
            --create \
            --topic kafka.learning.tweets \
            --partitions 1 \
            --replication-factor 1

        ./kafka-topics.sh \
            --zookeeper zookeeper:2181 \
            --create \
            --topic kafka.learning.alerts \
            --partitions 1 \
            --replication-factor 1

## Listing Topics

        ./kafka-topics.sh \
            --zookeeper zookeeper:2181 \
            --list

## Getting details about a Topic

        ./kafka-topics.sh \
            --zookeeper zookeeper:2181 \
            --describe


## Publishing Messages to Topics

        ./kafka-console-producer.sh \
            --bootstrap-server localhost:29092 \
            --topic kafka.learning.tweets

## Consuming Messages from Topics

        ./kafka-console-consumer.sh \
            --bootstrap-server localhost:29092 \
            --topic kafka.learning.tweets \
            --from-beginning

## Deleting Topics

        ./kafka-topics.sh \
            --zookeeper zookeeper:2181 \
            --delete \
            --topic kafka.learning.alerts

## Create a Topic with multiple partitions

        ./kafka-topics.sh \
            --zookeeper zookeeper:2181 \
            --create \
            --topic kafka.learning.orders \
            --partitions 3 \
            --replication-factor 1


## Check topic partitioning

        ./kafka-topics.sh \
            --zookeeper zookeeper:2181 \
            --topic kafka.learning.orders \
            --describe

## Publishing Messages to Topics with keys

        ./kafka-console-producer.sh \
            --bootstrap-server localhost:29092 \
            --property "parse.key=true" \
            --property "key.separator=:" \
            --topic kafka.learning.orders
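
## With parse.key=true and key.separator=:, each producer input line must be
## key:value. Example generator for keyed input (values are illustrative);
## pipe its output into the producer command above.

        for id in 1 2 3; do
            printf 'order-%s:{"order_id": %s}\n' "$id" "$id"
        done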

## Consume messages using a consumer group

        ./kafka-console-consumer.sh \
            --bootstrap-server localhost:29092 \
            --topic kafka.learning.orders \
            --group test-consumer-group \
            --property print.key=true \
            --property key.separator=" = " \
            --from-beginning

## Check current status of offsets

        ./kafka-consumer-groups.sh \
            --bootstrap-server localhost:29092 \
            --describe \
            --all-groups

## Creating the Topic

        ./kafka-topics.sh \
            --zookeeper zookeeper:2181 \
            --create \
            --topic kafka.usecase.students \
            --partitions 2 \
            --replication-factor 1

## Describe the Topic

        ./kafka-topics.sh \
            --zookeeper zookeeper:2181 \
            --topic kafka.usecase.students \
            --describe

## Publish to the Topic

        ./kafka-console-producer.sh \
            --bootstrap-server localhost:29092 \
            --property "parse.key=true" \
            --property "key.separator=:" \
            --topic kafka.usecase.students

## Consume Message from the Topic

        ./kafka-console-consumer.sh \
            --bootstrap-server localhost:29092 \
            --topic kafka.usecase.students \
            --group usecase-consumer-group \
            --property print.key=true \
            --property key.separator=" = " \
            --from-beginning


REST API

Pushing Messages to a Topic

Publish a Message

Code Block
curl -X POST -H "Content-Type: application/vnd.kafka.json.v2+json" \
      --data '{ "records": [ { "value": { "log": "value" } } ] }' \
      "http://localhost:8082/topics/ncyd_test_in"


Code Block
{"offsets":[{"partition":0,"offset":0,"error_code":null,"error":null}],"key_schema_id":null,"value_schema_id":null}
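As with the connector example, the records payload can be kept in a file and passed with curl's --data @file form (the path below is arbitrary):

```shell
# Build the REST Proxy payload in a file; same body as the inline example.
cat > /tmp/records.json <<'EOF'
{ "records": [ { "value": { "log": "value" } } ] }
EOF

#   curl -X POST -H "Content-Type: application/vnd.kafka.json.v2+json" \
#        --data @/tmp/records.json \
#        "http://localhost:8082/topics/ncyd_test_in"
```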


Consuming Messages from a Topic

Create a Consumer

Code Block
curl -X POST -H "Content-Type: application/vnd.kafka.v2+json" \
      --data '{"name": "test_consumer", "format": "json", "auto.offset.reset": "earliest"}' \
      http://localhost:8082/consumers/test_consumer


Code Block
{"instance_id":"test_consumer","base_uri":"http://ckaf-rest-0.ckaf-rest-headless.default.svc.cluster.local:8082/consumers/test_consumer/instances/test_consumer"}


Subscribe to a Topic

Code Block
curl -X POST -H "Content-Type: application/vnd.kafka.v2+json" \
     --data '{"topics":["ncyd_test_in"]}' \
     http://localhost:8082/consumers/test_consumer/instances/test_consumer/subscription


Consume the Messages

Code Block
curl -X GET -H "Accept: application/vnd.kafka.json.v2+json" \
http://localhost:8082/consumers/test_consumer/instances/test_consumer/records


Code Block
[{"topic":"ncyd_test_in","key":null,"value":{"log":"value"},"partition":0,"offset":0}]


References