""" hm6.py Hidden Markov & Viterbi Algorithm v6 Elias Zeidan & Jim Mahoney - Oct-Nov 2011 Playing around with Hidden Markov Models in Natural Language Processing This was the assignment from last week : 1. Generate a long (1000 or more) sequence of hidden states. From that data alone, write code that finds a) the probability of each state, b) the conditional probabilities P(i|j), and c) compare those with what you calculate should be the theoretical values from the markov definitions. 2. a) Do the same for a long (1000 or more) sequence of obversables. b) How do you calculate the theoretical P(i|j) in this case? c) Do the values estimated from the data agree? 3. What does the Verterbi algorithm do? a) what is the input? b) what is the output? """ import random import doctest import string import pprint pp = pprint.PrettyPrinter(indent=4) #List tuples of States and Outputs states = ('1', '2', '3') outputs = ('a', 'b', 'c', 'd', 'e') #Probabilities for first 'visible' node start_prob = {'1' : 0.4, '2' : 0.5, '3' : 0.1} #Probabilities for node transitions transition_prob = { '1' : {'1' : 0.2, '2' : 0.5, '3' : 0.3}, '2' : {'1' : 0.4, '2' : 0.0, '3' : 0.6}, '3' : {'1' : 0.5, '2' : 0.2, '3' : 0.3} } #Output probabilities from each node emission_prob = { '1' : {'a': 0.5, 'b' : 0.1, 'c' : 0.3, 'd' : 0.1, 'e' : 0.0}, '2' : {'a': 0.1, 'b' : 0.2, 'c' : 0.3, 'd' : 0.2, 'e' : 0.2}, '3' : {'a': 0.2, 'b' : 0.3, 'c' : 0.1, 'd' : 0.0, 'e' : 0.4}, } def weighted_random_choice(probabilities, x=False): """Given a dictionary of probabilities i.e. {'a':0.8, 'b':0.2}, return one of the keys with probability given by the values. >>> weighted_random_choice({'a':0.5, 'b':0.1, 'c':0.4}, x=0.1) 'a' >>> weighted_random_choice({'a':0.5, 'b':0.1, 'c':0.4}, x=0.55) 'b' >>> weighted_random_choice({'a':0.5, 'b':0.1, 'c':0.4}, x=0.8) 'c' """ threshold = 0.0 if not x: x = random.random() # 0 <= x < 1.0 choices = probabilities.keys() choices.sort() for choice in choices: threshold += probabilities[choice] #print "choice, prob, threshold, x = %s, %f, %f, %f" % \ # (choice, probabilities[choice], threshold, x) if x < threshold: return choice return None def make_markov_states(how_many=1000): """Return one sequence of hidden markov states""" states = [weighted_random_choice(start_prob)] for i in range(how_many-1): prob_next_state = transition_prob[states[-1]] states.append(weighted_random_choice(prob_next_state)) return states def make_observables(hidden_states): """Return list of observables given hidden states""" return [weighted_random_choice(emission_prob[s]) for s in hidden_states] def calc_probabilities(states): """Return {state:p(state), ...} for each state in list [s1,s2,...] >>> p = calc_probabilities(['b', 'b', 'a', 'a', 'b', 'b', 'a', 'b']) >>> "a: %4.2f, b: %4.2f" % (p['a'], p['b']) 'a: 0.38, b: 0.62' """ result = {} for state in states: result[state] = result.get(state, 0) + 1 for state in result.keys(): result[state] = float(result[state])/len(states) # normalize return result # float probabilities def calc_conditionals(states): """Return P(j|i) {j1:{i1:prob, i2:prob}, ...} for consecutive pairs in the list of states [i,j,...] is the probability of getting j given an i immediately before it. >>> p = calc_conditionals(['b', 'b', 'a', 'a', 'b', 'b', 'a', 'b']) >>> "P(a|a) = %4.2f" % p['a']['a'] 'P(a|a) = 0.33' >>> "P(b|a) = %4.2f" % p['a']['b'] 'P(b|a) = 0.67' >>> "P(a|b) = %4.2f" % p['b']['a'] 'P(a|b) = 0.50' >>> "P(b|b) = %4.2f" % p['b']['b'] 'P(b|b) = 0.50' """ # If we have (..., i, j, ...) in the sequence, # then we use these two to look at P(j|i), the probability of j given i, # which in the matrix is result[i][j] probabilities = calc_probabilities(states) state_list = probabilities.keys() state_list.sort() #print state_list result = {} for i in state_list: result[i] = {} for j in state_list: result[i][j] = 0 #print result counts = {} for index in range(len(states)-1): #print index (i, j) = (states[index], states[index+1]) result[i][j] += 1 for i in state_list: total = sum(result[i].values()) # = sum of one row; see transition_prob for j in state_list: result[i][j] = float(result[i][j])/total return result # Helps visualize the steps of Viterbi. def print_dptable(V): print " ", for i in range(len(V)): print "%7s" % ("%d" % i), print for y in V[0].keys(): print "%.5s: " % y, for t in range(len(V)): print "%.7s" % ("%f" % V[t][y]), print def viterbi(obs, states = states, start_p = start_prob, trans_p = transition_prob, emit_p = emission_prob): V = [{}] path = {} # Initialize base cases (t == 0) for y in states: V[0][y] = start_p[y] * emit_p[y][obs[0]] path[y] = [y] # Run Viterbi for t > 0 for t in range(1,len(obs)): V.append({}) newpath = {} for y in states: (prob, state) = max([(V[t-1][y0] * trans_p[y0][y] * emit_p[y][obs[t]], y0) for y0 in states]) V[t][y] = prob newpath[y] = path[state] + [y] # Don't need to remember the old paths path = newpath print_dptable(V) (prob, state) = max([(V[len(obs) - 1][y], y) for y in states]) #print V # I want to know what this really does. return (prob, path[state]) def test_markov(n): """Compare n markov states and observables with expected outcome""" states = make_markov_states(n) # state_prob = calc_probabilities(states) state_cond = calc_conditionals(states) print "--- actual conditional state prob ---" pp.pprint(transition_prob) print "--- measured conditional state prob ---" pp.pprint(state_cond) # state_prob = calc_probabilities(states) print "--- measure state probabilities ---" pp.pprint(state_prob) # obs = make_observables(states) def test_viterbi(n_steps): print "--------------------------" print "Running Viterbi Algorithm" print "--------------------------" states = make_markov_states(n_steps) seen = make_observables(states) (prob, path) = viterbi(seen) print " actual states : " + str(states) print " viterbi says : " + str(path) if __name__ == "__main__": doctest.testmod() test_markov(int(1e4)) test_viterbi(10)