package io.kafka.connect.log.anlaytics;

import io.kafka.connect.log.anlaytics.sink.LogAnlayticsSinkTask;
import org.apache.kafka.connect.sink.SinkRecord;
import org.apache.kafka.connect.sink.SinkTaskContext;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import static org.mockito.Mockito.*;

import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

import org.mockserver.integration.ClientAndServer;

import static org.mockserver.integration.ClientAndServer.startClientAndServer;
import static org.mockserver.model.HttpRequest.request;
import static org.mockserver.model.HttpResponse.response;


/**
 * Smoke test for {@link LogAnlayticsSinkTask}.
 *
 * <p>Each test runs against a local MockServer instance that answers every request to
 * {@code /azure/api} with HTTP 200, and the task is configured to use that server as its
 * endpoint.
 */
public class LogAnalyticsTaskTest {
    /** Local port the MockServer listens on; the task's endpoint URL is built from it. */
    private static final int MOCK_SERVER_PORT = 1080;
    /** Path stubbed on the MockServer and used in the task's endpoint URL. */
    private static final String API_PATH = "/azure/api";
    /** Topic name used both in the task configuration and in the test record. */
    private static final String TOPIC = "testTopic";

    private ClientAndServer mockServer;

    /** Starts the MockServer and stubs {@code /azure/api} to reply 200 with a fixed body. */
    @Before
    public void startMockServer() {
        mockServer = startClientAndServer(MOCK_SERVER_PORT);
        mockServer
            .when(request().withPath(API_PATH))
            .respond(
                response()
                    .withBody("some_response_body")
                    .withStatusCode(200)
            );
    }

    /** Stops the MockServer started for the test, if it was started. */
    @After
    public void stopMockServer() {
        // JUnit 4 still runs @After when @Before throws; guard so a failed server start
        // is not compounded by a NullPointerException here.
        if (mockServer != null) {
            mockServer.stop();
        }
    }

    /**
     * Starts the task with a configuration pointing at the MockServer and hands it a single
     * record. The test passes when no exception is thrown.
     */
    @Test
    public void testSuccess() {
        Map<String, String> props = new HashMap<>();
        props.put("workspace.id", "xxx");
        props.put("workspace.key", "xxx");
        props.put("endpointUrlPattern", "http://localhost:" + MOCK_SERVER_PORT + API_PATH);
        props.put("retryBackoffMs", "30000");
        props.put("ignoreInvalidCertHost", "false");
        props.put("topics", TOPIC);

        LogAnlayticsSinkTask task = new LogAnlayticsSinkTask();
        task.initialize(mock(SinkTaskContext.class));
        task.start(props);
        try {
            String value = "{\"log\":\"test from John\"}";
            // No key, no schemas; partition 0, offset 0.
            SinkRecord record = new SinkRecord(
                TOPIC, 0, null, null, null, value.getBytes(StandardCharsets.UTF_8), 0);

            task.put(Collections.singleton(record));
            // NOTE(review): nothing asserts that the endpoint was actually called. Consider
            // mockServer.verify(request().withPath(API_PATH)) once it is confirmed that
            // put() sends synchronously rather than on flush.
        } finally {
            // Always stop a started task so whatever start() acquired is released.
            task.stop();
        }
    }

}


// References
//
//   • No labels