Я новичок в Spring Cloud Dataflow и пытаюсь создать базовое потоковое приложение с источником HTTP, пользовательским процессором и Log Sink.Я написал собственный процессор, который использует JAXB для демаршаллизации записей XML, полученных из источника HTTP, в объекты Java.
Пользовательский процессор:
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Processor;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.integration.annotation.Transformer;
import org.springframework.cloud.stream.annotation.Output;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.messaging.handler.annotation.Payload;
import javax.xml.bind.JAXBContext;
import javax.xml.bind.Unmarshaller;
import javax.xml.stream.XMLInputFactory;
import javax.xml.stream.XMLStreamReader;
import java.io.StringReader;
@EnableBinding(Processor.class)
@SpringBootApplication
public class ProcessorApplication {
//@Transformer(inputChannel = Processor.INPUT, outputChannel = Processor.OUTPUT)
@StreamListener(Processor.OUTPUT)
@Output(Processor.OUTPUT)
public Object transform(@Payload String xmlstream) {
try{
JAXBContext jaxbContext = JAXBContext.newInstance(Customer.class);
Unmarshaller jaxbUnmarshaller = jaxbContext.createUnmarshaller();
XMLStreamReader reader = XMLInputFactory.newInstance().createXMLStreamReader(new StringReader(xmlstream));
Customer customer = (Customer) jaxbUnmarshaller.unmarshal(reader);
System.out.println(customer);
return customer;
}
catch (Exception e){
throw new IllegalStateException(e);
}
}
public static void main(String[] args) {
SpringApplication.run(ProcessorApplication.class, args);
}
}
Зарегистрировал приложение процессора следующим образом:
dataflow:>app register --name myprocessor --type processor --uri file://home/test/scdfCustomApps/processor-0.0.1-SNAPSHOT.jar
Создал и развернул поток как:
stream create --name httptest --definition "http --port=9000 | myprocessor | log" --deploy
В журнале процессора отображаются следующие исключения:
2019-03-21 09: 39: 50.254 ОШИБКА 10461 --- [ask-scheduler-1] oscsbkpKafkaTopicProvisioner: не удалось создать темы
org.apache.kafka.common.errors.NotControllerException: Это неправильный контроллер для этого кластера.
2019-03-21 09: 39: 50.255 ОШИБКА 10461 --- [ask-scheduler-1] oscloud.stream.binding.BindingService: Не удалось создать привязку производителя;повторная попытка через 30 секунд
org.springframework.cloud.stream.provisioning.ProvisioningException: Предоставление исключения;вложенное исключение: org.apache.kafka.common.errors.NotControllerException: это неправильный контроллер для этого кластера.
Вопросы:
- Нужна ли дополнительная настройка для регистрации и развертывания пользовательских приложений?
- Я создал другой поток 'http |log ', который работал без проблем.Как эти связи происходят между источником, процессором и приемником?
Версии:
Сервер потоков данных Spring Cloud: 1.7.3
Spring Cloud: Greenwich.SR1
Установленная версия Kafka: 0.10