S3Stream закрывается перед обработкой всей полезной нагрузки - PullRequest
0 голосов
/ 18 марта 2019

Я обрабатываю основную нагрузку JSON с s3. код следующий:

import com.fasterxml.jackson.core.JsonFactory;
import com.fasterxml.jackson.core.JsonParseException;
import com.fasterxml.jackson.core.JsonParser;
import com.amazonaws.services.s3.model.S3Object;
import static com.fasterxml.jackson.core.JsonToken;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;   

public boolean sync(Job job)
        throws IOException

//validating the json payload from s3.
try(InputStream s3Stream = readStreamFromS3()) 
{
    validationService.validate(s3Stream);
} 
catch (S3SdkInteractionException e) {
{ 
   logger.error(e.getLocalizedMessage();
}

//process the json payload from s3.
try (InputStream s3Stream = readStreamFromS3())
{
    syncService.process(s3Stream);
}
catch (S3SdkInteractionException e) {
{
    logger.error(e.getLocalizedMessage();
}
}


public InputSteam readStreamFromS3()
{
    return S3Object.getObjectContent();
}

// Process will sync the user data in the s3 stream. 
// I am not closing the stream till the entire stream is processed.  I 
// need to handle as a stream processing. 
// I dont want keep the contents in memory for processing, not 
   feasible for my use case.
public boolean process(InputStream s3Stream)
{
    jsonFactory = objectMapper.getFactory();   
    try(JsonParser jsonParser = jsonFactory.createParser(s3Stream) {

        JsonToken jsonToken = jsonParser.nextToken();
        List<HttpResponseFuture<UserResponse> userFutures = new ArrayLsit<>(20);
        while(true) {
           for(int i = 0; i < 20; i++)
            {
              try {
                   // stream is processed fully
                    if (jsonToken == null || jsonToken == JSONTOKEN.END_OBJECT) { break; }

                   while (!jsonToken.isStructStart()) {
                           jsonToken = jsonParser.nextToken();    
                       }

                   // Fetch the user record from the stream
                   if (jsonTokenn.isStructStart()) {
                       Map<String,Object> userNode = jsonParser.readValueAs(Map.class);

                      // calling an external service and adding future response
                      userFutures.add(executeAsync(httpClient, userNode);

                    //Move to the next user record
                     if (jsonToken == JSONTOKEN.START_OBJECT) {
                           jsonToken = jsonParser.nextToken();     
                       }
                   }
                 }
              catch (JsonParseException jpe) {
                   logger.error(jpe.getLocalizedMessage());
                   break;
               }
             }

             for(ListenableFuture<UserResponse> responseFuture : Futures.inCompletionOrder(userFutures)) {
                 JsonResponse response = responseFuture.get();
            }

         } 

   }
   return false;
}

Существует сервис A, через который мы загружаем данные (полезную нагрузку json) в S3. Другой сервис B (псевдокод, показанный выше) обработает данные s3 и вызовет другой сервис C для синхронизации данных (полезная нагрузка json) в базовом хранилище.

Проблема:

Я вижу повторное предупреждение s3 в нашем коде. com.amazonaws.services.s3.internal.S3AbortableInputStream Не все байты были прочитаны из S3ObjectInputStream, прервав соединение HTTP. Это, вероятно, ошибка и может привести к неоптимальному поведению. Запросите только те байты, которые вам нужны, через дальний GET или истощите входной поток после использования

Этап проверки выполняется, как и ожидалось, без каких-либо проблем. Однако при синхронизации данных (т.е. syncService.process ()) s3Stream закрывается до обработки всей полезной нагрузки. Поскольку поток становится закрытым до того, как я обработаю весь поток, я нахожусь в несовместимом состоянии.

Информация о зависимостях выглядит следующим образом

AWS-ява-СДК-s3: 1.11.411

гуава: гуава-25,0-JRE

ДЖЕКСОН-ядро: 2.9.6

Полезная нагрузка Json может варьироваться от нескольких МБ до 2 ГБ.

Любая помощь будет оценена.

...