@Test
public void testSuccess() {
String topic="testLog";
String workspaceId = UUID.randomUUID().toString();
String workspaceKey = UUID.randomUUID().toString();
final Map<String, String> properties = ImmutableMap.of("workspace.id", workspaceId, "workspace.key", workspaceKey);
final Map<String, Object> messageMap = ImmutableMap.of("firstName", "example", "lastName", "user");
try {
LogAnalyticsGateway mockedLogAnalyticsGateway = mock(LogAnalyticsGateway.class);
LogAnlayticsSinkTask logAnlayticsSinkTask = new LogAnlayticsSinkTask(mockedLogAnalyticsGateway);
//start
logAnlayticsSinkTask.start(properties);
//define SinkRecord
final SinkRecord sinkRecord = new SinkRecord(topic,1,null,null,null, messageMap,1234L);
//send SinkRecord
ArrayList<SinkRecord> records = new ArrayList<>();
records.add(sinkRecord);
logAnlayticsSinkTask.put(records);
//verify arguments sent to LogAnalyticsGateway.sendData method
ArgumentCaptor<String> urlCaptor = ArgumentCaptor.forClass(String.class);
ArgumentCaptor<String> rawJsonCapture = ArgumentCaptor.forClass(String.class);
ArgumentCaptor<String> logTypeCapture = ArgumentCaptor.forClass(String.class);
ArgumentCaptor<String> authorizationCapture = ArgumentCaptor.forClass(String.class);
verify(mockedLogAnalyticsGateway).sendData(urlCaptor.capture(),rawJsonCapture.capture(),logTypeCapture.capture(),authorizationCapture.capture());
assertThat("Logtype does not match topic",logTypeCapture.getValue().equals(topic));
assertThat("WorkspaceId not specified in URL",urlCaptor.getValue().startsWith("https://" + workspaceId));
ObjectMapper mapper = new ObjectMapper();
JsonNode entries = mapper.readTree(rawJsonCapture.getValue());
JsonNode entry = entries.get(0);
assertThat("json message incorrect firstname",entry.get("firstName").asText().equals("example"));
assertThat("json message incorrect lastname",entry.get("lastName").asText().equals("user"));
}catch (Exception ex){
fail("Unforseen exception occured");
}