Я мигрирую данные один раз из таблицы.Теперь я должен поддерживать двойную запись, для этого я написал наблюдатель двойной записи.
Как только я вставляю данные в мою старую БД, я запускаю асинхронное задание в Сидеки, и эта работа записывает данные в новую БД, но некоторыекак мои записи пропускаются.Я также добавил ведение журнала, но они не приходят в Кибане.
module DualWriteObserver
extend ActiveSupport::Concern
included do
after_save :dual_write
before_save :stop_write
end
def stop_write
result = (Constance.cached_value('WRITE_DISABLE') || []).exclude?(self.class.name)
unless result
Rails.logger.info "WRITE_DISABLE in DualWriteObserver for #{self.class.name}"
self.errors.add(self.class.name.to_sym, "WRITE_DISABLE in DualWriteObserver")
end
result
end
def dual_write(mode = nil)
dual_write_config = DUAL_WRITE_CONFIG[self.class.name]
return unless dual_write_config.present?
return unless (Constance.cached_value('DUAL_WRITE_ENABLED') || []).include?(self.class.name)
if dual_write_config['mode'] == 'now' || mode == 'now'
db = dual_write_config['db']
Multidb.use(db.to_sym) do
# minor optimization to not create model everytime
model_without_callbacks ||= get_model_without_callbacks
existing_entity = model_without_callbacks.find_by_id(self.id)
if existing_entity.present?
# update
existing_entity.update_columns(self.attributes)
else
# create
model_without_callbacks.create!(self.attributes)
end
# Not handling destroy because you shouldn't anyways hard delete (almost) always
end
else #mode = 'async'
async_invoker_class_name = dual_write_config['invoker']
klass = Object.const_get("DualWrite::" + async_invoker_class_name)
klass.perform_async(self.class.name, self.id, self.updated_at)
end
end
private
def get_model_without_callbacks
# Validations need not and callbacks must not happen in dual write
base_table_name = self.class.table_name
Class.new(ActiveRecord::Base) do
self.table_name = base_table_name
self.inheritance_column = nil
end
end
end
module DualWrite
class GenericAsyncDualWriteInvoker
# Extend this class and specify your own sidekiq queue
def perform(class_name, id, updated_at, iteration = 1)
klass = Object.const_get(class_name)
entity = nil
Multidb.use(:slave) do
entity = klass.unscoped.find_by_id(id)
end
unless entity.present?
if iteration <= 3
# If entity not found, maybe because of master-slave replication lag
self.class.perform_in((3 ** iteration).seconds, class_name, id, updated_at, iteration + 1)
else
Rails.logger.info("Async Dual Write failed for #{class_name}, #{id}")
end
return
end
unless entity.updated_at.to_s == updated_at.to_s
# Maybe entity got updated again and this job processing is happening later
Rails.logger.info("Async Dual Write skipped for #{class_name}, #{id}")
return
end
entity.dual_write('now')
end
end
end