Spark Streaming Unit Test в Java - PullRequest
       28

Spark Streaming Unit Test в Java

0 голосов
/ 27 апреля 2018

Я пытаюсь написать юнит-тест для моего задания потоковой передачи Spark. Моя работа с потоковым воспроизведением использует сообщения от MQ и отправляет их в тему kafka.

Мой подход

  • Отправить тестовое сообщение MQ
  • Запустить потоковое задание в отдельном потоке. (Потоковая работа будет толкать сообщения кафке в тему " topic1 ")
  • потребитель kafka продолжает опросить topic1 .
  • Как только сообщение получено, остановите цепочку и вырвитесь из цикла.

Ниже мой код, и он не работает. Задание потоковой передачи Spark началось нормально, но как только началось потоковое задание, цикл while перестал работать. Не уверен в причине, так как я новичок в теме Concurrency 1023 *

public class StreamingJobTest {

private static KafkaConsumer<String, String> consumer;

@BeforeClass
public static void setUpClass()  {

    Properties properties = new Properties();

    properties.put("bootstrap.servers", "localhost:9090");
    properties.put("subscribe", "topic1");
    properties.put("startingOffsets", "earliest");
    properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

    consumer = new KafkaConsumer<String, String>(properties);



}


@Test
public void create_test() {
    String[] arguments = new String[]{};
    ConsumerRecords<String, String> records;

    Thread thread = new Thread(() -> StreamingJob.main(arguments));
    thread.start();

     //send a message to MQ.

    MqSender mqSender = new MqSender();
    mqSender.mqPushMsg("TestMsg");

    //keep polling the kafka topic.

    while(true){
        System.out.println("Polling...");
        records = consumer.poll(100);

        if(!records.isEmpty()){

            thread.interrupt();
            break;
        }

    assertNotNull(records);

    }


}

}

Почему мой цикл перестал работать после запуска потокового задания? Согласно моему пониманию, потоковая передача будет выполняться в отдельном потоке, верно?

1 Ответ

0 голосов
/ 28 апреля 2018

Я разобрался сам. Мне нужно подписать тему в отдельной строке. Я добавил это в мои свойства. И еще groupid обязателен в кафке и я его пропустил. Теперь он отлично работает для меня. ниже приведен код для подписки на тему.

consumer.subscribe(Arrays.asList("topic1")); 
...