Превратить функции с обратным вызовом в генераторы Python? - PullRequest
29 голосов
/ 02 апреля 2012

Функция минимизации Scipy (используется только в качестве примера) имеет возможность добавления функции обратного вызова на каждом шаге.Так что я могу сделать что-то вроде,

def my_callback(x):
    print x
scipy.optimize.fmin(func, x0, callback=my_callback)

Есть ли способ использовать функцию обратного вызова для создания генераторной версии fmin, чтобы я мог сделать,

for x in my_fmin(func,x0):
    print x

Кажется,как это может быть возможно с некоторой комбинацией урожайности и посылки, но я вполне могу думать о чем угодно.

Ответы [ 5 ]

15 голосов
/ 02 апреля 2012

Как указано в комментариях, вы можете сделать это в новой теме, используя Queue. Недостаток в том, что вам все еще нужен какой-то способ доступа к конечному результату (что возвращает fmin в конце). В моем примере ниже используется необязательный обратный вызов, чтобы что-то с ним сделать (другой вариант - просто выдать его тоже, хотя ваш вызывающий код должен будет различать результаты итерации и окончательные результаты):

from thread import start_new_thread
from Queue import Queue

def my_fmin(func, x0, end_callback=(lambda x:x), timeout=None):

    q = Queue() # fmin produces, the generator consumes
    job_done = object() # signals the processing is done

    # Producer
    def my_callback(x):
        q.put(x)
    def task():
        ret = scipy.optimize.fmin(func,x0,callback=my_callback)
        q.put(job_done)
        end_callback(ret) # "Returns" the result of the main call

    # Starts fmin in a new thread
    start_new_thread(task,())

    # Consumer
    while True:
        next_item = q.get(True,timeout) # Blocks until an input is available
        if next_item is job_done:
            break
        yield next_item

Обновление: , чтобы заблокировать выполнение следующей итерации, пока потребитель не закончит обработку последней, также необходимо использовать task_done и join.

    # Producer
    def my_callback(x):
        q.put(x)
        q.join() # Blocks until task_done is called

    # Consumer
    while True:
        next_item = q.get(True,timeout) # Blocks until an input is available
        if next_item is job_done:
            break
        yield next_item
        q.task_done() # Unblocks the producer, so a new iteration can start

Обратите внимание, что maxsize=1 не требуется, поскольку новый элемент не будет добавлен в очередь до тех пор, пока не будет израсходован последний элемент.

Обновление 2: Также обратите внимание, что, если все генераторы не получат все элементы, созданный поток будет заблокирован (он будет заблокирован навсегда, а его ресурсы никогда не будут освобождены). Производитель ожидает в очереди, и, поскольку он хранит ссылку на эту очередь, он никогда не будет возвращен gc, даже если он является потребителем. Тогда очередь станет недоступной, поэтому никто не сможет снять блокировку.

Чистое решение для этого неизвестно, если вообще возможно (поскольку оно будет зависеть от конкретной функции, используемой вместо fmin). Обходной путь может быть сделан с использованием timeout, если производитель вызывает исключение, если put блокирует слишком долго:

    q = Queue(maxsize=1)

    # Producer
    def my_callback(x):
        q.put(x)
        q.put("dummy",True,timeout) # Blocks until the first result is retrieved
        q.join() # Blocks again until task_done is called

    # Consumer
    while True:
        next_item = q.get(True,timeout) # Blocks until an input is available
        q.task_done()                   # (one "task_done" per "get")
        if next_item is job_done:
            break
        yield next_item
        q.get() # Retrieves the "dummy" object (must be after yield)
        q.task_done() # Unblocks the producer, so a new iteration can start
6 голосов
/ 29 апреля 2016

Генератор в качестве сопрограммы (без резьбы)

Давайте получим FakeFtp с функцией retrbinary с использованием обратного вызова, вызываемого при каждом успешном чтении фрагмента данных:

class FakeFtp(object):
    def __init__(self):
        self.data = iter(["aaa", "bbb", "ccc", "ddd"])

    def login(self, user, password):
        self.user = user
        self.password = password

    def retrbinary(self, cmd, cb):
        for chunk in self.data:
            cb(chunk)

Недостатком использования простой функции обратного вызова является то, что она вызывается повторно, а функция обратного вызова функция не может легко сохранять контекст между вызовами.

Следующий код определяет process_chunks генератор, который сможет получать порции данных один по одному и обрабатывая их. В отличие от простого обратного вызова, здесь мы можем сохранить все обработка внутри одной функции без потери контекста.

from contextlib import closing
from itertools import count


def main():
    processed = []

    def process_chunks():
        for i in count():
            try:
                # (repeatedly) get the chunk to process
                chunk = yield
            except GeneratorExit:
                # finish_up
                print("Finishing up.")
                return
            else:
                # Here process the chunk as you like
                print("inside coroutine, processing chunk:", i, chunk)
                product = "processed({i}): {chunk}".format(i=i, chunk=chunk)
                processed.append(product)

    with closing(process_chunks()) as coroutine:
        # Get the coroutine to the first yield
        coroutine.next()
        ftp = FakeFtp()
        # next line repeatedly calls `coroutine.send(data)`
        ftp.retrbinary("RETR binary", cb=coroutine.send)
        # each callback "jumps" to `yield` line in `process_chunks`

    print("processed result", processed)
    print("DONE")

Чтобы увидеть код в действии, поместите класс FakeFtp, код, показанный выше, и следующую строку:

main()

в один файл и назовите его:

$ python headsandtails.py
('inside coroutine, processing chunk:', 0, 'aaa')
('inside coroutine, processing chunk:', 1, 'bbb')
('inside coroutine, processing chunk:', 2, 'ccc')
('inside coroutine, processing chunk:', 3, 'ddd')
Finishing up.
('processed result', ['processed(0): aaa', 'processed(1): bbb', 'processed(2): ccc', 'processed(3): ddd'])
DONE

Как это работает

processed = [] здесь, чтобы показать, генератор process_chunks не должен иметь никаких проблем с сотрудничать с его внешним контекстом. Все завернуто в def main():, чтобы доказать, нет необходимости используйте глобальные переменные.

def process_chunks() является ядром решения. Может иметь один входной параметр (не используется здесь), но основной точкой, где он получает ввод, является каждая строка yield, возвращающая то, что кто-то отправляет через .send(data) в экземпляр этого генератора. Можно coroutine.send(chunk), но в этом примере это делается с помощью обратного вызова, ссылающегося на эту функцию callback.send.

Обратите внимание, что в реальном решении нет проблем иметь несколько yield s в коде, они обрабатывается по одному. Это может быть использовано, например, читать (и игнорировать) заголовок файла CSV, а затем продолжить обработку записей с данными.

Мы могли бы создать экземпляр и использовать генератор следующим образом:

coroutine = process_chunks()
# Get the coroutine to the first yield
coroutine.next()

ftp = FakeFtp()
# next line repeatedly calls `coroutine.send(data)`
ftp.retrbinary("RETR binary", cb=coroutine.send)
# each callback "jumps" to `yield` line in `process_chunks`

# close the coroutine (will throw the `GeneratorExit` exception into the
# `process_chunks` coroutine).
coroutine.close()

Реальный код использует contextlib closing менеджер контекста, чтобы убедиться, что coroutine.close() всегда звонил.

Выводы

Это решение не предоставляет своего рода итератор для использования данных в традиционном стиле "из снаружи ". С другой стороны, мы можем:

  • использовать генератор "изнутри"
  • хранить всю итеративную обработку внутри одной функции без прерывания между обратными вызовами
  • опционально использовать внешний контекст
  • предоставляет полезные результаты за пределами
  • все это можно сделать без использования потоков

Кредиты : Решение в значительной степени вдохновлено ответом SO Итератор Python FTP «chunk» (без загрузки всего файла в память) Автор: user2357112

6 голосов
/ 02 апреля 2012

Концепция Используйте очередь блокировки с maxsize=1 и модель производителя / потребителя.

Производится обратный вызов, затем следующий вызов обратного вызова блокируется в полной очереди.

Потребитель затем возвращает значение из очереди, пытается получить другое значение и блокирует чтение.

Производителю разрешено выдвигать в очередь, промывать и повторять.

Использование:

def dummy(func, arg, callback=None):
  for i in range(100):
    callback(func(arg+i))

# Dummy example:
for i in Iteratorize(dummy, lambda x: x+1, 0):
  print(i)

# example with scipy:
for i in Iteratorize(scipy.optimize.fmin, func, x0):
   print(i)

Может использоваться как и ожидалось для итератора:

for i in take(5, Iteratorize(dummy, lambda x: x+1, 0)):
  print(i)

Итеративный класс:

from thread import start_new_thread
from Queue import Queue

class Iteratorize:
  """ 
  Transforms a function that takes a callback 
  into a lazy iterator (generator).
  """
  def __init__(self, func, ifunc, arg, callback=None):
    self.mfunc=func
    self.ifunc=ifunc
    self.c_callback=callback
    self.q = Queue(maxsize=1)
    self.stored_arg=arg
    self.sentinel = object()

    def _callback(val):
      self.q.put(val)

    def gentask():
      ret = self.mfunc(self.ifunc, self.stored_arg, callback=_callback)
      self.q.put(self.sentinel)
      if self.c_callback:
        self.c_callback(ret)

    start_new_thread(gentask, ())

  def __iter__(self):
    return self

  def next(self):
    obj = self.q.get(True,None)
    if obj is self.sentinel:
     raise StopIteration 
    else:
      return obj

Вероятно, можно выполнить некоторую очистку, чтобы принять *args и **kwargs для функции, которая будет упакована, и / или обратный вызов окончательного результата.

1 голос
/ 26 апреля 2019

Более питонический способ

Решение с использованием threading и queue хорошо, но не питонно, вот лучший способ (по крайней мере для меня ~) с использованием subprocess:

import pickle
import scipy
import select
import subprocess

def my_fmin(func, x0):
    # open a process to use as a pipeline
    proc = subprocess.Popen(['cat'], stdin=subprocess.PIPE, stdout=subprocess.PIPE)

    def my_callback(x):
        # x might be any object, not only str, so we use pickle to dump it
        proc.stdin.write(pickle.dumps(x) + '\n')

    scipy.optimize.fmin(func, x0, callback=my_callback)

    # use select in case that the callback is asynchronous;
    # otherwise, you can simply close proc.stdin and iterate over proc.stdout
    while select.select([proc.stdout], [], [], 0)[0]:
        yield pickle.loads(proc.stdout.readline()[:-1])

    # close the process
    proc.communicate()

Тогда вы можете использовать такую ​​функцию:

for x in my_fmin(func, x0):
    print x
0 голосов
/ 02 апреля 2012

Как насчет

data = []
scipy.optimize.fmin(func,x0,callback=data.append)
for line in data:
    print line

Если нет, что именно вы хотите сделать с данными генератора?

...