...

Consumers (still relevant? - moved to Kafka Connect?) 

Data Replication

  • Topics should have a replication factor greater than 1 (usually 2 or 3)
  • This ensures that if a broker goes down, another broker can serve the data
  • ISR - in-sync replica: the set of replicas that are fully caught up with the partition leader
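The fault tolerance this buys can be sketched with a little shell arithmetic. Both values below are assumptions for illustration, not settings read from this cluster:

```shell
# Sketch: how replication factor and min.insync.replicas interact.
# Both values are hypothetical example settings.
REPLICATION_FACTOR=3
MIN_INSYNC_REPLICAS=2
# With acks=all, writes succeed while at least min.insync.replicas
# replicas remain in the ISR, so the number of broker failures the
# topic can absorb without refusing writes is:
TOLERATED_FAILURES=$((REPLICATION_FACTOR - MIN_INSYNC_REPLICAS))
echo "tolerated broker failures: ${TOLERATED_FAILURES}"
```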


Kafka Connect

Overview

  • Source connectors pull data from common data sources into Kafka
  • Sink connectors publish data from Kafka to common data stores
  • Makes it easy for inexperienced developers to quickly get their data reliably into Kafka
  • Re-usable code!

...

Installation on Kubernetes

Installing Kafka Cluster

We are using the Confluent helm chart. See https://github.com/confluentinc/cp-helm-charts


Code Block
$ helm repo add confluentinc https://confluentinc.github.io/cp-helm-charts/

"confluentinc" has been added to your repositories

> helm repo update
Hang tight while we grab the latest from your chart repositories...
...Skip local chart repository
...Successfully got an update from the "confluentinc" chart repository
...Successfully got an update from the "stable" chart repository
Update Complete. ⎈ Happy Helming!⎈


Code Block
$ helm install confluent-kafka confluentinc/cp-helm-charts

NAME: confluent-kafka
LAST DEPLOYED: Tue Jul 27 18:42:52 2021
NAMESPACE: default
STATUS: deployed
REVISION: 1
TEST SUITE: None
NOTES:
## ------------------------------------------------------
## Zookeeper
## ------------------------------------------------------
Connection string for Confluent Kafka:
  confluent-kafka-cp-zookeeper-0.confluent-kafka-cp-zookeeper-headless:2181,confluent-kafka-cp-zookeeper-1.confluent-kafka-cp-zookeeper-headless:2181,...

To connect from a client pod:

1. Deploy a zookeeper client pod with configuration:

    apiVersion: v1
    kind: Pod
    metadata:
      name: zookeeper-client
      namespace: default
    spec:
      containers:
      - name: zookeeper-client
        image: confluentinc/cp-zookeeper:6.1.0
        command:
          - sh
          - -c
          - "exec tail -f /dev/null"

2. Log into the Pod

  kubectl exec -it zookeeper-client -- /bin/bash

3. Use zookeeper-shell to connect in the zookeeper-client Pod:

  zookeeper-shell confluent-kafka-cp-zookeeper:2181

4. Explore with zookeeper commands, for example:

  # Gives the list of active brokers
  ls /brokers/ids

  # Gives the list of topics
  ls /brokers/topics

  # Gives more detailed information of the broker id '0'
  get /brokers/ids/0

## ------------------------------------------------------
## Kafka
## ------------------------------------------------------
To connect from a client pod:

1. Deploy a kafka client pod with configuration:

    apiVersion: v1
    kind: Pod
    metadata:
      name: kafka-client
      namespace: default
    spec:
      containers:
      - name: kafka-client
        image: confluentinc/cp-enterprise-kafka:6.1.0
        command:
          - sh
          - -c
          - "exec tail -f /dev/null"

2. Log into the Pod

  kubectl exec -it kafka-client -- /bin/bash

3. Explore with kafka commands:

  # Create the topic
  kafka-topics --zookeeper confluent-kafka-cp-zookeeper-headless:2181 --topic confluent-kafka-topic --create --partitions 1 --replication-factor 1 --if-not-exists

  # Create a message
  MESSAGE="`date -u`"

  # Produce a test message to the topic
  echo "$MESSAGE" | kafka-console-producer --broker-list confluent-kafka-cp-kafka-headless:9092 --topic confluent-kafka-topic

  # Consume a test message from the topic
  kafka-console-consumer --bootstrap-server confluent-kafka-cp-kafka-headless:9092 --topic confluent-kafka-topic --from-beginning --timeout-ms 2000 --max-messages 1 | grep "$MESSAGE"


Code Block
$ kubectl get pods
NAME                                                 READY   STATUS    RESTARTS   AGE
confluent-kafka-cp-control-center-5cf9477c94-6j2tz   1/1     Running   7          49m
confluent-kafka-cp-kafka-0                           2/2     Running   0          49m
confluent-kafka-cp-kafka-1                           2/2     Running   0          38m
confluent-kafka-cp-kafka-2                           2/2     Running   0          38m
confluent-kafka-cp-kafka-connect-7646bbff9-hhdqq     2/2     Running   3          49m
confluent-kafka-cp-kafka-rest-57588d5cb5-4sfnp       2/2     Running   6          49m
confluent-kafka-cp-ksql-server-5bdd6b999c-djrj2      2/2     Running   5          49m
confluent-kafka-cp-schema-registry-8db5569b8-2qgm9   2/2     Running   6          49m
confluent-kafka-cp-zookeeper-0                       2/2     Running   0          49m
confluent-kafka-cp-zookeeper-1                       2/2     Running   0          38m
confluent-kafka-cp-zookeeper-2                       2/2     Running   0          38m


Kafka Configuration

https://docs.confluent.io/platform/current/installation/configuration/broker-configs.html#


Sample values.yaml

Code Block
cp-zookeeper:
  prometheus:
    jmx:
      enabled: false
  securityContext:
    runAsUser: 1000

cp-kafka:
  nodeport:
    enabled: true
    servicePort: 19092
    firstListenerPort: 31090
  persistence:
    enabled: true
    size: 10Gi
  configurationOverrides:
    "log.retention.bytes": "7516192768"
    "delete.topic.enable": "true"
    "message.max.bytes": "1048588"
    "advertised.listeners": |-
     EXTERNAL://localhost:$((31090 + ${KAFKA_BROKER_ID}))
  prometheus:
    jmx:
      enabled: false
  securityContext:
    runAsUser: 1000

cp-schema-registry:
  prometheus:
    jmx:
      enabled: false

cp-kafka-rest:
  prometheus:
    jmx:
      enabled: false

cp-ksql-server:
  prometheus:
    jmx:
      enabled: false

cp-control-center:
  prometheus:
    jmx:
      enabled: false
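Two of the overrides above are easier to read as arithmetic. A small shell sketch (the broker id is a hypothetical example; the values match the ones in the values.yaml above):

```shell
# Sketch: 7516192768 bytes of log.retention.bytes is exactly 7 GiB.
RETENTION_BYTES=$((7 * 1024 * 1024 * 1024))
echo "log.retention.bytes = ${RETENTION_BYTES}"

# The advertised.listeners override maps each broker id onto a unique
# NodePort starting at firstListenerPort (31090). Broker id 2 is a
# made-up example (e.g. confluent-kafka-cp-kafka-2):
KAFKA_BROKER_ID=2
EXTERNAL_PORT=$((31090 + KAFKA_BROKER_ID))
echo "EXTERNAL://localhost:${EXTERNAL_PORT}"
```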


Connectors

Functions

These calls can all be made from the kafka-connect pod.


Log in to the kafka-connect pod

Code Block
kubectl exec -it kafka-cp-kafka-connect-<ID> -c cp-kafka-connect-server -- bash


List Connectors

Code Block
$ curl -s -X GET -H "Content-Type: application/json" http://kafka-cp-kafka-connect:8083/connectors

["azure-sink-connector"]


Create a Connector

Code Block
$ curl -s -X POST -H "Content-Type: application/json" \
  --data '{"name": "azure-sink-connector", "config": {"connector.class": "io.kafka.connect.log.anlaytics.sink.LogAnalyticsSinkConnector", "topics": "john-test", "key.converter": "org.apache.kafka.connect.storage.StringConverter", "value.converter": "org.apache.kafka.connect.json.JsonConverter", "value.converter.schemas.enable": "false", "workspace.id": "7e0d2c8e-a46c-4fd9-b274-4b07f0ba555c", "workspace.key": "y3n6lvRaKhDIaV6UuGn6+nuh/BoRsQI0fy9S13ZdrL/w56LUOuqrRK3ajAAnxjo8W4PAzxId0V09bJWxmtrNLA==" }}' \
  http://kafka-cp-kafka-connect:8083/connectors


{"name":"azure-sink-connector","config":{"connector.class":"io.kafka.connect.log.anlaytics.sink.LogAnalyticsSinkConnector","topics":"john-test","key.converter":"org.apache.kafka.connect.storage.StringConverter","value.converter":"org.apache.kafka.connect.json.JsonConverter","value.converter.schemas.enable":"false","workspace.id":"7e0d2c8e-a46c-4fd9-b274-4b07f0ba555c","workspace.key":"y3n6lvRaKhDIaV6UuGn6+nuh/BoRsQI0fy9S13ZdrL/w56LUOuqrRK3ajAAnxjo8W4PAzxId0V09bJWxmtrNLA==","name":"azure-sink-connector"},"tasks":[],"type":"sink"}


Get connector details

Code Block
$ curl -s -X GET -H "Content-Type: application/json" http://kafka-cp-kafka-connect:8083/connectors/azure-sink-connector

{
  "name": "azure-sink-connector",
  "config": {
    "connector.class": "io.kafka.connect.log.anlaytics.sink.LogAnalyticsSinkConnector",
    "workspace.id": "7e0d2c8e-a46c-4fd9-b274-4b07f0ba555c",
    "topics": "john-test",
    "value.converter.schemas.enable": "false",
    "name": "azure-sink-connector",
    "workspace.key": "y3n6lvRaKhDIaV6UuGn6+nuh/BoRsQI0fy9S13ZdrL/w56LUOuqrRK3ajAAnxjo8W4PAzxId0V09bJWxmtrNLA==",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter"
  },
  "tasks": [
    {
      "connector": "azure-sink-connector",
      "task": 0
    }
  ],
  "type": "sink"
}
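If jq is not installed in the pod, single fields can still be pulled out of the connector-details JSON with grep. A minimal sketch; RESPONSE is a trimmed, hypothetical stand-in for the real response:

```shell
# Sketch: extract one field from the connector-details JSON without jq.
# RESPONSE is a shortened, made-up stand-in for the real response body.
RESPONSE='{"name": "azure-sink-connector", "type": "sink"}'
CONNECTOR_TYPE=$(echo "$RESPONSE" | grep -o '"type": "[^"]*"')
echo "$CONNECTOR_TYPE"
```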


Delete a Connector

Code Block
curl -s -X DELETE -H "Content-Type: application/json" http://kafka-cp-kafka-connect:8083/connectors/azure-sink-connector


Create a Topic

Code Block
kafka-topics --zookeeper kafka-cp-zookeeper-headless:2181 --topic john-test --create --partitions 1 --replication-factor 1 --if-not-exists


Send Message to a topic

Code Block
echo '{"test": 213}'| kafka-console-producer --broker-list kafka-cp-kafka-headless:9092 --topic john-test


Consume a Message from a Topic

Code Block
kafka-console-consumer --bootstrap-server kafka-cp-kafka-headless:9092 --topic john-test --from-beginning --timeout-ms 2000 --max-messages 1


Debugging


Kafka Connect

You should see the following log line if Kafka Connect is running:

Code Block
[2021-11-16 15:24:32,204] INFO Kafka Connect started (org.apache.kafka.connect.runtime.Connect)




Kafka Broker Details (Bitnami)

Folders

Kafka Home Folder:  /opt/bitnami/kafka

Code Block
$ ls -l

total 48
-rw-rw-r-- 1 root root 14520 Sep 21 09:58 LICENSE
-rw-rw-r-- 1 root root   953 Sep 21 09:58 NOTICE
drwxrwxr-x 1 root root  4096 Sep 21 10:06 bin
drwxrwxr-x 1 root root  4096 Oct  5 03:34 config
drwxrwxr-x 1 root root  4096 Sep 21 10:06 libs
drwxrwxr-x 1 root root  4096 Sep 21 10:06 licenses
drwxrwxr-x 1 root root  4096 Oct  6 17:14 logs
drwxrwxr-x 1 root root  4096 Sep 21 10:06 site-docs


Commands

Code Block
## Creating new Topics

        ./kafka-topics.sh \
            --zookeeper zookeeper:2181 \
            --create \
            --topic kafka.learning.tweets \
            --partitions 1 \
            --replication-factor 1

        ./kafka-topics.sh \
            --zookeeper zookeeper:2181 \
            --create \
            --topic kafka.learning.alerts \
            --partitions 1 \
            --replication-factor 1

## Listing Topics

        ./kafka-topics.sh \
            --zookeeper zookeeper:2181 \
            --list

## Getting details about a Topic

        ./kafka-topics.sh \
            --zookeeper zookeeper:2181 \
            --describe


## Publishing Messages to Topics

        ./kafka-console-producer.sh \
            --bootstrap-server localhost:29092 \
            --topic kafka.learning.tweets

## Consuming Messages from Topics

        ./kafka-console-consumer.sh \
            --bootstrap-server localhost:29092 \
            --topic kafka.learning.tweets \
            --from-beginning

## Deleting Topics

        ./kafka-topics.sh \
            --zookeeper zookeeper:2181 \
            --delete \
            --topic kafka.learning.alerts

## Create a Topic with multiple partitions

        ./kafka-topics.sh \
            --zookeeper zookeeper:2181 \
            --create \
            --topic kafka.learning.orders \
            --partitions 3 \
            --replication-factor 1


## Check topic partitioning

        ./kafka-topics.sh \
            --zookeeper zookeeper:2181 \
            --topic kafka.learning.orders \
            --describe

## Publishing Messages to Topics with keys

        ./kafka-console-producer.sh \
            --bootstrap-server localhost:29092 \
            --property "parse.key=true" \
            --property "key.separator=:" \
            --topic kafka.learning.orders

## Consume messages using a consumer group

        ./kafka-console-consumer.sh \
            --bootstrap-server localhost:29092 \
            --topic kafka.learning.orders \
            --group test-consumer-group \
            --property print.key=true \
            --property key.separator=" = " \
            --from-beginning
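The LAG column printed by kafka-consumer-groups.sh is simply the partition's log-end-offset minus the group's committed offset. A sketch with made-up offsets:

```shell
# Sketch: consumer lag arithmetic. Both offsets are hypothetical
# example values, not taken from a real group.
LOG_END_OFFSET=42
CURRENT_OFFSET=40
LAG=$((LOG_END_OFFSET - CURRENT_OFFSET))
echo "lag=${LAG}"
```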

## Check current status of offsets

        ./kafka-consumer-groups.sh \
            --bootstrap-server localhost:29092 \
            --describe \
            --all-groups

## Creating the Topic

        ./kafka-topics.sh \
            --zookeeper zookeeper:2181 \
            --create \
            --topic kafka.usecase.students \
            --partitions 2 \
            --replication-factor 1

## Describe the Topic

        ./kafka-topics.sh \
            --zookeeper zookeeper:2181 \
            --topic kafka.usecase.students \
            --describe

## Publish to the Topic

        ./kafka-console-producer.sh \
            --bootstrap-server localhost:29092 \
            --property "parse.key=true" \
            --property "key.separator=:" \
            --topic kafka.usecase.students

## Consume Message from the Topic

        ./kafka-console-consumer.sh \
            --bootstrap-server localhost:29092 \
            --topic kafka.usecase.students \
            --group usecase-consumer-group \
            --property print.key=true \
            --property key.separator=" = " \
            --from-beginning


REST Api

Pushing Message to Topic

Publish a Message

Code Block
curl -X POST -H "Content-Type: application/vnd.kafka.json.v2+json" \
      --data ' { "records": [ { "value": {"log":"value"} }]}' \
      "http://localhost:8082/topics/ncyd_test_in"


Code Block
{"offsets":[{"partition":0,"offset":0,"error_code":null,"error":null}],"key_schema_id":null,"value_schema_id":null}
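The records array accepts several messages in one request. A minimal, locally runnable sketch of assembling such a payload from shell variables (the two log values are made up):

```shell
# Sketch: build a multi-record REST-proxy payload in plain shell.
# The log values are hypothetical.
payload='{ "records": ['
for msg in '{"log":"first"}' '{"log":"second"}'; do
  payload="${payload}{ \"value\": ${msg} },"
done
payload="${payload%,} ]}"   # drop the trailing comma, close the JSON
echo "$payload"
```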


Consuming Messages from a Topic

Create a Consumer

Code Block
curl -X POST -H "Content-Type: application/vnd.kafka.v2+json" \
      --data '{"name": "test_consumer", "format": "json", "auto.offset.reset": "earliest"}' \
      http://localhost:8082/consumers/test_consumer


Code Block
{"instance_id":"test_consumer","base_uri":"http://ckaf-rest-0.ckaf-rest-headless.default.svc.cluster.local:8082/consumers/test_consumer/instances/test_consumer"}


Subscribe to a Topic

Code Block
curl -X POST -H "Content-Type: application/vnd.kafka.v2+json" \
     --data '{"topics":["ncyd_test_in"]}' \
     http://localhost:8082/consumers/test_consumer/instances/test_consumer/subscription


Consume the Messages

Code Block
curl -X GET -H "Accept: application/vnd.kafka.json.v2+json" \
http://localhost:8082/consumers/test_consumer/instances/test_consumer/records


Code Block
[{"topic":"ncyd_test_in","key":null,"value":{"log":"value"},"partition":0,"offset":0}]

...


References

Apache Kafka in 5 minutes: https://www.youtube.com/watch?v=PzPXRmVHMxI
Nokia Learning - Kafka: https://nokialearn.csod.com/ui/lms-learning-details/app/course/dc425d19-5642-535f-916c-211768f90a00
Kafka Topics, Partitions and Offsets Explained: https://www.youtube.com/watch?v=_q1IjK5jjyU
Kafka Helm Charts: https://github.com/confluentinc/cp-helm-charts

Confluent for Kubernetes

https://docs.confluent.io/operator/current/overview.html#operator-about-intro