Демаршаллинг json в нуклио - PullRequest
0 голосов
/ 06 августа 2020

Я пытаюсь создать триггер кафки с помощью nuclio.

Мой файл function.yaml:

apiVersion: "nuclio.io/v1beta1"
kind: "NuclioFunction"
spec:
  description: >
    Uses the inception model of the TensorFlow open-source machine-learning library to classify images.
    The function demonstrates advanced uses of nuclio with a custom base image, third-party Python packages,
    pre-loading data into function memory (the AI Model), structured logging, and exception handling.
  runtime: "python:3.6"
  handler: handler:consumer
  minReplicas: 1
  maxReplicas: 1
  triggers:
    myKafkaTrigger:
      kind: kafka-cluster
    attributes:
      initialOffset: earliest
    topics:
        - mytopic
    brokers:
      - broker:9092
    consumerGroup: my-consumer-group

Мой файл handler.py:

def consumer(context, event):
    if event.trigger.kind == 'kafka-cluster':
        context.logger.info('Invoked from kafka-cluster')

    else:
        return 'A string response'

Я использую файл docker для запуска nuclio, указав пути к моим файлам yml и py:

FROM nuclio/uhttpc:0.0.1-amd64 as uhttpc

# Supplies processor binary, wrapper
FROM ${NUCLIO_ONBUILD_IMAGE} as processor

# From the base image
FROM ${NUCLIO_BASE_IMAGE}

# Copy required objects from the suppliers
COPY --from=processor /home/nuclio/bin/processor /usr/local/bin/processor
COPY --from=processor /home/nuclio/bin/py /opt/nuclio/
COPY --from=uhttpc /home/nuclio/bin/uhttpc /usr/local/bin/uhttpc

RUN pip install nuclio-sdk msgpack --no-index --find-links /opt/nuclio/whl


# Readiness probe
HEALTHCHECK --interval=1s --timeout=3s CMD /usr/local/bin/uhttpc --url http://127.0.0.1:8082/ready || exit 1

# USER CONTENT
ADD ./NuclioKafkaHandler.py /opt/nuclio
ADD ./function.yaml /etc/nuclio/config/processor/processor.yaml
# END OF USER CONTENT

# Run processor with configuration and platform configuration
CMD [ "processor" ]

Когда я запускаю свой docker файл в сети с помощью kafka (оба zookeeper и kafka работают в сети rmoff_kafka):

docker run --network=rmoff_kafka --rm --name kafka -p 8080:8080 kafka

Получаю ошибку как:

Error - error unmarshaling JSON: json: cannot unmarshal array into Go value of type functionconfig.Trigger
    .../nuclio/nuclio/pkg/processor/config/reader.go:47

Call stack:
Failed to write configuration
    .../nuclio/nuclio/pkg/processor/config/reader.go:47
Failed to open configuration file
    .../nuclio/nuclio/cmd/processor/app/processor.go:262
...