package component_tests
import (
"context"
"fmt"
"github.com/sirupsen/logrus"
"github.com/stretchr/testify/assert"
"github.com/testcontainers/testcontainers-go"
"kafka-stream-operator/component-tests/containers"
"kafka-stream-operator/component-tests/services"
"os"
"testing"
"time"
)
var networkName = "test"
var inTopic = "ncyd_test_in"
var outTopic = "ncyd_test_out"
var kafkaStreamOperatorVersion = "22.0.0-SNAPSHOT"
var compose *testcontainers.LocalDockerCompose
var producer *services.Producer
var consumer *services.Consumer
var kafkaContainer *containers.KafkaContainer
var zookeeperContainer *containers.ZookeeperContainer
var kafkaStreamOperatorContainer *containers.KafkaStreamOperatorContainer
var network testcontainers.Network
var ctx context.Context
func TestMain(m *testing.M) {
defer teardown()
setup()
m.Run()
}
func setup(){
logrus.Info("Setup")
//read version
v := os.Getenv("VERSION")
if v !="" {
kafkaStreamOperatorVersion = v
logrus.Infof("Setting Kafka Stream Operator Image Version to: %v", kafkaStreamOperatorVersion)
}
var err error
ctx = context.Background()
network, err = testcontainers.GenericNetwork(ctx, testcontainers.GenericNetworkRequest{
NetworkRequest: testcontainers.NetworkRequest{
Name: networkName,
CheckDuplicate: true,
},
})
if err != nil {
logrus.Fatalf("Failed to create network: %v",err)
}
zookeeperContainer, err = containers.NewZookeeperContainer(ctx,networkName)
if err != nil{
logrus.Fatalf("Failed to start zookeeper: %v", err)
}
kafkaContainer, err = containers.NewKafkaContainer(ctx,networkName)
if err != nil{
logrus.Fatalf("Failed to start kafka: %v", err)
}
kafkaStreamOperatorContainer, err = containers.NewKafkaStreamOperatorContainer(ctx,networkName,kafkaStreamOperatorVersion)
if err != nil{
logrus.Fatalf("Failed to start kafka stream operator: %v", err)
}
var brokers= []string{ "localhost:9092"}
logrus.Info("Creating Producer")
producer, _ = services.NewProducer(brokers,inTopic)
logrus.Info("Creating Consumer")
consumer,_ = services.NewConsumer(brokers,inTopic)
go consumer.Start()
logrus.Info("Consumer Started")
}
func teardown(){
logrus.Info("Teardown")
kafkaStreamOperatorContainer.Terminate()
kafkaContainer.Terminate()
zookeeperContainer.Terminate()
network.Remove(ctx)
//time.Sleep(1 * time.Second)
}
func TestSuccess(t *testing.T) {
logrus.Info("testSuccess running")
expectedMessage := "{\"log\":\"9 - testing log for user test\", \"kubernetes\":{\"host\":\"docker-desktop\"}}"
//send 10 messages
logrus.Info("Sending 10 Messages")
for i:=0;i<10;i++ {
message := fmt.Sprintf("{\"log\":\"%d - testing log for user test\", \"kubernetes\":{\"host\":\"docker-desktop\"}}",i)
err := producer.Send("",message)
if err != nil {
t.Errorf("Failed to send message: %v", err)
t.Fail()
}
}
logrus.Info("Messages sent")
time.Sleep(30 * time.Second )
logrus.Info("Reading Messages")
outputMessage, err := consumer.Read()
if err !=nil {
t.Errorf("Failed to receive message: %v", err)
t.Fail()
}
logrus.Infof("Received Message %v",outputMessage)
assert.Equal(t,expectedMessage,outputMessage,"Failed to retrieve expected output")
} |