# encoding=utf8
"""Bayesian networks with approximate inference by sampling.

Defines a small Bayesian-network representation (BayesNet / BayesNode), two
example networks (``burglary`` and ``rts``) and the PRIOR-SAMPLE,
REJECTION-SAMPLING and LIKELIHOOD-WEIGHTING algorithms from AIMA.

The helpers ``update`` and ``normalize`` come from ``utils`` and the logging
functions ``info``, ``debug`` and ``warn`` come from ``logger``; neither
module is visible from this file.
"""
from utils import *
from random import random
from logger import *


class BayesNet:
    """A Bayesian network: an ordered list of BayesNode objects.

    Nodes should be added parents-first: the samplers below walk
    ``self.nodes`` in order and condition every node on the values that
    were already assigned to its parents.
    """

    def __init__(self, nodes=None):
        """Create a network holding ``nodes`` (any iterable, may be None)."""
        # Fresh containers per instance. ``evidence`` has to exist before
        # observe() can store anything in it.
        update(self, nodes=[], vars=[], evidence={})
        for node in nodes or []:
            self.add(node)

    def add(self, node):
        """Append ``node`` to the network and record its variable name."""
        self.nodes.append(node)
        self.vars.append(node.variable)

    def observe(self, var, val):
        """Record that variable ``var`` was observed with value ``val``."""
        self.evidence[var] = val

    def __str__(self):
        return repr(self)

    def __repr__(self):
        return "BayesNet(vars: %s)" % (self.vars)


class BayesNode:
    """One variable of a Bayesian network with its conditional distribution.

    ``cpt`` takes one of these shapes (all of them occur in this file):

    * no parents: a number, P(variable = True);
    * one parent: a dict keyed by the parent's value;
    * several parents: a dict keyed by the tuple of parent values, in the
      order given by ``parents``.

    Each dict entry is either a number, P(variable = True | parents), or a
    dict ``{value: probability}`` for a variable with more than two values.
    """

    def __init__(self, variable, parents, cpt):
        """``parents`` is a list of names or one space-separated string."""
        if isinstance(parents, str):
            parents = parents.split()
        update(self, variable=variable, parents=parents, cpt=cpt)

    def _row(self, event):
        """Return the CPT entry selected by the parent values in ``event``.

        A parent that has no value in ``event`` is treated as False; this
        keeps the fallback the sampler has always had for parents that
        were not sampled yet.
        """
        if not self.parents:
            return self.cpt
        values = tuple(event.get(parent, False) for parent in self.parents)
        debug("%s conditioned on %s = %s"
              % (self.variable, self.parents, values))
        key = values
        # Single-parent tables are keyed by the bare value, not a 1-tuple.
        if len(values) == 1 and values not in self.cpt:
            key = values[0]
        return self.cpt[key]

    def distribution(self, event):
        """Return ``{value: probability}`` for this variable given ``event``."""
        row = self._row(event)
        if isinstance(row, dict):
            return row
        return {True: row, False: 1.0 - row}

    def p(self, value, event):
        """Return P(variable = value | parent values taken from ``event``).

        A value that the distribution does not mention has probability 0.
        """
        return self.distribution(event).get(value, 0.0)

    def sample(self, probability, modelBuiltUpSoFar):
        """Pick a value for this variable.

        Parameters:
        probability: a number in [0, 1), normally ``random()``
        modelBuiltUpSoFar: dict of the values assigned so far; the values
            of this node's parents are read from it

        Returns True or False for a two-valued variable, otherwise one of
        the keys of the selected categorical CPT entry.
        """
        info("sample(%s, %s, %s)" % (self, probability, modelBuiltUpSoFar))
        row = self._row(modelBuiltUpSoFar)
        if not isinstance(row, dict):
            # ``row`` is P(True); a uniform draw lies below it with exactly
            # that probability.
            return probability < row
        # Categorical entry: walk the cumulative distribution until it
        # passes the draw.
        value = None
        cumulative = 0.0
        for value, weight in row.items():
            cumulative += weight
            if probability < cumulative:
                return value
        # Rounding can leave the cumulative sum a hair below 1; fall back
        # to the last value rather than returning nothing.
        return value

    def __str__(self):
        return str(self.variable)

    def __repr__(self):
        return "BayesNode(variable: %s)" % (self.variable)


class BuilderAgent:
    """Placeholder; no behaviour is defined yet."""
    pass


node = BayesNode

T, F = True, False

burglary = BayesNet([
    node('Burglary', '', .001),
    node('Earthquake', '', .002),
    node('Alarm', 'Burglary Earthquake', {
        (T, T): .95,
        (T, F): .94,
        (F, T): .29,
        (F, F): .001}),
    node('JohnCalls', 'Alarm', {T: .90, F: .05}),
    node('MaryCalls', 'Alarm', {T: .70, F: .01})
])

# 'M' is listed before 'W' and 'UW' because both are conditioned on it and
# the samplers process nodes in list order.
rts = BayesNet([
    node('S', '', .5),
    node('T', '', .5),
    node('U', 'S T', {
        (T, T): .4,
        (T, F): .9,
        (F, T): .1,
        (F, F): .6}),
    node('M', '', .5),
    node('W', 'M', {T: .1, F: .9}),
    node('UW', 'U W M', {
        (T, T, T): {'Nothing': .1, 'Worker': .2, 'Soldier': .6, 'Tank': .1},
        (T, T, F): {'Nothing': .1, 'Worker': .2, 'Soldier': .1, 'Tank': .6},
        (T, F, T): {'Nothing': .1, 'Worker': .1, 'Soldier': .7, 'Tank': .1},
        (T, F, F): {'Nothing': .1, 'Worker': .1, 'Soldier': .1, 'Tank': .7},
        (F, T, T): {'Nothing': .1, 'Worker': .6, 'Soldier': .2, 'Tank': .1},
        (F, T, F): {'Nothing': .1, 'Worker': .6, 'Soldier': .1, 'Tank': .2},
        (F, F, T): {'Nothing': .7, 'Worker': .1, 'Soldier': .1, 'Tank': .1},
        (F, F, F): {'Nothing': .7, 'Worker': .1, 'Soldier': .1, 'Tank': .1}})
])


def consistent(sample, evidence):
    """Return True if ``sample`` agrees with every entry of ``evidence``.

    Both arguments are dicts mapping variable names to values. Raises
    KeyError if ``evidence`` names a variable that ``sample`` lacks.
    """
    info("consistent(%s, %s)" % (sample, evidence))
    for var, val in evidence.items():
        if sample[var] != val:
            return False
    return True


def PriorSample(bn):
    """Returns an event sampled from the prior specified by bn

    Parameters:
    bn: a Bayesian network specifying joint distribution P(X_1, ..., X_n)"""
    info("PriorSample(%s)" % bn)
    x = {}
    for xi in bn.nodes:
        # Each node sees the values chosen for the nodes before it.
        x[xi.variable] = xi.sample(random(), x)
    return x


def _estimate(totals):
    """Turn per-value totals (counts or weights) into a distribution.

    If only True/False were seen (or nothing at all) the result is
    ``normalize([total_for_True, total_for_False])``, the two-element form
    these estimators have always returned. Otherwise the result is a dict
    mapping each value that was seen to its normalized total.

    NOTE(review): ``normalize`` lives in utils; it is presumed to scale the
    numbers so they sum to 1, and its behaviour for all-zero input (no
    usable samples) is not visible from here.
    """
    if all(isinstance(value, bool) for value in totals):
        return normalize([totals.get(True, 0.0), totals.get(False, 0.0)])
    values = list(totals)
    return dict(zip(values, normalize([totals[v] for v in values])))


def RejectionSampling(X, e, bn, N):
    """Returns an estimate of P(X|e) using the Rejection-Sampling algorithm
    from AIMA

    Parameters:
    X: the query variable
    e: the evidence specified as an event
    bn: a Bayesian network
    N: the total number of samples to be generated

    Returns [P(true), P(false)] for a two-valued query variable, otherwise
    a dict mapping each observed value of X to its estimated probability."""
    info("RejectionSampling(%s, %s, %s, %s)" % (X, e, bn, N))
    # Counts over the values of X seen in samples that agree with e.
    counts = {}
    for _ in range(N):
        x = PriorSample(bn)
        if consistent(x, e):
            query_value = x[X]
            debug("RejectionSampling: %s = %s" % (X, query_value))
            counts[query_value] = counts.get(query_value, 0.0) + 1
    return _estimate(counts)


def WeightedSample(bn, e):
    """Returns an event and a weight

    Evidence variables are fixed to their values in ``e`` and every other
    variable is sampled given its parents. The weight is the product of
    P(evidence value | parents) over the evidence variables."""
    x = dict(e)
    w = 1.0
    for xi in bn.nodes:
        var = xi.variable
        if var in e:
            w *= xi.p(e[var], x)
        else:
            x[var] = xi.sample(random(), x)
    return x, w


def LikelihoodWeighting(X, e, bn, N):
    """Returns an estimate of P(X|e)

    Parameters:
    X: the query variable
    e: the evidence specified as an event
    bn: a Bayesian network
    N: the total number of samples to be generated

    Returns [P(true), P(false)] for a two-valued query variable, otherwise
    a dict mapping each observed value of X to its estimated probability."""
    # Weighted totals over the values of X.
    weights = {}
    for _ in range(N):
        x, w = WeightedSample(bn, e)
        query_value = x[X]
        weights[query_value] = weights.get(query_value, 0.0) + w
    return _estimate(weights)


def main():
    """Estimate P(UW | S, T, W all true) in ``rts`` with both samplers."""
    warn(RejectionSampling('UW', {'S': T, 'T': T, 'W': T}, rts, 100))
    warn(RejectionSampling('UW', {'S': T, 'T': T, 'W': T}, rts, 1000))
    warn(RejectionSampling('UW', {'S': T, 'T': T, 'W': T}, rts, 10000))

    warn(LikelihoodWeighting('UW', {'S': T, 'T': T, 'W': T}, rts, 100))
    warn(LikelihoodWeighting('UW', {'S': T, 'T': T, 'W': T}, rts, 1000))
    warn(LikelihoodWeighting('UW', {'S': T, 'T': T, 'W': T}, rts, 10000))


if __name__ == '__main__':
    main()