Я пытаюсь создать наблюдаемое из ответа Streamlistener, но не могу этого сделать.Я совершенно новичок в идее EventObservable и Reactive Java.Было бы полезно, если бы кто-то мог заглянуть в мой код, чтобы убедиться, что он построен правильно, и если мое понимание концепции находится на правильном пути.
@Component
public class EventObservable {
private Observable<Event> observable = new Observable<Event>() {
@Override
protected void subscribeActual(Observer<? super Event> observer) {
return;
}
};
public Observable<Event> getObservable () {
return this.observable;
}
}
@EnableBinding(ConsumerChannels.class)
public class EventConsumer {
@Autowired
private EventObservable eventObservable;
@StreamListener(ConsumerChannels.EVENT_NOTIFICATION_CHANNEL)
public void fetchEvent(Message asd){
Event event = new Event("123123","google.io","This is an event","Singapore");
eventObservable.getObservable().just(event);
return;
}
}
@Component
public class SubscriptionGraphQlUtilities {
private Logger logger = LoggerFactory.getLogger(SubscriptionGraphQlUtilities.class);
@Value("classpath:schemas.graphqls")
private Resource schemaResource;
private final static EventPublisher EVENT_PUBLISHER = new EventPublisher();
private final GraphQLSchema graphQLSchema;
public SubscriptionGraphQlUtilities(){
graphQLSchema = buildSchema();
}
private GraphQLSchema buildSchema() {
//
// reads a file that provides the schema types
//
Reader streamReader = loadSchemaFile("schemas.graphqls");
TypeDefinitionRegistry typeRegistry = new SchemaParser().parse(streamReader);
RuntimeWiring wiring = RuntimeWiring.newRuntimeWiring()
.type(newTypeWiring("subscription")
.dataFetcher("subscribeEvent", getEventsDataFetcher())
)
.build();
return new SchemaGenerator().makeExecutableSchema(typeRegistry, wiring);
}
private DataFetcher getEventsDataFetcher(){
return environment -> {
return EVENT_PUBLISHER.getPublisher();
};
}
public GraphQLSchema getGraphQLSchema() {
return graphQLSchema;
}
@SuppressWarnings("SameParameterValue")
private Reader loadSchemaFile(String name) {
InputStream stream = getClass().getClassLoader().getResourceAsStream(name);
return new InputStreamReader(stream);
}
}
@EnableBinding(ConsumerChannels.class)
public class SubscriptionConsumer {
private Logger logger = LoggerFactory.getLogger(SubscriptionConsumer.class);
private final AtomicReference<Subscription> subscriptionRef = new AtomicReference<>();
private final SubscriptionGraphQlUtilities graphQlUtilities = new SubscriptionGraphQlUtilities();
Instrumentation instrumentation = new ChainedInstrumentation(
Collections.singletonList(new TracingInstrumentation())
);
@StreamListener(ConsumerChannels.QUERY_INPUT)
public void fetchSubscription(Message asd){
logger.info("Received subscription query:{}",asd.toString());
try {
String query = "subscription{subscribeEvent{name}}";
String query2 = "subscription {\n subscribeEvent{\n\t\tid\n name\n\t\tdescription\n\t\tlocation\n\t}\n}";
GraphQL graphQL = GraphQL
.newGraphQL(graphQlUtilities.getGraphQLSchema())
.instrumentation(instrumentation)
.build();
ExecutionResult executionResult = graphQL.execute(query2);
Publisher<ExecutionResult> eventStream = executionResult.getData();
eventStream.subscribe(new Subscriber<ExecutionResult>() {
@Override
public void onSubscribe(Subscription subscription) {
logger.info("Successfully subscribed");
subscriptionRef.set(subscription);
request(1);
}
@Override
public void onNext(ExecutionResult executionResult) {
logger.info("Sending event updates");
Object result = executionResult.getData();
logger.info("This is the event,{}", result);
request(1);
}
@Override
public void onError(Throwable throwable) {
logger.error("Subscription threw an exception", throwable);
}
@Override
public void onComplete() {
logger.info("Subscription complete");
}
});
} catch (Exception e) {
e.printStackTrace();
}
return;
}
private void request(int n) {
Subscription subscription = subscriptionRef.get();
if (subscription != null) {
subscription.request(n);
}
}
}