gRP C, оставляя потоки ответов открытыми для подписок - PullRequest
5 голосов
/ 17 июня 2020

Я попытался определить службу gRP C, где клиент может подписаться на получение широковещательных сообщений, а также может отправлять их.

syntax = "proto3";

package Messenger;

service MessengerService {
    rpc SubscribeForMessages(User) returns (stream Message) {}
    rpc SendMessage(Message) returns (Close) {}
}

message User {
    string displayName = 1;
}

message Message {
    User from = 1;
    string message = 2;
}

message Close {}

Моя идея заключалась в том, что когда клиент запрашивает подписку на сообщений, поток ответов будет добавлен в коллекцию потоков ответов, и когда сообщение отправлено, сообщение отправляется через все потоки ответов.

Однако, когда мой сервер пытается записать в ответ потоки, я получаю исключение System.InvalidOperationException: 'Response stream has already been completed.'

Есть ли способ указать серверу, чтобы потоки оставались открытыми, чтобы через них можно было отправлять новые сообщения? Или это не то, для чего был разработан gRP C, и следует использовать другую технологию?

Конечная цель услуги будет разрешать несколько типов подписок (может быть на новые сообщения, обновления погоды и т. Д. c ...) через разных клиентов, написанных на разных языках (C#, Java, et c ...). Часть, связанная с различными языками, в основном является причиной, по которой я выбрал gRP C, чтобы попробовать это, хотя я намереваюсь написать сервер в C#.


Пример реализации

using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Grpc.Core;
using Messenger;

namespace SimpleGrpcTestStream
{
    /*
    Dependencies
Install-Package Google.Protobuf
Install-Package Grpc
Install-Package Grpc.Tools
Install-Package System.Interactive.Async
Install-Package System.Linq.Async

    */
    internal static class Program
    {
        private static void Main()
        {
            var messengerServer = new MessengerServer();
            messengerServer.Start();

            var channel = Common.GetNewInsecureChannel();
            var client = new MessengerService.MessengerServiceClient(channel);
            var clientUser = Common.GetUser("Client");
            var otherUser = Common.GetUser("Other");

            var cancelClientSubscription = AddCancellableMessageSubscription(client, clientUser);
            var cancelOtherSubscription = AddCancellableMessageSubscription(client, otherUser);

            client.SendMessage(new Message { From = clientUser, Message_ = "Hello" });
            client.SendMessage(new Message { From = otherUser, Message_ = "World" });
            client.SendMessage(new Message { From = clientUser, Message_ = "Whoop" });

            cancelClientSubscription.Cancel();
            cancelOtherSubscription.Cancel();
            channel.ShutdownAsync().Wait();
            messengerServer.ShutDown().Wait();
        }

        private static CancellationTokenSource AddCancellableMessageSubscription(
            MessengerService.MessengerServiceClient client,
            User user)
        {
            var cancelMessageSubscription = new CancellationTokenSource();

            var messages = client.SubscribeForMessages(user);

            var messageSubscription = messages
                .ResponseStream
                .ToAsyncEnumerable()
                .Finally(() => messages.Dispose());

            messageSubscription.ForEachAsync(
                message => Console.WriteLine($"New Message: {message.Message_}"),
                cancelMessageSubscription.Token);

            return cancelMessageSubscription;
        }
    }

    public static class Common
    {
        private const int Port = 50051;

        private const string Host = "localhost";

        private static readonly string ChannelAddress = $"{Host}:{Port}";

        public static User GetUser(string name) => new User { DisplayName = name };

        public static readonly User ServerUser = GetUser("Server");

        public static readonly Close EmptyClose = new Close();

        public static Channel GetNewInsecureChannel() => new Channel(ChannelAddress, ChannelCredentials.Insecure);

        public static ServerPort GetNewInsecureServerPort() => new ServerPort(Host, Port, ServerCredentials.Insecure);
    }

    public sealed class MessengerServer : MessengerService.MessengerServiceBase
    {
        private readonly Server _server;

        public MessengerServer()
        {
            _server = new Server
            {
                Ports = { Common.GetNewInsecureServerPort() },
                Services = { MessengerService.BindService(this) },
            };
        }

        public void Start()
        {
            _server.Start();
        }

        public async Task ShutDown()
        {
            await _server.ShutdownAsync().ConfigureAwait(false);
        }

        private readonly ConcurrentDictionary<User, IServerStreamWriter<Message>> _messageSubscriptions = new ConcurrentDictionary<User, IServerStreamWriter<Message>>();

        public override async Task<Close> SendMessage(Message request, ServerCallContext context)
        {
            await Task.Run(() =>
            {
                foreach (var (_, messageStream) in _messageSubscriptions)
                {
                    messageStream.WriteAsync(request);
                }
            }).ConfigureAwait(false);

            return await Task.FromResult(Common.EmptyClose).ConfigureAwait(false);
        }

        public override async Task SubscribeForMessages(User request, IServerStreamWriter<Message> responseStream, ServerCallContext context)
        {
            await Task.Run(() =>
            {
                responseStream.WriteAsync(new Message
                {
                    From = Common.ServerUser,
                    Message_ = $"{request.DisplayName} is listening for messages!",
                });
                _messageSubscriptions.TryAdd(request, responseStream);
            }).ConfigureAwait(false);
        }
    }

    public static class AsyncStreamReaderExtensions
    {
        public static IAsyncEnumerable<T> ToAsyncEnumerable<T>(this IAsyncStreamReader<T> asyncStreamReader)
        {
            if (asyncStreamReader is null) { throw new ArgumentNullException(nameof(asyncStreamReader)); }

            return new ToAsyncEnumerableEnumerable<T>(asyncStreamReader);
        }

        private sealed class ToAsyncEnumerableEnumerable<T> : IAsyncEnumerable<T>
        {
            public IAsyncEnumerator<T> GetAsyncEnumerator(CancellationToken cancellationToken = default)
                => new ToAsyncEnumerator<T>(_asyncStreamReader, cancellationToken);

            private readonly IAsyncStreamReader<T> _asyncStreamReader;

            public ToAsyncEnumerableEnumerable(IAsyncStreamReader<T> asyncStreamReader)
            {
                _asyncStreamReader = asyncStreamReader;
            }

            private sealed class ToAsyncEnumerator<TEnumerator> : IAsyncEnumerator<TEnumerator>
            {
                public TEnumerator Current => _asyncStreamReader.Current;

                public async ValueTask<bool> MoveNextAsync() => await _asyncStreamReader.MoveNext(_cancellationToken);

                public ValueTask DisposeAsync() => default;

                private readonly IAsyncStreamReader<TEnumerator> _asyncStreamReader;
                private readonly CancellationToken _cancellationToken;

                public ToAsyncEnumerator(IAsyncStreamReader<TEnumerator> asyncStreamReader, CancellationToken cancellationToken)
                {
                    _asyncStreamReader = asyncStreamReader;
                    _cancellationToken = cancellationToken;
                }
            }
        }
    }
}

Ответы [ 2 ]

1 голос
/ 20 июня 2020

Проблема, с которой вы столкнулись, связана с тем, что MessengerServer.SubscribeForMessages возвращается немедленно. Как только этот метод возвращается, поток закрывается.

Вам понадобится реализация, подобная этой, чтобы поддерживать поток в рабочем состоянии:

public class MessengerService : MessengerServiceBase
{
    private static readonly ConcurrentDictionary<User, IServerStreamWriter<Message>> MessageSubscriptions =
        new Dictionary<User, IServerStreamWriter<Message>>();

    public override async Task SubscribeForMessages(User request, IServerStreamWriter<ReferralAssignment> responseStream, ServerCallContext context)
    {
        if (!MessageSubscriptions.TryAdd(request))
        {
            // User is already subscribed
            return;
        }

        // Keep the stream open so we can continue writing new Messages as they are pushed
        while (!context.CancellationToken.IsCancellationRequested)
        {
            // Avoid pegging CPU
            await Task.Delay(100);
        }

        // Cancellation was requested, remove the stream from stream map
        MessageSubscriptions.TryRemove(request);
    }
}

Что касается отмены подписки / отмены, есть два возможных подхода:

  1. Клиент может удерживать CancellationToken и вызывать Cancel(), когда он хочет отключиться
  2. Сервер может удерживать CancellationToken, который вы бы затем сохраните вместе с IServerStreamWriter в словаре MessageSubscriptions через Tuple или аналогичный. Затем вы можете ввести на сервере метод Unsubscribe, который ищет CancellationToken по User и вызывает Cancel на стороне сервера
0 голосов
/ 21 июня 2020

Подобно ответу Джона Холлидея, можно использовать неопределенно длинный Task.Delay(-1) и передать токен отмены контекста.

Попытка может быть использована для удаления конца потока ответа сервера при отмене задания.

public override async Task SubscribeForMessages(User request, IServerStreamWriter<Message> responseStream, ServerCallContext context)
{
    if (_messageSubscriptions.ContainsKey(request))
    {
        return;
    }

    await responseStream.WriteAsync(new Message
    {
        From = Common.ServerUser,
        Message_ = $"{request.DisplayName} is listening for messages!",
    }).ConfigureAwait(false);

    _messageSubscriptions.TryAdd(request, responseStream);

    try
    {
        await Task.Delay(-1, context.CancellationToken);
    }
    catch (TaskCanceledException)
    {
        _messageSubscriptions.TryRemove(request, out _);
    }
}
...