# -*- coding: utf-8 -*- import numpy as np import copy from operator import itemgetter import time def softmax(x): probs = np.exp(x - np.max(x)) probs /= np.sum(probs) return probs def rollout_policy_fn(board): """a coarse, fast version of policy_fn used in the rollout phase.""" # rollout randomly action_probs = np.random.rand(len(board.availables)) return zip(board.availables, action_probs) # 决策价值函数 def policy_value_fn(board): """a function that takes in a state and outputs a list of (action, probability) tuples and a score for the state""" # return uniform probabilities and 0 score for pure MCTS action_probs = np.ones(len(board.availables))/len(board.availables) return zip(board.availables, action_probs), 0 class TreeNode(object): """A node in the MCTS tree. Each node keeps track of its own value Q, prior probability P, and its visit-count-adjusted prior score u. """ def __init__(self, parent, prior_p): self._parent = parent self._children = {} # a map from action to TreeNode self._n_visits = 0 self._Q = 0 self._u = 0 self._P = prior_p def expand(self, action_priors): """Expand tree by creating new children. action_priors: a list of tuples of actions and their prior probability according to the policy function. """ for action, prob in action_priors: if action not in self._children: self._children[action] = TreeNode(self, prob) def select(self, c_puct): """Select action among children that gives maximum action value Q plus bonus u(P). Return: A tuple of (action, next_node) """ return max(self._children.items(), key=lambda act_node: act_node[1].get_value(c_puct)) def update(self, leaf_value): """Update node values from leaf evaluation. leaf_value: the value of subtree evaluation from the current player's perspective. """ # Count visit. self._n_visits += 1 # Update Q, a running average of values for all visits. # print("=====================================") # print("Before, Q: {}, visits: {}, leaf_value: {}".format(self._Q, self._n_visits,leaf_value)) self._Q += 1.0*(leaf_value - self._Q) / self._n_visits # print("After, Q: {}, visits: {}, leaf_value: {}".format(self._Q, self._n_visits,leaf_value)) def update_recursive(self, leaf_value): """Like a call to update(), but applied recursively for all ancestors. """ # If it is not root, this node's parent should be updated first. if self._parent: self._parent.update_recursive(-leaf_value) self.update(leaf_value) def get_value(self, c_puct): """Calculate and return the value for this node. It is a combination of leaf evaluations Q, and this node's prior adjusted for its visit count, u. c_puct: a number in (0, inf) controlling the relative impact of value Q, and prior probability P, on this node's score. """ self._u = (c_puct * self._P * np.sqrt(self._parent._n_visits) / (1 + self._n_visits)) return self._Q + self._u def is_leaf(self): """Check if leaf node (i.e. no nodes below this have been expanded). """ return self._children == {} def is_root(self): return self._parent is None class MCTS(object): """A simple implementation of Monte Carlo Tree Search.""" def __init__(self, policy_value_fn, c_puct=5, n_playout=2000): """ policy_value_fn: a function that takes in a board state and outputs a list of (action, probability) tuples and also a score in [-1, 1] (i.e. the expected value of the end game score from the current player's perspective) for the current player. c_puct: a number in (0, inf) that controls how quickly exploration converges to the maximum-value policy. A higher value means relying on the prior more. ??? """ self._root = TreeNode(None, 1.0) self._policy = policy_value_fn self._c_puct = c_puct self._n_playout = n_playout def _playout(self, state): """Run a single playout from the root to the leaf, getting a value at the leaf and propagating it back through its parents. State is modified in-place, so a copy must be provided. """ node = self._root while(1): if node.is_leaf(): break # Greedily select next move. action, node = node.select(self._c_puct) state.do_move(action) action_probs, _ = self._policy(state) # Check for end of game end, winner = state.game_end() if not end: node.expand(action_probs) # Evaluate the leaf node by random rollout leaf_value = self._evaluate_rollout(state) # Update value and visit count of nodes in this traversal. node.update_recursive(-leaf_value) def _evaluate_rollout(self, state, limit=1000): """Use the rollout policy to play until the end of the game, returning +1 if the current player wins, -1 if the opponent wins, and 0 if it is a tie. """ player = state.get_current_player() for i in range(limit): end, winner = state.game_end() if end: break action_probs = rollout_policy_fn(state) max_action = max(action_probs, key=itemgetter(1))[0] state.do_move(max_action) else: # If no break from the loop, issue a warning. print("WARNING: rollout reached move limit") if winner == -1: # tie return 0 else: return 1 if winner == player else -1 def get_move(self, state): """Runs all playouts sequentially and returns the most visited action. state: the current game state Return: the selected action """ start_time = time.time() # n_playout 探索的次数 for n in range(self._n_playout): state_copy = copy.deepcopy(state) self._playout(state_copy) need_time = time.time() - start_time print(f" PureMCTS sum_time: {need_time / self._n_playout }, total_simulation: {self._n_playout}") return max(self._root._children.items(),key=lambda act_node: act_node[1]._n_visits)[0], need_time / self._n_playout def update_with_move(self, last_move): """Step forward in the tree, keeping everything we already know about the subtree. """ if last_move in self._root._children: self._root = self._root._children[last_move] self._root._parent = None else: self._root = TreeNode(None, 1.0) def get_move_probs(self, state, temp=1e-3): """Run all playouts sequentially and return the available actions and their corresponding probabilities. state: the current game state temp: temperature parameter in (0, 1] controls the level of exploration """ start_time_averge = 0 ### test multi-thread # lock = threading.Lock() # with ThreadPoolExecutor(max_workers=4) as executor: # for n in range(self._n_playout): # start_time = time.time() # state_copy = copy.deepcopy(state) # executor.submit(self._playout, state_copy, lock) # start_time_averge += (time.time() - start_time) ### end test multi-thread t = time.time() for n in range(self._n_playout): start_time = time.time() state_copy = copy.deepcopy(state) self._playout(state_copy) start_time_averge += (time.time() - start_time) total_time = time.time() - t # print('!!time!!:', time.time() - t) print(f" My MCTS sum_time: {total_time}, total_simulation: {self._n_playout}") # calc the move probabilities based on visit counts at the root node act_visits = [(act, node._n_visits) for act, node in self._root._children.items()] acts, visits = zip(*act_visits) act_probs = softmax(1.0 / temp * np.log(np.array(visits) + 1e-10)) return 0, acts, act_probs, total_time def __str__(self): return "MCTS" class MCTSPlayer(object): """AI player based on MCTS""" def __init__(self, c_puct=5, n_playout=2000): self.mcts = MCTS(policy_value_fn, c_puct, n_playout) def set_player_ind(self, p): self.player = p def reset_player(self): self.mcts.update_with_move(-1) def get_action(self, board, return_time=False): sensible_moves = board.availables if len(sensible_moves) > 0: move, simul_mean_time = self.mcts.get_move(board) self.mcts.update_with_move(-1) print("MCTS move:", move) return move, simul_mean_time else: print("WARNING: the board is full") def __str__(self): return "MCTS {}".format(self.player) # 多了下面这一串代码 class Human_Player(object): def __init__(self): pass def set_player_ind(self, p): self.player = p def get_action(self, board): sensible_moves = board.availables if len(sensible_moves) > 0: # print(sensible_moves) move = int(input("Input the move:")) while (move not in sensible_moves ): print(sensible_moves) move = int(input("Input the move again:")) return move else: print("WARNING: the board is full") def __str__(self): return "Human {}".format(self.player)