"""Markov Decision Processes (Chapter 17) First we define an MDP, and the special case of a GridMDP, in which states are laid out in a 2-dimensional grid. We also represent a policy as a dictionary of {state: action} pairs, and a Utility function as a dictionary of {state: number} pairs. We then define the value_iteration and policy_iteration algorithms.""" from utils import argmax, vector_add, orientations, turn_right, turn_left import random import numpy as np from collections import defaultdict class MDP: """A Markov Decision Process, defined by an initial state, transition model, and reward function. We also keep track of a gamma value, for use by algorithms. The transition model is represented somewhat differently from the text. Instead of P(s' | s, a) being a probability number for each state/state/action triplet, we instead have T(s, a) return a list of (p, s') pairs. We also keep track of the possible states, terminal states, and actions for each state. [page 646]""" def __init__(self, init, actlist, terminals, transitions=None, reward=None, states=None, gamma=0.9): if not (0 < gamma <= 1): raise ValueError("An MDP must have 0 < gamma <= 1") # collect states from transitions table if not passed. self.states = states or self.get_states_from_transitions(transitions) self.init = init if isinstance(actlist, list): # if actlist is a list, all states have the same actions self.actlist = actlist elif isinstance(actlist, dict): # if actlist is a dict, different actions for each state self.actlist = actlist self.terminals = terminals self.transitions = transitions or {} if not self.transitions: print("Warning: Transition table is empty.") self.gamma = gamma self.reward = reward or {s: 0 for s in self.states} # self.check_consistency() def R(self, state): """Return a numeric reward for this state.""" return self.reward[state] def T(self, state, action): """Transition model. From a state and an action, return a list of (probability, result-state) pairs.""" if not self.transitions: raise ValueError("Transition model is missing") else: return self.transitions[state][action] def actions(self, state): """Return a list of actions that can be performed in this state. By default, a fixed list of actions, except for terminal states. Override this method if you need to specialize by state.""" if state in self.terminals: return [None] else: return self.actlist def get_states_from_transitions(self, transitions): if isinstance(transitions, dict): s1 = set(transitions.keys()) s2 = set(tr[1] for actions in transitions.values() for effects in actions.values() for tr in effects) return s1.union(s2) else: print('Could not retrieve states from transitions') return None def check_consistency(self): # check that all states in transitions are valid assert set(self.states) == self.get_states_from_transitions(self.transitions) # check that init is a valid state assert self.init in self.states # check reward for each state assert set(self.reward.keys()) == set(self.states) # check that all terminals are valid states assert all(t in self.states for t in self.terminals) # check that probability distributions for all actions sum to 1 for s1, actions in self.transitions.items(): for a in actions.keys(): s = 0 for o in actions[a]: s += o[0] assert abs(s - 1) < 0.001 class MDP2(MDP): """ Inherits from MDP. Handles terminal states, and transitions to and from terminal states better. """ def __init__(self, init, actlist, terminals, transitions, reward=None, gamma=0.9): MDP.__init__(self, init, actlist, terminals, transitions, reward, gamma=gamma) def T(self, state, action): if action is None: return [(0.0, state)] else: return self.transitions[state][action] class GridMDP(MDP): """A two-dimensional grid MDP, as in [Figure 17.1]. All you have to do is specify the grid as a list of lists of rewards; use None for an obstacle (unreachable state). Also, you should specify the terminal states. An action is an (x, y) unit vector; e.g. (1, 0) means move east.""" def __init__(self, grid, terminals, init=(0, 0), gamma=.9): grid.reverse() # because we want row 0 on bottom, not on top reward = {} states = set() self.rows = len(grid) self.cols = len(grid[0]) self.grid = grid for x in range(self.cols): for y in range(self.rows): if grid[y][x]: states.add((x, y)) reward[(x, y)] = grid[y][x] self.states = states actlist = orientations transitions = {} for s in states: transitions[s] = {} for a in actlist: transitions[s][a] = self.calculate_T(s, a) MDP.__init__(self, init, actlist=actlist, terminals=terminals, transitions=transitions, reward=reward, states=states, gamma=gamma) def calculate_T(self, state, action): if action: return [(0.8, self.go(state, action)), (0.1, self.go(state, turn_right(action))), (0.1, self.go(state, turn_left(action)))] else: return [(0.0, state)] def T(self, state, action): return self.transitions[state][action] if action else [(0.0, state)] def go(self, state, direction): """Return the state that results from going in this direction.""" state1 = vector_add(state, direction) return state1 if state1 in self.states else state def to_grid(self, mapping): """Convert a mapping from (x, y) to v into a [[..., v, ...]] grid.""" return list(reversed([[mapping.get((x, y), None) for x in range(self.cols)] for y in range(self.rows)])) def to_arrows(self, policy): chars = {(1, 0): '>', (0, 1): '^', (-1, 0): '<', (0, -1): 'v', None: '.'} return self.to_grid({s: chars[a] for (s, a) in policy.items()}) # ______________________________________________________________________________ """ [Figure 17.1] A 4x3 grid environment that presents the agent with a sequential decision problem. """ sequential_decision_environment = GridMDP([[-0.04, -0.04, -0.04, +1], [-0.04, None, -0.04, -1], [-0.04, -0.04, -0.04, -0.04]], terminals=[(3, 2), (3, 1)]) # ______________________________________________________________________________ def value_iteration(mdp, epsilon=0.001): """Solving an MDP by value iteration. [Figure 17.4]""" U1 = {s: 0 for s in mdp.states} R, T, gamma = mdp.R, mdp.T, mdp.gamma while True: U = U1.copy() delta = 0 for s in mdp.states: U1[s] = R(s) + gamma * max(sum(p*U[s1] for (p, s1) in T(s, a)) for a in mdp.actions(s)) delta = max(delta, abs(U1[s] - U[s])) if delta <= epsilon*(1 - gamma)/gamma: return U def best_policy(mdp, U): """Given an MDP and a utility function U, determine the best policy, as a mapping from state to action. (Equation 17.4)""" pi = {} for s in mdp.states: pi[s] = argmax(mdp.actions(s), key=lambda a: expected_utility(a, s, U, mdp)) return pi def expected_utility(a, s, U, mdp): """The expected utility of doing a in state s, according to the MDP and U.""" return sum(p*U[s1] for (p, s1) in mdp.T(s, a)) # ______________________________________________________________________________ def policy_iteration(mdp): """Solve an MDP by policy iteration [Figure 17.7]""" U = {s: 0 for s in mdp.states} pi = {s: random.choice(mdp.actions(s)) for s in mdp.states} while True: U = policy_evaluation(pi, U, mdp) unchanged = True for s in mdp.states: a = argmax(mdp.actions(s), key=lambda a: expected_utility(a, s, U, mdp)) if a != pi[s]: pi[s] = a unchanged = False if unchanged: return pi def policy_evaluation(pi, U, mdp, k=20): """Return an updated utility mapping U from each state in the MDP to its utility, using an approximation (modified policy iteration).""" R, T, gamma = mdp.R, mdp.T, mdp.gamma for i in range(k): for s in mdp.states: U[s] = R(s) + gamma*sum(p*U[s1] for (p, s1) in T(s, pi[s])) return U class POMDP(MDP): """A Partially Observable Markov Decision Process, defined by a transition model P(s'|s,a), actions A(s), a reward function R(s), and a sensor model P(e|s). We also keep track of a gamma value, for use by algorithms. The transition and the sensor models are defined as matrices. We also keep track of the possible states and actions for each state. [page 659].""" def __init__(self, actions, transitions=None, evidences=None, rewards=None, states=None, gamma=0.95): """Initialize variables of the pomdp""" if not (0 < gamma <= 1): raise ValueError('A POMDP must have 0 < gamma <= 1') self.states = states self.actions = actions # transition model cannot be undefined self.t_prob = transitions or {} if not self.t_prob: print('Warning: Transition model is undefined') # sensor model cannot be undefined self.e_prob = evidences or {} if not self.e_prob: print('Warning: Sensor model is undefined') self.gamma = gamma self.rewards = rewards def remove_dominated_plans(self, input_values): """ Remove dominated plans. This method finds all the lines contributing to the upper surface and removes those which don't. """ values = [val for action in input_values for val in input_values[action]] values.sort(key=lambda x: x[0], reverse=True) best = [values[0]] y1_max = max(val[1] for val in values) tgt = values[0] prev_b = 0 prev_ix = 0 while tgt[1] != y1_max: min_b = 1 min_ix = 0 for i in range(prev_ix + 1, len(values)): if values[i][0] - tgt[0] + tgt[1] - values[i][1] != 0: trans_b = (values[i][0] - tgt[0]) / (values[i][0] - tgt[0] + tgt[1] - values[i][1]) if 0 <= trans_b <= 1 and trans_b > prev_b and trans_b < min_b: min_b = trans_b min_ix = i prev_b = min_b prev_ix = min_ix tgt = values[min_ix] best.append(tgt) return self.generate_mapping(best, input_values) def remove_dominated_plans_fast(self, input_values): """ Remove dominated plans using approximations. Resamples the upper boundary at intervals of 100 and finds the maximum values at these points. """ values = [val for action in input_values for val in input_values[action]] values.sort(key=lambda x: x[0], reverse=True) best = [] sr = 100 for i in range(sr + 1): x = i / float(sr) maximum = (values[0][1] - values[0][0]) * x + values[0][0] tgt = values[0] for value in values: val = (value[1] - value[0]) * x + value[0] if val > maximum: maximum = val tgt = value if all(any(tgt != v) for v in best): best.append(np.array(tgt)) return self.generate_mapping(best, input_values) def generate_mapping(self, best, input_values): """Generate mappings after removing dominated plans""" mapping = defaultdict(list) for value in best: for action in input_values: if any(all(value == v) for v in input_values[action]): mapping[action].append(value) return mapping def max_difference(self, U1, U2): """Find maximum difference between two utility mappings""" for k, v in U1.items(): sum1 = 0 for element in U1[k]: sum1 += sum(element) sum2 = 0 for element in U2[k]: sum2 += sum(element) return abs(sum1 - sum2) class Matrix: """Matrix operations class""" @staticmethod def add(A, B): """Add two matrices A and B""" res = [] for i in range(len(A)): row = [] for j in range(len(A[0])): row.append(A[i][j] + B[i][j]) res.append(row) return res @staticmethod def scalar_multiply(a, B): """Multiply scalar a to matrix B""" for i in range(len(B)): for j in range(len(B[0])): B[i][j] = a * B[i][j] return B @staticmethod def multiply(A, B): """Multiply two matrices A and B element-wise""" matrix = [] for i in range(len(B)): row = [] for j in range(len(B[0])): row.append(B[i][j] * A[j][i]) matrix.append(row) return matrix @staticmethod def matmul(A, B): """Inner-product of two matrices""" return [[sum(ele_a*ele_b for ele_a, ele_b in zip(row_a, col_b)) for col_b in list(zip(*B))] for row_a in A] @staticmethod def transpose(A): """Transpose a matrix""" return [list(i) for i in zip(*A)] def pomdp_value_iteration(pomdp, epsilon=0.1): """Solving a POMDP by value iteration.""" U = {'':[[0]* len(pomdp.states)]} count = 0 while True: count += 1 prev_U = U values = [val for action in U for val in U[action]] value_matxs = [] for i in values: for j in values: value_matxs.append([i, j]) U1 = defaultdict(list) for action in pomdp.actions: for u in value_matxs: u1 = Matrix.matmul(Matrix.matmul(pomdp.t_prob[int(action)], Matrix.multiply(pomdp.e_prob[int(action)], Matrix.transpose(u))), [[1], [1]]) u1 = Matrix.add(Matrix.scalar_multiply(pomdp.gamma, Matrix.transpose(u1)), [pomdp.rewards[int(action)]]) U1[action].append(u1[0]) U = pomdp.remove_dominated_plans_fast(U1) # replace with U = pomdp.remove_dominated_plans(U1) for accurate calculations if count > 10: if pomdp.max_difference(U, prev_U) < epsilon * (1 - pomdp.gamma) / pomdp.gamma: return U __doc__ += """ >>> pi = best_policy(sequential_decision_environment, value_iteration(sequential_decision_environment, .01)) >>> sequential_decision_environment.to_arrows(pi) [['>', '>', '>', '.'], ['^', None, '^', '.'], ['^', '>', '^', '<']] >>> from utils import print_table >>> print_table(sequential_decision_environment.to_arrows(pi)) > > > . ^ None ^ . ^ > ^ < >>> print_table(sequential_decision_environment.to_arrows(policy_iteration(sequential_decision_environment))) > > > . ^ None ^ . ^ > ^ < """ # noqa """ s = { 'a' : { 'plan1' : [(0.2, 'a'), (0.3, 'b'), (0.3, 'c'), (0.2, 'd')], 'plan2' : [(0.4, 'a'), (0.15, 'b'), (0.45, 'c')], 'plan3' : [(0.2, 'a'), (0.5, 'b'), (0.3, 'c')], }, 'b' : { 'plan1' : [(0.2, 'a'), (0.6, 'b'), (0.2, 'c'), (0.1, 'd')], 'plan2' : [(0.6, 'a'), (0.2, 'b'), (0.1, 'c'), (0.1, 'd')], 'plan3' : [(0.3, 'a'), (0.3, 'b'), (0.4, 'c')], }, 'c' : { 'plan1' : [(0.3, 'a'), (0.5, 'b'), (0.1, 'c'), (0.1, 'd')], 'plan2' : [(0.5, 'a'), (0.3, 'b'), (0.1, 'c'), (0.1, 'd')], 'plan3' : [(0.1, 'a'), (0.3, 'b'), (0.1, 'c'), (0.5, 'd')], }, } """