package mockconsumer
import (
"errors"
"credential-management/src/internal/dao"
internalErrors "credential-management/src/internal/error"
"credential-management"github.com/Shopify/sarama"
"github.com/stretchr/testify/assert"
"kafka-azure-sink/src/internal/modelmock"
"slices"
"github.com/stretchr/testify/mock"testing"
)
typefunc MockDaoClient structTestSuccessAdditionOfKafkaTopics(t *testing.T) {
mock.Mock
}
// MockTransaction is the transaction value handed out by
// MockDaoClient.NewTransaction. It embeds mock.Mock, but its Commit and Abort
// methods return nil without consulting the recorded expectations.
type MockTransaction struct {
mock.Mock
}
// Connect records the call on the mock and returns the error configured as
// its first return value.
func (m *MockDaoClient) Connect() error {
	return m.Called().Error(0)
}
// Disconnect records the call on the mock and returns the error configured as
// its first return value.
func (m *MockDaoClient) Disconnect() error {
	return m.Called().Error(0)
}
// NewTransaction records the call on the mock and returns a freshly allocated
// *MockTransaction together with the error configured as the first return
// value. The transaction is returned even when that error is non-nil.
func (m *MockDaoClient) NewTransaction() (dao.Transaction, error) {
	err := m.Called().Error(0)
	return &MockTransaction{}, err
}
// InsertLookup records the call with both arguments and returns the first
// configured return value converted by ConvertCredError (nil unless that
// value is a *internalErrors.CredentialError).
func (m *MockDaoClient) InsertLookup(tr dao.Transaction, credentialLookup *model.CredentialLookup) *internalErrors.CredentialError {
	result := m.Called(tr, credentialLookup).Get(0)
	return ConvertCredError(result)
}
// UpdateLookup records the call with both arguments and returns the first
// configured return value converted by ConvertCredError (nil unless that
// value is a *internalErrors.CredentialError).
func (m *MockDaoClient) UpdateLookup(tr dao.Transaction, credentialLookup *model.CredentialLookup) *internalErrors.CredentialError {
	result := m.Called(tr, credentialLookup).Get(0)
	return ConvertCredError(result)
}
// GetLookupById records the call with tr and id and returns the configured
// values: return value 0 as the lookup and return value 1 converted by
// ConvertCredError.
//
// A test may configure a nil lookup (for example Return(nil, someErr)); that
// yields a nil *model.CredentialLookup. A non-nil value of any other type
// still panics on the type assertion, so misconfigured expectations stay loud.
func (m *MockDaoClient) GetLookupById(tr dao.Transaction, id string) (*model.CredentialLookup, *internalErrors.CredentialError) {
	args := m.Called(tr, id)

	// An untyped nil stored in the mock cannot be type-asserted to a pointer
	// type, so it has to be checked for before asserting.
	var lookup *model.CredentialLookup
	if raw := args.Get(0); raw != nil {
		lookup = raw.(*model.CredentialLookup)
	}
	return lookup, ConvertCredError(args.Get(1))
}
// GetLookup records the call with tr and searchCriteria and returns the
// configured values: return value 0 as the lookup slice and return value 1
// converted by ConvertCredError.
//
// pageInfo is accepted but not forwarded to Called, so expectations must be
// registered with two arguments (transaction and search criteria) only.
//
// A test may configure a nil slice (for example Return(nil, someErr)); that
// yields a nil result. A non-nil value of any other type still panics on the
// type assertion.
func (m *MockDaoClient) GetLookup(tr dao.Transaction, searchCriteria map[string]interface{}, pageInfo *dao.PageInfo) ([]*model.CredentialLookup, *internalErrors.CredentialError) {
	args := m.Called(tr, searchCriteria)

	// An untyped nil stored in the mock cannot be type-asserted to a slice
	// type, so it has to be checked for before asserting.
	var lookups []*model.CredentialLookup
	if raw := args.Get(0); raw != nil {
		lookups = raw.([]*model.CredentialLookup)
	}
	return lookups, ConvertCredError(args.Get(1))
}
// GetLookupCount records the call with tr and searchCriteria and returns the
// configured count together with return value 1 converted by ConvertCredError.
//
// The count may be configured either as an int (converted to int64) or
// directly as an int64, matching the method's return type. Any other type
// panics on the type assertion.
func (m *MockDaoClient) GetLookupCount(tr dao.Transaction, searchCriteria map[string]interface{}) (int64, *internalErrors.CredentialError) {
	args := m.Called(tr, searchCriteria)

	var count int64
	switch v := args.Get(0).(type) {
	case int64:
		count = v
	default:
		// Plain int is the historically supported form; anything else
		// panics here, which surfaces a misconfigured expectation.
		count = int64(v.(int))
	}
	return count, ConvertCredError(args.Get(1))
}
// DeleteLookup records the call with tr and id and returns the first
// configured return value converted by ConvertCredError (nil unless that
// value is a *internalErrors.CredentialError).
func (m *MockDaoClient) DeleteLookup(tr dao.Transaction, id string) *internalErrors.CredentialError {
	result := m.Called(tr, id).Get(0)
	return ConvertCredError(result)
}
// Commit is a no-op that always returns nil. It does not record the call on
// the embedded mock.Mock, so tests cannot set expectations on it.
func (tr *MockTransaction) Commit() error {
return nil
}
// Abort is a no-op that always returns nil. It does not record the call on
// the embedded mock.Mock, so tests cannot set expectations on it.
func (tr *MockTransaction) Abort() error {
return nil
}
// ConvertCredError returns arg as a *internalErrors.CredentialError when it
// holds exactly that type, and nil for anything else (including an untyped
// nil).
func ConvertCredError(arg interface{}) *internalErrors.CredentialError {
	credErr, isCredErr := arg.(*internalErrors.CredentialError)
	if !isCredErr {
		return nil
	}
	return credErr
}
baseTopics := []string{"ncyd_kubernetes", "etlflow_*"}
kafkaTopics := []string{"ncyd_test1", "etlflow_ssh", "etlflow_nrd"}
initialKafkaTopics := buildTopicMap(kafkaTopics)
mock := mock.NewMockClusterAdmin()
mock.On("ListTopics").Return(initialKafkaTopics, nil).Once()
topicManager, err := NewTopicManager(mock, baseTopics, 1)
assert.Nil(t, err)
topics := topicManager.GetTopicList()
assert.True(t, slices.Contains(topics,baseTopics[0]))
assert.True(t, slices.Contains(topics,kafkaTopics[1]))
assert.True(t, slices.Contains(topics,kafkaTopics[2]))
//add a kafka topic
kafkaTopics = append(kafkaTopics, "etlflow_newTopic")
revisedKafkaTopics := buildTopicMap(kafkaTopics)
mock.On("ListTopics").Return(revisedKafkaTopics, nil).Once()
//scan for new topic
newTopic, err := topicManager.scanForNewTopics()
assert.Nil(t, err)
assert.True(t, newTopic)
}
// TestFailureInitializationError expects NewTopicManager to return a non-nil
// error when the mocked ListTopics call is configured to return an error.
func TestFailureInitializationError(t *testing.T) {
	noTopics := buildTopicMap([]string{})
	listErr := errors.New("error")

	clusterAdmin := mock.NewMockClusterAdmin()
	clusterAdmin.On("ListTopics").Return(noTopics, listErr).Once()

	_, err := NewTopicManager(clusterAdmin, []string{"ncyd_kubernetes", "etlflow_*"}, 1)
	assert.NotNil(t, err)
}
// buildTopicMap returns a map keyed by each name in kafkaTopics, with a
// zero-value sarama.TopicDetail as every entry. Duplicate names collapse to a
// single key.
func buildTopicMap(kafkaTopics []string) map[string]sarama.TopicDetail {
	details := make(map[string]sarama.TopicDetail)
	for i := range kafkaTopics {
		details[kafkaTopics[i]] = sarama.TopicDetail{}
	}
	return details
}
... |