# encoding=utf8 from utils import * from random import random from logger import * from time import sleep class BayesNet: def __init__(self, nodes=[]): update(self, nodes=[], vars=[]) for node in nodes: self.add(node) def add(self, node): self.nodes.append(node) self.vars.append(node.variable) def observe(self, var, val): self.evidence[var] = val def __getitem__(self, name): """Returns a node with the given name""" for x in self.nodes: if x.variable == name: return x def __iter__(self): for node in self.nodes: yield node def __str__(self): return repr(self) def __repr__(self): #return "BayesNet(nodes: %s, vars: %s)" % (self.nodes, self.vars) return "BayesNet(vars: %s)" % (self.vars) def probabilityOf(conditions, cpt): debug("probabilityOf(%s, %s)" % (conditions, cpt)) prob = 0.0 if type(cpt) == float: return cpt else: tmp = [] for x in conditions: tmp.append(conditions[x]) if len(tmp) > 1: tmp = tuple(tmp) else: tmp = tmp.pop() for c in cpt: if tmp == c: prob = cpt[c] return prob class BayesNode: def __init__(self, variable, parents, cpt): if isinstance(parents, str): parents = parents.split() update(self, variable=variable, parents=parents, cpt=cpt) def sample(self, probability, modelBuiltUpSoFar): debug("BayesNode.sample(%s, %s, %s)" % (self, probability, modelBuiltUpSoFar)) conditions = {} if self.parents: for p in self.parents: conditions[p] = modelBuiltUpSoFar[p] else: conditions[self.variable] = True return probability <= probabilityOf(conditions, self.cpt) def __str__(self): return str(self.variable) def __repr__(self): #return "BayesNode(variable: %s, parents: %s, cpt: %s)" % (self.variable, self.parents, self.cpt) return "BayesNode(variable: %s)" % (self.variable) def LikelihoodWeighting(X, e, bn, N): """Returns an estimate of P(X|e) Parameters: X: the query variable e: the evidence specified as an event bn: a Bayesian network N: the total number of samples to be generated""" # A vector of counts over X, initially zero result = { 'Nothing': .0, 'Worker': .0, 'Soldier': .0, 'Tank': .0 } info("LikelihoodWeighting(%s, %s, %s, %s)" % (X, e, bn, N)) for i in range(1, N): x, w = WeightedSample(bn, e) error("LikelihoodWeighting: x=%s, w=%s, X=%s" % (x,w, X)) # W[x] = W[x] + w, where x is the value of X in x te = [] for t in e: te.append(e[t]) te = tuple(te) print e print bn[X].cpt[te] for xi in bn[X].cpt[te]: result[xi] += w print result print "---------" if X in result: result[X] += w else: result[X] = w rr = [] for r in result: rr.append(result[r]) error(rr) return normalize(rr) #return normalize(result) def WeightedSample(bn, e): """Returns an event and a weight""" info("WeightedSample(%s, %s)" % (bn, e)) x = {} w = 1.0 for node in bn: # if X_i has a value x_i in e if node.variable in e: debug("Node %s in e" % node) debug("node=%s, w=%s, node.cpt=%s" % (node, w, node.cpt)) # w = w * P(X_i = x_i | parents(X_i)) w *= probabilityOf(x, node.cpt) x[node.variable] = e[node.variable] else: debug("Node %s NOT in e" % node) debug("node=%s, w=%s, node.cpt=%s" % (node, w, node.cpt)) # x_i = a Random sample from P(X_i | parents(X_i)) x[node.variable] = node.sample(random(), x) debug("WeightedSample: Returning %s, %s" % (x, w)) return (x, w) def PriorSample(bn): """Returns an event sampled from the prior specified by bn Parameters: bn: a Bayesian network specifying joint distribution P(X_1, ..., X_n)""" debug("PriorSample(%s)" % bn) x = {} for xi in bn.nodes: x[xi.variable] = xi.sample(random(), x) return x def consistent(sample, evidence): debug("consistent(%s, %s)" % (sample, evidence)) for i, key in enumerate(evidence): k = evidence[key] val = evidence[key] if not val == sample[key]: return False return True def normalize2(numbers, total=1.0): """Multiply each number by a constant such that the sum is 1.0 (or total). >>> normalize([1,2,1]) [0.25, 0.5, 0.25] """ k = total / sum(numbers) return [k * n for n in numbers] def RejectionSampling(X, e, bn, N): """Returns an estimate of P(X|e) using the Rejection-Sampling algorithm from AIMA Parameters: X: the query variable e: the evidence specified as an event bn: a Bayesian network N: the total number of samples to be generated""" debug("RejectionSampling(%s, %s, %s, %s)" % (X, e, bn, N)) # A vector of counts over X, initially zero result = { 'Nothing': .0, 'Worker': .0, 'Soldier': .0, 'Tank': .0 } num_iterations = 0 num_consistent = 0 for i in range(1, N): num_iterations += 1 x = PriorSample(bn) if consistent(x, e): num_consistent += 1 idx = (x['M'], x['W'], x['U']) for t in bn[X].cpt[idx]: result[t] += bn[X].cpt[idx][t] query_value = x[X] info("RejectionSampling: num_iterations: %s, num_consistent: %s" % (num_iterations, num_consistent)) rr = [] for r in result: rr.append(result[r]) return normalize(rr) def main(): node = BayesNode T, F = True, False burglary = BayesNet([ node('Burglary', '', .001), node('Earthquake', '', .002), node('Alarm', 'Burglary Earthquake', { (T, T):.95, (T, F):.94, (F, T):.29, (F, F):.001}), node('JohnCalls', 'Alarm', {T:.90, F:.05}), node('MaryCalls', 'Alarm', {T:.70, F:.01}) ]) rts = BayesNet([ node('S', '', .5), node('T', '', .5), node('M', '', .5), node('U', 'S T', { (T, T): .4, (T, F): .9, (F, T): .1, (F, F): .6}), node('W', 'M', {T:.1, F:.9}), node('UW','U W M', { (T, T, T): { 'Nothing': .1, 'Worker': .2, 'Soldier': .6, 'Tank': .1 }, (T, T, F): { 'Nothing': .1, 'Worker': .2, 'Soldier': .1, 'Tank': .6 }, (T, F, T): { 'Nothing': .1, 'Worker': .1, 'Soldier': .7, 'Tank': .1 }, (T, F, F): { 'Nothing': .1, 'Worker': .1, 'Soldier': .1, 'Tank': .7 }, (F, T, T): { 'Nothing': .1, 'Worker': .6, 'Soldier': .2, 'Tank': .1 }, (F, T, F): { 'Nothing': .1, 'Worker': .6, 'Soldier': .1, 'Tank': .2 }, (F, F, T): { 'Nothing': .7, 'Worker': .1, 'Soldier': .1, 'Tank': .1 }, (F, F, F): { 'Nothing': .7, 'Worker': .1, 'Soldier': .1, 'Tank': .1 }}) ]) #info(RejectionSampling('UW', {'S': T, 'T': T, 'W': T}, rts, 100)) #info(RejectionSampling('UW', {'S': T, 'T': T, 'W': T}, rts, 1000)) #info(RejectionSampling('UW', {'S': T, 'T': T, 'W': T}, rts, 10000)) info(LikelihoodWeighting('UW', {'S': T, 'T': T, 'W': T}, rts, 100)) info(LikelihoodWeighting('UW', {'S': T, 'T': T, 'W': T}, rts, 1000)) info(LikelihoodWeighting('UW', {'S': T, 'T': T, 'W': T}, rts, 10000)) if __name__ == '__main__': main()