HERE.OLP: Как я могу "Подписаться на уведомления" на каталог? - PullRequest
0 голосов
/ 05 июня 2018

Я пытаюсь подписаться на каталог для уведомления.Но не могу найти символ "thenAppy".Пожалуйста, помогите.

// subscription to notifications
CompletionStage<NotificationSubscriptionControl> controlStage =
    queryApi.subscribeToNotifications(consumerSettings)
        .thenApply(subscription -> {
            subscription
                .notifications()
                .runWith(Sink.foreach(notification ->
                    // this callback is called each time a new batch publication happens in catalog
                    System.out.printf("catalog %s has a new version %d\n", catalogHrn, notification.getCatalogVersion())
                ), myMaterializer);
            return subscription.subscriptionControl();
        });

[ОШИБКА] ОШИБКА КОМПИЛЯЦИИ: [ИНФО] ------------------------------------------------------------- [ОШИБКА] Main.java:[41,25]невозможно найти символьный символ: метод thenApply ((subscript ...;}) расположение: интерфейс org.apache.flink.streaming.api.functions.source.SourceFunction [ERROR] Main.java:[44,65] пакет akka.stream.javadsl не существует [ОШИБКА] Main.java:[47,40] не может найти символ символа: переменная myMaterializer

Ответы [ 2 ]

0 голосов
/ 01 июля 2018

Если subscribeToNotifications является функцией блокировки, то вы можете заключить ее в завершенный метод будущего CompletableFuture.

CompletionStage<NotificationSubscriptionControl> controlStage =
        CompletableFuture.completedFuture(queryApi.subscribeToNotifications(consumerSettings))
            .thenApply(
                subscription -> {
                  subscription
                      .notifications()
                      .runWith(
                          Sink.foreach(
                              notification ->
                                  // this callback is called each time a new batch publication
                                  // happens in catalog
                                  System.out.printf(
                                      "catalog %s has a new version %d\n",
                                      catalogHrn, notification.getCatalogVersion())),
                          myMaterializer);
                  return subscription.subscriptionControl();
                });
0 голосов
/ 20 июня 2018

Судя по вашей ошибке компиляции, вы пытаетесь подписаться на уведомление в приложении Flink.Клиент запроса данных Flink queryApi возвращает SourceFunction, а не CompletionStage.Вы можете использовать его так:

StreamExecutionEnvironment
    .getExecutionEnvironment()
    .addSource(
        query.subscribeToNotifications(
            new NotificationConsumerSettings(
                "my-notification-consumer-group-1"
            )
        )
    )
    .addSink(
        notification - > System.out.printf(
            "catalog %s has a new version %d\n",
            STREAMING_INPUT_CATALOG_HRN,
            notification
            .getCatalogVersion()
        )
    );
...