Как исправить RuntimeError: deque мутировал во время итерации в Python - PullRequest
0 голосов
/ 08 июля 2019

Я пытаюсь реализовать имитацию рынка для алгоритмической торговли, и я нашел этот код на github https://github.com/DrAshBooth/PyLOB. Проблема в том, что я запускаю свой код для небольшого окна, например, 2 дня, всев порядке, и я получаю ожидаемые результаты.Но когда я увеличиваю окно до 20 дней и более, я получаю «RuntimeError: deque mutated во время итерации».Я проверил свой код, но я никогда не нашел ничего, что могло бы поменять deque во время моих пробежек.Ниже приведена часть кода, которая выдает ошибку:

    self.tape = deque(maxlen=None)
    .
    .
    .
    def avg_volume_traded(self):
       if self.tape != None and len(self.tape) > 0:
          num = 0
          total_volume = 0
          for entry in self.tape:
             total_volume = entry['qty'] + total_volume
             num += 1
          if num != 0 and total_volume != None:
             return total_volume, num
       else:
          return None, None

Это фактическое сообщение об ошибке:

    Exception in thread Thread-10986:
    Traceback (most recent call last):
      File "/home/hamid/anaconda3/lib/python3.6/threading.py", line 916, in _bootstrap_inner
        self.run()
      File "/home/hamid/anaconda3/lib/python3.6/threading.py", line 864, in run
        self._target(*self._args, **self._kwargs)
      File "exchange_envirnment.py", line 60, in _doit
        self.func(self.args[0], self.args[1])
      File "/home/hamid/dukto/src2/src_new/traders/market_maker_trader.py", line 46, in trade
        self.type_three(lob_obj, reporter_obj)
      File "/home/hamid/dukto/src2/src_new/traders/market_maker_trader.py", line 285, in type_three
        max_volume = lob_obj.max_volume_traded()
      File "/home/hamid/dukto/src2/src_new/PyLOB/orderbook.py", line 395, in max_volume_traded
        for entry in self.tape:
    RuntimeError: deque mutated during iteration

Это основная часть, которая использует многопоточность (классPeriodic и day_period):

class Periodic(object):
    def __init__(self, object, compression_factor, args=[], kwargs={}):
        self.compression_factor = compression_factor
        self.object = object
        self.func = object.trade
        self.args = args
        self.kwargs = kwargs
        self.seppuku = Event()
    def start(self):
        self.seppuku.clear()
        self.proc = Thread(target=self._doit)
        self.proc.start()
    def stop(self):
        self.seppuku.set()
        self.proc.join()
    def _doit(self):
        while True:
            self.seppuku.wait(self.object.interval / self.compression_factor)
            if self.seppuku.is_set():
                break
            self.func(self.args[0], self.args[1])

class day_period(object):
    def __init__(self, object, compression_factor, args=[], kwargs={}):
        self.period = (3600 * 4) / compression_factor
        self.func = object.run
        self.args = args
        self.kwargs = kwargs
        self.seppuku = Event()
    def start(self):
        self.seppuku.clear()
        self.proc = Thread(target=self._doit)
        self.proc.start()
    def stop(self):
        self.seppuku.set()
        self.proc.join()
    def _doit(self):
        while True:
            self.seppuku.wait(self.period)
            if self.seppuku.is_set():
                break
            self.func(self.args)

class intra_day_traders_mng(object):
    def __init__(self, simulation_config):
        self.config = simulation_config
        self.agents_list = []
        self.agents_dict = {}
        self.p_list = []
        self.compression_factor = simulation_config['simulation_config']['compression_factor']
        self.trader_creator()
        self.first_time = True
        self.day_of_simulation  = simulation_config['simulation_config']['day_number']

    def trader_creator(self):
        for agent_name in self.config['agents']['intra_day']:
            for config in self.config['agents']['intra_day'][agent_name]:
                if agent_name == 'nonclassified_trader':
                    for k in range(config['n_traders']):
                        self.agents_list.append(NON_CLASSIFIED_TRADER_INTRADAY(config))
                        time.sleep(.1)
        for agent_name in self.config['agents']['daily']:
            for config in self.config['agents']['daily'][agent_name]:
                if agent_name == 'nonclassified_trader':
                    for k in range(config['n_traders']):
                        self.agents_list.append(NON_CLASSIFIED_TRADER_DAILY(config))
                        time.sleep(0.1)
                if agent_name == "market_maker_trader":
                    for k in range(config['n_traders']):
                        self.agents_list.append(MARKET_MAKER_TRADER_DAILY(config))
                        time.sleep(0.1)
        for agent in self.agents_list:
            self.agents_dict.update({agent.id: agent})
        for agent in self.agents_list:
            agent.set_trader_dict(self.agents_dict)

    def random_initial(self):
        agents_random_list = random.choices(self.agents_list, k=len(self.agents_list))
        return agents_random_list

    def run(self, args):
        lob = args[0]
        reporter_obj = args[1]
        # when the trader running for first time
        if self.first_time == True:
            lob.time_obj.reset()
            agents_random_list = self.random_initial()
            for agent in agents_random_list:
                self.p_list.append(Periodic(agent, self.compression_factor, args=(lob,reporter_obj)))
                self.p_list[-1].start()
                time.sleep(.1)
            self.first_time = False
        else:
            for proc in self.p_list:
                proc.stop()
            for agent in self.agents_list:
                agent.reset_trader(lob)
            time_series = lob.ohcl()
            if len(time_series) == self.day_of_simulation :
                out = {'out':time_series}
                with open('output.json', 'w') as outfile:
                    json.dump(out, outfile)
                reporter_obj.save_as_csv()
                trade_summary = lob.trade_summary()
                with open('trade_report.csv', 'w') as csvFile:
                    writer = csv.writer(csvFile)
                    writer.writerows(trade_summary)
                csvFile.close()
                sys.exit()
            print("***********************************************************************************")
            print("day is:",lob.time_obj.day)
            lob.time_obj.reset()
            for proc in self.p_list:
                proc.start()
                time.sleep(.1)

if __name__ == '__main__':
    with open('config.json', 'r') as f:
        simulation_config = json.load(f)
    intra_day_mng_obj = intra_day_traders_mng(simulation_config)
    reporter_obj = REPORTER()
    # for synchronization of time
    time_obj = TIME_MANAGE(compression_factor=simulation_config['simulation_config']['compression_factor'])
    lob = OrderBook(time_obj, tick_size=simulation_config['simulation_config']['tickSize'])
    day_period(intra_day_mng_obj, simulation_config['simulation_config']['compression_factor'], args=(lob,reporter_obj)).start()

И, наконец, «Книга заказов», которая определяет «self.tape» в следующем коде:

class OrderBook():
    def __init__(self, time_obj, tick_size=0.0001):
        self.tape = deque(maxlen=None)  # Index [0] is most recent trade
        self.bids = OrderTree()
        self.asks = OrderTree()
        self.lastTick = None
        self.lastTimestamp = 0
        self.tickSize = tick_size
        self.time = 0
        self.nextQuoteID = 0
        self.time_series = []
        self.time_obj = time_obj

    def clipPrice(self, price):
        return round(price, int(math.log10(1 / self.tickSize)))

    def updateTime(self):
        self.time = int(self.time_obj.now()['time'])

    def processOrder(self, quote, fromData, verbose):
        orderType = quote['type']
        orderInBook = None
        if fromData:
            self.time = quote['timestamp']
        else:
            self.updateTime()
            quote['timestamp'] = self.time
        if quote['qty'] <= 0:
            sys.exit('processLimitOrder() given order of qty <= 0')
        if not fromData: self.nextQuoteID += 1
        if orderType == 'market':
            trades = self.processMarketOrder(quote, verbose)
        elif orderType == 'limit':
            quote['price'] = self.clipPrice(quote['price'])
            trades, orderInBook = self.processLimitOrder(quote, fromData, verbose)
        else:
            sys.exit("processOrder() given neither 'market' nor 'limit'")
        return trades, orderInBook

    def processOrderList(self, side, orderlist,
                         qtyStillToTrade, quote, verbose):
        trades = []
        qtyToTrade = qtyStillToTrade
        while len(orderlist) > 0 and qtyToTrade > 0:
            headOrder = orderlist.getHeadOrder()
            tradedPrice = headOrder.price
            counterparty = headOrder.tid
            if qtyToTrade < headOrder.qty:
                tradedQty = qtyToTrade
                newBookQty = headOrder.qty - qtyToTrade
                headOrder.updateQty(newBookQty, headOrder.timestamp)
                qtyToTrade = 0
            elif qtyToTrade == headOrder.qty:
                tradedQty = qtyToTrade
                if side == 'bid':
                    self.bids.removeOrderById(headOrder.idNum)
                else:
                    self.asks.removeOrderById(headOrder.idNum)
                qtyToTrade = 0
            else:
                tradedQty = headOrder.qty
                if side == 'bid':
                    self.bids.removeOrderById(headOrder.idNum)
                else:
                    self.asks.removeOrderById(headOrder.idNum)
                qtyToTrade -= tradedQty
            if verbose: print('>>> TRADE \nt=%d $%f n=%d p1=%d p2=%d' %
                              (self.time, tradedPrice, tradedQty,
                               counterparty, quote['tid']))

            transactionRecord = {'timestamp': self.time,
                                 'price': tradedPrice,
                                 'qty': tradedQty,
                                 'time': self.time,
                                 'day': self.time_obj.now()['day']}
            if side == 'bid':
                transactionRecord['party1'] = [counterparty,
                                               'bid',
                                               headOrder.idNum]
                transactionRecord['party2'] = [quote['tid'],
                                               'ask',
                                               None]
            else:
                transactionRecord['party1'] = [counterparty,
                                               'ask',
                                               headOrder.idNum]
                transactionRecord['party2'] = [quote['tid'],
                                               'bid',
                                               None]
            self.tape.append(transactionRecord)
            trades.append(transactionRecord)
        return qtyToTrade, trades

    def processMarketOrder(self, quote, verbose):
        trades = []
        qtyToTrade = quote['qty']
        side = quote['side']
        if side == 'bid':
            while qtyToTrade > 0 and self.asks:
                bestPriceAsks = self.asks.minPriceList()
                qtyToTrade, newTrades = self.processOrderList('ask',
                                                              bestPriceAsks,
                                                              qtyToTrade,
                                                              quote, verbose)
                trades += newTrades
        elif side == 'ask':
            while qtyToTrade > 0 and self.bids:
                bestPriceBids = self.bids.maxPriceList()
                qtyToTrade, newTrades = self.processOrderList('bid',
                                                              bestPriceBids,
                                                              qtyToTrade,
                                                              quote, verbose)
                trades += newTrades
        else:
            sys.exit('processMarketOrder() received neither "bid" nor "ask"')
        return trades

    def processLimitOrder(self, quote, fromData, verbose):
        orderInBook = None
        trades = []
        qtyToTrade = quote['qty']
        side = quote['side']
        price = quote['price']
        if side == 'bid':
            while (self.asks and
                   price >= self.asks.minPrice() and
                   qtyToTrade > 0):
                bestPriceAsks = self.asks.minPriceList()
                qtyToTrade, newTrades = self.processOrderList('ask',
                                                              bestPriceAsks,
                                                              qtyToTrade,
                                                              quote, verbose)

                trades += newTrades
            if qtyToTrade > 0:
                if not fromData:
                    quote['idNum'] = self.nextQuoteID
                quote['qty'] = qtyToTrade
                self.bids.insertOrder(quote)
                orderInBook = quote
        elif side == 'ask':
            while (self.bids and
                   price <= self.bids.maxPrice() and
                   qtyToTrade > 0):
                bestPriceBids = self.bids.maxPriceList()
                qtyToTrade, newTrades = self.processOrderList('bid',
                                                              bestPriceBids,
                                                              qtyToTrade,
                                                              quote, verbose)
                trades += newTrades
            if qtyToTrade > 0:
                if not fromData:
                    quote['idNum'] = self.nextQuoteID
                quote['qty'] = qtyToTrade
                self.asks.insertOrder(quote)
                orderInBook = quote
        else:
            sys.exit('processLimitOrder() given neither bid nor ask')
        return trades, orderInBook

    def avg_volume_traded(self):
        if self.tape != None and len(self.tape) > 0:
            num = 0
            total_volume = 0
            for entry in self.tape:
                total_volume = entry['qty'] + total_volume
                num += 1
            if num != 0 and total_volume != None:
                return total_volume, num
        else:
            return None, None

Ответы [ 2 ]

0 голосов
/ 08 июля 2019

Как заметил Том Далтон , это исключение может быть вызвано из-за модификации self.tape в другом потоке.Я столкнулся с этой проблемой раньше, так как для себя я исправил ее, создав блокировку.Я также могу предложить вам просто игнорировать это исключение, но это может привести к неопределенному поведению

0 голосов
/ 08 июля 2019

Похоже, вы используете темы.И очень вероятно, что self.tape изменяется другим потоком.Вы должны попытаться заблокировать этот поток от изменения типа self.tape во время выполнения avg_volume_traded.

...