Я обрабатываю основную нагрузку JSON с s3.
код следующий:
import com.fasterxml.jackson.core.JsonFactory;
import com.fasterxml.jackson.core.JsonParseException;
import com.fasterxml.jackson.core.JsonParser;
import com.amazonaws.services.s3.model.S3Object;
import static com.fasterxml.jackson.core.JsonToken;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
public boolean sync(Job job)
throws IOException
//validating the json payload from s3.
try(InputStream s3Stream = readStreamFromS3())
{
validationService.validate(s3Stream);
}
catch (S3SdkInteractionException e) {
{
logger.error(e.getLocalizedMessage();
}
//process the json payload from s3.
try (InputStream s3Stream = readStreamFromS3())
{
syncService.process(s3Stream);
}
catch (S3SdkInteractionException e) {
{
logger.error(e.getLocalizedMessage();
}
}
public InputSteam readStreamFromS3()
{
return S3Object.getObjectContent();
}
// Process will sync the user data in the s3 stream.
// I am not closing the stream till the entire stream is processed. I
// need to handle as a stream processing.
// I dont want keep the contents in memory for processing, not
feasible for my use case.
public boolean process(InputStream s3Stream)
{
jsonFactory = objectMapper.getFactory();
try(JsonParser jsonParser = jsonFactory.createParser(s3Stream) {
JsonToken jsonToken = jsonParser.nextToken();
List<HttpResponseFuture<UserResponse> userFutures = new ArrayLsit<>(20);
while(true) {
for(int i = 0; i < 20; i++)
{
try {
// stream is processed fully
if (jsonToken == null || jsonToken == JSONTOKEN.END_OBJECT) { break; }
while (!jsonToken.isStructStart()) {
jsonToken = jsonParser.nextToken();
}
// Fetch the user record from the stream
if (jsonTokenn.isStructStart()) {
Map<String,Object> userNode = jsonParser.readValueAs(Map.class);
// calling an external service and adding future response
userFutures.add(executeAsync(httpClient, userNode);
//Move to the next user record
if (jsonToken == JSONTOKEN.START_OBJECT) {
jsonToken = jsonParser.nextToken();
}
}
}
catch (JsonParseException jpe) {
logger.error(jpe.getLocalizedMessage());
break;
}
}
for(ListenableFuture<UserResponse> responseFuture : Futures.inCompletionOrder(userFutures)) {
JsonResponse response = responseFuture.get();
}
}
}
return false;
}
Существует сервис A, через который мы загружаем данные (полезную нагрузку json) в S3.
Другой сервис B (псевдокод, показанный выше) обработает данные s3 и вызовет другой сервис C для синхронизации данных (полезная нагрузка json) в базовом хранилище.
Проблема:
Я вижу повторное предупреждение s3 в нашем коде. com.amazonaws.services.s3.internal.S3AbortableInputStream Не все байты были прочитаны из S3ObjectInputStream, прервав соединение HTTP. Это, вероятно, ошибка и может привести к неоптимальному поведению. Запросите только те байты, которые вам нужны, через дальний GET или истощите входной поток после использования
Этап проверки выполняется, как и ожидалось, без каких-либо проблем.
Однако при синхронизации данных (т.е. syncService.process ()) s3Stream закрывается до обработки всей полезной нагрузки.
Поскольку поток становится закрытым до того, как я обработаю весь поток, я нахожусь в несовместимом состоянии.
Информация о зависимостях выглядит следующим образом
AWS-ява-СДК-s3: 1.11.411
гуава: гуава-25,0-JRE
ДЖЕКСОН-ядро: 2.9.6
Полезная нагрузка Json может варьироваться от нескольких МБ до 2 ГБ.
Любая помощь будет оценена.