Overview

Testcontainers-Go is a Go package that makes it simple to create and clean up container-based dependencies for automated integration/smoke tests. The clean, easy-to-use API enables developers to programmatically define containers that should be run as part of a test and clean up those resources when the test is done.

Sample Test

component_test.go
package component_tests

import (
    "context"
    "fmt"
    "github.com/sirupsen/logrus"
    "github.com/stretchr/testify/assert"
    "github.com/testcontainers/testcontainers-go"
    "kafka-stream-operator/component-tests/containers"
    "kafka-stream-operator/component-tests/services"
    "os"
    "testing"
    "time"
)

var networkName = "test"

var inTopic = "ncyd_test_in"
var outTopic = "ncyd_test_out"
var kafkaStreamOperatorVersion = "22.0.0-SNAPSHOT"


var compose *testcontainers.LocalDockerCompose
var producer *services.Producer
var consumer *services.Consumer
var kafkaContainer *containers.KafkaContainer
var zookeeperContainer *containers.ZookeeperContainer
var kafkaStreamOperatorContainer *containers.KafkaStreamOperatorContainer
var network testcontainers.Network
var ctx context.Context

func TestMain(m *testing.M) {
	defer teardown()
    setup()
	m.Run()
}

func setup(){
	logrus.Info("Setup")

    //read version
    v := os.Getenv("VERSION")
    if v !="" {
        kafkaStreamOperatorVersion = v
        logrus.Infof("Setting Kafka Stream Operator Image Version to: %v", kafkaStreamOperatorVersion)
    }

   var err error
   ctx = context.Background()

   network, err = testcontainers.GenericNetwork(ctx, testcontainers.GenericNetworkRequest{
	    NetworkRequest: testcontainers.NetworkRequest{
	    Name:           networkName,
	    CheckDuplicate: true,
	    },
	})
	if err != nil {
		logrus.Fatalf("Failed to create network: %v",err)
	}

   zookeeperContainer, err = containers.NewZookeeperContainer(ctx,networkName)
    if err != nil{
       logrus.Fatalf("Failed to start zookeeper: %v", err)
    }

    kafkaContainer, err = containers.NewKafkaContainer(ctx,networkName)
    if err != nil{
       logrus.Fatalf("Failed to start kafka: %v", err)
    }

    kafkaStreamOperatorContainer, err = containers.NewKafkaStreamOperatorContainer(ctx,networkName,kafkaStreamOperatorVersion)
    if err != nil{
       logrus.Fatalf("Failed to start kafka stream operator: %v", err)
    }

    var brokers= []string{ "localhost:9092"}

    logrus.Info("Creating Producer")
	producer, _ = services.NewProducer(brokers,inTopic)

    logrus.Info("Creating Consumer")
    consumer,_ = services.NewConsumer(brokers,inTopic)
    go consumer.Start()
    logrus.Info("Consumer Started")

}

func teardown(){
   logrus.Info("Teardown")
   kafkaStreamOperatorContainer.Terminate()
   kafkaContainer.Terminate()
   zookeeperContainer.Terminate()
   network.Remove(ctx)
   //time.Sleep(1 * time.Second)
}


func TestSuccess(t *testing.T) {
   logrus.Info("testSuccess running")
   expectedMessage := "{\"log\":\"9 - testing log for user test\", \"kubernetes\":{\"host\":\"docker-desktop\"}}"

   //send 10 messages
   logrus.Info("Sending 10 Messages")
   for i:=0;i<10;i++ {
     message := fmt.Sprintf("{\"log\":\"%d - testing log for user test\", \"kubernetes\":{\"host\":\"docker-desktop\"}}",i)
     err := producer.Send("",message)
     if err != nil {
       t.Errorf("Failed to send message: %v", err)
       t.Fail()
     }
   }
  logrus.Info("Messages sent")
   time.Sleep(30 * time.Second )

   logrus.Info("Reading Messages")
   outputMessage, err := consumer.Read()
   if err !=nil {
       t.Errorf("Failed to receive message: %v", err)
       t.Fail()
   }
   logrus.Infof("Received Message %v",outputMessage)

   assert.Equal(t,expectedMessage,outputMessage,"Failed to retrieve expected output")

}
kafka_container.go
package containers

import (
    "context"
    "github.com/testcontainers/testcontainers-go"
    "github.com/testcontainers/testcontainers-go/wait"
)

type KafkaContainer struct {
    Container testcontainers.Container
    context context.Context
    dockerId string

}

func NewKafkaContainer(ctx context.Context, networkName string ) (*KafkaContainer, error){

   kafkaContainer := &KafkaContainer{
       Container: nil,
       context: ctx,
   }


   req := testcontainers.ContainerRequest{

       Image:           "bitnami/kafka:3" ,
       Name:            "kafka",
       Hostname:        "kafka",
       Networks:        []string{ networkName},
       WaitingFor:      wait.ForLog("Ready to serve as the new controller with epoch 1"),
       ExposedPorts:    []string{"9092:9092"},
       Env:             map[string]string{
                            "KAFKA_CFG_ZOOKEEPER_CONNECT": "zookeeper:2181",
                            "ALLOW_PLAINTEXT_LISTENER": "yes",
                            "KAFKA_CFG_LISTENERS": "INTERNAL://:9093,EXTERNAL://:9092",
                            "KAFKA_CFG_ADVERTISED_LISTENERS": "INTERNAL://:9093,EXTERNAL://localhost:9092",
                            "KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP": "INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT",
                            "KAFKA_CFG_INTER_BROKER_LISTENER_NAME": "INTERNAL",
                        },
   }
   container, err := testcontainers.GenericContainer(ctx, testcontainers.GenericContainerRequest{
       ContainerRequest: req,
       Started:          true,
   })
   if err != nil {
       return nil, err
   }

   kafkaContainer.Container =  container

   return kafkaContainer,nil
}

func (k KafkaContainer) Terminate() {
        k.Container.Terminate(k.context)
        //time.Sleep(1 * time.Second)
}


zookeeper_container.go
package containers

import (
    "context"
    "github.com/testcontainers/testcontainers-go"
    "github.com/testcontainers/testcontainers-go/wait"
)

type ZookeeperContainer struct {
    container testcontainers.Container
    context context.Context
    dockerId string

}

func NewZookeeperContainer(ctx context.Context, networkName string) (*ZookeeperContainer, error){

    zookeeperContainer := &ZookeeperContainer{
        container: nil,
        context: ctx,
    }


    req := testcontainers.ContainerRequest{

        Image:              "bitnami/zookeeper:3.7",
        Name:               "zookeeper",
        Hostname:           "zookeeper",
        Networks:           []string{ networkName},
        WaitingFor:         wait.ForLog("Started AdminServer on address"),
        ExposedPorts:       []string{"2181/tcp"},
        Env:                map[string]string{
                                "ALLOW_ANONYMOUS_LOGIN": "yes",
                            },
    }
    container, err := testcontainers.GenericContainer(ctx, testcontainers.GenericContainerRequest{
        ContainerRequest: req,
        Started:          true,
    })
    if err != nil {
        return nil, err
    }

    zookeeperContainer.container =  container

    return zookeeperContainer,nil
}

func (z ZookeeperContainer) Terminate() {
         z.container.Terminate(z.context)
         //time.Sleep(1 * time.Second)
}



Sample Test Case using Docker-Compose


package test

import (
	"github.com/google/uuid"
	"github.com/sirupsen/logrus"
	"github.com/testcontainers/testcontainers-go"
	"github.com/testcontainers/testcontainers-go/wait"
	"kafka-stream-operator/src/internal/gateways/kafka/publisher"
	"log"
	"strings"
	"testing"
	"time"
)

var kafkaPublisher 	*publisher.Publisher
var bootstrapURL = "localhost:9092"
var compose *testcontainers.LocalDockerCompose
var testTopic = "test-in"

func TestMain(m *testing.M) {
	defer teardown()
    setup()
	m.Run()
}

func setup(){
	log.Println("Setup")
    compose = testcontainers.NewLocalDockerCompose([]string{"../../docker-compose.yml"}
		, strings.ToLower(uuid.New().String()))
	compose.WaitForService("kafka_1",wait.ForLog("Ready to serve as the new controller with epoch 1"))
	compose.WithCommand([]string{"up", "-d"}).Invoke()
    logrus.Info("Containers started! ")

	var err error
	kafkaPublisher, err = publisher.NewPublisher(bootstrapURL, testTopic)
	if err !=nil {
		logrus.Errorf("Failed to start publisher: %v", err)
	}

}

func teardown(){
	log.Println("Teardown")
   compose.Down()
   time.Sleep(1 * time.Second)
}


func sendMessage(message string) error{

   err := kafkaPublisher.Publish("", message)
	if err !=nil {
		logrus.Errorf("Failed to send mesaage: %v", message)
       return err
	}
   return nil
}


func TestSuccess(t *testing.T) {
   log.Println("testSuccess running")

   message := `{"log":"testing log for user test", "kubernetes":{"host":"docker-desktop"}}`

   err := sendMessage(message)
   if err != nil {
       t.Errorf("Failed to send message: %v", err)
       t.Fail()
   }
   err = sendMessage(message)
   if err != nil {
       t.Errorf("Failed to send message: %v", err)
       t.Fail()
   }

}


Docker-Compose

version: "2"

services:
  zookeeper:
    image: bitnami/zookeeper:3.7
    ports:
      - "2181:2181"
    environment:
      - ALLOW_ANONYMOUS_LOGIN=yes

  kafka:
    image: docker.io/bitnami/kafka:3
    ports:
      - "9092:9092"
    environment:
      - KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper:2181
      - ALLOW_PLAINTEXT_LISTENER=yes
      - KAFKA_CFG_LISTENERS=INTERNAL://:9093,EXTERNAL://:9092
      - KAFKA_CFG_ADVERTISED_LISTENERS=INTERNAL://:9093,EXTERNAL://localhost:9092
      - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT
      - KAFKA_CFG_INTER_BROKER_LISTENER_NAME=INTERNAL
    depends_on:
      - zookeeper

  kowl:
    image: rsmnarts/kowl:latest
    restart: on-failure
    ports:
      - "8080:8080"
    environment:
      - KAFKA_BROKERS=kafka:9093
    depends_on:
      - kafka


References

  • No labels