...
Testcontainers-Go is a Go package that makes it simple to create and clean up container-based dependencies for automated integration/smoke tests. The clean, easy-to-use API enables developers to programmatically define containers that should be run as part of a test and clean up those resources when the test is done.
Sample Test
Code Block | ||
---|---|---|
| ||
package component_tests
import (
"context"
"fmt"
"github.com/sirupsen/logrus"
"github.com/stretchr/testify/assert"
"github.com/testcontainers/testcontainers-go"
"kafka-stream-operator/component-tests/containers"
"kafka-stream-operator/component-tests/services"
"os"
"testing"
"time"
)
var networkName = "test"
var inTopic = "ncyd_test_in"
var outTopic = "ncyd_test_out"
var kafkaStreamOperatorVersion = "22.0.0-SNAPSHOT"
var compose *testcontainers.LocalDockerCompose
var producer *services.Producer
var consumer *services.Consumer
var kafkaContainer *containers.KafkaContainer
var zookeeperContainer *containers.ZookeeperContainer
var kafkaStreamOperatorContainer *containers.KafkaStreamOperatorContainer
var network testcontainers.Network
var ctx context.Context
func TestMain(m *testing.M) {
defer teardown()
setup()
m.Run()
}
func setup(){
logrus.Info("Setup")
//read version
v := os.Getenv("VERSION")
if v !="" {
kafkaStreamOperatorVersion = v
logrus.Infof("Setting Kafka Stream Operator Image Version to: %v", kafkaStreamOperatorVersion)
}
var err error
ctx = context.Background()
network, err = testcontainers.GenericNetwork(ctx, testcontainers.GenericNetworkRequest{
NetworkRequest: testcontainers.NetworkRequest{
Name: networkName,
CheckDuplicate: true,
},
})
if err != nil {
logrus.Fatalf("Failed to create network: %v",err)
}
zookeeperContainer, err = containers.NewZookeeperContainer(ctx,networkName)
if err != nil{
logrus.Fatalf("Failed to start zookeeper: %v", err)
}
kafkaContainer, err = containers.NewKafkaContainer(ctx,networkName)
if err != nil{
logrus.Fatalf("Failed to start kafka: %v", err)
}
kafkaStreamOperatorContainer, err = containers.NewKafkaStreamOperatorContainer(ctx,networkName,kafkaStreamOperatorVersion)
if err != nil{
logrus.Fatalf("Failed to start kafka stream operator: %v", err)
}
var brokers= []string{ "localhost:9092"}
logrus.Info("Creating Producer")
producer, _ = services.NewProducer(brokers,inTopic)
logrus.Info("Creating Consumer")
consumer,_ = services.NewConsumer(brokers,inTopic)
go consumer.Start()
logrus.Info("Consumer Started")
}
func teardown(){
logrus.Info("Teardown")
kafkaStreamOperatorContainer.Terminate()
kafkaContainer.Terminate()
zookeeperContainer.Terminate()
network.Remove(ctx)
//time.Sleep(1 * time.Second)
}
func TestSuccess(t *testing.T) {
logrus.Info("testSuccess running")
expectedMessage := "{\"log\":\"9 - testing log for user test\", \"kubernetes\":{\"host\":\"docker-desktop\"}}"
//send 10 messages
logrus.Info("Sending 10 Messages")
for i:=0;i<10;i++ {
message := fmt.Sprintf("{\"log\":\"%d - testing log for user test\", \"kubernetes\":{\"host\":\"docker-desktop\"}}",i)
err := producer.Send("",message)
if err != nil {
t.Errorf("Failed to send message: %v", err)
t.Fail()
}
}
logrus.Info("Messages sent")
time.Sleep(30 * time.Second )
logrus.Info("Reading Messages")
outputMessage, err := consumer.Read()
if err !=nil {
t.Errorf("Failed to receive message: %v", err)
t.Fail()
}
logrus.Infof("Received Message %v",outputMessage)
assert.Equal(t,expectedMessage,outputMessage,"Failed to retrieve expected output")
} |
Code Block | ||
---|---|---|
| ||
package containers
import (
"context"
"github.com/testcontainers/testcontainers-go"
"github.com/testcontainers/testcontainers-go/wait"
)
type KafkaContainer struct {
Container testcontainers.Container
context context.Context
dockerId string
}
func NewKafkaContainer(ctx context.Context, networkName string ) (*KafkaContainer, error){
kafkaContainer := &KafkaContainer{
Container: nil,
context: ctx,
}
req := testcontainers.ContainerRequest{
Image: "bitnami/kafka:3" ,
Name: "kafka",
Hostname: "kafka",
Networks: []string{ networkName},
WaitingFor: wait.ForLog("Ready to serve as the new controller with epoch 1"),
ExposedPorts: []string{"9092:9092"},
Env: map[string]string{
"KAFKA_CFG_ZOOKEEPER_CONNECT": "zookeeper:2181",
"ALLOW_PLAINTEXT_LISTENER": "yes",
"KAFKA_CFG_LISTENERS": "INTERNAL://:9093,EXTERNAL://:9092",
"KAFKA_CFG_ADVERTISED_LISTENERS": "INTERNAL://:9093,EXTERNAL://localhost:9092",
"KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP": "INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT",
"KAFKA_CFG_INTER_BROKER_LISTENER_NAME": "INTERNAL",
},
}
container, err := testcontainers.GenericContainer(ctx, testcontainers.GenericContainerRequest{
ContainerRequest: req,
Started: true,
})
if err != nil {
return nil, err
}
kafkaContainer.Container = container
return kafkaContainer,nil
}
func (k KafkaContainer) Terminate() {
k.Container.Terminate(k.context)
//time.Sleep(1 * time.Second)
} |
Code Block | ||
---|---|---|
| ||
package containers
import (
"context"
"github.com/testcontainers/testcontainers-go"
"github.com/testcontainers/testcontainers-go/wait"
)
type ZookeeperContainer struct {
container testcontainers.Container
context context.Context
dockerId string
}
func NewZookeeperContainer(ctx context.Context, networkName string) (*ZookeeperContainer, error){
zookeeperContainer := &ZookeeperContainer{
container: nil,
context: ctx,
}
req := testcontainers.ContainerRequest{
Image: "bitnami/zookeeper:3.7",
Name: "zookeeper",
Hostname: "zookeeper",
Networks: []string{ networkName},
WaitingFor: wait.ForLog("Started AdminServer on address"),
ExposedPorts: []string{"2181/tcp"},
Env: map[string]string{
"ALLOW_ANONYMOUS_LOGIN": "yes",
},
}
container, err := testcontainers.GenericContainer(ctx, testcontainers.GenericContainerRequest{
ContainerRequest: req,
Started: true,
})
if err != nil {
return nil, err
}
zookeeperContainer.container = container
return zookeeperContainer,nil
}
func (z ZookeeperContainer) Terminate() {
z.container.Terminate(z.context)
//time.Sleep(1 * time.Second)
} |
Sample Test Case using Docker-Compose
...