{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "## February 26, 2020\n", "\n", "AIMA Chapter 14" ] }, { "cell_type": "code", "execution_count": 15, "metadata": {}, "outputs": [], "source": [ "from probability import *\n", "from utils import print_table\n", "from notebook import psource, pseudocode, heatmap" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## BAYESIAN NETWORKS\n", "\n", "A Bayesian network is a representation of the joint probability distribution encoding a collection of conditional independence statements.\n", "\n", "A Bayes Network is implemented as the class **BayesNet**. It consists of a collection of nodes implemented by the class **BayesNode**. The implementation in the above-mentioned classes focuses only on boolean variables. Each node is associated with a variable and it contains a **conditional probability table (cpt)**. The **cpt** represents the probability distribution of the variable conditioned on its parents **P(X | parents)**.\n", "\n", "Let us dive into the **BayesNode** implementation." ] }, { "cell_type": "code", "execution_count": 16, "metadata": {}, "outputs": [ { "data": { "text/html": [ "\n", "\n", "\n", "
class BayesNode:
    """A conditional probability distribution for a boolean variable,
    P(X | parents). Part of a BayesNet.

    Attributes set by __init__:
      variable -- the variable name X
      parents  -- the parent variable names (a list when a string was given)
      cpt      -- dict mapping a tuple of parent values to P(X=true | parents)
      children -- list, initially empty
    """

    def __init__(self, X, parents, cpt):
        """X is a variable name, and parents a sequence of variable
        names or a space-separated string. cpt, the conditional
        probability table, takes one of these forms:

        * A number, the unconditional probability P(X=true). You can
          use this form when there are no parents.

        * A dict {v: p, ...}, the conditional probability distribution
          P(X=true | parent=v) = p. When there's just one parent.

        * A dict {(v1, v2, ...): p, ...}, the distribution P(X=true |
          parent1=v1, parent2=v2, ...) = p. Each key must have as many
          values as there are parents. You can use this form always;
          the first two are just conveniences.

        In all cases the probability of X being false is left implicit,
        since it follows from P(X=true). Whatever form is passed in, the
        table is stored in the third form (tuple keys).

        Raises AssertionError when a key is not a tuple of booleans with
        one value per parent, or when a probability lies outside [0, 1].

        >>> X = BayesNode('X', '', 0.2)
        >>> X.cpt
        {(): 0.2}
        >>> Y = BayesNode('Y', 'P', {True: 0.2, False: 0.7})
        >>> Z = BayesNode('Z', 'P Q',
        ...    {(True, True): 0.2, (True, False): 0.3,
        ...     (False, True): 0.5, (False, False): 0.7})
        """
        if isinstance(parents, str):
            parents = parents.split()

        # Normalize the table to the third form described above.
        if isinstance(cpt, (float, int)):  # no parents, 0-tuple key
            cpt = {(): cpt}
        elif isinstance(cpt, dict):
            # One parent: bare boolean keys become 1-tuples. Only the first
            # key is inspected; next(iter(...)) reads it without copying
            # every key into a list.
            if cpt and isinstance(next(iter(cpt)), bool):
                cpt = {(v,): p for v, p in cpt.items()}

        assert isinstance(cpt, dict), "cpt must be a number or a dict"
        for vs, p in cpt.items():
            assert isinstance(vs, tuple) and len(vs) == len(parents), \
                "each cpt key must be a tuple with one value per parent"
            assert all(isinstance(v, bool) for v in vs), \
                "cpt keys must contain only booleans"
            assert 0 <= p <= 1, "probabilities must lie in [0, 1]"

        self.variable = X
        self.parents = parents
        self.cpt = cpt
        self.children = []

    def p(self, value, event):
        """Return the conditional probability
        P(X=value | parents=parent_values), where parent_values
        are the values of parents in event. (event must assign each
        parent a value.) The table key is obtained with
        event_values(event, self.parents).

        >>> bn = BayesNode('X', 'Burglary', {True: 0.2, False: 0.625})
        >>> bn.p(False, {'Burglary': False, 'Earthquake': True})
        0.375"""
        assert isinstance(value, bool)
        ptrue = self.cpt[event_values(event, self.parents)]
        # Only P(X=true) is stored; the false case is its complement.
        return ptrue if value else 1 - ptrue

    def sample(self, event):
        """Sample from the distribution for this variable conditioned
        on event's values for the parent variables.

        Returns whatever probability(P(X=true | parents)) returns; the
        random draw itself is delegated to that helper."""
        return probability(self.p(True, event))

    def __repr__(self):
        """Show the variable name and the space-joined parent names."""
        return repr((self.variable, ' '.join(self.parents)))
"
class BayesNet:
    """A Bayesian network made up solely of boolean-variable nodes."""

    def __init__(self, node_specs=None):
        """Create the net and add every spec in node_specs, in order.

        Specs must be ordered with parents before children."""
        self.nodes = []
        self.variables = []
        for spec in (node_specs or []):
            self.add(spec)

    def add(self, node_spec):
        """Build a BayesNode from node_spec and attach it to the net.

        The node's parents must already be in the net and its own
        variable must not be; both conditions are asserted. The new
        node is also appended to the children list of each parent."""
        new_node = BayesNode(*node_spec)
        assert new_node.variable not in self.variables
        assert all((name in self.variables) for name in new_node.parents)
        self.nodes.append(new_node)
        self.variables.append(new_node.variable)
        for name in new_node.parents:
            parent_node = self.variable_node(name)
            parent_node.children.append(new_node)

    def variable_node(self, var):
        """Return the node whose variable is named var.

        Raises Exception when no node carries that name.
        >>> burglary.variable_node('Burglary').variable
        'Burglary'"""
        for candidate in self.nodes:
            if candidate.variable == var:
                break
        else:
            # Loop ran to completion without finding a match.
            raise Exception("No such variable: {}".format(var))
        return candidate

    def variable_values(self, var):
        """Return the domain of var; always the two booleans."""
        return [True, False]

    def __repr__(self):
        """Show the net as BayesNet(<repr of the node list>)."""
        return 'BayesNet(' + repr(self.nodes) + ')'
"
def enumerate_all(variables, e, bn):
    """Return the sum of those entries in P(variables | e{others})
    consistent with e, where P is the joint distribution represented
    by bn, and e{others} means e restricted to bn's other variables
    (the ones other than variables). Parents must precede children in
    variables.

    Works recursively on the first variable: when e already fixes it,
    its probability multiplies the result for the remaining variables;
    otherwise the products for each of its possible values are summed.
    An empty variables sequence yields 1.0."""
    if not variables:
        return 1.0
    first, remaining = variables[0], variables[1:]
    node = bn.variable_node(first)
    if first not in e:
        # Unobserved variable: marginalize over its domain.
        total = 0
        for value in bn.variable_values(first):
            total += node.p(value, e) * enumerate_all(remaining, extend(e, first, value), bn)
        return total
    return node.p(e[first], e) * enumerate_all(remaining, e, bn)
"
def enumeration_ask(X, e, bn):
    """Return the conditional probability distribution of variable X
    given evidence e, from BayesNet bn. [Figure 14.9]

    For every value in X's domain the evidence is extended with that
    value and scored with enumerate_all over bn.variables; the filled
    ProbDist is returned after calling its normalize().

    >>> enumeration_ask('Burglary', dict(JohnCalls=T, MaryCalls=T), burglary
    ...  ).show_approx()
    'False: 0.716, True: 0.284'"""
    assert X not in e, "Query variable must be distinct from evidence"
    distribution = ProbDist(X)
    for value in bn.variable_values(X):
        distribution[value] = enumerate_all(bn.variables, extend(e, X, value), bn)
    return distribution.normalize()
"
def make_factor(var, e, bn):
    """Return the factor for var in bn's joint distribution given e.
    That is, bn's full joint distribution, projected to accord with e,
    is the pointwise product of these factors for bn's variables.

    The factor ranges over var and its parents, minus whatever e
    already fixes."""
    node = bn.variable_node(var)
    free_vars = []
    for name in [var] + node.parents:
        if name not in e:
            free_vars.append(name)
    table = {}
    for event in all_events(free_vars, bn, e):
        # Compute the key before the probability, as a dict
        # comprehension would.
        key = event_values(event, free_vars)
        table[key] = node.p(event[var], event)
    return Factor(free_vars, table)
"
def all_events(variables, bn, e):
    """Yield every way of extending e with values for all variables.

    With no variables left, e itself is yielded. Otherwise each
    extension over the trailing variables is produced first and then
    extended with every value in the domain of the leading variable."""
    if not variables:
        yield e
        return
    head, tail = variables[0], variables[1:]
    for partial in all_events(tail, bn, e):
        yield from (extend(partial, head, value)
                    for value in bn.variable_values(head))
"
def pointwise_product(self, other, bn):
    """Multiply two factors, combining their variables.

    NOTE(review): this takes self and reads self.variables / self.p, so
    it is presumably a method of the Factor class; the class header is
    not visible here -- confirm.

    The result ranges over the union of both variable sets. Each entry
    is the product of the two factors' p() for that event."""
    merged = list(set(self.variables) | set(other.variables))
    table = {}
    for event in all_events(merged, bn, {}):
        key = event_values(event, merged)
        table[key] = self.p(event) * other.p(event)
    return Factor(merged, table)
"
def pointwise_product(factors, bn):
    """Fold factors into one by calling pointwise_product(next, bn) on
    the running result, left to right.

    A single factor is returned as-is. An empty sequence makes reduce
    raise, since no initial value is supplied."""
    def multiply(accumulated, factor):
        return accumulated.pointwise_product(factor, bn)

    return reduce(multiply, factors)
"
def sum_out(self, var, bn):
    """Make a factor eliminating var by summing over its values.

    NOTE(review): this takes self and reads self.variables / self.p, so
    it is presumably a method of the Factor class; the class header is
    not visible here -- confirm.

    The new factor ranges over this factor's variables except var.
    Each entry adds up self.p() over every value in var's domain."""
    kept = []
    for name in self.variables:
        if name != var:
            kept.append(name)
    table = {}
    for event in all_events(kept, bn, {}):
        key = event_values(event, kept)
        table[key] = sum(self.p(extend(event, var, value))
                         for value in bn.variable_values(var))
    return Factor(kept, table)
"
def sum_out(var, factors, bn):
    """Eliminate var from all factors by summing over its values.

    Factors that do not mention var are passed through unchanged, in
    their original order. Those that do are multiplied together, var is
    summed out of the product, and that single factor is appended last."""
    untouched, mentioning = [], []
    for factor in factors:
        if var in factor.variables:
            mentioning.append(factor)
        else:
            untouched.append(factor)
    combined = pointwise_product(mentioning, bn)
    untouched.append(combined.sum_out(var, bn))
    return untouched
"
def elimination_ask(X, e, bn):
    """Compute bn's P(X|e) by variable elimination. [Figure 14.11]

    Walks bn.variables in reverse, adding the factor for each variable.
    Whenever is_hidden(var, X, e) holds, that variable is summed out of
    the factors collected so far. The product of the surviving factors
    is returned after calling its normalize().

    >>> elimination_ask('Burglary', dict(JohnCalls=T, MaryCalls=T), burglary
    ...  ).show_approx()
    'False: 0.716, True: 0.284'"""
    assert X not in e, "Query variable must be distinct from evidence"
    pending = []
    for var in reversed(bn.variables):
        pending.append(make_factor(var, e, bn))
        if is_hidden(var, X, e):
            pending = sum_out(var, pending, bn)
    product = pointwise_product(pending, bn)
    return product.normalize()
"
def sample(self, event):
    """Sample from the distribution for this variable conditioned
    on event's values for the parent variables.

    Computes P(X=true | parents) with self.p and hands it to
    probability(), whose result is returned. Written as a method
    (takes self); the text matches BayesNode.sample."""
    p_true = self.p(True, event)
    return probability(p_true)
"
def prior_sample(bn):
    """Randomly sample from bn's full joint distribution. The result
    is a {variable: value} dict. [Figure 14.13]

    Nodes are visited in bn.nodes order. Each node samples against the
    assignment built so far, so earlier values condition later ones."""
    assignment = {}
    for node in bn.nodes:
        drawn = node.sample(assignment)
        assignment[node.variable] = drawn
    return assignment
"
def rejection_sampling(X, e, bn, N=10000):
    """Estimate the probability distribution of variable X given
    evidence e in BayesNet bn, using N samples. [Figure 14.14]

    Draws N prior samples and counts X's value only in those that are
    consistent with e. The counts are returned wrapped in a ProbDist.
    Raises a ZeroDivisionError if all the N samples are rejected,
    i.e., inconsistent with e.

    >>> random.seed(47)
    >>> rejection_sampling('Burglary', dict(JohnCalls=T, MaryCalls=T),
    ...   burglary, 10000).show_approx()
    'False: 0.7, True: 0.3'
    """
    counts = dict.fromkeys(bn.variable_values(X), 0)  # bold N in [Figure 14.14]
    for _ in range(N):
        sample = prior_sample(bn)  # boldface x in [Figure 14.14]
        if not consistent_with(sample, e):
            continue  # rejected: disagrees with the evidence
        counts[sample[X]] += 1
    return ProbDist(X, counts)
"
def consistent_with(event, evidence):
    """Is event consistent with the given evidence?

    True unless some variable in event also appears in evidence with a
    different value. Variables absent from evidence are compared with
    themselves, and evidence keys missing from event are ignored."""
    for name, value in event.items():
        if not (evidence.get(name, value) == value):
            return False
    return True
"
def weighted_sample(bn, e):
    """Sample an event from bn that's consistent with the evidence e;
    return the event and its weight, the likelihood that the event
    accords to the evidence.

    Evidence variables keep their given values and multiply the weight
    by their probability given the event so far. Every other variable
    is sampled from its node. e itself is not modified."""
    weight = 1
    event = dict(e)  # boldface x in [Figure 14.15]
    for node in bn.nodes:
        name = node.variable
        if name not in e:
            event[name] = node.sample(event)
            continue
        weight *= node.p(e[name], event)
    return event, weight
"
def likelihood_weighting(X, e, bn, N=10000):
    """Estimate the probability distribution of variable X given
    evidence e in BayesNet bn. [Figure 14.15]

    Draws N weighted samples and adds each sample's weight to the
    total for the value it assigns to X. The totals are returned
    wrapped in a ProbDist.

    >>> random.seed(1017)
    >>> likelihood_weighting('Burglary', dict(JohnCalls=T, MaryCalls=T),
    ...   burglary, 10000).show_approx()
    'False: 0.702, True: 0.298'
    """
    totals = dict.fromkeys(bn.variable_values(X), 0)
    for _ in range(N):
        event, weight = weighted_sample(bn, e)  # boldface x, w in [Figure 14.15]
        totals[event[X]] += weight
    return ProbDist(X, totals)
"
def gibbs_ask(X, e, bn, N=1000):
    """[Figure 14.16]

    Estimate the distribution of X given evidence e by Gibbs sampling.
    The state starts as a copy of e with every non-evidence variable
    set by random.choice over its domain. Each of the N sweeps
    resamples every non-evidence variable with markov_blanket_sample.
    The current value of X is tallied after each single-variable update.
    The tallies are returned wrapped in a ProbDist."""
    assert X not in e, "Query variable must be distinct from evidence"
    tallies = dict.fromkeys(bn.variable_values(X), 0)  # bold N in [Figure 14.16]
    nonevidence = [name for name in bn.variables if name not in e]
    state = dict(e)  # boldface x in [Figure 14.16]
    for name in nonevidence:
        state[name] = random.choice(bn.variable_values(name))
    for _ in range(N):
        for name in nonevidence:
            state[name] = markov_blanket_sample(name, state, bn)
            tallies[state[X]] += 1
    return ProbDist(X, tallies)
"
class HiddenMarkovModel:
    """A hidden Markov model built from a transition model and a
    sensor model, with an optional prior."""

    def __init__(self, transition_model, sensor_model, prior=None):
        """Store both models. A missing or falsy prior is replaced by
        the uniform two-state prior [0.5, 0.5]."""
        self.transition_model = transition_model
        self.sensor_model = sensor_model
        self.prior = prior if prior else [0.5, 0.5]

    def sensor_dist(self, ev):
        """Return sensor_model[0] when ev is exactly True, otherwise
        sensor_model[1]."""
        index = 0 if ev is True else 1
        return self.sensor_model[index]
"
def forward(HMM, fv, ev):
    """Advance the two-state forward message fv by one step given
    evidence ev.

    Row 0 and row 1 of HMM.transition_model are scaled by fv[0] and
    fv[1] and added to form the prediction. The prediction is
    multiplied element-wise by the sensor distribution for ev, and the
    result is passed through normalize()."""
    from_state0 = scalar_vector_product(fv[0], HMM.transition_model[0])
    from_state1 = scalar_vector_product(fv[1], HMM.transition_model[1])
    prediction = vector_add(from_state0, from_state1)
    sensor_dist = HMM.sensor_dist(ev)
    weighted = element_wise_product(sensor_dist, prediction)
    return normalize(weighted)
"
def backward(HMM, b, ev):
    """Propagate the two-state backward message b one step back in time
    given evidence ev.

    b is first multiplied element-wise by the sensor distribution for
    ev. Entry i of the result is then
        sum_j transition_model[i][j] * weighted[j]
    and the vector is passed through normalize().

    forward() treats transition_model[i] as the distribution over the
    next state given current state i. The backward step therefore has
    to combine the columns of the matrix, not its rows. Mixing rows (as
    forward does) applies the transpose, which only gives the right
    answer when the transition model is symmetric."""
    sensor_dist = HMM.sensor_dist(ev)
    prediction = element_wise_product(sensor_dist, b)
    # columns[j][i] == HMM.transition_model[i][j]
    columns = list(zip(*HMM.transition_model))

    return normalize(vector_add(scalar_vector_product(prediction[0], columns[0]),
                                scalar_vector_product(prediction[1], columns[1])))
"