Я первоначально разместил это в обзоре кода (отсюда и длинный код), но не смог получить ответ.
Моя модель основана на этой игре https://en.wikipedia.org/wiki/Ultimatum_game.Я не буду вдаваться в интуицию, стоящую за ней, но, вообще говоря, она функционирует следующим образом:
Игра состоит из решетки тревог, на которой агент размещается в каждом узле.
В течение каждого временного шага каждый игрок в каждом узле играет против случайного соседа, играя определенную стратегию.
Каждая из их стратегий (значение от 1 до 9) имеет склонность (которая назначается случайным образом и представляет собой просто некоторое число).Склонность, в свою очередь, определяет вероятность игры в эту стратегию.Вероятность рассчитывается как склонность этой стратегии к сумме склонностей всех стратегий.
Если игра приводит к положительному выигрышу, то выигрыши от этой игры добавляются к склонностям для этих стратегий.
Эти склонности затем определяют вероятности для их стратегий на следующем временном шаге и так далее.
Симуляция заканчивается после достижения временного шага N.
Для игр с большими решетками и большими временными шагами мой код работает действительно очень медленно.Я запустил cProfiler, чтобы проверить, где находятся узкие места, и, как я подозревал, функции update_probabilities
и play_rounds
, похоже, занимают много времени.Я хочу иметь возможность запускать игру с размером сетки 40х40 примерно за 100000+ временных шагов, но сейчас этого не происходит.
Так что может быть более эффективным способом для расчета и обновления вероятностей / склонностей каждого игрока в сетке?Я подумал о реализации массивов NumPy, но я не уверен, стоит ли здесь хлопот?
import numpy as np
import random
from random import randint
from numpy.random import choice
from numpy.random import multinomial
import cProfile
mew = 0.001
error = 0.05
def create_grid(row, col):
return [[0 for j in range(col)] for i in range(row)]
def create_random_propensities():
propensities = {}
pre_propensities = [random.uniform(0, 1) for i in range(9)]
a = np.sum(pre_propensities)
for i in range(1, 10):
propensities[i] = (pre_propensities[i - 1]/a) * 10 # normalize sum of propensities to 10
return propensities
class Proposer:
def __init__(self):
self.propensities = create_random_propensities()
self.probabilites = []
self.demand = 0 # the amount the proposer demands for themselves
def pick_strat(self, n_trials): # gets strategy, an integer in the interval [1, 9]
results = multinomial(n_trials, self.probabilites)
i, = np.where(results == max(results))
if len(i) > 1:
return choice(i) + 1
else:
return i[0] + 1
def calculate_probability(self, dict_data, index, total_sum): # calculates probability for particular strat, taking propensity
return dict_data[index]/total_sum # of that strat as input
def calculate_sum(self, dict_data):
return sum(dict_data.values())
def initialize(self):
init_sum = self.calculate_sum(self.propensities)
for strategy in range(1, 10):
self.probabilites.append(self.calculate_probability(self.propensities, strategy, init_sum))
self.demand = self.pick_strat(1)
def update_strategy(self):
self.demand = self.pick_strat(1)
def update_probablities(self):
for i in range(9):
self.propensities[1 + i] *= 1 - mew
pensity_sum = self.calculate_sum(self.propensities)
for i in range(9):
self.probabilites[i] = self.calculate_probability(self.propensities, 1 + i, pensity_sum)
def update(self):
self.update_probablities()
self.update_strategy()
class Responder: # methods same as proposer class, can skip-over
def __init__(self):
self.propensities = create_random_propensities()
self.probabilites = []
self.max_thresh = 0 # the maximum demand they are willing to accept
def pick_strat(self, n_trials):
results = multinomial(n_trials, self.probabilites)
i, = np.where(results == max(results))
if len(i) > 1:
return choice(i) + 1
else:
return i[0] + 1
def calculate_probability(self, dict_data, index, total_sum):
return dict_data[index]/total_sum
def calculate_sum(self, dict_data):
return sum(dict_data.values())
def initialize(self):
init_sum = self.calculate_sum(self.propensities)
for strategy in range(1, 10):
self.probabilites.append(self.calculate_probability(self.propensities, strategy, init_sum))
self.max_thresh = self.pick_strat(1)
def update_strategy(self):
self.max_thresh = self.pick_strat(1)
def update_probablities(self):
for i in range(9):
self.propensities[1 + i] *= 1 - mew # stops sum of propensites from growing without bound
pensity_sum = self.calculate_sum(self.propensities)
for i in range(9):
self.probabilites[i] = self.calculate_probability(self.propensities, 1 + i, pensity_sum)
def update(self):
self.update_probablities()
self.update_strategy()
class Agent:
def __init__(self):
self.prop_side = Proposer()
self.resp_side = Responder()
self.prop_side.initialize()
self.resp_side.initialize()
def update_all(self):
self.prop_side.update()
self.resp_side.update()
class Grid:
def __init__(self, rowsize, colsize):
self.rowsize = rowsize
self.colsize = colsize
def make_lattice(self):
return [[Agent() for j in range(self.colsize)] for i in range(self.rowsize)]
@staticmethod
def von_neumann_neighbourhood(array, row, col, wrapped=True): # gets up, bottom, left, right neighbours of some node
neighbours = set([])
if row + 1 <= len(array) - 1:
neighbours.add(array[row + 1][col])
if row - 1 >= 0:
neighbours.add(array[row - 1][col])
if col + 1 <= len(array[0]) - 1:
neighbours.add(array[row][col + 1])
if col - 1 >= 0:
neighbours.add(array[row][col - 1])
#if wrapped is on, conditions for out of bound points
if row - 1 < 0 and wrapped == True:
neighbours.add(array[-1][col])
if col - 1 < 0 and wrapped == True:
neighbours.add(array[row][-1])
if row + 1 > len(array) - 1 and wrapped == True:
neighbours.add(array[0][col])
if col + 1 > len(array[0]) - 1 and wrapped == True:
neighbours.add(array[row][0])
return neighbours
def get_error_term(pay, strategy):
index_strat_2, index_strat_8 = 2, 8
if strategy == 1:
return (1 - (error/2)) * pay, error/2 * pay, index_strat_2
if strategy == 9:
return (1 - (error/2)) * pay, error/2 * pay, index_strat_8
else:
return (1 - error) * pay, error/2 * pay, 0
class Games:
def __init__(self, n_rows, n_cols, n_rounds):
self.rounds = n_rounds
self.rows = n_rows
self.cols = n_cols
self.lattice = Grid(self.rows, self.cols).make_lattice()
self.lookup_table = np.full((self.rows, self.cols), False, dtype=bool) # if player on grid has updated their strat, set to True
def reset_look_tab(self):
self.lookup_table = np.full((self.rows, self.cols), False, dtype=bool)
def run_game(self):
n = 0
while n < self.rounds:
for r in range(self.rows):
for c in range(self.cols):
if n != 0:
self.lattice[r][c].update_all()
self.lookup_table[r][c] = True
self.play_rounds(self.lattice, r, c)
self.reset_look_tab()
n += 1
def play_rounds(self, grid, row, col):
neighbours = Grid.von_neumann_neighbourhood(grid, row, col)
neighbour = random.sample(neighbours, 1).pop()
neighbour_index = [(ix, iy) for ix, row in enumerate(self.lattice) for iy, i in enumerate(row) if i == neighbour]
if self.lookup_table[neighbour_index[0][0]][neighbour_index[0][1]] == False: # see if neighbour has already updated their strat
neighbour.update_all()
player = grid[row][col]
coin_toss = randint(0, 1) # which player acts as proposer or responder in game
if coin_toss == 1:
if player.prop_side.demand <= neighbour.resp_side.max_thresh: # postive payoff
payoff, adjacent_payoff, index = get_error_term(player.prop_side.demand, player.prop_side.demand)
if player.prop_side.demand == 1 or player.prop_side.demand == 9: # extreme strategies get bonus payoffs
player.prop_side.propensities[player.prop_side.demand] += payoff
player.prop_side.propensities[index] += adjacent_payoff
else:
player.prop_side.propensities[player.prop_side.demand] += payoff
player.prop_side.propensities[player.prop_side.demand - 1] += adjacent_payoff
player.prop_side.propensities[player.prop_side.demand + 1] += adjacent_payoff
else:
return 0 # if demand > max thresh -> both get zero
if coin_toss != 1:
if neighbour.prop_side.demand <= player.resp_side.max_thresh:
payoff, adjacent_payoff, index = get_error_term(10 - neighbour.prop_side.demand, player.resp_side.max_thresh)
if player.resp_side.max_thresh == 1 or player.resp_side.max_thresh == 9:
player.resp_side.propensities[player.resp_side.max_thresh] += payoff
player.resp_side.propensities[index] += adjacent_payoff
else:
player.resp_side.propensities[player.resp_side.max_thresh] += payoff
player.resp_side.propensities[player.resp_side.max_thresh - 1] += adjacent_payoff
player.resp_side.propensities[player.resp_side.max_thresh + 1] += adjacent_payoff
else:
return 0
#pr = cProfile.Profile()
#pr.enable()
my_game = Games(10, 10, 2000) # (rowsize, colsize, n_steps)
my_game.run_game()
#pr.disable()
#pr.print_stats(sort='time')
(Для тех, кому интересно, get_error_term
просто возвращает склонности к стратегиям, которыеРядом со стратегиями, которые получают положительную отдачу, например, если стратегия 8 работает, то склонности 7 и 9 также корректируются в сторону увеличения, и это вычисляется указанной функцией. И первый цикл for
внутри update_probabilities
просто гарантирует, чтосумма склонностей не растет без границ).