Rails ActiveJob показывает устаревшие данные БД - PullRequest
0 голосов
/ 09 февраля 2019

Проблема, которую я пытаюсь решить:

Несколько проектов подключаются / обновляют одну и ту же БД.« Project A » выполняет обновление базы данных, « Project B » - это клиентский интерфейс, отображающий обновленные данные.

Решение:

Когда Project A запланирован для обновления записей, Project B создаст фоновое задание для запуска в течение 15 минут, чтобы отследить сделанные обновления, обновляя интерфейс каждые 15 секунд с помощьюActionCable (webhooks).

Проблема:

ActiveJob завис на "3 remaining records", не замечая каких-либо изменений в БД.Если Модель вызывается напрямую (вне работы), все работает как положено;Изменения БД замечены, интерфейс клиента обновляется, и все довольны.

Но как только он вызывается из задания (для запуска задачи в фоновом режиме), он застревает.


Задание, которое вызывает логику модели:

class BatchWatchJob < ApplicationJob
    queue_as :default

    def perform(**args)
        if args[:batch_number]
            p "=== Begin - BatchWatch for batch_number: #{args[:batch_number]} ==="
            begin
                # Set 15 minute timeout
                # ReImport.watch_batch() not updating 
                # it's count when DB data changes :/
                #   BUT if I call the method directly (not from this Job), it works
                Timeout.timeout(900) { ReImport.watch_batch(args[:batch_number]) }
            rescue Timeout::Error
                puts '=== TIMEOUT ERROR ==='
                puts 'BatchWatch job failed to finish in the requisite amount of time (15 minutes)'
            end
        else
            p "=== Unable to start BatchWatch, batch_number not specified ==="
        end
    end
end

Модель, содержащая мою логику:

class ReImport < ApplicationRecord

        [...]

        def self.watch_batch(num)
                batch = ReImport.where(batch: num)
                total = batch.count
                remaining = batch.where(completed: false) # NOTE: starts with 3 remaining
                remaining_count = remaining.count
                p "initial remaining: #{remaining_count}"
                while !remaining.empty? do
                        p 'wait 10 seconds...'

                        sleep(10) # wait 10 seconds

                        p '...seconds finished...'
                        # batch.reload # doesn't work
                        # remaining.reload # doesn't work
                        batch = ReImport.where(batch: num)
                        remaining = batch.where(completed: false) # NOTE: this should now be 2
                        p "remaining_count: #{remaining_count}"
                        p "remaining . count: #{remaining.count}"
                        p "(remaining_count > remaining.count): #{(remaining_count > remaining.count)}"
                        if remaining_count > remaining.count
                                p '=== WATCH_BATCH: update found! ==='
                                # Update count
                                remaining_count = remaining.count

                                # Broadcast DataTables to update records
                                ReImportBatchChannel.broadcast_to("re_import_batch_#{num}", {update: true})
                        else
                                p '=== WATCH_BATCH: nothing changed yet ==='
                        end 
                end 
                p '=== WATCH_BATCH COMPLETED (or timeout reached) ==='
        end

        [...]

end

Консоль Rails - Вывод:

 > BatchWatchJob.perform_later(batch_number: 7)

Enqueued BatchWatchJob (Job ID: 7d1beeaf-c4d8-4489-885c-13b44d1037cf) to Async(default) with arguments: {:batch_number=>7}

 => #<BatchWatchJob:0x000000055c2100 @arguments=[{:batch_number=>7}], @job_id="7d1beeaf-c4d8-4489-885c-13b44d1037cf", @queue_name="default", @priority=nil, @executions=0, @provider_job_id="011e004c-34f5-4c7a-9925-052df1aa1774"> 

Performing BatchWatchJob (Job ID: 7d1beeaf-c4d8-4489-885c-13b44d1037cf) from Async(default) with arguments: {:batch_number=>7}

"=== Begin - BatchWatch for batch_number: 7 ==="
     (2.1ms)  SET  @@SESSION.sql_mode = CONCAT(CONCAT(@@sql_mode, ',STRICT_ALL_TABLES'), ',NO_AUTO_VALUE_ON_ZERO'),  @@SESSION.sql_auto_is_null = 0, @@SESSION.wait_timeout = 2147483
     (0.9ms)  SELECT COUNT(*) FROM `re_imports` WHERE `re_imports`.`batch` = 7
     (0.7ms)  SELECT COUNT(*) FROM `re_imports` WHERE `re_imports`.`batch` = 7 AND `re_imports`.`completed` = 0
"initial remaining: 3"
    ReImport Exists (0.5ms)  SELECT  1 AS one FROM `re_imports` WHERE `re_imports`.`batch` = 7 AND `re_imports`.`completed` = 0 LIMIT 1
"wait 10 seconds..."
"...seconds finished..."

"remaining_count: 3"
    CACHE  (0.0ms)  SELECT COUNT(*) FROM `re_imports` WHERE `re_imports`.`batch` = 7 AND `re_imports`.`completed` = 0  [["batch", 7], ["completed", 0]]
"remaining . count: 3"
    CACHE  (0.0ms)  SELECT COUNT(*) FROM `re_imports` WHERE `re_imports`.`batch` = 7 AND `re_imports`.`completed` = 0  [["batch", 7], ["completed", 0]]
"(remaining_count > remaining.count): false"
    CACHE  (0.0ms)  SELECT COUNT(*) FROM `re_imports` WHERE `re_imports`.`batch` = 7 AND `re_imports`.`completed` = 0  [["batch", 7], ["completed", 0]]
"=== WATCH_BATCH: nothing changed yet ==="
    CACHE ReImport Exists (0.0ms)  SELECT  1 AS one FROM `re_imports` WHERE `re_imports`.`batch` = 7 AND `re_imports`.`completed` = 0 LIMIT 1  [["batch", 7], ["completed", 0], ["LIMIT", 1]]

"wait 10 seconds..."
"...seconds finished..."
"remaining_count: 3"
    CACHE  (0.0ms)  SELECT COUNT(*) FROM `re_imports` WHERE `re_imports`.`batch` = 7 AND `re_imports`.`completed` = 0  [["batch", 7], ["completed", 0]]
"remaining . count: 3"
    CACHE  (0.0ms)  SELECT COUNT(*) FROM `re_imports` WHERE `re_imports`.`batch` = 7 AND `re_imports`.`completed` = 0  [["batch", 7], ["completed", 0]]
"(remaining_count > remaining.count): false"
    CACHE  (0.0ms)  SELECT COUNT(*) FROM `re_imports` WHERE `re_imports`.`batch` = 7 AND `re_imports`.`completed` = 0  [["batch", 7], ["completed", 0]]
"=== WATCH_BATCH: nothing changed yet ==="
    CACHE ReImport Exists (0.0ms)  SELECT  1 AS one FROM `re_imports` WHERE `re_imports`.`batch` = 7 AND `re_imports`.`completed` = 0 LIMIT 1  [["batch", 7], ["completed", 0], ["LIMIT", 1]]

"wait 10 seconds..."
[...]
Performed BatchWatchJob (Job ID: 7d1beeaf-c4d8-4489-885c-13b44d1037cf) from Async(default) in 22863.44ms

I'mне просто замечая CACHE ReImport Exists, пытаясь понять, как это отключить ...

1 Ответ

0 голосов
/ 09 февраля 2019

Я обнаружил, что проблема в Кэширование SQL , и исправил его с помощью не кэшированного метода ActiveRecords ReImport.uncached do в цикле while , например:

Модель

class ReImport < ApplicationRecord

    [...]

    def self.watch_batch(num)
        batch = ReImport.where(batch: num)
        total = batch.count
        remaining = batch.where(completed: false)
        remaining_count = remaining.count
        p "initial remaining: #{remaining_count}"
        while !remaining.empty? do
            ReImport.uncached do # <--- UNCACHE SQL QUERIES
                p 'wait 10 seconds...'

                sleep(10) # wait 10 seconds

                p '...seconds finished...'
                batch = ReImport.where(batch: num)
                remaining = batch.where(completed: false)
                p "remaining_count: #{remaining_count}"
                p "remaining . count: #{remaining.count}"
                p "(remaining_count > remaining.count): #{(remaining_count > remaining.count)}"
                if remaining_count > remaining.count
                    p '=== WATCH_BATCH: update found! ==='
                    # Update count
                    remaining_count = remaining.count

                    # Broadcast DataTables to update records
                    ReImportBatchChannel.broadcast_to("re_import_batch_#{num}", {update: true})
                else
                    p '=== WATCH_BATCH: nothing changed yet ==='
                end
            end
        end
        p '=== WATCH_BATCH COMPLETED (or timeout reached) ==='
    end

    [...]

end

Источник

...