Pastebin

Paste #1289: builder-agent.py

< previous paste - next paste>

Pasted by tdn

Download View as text

# encoding=utf8
from utils import *
from random import random
from logger import *
from time import sleep

class BayesNet:
    def __init__(self, nodes=[]):
        update(self, nodes=[], vars=[])
        for node in nodes:
            self.add(node)

    def add(self, node):
        self.nodes.append(node)
        self.vars.append(node.variable)

    def observe(self, var, val):
        self.evidence[var] = val

    def __getitem__(self, name):
        """Returns a node with the given name"""
        for x in self.nodes:
            if x.variable == name:
                return x
    def __iter__(self):
        for node in self.nodes:
            yield node

    def __str__(self):
        return repr(self)
    
    def __repr__(self):
        #return "BayesNet(nodes: %s, vars: %s)" % (self.nodes, self.vars)
        return "BayesNet(vars: %s)" % (self.vars)

def probabilityOf(conditions, cpt):
    debug("probabilityOf(%s, %s)" % (conditions, cpt))
    prob = 0.0
    if type(cpt) == float:
        return cpt
    else:
        tmp = []
        for x in conditions:
            tmp.append(conditions[x])
        if len(tmp) > 1:
            tmp = tuple(tmp)
        else:
            tmp = tmp.pop()
        for c in cpt:
            if tmp == c:
                prob = cpt[c]
    return prob

class BayesNode:
    def __init__(self, variable, parents, cpt):
        if isinstance(parents, str): parents = parents.split()
        update(self, variable=variable, parents=parents, cpt=cpt)

    def sample(self, probability, modelBuiltUpSoFar):
        debug("BayesNode.sample(%s, %s, %s)" % (self, probability, modelBuiltUpSoFar))
        conditions = {}
        if self.parents:
            for p in self.parents:
                conditions[p] = modelBuiltUpSoFar[p]
        else:
            conditions[self.variable] = True
        return probability <= probabilityOf(conditions, self.cpt)

    def __str__(self):
        return str(self.variable)
    
    def __repr__(self):
        #return "BayesNode(variable: %s, parents: %s, cpt: %s)" % (self.variable, self.parents, self.cpt)
        return "BayesNode(variable: %s)" % (self.variable)

def LikelihoodWeighting(X, e, bn, N):
    """Returns an estimate of P(X|e)

    Parameters:
    X: the query variable
    e: the evidence specified as an event
    bn: a Bayesian network
    N: the total number of samples to be generated"""

    # A vector of counts over X, initially zero
    result = { 'Nothing': .0, 'Worker': .0, 'Soldier': .0, 'Tank': .0 }
    
    info("LikelihoodWeighting(%s, %s, %s, %s)" % (X, e, bn, N))
    for i in range(1, N):
        x, w = WeightedSample(bn, e)
        error("LikelihoodWeighting: x=%s, w=%s, X=%s" % (x,w, X))

        # W[x] = W[x] + w, where x is the value of X in x
        te = []
        for t in e:
            te.append(e[t])
        te = tuple(te)
        print e
        print bn[X].cpt[te]
        for xi in bn[X].cpt[te]:
            result[xi] += w
        print result
        print "---------"

        if X in result:
            result[X] += w
        else:
            result[X] = w

    rr = []
    for r in result:
        rr.append(result[r])
    error(rr)
    return normalize(rr)
    #return normalize(result)

def WeightedSample(bn, e):
    """Returns an event and a weight"""
    info("WeightedSample(%s, %s)" % (bn, e))
    x = {}
    w = 1.0
    for node in bn:
        # if X_i has a value x_i in e
        if node.variable in e:
            debug("Node %s in e" % node)
            debug("node=%s, w=%s, node.cpt=%s" % (node, w, node.cpt))
            # w = w * P(X_i = x_i | parents(X_i))
            w *= probabilityOf(x, node.cpt)
            x[node.variable] = e[node.variable]

        else:
            debug("Node %s NOT in e" % node)
            debug("node=%s, w=%s, node.cpt=%s" % (node, w, node.cpt))
            # x_i = a Random sample from P(X_i | parents(X_i))
            x[node.variable] = node.sample(random(), x)

    debug("WeightedSample: Returning %s, %s" % (x, w))
    return (x, w)

def PriorSample(bn):
    """Returns an event sampled from the prior specified by bn
    Parameters:
    bn: a Bayesian network specifying joint distribution P(X_1, ..., X_n)"""
    debug("PriorSample(%s)" % bn)
    x = {}
    for xi in bn.nodes:
        x[xi.variable] = xi.sample(random(), x) 
    return x

def consistent(sample, evidence):
    debug("consistent(%s, %s)" % (sample, evidence))

    for i, key in enumerate(evidence):
        k = evidence[key]
        val = evidence[key]
        if not val == sample[key]:
            return False
    return True

def normalize2(numbers, total=1.0):
    """Multiply each number by a constant such that the sum is 1.0 (or total).
    >>> normalize([1,2,1])
    [0.25, 0.5, 0.25]
    """
    k = total / sum(numbers)
    return [k * n for n in numbers]
    

def RejectionSampling(X, e, bn, N):
    """Returns an estimate of P(X|e) using the 
    Rejection-Sampling algorithm from AIMA
    
    Parameters:
    X: the query variable
    e: the evidence specified as an event
    bn: a Bayesian network
    N: the total number of samples to be generated"""

    debug("RejectionSampling(%s, %s, %s, %s)" % (X, e, bn, N))

    # A vector of counts over X, initially zero
    result = { 'Nothing': .0, 'Worker': .0, 'Soldier': .0, 'Tank': .0 }

    num_iterations = 0
    num_consistent = 0
    for i in range(1, N):
        num_iterations += 1
        x = PriorSample(bn)
        if consistent(x, e):
            num_consistent += 1
            idx = (x['M'], x['W'], x['U'])
            for t in bn[X].cpt[idx]:
                result[t] += bn[X].cpt[idx][t]
            query_value = x[X]
    info("RejectionSampling: num_iterations: %s, num_consistent: %s" % (num_iterations, num_consistent))
    rr = []
    for r in result:
        rr.append(result[r])
    return normalize(rr)


def main():
    node = BayesNode

    T, F = True, False

    burglary = BayesNet([
      node('Burglary', '', .001), 
      node('Earthquake', '', .002),
      node('Alarm', 'Burglary Earthquake', {
                        (T, T):.95,
                        (T, F):.94,
                        (F, T):.29,
                        (F, F):.001}),
      node('JohnCalls', 'Alarm', {T:.90, F:.05}),
      node('MaryCalls', 'Alarm', {T:.70, F:.01})
      ])

    rts = BayesNet([
        node('S', '', .5),
        node('T', '', .5),
        node('M', '', .5),
        node('U', 'S T', {
            (T, T): .4,
            (T, F): .9,
            (F, T): .1,
            (F, F): .6}),
        node('W', 'M', {T:.1, F:.9}),
        node('UW','U W M', {
            (T, T, T): { 'Nothing': .1, 'Worker': .2, 'Soldier': .6, 'Tank': .1 },
            (T, T, F): { 'Nothing': .1, 'Worker': .2, 'Soldier': .1, 'Tank': .6 },
            (T, F, T): { 'Nothing': .1, 'Worker': .1, 'Soldier': .7, 'Tank': .1 },
            (T, F, F): { 'Nothing': .1, 'Worker': .1, 'Soldier': .1, 'Tank': .7 },
            (F, T, T): { 'Nothing': .1, 'Worker': .6, 'Soldier': .2, 'Tank': .1 },
            (F, T, F): { 'Nothing': .1, 'Worker': .6, 'Soldier': .1, 'Tank': .2 },
            (F, F, T): { 'Nothing': .7, 'Worker': .1, 'Soldier': .1, 'Tank': .1 },
            (F, F, F): { 'Nothing': .7, 'Worker': .1, 'Soldier': .1, 'Tank': .1 }})
        ])


    #info(RejectionSampling('UW', {'S': T, 'T': T, 'W': T}, rts, 100))
    #info(RejectionSampling('UW', {'S': T, 'T': T, 'W': T}, rts, 1000))
    #info(RejectionSampling('UW', {'S': T, 'T': T, 'W': T}, rts, 10000))

    info(LikelihoodWeighting('UW', {'S': T, 'T': T, 'W': T}, rts, 100))
    info(LikelihoodWeighting('UW', {'S': T, 'T': T, 'W': T}, rts, 1000))
    info(LikelihoodWeighting('UW', {'S': T, 'T': T, 'W': T}, rts, 10000))

if __name__ == '__main__':
    main()

New Paste


Do not write anything in this field if you're a human.

Go to most recent paste.