Spring Cloud Dataflow | Кафка Биндер | Ошибка: KafkaTopicProvisioner - не удалось создать тему - PullRequest
0 голосов
/ 21 марта 2019

Я новичок в Spring Cloud Dataflow и пытаюсь создать базовое потоковое приложение с источником HTTP, пользовательским процессором и Log Sink.Я написал собственный процессор, который использует JAXB для демаршаллизации записей XML, полученных из источника HTTP, в объекты Java.

Пользовательский процессор:

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Processor;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.integration.annotation.Transformer;
import org.springframework.cloud.stream.annotation.Output;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.messaging.handler.annotation.Payload;

import javax.xml.bind.JAXBContext;
import javax.xml.bind.Unmarshaller;
import javax.xml.stream.XMLInputFactory;
import javax.xml.stream.XMLStreamReader;
import java.io.StringReader;


@EnableBinding(Processor.class)
@SpringBootApplication
public class ProcessorApplication {

    //@Transformer(inputChannel = Processor.INPUT, outputChannel = Processor.OUTPUT)
    @StreamListener(Processor.OUTPUT)
    @Output(Processor.OUTPUT)
    public Object transform(@Payload String xmlstream) {
        try{
            JAXBContext jaxbContext = JAXBContext.newInstance(Customer.class);
            Unmarshaller jaxbUnmarshaller = jaxbContext.createUnmarshaller();
            XMLStreamReader reader = XMLInputFactory.newInstance().createXMLStreamReader(new StringReader(xmlstream));
            Customer customer = (Customer) jaxbUnmarshaller.unmarshal(reader);
            System.out.println(customer);
            return customer;
        }
        catch (Exception e){
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        SpringApplication.run(ProcessorApplication.class, args);
    }
}

Зарегистрировал приложение процессора следующим образом:

dataflow:>app register --name myprocessor --type processor --uri file://home/test/scdfCustomApps/processor-0.0.1-SNAPSHOT.jar

Создал и развернул поток как:

stream create --name httptest --definition "http --port=9000 | myprocessor | log" --deploy

В журнале процессора отображаются следующие исключения:

2019-03-21 09: 39: 50.254 ОШИБКА 10461 --- [ask-scheduler-1] oscsbkpKafkaTopicProvisioner: не удалось создать темы

org.apache.kafka.common.errors.NotControllerException: Это неправильный контроллер для этого кластера.

2019-03-21 09: 39: 50.255 ОШИБКА 10461 --- [ask-scheduler-1] oscloud.stream.binding.BindingService: Не удалось создать привязку производителя;повторная попытка через 30 секунд

org.springframework.cloud.stream.provisioning.ProvisioningException: Предоставление исключения;вложенное исключение: org.apache.kafka.common.errors.NotControllerException: это неправильный контроллер для этого кластера.

Вопросы:

  1. Нужна ли дополнительная настройка для регистрации и развертывания пользовательских приложений?
  2. Я создал другой поток 'http |log ', который работал без проблем.Как эти связи происходят между источником, процессором и приемником?

Версии:
Сервер потоков данных Spring Cloud: 1.7.3
Spring Cloud: Greenwich.SR1
Установленная версия Kafka: 0.10

...