Я пытаюсь написать юнит-тест для моего задания потоковой передачи Spark. Моя работа с потоковым воспроизведением использует сообщения от MQ
и отправляет их в тему kafka
.
Мой подход
- Отправить тестовое сообщение MQ
- Запустить потоковое задание в отдельном потоке. (Потоковая работа будет толкать
сообщения кафке в тему " topic1 ")
- потребитель kafka продолжает опросить topic1 .
- Как только сообщение получено, остановите цепочку и вырвитесь из цикла.
Ниже мой код, и он не работает. Задание потоковой передачи Spark началось нормально, но как только началось потоковое задание, цикл while
перестал работать. Не уверен в причине, так как я новичок в теме Concurrency
1023 *
public class StreamingJobTest {
private static KafkaConsumer<String, String> consumer;
@BeforeClass
public static void setUpClass() {
Properties properties = new Properties();
properties.put("bootstrap.servers", "localhost:9090");
properties.put("subscribe", "topic1");
properties.put("startingOffsets", "earliest");
properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
consumer = new KafkaConsumer<String, String>(properties);
}
@Test
public void create_test() {
String[] arguments = new String[]{};
ConsumerRecords<String, String> records;
Thread thread = new Thread(() -> StreamingJob.main(arguments));
thread.start();
//send a message to MQ.
MqSender mqSender = new MqSender();
mqSender.mqPushMsg("TestMsg");
//keep polling the kafka topic.
while(true){
System.out.println("Polling...");
records = consumer.poll(100);
if(!records.isEmpty()){
thread.interrupt();
break;
}
assertNotNull(records);
}
}
}
Почему мой цикл перестал работать после запуска потокового задания? Согласно моему пониманию, потоковая передача будет выполняться в отдельном потоке, верно?