Проект Spring Boot (Jhipster) с использованием kafka Producer и C # Consumer - PullRequest
0 голосов
/ 27 июня 2019

Я использую проект Jhipster kafka с этими предложениями

Greting.java

  package com.nunsys.lupetime.messaging;

public class Greeting {
    private String message;

    public Greeting() {
    }

    public String getMessage() {
        return message;
    }

    public Greeting setMessage(String message) {
        this.message = message;
        return this;
    }
}

ProducerChannel.java

public interface ProducerChannel {

    String CHANNEL = "messageChannel";

    @Output
    MessageChannel messageChannel();
}

ConsumerChannel.java

public interface ConsumerChannel {

    String CHANNEL = "subscribableChannel";

    @Input
    SubscribableChannel subscribableChannel();
}

ProducerResource.java

@RestController
@RequestMapping("/api")
public class ProducerResource{

    private MessageChannel channel;

    public ProducerResource(ProducerChannel channel) {
        this.channel = channel.messageChannel();
    }

    @GetMapping("/greetings/{count}")
    @Timed
    public void produce(@PathVariable int count) {
        while(count > 0) {
            channel.send(MessageBuilder.withPayload(new Greeting().setMessage("Kafka funciona!: " + count)).build());
            count--;
        }
    }

} ConsumerService.java

@Service
public class ConsumerService {

    private final Logger log = LoggerFactory.getLogger(ConsumerService.class);


    @StreamListener(ConsumerChannel.CHANNEL)
    public void consume(Greeting greeting) {
        log.info("Mensaje recibido: {}.", greeting.getMessage());
    }
}

на примере JHipster kafka

Все это работает, и когда я отправляючерез postmant что-нибудь для http://localhost:8080/api/greetings/2 (например) это работает, и это отображается в консоли.

Но теперь я хочу создать слушатель в консольном приложении C # .NET для воспроизведения так же, какконсоль Java, и я попытался сделать это:

using KafkaNet;
using KafkaNet.Model;
using System;
using System.Text;

namespace kafka
{
    class Program
    {
        static void Main(string[] args)
        {
            var options = new KafkaOptions(new Uri("http://localhost:9092"),
               new Uri("http://localhost:9092"));
            var router = new BrokerRouter(options);
            var consumer = new KafkaNet.Consumer(new ConsumerOptions("",
               new BrokerRouter(options)));

            //Consume returns a blocking IEnumerable (ie: never ending stream)
            foreach (var message in consumer.Consume())
            {
                 Console.WriteLine("Response: P{0},O{1} : {2}",
                   message.Meta.PartitionId, message.Meta.Offset,
                   Encoding.UTF8.GetString(message.Value));
            }
        }
    }
}

Но я не могу показать сообщение, буквально я ничего не могу получить.Кто-нибудь знает, что я делаю не так?Это мой первый раз с консольным приложением C #.

Спасибо!

...