package io.kafka.connect.log.anlaytics;
import io.kafka.connect.log.anlaytics.sink.LogAnlayticsSinkTask;
import org.apache.kafka.connect.sink.SinkRecord;
import org.apache.kafka.connect.sink.SinkTaskContext;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import static org.mockito.Mockito.*;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import org.mockserver.integration.ClientAndServer;
import static org.mockserver.integration.ClientAndServer.startClientAndServer;
import static org.mockserver.model.HttpRequest.request;
import static org.mockserver.model.HttpResponse.response;
/**
 * Exercises {@link LogAnlayticsSinkTask} against a local MockServer instance that stands in
 * for the HTTP endpoint the task is configured to call.
 *
 * <p>A fresh mock server is started before each test and stopped afterwards, so tests do not
 * share HTTP expectations.
 */
public class LogAnalyticsTaskTest {

    /** Local port the mock HTTP endpoint listens on. */
    private static final int MOCK_SERVER_PORT = 1080;

    /** Request path the mock endpoint answers on. */
    private static final String ENDPOINT_PATH = "/azure/api";

    /** URL handed to the task; built from the port and path above so the two cannot drift. */
    private static final String ENDPOINT_URL =
            "http://localhost:" + MOCK_SERVER_PORT + ENDPOINT_PATH;

    /** Topic name used both in the task configuration and in the record under test. */
    private static final String TOPIC = "testTopic";

    /** Mock HTTP server; {@code null} until {@link #startMockServer()} has succeeded. */
    private ClientAndServer mockServer;

    /**
     * Starts the mock server and registers a single expectation: requests whose path is
     * {@link #ENDPOINT_PATH} are answered with status 200 and a fixed body.
     */
    @Before
    public void startMockServer() {
        mockServer = startClientAndServer(MOCK_SERVER_PORT);
        mockServer
                .when(request().withPath(ENDPOINT_PATH))
                .respond(
                        response()
                                .withBody("some_response_body")
                                .withStatusCode(200));
    }

    /**
     * Stops the mock server if it was started.
     *
     * <p>JUnit still runs {@code @After} methods when a {@code @Before} method throws, so the
     * null check keeps a failed server start from being accompanied by a misleading
     * {@link NullPointerException}.
     */
    @After
    public void stopMockServer() {
        if (mockServer != null) {
            mockServer.stop();
        }
    }

    /**
     * Starts the task with a configuration pointing at the mock endpoint and hands it a single
     * record; the test passes as long as no exception is thrown.
     *
     * <p>NOTE(review): nothing here asserts that the mock endpoint actually received a request.
     * If {@code put} is expected to send synchronously, consider adding a
     * {@code mockServer.verify(...)} check — confirm against the task implementation first.
     */
    @Test
    public void testSuccess() {
        Map<String, String> props = new HashMap<>();
        props.put("workspace.id", "xxx");
        props.put("workspace.key", "xxx");
        props.put("endpointUrlPattern", ENDPOINT_URL);
        props.put("retryBackoffMs", "30000");
        props.put("ignoreInvalidCertHost", "false");
        props.put("topics", TOPIC);

        LogAnlayticsSinkTask task = new LogAnlayticsSinkTask();
        task.initialize(mock(SinkTaskContext.class));
        task.start(props);
        try {
            // Schemaless record on partition 0 / offset 0: no key, value is the raw UTF-8 JSON.
            String value = "{\"log\":\"test from John\"}";
            SinkRecord record = new SinkRecord(
                    TOPIC, 0, null, null, null, value.getBytes(StandardCharsets.UTF_8), 0);

            task.put(Collections.singleton(record));
        } finally {
            // Complete the task lifecycle so anything opened in start() is released
            // even when put() throws.
            task.stop();
        }
    }
}