"""hm5_viterbi.py

Hidden Markov v5, with Viterbi Algorithm
Elias Zeidan & Jim Mahoney - Oct-Nov 2011
(Ported to Python 3; conditional-count and name-shadowing bugs fixed.)

Playing around with Hidden Markov Models in Natural Language Processing

This was the assignment from last week:

 1. Generate a long (1000 or more) sequence of hidden states.
    From that data alone, write code that finds
    a) the probability of each state,
    b) the conditional probabilities P(i|j), and
    c) compare those with what you calculate should be the theoretical
       values from the markov definitions.
 2. a) Do the same for a long (1000 or more) sequence of observables.
    b) How do you calculate the theoretical P(i|j) in this case?
    c) Do the values estimated from the data agree?
 3. What does the Viterbi algorithm do?
    a) what is the input?
    b) what is the output?
"""

import doctest
import random

# Names of the hidden states and of the visible outputs.
states = ('1', '2', '3')
outputs = ('a', 'b', 'c', 'd', 'e')

# Probabilities for the first hidden state.
start_prob = {'1': 0.4, '2': 0.5, '3': 0.1}

# transition_prob[i][j] = P(next state is j | current state is i); rows sum to 1.
transition_prob = {
    '1': {'1': 0.2, '2': 0.5, '3': 0.3},
    '2': {'1': 0.4, '2': 0.0, '3': 0.6},
    '3': {'1': 0.5, '2': 0.2, '3': 0.3},
    }

# emission_prob[i][a] = P(output a | hidden state i); rows sum to 1.
emission_prob = {
    '1': {'a': 0.5, 'b': 0.1, 'c': 0.3, 'd': 0.1, 'e': 0.0},
    '2': {'a': 0.1, 'b': 0.2, 'c': 0.3, 'd': 0.2, 'e': 0.2},
    '3': {'a': 0.2, 'b': 0.3, 'c': 0.1, 'd': 0.0, 'e': 0.4},
    }


def weighted_random_choice(probabilities, x=None):
    """Given a dictionary of probabilities i.e. {'a':0.8, 'b':0.2},
    return one of the keys with probability given by the values.

    x is the uniform draw in [0, 1); pass an explicit value (as the
    doctests do) to make the choice deterministic.  (The old default of
    x=False silently discarded a legitimate x=0.0.)

    >>> weighted_random_choice({'a':0.5, 'b':0.1, 'c':0.4}, x=0.1)
    'a'
    >>> weighted_random_choice({'a':0.5, 'b':0.1, 'c':0.4}, x=0.55)
    'b'
    >>> weighted_random_choice({'a':0.5, 'b':0.1, 'c':0.4}, x=0.8)
    'c'
    """
    if x is None:
        x = random.random()          # 0 <= x < 1.0
    choices = sorted(probabilities)  # sorted so results are reproducible
    threshold = 0.0
    for choice in choices:
        threshold += probabilities[choice]
        if x < threshold:
            return choice
    # Float rounding can leave the cumulative sum a hair below 1.0;
    # fall back to the last key rather than returning None.
    return choices[-1]


def make_markov_states(how_many=1000):
    """Return a list of how_many hidden markov states sampled from the
    model: the first from start_prob, each later one from
    transition_prob given its predecessor."""
    sequence = [weighted_random_choice(start_prob)]
    for _ in range(how_many - 1):
        sequence.append(weighted_random_choice(transition_prob[sequence[-1]]))
    return sequence


def make_observables(hidden_states):
    """Return a list of observables, one emitted from each hidden state."""
    return [weighted_random_choice(emission_prob[s]) for s in hidden_states]


def calc_probabilities(states):
    """Return {state: p(state), ...} estimated from a list [s1, s2, ...].

    >>> probs = calc_probabilities(['b', 'a', 'a', 'a', 'b'])
    >>> probs['a'], probs['b']
    (0.6, 0.4)
    """
    counts = {}
    for state in states:
        counts[state] = counts.get(state, 0) + 1
    n = len(states)
    return {state: count / n for state, count in counts.items()}  # normalize


def calc_conditionals(states):
    """Return estimated transition probabilities from consecutive pairs
    in the list of states: result[i][j] is the fraction of i's (other
    than a trailing i) that are immediately followed by j.  Rows for
    states that are never followed by anything are all zero.

    (The old version re-used a stale loop variable when patching the
    final state into the denominator counts, which corrupted the
    normalization whenever the last state also occurred earlier.)

    >>> cond = calc_conditionals(['b', 'a', 'a', 'a'])
    >>> cond['a']['a'], cond['a']['b'], cond['b']['a'], cond['b']['b']
    (1.0, 0.0, 1.0, 0.0)
    """
    state_list = sorted(set(states))
    result = {i: {j: 0 for j in state_list} for i in state_list}
    counts = dict.fromkeys(state_list, 0)  # times each state has a successor
    for i, j in zip(states, states[1:]):
        result[i][j] += 1
        counts[i] += 1
    for i in state_list:
        # A state that never has a successor keeps an all-zero row.
        result[i] = {j: (result[i][j] / counts[i] if counts[i] else 0.0)
                     for j in state_list}
    return result


def print_dptable(V):
    """Print the Viterbi dynamic-programming table V (a list of
    {state: prob} dicts, one per time step) - helps visualize the steps."""
    print("     " + " ".join("%7s" % ("%d" % t) for t in range(len(V))))
    for y in V[0]:
        row = " ".join("%.7s" % ("%f" % V[t][y]) for t in range(len(V)))
        print("%.5s:  %s" % (y, row))


def viterbi(obs, states=states, start_p=start_prob,
            trans_p=transition_prob, emit_p=emission_prob):
    """Find the most likely hidden state sequence for the observations.

    Input:  obs, a list of observed outputs, plus the model: the tuple
            of state names and the start/transition/emission
            probability tables (defaulting to the module-level model).
    Output: (prob, path) where path is the most probable hidden state
            sequence and prob is the joint probability of that path
            together with the observations.
    """
    V = [{}]   # V[t][y] = probability of the best path ending in y at time t
    path = {}  # path[y] = best state sequence so far ending in y

    # Base case (t == 0): start probability times first emission.
    for y in states:
        V[0][y] = start_p[y] * emit_p[y][obs[0]]
        path[y] = [y]

    # Induction: extend the best paths one observation at a time.
    for t in range(1, len(obs)):
        V.append({})
        newpath = {}
        for y in states:
            prob, prev = max(
                (V[t - 1][y0] * trans_p[y0][y] * emit_p[y][obs[t]], y0)
                for y0 in states)
            V[t][y] = prob
            newpath[y] = path[prev] + [y]
        path = newpath  # don't need to remember the old paths

    print_dptable(V)
    prob, best = max((V[len(obs) - 1][y], y) for y in states)
    return (prob, path[best])


def test1000():
    """Generate 1000 hidden states and their observables and return the
    conditional probabilities estimated from each, for comparison
    against the theoretical values (e.g. largest fractional error)."""
    hidden = make_markov_states(1000)
    obs = make_observables(hidden)
    return calc_conditionals(hidden), calc_conditionals(obs)


def main():
    # Named `hidden` (not `states`) so the module-level tuple of state
    # names isn't shadowed; the old code passed the sampled sequence to
    # viterbi() as the state set by mistake.
    hidden = make_markov_states(100)
    print("Hidden states\n----------")
    print(hidden)
    print("--------------------------")
    obs = make_observables(hidden)
    print("Observables\n----------")
    print(obs)
    print("--------------------------")
    print("Conditional Probabilities for Hidden States\n----------------------------")
    print(calc_conditionals(hidden))
    print("--------------------------")
    print("Conditional Probabilities for Observables\n----------------------------")
    print(calc_conditionals(obs))
    print("--------------------------")
    print("Running Viterbi Algorithm\n----------------------------")
    viterbi(obs, states)


if __name__ == "__main__":
    doctest.testmod()
    main()