У меня проблема с моим приложением Ruby on Rails. Итак, есть рабочие, которые слушают разные темы rabbitMQ. Каждый рабочий вносит некоторые изменения в его таблицу DB (mariaDB) и в конце обновляет поле last_connection в общем объекте device, где у меня проблема.
Это один рабочий:
include Sneakers::Worker
# This worker will connect to "queue" queue
# env is set to nil since by default the actuall queue name would be
# "queue_development"
from_queue "sensor_log"
# work method receives message payload in raw format
def work(raw_message)
logger.info ("sensor_log " + raw_message)
msg = JSON.parse(raw_message)
# msg = {"deviceId" => 102,"timestamp" => 1487318555,"sensor" => 5, "values" => [1,2,3,4,5,6,7,8], "isNewSensor" => false, "logDelayed" => false}
begin
@device = Device.find(msg["deviceId"])
ActiveRecord::Base.transaction do
# Initialize
timestamp = Time.at(msg["timestamp"])
MiddlewareLog.create(
device_id: msg["deviceId"],
message: JSON.pretty_generate(msg),
queue: MiddlewareLog.queues[:sensor_log]
)
if(msg["ext_brd_type"] == 6)
logger.info ("Logging external sensors lm2, lm4, lm4")
# Logging external sensors lm2, lm4, lm4
Sensors::SENSORS_EXT.map do |code|
SensorLog.create(
device_id: msg["deviceId"],
sensor: code,
state: msg[code],
value: msg[code],
is_delayed: msg["logDelayed"],
created_at: timestamp
)
end
else
logger.info ("Logging native sensors")
# Logging native
device_sensors = @device.new_version? ? Sensors::SENSORS_CODES_NEW : Sensors::SENSORS_CODES
@sensors = device_sensors.reject{|code, sensor| code & msg["sensor"] == 0}
@sensors.map do |code, sensor|
SensorLog.create(
device_id: msg["deviceId"],
sensor: sensor[:name],
state: sensor[:state],
value: msg["values"][sensor[:bit_position]],
is_delayed: msg["logDelayed"],
created_at: timestamp
)
end
Rollbar.warning("Unknown device sensor", :message => msg, :sensor => msg["sensor"]) if @sensors.empty?
@device.update_sensors_state(msg["values"]) if @sensors.any?
end
# Avoid updated_at deadlock
@device.save!(touch: false)
end
# Touch updated_at and last_connection_at
@device.touch(:last_connection_at)
ack! # we need to let queue know that message was received
rescue => exception
logger.error ("sensors_log exception:")
logger.error exception
Rollbar.error(exception, :message => msg)
requeue!
end
end
end
Вот второй:
class SystemLogsWorker
include Sneakers::Worker
# This worker will connect to "queue" queue
# env is set to nil since by default the actuall queue name would be
# "queue_development"
from_queue "system_log"
# @logger = Logger.new(STDOUT)
# @logger.level = Logger::INFO
# work method receives message payload in raw format
def work(raw_message)
# @logger.info raw_message
logger.info ("system_log " + raw_message)
msg = JSON.parse(raw_message)
# msg = {"deviceId":102,"timestamp":1487318555,"system":2069,"logDelayed":false,"fault_code":1}
begin
@device = Device.find(msg["deviceId"])
ActiveRecord::Base.transaction do
# Initialize
timestamp = Time.at(msg["timestamp"])
MiddlewareLog.create(
device_id: msg["deviceId"],
message: JSON.pretty_generate(msg),
queue: MiddlewareLog.queues[:system_log]
)
@system = Systems::EVENTS_CODES[msg["system"]]
# @logger.warn("Unknown device system", :message => msg, :system => msg[:system]) unless @system
# logger.warn("Unknown device system", :message => msg, :system => msg["system"]) unless @system
# Rollbar.warning("Unknown device system", :message => msg, :system => msg["system"]) unless @system
logger.warn("Unknown device system. Message:" + raw_message) unless @system
Rollbar.warning("Unknown device system. Message:" + raw_message) unless @system
# Loggin
system_log = SystemLog.create(
device_id: msg["deviceId"],
system: @system[:name],
state: @system[:state],
is_delayed: msg["logDelayed"],
fault_code: msg["fault_code"],
created_at: timestamp
) if @system
@device.update_systems_state(system_log) if @system
# Avoid updated_at deadlock
@device.save!(touch: false)
end
# Touch updated_at and last_connection_at
@device.touch(:last_connection_at)
ack! # we need to let queue know that message was received
rescue => exception
logger.error ("system_log exception:")
logger.error exception
Rollbar.error(exception, :message => msg)
requeue!
end
end
end
Во время выполнения я получаю сообщение:
2020-06-18T11: 09: 08Z p-13299 t-gmvtsrza c ОШИБКА: исключение sensor_log: 2020-06-18T11: 09: 08Z p-13299 t-gmvtsrza c ОШИБКА: Mysql2 :: Ошибка: обнаружена взаимоблокировка при попытке получить блокировку; попробуйте перезапустить транзакцию: UPDATE devices
SET devices
. updated_at
= '2020-06-18 11:09:08', devices
. last_connection_at
= '2020-06-18 11:09:08' ГДЕ devices
. id
= 3024
2020-06-18T11: 09: 08Z p-13299 t-gmvtsq74w ОШИБКА: исключение system_log: 2020-06-18T11: 09: 08Z p-13299 t- gmvtsq74w ОШИБКА: Mysql2 :: Ошибка: обнаружена тупиковая ситуация при попытке получить блокировку; попробуйте перезапустить транзакцию: UPDATE devices
SET devices
. updated_at
= '2020-06-18 11:09:08', devices
. last_connection_at
= '2020-06-18 11:09:08' ГДЕ devices
. id
= 3024
Я думаю, что проблема в @device.touch(:last_connection_at)
, потому что за один раз оба воркера пытаются обновить одну строку таблицы. Я не очень хорошо разбираюсь в ruby и буду рад любой помощи с этим.