Как использовать Thrift для передачи сообщений вместо RPC - PullRequest
0 голосов
/ 01 сентября 2018

Я знаю, что Thrift в основном нацелен на полноценный клиент-серверный RPC, но, глядя на высокоуровневую архитектуру, мне кажется, что он также должен быть идеально пригоден для двунаправленной передачи сообщений.

То, что я хотел бы построить на обоих концах (C, .NET Core), следующее:

  • Метод приема: имеет ссылку на сокет, читает полное сообщение, десериализует его, возвращает его
  • Метод отправки: имеет ссылку на сокет, сериализует данное сообщение, отправляет его на провод.

Мне не нужны многопоточные серверы, ... что-нибудь необычное. По существу, что я хотел бы получить на вершине того, что, например. Protobuffs предлагает готовую обработку буферизации всего сообщения на приемном конце и, как правило, формирование сообщения.

Проблема в том, что я не смог найти никакой документации о том, как начать создавать это с использованием текущих API библиотек libs (лично меня интересуют .NET Core и C). Единственное, что я нашел, это вопрос , но он не указывает ни на какие ресурсы.

Ответы [ 2 ]

0 голосов
/ 14 сентября 2018

Некоторые заметки о чем-то очень похожем:

  • Клиенты, использующие как C # (сочетание .Net Core и Framework), так и C ++
  • Использование Thrift RPC, а также "простых сообщений" для публикации / подписки и общей сериализации

Предложение поместить все сообщения в объединение верхнего уровня является хорошим, поскольку оно облегчит десериализацию сообщений.

С учетом следующей заботы:

struct SubscribeRequest {
    1: string topic,
    2: string appid,
}

struct SubscribeReply {
    1: bool success,
    2: string topic,
}
service HttpService {
    HttpSDKDataTypes.SubscribeReply Subscribe(1: HttpSDKDataTypes.SubscribeRequest message),
}

thrift -gen netcore дает вам:

  public async Task<Ruyi.SDK.Http.SubscribeReply> SubscribeAsync(Ruyi.SDK.Http.SubscribeRequest message, CancellationToken cancellationToken)
  {
    await OutputProtocol.WriteMessageBeginAsync(new TMessage("Subscribe", TMessageType.Call, SeqId), cancellationToken);

Идентификатор сообщения включается вызовом RPC. Если вы не используете вызов RPC, вы получаете «сырые» структуры без указания того, как их десериализовать.

Помещая их в union:

union UnionExample {
    1: SubscribeRequest request,
    2: SubscribeReply reply,    
}

Заботится об этом для вас:

public async Task WriteAsync(TProtocol oprot, CancellationToken cancellationToken)
{
  oprot.IncrementRecursionDepth();
  try
  {
    var struc = new TStruct("SubscribeRequest");
    await oprot.WriteStructBeginAsync(struc, cancellationToken);
    var field = new TField();
    if (Topic != null && __isset.topic)
    {
      field.Name = "topic";
      field.Type = TType.String;
      field.ID = 1;

После десериализации их можно обработать с помощью:

    public void handler(UnionExample example)
    {
        if (example.__isset.request)
        {
            SubscribeRequest msg = example.request;
            // ...
        }
        else if (example.__isset.reply)
        {
            SubscribeReply msg = example.reply;
            // ...
        }

Проверьте параметры, доступные в генераторах. thrift -gen "csharp:union,async" давайте использовать сопоставление с образцом :

   public void handler(UnionExample example)
    {
        switch (example.Data)
        {
            case SubscribeRequest msg:
                //...
            case SubscribeReply msg:
                //...

К сожалению, генератор netcore не делает этого в 0.11.0.

В репозитории Thrift Github есть примеров сериализации в память (вместо использования RPC). В целом это похоже на:

Stream stm = new MemoryStream();
TTransport trans = new TStreamTransport(null, stm);
TProtocol prot = new TJSONProtocol(trans);

Если вы собираетесь создавать много-много экземпляров MemoryStream, взгляните на Microsoft.IO.RecycableMemoryStream.

Использование собственного протокола / транспорта упростит процесс отправки сообщения, поскольку он будет обрабатывать шаблон (и позволит избежать лишних объектов, которые сначала сериализуются в память, а затем все, что вы с ним делаете). Папка thrift contrib/ уже упоминалась. Вот наш пример на C # использования Thrift поверх ZeroMQ.

Последнее замечание. Если вы вообще собираетесь использовать функциональность RPC, напишите свои сервисные методы с одним параметром struct. Значение:

service HttpService {
    // Do this
    string Subscribe(1: SubscribeRequest message),
    // Not this
    string Subscribe(1: string topic, 2: string appid,),
}

Это облегчит избавление от RPC и / или повторное использование сообщения.

0 голосов
/ 03 сентября 2018

Thrift - это среда RPC и сериализации. Это означает, что вы также можете использовать только часть сериализации без RPC.

В сочетании с системами обмена сообщениями путь обычно (приблизительно) следующий:

  • сериализует сообщение в буфер
  • отправьте этот буфер любыми способами
  • принимающая сторона (и) десериализует (ы) буфер и обрабатывает данные

Если вы планируете отправлять различные виды сообщений по одному и тому же каналу, может быть хорошей идеей иметь структуру конверта union, которая содержит все возможные тела сообщения:

 struct MessageOne {
      // contents of this message type
 }

 struct MessageTwo {
      // contents of this message type
 }

 struct MessageThree {
      // contents of this message type
 }

 union MyMessageEnvelope {
      1: MessageOne   one
      2: MessageTwo   two
      3: MessageThree  three
      // easily extendable 
 }

Чтобы сделать его более элегантным / многократно используемым, можно также реализовать собственный транспорт, чтобы соответствовать потребностям и еще более инкапсулировать логику. Модульная структура Thrift упрощает эту задачу (ссылка, на которую вы ссылаетесь, также относится к этому). В папке /contrib дерева исходных текстов есть несколько примеров, которые могут служить отправной точкой.

Если вы абсолютно не представляете, с чего начать: посмотрите учебное пособие, а затем программы тестового набора, и то, и другое отлично подойдут для обучения начинающим Thrift.

...