ReadOnlySequence - разрез на заданную позицию SequencePosition + 1 - PullRequest
2 голосов
/ 25 октября 2019

Я пытаюсь прочитать некоторые данные из ReadOnlySequence. Данные форматируются как кадры. Каждый кадр заканчивается нулевым байтом (октет 0).

Мой код ищет конец кадра, используя ReadOnlySequence.PositionOf. Когда он находит NULL-байт, он обрабатывает все байты до позиции NULL-байта. После обработки я хотел бы обработать следующий кадр, разрезав вход и повторив предыдущие шаги. Поскольку кадр заканчивался до байта NULL, байт NULL был бы частью следующей последовательности байтов, если бы я не разрезал входные данные снова (start = 1).

Это способ нарезатьReadOnlySequence с SequencePosition + 1 элементом / байтом в качестве начального значения?

Я пытался использовать SequencePosition.GetInteger + 1 в качестве начального значения, но это не работает, так как GetInteger иногда возвращает значения больше, чемдлина ReadOnlySequence. При выделении значения, возвращаемого GetInteger, возникает следующее исключение: System.ArgumentOutOfRangeException: Specified argument was out of the range of valid values. (Parameter 'start')

Пример минимальной воспроизводимости

using System;
using System.Buffers;
using System.IO.Pipelines;
using System.Text;
using System.Threading.Tasks;

namespace ConsoleApp1
{
    public class Program
    {
        private static IDuplexPipe _pipe;

        public static async Task Main( String[] args )
        {
            var pipe = new Pipe();
            _pipe = new DuplexPipe( pipe.Reader, pipe.Writer );

            var firstMessage = Encoding.UTF8.GetBytes( "CONNECTED\nversion:1.1\nsession:2a840965\nserver:ActiveMQ-Artemis/2.8.0 ActiveMQ Artemis Messaging Engine\nheart-beat:10000,10000\n\n\0\n" );
            await _pipe.Output.WriteAsync( firstMessage );
            await _pipe.Output.FlushAsync();

            var secondMessage =
                Encoding.UTF8.GetBytes(
                    "\n\nMESSAGE\nsubscription:test-839c7766-0f38-4579-a3fc-74de35408536Sub1\ncontent-length:4\nmessage-id:2147486350\ndestination:/queue/TestQ\nexpires:1572278642017\nredelivered:false\npriority:5\npersistent:true\ntimestamp:1572278582050\ndestination-type:ANYCAST\nreceipt:2\ntest:test\nNMSXDeliveryMode:true\ntransformation:jms-byte\ntimestamp:1572278582017\n\nHello World\0\n" );
            await _pipe.Output.WriteAsync( secondMessage );
            await _pipe.Output.FlushAsync();

            var readResult = await _pipe.Input.ReadAsync();
            var buffer = readResult.Buffer;
            while ( TryParseFrame( ref buffer ) )
            {
                // ...
            }

            _pipe.Input.AdvanceTo( buffer.Start, buffer.End );

            Console.ReadLine();
        }

        private static Boolean TryParseFrame( ref ReadOnlySequence<Byte> inputBuffer )
        {
            var endOfFrame = inputBuffer.PositionOf( ByteConstants.Null );
            if ( endOfFrame == null )
                return false;

            var frameBuffer = inputBuffer.Slice( 0, endOfFrame.Value );
            // parse and process the frame...

            // This works....
            //inputBuffer = inputBuffer.Slice( frameBuffer.End );
            //inputBuffer = inputBuffer.Slice( 1 );

            // This does NOT.
            try
            {
                var end = frameBuffer.End.GetInteger();
                var length = inputBuffer.Length;
                Console.WriteLine( $" END: {end}, LENGTH: {length} " );
                inputBuffer = inputBuffer.Slice( end + 1 );
            }
            catch ( Exception ex )
            {
                Console.WriteLine( ex );
                // Make sure we can read the next frame...
                inputBuffer = inputBuffer.Slice( frameBuffer.End );
                inputBuffer = inputBuffer.Slice( 1 );
            }

            return true;
        }
    }

    public class DuplexPipe : IDuplexPipe
    {
        public DuplexPipe( PipeReader input, PipeWriter output )
        {
            Input = input;
            Output = output;
        }

        public PipeReader Input { get; }
        public PipeWriter Output { get; }
    }

    public static class ByteConstants
    {
        public const Byte HeaderDelimiter = 58;
        public const Byte LineFeed = 10;
        public const Byte Null = 0;
    }
}
...