TPL Dataflow двухфазная фиксация - PullRequest
0 голосов
/ 15 марта 2019

Я хотел реализовать что-то вроде протокола двухфазной фиксации для потребления сообщений.

Для этого я сам реализовал ITargetBlock:

  public class Worker : ITargetBlock<Message>
  {
    // Is connected to remote server
    // Maintaining connection removed for brevity in this example
    private bool _isConnectionAlive;
    private readonly ActionBlock<MessageWithSource> _action;

    public Worker()
    {
      _action = new ActionBlock<MessageWithSource>(DoWork);
    }

    public DataflowMessageStatus OfferMessage(
      DataflowMessageHeader messageHeader, Message messageValue,
      ISourceBlock<Message> source, bool consumeToAccept)
    {
      if (consumeToAccept || source == null)
      {
        return DataflowMessageStatus.Declined;
      }

      if (!_isConnectionAlive)
      {
        return DataflowMessageStatus.Postponed;
      }

      var reservedMessage = source.ReserveMessage(messageHeader, this);
      if (reservedMessage)
      {
        _action.Post(new MessageWithSource(messageValue, source, messageHeader));
      }

      return DataflowMessageStatus.Postponed;
    }

    // Other methods removed for brevity

    private async Task DoWork(MessageWithSource value)
    {
      try
      {
        // sending message to the server removed for brevity


        // commit that we finished processing without error
        var message = value.SourceBlock.ConsumeMessage(value.MessageHeader, this, out _);

        if (message != value.Message)
        {
          // In which cases can we get here?
          throw new InvalidOperationException("Consumed some other message... oh my");
        }
      }
      catch (WebSocketException)
      {
        // Release reservation if we can't finish work, so other Workers can pickup this message and process it
        value.SourceBlock.ReleaseReservation(value.MessageHeader, this);
      }
    }

    private class MessageWithSource
    {
      public Message Message { get; }
      public ISourceBlock<Message> SourceBlock { get; }
      public DataflowMessageHeader MessageHeader { get; }
    }
  }

В docs он говорит, что ConsumeMessage может вернуть экземпляр, отличный от того, который был предложен ранее.

Интересно, в каких случаях и как это происходит?

...