main.py
import utils
import flappy_bird_gym
import time
import random
import numpy as np

# mode 0 -> training (epsilon-greedy exploration), mode 1 -> evaluation (greedy policy only)
mode = 0


class SmartFlappyBird:
    def __init__(self, iterations):
        self.Qvalues = utils.Counter()   # Q-table: (state, action) -> value
        self.landa = 1                   # discount factor
        self.epsilon = 0.8               # exploration rate
        self.alpha = 0.5                 # learning rate
        self.iterations = iterations     # number of training episodes
    def policy(self, state):
        # greedy policy: pick the action with the highest Q-value for this state
        return self.max_arg(state)

    @staticmethod
    def get_all_actions():
        return [0, 1]  # 0: do nothing, 1: flap
    @staticmethod
    def convert_continuous_to_discrete(state):
        # discretize the continuous distances by rounding to one decimal place,
        # e.g. (0.437, -0.112) -> (0.4, -0.1)
        # range of x: (0.000, 1.650)
        # range of y: (-0.500, 0.650)
        x, y = state
        rounded_x = round(x, 1)
        rounded_y = round(y, 1)
        if rounded_x == -0.0:
            rounded_x = 0.0
        if rounded_y == -0.0:
            rounded_y = 0.0
        return rounded_x, rounded_y
    def compute_reward(self, prev_info, new_info, done, observation):
        # shaped reward based on the discretized vertical distance to the pipe gap
        observation = self.convert_continuous_to_discrete(observation)
        _, y = observation
        if done:  # lost
            return -1000
        elif 0 <= y <= 0.05:  # aligned with the pipe gap
            new_info['score'] = prev_info['score'] + 1
            return 500
        else:  # still alive
            return 1
    def get_action(self, state):
        # epsilon-greedy during training (mode 0), greedy during evaluation (mode 1)
        if mode == 0:
            if utils.flip_coin(self.epsilon):  # explore
                # biased random action: flap only ~10% of the time
                rand = random.randint(0, 100)
                choice = 1
                if rand < 90:
                    choice = 0
                return SmartFlappyBird.get_all_actions()[choice]
            else:  # exploit the current policy
                return self.policy(state)
        else:
            return self.policy(state)
    def maxQ(self, state):
        # max Q-value over all actions for a state
        state = self.convert_continuous_to_discrete(tuple(state))
        actions = self.get_all_actions()
        q_values = [self.Qvalues.get((state, action), 0) for action in actions]
        return max(q_values)

    def max_arg(self, state):
        # action with the maximum Q-value for a state
        state = self.convert_continuous_to_discrete(tuple(state))
        actions = self.get_all_actions()
        values = [self.Qvalues.get((state, action), 0) for action in actions]
        return actions[np.argmax(values)]
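
    # The update below applies the standard tabular Q-learning rule:
    #   Q(s, a) <- Q(s, a) + alpha * (r + landa * max_a' Q(s', a') - Q(s, a))
    # with landa acting as the discount factor.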
    def update(self, reward, state, action, next_state):
        # update the Q-table entry for (state, action)
        state = self.convert_continuous_to_discrete(state)
        next_state = self.convert_continuous_to_discrete(next_state)
        my_tuple = state, action
        max_arg_next_state = self.max_arg(next_state)
        self.Qvalues[my_tuple] += self.alpha * \
            (reward + self.landa * self.Qvalues[next_state, max_arg_next_state] -
             self.Qvalues[my_tuple])
    def update_epsilon_alpha(self):
        # decay the learning rate and exploration rate, with lower bounds
        self.alpha = max(self.alpha * 0.95, 0.01)
        self.epsilon = max(self.epsilon * 0.95, 0.1)
    def run_with_policy(self, landa):
        # train with the epsilon-greedy policy for self.iterations episodes
        self.landa = landa
        env = flappy_bird_gym.make("FlappyBird-v0")
        observation = env.reset()
        info = {'score': 0}
        observations = []
        for _i in range(self.iterations):
            info = {'score': 0}
            while True:
                action = self.get_action(observation)  # epsilon-greedy action
                this_state = observation
                prev_info = info
                observation, reward, done, info = env.step(action)
                observations.append(observation)
                reward = self.compute_reward(
                    prev_info, info, done, tuple(observation))
                self.update(reward, tuple(this_state),
                            action, tuple(observation))
                self.update_epsilon_alpha()
                if done:
                    observation = env.reset()
                    break
        env.close()
    def run_with_no_policy(self, landa):
        # evaluate the learned policy over 10 rendered episodes and report the scores
        scores = []
        for _ in range(10):
            self.landa = landa
            env = flappy_bird_gym.make("FlappyBird-v0")
            observation = env.reset()
            info = {'score': 0}
            while True:
                # despite the method name, mode == 1 here, so get_action
                # always follows the learned policy
                action = self.get_action(observation)
                prev_info = info
                observation, reward, done, info = env.step(action)
                reward = self.compute_reward(
                    prev_info, info, done, observation)
                env.render()
                time.sleep(1 / 30)  # ~30 FPS
                if done:
                    break
            env.close()
            scores.append(info['score'])
        print(scores)
        print(f"\nAverage Score: {sum(scores) / len(scores)}\n")
    def run(self):
        global mode
        self.run_with_policy(1)     # train
        mode = 1
        self.run_with_no_policy(1)  # evaluate


program = SmartFlappyBird(iterations=1000)
program.run()
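
# ---------------------------------------------------------------------------
# Note: the local `utils` module is not included in this file. The code above
# only relies on `utils.Counter` behaving like a dict that defaults missing
# entries to 0, and `utils.flip_coin(p)` returning True with probability p.
# A minimal sketch of what those helpers could look like, assuming exactly
# that behaviour (not necessarily the project's actual implementation):
#
#     import random
#
#     class Counter(dict):
#         def __getitem__(self, key):
#             self.setdefault(key, 0)
#             return dict.__getitem__(self, key)
#
#     def flip_coin(p):
#         return random.random() < p
# ---------------------------------------------------------------------------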