{
"cells": [
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### PARTIAL ORDER PLANNER\n",
"A partial-order planning algorithm is significantly different from a total-order planner.\n",
"The way a partial-order plan works enables it to take advantage of _problem decomposition_ and work on each subproblem separately.\n",
"It works on several subgoals independently, solves them with several subplans, and then combines the plan.\n",
"
\n",
"A partial-order planner also follows the **least commitment** strategy, where it delays making choices for as long as possible.\n",
"Variables are not bound unless it is absolutely necessary and new actions are chosen only if the existing actions cannot fulfil the required precondition.\n",
"
\n",
"Any planning algorithm that can place two actions into a plan without specifying which comes first is called a **partial-order planner**.\n",
"A partial-order planner searches through the space of plans rather than the space of states, which makes it perform better for certain problems.\n",
"
\n",
"
\n",
"Let's have a look at the `PartialOrderPlanner` class."
]
},
{
"cell_type": "code",
"execution_count": 3,
"metadata": {},
"outputs": [],
"source": [
"from planning import *\n",
"from notebook import psource"
]
},
{
"cell_type": "code",
"execution_count": 4,
"metadata": {},
"outputs": [
{
"data": {
"text/html": [
"\n",
"\n",
"\n",
"
class PartialOrderPlanner:\n",
"\n",
" def __init__(self, planningproblem):\n",
" self.planningproblem = planningproblem\n",
" self.initialize()\n",
"\n",
" def initialize(self):\n",
" """Initialize all variables"""\n",
" self.causal_links = []\n",
" self.start = Action('Start', [], self.planningproblem.init)\n",
" self.finish = Action('Finish', self.planningproblem.goals, [])\n",
" self.actions = set()\n",
" self.actions.add(self.start)\n",
" self.actions.add(self.finish)\n",
" self.constraints = set()\n",
" self.constraints.add((self.start, self.finish))\n",
" self.agenda = set()\n",
" for precond in self.finish.precond:\n",
" self.agenda.add((precond, self.finish))\n",
" self.expanded_actions = self.expand_actions()\n",
"\n",
" def expand_actions(self, name=None):\n",
" """Generate all possible actions with variable bindings for precondition selection heuristic"""\n",
"\n",
" objects = set(arg for clause in self.planningproblem.init for arg in clause.args)\n",
" expansions = []\n",
" action_list = []\n",
" if name is not None:\n",
" for action in self.planningproblem.actions:\n",
" if str(action.name) == name:\n",
" action_list.append(action)\n",
" else:\n",
" action_list = self.planningproblem.actions\n",
"\n",
" for action in action_list:\n",
" for permutation in itertools.permutations(objects, len(action.args)):\n",
" bindings = unify(Expr(action.name, *action.args), Expr(action.name, *permutation))\n",
" if bindings is not None:\n",
" new_args = []\n",
" for arg in action.args:\n",
" if arg in bindings:\n",
" new_args.append(bindings[arg])\n",
" else:\n",
" new_args.append(arg)\n",
" new_expr = Expr(str(action.name), *new_args)\n",
" new_preconds = []\n",
" for precond in action.precond:\n",
" new_precond_args = []\n",
" for arg in precond.args:\n",
" if arg in bindings:\n",
" new_precond_args.append(bindings[arg])\n",
" else:\n",
" new_precond_args.append(arg)\n",
" new_precond = Expr(str(precond.op), *new_precond_args)\n",
" new_preconds.append(new_precond)\n",
" new_effects = []\n",
" for effect in action.effect:\n",
" new_effect_args = []\n",
" for arg in effect.args:\n",
" if arg in bindings:\n",
" new_effect_args.append(bindings[arg])\n",
" else:\n",
" new_effect_args.append(arg)\n",
" new_effect = Expr(str(effect.op), *new_effect_args)\n",
" new_effects.append(new_effect)\n",
" expansions.append(Action(new_expr, new_preconds, new_effects))\n",
"\n",
" return expansions\n",
"\n",
" def find_open_precondition(self):\n",
" """Find open precondition with the least number of possible actions"""\n",
"\n",
" number_of_ways = dict()\n",
" actions_for_precondition = dict()\n",
" for element in self.agenda:\n",
" open_precondition = element[0]\n",
" possible_actions = list(self.actions) + self.expanded_actions\n",
" for action in possible_actions:\n",
" for effect in action.effect:\n",
" if effect == open_precondition:\n",
" if open_precondition in number_of_ways:\n",
" number_of_ways[open_precondition] += 1\n",
" actions_for_precondition[open_precondition].append(action)\n",
" else:\n",
" number_of_ways[open_precondition] = 1\n",
" actions_for_precondition[open_precondition] = [action]\n",
"\n",
" number = sorted(number_of_ways, key=number_of_ways.__getitem__)\n",
" \n",
" for k, v in number_of_ways.items():\n",
" if v == 0:\n",
" return None, None, None\n",
"\n",
" act1 = None\n",
" for element in self.agenda:\n",
" if element[0] == number[0]:\n",
" act1 = element[1]\n",
" break\n",
"\n",
" if number[0] in self.expanded_actions:\n",
" self.expanded_actions.remove(number[0])\n",
"\n",
" return number[0], act1, actions_for_precondition[number[0]]\n",
"\n",
" def find_action_for_precondition(self, oprec):\n",
" """Find action for a given precondition"""\n",
"\n",
" # either\n",
" # choose act0 E Actions such that act0 achieves G\n",
" for action in self.actions:\n",
" for effect in action.effect:\n",
" if effect == oprec:\n",
" return action, 0\n",
"\n",
" # or\n",
" # choose act0 E Actions such that act0 achieves G\n",
" for action in self.planningproblem.actions:\n",
" for effect in action.effect:\n",
" if effect.op == oprec.op:\n",
" bindings = unify(effect, oprec)\n",
" if bindings is None:\n",
" break\n",
" return action, bindings\n",
"\n",
" def generate_expr(self, clause, bindings):\n",
" """Generate atomic expression from generic expression given variable bindings"""\n",
"\n",
" new_args = []\n",
" for arg in clause.args:\n",
" if arg in bindings:\n",
" new_args.append(bindings[arg])\n",
" else:\n",
" new_args.append(arg)\n",
"\n",
" try:\n",
" return Expr(str(clause.name), *new_args)\n",
" except:\n",
" return Expr(str(clause.op), *new_args)\n",
" \n",
" def generate_action_object(self, action, bindings):\n",
" """Generate action object given a generic action andvariable bindings"""\n",
"\n",
" # if bindings is 0, it means the action already exists in self.actions\n",
" if bindings == 0:\n",
" return action\n",
"\n",
" # bindings cannot be None\n",
" else:\n",
" new_expr = self.generate_expr(action, bindings)\n",
" new_preconds = []\n",
" for precond in action.precond:\n",
" new_precond = self.generate_expr(precond, bindings)\n",
" new_preconds.append(new_precond)\n",
" new_effects = []\n",
" for effect in action.effect:\n",
" new_effect = self.generate_expr(effect, bindings)\n",
" new_effects.append(new_effect)\n",
" return Action(new_expr, new_preconds, new_effects)\n",
"\n",
" def cyclic(self, graph):\n",
" """Check cyclicity of a directed graph"""\n",
"\n",
" new_graph = dict()\n",
" for element in graph:\n",
" if element[0] in new_graph:\n",
" new_graph[element[0]].append(element[1])\n",
" else:\n",
" new_graph[element[0]] = [element[1]]\n",
"\n",
" path = set()\n",
"\n",
" def visit(vertex):\n",
" path.add(vertex)\n",
" for neighbor in new_graph.get(vertex, ()):\n",
" if neighbor in path or visit(neighbor):\n",
" return True\n",
" path.remove(vertex)\n",
" return False\n",
"\n",
" value = any(visit(v) for v in new_graph)\n",
" return value\n",
"\n",
" def add_const(self, constraint, constraints):\n",
" """Add the constraint to constraints if the resulting graph is acyclic"""\n",
"\n",
" if constraint[0] == self.finish or constraint[1] == self.start:\n",
" return constraints\n",
"\n",
" new_constraints = set(constraints)\n",
" new_constraints.add(constraint)\n",
"\n",
" if self.cyclic(new_constraints):\n",
" return constraints\n",
" return new_constraints\n",
"\n",
" def is_a_threat(self, precondition, effect):\n",
" """Check if effect is a threat to precondition"""\n",
"\n",
" if (str(effect.op) == 'Not' + str(precondition.op)) or ('Not' + str(effect.op) == str(precondition.op)):\n",
" if effect.args == precondition.args:\n",
" return True\n",
" return False\n",
"\n",
" def protect(self, causal_link, action, constraints):\n",
" """Check and resolve threats by promotion or demotion"""\n",
"\n",
" threat = False\n",
" for effect in action.effect:\n",
" if self.is_a_threat(causal_link[1], effect):\n",
" threat = True\n",
" break\n",
"\n",
" if action != causal_link[0] and action != causal_link[2] and threat:\n",
" # try promotion\n",
" new_constraints = set(constraints)\n",
" new_constraints.add((action, causal_link[0]))\n",
" if not self.cyclic(new_constraints):\n",
" constraints = self.add_const((action, causal_link[0]), constraints)\n",
" else:\n",
" # try demotion\n",
" new_constraints = set(constraints)\n",
" new_constraints.add((causal_link[2], action))\n",
" if not self.cyclic(new_constraints):\n",
" constraints = self.add_const((causal_link[2], action), constraints)\n",
" else:\n",
" # both promotion and demotion fail\n",
" print('Unable to resolve a threat caused by', action, 'onto', causal_link)\n",
" return\n",
" return constraints\n",
"\n",
" def convert(self, constraints):\n",
" """Convert constraints into a dict of Action to set orderings"""\n",
"\n",
" graph = dict()\n",
" for constraint in constraints:\n",
" if constraint[0] in graph:\n",
" graph[constraint[0]].add(constraint[1])\n",
" else:\n",
" graph[constraint[0]] = set()\n",
" graph[constraint[0]].add(constraint[1])\n",
" return graph\n",
"\n",
" def toposort(self, graph):\n",
" """Generate topological ordering of constraints"""\n",
"\n",
" if len(graph) == 0:\n",
" return\n",
"\n",
" graph = graph.copy()\n",
"\n",
" for k, v in graph.items():\n",
" v.discard(k)\n",
"\n",
" extra_elements_in_dependencies = _reduce(set.union, graph.values()) - set(graph.keys())\n",
"\n",
" graph.update({element:set() for element in extra_elements_in_dependencies})\n",
" while True:\n",
" ordered = set(element for element, dependency in graph.items() if len(dependency) == 0)\n",
" if not ordered:\n",
" break\n",
" yield ordered\n",
" graph = {element: (dependency - ordered) for element, dependency in graph.items() if element not in ordered}\n",
" if len(graph) != 0:\n",
" raise ValueError('The graph is not acyclic and cannot be linearly ordered')\n",
"\n",
" def display_plan(self):\n",
" """Display causal links, constraints and the plan"""\n",
"\n",
" print('Causal Links')\n",
" for causal_link in self.causal_links:\n",
" print(causal_link)\n",
"\n",
" print('\\nConstraints')\n",
" for constraint in self.constraints:\n",
" print(constraint[0], '<', constraint[1])\n",
"\n",
" print('\\nPartial Order Plan')\n",
" print(list(reversed(list(self.toposort(self.convert(self.constraints))))))\n",
"\n",
" def execute(self, display=True):\n",
" """Execute the algorithm"""\n",
"\n",
" step = 1\n",
" self.tries = 1\n",
" while len(self.agenda) > 0:\n",
" step += 1\n",
" # select <G, act1> from Agenda\n",
" try:\n",
" G, act1, possible_actions = self.find_open_precondition()\n",
" except IndexError:\n",
" print('Probably Wrong')\n",
" break\n",
"\n",
" act0 = possible_actions[0]\n",
" # remove <G, act1> from Agenda\n",
" self.agenda.remove((G, act1))\n",
"\n",
" # For actions with variable number of arguments, use least commitment principle\n",
" # act0_temp, bindings = self.find_action_for_precondition(G)\n",
" # act0 = self.generate_action_object(act0_temp, bindings)\n",
"\n",
" # Actions = Actions U {act0}\n",
" self.actions.add(act0)\n",
"\n",
" # Constraints = add_const(start < act0, Constraints)\n",
" self.constraints = self.add_const((self.start, act0), self.constraints)\n",
"\n",
" # for each CL E CausalLinks do\n",
" # Constraints = protect(CL, act0, Constraints)\n",
" for causal_link in self.causal_links:\n",
" self.constraints = self.protect(causal_link, act0, self.constraints)\n",
"\n",
" # Agenda = Agenda U {<P, act0>: P is a precondition of act0}\n",
" for precondition in act0.precond:\n",
" self.agenda.add((precondition, act0))\n",
"\n",
" # Constraints = add_const(act0 < act1, Constraints)\n",
" self.constraints = self.add_const((act0, act1), self.constraints)\n",
"\n",
" # CausalLinks U {<act0, G, act1>}\n",
" if (act0, G, act1) not in self.causal_links:\n",
" self.causal_links.append((act0, G, act1))\n",
"\n",
" # for each A E Actions do\n",
" # Constraints = protect(<act0, G, act1>, A, Constraints)\n",
" for action in self.actions:\n",
" self.constraints = self.protect((act0, G, act1), action, self.constraints)\n",
"\n",
" if step > 200:\n",
" print('Couldn\\'t find a solution')\n",
" return None, None\n",
"\n",
" if display:\n",
" self.display_plan()\n",
" else:\n",
" return self.constraints, self.causal_links \n",
"