Объективировать транзакции и многопоточные задачи, которые не работают должным образом на локальном dev-сервере - PullRequest
0 голосов
/ 24 ноября 2018

Я сталкиваюсь с проблемой при использовании транзакций objectify и задач GAE на локальном dev-сервере app-engine.

Я хочу обработать огромный CSV-файл (анализировать и записывать данныегде-то еще) параллельно.Для этого первый сервлет заботится о:

  • Сохранении сущности Job в хранилище данных, которая идентифицирует всю рабочую единицу и отслеживает общее количество фрагментов и обработанных фрагментов.
  • Разделите большой CSV на более мелкие части.Для каждого из них сохраняется сущность Чанка в хранилище данных, имеющая в качестве родителя задание (с помощью аннотации @parent)
  • После завершения разбиения сервлет запускает столько же задач, сколько было создано чанков.

Каждая задача заботится об обработке своего меньшего объекта csv chunk.

Теперь, возникает проблема .Чтобы определить, когда все фрагменты были правильно обработаны, я хочу, чтобы последний порождал другую задачу, которая завершает какую-то работу.

Для этого идея состоит в том, чтобы использовать транзакцию, чтобы гарантировать, чтокаждая завершенная обработка фрагмента увеличивает счетчик экземпляра задания, чтобы последний мог запустить последнюю задачу.

Однако это не работает .Я пробовал с заданием, разделенным на 7 блоков, и кажется, что два последних блока обрабатываются одновременно (что хорошо!), Но оба они успешны (с разными временами).Как вы можете себе представить, обе задачи начинают выполняться, когда число обработанных кусков равно 5, и обе пытаются установить 6 как обработанные куски.Я ожидаю, что при выходе из транзакции одна из них завершится неудачно, и только у одного появится возможность обновить значение processingChunks родительского объекта.Итак, в следующий раз, когда он будет повторен, отсчет чанка начинается с 6, а не с 5.

Итак, для вашего удобства здесь выполняется фрагмент кода.

// import static com.googlecode.objectify.ObjectifyService.ofy;
// ...
// ...
// Perform the data manipulation on the Chunk
// ...
// Load the parent job from Datastore
// job = ...

// Check if there still are valid chunks to process. If not, we are done!
ImportJob fctUpdatedJob = ofy().transact(() -> {
    long threadId = Thread.currentThread().getId();
    log.info("T{}: Transaction started", threadId);

    ImportJob updatedJob = ofy().load().key(Key.create(ImportJob.class, job.getId())).now();
    log.info("T{}: loaded job {}", threadId, updatedJob);

    int processedChunks = updatedJob.getProcessedChunks() + 1;
    updatedJob.setProcessedChunks(processedChunks);
    updatedJob.setTotalChunkSeconds(updatedJob.getTotalChunkSeconds() + seconds);

    // TODO Double check this stop condition
    if (processedChunks == updatedJob.getTotalChunks() && !updatedJob.isDisposing()) {
        updatedJob.setDisposing(true);
    }

    ofy().save().entity(updatedJob).now();
    log.info("T{}: job saved. Job: {}", threadId, updatedJob);

    return updatedJob;
});

if (job.getProcessedChunks()==job.getTotalChunks()) {
    // We are done!
    // Launch the final task
}

Ниже выводится (только соответствующая часть с указанием резьбы):

INFO: T18: Transaction started
INFO: T18: loaded job Job(year=2018, tempSpreadsheetId=1Yyf-4CufdK-34zQ3VAhaQLFUHfy0zj98OnHnj7wwYRE, tempSplitFileFolder=tmp_chunks/fct/2018/5578921999335424, nuvolaTempSpreadsheetId=null, nuvolaTemporaryTable=null, canceled=false, disposing=false, processedChunks=0, totalChunks=7, totalChunkSeconds=0)
INFO: T18: job saved. Job: Job(year=2018, tempSpreadsheetId=1Yyf-4CufdK-34zQ3VAhaQLFUHfy0zj98OnHnj7wwYRE, tempSplitFileFolder=tmp_chunks/fct/2018/5578921999335424, nuvolaTempSpreadsheetId=null, nuvolaTemporaryTable=null, canceled=false, disposing=false, processedChunks=1, totalChunks=7, totalChunkSeconds=47)
INFO: Processed 1 of 7 chunks for job Job(year=2018, tempSpreadsheetId=1Yyf-4CufdK-34zQ3VAhaQLFUHfy0zj98OnHnj7wwYRE, tempSplitFileFolder=tmp_chunks/fct/2018/5578921999335424, nuvolaTempSpreadsheetId=null, nuvolaTemporaryTable=null, canceled=false, disposing=false, processedChunks=1, totalChunks=7, totalChunkSeconds=47)
INFO: T13: Transaction started
INFO: T13: loaded job Job(year=2018, tempSpreadsheetId=1Yyf-4CufdK-34zQ3VAhaQLFUHfy0zj98OnHnj7wwYRE, tempSplitFileFolder=tmp_chunks/fct/2018/5578921999335424, nuvolaTempSpreadsheetId=null, nuvolaTemporaryTable=null, canceled=false, disposing=false, processedChunks=1, totalChunks=7, totalChunkSeconds=47)
INFO: T13: job saved. Job: Job(year=2018, tempSpreadsheetId=1Yyf-4CufdK-34zQ3VAhaQLFUHfy0zj98OnHnj7wwYRE, tempSplitFileFolder=tmp_chunks/fct/2018/5578921999335424, nuvolaTempSpreadsheetId=null, nuvolaTemporaryTable=null, canceled=false, disposing=false, processedChunks=2, totalChunks=7, totalChunkSeconds=98)
INFO: Processed 2 of 7 chunks for job Job(year=2018, tempSpreadsheetId=1Yyf-4CufdK-34zQ3VAhaQLFUHfy0zj98OnHnj7wwYRE, tempSplitFileFolder=tmp_chunks/fct/2018/5578921999335424, nuvolaTempSpreadsheetId=null, nuvolaTemporaryTable=null, canceled=false, disposing=false, processedChunks=2, totalChunks=7, totalChunkSeconds=98)
INFO: T17: Transaction started
INFO: T17: loaded job Job(year=2018, tempSpreadsheetId=1px7iKzxlEwr2hczF6NM9m-1HyRMYJ4GhZ20Ss4zCzqA, tempSplitFileFolder=tmp_chunks/fct/2018/5578921999335424, nuvolaTempSpreadsheetId=null, nuvolaTemporaryTable=null, canceled=false, disposing=false, processedChunks=2, totalChunks=7, totalChunkSeconds=98)
INFO: T17: job saved. Job: Job(year=2018, tempSpreadsheetId=1px7iKzxlEwr2hczF6NM9m-1HyRMYJ4GhZ20Ss4zCzqA, tempSplitFileFolder=tmp_chunks/fct/2018/5578921999335424, nuvolaTempSpreadsheetId=null, nuvolaTemporaryTable=null, canceled=false, disposing=false, processedChunks=3, totalChunks=7, totalChunkSeconds=151)
INFO: Processed 3 of 7 chunks for job Job(year=2018, tempSpreadsheetId=1px7iKzxlEwr2hczF6NM9m-1HyRMYJ4GhZ20Ss4zCzqA, tempSplitFileFolder=tmp_chunks/fct/2018/5578921999335424, nuvolaTempSpreadsheetId=null, nuvolaTemporaryTable=null, canceled=false, disposing=false, processedChunks=3, totalChunks=7, totalChunkSeconds=151)
INFO: T14: Transaction started
INFO: T14: loaded job Job(year=2018, tempSpreadsheetId=1px7iKzxlEwr2hczF6NM9m-1HyRMYJ4GhZ20Ss4zCzqA, tempSplitFileFolder=tmp_chunks/fct/2018/5578921999335424, nuvolaTempSpreadsheetId=null, nuvolaTemporaryTable=null, canceled=false, disposing=false, processedChunks=3, totalChunks=7, totalChunkSeconds=151)
INFO: T14: job saved. Job: Job(year=2018, tempSpreadsheetId=1px7iKzxlEwr2hczF6NM9m-1HyRMYJ4GhZ20Ss4zCzqA, tempSplitFileFolder=tmp_chunks/fct/2018/5578921999335424, nuvolaTempSpreadsheetId=null, nuvolaTemporaryTable=null, canceled=false, disposing=false, processedChunks=4, totalChunks=7, totalChunkSeconds=203)
INFO: Processed 4 of 7 chunks for job Job(year=2018, tempSpreadsheetId=1px7iKzxlEwr2hczF6NM9m-1HyRMYJ4GhZ20Ss4zCzqA, tempSplitFileFolder=tmp_chunks/fct/2018/5578921999335424, nuvolaTempSpreadsheetId=null, nuvolaTemporaryTable=null, canceled=false, disposing=false, processedChunks=4, totalChunks=7, totalChunkSeconds=203)
INFO: T19: Transaction started
INFO: T19: loaded job Job(year=2018, tempSpreadsheetId=1tqa8inL6H94I5Wi9z6UWCoy4C8XDDxltod7afLhCU6o, tempSplitFileFolder=tmp_chunks/fct/2018/5578921999335424, nuvolaTempSpreadsheetId=null, nuvolaTemporaryTable=null, canceled=false, disposing=false, processedChunks=4, totalChunks=7, totalChunkSeconds=203)
INFO: T19: job saved. Job: Job(year=2018, tempSpreadsheetId=1tqa8inL6H94I5Wi9z6UWCoy4C8XDDxltod7afLhCU6o, tempSplitFileFolder=tmp_chunks/fct/2018/5578921999335424, nuvolaTempSpreadsheetId=null, nuvolaTemporaryTable=null, canceled=false, disposing=false, processedChunks=5, totalChunks=7, totalChunkSeconds=250)
INFO: Processed 5 of 7 chunks for job Job(year=2018, tempSpreadsheetId=1tqa8inL6H94I5Wi9z6UWCoy4C8XDDxltod7afLhCU6o, tempSplitFileFolder=tmp_chunks/fct/2018/5578921999335424, nuvolaTempSpreadsheetId=null, nuvolaTemporaryTable=null, canceled=false, disposing=false, processedChunks=5, totalChunks=7, totalChunkSeconds=250)
INFO: T22: Transaction started
INFO: T22: loaded job Job(year=2018, tempSpreadsheetId=1tqa8inL6H94I5Wi9z6UWCoy4C8XDDxltod7afLhCU6o, tempSplitFileFolder=tmp_chunks/fct/2018/5578921999335424, nuvolaTempSpreadsheetId=null, nuvolaTemporaryTable=null, canceled=false, disposing=false, processedChunks=5, totalChunks=7, totalChunkSeconds=250)
INFO: T22: job saved. Job: Job(year=2018, tempSpreadsheetId=1tqa8inL6H94I5Wi9z6UWCoy4C8XDDxltod7afLhCU6o, tempSplitFileFolder=tmp_chunks/fct/2018/5578921999335424, nuvolaTempSpreadsheetId=null, nuvolaTemporaryTable=null, canceled=false, disposing=false, processedChunks=6, totalChunks=7, totalChunkSeconds=319)
INFO: Processed 6 of 7 chunks for job Job(year=2018, tempSpreadsheetId=1tqa8inL6H94I5Wi9z6UWCoy4C8XDDxltod7afLhCU6o, tempSplitFileFolder=tmp_chunks/fct/2018/5578921999335424, nuvolaTempSpreadsheetId=null, nuvolaTemporaryTable=null, canceled=false, disposing=false, processedChunks=6, totalChunks=7, totalChunkSeconds=319)
INFO: T20: Transaction started
INFO: T20: loaded job Job(year=2018, tempSpreadsheetId=1a924P9sXQYXV1Arrv7ak0FLBb2sRcclX6fqNQIFw8m0, tempSplitFileFolder=tmp_chunks/fct/2018/5578921999335424, nuvolaTempSpreadsheetId=null, nuvolaTemporaryTable=null, canceled=false, disposing=false, processedChunks=5, totalChunks=7, totalChunkSeconds=250)
INFO: T20: job saved. Job: Job(year=2018, tempSpreadsheetId=1a924P9sXQYXV1Arrv7ak0FLBb2sRcclX6fqNQIFw8m0, tempSplitFileFolder=tmp_chunks/fct/2018/5578921999335424, nuvolaTempSpreadsheetId=null, nuvolaTemporaryTable=null, canceled=false, disposing=false, processedChunks=6, totalChunks=7, totalChunkSeconds=304)
INFO: Processed 6 of 7 chunks for job Job(year=2018, tempSpreadsheetId=1a924P9sXQYXV1Arrv7ak0FLBb2sRcclX6fqNQIFw8m0, tempSplitFileFolder=tmp_chunks/fct/2018/5578921999335424, nuvolaTempSpreadsheetId=null, nuvolaTemporaryTable=null, canceled=false, disposing=false, processedChunks=6, totalChunks=7, totalChunkSeconds=304)

Как вы можете видеть, T22 начинается с 5 обработанных фрагментов, сохраняет его до 6, но затем T20 запускается снова с 5 исохраняет его в 6. Таким образом, условие processingChunks == totalChunks никогда не возникает.

Я использую objectify версии 5.1.6, на локальном сервере AppEngine dev и в моем appengine-web.xml я включил эту опцию.

Что мне здесь не хватает?

...