Низкая производительность в агентной модели Python - PullRequest
0 голосов
/ 28 июня 2019

Я первоначально разместил это в обзоре кода (отсюда и длинный код), но не смог получить ответ.

Моя модель основана на этой игре https://en.wikipedia.org/wiki/Ultimatum_game.Я не буду вдаваться в интуицию, стоящую за ней, но, вообще говоря, она функционирует следующим образом:

  1. Игра состоит из решетки тревог, на которой агент размещается в каждом узле.

  2. В течение каждого временного шага каждый игрок в каждом узле играет против случайного соседа, играя определенную стратегию.

  3. Каждая из их стратегий (значение от 1 до 9) имеет склонность (которая назначается случайным образом и представляет собой просто некоторое число).Склонность, в свою очередь, определяет вероятность игры в эту стратегию.Вероятность рассчитывается как склонность этой стратегии к сумме склонностей всех стратегий.

  4. Если игра приводит к положительному выигрышу, то выигрыши от этой игры добавляются к склонностям для этих стратегий.

  5. Эти склонности затем определяют вероятности для их стратегий на следующем временном шаге и так далее.

  6. Симуляция заканчивается после достижения временного шага N.

Для игр с большими решетками и большими временными шагами мой код работает действительно очень медленно.Я запустил cProfiler, чтобы проверить, где находятся узкие места, и, как я подозревал, функции update_probabilities и play_rounds, похоже, занимают много времени.Я хочу иметь возможность запускать игру с размером сетки 40х40 примерно за 100000+ временных шагов, но сейчас этого не происходит.

Так что может быть более эффективным способом для расчета и обновления вероятностей / склонностей каждого игрока в сетке?Я подумал о реализации массивов NumPy, но я не уверен, стоит ли здесь хлопот?


import numpy as np
import random
from random import randint
from numpy.random import choice
from numpy.random import multinomial
import cProfile

mew = 0.001 
error = 0.05

def create_grid(row, col):
    return [[0 for j in range(col)] for i in range(row)]

def create_random_propensities():
    propensities = {}
    pre_propensities = [random.uniform(0, 1) for i in range(9)]
    a = np.sum(pre_propensities)
    for i in range(1, 10):
        propensities[i] = (pre_propensities[i - 1]/a) * 10 # normalize sum of propensities to 10
    return propensities

class Proposer:
    def __init__(self):
        self.propensities = create_random_propensities()
        self.probabilites = []
        self.demand = 0 # the amount the proposer demands for themselves

    def pick_strat(self, n_trials): # gets strategy, an integer in the interval [1, 9]
        results = multinomial(n_trials, self.probabilites)
        i, = np.where(results == max(results))
        if len(i) > 1:
            return choice(i) + 1
        else:
            return i[0] + 1

    def calculate_probability(self, dict_data, index, total_sum): # calculates probability for particular strat, taking propensity
        return dict_data[index]/total_sum                           # of that strat as input

    def calculate_sum(self, dict_data):
        return sum(dict_data.values())

    def initialize(self):
        init_sum = self.calculate_sum(self.propensities)
        for strategy in range(1, 10):
            self.probabilites.append(self.calculate_probability(self.propensities, strategy, init_sum)) 
        self.demand = self.pick_strat(1)

    def update_strategy(self):
        self.demand = self.pick_strat(1)

    def update_probablities(self):
        for i in range(9):
            self.propensities[1 + i] *= 1 - mew 
        pensity_sum = self.calculate_sum(self.propensities)
        for i in range(9):
            self.probabilites[i] = self.calculate_probability(self.propensities, 1 + i, pensity_sum)

    def update(self):
        self.update_probablities()
        self.update_strategy()

class Responder: # methods same as proposer class, can skip-over
    def __init__(self):
        self.propensities = create_random_propensities()
        self.probabilites = []
        self.max_thresh = 0 # the maximum demand they are willing to accept 

    def pick_strat(self, n_trials):
        results = multinomial(n_trials, self.probabilites)
        i, = np.where(results == max(results))
        if len(i) > 1:
            return choice(i) + 1
        else:
            return i[0] + 1

    def calculate_probability(self, dict_data, index, total_sum):
        return dict_data[index]/total_sum

    def calculate_sum(self, dict_data):
        return sum(dict_data.values())

    def initialize(self):
        init_sum = self.calculate_sum(self.propensities)
        for strategy in range(1, 10):
            self.probabilites.append(self.calculate_probability(self.propensities, strategy, init_sum)) 
        self.max_thresh = self.pick_strat(1)

    def update_strategy(self):
        self.max_thresh = self.pick_strat(1)

    def update_probablities(self):
        for i in range(9):
            self.propensities[1 + i] *= 1 - mew # stops sum of propensites from growing without bound
        pensity_sum = self.calculate_sum(self.propensities)
        for i in range(9):
            self.probabilites[i] = self.calculate_probability(self.propensities, 1 + i, pensity_sum)

    def update(self):
        self.update_probablities()
        self.update_strategy()

class Agent:
    def __init__(self):
        self.prop_side = Proposer()
        self.resp_side = Responder()
        self.prop_side.initialize()
        self.resp_side.initialize()

    def update_all(self):
        self.prop_side.update()
        self.resp_side.update()

class Grid:
    def __init__(self, rowsize, colsize):
        self.rowsize = rowsize
        self.colsize = colsize

    def make_lattice(self):
        return [[Agent() for j in range(self.colsize)] for i in range(self.rowsize)]

    @staticmethod
    def von_neumann_neighbourhood(array, row, col, wrapped=True): # gets up, bottom, left, right neighbours of some node
        neighbours = set([])

        if row + 1 <= len(array) - 1:
            neighbours.add(array[row + 1][col])

        if row - 1 >= 0:
            neighbours.add(array[row - 1][col])

        if col + 1 <= len(array[0]) - 1:
            neighbours.add(array[row][col + 1])

        if col - 1 >= 0:    
            neighbours.add(array[row][col - 1])
        #if wrapped is on, conditions for out of bound points
        if row - 1 < 0 and wrapped == True:
            neighbours.add(array[-1][col])

        if col - 1 < 0 and wrapped == True:
            neighbours.add(array[row][-1])

        if row + 1 > len(array) - 1 and wrapped == True:
            neighbours.add(array[0][col])

        if col + 1 > len(array[0]) - 1 and wrapped == True:
            neighbours.add(array[row][0])
        return neighbours

def get_error_term(pay, strategy):
    index_strat_2, index_strat_8 = 2, 8
    if strategy == 1:
        return (1 - (error/2)) * pay, error/2 * pay, index_strat_2
    if strategy == 9:
        return (1 - (error/2)) * pay, error/2 * pay, index_strat_8
    else:
        return (1 - error) * pay, error/2 * pay, 0

class Games:
    def __init__(self, n_rows, n_cols, n_rounds):
        self.rounds = n_rounds
        self.rows = n_rows
        self.cols = n_cols
        self.lattice = Grid(self.rows, self.cols).make_lattice()
        self.lookup_table = np.full((self.rows, self.cols), False, dtype=bool)  # if player on grid has updated their strat, set to True 

    def reset_look_tab(self):
        self.lookup_table = np.full((self.rows, self.cols), False, dtype=bool)

    def run_game(self):
        n = 0
        while n < self.rounds:
            for r in range(self.rows):
                for c in range(self.cols):
                    if n != 0:
                        self.lattice[r][c].update_all() 
                        self.lookup_table[r][c] = True
                    self.play_rounds(self.lattice, r, c)
            self.reset_look_tab()
            n += 1

    def play_rounds(self, grid, row, col):  
        neighbours = Grid.von_neumann_neighbourhood(grid, row, col)
        neighbour = random.sample(neighbours, 1).pop() 
        neighbour_index = [(ix, iy) for ix, row in enumerate(self.lattice) for iy, i in enumerate(row) if i == neighbour]
        if self.lookup_table[neighbour_index[0][0]][neighbour_index[0][1]] == False: # see if neighbour has already updated their strat
            neighbour.update_all()                                                      
        player = grid[row][col]
        coin_toss = randint(0, 1) # which player acts as proposer or responder in game
        if coin_toss == 1:
            if player.prop_side.demand <= neighbour.resp_side.max_thresh: # postive payoff
                payoff, adjacent_payoff, index = get_error_term(player.prop_side.demand, player.prop_side.demand)
                if player.prop_side.demand == 1 or player.prop_side.demand == 9: # extreme strategies get bonus payoffs
                    player.prop_side.propensities[player.prop_side.demand] += payoff
                    player.prop_side.propensities[index] += adjacent_payoff
                else:
                    player.prop_side.propensities[player.prop_side.demand] += payoff
                    player.prop_side.propensities[player.prop_side.demand - 1] += adjacent_payoff
                    player.prop_side.propensities[player.prop_side.demand + 1] += adjacent_payoff
            else:
                return 0 # if demand > max thresh -> both get zero

        if coin_toss != 1:
            if neighbour.prop_side.demand <= player.resp_side.max_thresh:
                payoff, adjacent_payoff, index = get_error_term(10 - neighbour.prop_side.demand, player.resp_side.max_thresh)
                if player.resp_side.max_thresh == 1 or player.resp_side.max_thresh == 9:
                    player.resp_side.propensities[player.resp_side.max_thresh] += payoff
                    player.resp_side.propensities[index] += adjacent_payoff
                else:
                    player.resp_side.propensities[player.resp_side.max_thresh] += payoff
                    player.resp_side.propensities[player.resp_side.max_thresh - 1] += adjacent_payoff
                    player.resp_side.propensities[player.resp_side.max_thresh + 1] += adjacent_payoff
            else:
                return 0

#pr = cProfile.Profile()
#pr.enable()

my_game = Games(10, 10, 2000) # (rowsize, colsize, n_steps)
my_game.run_game()

#pr.disable()
#pr.print_stats(sort='time')

(Для тех, кому интересно, get_error_term просто возвращает склонности к стратегиям, которыеРядом со стратегиями, которые получают положительную отдачу, например, если стратегия 8 работает, то склонности 7 и 9 также корректируются в сторону увеличения, и это вычисляется указанной функцией. И первый цикл for внутри update_probabilities просто гарантирует, чтосумма склонностей не растет без границ).

...