Я пытаюсь обрабатывать данные, которые постоянно поступают в нейронную сеть. Для оптимизации всего процесса у меня есть:
- Нить, отвечающая за заполнение FIFOQueue.
- Другой поток, который должен извлечь как можно больше элементов из очереди до порогового значения. То есть: min (queue.size (), MAX_BATCH_SIZE). MAX_BATCH_SIZE - это максимум, который я могу обрабатывать одновременно.
Мне удалось заставить его работать, но это что-то рудиментарное, и мне интересно, есть ли лучшая реализация этого метода. Я сосредотачиваюсь на потребительском потоке очереди.
Определение операций:
self.queue = tf.FIFOQueue ...
self.input_dequeue_op = self.queue.dequeue_many(
tf.maximum(
tf.constant(1),
tf.minimum(
tf.constant(MAX_BATCH_SIZE),
self.queue.size()
)
)
)
Потребительская резьба:
with network.sess.as_default():
while True:
results = network.sess.run(self.input_dequeue_op)
По сути, я намереваюсь отправить в нейронную сеть максимальное количество элементов, которые в данный момент находятся в очереди, до MAX_BATCH_SIZE (это максимальное значение, которое может обрабатываться одновременно).
Операция получает максимальную реальную партию, которую я могу загрузить.
Мне кажется, это очень элементарная форма, я также пытался использовать ее напрямую:
self.input_dequeue_op = self.queue.dequeue_up_to(tf.constant(MAX_BATCH_SIZE))
Но очередь блокируется на неопределенный срок, даже если добавляются элементы.
Есть ли другой лучший способ сделать это?