Как контролировать размер буфера записи в grpc (или: как обрабатывать медленные потоковые считыватели в grpc)? - PullRequest
0 голосов
/ 15 февраля 2019

Мы широко использовали двунаправленные потоки grpc в нашей архитектуре, но у нас возникли проблемы, когда загрузка процессора на одном конце не позволяет ему читать данные так же быстро, как другой конец отправляет их.Я изо всех сил пытался понять, как grpc должен справиться с этой ситуацией.Я создал тестовую программу, которая просто имитирует эту ситуацию (используя C # и асинхронные API, которые мы используем исключительно).Серверная сторона не может успевать за скоростью данных, поступающих от клиента.Результатом является постоянное увеличение использования памяти на стороне клиента, пока, в конечном итоге, OOMing машина.Насколько я понимаю, GRPC должен иметь какую-то защиту от этого.

Я нашел аргумент канала с именем GRPC_ARG_HTT2_WRITE_BUFFER_SIZE, полагая, что он может ограничить использование памяти на стороне клиента (и может вызвать блокировку или исключение на стороне клиента после заполнения этого буфера).Эта тестовая программа отправляет 10000 байтовых сообщений, а размер буфера записи установлен в 11000, но, похоже, установка этого аргумента не имеет никакого эффекта.

Мои вопросы: правильно ли работает grpc в этом примере, и если да, то каково это?Я делаю неправильно?Почему GRPC_ARG_HTT2_WRITE_BUFFER_SIZE, кажется, не имеет никакого эффекта (и, возможно, каков предполагаемый эффект)?

Пример написан с использованием следующего сообщения / службы:

syntax = "proto3";
package test;
option csharp_namespace = "Test";

service TestService {
  rpc Publish(stream TestMsg) returns (stream TestMsg);
}

message TestMsg {
  string value = 1;
  bytes dummy = 2;
}

и программа на C # здесь:

using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using Google.Protobuf;
using Grpc.Core;

namespace Test
{
  class Program
  {
    static void Main( string[] args )
    {
      if ( args[0] == "client" )
      {
        using ( new Client() )
          Console.ReadKey();
      }
      else if ( args[0] == "server" )
      {
        using ( new Server() )
          Console.ReadKey();
      }
    }
  }

  public class Server : TestService.TestServiceBase, IDisposable
  {
    private readonly Grpc.Core.Server _server;

    public Server()
    {
      _server = new Grpc.Core.Server
      {
        Services = { TestService.BindService( this ) },
        Ports = { new ServerPort( "localhost", 1234, ServerCredentials.Insecure ) }
      };
      _server.Start();
      Console.WriteLine( "Server started" );
    }

    public void Dispose()
    {
      _server.ShutdownAsync();
    }

    public override async Task Publish(
      IAsyncStreamReader<TestMsg> requestStream,
      IServerStreamWriter<TestMsg> responseStream,
      ServerCallContext context )
    {
      try
      {
        Console.WriteLine( "Client connected" );

        for (; ; )
        {
          var requestTask = requestStream.MoveNext();
          await requestTask;

          if ( requestTask.Status == TaskStatus.Canceled )
            break;
          if ( !requestTask.Result )
            break;

          var request = requestStream.Current;
          if ( request != null )
          {
            try
            {
              Console.Write( request.Value + ".." );
              // We're really working hard.
              Thread.Sleep( 1000 );
            }
            catch ( Exception ex )
            {
              await responseStream.WriteAsync( new TestMsg { Value = ex.Message } );
              Console.WriteLine( ex );
            }
          }
        }
      }
      finally
      {
        Console.WriteLine( "Client disconnected" );
      }
    }
  }

  public class Client : IDisposable
  {
    private bool _isSending;
    private readonly TestService.TestServiceClient _client;
    private readonly Channel _channel;
    private readonly IClientStreamWriter<TestMsg> _requestStream;
    private readonly Timer _timer;
    private int _i;

    public Client()
    {
      Console.WriteLine( "Client started" );

      var options = new List<ChannelOption>();
      options.Add( new ChannelOption( "GRPC_ARG_HTTP2_WRITE_BUFFER_SIZE", 11000 ) );

      _channel = new Channel( "localhost:1234", ChannelCredentials.Insecure, options );
      _client = new TestService.TestServiceClient( _channel );
      var call = _client.Publish();
      _requestStream = call.RequestStream;

      _timer = new Timer( OnTimerTick, null, 0, 1000 / 25 );
    }

    public void Dispose()
    {
      _channel.ShutdownAsync();
      _timer.Dispose();
    }

    private async void OnTimerTick( object state )
    {
      // Skip timer ticks if the previous isn't done yet.
      if ( _isSending )
        return;

      try
      {

        _isSending = true;
        var bytes = new byte[10000];
        var msg = new TestMsg { Value = _i.ToString(), Dummy = ByteString.CopyFrom( bytes ) };

        Console.Write( _i + ".." );
        await _requestStream.WriteAsync( msg );
        ++_i;
      }
      catch ( Exception e )
      {
        Console.WriteLine( e );
      }
      finally
      {
        _isSending = false;
      }
    }
  }
}

Примечание. Для тестирования я использую пакет nuget grpc.core.1.18.0.

...