Как проверить, что я получаю данные от Kafka в Spark-структурированном потоке с Java? - PullRequest
1 голос
/ 11 июля 2019

Я пытаюсь получить данные от kafka для потоковой передачи с искровой структурой, но я не могу проверить, хорошо ли я себя чувствую или нет. Я хочу напечатать данные из kafka на консоли, но ничего не приходит на консоли. Это может быть из-за огромного размера данных от Кафки, но я понятия не имею.

Я использую Windows 10. Я проверил, что порт для kafka установлен "netstat -an | findstr TARGET_IP". TARGET_IP означает IP производителя кафки. По PID из приведенного выше результата я проверил "список задач / FI" PID eq 5406 "". 5406 - это PID файла java.exe, а используемая память для PID 5406 постоянно увеличивается.

public static void main( String[] args ) {
    SparkSession spark = SparkSession.builder()
            .master("local")
            .appName("App").getOrCreate();
    Dataset<Row> df = spark
            .readStream()
            .format("kafka")
            .option("kafka.bootstrap.servers", "TARGET_IP:TARGET_PORT")
            .option("subscribe", "TARGET_TOPIC")
            .option("startingOffsets", "earliest")
            .load();
    df.printSchema();
    StreamingQuery queryone = df.writeStream().trigger(Trigger.ProcessingTime(1000)).format("console").start();
    try {
        queryone.awaitTermination();
    } catch (StreamingQueryException e) {
        e.printStackTrace();
    }
}

1 Ответ

0 голосов
/ 11 июля 2019

Я проверяю ваш код, он может печатать.

Сначала проверьте тему кафки , убедитесь, что в ней есть сообщение.

Затем проверьте ваше искровое приложение, убедитесь, что оно может подключиться к вашему брокеру kafka.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...