Pastebin
Paste #1288: builder-agent.py
< previous paste - next paste>
Pasted by tdn
# encoding=utf8
from utils import *
from random import random
from logger import *
from time import sleep
class BayesNet:
def __init__(self, nodes=[]):
update(self, nodes=[], vars=[])
for node in nodes:
self.add(node)
def add(self, node):
self.nodes.append(node)
self.vars.append(node.variable)
def observe(self, var, val):
self.evidence[var] = val
def __getitem__(self, name):
"""Returns a node with the given name"""
for x in self.nodes:
if x.variable == name:
return x
def __iter__(self):
for node in self.nodes:
yield node
def __str__(self):
return repr(self)
def __repr__(self):
#return "BayesNet(nodes: %s, vars: %s)" % (self.nodes, self.vars)
return "BayesNet(vars: %s)" % (self.vars)
def probabilityOf(conditions, cpt):
debug("probabilityOf(%s, %s)" % (conditions, cpt))
prob = 0.0
if type(cpt) == float:
return cpt
else:
tmp = []
for x in conditions:
tmp.append(conditions[x])
if len(tmp) > 1:
tmp = tuple(tmp)
else:
tmp = tmp.pop()
for c in cpt:
if tmp == c:
prob = cpt[c]
return prob
class BayesNode:
def __init__(self, variable, parents, cpt):
if isinstance(parents, str): parents = parents.split()
update(self, variable=variable, parents=parents, cpt=cpt)
def sample(self, probability, modelBuiltUpSoFar):
debug("BayesNode.sample(%s, %s, %s)" % (self, probability, modelBuiltUpSoFar))
conditions = {}
if self.parents:
for p in self.parents:
conditions[p] = modelBuiltUpSoFar[p]
else:
conditions[self.variable] = True
return probability <= probabilityOf(conditions, self.cpt)
def __str__(self):
return str(self.variable)
def __repr__(self):
#return "BayesNode(variable: %s, parents: %s, cpt: %s)" % (self.variable, self.parents, self.cpt)
return "BayesNode(variable: %s)" % (self.variable)
def LikelihoodWeighting(X, e, bn, N):
"""Returns an estimate of P(X|e)
Parameters:
X: the query variable
e: the evidence specified as an event
bn: a Bayesian network
N: the total number of samples to be generated"""
# A vector of counts over X, initially zero
result = { 'Nothing': .0, 'Worker': .0, 'Soldier': .0, 'Tank': .0 }
info("LikelihoodWeighting(%s, %s, %s, %s)" % (X, e, bn, N))
for i in range(1, N):
x, w = WeightedSample(bn, e)
error("LikelihoodWeighting: x=%s, w=%s, X=%s" % (x,w, X))
# W[x] = W[x] + w, where x is the value of X in x
te = []
for t in e:
te.append(e[t])
te = tuple(te)
print e
print bn[X].cpt[te]
for xi in bn[X].cpt[te]:
result[xi] += w
print result
print "---------"
if X in result:
result[X] += w
else:
result[X] = w
rr = []
for r in result:
rr.append(result[r])
error(rr)
return normalize(rr)
#return normalize(result)
def WeightedSample(bn, e):
"""Returns an event and a weight"""
info("WeightedSample(%s, %s)" % (bn, e))
x = {}
w = 1.0
for node in bn:
# if X_i has a value x_i in e
if node.variable in e:
debug("Node %s in e" % node)
# w = w * P(X_i = x_i | parents(X_i))
w *= probabilityOf(x, node.cpt)
debug("node=%s, w=%s, node.cpt=%s" % (node, w, node.cpt))
debug("probabilityOf(%s, %s) = %s" % (e, node.cpt, probabilityOf(e, node.cpt)))
debug("node=%s, w=%s, node.cpt=%s" % (node, w, node.cpt))
x[node.variable] = e[node.variable]
#w *= node.cpt['x']
# x.put(node.getVariable(), evidence.get(node.getVariable()))
else:
# x_i = a Random sample from P(X_i | parents(X_i))
debug("Node %s NOT in e" % node)
debug("node=%s, w=%s, node.cpt=%s" % (node, w, node.cpt))
x[node.variable] = node.sample(random(), x)
#x.put(node.getVariable(), node.isTrueFor(r.nextDouble(), x))
debug("WeightedSample: Returning %s, %s" % (x, w))
return (x, w)
def PriorSample(bn):
"""Returns an event sampled from the prior specified by bn
Parameters:
bn: a Bayesian network specifying joint distribution P(X_1, ..., X_n)"""
debug("PriorSample(%s)" % bn)
x = {}
for xi in bn.nodes:
x[xi.variable] = xi.sample(random(), x)
return x
def consistent(sample, evidence):
debug("consistent(%s, %s)" % (sample, evidence))
for i, key in enumerate(evidence):
k = evidence[key]
val = evidence[key]
if not val == sample[key]:
return False
return True
def normalize2(numbers, total=1.0):
"""Multiply each number by a constant such that the sum is 1.0 (or total).
>>> normalize([1,2,1])
[0.25, 0.5, 0.25]
"""
k = total / sum(numbers)
return [k * n for n in numbers]
def RejectionSampling(X, e, bn, N):
"""Returns an estimate of P(X|e) using the
Rejection-Sampling algorithm from AIMA
Parameters:
X: the query variable
e: the evidence specified as an event
bn: a Bayesian network
N: the total number of samples to be generated"""
debug("RejectionSampling(%s, %s, %s, %s)" % (X, e, bn, N))
# A vector of counts over X, initially zero
result = { 'Nothing': .0, 'Worker': .0, 'Soldier': .0, 'Tank': .0 }
num_iterations = 0
num_consistent = 0
for i in range(1, N):
num_iterations += 1
x = PriorSample(bn)
if consistent(x, e):
num_consistent += 1
idx = (x['M'], x['W'], x['U'])
for t in bn[X].cpt[idx]:
result[t] += bn[X].cpt[idx][t]
query_value = x[X]
info("RejectionSampling: num_iterations: %s, num_consistent: %s" % (num_iterations, num_consistent))
rr = []
for r in result:
rr.append(result[r])
return normalize(rr)
def main():
node = BayesNode
T, F = True, False
burglary = BayesNet([
node('Burglary', '', .001),
node('Earthquake', '', .002),
node('Alarm', 'Burglary Earthquake', {
(T, T):.95,
(T, F):.94,
(F, T):.29,
(F, F):.001}),
node('JohnCalls', 'Alarm', {T:.90, F:.05}),
node('MaryCalls', 'Alarm', {T:.70, F:.01})
])
rts = BayesNet([
node('S', '', .5),
node('T', '', .5),
node('M', '', .5),
node('U', 'S T', {
(T, T): .4,
(T, F): .9,
(F, T): .1,
(F, F): .6}),
node('W', 'M', {T:.1, F:.9}),
node('UW','U W M', {
(T, T, T): { 'Nothing': .1, 'Worker': .2, 'Soldier': .6, 'Tank': .1 },
(T, T, F): { 'Nothing': .1, 'Worker': .2, 'Soldier': .1, 'Tank': .6 },
(T, F, T): { 'Nothing': .1, 'Worker': .1, 'Soldier': .7, 'Tank': .1 },
(T, F, F): { 'Nothing': .1, 'Worker': .1, 'Soldier': .1, 'Tank': .7 },
(F, T, T): { 'Nothing': .1, 'Worker': .6, 'Soldier': .2, 'Tank': .1 },
(F, T, F): { 'Nothing': .1, 'Worker': .6, 'Soldier': .1, 'Tank': .2 },
(F, F, T): { 'Nothing': .7, 'Worker': .1, 'Soldier': .1, 'Tank': .1 },
(F, F, F): { 'Nothing': .7, 'Worker': .1, 'Soldier': .1, 'Tank': .1 }})
])
#info(RejectionSampling('UW', {'S': T, 'T': T, 'W': T}, rts, 100))
#info(RejectionSampling('UW', {'S': T, 'T': T, 'W': T}, rts, 1000))
#info(RejectionSampling('UW', {'S': T, 'T': T, 'W': T}, rts, 10000))
info(LikelihoodWeighting('UW', {'S': T, 'T': T, 'W': T}, rts, 100))
info(LikelihoodWeighting('UW', {'S': T, 'T': T, 'W': T}, rts, 1000))
info(LikelihoodWeighting('UW', {'S': T, 'T': T, 'W': T}, rts, 10000))
if __name__ == '__main__':
main()
New Paste
Go to most recent paste.