{
"cells": [
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# February 21, 2020"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## PLANNING IN THE REAL WORLD\n",
"---\n",
"## PROBLEM\n",
"The `Problem` class is a wrapper for `PlanningProblem` with some additional functionality and data-structures to handle real-world planning problems that involve time and resource constraints.\n",
"The `Problem` class includes everything that the `PlanningProblem` class includes.\n",
"Additionally, it also includes the following attributes essential to define a real-world planning problem:\n",
"- a list of `jobs` to be done\n",
"- a dictionary of `resources`\n",
"\n",
"It also overloads the `act` method to call the `do_action` method of the `HLA` class, \n",
"and also includes a new method `refinements` that finds refinements or primitive actions for high level actions.\n",
"
\n",
"`hierarchical_search` and `angelic_search` are also built into the `Problem` class to solve such planning problems."
]
},
{
"cell_type": "code",
"execution_count": 28,
"metadata": {},
"outputs": [],
"source": [
"from planningv2 import *\n",
"from notebook import psource"
]
},
{
"cell_type": "code",
"execution_count": 29,
"metadata": {},
"outputs": [
{
"data": {
"text/html": [
"\n",
"\n",
"\n",
"
class Problem(PlanningProblem):\n",
" """\n",
" Define real-world problems by aggregating resources as numerical quantities instead of\n",
" named entities.\n",
"\n",
" This class is identical to PDLL, except that it overloads the act function to handle\n",
" resource and ordering conditions imposed by HLA as opposed to Action.\n",
" """\n",
" def __init__(self, init, goals, actions, jobs=None, resources=None):\n",
" super().__init__(init, goals, actions)\n",
" self.jobs = jobs\n",
" self.resources = resources or {}\n",
"\n",
" def act(self, action):\n",
" """\n",
" Performs the HLA given as argument.\n",
"\n",
" Note that this is different from the superclass action - where the parameter was an\n",
" Expression. For real world problems, an Expr object isn't enough to capture all the\n",
" detail required for executing the action - resources, preconditions, etc need to be\n",
" checked for too.\n",
" """\n",
" args = action.args\n",
" list_action = first(a for a in self.actions if a.name == action.name)\n",
" if list_action is None:\n",
" raise Exception("Action '{}' not found".format(action.name))\n",
" self.init = list_action.do_action(self.jobs, self.resources, self.init, args).clauses\n",
"\n",
" def refinements(hla, state, library): # refinements may be (multiple) HLA themselves ...\n",
" """\n",
" state is a Problem, containing the current state kb\n",
" library is a dictionary containing details for every possible refinement. eg:\n",
" {\n",
" 'HLA': [\n",
" 'Go(Home, SFO)',\n",
" 'Go(Home, SFO)',\n",
" 'Drive(Home, SFOLongTermParking)',\n",
" 'Shuttle(SFOLongTermParking, SFO)',\n",
" 'Taxi(Home, SFO)'\n",
" ],\n",
" 'steps': [\n",
" ['Drive(Home, SFOLongTermParking)', 'Shuttle(SFOLongTermParking, SFO)'],\n",
" ['Taxi(Home, SFO)'],\n",
" [],\n",
" [],\n",
" []\n",
" ],\n",
" # empty refinements indicate a primitive action\n",
" 'precond': [\n",
" ['At(Home) & Have(Car)'],\n",
" ['At(Home)'],\n",
" ['At(Home) & Have(Car)'],\n",
" ['At(SFOLongTermParking)'],\n",
" ['At(Home)']\n",
" ],\n",
" 'effect': [\n",
" ['At(SFO) & ~At(Home)'],\n",
" ['At(SFO) & ~At(Home)'],\n",
" ['At(SFOLongTermParking) & ~At(Home)'],\n",
" ['At(SFO) & ~At(SFOLongTermParking)'],\n",
" ['At(SFO) & ~At(Home)']\n",
" ]\n",
" }\n",
" """\n",
" e = Expr(hla.name, hla.args)\n",
" indices = [i for i, x in enumerate(library['HLA']) if expr(x).op == hla.name]\n",
" for i in indices:\n",
" actions = []\n",
" for j in range(len(library['steps'][i])):\n",
" # find the index of the step [j] of the HLA \n",
" index_step = [k for k,x in enumerate(library['HLA']) if x == library['steps'][i][j]][0]\n",
" precond = library['precond'][index_step][0] # preconditions of step [j]\n",
" effect = library['effect'][index_step][0] # effect of step [j]\n",
" actions.append(HLA(library['steps'][i][j], precond, effect))\n",
" yield actions\n",
"\n",
" def hierarchical_search(problem, hierarchy):\n",
" """\n",
" [Figure 11.5] 'Hierarchical Search, a Breadth First Search implementation of Hierarchical\n",
" Forward Planning Search'\n",
" The problem is a real-world problem defined by the problem class, and the hierarchy is\n",
" a dictionary of HLA - refinements (see refinements generator for details)\n",
" """\n",
" act = Node(problem.init, None, [problem.actions[0]])\n",
" frontier = deque()\n",
" frontier.append(act)\n",
" while True:\n",
" if not frontier:\n",
" return None\n",
" plan = frontier.popleft()\n",
" (hla, index) = Problem.find_hla(plan, hierarchy) # finds the first non primitive hla in plan actions\n",
" prefix = plan.action[:index]\n",
" outcome = Problem(Problem.result(problem.init, prefix), problem.goals , problem.actions )\n",
" suffix = plan.action[index+1:]\n",
" if not hla: # hla is None and plan is primitive\n",
" if outcome.goal_test():\n",
" return plan.action\n",
" else:\n",
" for sequence in Problem.refinements(hla, outcome, hierarchy): # find refinements\n",
" frontier.append(Node(outcome.init, plan, prefix + sequence+ suffix))\n",
"\n",
" def result(state, actions):\n",
" """The outcome of applying an action to the current problem"""\n",
" for a in actions: \n",
" if a.check_precond(state, a.args):\n",
" state = a(state, a.args).clauses\n",
" return state\n",
" \n",
"\n",
" def angelic_search(problem, hierarchy, initialPlan):\n",
" """\n",
"\t[Figure 11.8] A hierarchical planning algorithm that uses angelic semantics to identify and\n",
"\tcommit to high-level plans that work while avoiding high-level plans that don’t. \n",
"\tThe predicate MAKING-PROGRESS checks to make sure that we aren’t stuck in an infinite regression\n",
"\tof refinements. \n",
"\tAt top level, call ANGELIC -SEARCH with [Act ] as the initialPlan .\n",
"\n",
" initialPlan contains a sequence of HLA's with angelic semantics \n",
"\n",
" The possible effects of an angelic HLA in initialPlan are : \n",
" ~ : effect remove\n",
" $+: effect possibly add\n",
" $-: effect possibly remove\n",
" $$: possibly add or remove\n",
"\t"""\n",
" frontier = deque(initialPlan)\n",
" while True: \n",
" if not frontier:\n",
" return None\n",
" plan = frontier.popleft() # sequence of HLA/Angelic HLA's \n",
" opt_reachable_set = Problem.reach_opt(problem.init, plan)\n",
" pes_reachable_set = Problem.reach_pes(problem.init, plan)\n",
" if problem.intersects_goal(opt_reachable_set): \n",
" if Problem.is_primitive( plan, hierarchy ): \n",
" return ([x for x in plan.action])\n",
" guaranteed = problem.intersects_goal(pes_reachable_set) \n",
" if guaranteed and Problem.making_progress(plan, initialPlan):\n",
" final_state = guaranteed[0] # any element of guaranteed \n",
" return Problem.decompose(hierarchy, problem, plan, final_state, pes_reachable_set)\n",
" hla, index = Problem.find_hla(plan, hierarchy) # there should be at least one HLA/Angelic_HLA, otherwise plan would be primitive.\n",
" prefix = plan.action[:index]\n",
" suffix = plan.action[index+1:]\n",
" outcome = Problem(Problem.result(problem.init, prefix), problem.goals , problem.actions )\n",
" for sequence in Problem.refinements(hla, outcome, hierarchy): # find refinements\n",
" frontier.append(Angelic_Node(outcome.init, plan, prefix + sequence+ suffix, prefix+sequence+suffix))\n",
"\n",
"\n",
" def intersects_goal(problem, reachable_set):\n",
" """\n",
" Find the intersection of the reachable states and the goal\n",
" """\n",
" return [y for x in list(reachable_set.keys()) for y in reachable_set[x] if all(goal in y for goal in problem.goals)] \n",
"\n",
"\n",
" def is_primitive(plan, library):\n",
" """\n",
" checks if the hla is primitive action \n",
" """\n",
" for hla in plan.action: \n",
" indices = [i for i, x in enumerate(library['HLA']) if expr(x).op == hla.name]\n",
" for i in indices:\n",
" if library["steps"][i]: \n",
" return False\n",
" return True\n",
" \n",
"\n",
"\n",
" def reach_opt(init, plan): \n",
" """\n",
" Finds the optimistic reachable set of the sequence of actions in plan \n",
" """\n",
" reachable_set = {0: [init]}\n",
" optimistic_description = plan.action #list of angelic actions with optimistic description\n",
" return Problem.find_reachable_set(reachable_set, optimistic_description)\n",
" \n",
"\n",
" def reach_pes(init, plan): \n",
" """ \n",
" Finds the pessimistic reachable set of the sequence of actions in plan\n",
" """\n",
" reachable_set = {0: [init]}\n",
" pessimistic_description = plan.action_pes # list of angelic actions with pessimistic description\n",
" return Problem.find_reachable_set(reachable_set, pessimistic_description)\n",
"\n",
" def find_reachable_set(reachable_set, action_description):\n",
" """\n",
"\tFinds the reachable states of the action_description when applied in each state of reachable set.\n",
"\t"""\n",
" for i in range(len(action_description)):\n",
" reachable_set[i+1]=[]\n",
" if type(action_description[i]) is Angelic_HLA:\n",
" possible_actions = action_description[i].angelic_action()\n",
" else: \n",
" possible_actions = action_description\n",
" for action in possible_actions:\n",
" for state in reachable_set[i]:\n",
" if action.check_precond(state , action.args) :\n",
" if action.effect[0] :\n",
" new_state = action(state, action.args).clauses\n",
" reachable_set[i+1].append(new_state)\n",
" else: \n",
" reachable_set[i+1].append(state)\n",
" return reachable_set\n",
"\n",
" def find_hla(plan, hierarchy):\n",
" """\n",
" Finds the the first HLA action in plan.action, which is not primitive\n",
" and its corresponding index in plan.action\n",
" """\n",
" hla = None\n",
" index = len(plan.action)\n",
" for i in range(len(plan.action)): # find the first HLA in plan, that is not primitive\n",
" if not Problem.is_primitive(Node(plan.state, plan.parent, [plan.action[i]]), hierarchy):\n",
" hla = plan.action[i] \n",
" index = i\n",
" break\n",
" return hla, index\n",
"\n",
" def making_progress(plan, initialPlan):\n",
" """ \n",
" Prevents from infinite regression of refinements \n",
"\n",
" (infinite regression of refinements happens when the algorithm finds a plan that \n",
" its pessimistic reachable set intersects the goal inside a call to decompose on the same plan, in the same circumstances) \n",
" """\n",
" for i in range(len(initialPlan)):\n",
" if (plan == initialPlan[i]):\n",
" return False\n",
" return True \n",
"\n",
" def decompose(hierarchy, s_0, plan, s_f, reachable_set):\n",
" solution = [] \n",
" i = max(reachable_set.keys())\n",
" while plan.action_pes: \n",
" action = plan.action_pes.pop()\n",
" if (i==0): \n",
" return solution\n",
" s_i = Problem.find_previous_state(s_f, reachable_set,i, action) \n",
" problem = Problem(s_i, s_f , plan.action)\n",
" angelic_call = Problem.angelic_search(problem, hierarchy, [Angelic_Node(s_i, Node(None), [action],[action])])\n",
" if angelic_call:\n",
" for x in angelic_call: \n",
" solution.insert(0,x)\n",
" else: \n",
" return None\n",
" s_f = s_i\n",
" i-=1\n",
" return solution\n",
"\n",
"\n",
" def find_previous_state(s_f, reachable_set, i, action):\n",
" """\n",
" Given a final state s_f and an action finds a state s_i in reachable_set \n",
" such that when action is applied to state s_i returns s_f. \n",
" """\n",
" s_i = reachable_set[i-1][0]\n",
" for state in reachable_set[i-1]:\n",
" if s_f in [x for x in Problem.reach_pes(state, Angelic_Node(state, None, [action],[action]))[1]]:\n",
" s_i =state\n",
" break\n",
" return s_i\n",
"
def debug(f):\n",
" def g(*args):\n",
" print ("Calling: {} with args: {}".format(f.__name__, args))\n",
" val = f(*args)\n",
" print ("Exiting: {} with value: {}".format(f.__name__, val))\n",
" print()\n",
" return val\n",
" return g \n",
"
class HLA(Action):\n",
" """\n",
" Define Actions for the real-world (that may be refined further), and satisfy resource\n",
" constraints.\n",
" """\n",
" unique_group = 1\n",
"\n",
" def __init__(self, action, precond=None, effect=None, duration=0,\n",
" consume=None, use=None):\n",
" """\n",
" As opposed to actions, to define HLA, we have added constraints.\n",
" duration holds the amount of time required to execute the task\n",
" consumes holds a dictionary representing the resources the task consumes\n",
" uses holds a dictionary representing the resources the task uses\n",
" """\n",
" precond = precond or [None]\n",
" effect = effect or [None]\n",
" super().__init__(action, precond, effect)\n",
" self.duration = duration\n",
" self.consumes = consume or {}\n",
" self.uses = use or {}\n",
" self.completed = False\n",
" # self.priority = -1 # must be assigned in relation to other HLAs\n",
" # self.job_group = -1 # must be assigned in relation to other HLAs\n",
"\n",
" @debug\n",
" def do_action(self, job_order, available_resources, kb, args):\n",
" """\n",
" An HLA based version of act - along with knowledge base updation, it handles\n",
" resource checks, and ensures the actions are executed in the correct order.\n",
" """\n",
" # print(self.name)\n",
" if not self.has_usable_resource(available_resources):\n",
" raise Exception('Not enough usable resources to execute {}'.format(self.name))\n",
" if not self.has_consumable_resource(available_resources):\n",
" raise Exception('Not enough consumable resources to execute {}'.format(self.name))\n",
" if not self.inorder(job_order):\n",
" raise Exception("Can't execute {} - execute prerequisite actions first".\n",
" format(self.name))\n",
" kb = super().act(kb, args) # update knowledge base\n",
" for resource in self.consumes: # remove consumed resources\n",
" available_resources[resource] -= self.consumes[resource]\n",
" self.completed = True # set the task status to complete\n",
" return kb\n",
"\n",
" def has_consumable_resource(self, available_resources):\n",
" """\n",
" Ensure there are enough consumable resources for this action to execute.\n",
" """\n",
" for resource in self.consumes:\n",
" if available_resources.get(resource) is None:\n",
" return False\n",
" if available_resources[resource] < self.consumes[resource]:\n",
" return False\n",
" return True\n",
"\n",
" def has_usable_resource(self, available_resources):\n",
" """\n",
" Ensure there are enough usable resources for this action to execute.\n",
" """\n",
" for resource in self.uses:\n",
" if available_resources.get(resource) is None:\n",
" return False\n",
" if available_resources[resource] < self.uses[resource]:\n",
" return False\n",
" return True\n",
"\n",
" def inorder(self, job_order):\n",
" """\n",
" Ensure that all the jobs that had to be executed before the current one have been\n",
" successfully executed.\n",
" """\n",
" for jobs in job_order:\n",
" if self in jobs:\n",
" for job in jobs:\n",
" if job is self:\n",
" return True\n",
" if not job.completed:\n",
" return False\n",
" return True\n",
"
def job_shop_problem():\n",
" """\n",
" [Figure 11.1] JOB-SHOP-PROBLEM\n",
"\n",
" A job-shop scheduling problem for assembling two cars,\n",
" with resource and ordering constraints.\n",
"\n",
" Example:\n",
" >>> from planning import *\n",
" >>> p = job_shop_problem()\n",
" >>> p.goal_test()\n",
" False\n",
" >>> p.act(p.jobs[1][0])\n",
" >>> p.act(p.jobs[1][1])\n",
" >>> p.act(p.jobs[1][2])\n",
" >>> p.act(p.jobs[0][0])\n",
" >>> p.act(p.jobs[0][1])\n",
" >>> p.goal_test()\n",
" False\n",
" >>> p.act(p.jobs[0][2])\n",
" >>> p.goal_test()\n",
" True\n",
" >>>\n",
" """\n",
" resources = {'EngineHoists': 1, 'WheelStations': 2, 'Inspectors': 2, 'LugNuts': 500}\n",
"\n",
" add_engine1 = HLA('AddEngine1', precond='~Has(C1, E1)', effect='Has(C1, E1)', duration=30, use={'EngineHoists': 1})\n",
" add_engine2 = HLA('AddEngine2', precond='~Has(C2, E2)', effect='Has(C2, E2)', duration=60, use={'EngineHoists': 1})\n",
" add_wheels1 = HLA('AddWheels1', precond='~Has(C1, W1)', effect='Has(C1, W1)', duration=30, use={'WheelStations': 1}, consume={'LugNuts': 20})\n",
" add_wheels2 = HLA('AddWheels2', precond='~Has(C2, W2)', effect='Has(C2, W2)', duration=15, use={'WheelStations': 1}, consume={'LugNuts': 20})\n",
" inspect1 = HLA('Inspect1', precond='~Inspected(C1)', effect='Inspected(C1)', duration=10, use={'Inspectors': 1})\n",
" inspect2 = HLA('Inspect2', precond='~Inspected(C2)', effect='Inspected(C2)', duration=10, use={'Inspectors': 1})\n",
"\n",
" actions = [add_engine1, add_engine2, add_wheels1, add_wheels2, inspect1, inspect2]\n",
"\n",
" job_group1 = [add_engine1, add_wheels1, inspect1]\n",
" job_group2 = [add_engine2, add_wheels2, inspect2]\n",
"\n",
" return Problem(init='Car(C1) & Car(C2) & Wheels(W1) & Wheels(W2) & Engine(E2) & Engine(E2) & ~Has(C1, E1) & ~Has(C2, E2) & ~Has(C1, W1) & ~Has(C2, W2) & ~Inspected(C1) & ~Inspected(C2)',\n",
" goals='Has(C1, W1) & Has(C1, E1) & Inspected(C1) & Has(C2, W2) & Has(C2, E2) & Inspected(C2)',\n",
" actions=actions,\n",
" jobs=[job_group1, job_group2],\n",
" resources=resources)\n",
"
def double_tennis_problem():\n",
" """\n",
" [Figure 11.10] DOUBLE-TENNIS-PROBLEM\n",
"\n",
" A multiagent planning problem involving two partner tennis players\n",
" trying to return an approaching ball and repositioning around in the court.\n",
"\n",
" Example:\n",
" >>> from planning import *\n",
" >>> dtp = double_tennis_problem()\n",
" >>> goal_test(dtp.goals, dtp.init)\n",
" False\n",
" >>> dtp.act(expr('Go(A, RightBaseLine, LeftBaseLine)'))\n",
" >>> dtp.act(expr('Hit(A, Ball, RightBaseLine)'))\n",
" >>> goal_test(dtp.goals, dtp.init)\n",
" False\n",
" >>> dtp.act(expr('Go(A, LeftNet, RightBaseLine)'))\n",
" >>> goal_test(dtp.goals, dtp.init)\n",
" True\n",
" >>>\n",
" """\n",
"\n",
" return PlanningProblem(init='At(A, LeftBaseLine) & At(B, RightNet) & Approaching(Ball, RightBaseLine) & Partner(A, B) & Partner(B, A)',\n",
" goals='Returned(Ball) & At(A, LeftNet) & At(B, RightNet)',\n",
" actions=[Action('Hit(actor, Ball, loc)',\n",
" precond='Approaching(Ball, loc) & At(actor, loc)',\n",
" effect='Returned(Ball)'),\n",
" Action('Go(actor, to, loc)', \n",
" precond='At(actor, loc)',\n",
" effect='At(actor, to) & ~At(actor, loc)')])\n",
"