Demo / Gomoku_MCTS /mcts_pure.py
HuskyDoge's picture
finish model selections
e7a440c
raw
history blame
10 kB
# -*- coding: utf-8 -*-
import numpy as np
import copy
from operator import itemgetter
import time
def softmax(x):
probs = np.exp(x - np.max(x))
probs /= np.sum(probs)
return probs
def rollout_policy_fn(board):
"""a coarse, fast version of policy_fn used in the rollout phase."""
# rollout randomly
action_probs = np.random.rand(len(board.availables))
return zip(board.availables, action_probs)
# 决策价值函数
def policy_value_fn(board):
"""a function that takes in a state and outputs a list of (action, probability)
tuples and a score for the state"""
# return uniform probabilities and 0 score for pure MCTS
action_probs = np.ones(len(board.availables))/len(board.availables)
return zip(board.availables, action_probs), 0
class TreeNode(object):
"""A node in the MCTS tree. Each node keeps track of its own value Q,
prior probability P, and its visit-count-adjusted prior score u.
"""
def __init__(self, parent, prior_p):
self._parent = parent
self._children = {} # a map from action to TreeNode
self._n_visits = 0
self._Q = 0
self._u = 0
self._P = prior_p
def expand(self, action_priors):
"""Expand tree by creating new children.
action_priors: a list of tuples of actions and their prior probability
according to the policy function.
"""
for action, prob in action_priors:
if action not in self._children:
self._children[action] = TreeNode(self, prob)
def select(self, c_puct):
"""Select action among children that gives maximum action value Q
plus bonus u(P).
Return: A tuple of (action, next_node)
"""
return max(self._children.items(),
key=lambda act_node: act_node[1].get_value(c_puct))
def update(self, leaf_value):
"""Update node values from leaf evaluation.
leaf_value: the value of subtree evaluation from the current player's
perspective.
"""
# Count visit.
self._n_visits += 1
# Update Q, a running average of values for all visits.
# print("=====================================")
# print("Before, Q: {}, visits: {}, leaf_value: {}".format(self._Q, self._n_visits,leaf_value))
self._Q += 1.0*(leaf_value - self._Q) / self._n_visits
# print("After, Q: {}, visits: {}, leaf_value: {}".format(self._Q, self._n_visits,leaf_value))
def update_recursive(self, leaf_value):
"""Like a call to update(), but applied recursively for all ancestors.
"""
# If it is not root, this node's parent should be updated first.
if self._parent:
self._parent.update_recursive(-leaf_value)
self.update(leaf_value)
def get_value(self, c_puct):
"""Calculate and return the value for this node.
It is a combination of leaf evaluations Q, and this node's prior
adjusted for its visit count, u.
c_puct: a number in (0, inf) controlling the relative impact of
value Q, and prior probability P, on this node's score.
"""
self._u = (c_puct * self._P *
np.sqrt(self._parent._n_visits) / (1 + self._n_visits))
return self._Q + self._u
def is_leaf(self):
"""Check if leaf node (i.e. no nodes below this have been expanded).
"""
return self._children == {}
def is_root(self):
return self._parent is None
class MCTS(object):
"""A simple implementation of Monte Carlo Tree Search."""
def __init__(self, policy_value_fn, c_puct=5, n_playout=2000):
"""
policy_value_fn: a function that takes in a board state and outputs
a list of (action, probability) tuples and also a score in [-1, 1]
(i.e. the expected value of the end game score from the current
player's perspective) for the current player.
c_puct: a number in (0, inf) that controls how quickly exploration
converges to the maximum-value policy. A higher value means
relying on the prior more. ???
"""
self._root = TreeNode(None, 1.0)
self._policy = policy_value_fn
self._c_puct = c_puct
self._n_playout = n_playout
def _playout(self, state):
"""Run a single playout from the root to the leaf, getting a value at
the leaf and propagating it back through its parents.
State is modified in-place, so a copy must be provided.
"""
node = self._root
while(1):
if node.is_leaf():
break
# Greedily select next move.
action, node = node.select(self._c_puct)
state.do_move(action)
action_probs, _ = self._policy(state)
# Check for end of game
end, winner = state.game_end()
if not end:
node.expand(action_probs)
# Evaluate the leaf node by random rollout
leaf_value = self._evaluate_rollout(state)
# Update value and visit count of nodes in this traversal.
node.update_recursive(-leaf_value)
def _evaluate_rollout(self, state, limit=1000):
"""Use the rollout policy to play until the end of the game,
returning +1 if the current player wins, -1 if the opponent wins,
and 0 if it is a tie.
"""
player = state.get_current_player()
for i in range(limit):
end, winner = state.game_end()
if end:
break
action_probs = rollout_policy_fn(state)
max_action = max(action_probs, key=itemgetter(1))[0]
state.do_move(max_action)
else:
# If no break from the loop, issue a warning.
print("WARNING: rollout reached move limit")
if winner == -1: # tie
return 0
else:
return 1 if winner == player else -1
def get_move(self, state):
"""Runs all playouts sequentially and returns the most visited action.
state: the current game state
Return: the selected action
"""
start_time = time.time()
# n_playout 探索的次数
for n in range(self._n_playout):
state_copy = copy.deepcopy(state)
self._playout(state_copy)
need_time = time.time() - start_time
print(f" PureMCTS sum_time: {need_time / self._n_playout }, total_simulation: {self._n_playout}")
return max(self._root._children.items(),key=lambda act_node: act_node[1]._n_visits)[0], need_time / self._n_playout
def update_with_move(self, last_move):
"""Step forward in the tree, keeping everything we already know
about the subtree.
"""
if last_move in self._root._children:
self._root = self._root._children[last_move]
self._root._parent = None
else:
self._root = TreeNode(None, 1.0)
def get_move_probs(self, state, temp=1e-3):
"""Run all playouts sequentially and return the available actions and
their corresponding probabilities.
state: the current game state
temp: temperature parameter in (0, 1] controls the level of exploration
"""
start_time_averge = 0
### test multi-thread
# lock = threading.Lock()
# with ThreadPoolExecutor(max_workers=4) as executor:
# for n in range(self._n_playout):
# start_time = time.time()
# state_copy = copy.deepcopy(state)
# executor.submit(self._playout, state_copy, lock)
# start_time_averge += (time.time() - start_time)
### end test multi-thread
t = time.time()
for n in range(self._n_playout):
start_time = time.time()
state_copy = copy.deepcopy(state)
self._playout(state_copy)
start_time_averge += (time.time() - start_time)
total_time = time.time() - t
# print('!!time!!:', time.time() - t)
print(f" My MCTS sum_time: {total_time}, total_simulation: {self._n_playout}")
# calc the move probabilities based on visit counts at the root node
act_visits = [(act, node._n_visits)
for act, node in self._root._children.items()]
acts, visits = zip(*act_visits)
act_probs = softmax(1.0 / temp * np.log(np.array(visits) + 1e-10))
return 0, acts, act_probs, total_time
def __str__(self):
return "MCTS"
class MCTSPlayer(object):
"""AI player based on MCTS"""
def __init__(self, c_puct=5, n_playout=2000):
self.mcts = MCTS(policy_value_fn, c_puct, n_playout)
def set_player_ind(self, p):
self.player = p
def reset_player(self):
self.mcts.update_with_move(-1)
def get_action(self, board, return_time=False):
sensible_moves = board.availables
if len(sensible_moves) > 0:
move, simul_mean_time = self.mcts.get_move(board)
self.mcts.update_with_move(-1)
print("MCTS move:", move)
return move, simul_mean_time
else:
print("WARNING: the board is full")
def __str__(self):
return "MCTS {}".format(self.player)
# 多了下面这一串代码
class Human_Player(object):
def __init__(self):
pass
def set_player_ind(self, p):
self.player = p
def get_action(self, board):
sensible_moves = board.availables
if len(sensible_moves) > 0:
# print(sensible_moves)
move = int(input("Input the move:"))
while (move not in sensible_moves ):
print(sensible_moves)
move = int(input("Input the move again:"))
return move
else:
print("WARNING: the board is full")
def __str__(self):
return "Human {}".format(self.player)