Почему Tensorflow tf.FIFOQueue закрывается рано в следующем коде? - PullRequest
0 голосов
/ 30 мая 2018

Я пытаюсь реализовать очередь, в которой enqueue работает в фоновом режиме и dequeue работает в главном потоке.

Цель состоит в том, чтобы запустить оптимизатор в цикле, который зависит отЗначение сохраняется в буфере и изменяется только с каждым шагом оптимизации.Вот простой пример, чтобы проиллюстрировать:

VarType = tf.int32

data0 = np.array([1.0])

init = tf.placeholder(VarType, [1])
q = tf.FIFOQueue(capacity=1, shapes=[1], dtypes=VarType)
nq_init = q.enqueue(init)
# I use a Variable intermediary because I will want to access the
# data multiple times, but I do not want the next data point in the
# queue until I initialize the variable again.
data_ = tf.Variable(q.dequeue(), trainable=False, collections=[])

# Notice that data_ is accessed twice, but should be the same
# in a single sess.run
# so "data_ = q.dequeue()" would not be correct
# plus there needs to be access to initial data
data1 = data_ + 1
data2 = data_ * data1
qr = tf.train.QueueRunner(q, [q.enqueue(data2)] * 1)
tf.train.add_queue_runner(qr)

with tf.Session() as sess:
    sess.run(tf.global_variables_initializer())

    coord = tf.train.Coordinator()
    threads = tf.train.start_queue_runners(coord=coord)

    sess.run(nq_init, feed_dict={init:data0})
    # this first initialization works fine
    sess.run(data_.initializer)
    for n in range(10):
        print(sess.run(data2))
        # this second initialization errors out: 
        sess.run(data_.initializer)

    coord.request_stop()
    coord.join(threads)

print('Done')

Этот фрагмент кода выдает ошибку со следующей ошибкой:

"OutOfRangeError (см. Выше для отслеживания): FIFOQueue '_0_fifo_queue' закрытои имеет недостаточные элементы (запрошено 1, текущий размер 0) "

Почему и как это исправить?

1 Ответ

0 голосов
/ 30 мая 2018

Итак, я нашел часть «как это исправить», но не причину.

Кажется, что первая очередь / очередь должна быть запущена до того, как вторая очередь / очередь помещена в коллекцию QUEUE_RUNNERS -но с оговоркой нам нужно запустить sess.run(data_.initializer) дважды:

with tf.Session() as sess:
    sess.run(tf.global_variables_initializer())

    sess.run(nq_init, feed_dict={init:data0})
    sess.run(data_.initializer)
    coord = tf.train.Coordinator()
    threads = tf.train.start_queue_runners(coord=coord)

    for n in range(10):
        print(sess.run(data2))
        sess.run(data_.initializer)
        sess.run(data_.initializer)

    coord.request_stop()
    coord.join(threads)

Вывод такой, как ожидалось:

[2]; [6]; [42];...

Без двух вызовов я получаю следующее:

[2]; [6]; [6]; [42];...

Я подозреваю, что q.enqueue имеет свой собственный буфер, который содержит старый data2, поэтому он должен вызываться дважды.Это также соответствует тому, что первое значение не повторяется, потому что в это время второе q.enqueue все еще пусто.Не уверен, как преодолеть эту причуду.

...