Spring boot graphql-java, подписки и потоковые прослушиватели - PullRequest
0 голосов
/ 18 мая 2018

Я пытаюсь создать наблюдаемое из ответа Streamlistener, но не могу этого сделать.Я совершенно новичок в идее EventObservable и Reactive Java.Было бы полезно, если бы кто-то мог заглянуть в мой код, чтобы убедиться, что он построен правильно, и если мое понимание концепции находится на правильном пути.

@Component
public class EventObservable {

private Observable<Event> observable = new Observable<Event>() {
    @Override
    protected void subscribeActual(Observer<? super Event> observer) {
        return;
    }
};

public Observable<Event> getObservable () {
    return this.observable;
}
}

@EnableBinding(ConsumerChannels.class)
public class EventConsumer {

@Autowired
private EventObservable eventObservable;

@StreamListener(ConsumerChannels.EVENT_NOTIFICATION_CHANNEL)
public void fetchEvent(Message asd){
    Event event = new Event("123123","google.io","This is an event","Singapore");
    eventObservable.getObservable().just(event);
    return;
}
}

@Component
public class SubscriptionGraphQlUtilities {

private Logger logger = LoggerFactory.getLogger(SubscriptionGraphQlUtilities.class);

@Value("classpath:schemas.graphqls")
private Resource schemaResource;

private final static EventPublisher EVENT_PUBLISHER = new EventPublisher();

private final GraphQLSchema graphQLSchema;

public SubscriptionGraphQlUtilities(){
    graphQLSchema = buildSchema();
}

private GraphQLSchema buildSchema() {
    //
    // reads a file that provides the schema types
    //
    Reader streamReader = loadSchemaFile("schemas.graphqls");
    TypeDefinitionRegistry typeRegistry = new SchemaParser().parse(streamReader);

    RuntimeWiring wiring = RuntimeWiring.newRuntimeWiring()
            .type(newTypeWiring("subscription")
                    .dataFetcher("subscribeEvent", getEventsDataFetcher())
            )
            .build();

    return new SchemaGenerator().makeExecutableSchema(typeRegistry, wiring);
}

private DataFetcher getEventsDataFetcher(){
    return environment -> {
        return EVENT_PUBLISHER.getPublisher();
    };
}

public GraphQLSchema getGraphQLSchema() {
    return graphQLSchema;
}

@SuppressWarnings("SameParameterValue")
private Reader loadSchemaFile(String name) {
    InputStream stream = getClass().getClassLoader().getResourceAsStream(name);
    return new InputStreamReader(stream);
}
}

@EnableBinding(ConsumerChannels.class)
public class SubscriptionConsumer {

private Logger logger = LoggerFactory.getLogger(SubscriptionConsumer.class);

private final AtomicReference<Subscription> subscriptionRef = new AtomicReference<>();
private final SubscriptionGraphQlUtilities graphQlUtilities = new SubscriptionGraphQlUtilities();

Instrumentation instrumentation = new ChainedInstrumentation(
        Collections.singletonList(new TracingInstrumentation())
);

@StreamListener(ConsumerChannels.QUERY_INPUT)
public void fetchSubscription(Message asd){
    logger.info("Received subscription query:{}",asd.toString());
    try {
        String query = "subscription{subscribeEvent{name}}";
        String query2 = "subscription {\n    subscribeEvent{\n\t\tid\n        name\n\t\tdescription\n\t\tlocation\n\t}\n}";

        GraphQL graphQL = GraphQL
                .newGraphQL(graphQlUtilities.getGraphQLSchema())
                .instrumentation(instrumentation)
                .build();

        ExecutionResult executionResult = graphQL.execute(query2);

        Publisher<ExecutionResult> eventStream = executionResult.getData();
        eventStream.subscribe(new Subscriber<ExecutionResult>() {
            @Override
            public void onSubscribe(Subscription subscription) {
                logger.info("Successfully subscribed");
                subscriptionRef.set(subscription);
                request(1);
            }

            @Override
            public void onNext(ExecutionResult executionResult) {
                logger.info("Sending event updates");
                Object result = executionResult.getData();
                logger.info("This is the event,{}", result);
                request(1);
            }

            @Override
            public void onError(Throwable throwable) {
                logger.error("Subscription threw an exception", throwable);
            }

            @Override
            public void onComplete() {
                logger.info("Subscription complete");
            }
        });
    } catch (Exception e) {
        e.printStackTrace();
    }
    return;
}

private void request(int n) {
    Subscription subscription = subscriptionRef.get();
    if (subscription != null) {
        subscription.request(n);
    }
}
}
...