Как создать собственный BodyPublisher для Java 11 HttpRequest - PullRequest
4 голосов
/ 07 августа 2020

Я пытаюсь создать собственный BodyPublisher, который десериализует мой JSON объект. Я мог бы просто десериализовать JSON при создании запроса и использовать метод ofByteArray для BodyPublishers, но я бы предпочел использовать настраиваемого издателя.

public class CustomPublisher implements HttpRequest.BodyPublisher {
    private byte[] bytes;
    
    public CustomPublisher(ObjectNode jsonData) {
        ...
        // Deserialize jsonData to bytes
        ...
    }
    
    @Override
    public long contentLength() {
        if(bytes == null) return 0;
        return bytes.length
    }
    
    @Override
    public void subscribe(Flow.Subscriber<? super ByteBuffer> subscriber) {
        CustomSubscription subscription = new CustomSubscription(subscriber, bytes);
        subscriber.onSubscribe(subscription);       
    }

    private CustomSubscription implements Flow.Subscription {
         private final Flow.Subscriber<? super ByteBuffer> subscriber;
         private boolean cancelled;
         private Iterator<Byte> byterator;

         private CustomSubscription(Flow.Subscriber<? super ByteBuffer> subscriber, byte[] bytes) {
             this.subscriber = subscriber;
             this.cancelled = false;
             List<Byte> bytelist = new ArrayList<>();
             for(byte b : bytes) {
                 bytelist.add(b);
             }
             this.byterator = bytelist.iterator();
         }

         @Override
         public void request(long n) {
             if(cancelled) return;
             if(n < 0) {
                 subscriber.onError(new IllegalArgumentException());
             } else if(byterator.hasNext()) {
                 subscriber.onNext(ByteBuffer.wrap(new byte[]{byterator.next()));
             } else {
                 subscriber.onComplete();
             }
         }

         @Override
         public void cancel() {
             this.cancelled = true;
         }
    }
}

Эта реализация работает, но только if subscriptions request метод вызывается с 1 в качестве параметра. Но вот что происходит, когда я использую его с HttpRequest.

Я почти уверен, что это никоим образом не является предпочтительным или оптимальным способом создания пользовательской подписки, но я еще не нашел лучшего способа заставить его работать .

Я был бы очень признателен, если бы кто-нибудь мог указать мне лучший путь.

1 Ответ

0 голосов
/ 10 августа 2020

Вы правы, избегая создания из него байтового массива, так как это может создать проблемы с памятью для больших объектов.

Я бы не стал писать собственный издатель. Вместо этого просто воспользуйтесь заводским методом HttpRequest.BodyPublishers.ofInputStream .

HttpRequest.BodyPublisher publisher =
    HttpRequest.BodyPublishers.ofInputStream(() ->  {
        PipedInputStream in = new PipedInputStream();

        ForkJoinPool.commonPool().submit(() -> {
            try (PipedOutputStream out = new PipedOutputStream(in)) {
                objectMapper.writeTree(
                    objectMapper.getFactory().createGenerator(out),
                    jsonData);
            }
            return null;
        });

        return in;
    });

Как вы заметили, вы можете использовать HttpRequest.BodyPublishers.ofByteArray. Это нормально для относительно небольших объектов, но я программирую на масштабируемость по привычке. Проблема с предположением, что код не нужно масштабировать, заключается в том, что другие разработчики будут считать, что передача больших объектов безопасна, не осознавая влияния на производительность.

Написание собственного основного издателя потребует много работы. Его метод subscribe наследуется от Flow.Publisher .

Документация по методу subscribe начинается со следующего:

Добавляет данного подписчика, если возможно.

Каждый раз, когда вызывается ваш метод subscribe, вам нужно добавить подписчика в какую-то коллекцию, вам нужно создать реализацию Flow.Subscription , и вам нужно сразу передать его в метод подписчика onSubscribe. Ваш объект реализации подписки должен отправить обратно один или несколько ByteBuffers, только когда вызывается метод request подписки, вызывая соответствующий метод подписчика (а не только любой подписчик) onNext , и после того, как вы отправили все данные, вы должны вызвать тот же метод подписчика onComplete(). Кроме того, объект реализации подписки должен обрабатывать cancel запросов.

Вы можете значительно упростить это, расширив SubmissionPublisher , который является реализацией Flow.Publisher по умолчанию. , а затем добавив к нему метод contentLength(). Но, как показывает документация SubmissionPublisher, вам еще предстоит проделать изрядный объем работы, даже для минимальной работающей реализации.

Методы HttpRequest.BodyPublishers.of… сделают все это за вас. ofByteArray подходит для небольших объектов, но ofInputStream подойдет для любого объекта, через который вы когда-либо могли бы пройти.

...