I have a problem with an agent learning poorly using the Q-function approximation algorithm. The agent stands in one place and does not make any moves. The state in my game is the player's position and the monster's position, i.e. ((x, y), (t, w)). The features I chose for the approximation are the player's distance from the exit, the player's distance from the monster, the number of available actions, and a possible collision with the creature. The code for the game and the agent implementation is provided below (player: blue, monster: red, exit: green, start: yellow). The game looks like this:
import pygame
import random
import numpy as np
ROWS, COLS = 11, 11
TILE_SIZE = 32
WIDTH, HEIGHT = COLS * TILE_SIZE, ROWS * TILE_SIZE
WHITE = (255, 255, 255)
BLACK = (0, 0, 0)
BLUE = (0, 0, 255)
maze = [[1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1],
[0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 1],
[1, 0, 1, 0, 1, 1, 1, 0, 1, 0, 1],
[1, 0, 0, 0, 0, 0, 0, 0, 1, 0, 1],
[1, 0, 1, 1, 1, 0, 1, 1, 1, 0, 1],
[1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1],
[1, 0, 1, 0, 1, 1, 1, 1, 1, 0, 1],
[1, 0, 0, 0, 0, 0, 1, 0, 0, 0, 1],
[1, 1, 0, 1, 1, 0, 1, 0, 1, 0, 1],
[1, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0],
[1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1]]
class Monster:
def __init__(self, pos):
self.pos = pos
def move(self, target_pos, maze, available_actions):
row, col = self.pos
target_row, target_col = target_pos
if random.random() < 0.1:
if available_actions:
action = random.choice(available_actions)
if action == 0: # up
row -= 1
elif action == 1: # down
row += 1
elif action == 2: # left
col -= 1
elif action == 3: # right
col += 1
else:
if row < target_row and maze[row + 1][col] == 0:
row += 1
elif row > target_row and maze[row - 1][col] == 0:
row -= 1
elif col < target_col and maze[row][col + 1] == 0:
col += 1
elif col > target_col and maze[row][col - 1] == 0:
col -= 1
self.pos = (row, col)
class Maze:
def __init__(self, exit_pos = (ROWS - 2, COLS - 1), player_pos = (1, 0), monster_pos = (5, 4)):
self.action_space = [0, 1, 2, 3]
self.screen = None
self.maze = maze
self.exit_pos = exit_pos
self.player_pos = player_pos
self.monster = Monster(monster_pos)
def reset(self):
self.player_pos = (1, 0)
self.monster.pos = (5, 4)
return self.get_actual_state()
def get_actual_state(self):
return (self.player_pos, self.monster.pos)
def step(self, action):
self.player_pos = self.move_player(self.player_pos, action)
monster_actions = self.get_possible_actions(self.monster.pos)
self.monster.move(self.player_pos, self.maze, monster_actions)
done = False
step_reward = -1.0
if self.player_pos == self.monster.pos:
print(f"Agent zginął po spotkaniu z potworem. Koniec gry!")
step_reward = -100.0
done = True
elif self.player_pos == self.exit_pos:
print(f"Wygrana!")
step_reward = 1.0
done = True
return self.get_actual_state(), step_reward, done
def move_player(self, player_pos, action):
row, col = player_pos
if action == 0 and row > 0 and self.maze[row - 1][col] == 0: # up
row -= 1
elif action == 1 and row < ROWS - 1 and self.maze[row + 1][col] == 0: # down
row += 1
elif action == 2 and col > 0 and self.maze[row][col - 1] == 0: # left
col -= 1
elif action == 3 and col < COLS - 1 and self.maze[row][col + 1] == 0: # right
col += 1
return row, col
def render(self):
if not self.screen:
self.screen = pygame.display.set_mode((WIDTH, HEIGHT))
self.screen.fill(BLACK)
self.draw_grid()
pygame.display.flip()
def draw_grid(self):
for row in range(ROWS):
for col in range(COLS):
# Background color (white for paths, black for walls)
color = WHITE if self.maze[row][col] == 0 else BLACK
pygame.draw.rect(self.screen, color, (col * TILE_SIZE, row * TILE_SIZE, TILE_SIZE, TILE_SIZE))
pygame.draw.rect(self.screen, BLACK, (col * TILE_SIZE, row * TILE_SIZE, TILE_SIZE, TILE_SIZE), 1)
# Draw the exit (green)
pygame.draw.rect(self.screen, (0, 255, 0),
(self.exit_pos[1] * TILE_SIZE, self.exit_pos[0] * TILE_SIZE, TILE_SIZE, TILE_SIZE))
# Draw the start (yellow)
pygame.draw.rect(self.screen, (255, 255, 0),
(0 * TILE_SIZE, 1 * TILE_SIZE, TILE_SIZE, TILE_SIZE))
# Draw the player (blue)
pygame.draw.rect(self.screen, BLUE,
(self.player_pos[1] * TILE_SIZE, self.player_pos[0] * TILE_SIZE, TILE_SIZE, TILE_SIZE))
# Draw the monster (red)
pygame.draw.rect(self.screen, (255, 0, 0),
(self.monster.pos[1] * TILE_SIZE, self.monster.pos[0] * TILE_SIZE, TILE_SIZE, TILE_SIZE))
def get_possible_actions(self, pos):
row, col = pos
actions = []
if row > 0 and self.maze[row - 1][col] == 0:
actions.append(0) # up
if row < ROWS - 1 and self.maze[row + 1][col] == 0:
actions.append(1) # down
if col > 0 and self.maze[row][col - 1] == 0:
actions.append(2) # left
if col < COLS - 1 and self.maze[row][col + 1] == 0:
actions.append(3) # right
return actions
def close(self):
pygame.quit()
def get_all_states(self):
return [(row, col) for row in range(ROWS) for col in range(COLS) if self.maze[row][col] == 0]
class ApproximateQLearningAgent:
def __init__(self, alpha, epsilon, discount, num_features, get_legal_actions, min_epsilon = 0.05, epsilon_decay = 0.995):
self.alpha = alpha
self.epsilon = epsilon
self.discount = discount
self.weights = np.random.uniform(-0.1, 0.1, (4, num_features))
self.min_epsilon = min_epsilon
self.epsilon_decay = epsilon_decay
self.get_legal_actions = get_legal_actions
def get_features(self, state):
player_pos, monster_pos = state
player_row, player_col = player_pos
exit_row, exit_col = (ROWS - 2, COLS - 1)
monster_row, monster_col = monster_pos
bias = 1.0
exit_dist = (abs(player_row - exit_row) + abs(player_col - exit_col)) / (ROWS + COLS - 2)
monster_dist = (abs(player_row - monster_row) + abs(player_col - monster_col)) / (ROWS + COLS - 2)
available_moves = len(self.get_legal_actions(state)) / 4.0
collision_risk = 1.0 if abs(player_row - monster_row) + abs(player_col - monster_col) <= 2 else 0.0
return np.array([bias, exit_dist, monster_dist, available_moves, collision_risk])
def get_qvalue(self, state, action):
features = self.get_features(state)
q_value = np.dot(self.weights[action], features)
return q_value
def get_value(self, state):
possible_actions = self.get_legal_actions(state)
if len(possible_actions) == 0:
return 0.0
return max(self.get_qvalue(state, action) for action in possible_actions)
def get_best_action(self, state):
possible_actions = self.get_legal_actions(state)
if len(possible_actions) == 0:
return None
qvalues = [self.get_qvalue(state, action) for action in possible_actions]
max_qvalue = max(qvalues)
best_actions = [action for action, qvalue in enumerate(qvalues) if qvalue == max_qvalue]
return random.choice(best_actions)
def get_action(self, state):
possible_actions = self.get_legal_actions(state)
if len(possible_actions) == 0:
return None
if random.random() < self.epsilon:
return random.choice(possible_actions)
return self.get_best_action(state)
def update(self, state, action, reward, next_state):
features = self.get_features(state)
next_action = self.get_action(next_state)
delta = (reward/100 + self.discount * self.get_qvalue(next_state, next_action)) - self.get_qvalue(state, action)
self.weights[action] += self.alpha * delta * features
return delta
def turn_off_learning(self):
self.alpha = 0
self.epsilon = 0
def decay_epsilon(self):
self.epsilon = max(self.min_epsilon, self.epsilon * self.epsilon_decay)
The agent's training:
def train_agent(env, agent, num_episodes):
epsilon = []
deltas = []
for episode in range(num_episodes):
state = env.reset()
total_reward = 0
total_delta = 0
done = False
while not done:
action = agent.get_action(state)
next_state, reward, done = env.step(action)
delta = agent.update(state, action, reward, next_state)
state = next_state
total_reward += reward
total_delta += abs(delta)
if done:
break
epsilon.append(agent.epsilon)
deltas.append(total_delta / num_episodes)
agent.decay_epsilon()
print(f"Episode {episode + 1}: Total Reward = {total_reward}")
return epsilon, deltas
env = Maze()
agent = ApproximateQLearningAgent(alpha = 0.1, epsilon = 1.0, discount = 0.99, num_features = 5,
get_legal_actions = lambda state: env.get_possible_actions(state[0]))
epsilon, deltas = train_agent(env, agent, num_episodes = 2000)
Evaluation:
agent.turn_off_learning()
env = Maze()
for test_episode in range(5):
state = env.reset()
total_reward = 0
done = False
print(f"Epizod testowy {test_episode + 1}:")
while not done:
action = agent.get_best_action(state)
next_state, reward, done = env.step(action)
total_reward += reward
state = next_state
env.render()
pygame.time.delay(100)
print(f"Nagroda: {total_reward}")
env.close()
1 Answer
Some bugs:
1. Enumeration of actions in get_best_action:
Inside the get_best_action method there are the following lines of code:
qvalues = [self.get_qvalue(state, action) for action in possible_actions]
max_qvalue = max(qvalues)
best_actions = [action for action, qvalue in enumerate(qvalues) if qvalue == max_qvalue]
return random.choice(best_actions)
possible_actions is a list of numbers, each one being an action from those currently available, which means the list does not necessarily contain all of the actions. Later you enumerate the qvalues and assume that the index of the current qvalue is the action, which is the cause of the bug.
Let's say, for example, that possible_actions is only [0, 3]. Then we would calculate their Q-values, resulting in a list with two elements, say [q_value_for_action_0, q_value_for_action_3] (I am just giving names to the Q-values for simplicity). Upon enumerating the qvalues we get pairs in the format (action, qvalue), but since we use enumerate, the actions will always be 0, 1, 2, etc., while the qvalue will in this case be q_value_for_action_0 and then q_value_for_action_3. The problem is that q_value_for_action_3 gets matched with index 1 (as per the enumerate operation), but it belongs to action 3!
To fix this, just replace enumerate(qvalues) with zip(possible_actions, qvalues) inside the method.
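For illustration, a minimal sketch of get_best_action with that fix applied (the rest of the method stays as in the question; the full listing further below uses an explicit loop instead, which does the same thing):
def get_best_action(self, state):
    possible_actions = self.get_legal_actions(state)
    if len(possible_actions) == 0:
        return None
    qvalues = [self.get_qvalue(state, action) for action in possible_actions]
    max_qvalue = max(qvalues)
    # Pair each legal action with its own Q-value instead of relying on list indices.
    best_actions = [action for action, qvalue in zip(possible_actions, qvalues) if qvalue == max_qvalue]
    return random.choice(best_actions)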
2. next_action when updating the agent:
When updating the agent (ie inside the update method) you calculate the next action with:
next_action = self.get_action(next_state)
What is wrong with this is that get_action may yield a random action (depending on epsilon) for the next state's estimation. Instead, according to the algorithm, we need the maximum Q-value over all legal actions of that state, which (as far as I understand) is the same as the Q-value of the best action for that state. You can fix this either by using self.get_value(next_state) (ie the next state's estimated value) directly in the temporal-difference target, or by using get_best_action instead of get_action.
3. Edge case for the terminal state:
There is an important detail when calculating the temporal difference (inside update) for the terminal state: we do not need to estimate the value of the next state, because there is no next state after a terminal one. In the code in question there is no special handling for this case. To fix it, multiply the next state's value estimate by 1 - int(done), where done is True when we are at a terminal state and False otherwise, as in the sketch below.
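As a rough sketch of how fixes 2 and 3 can be combined (my own illustration, assuming the feature-based agent from the question; the full listing further below handles this slightly differently by keeping an explicit next_action parameter):
def update(self, state, action, reward, done, next_state):
    features = self.get_features(state)
    # Off-policy Q-learning bootstraps from the best next action, i.e. max over a' of Q(s', a'),
    # which is exactly what get_value(next_state) computes.
    next_value = self.get_value(next_state)
    # Zero out the bootstrap term on terminal transitions (done == True).
    target = reward / 100 + self.discount * next_value * (1 - int(done))
    delta = target - self.get_qvalue(state, action)
    self.weights[action] += self.alpha * delta * features
    return delta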
Incentives:
I also changed the reward for successful termination from 1 to 100. With a reward of just 1, along with -1 for each non-terminal step, reaching the exit merely cancels out the last non-terminal step. This may not be important at all, but it was part of my most promising tests, so I left it as such.
Hyperparameter tuning:
I increased the number of episodes, because 2k never seemed enough for the player to reach the exit. After fixing the above bugs and setting the episode count to 20k, I started seeing the player explore greater portions of the map, with a better distribution of visited locations, and actually reach the exit in about 4600 (~23%) of the episodes. I had also set epsilon_decay to 0.999 so that epsilon reaches its minimum within roughly the first 4.5k episodes, since with the original 0.995 it decayed too fast and I thought this would harm exploration. I also halved the learning rate (ie alpha = 0.05), because 0.1 seems a bit too big, taking into account that a greater portion of the episodes now has a larger epsilon (ie more random moves initially). Finally, I changed discount and min_epsilon (example values are in the following code).
I understand, though, that the bugs have to be fixed first before moving on to hyperparameter tuning, so I am not blaming you for anything here; I am just describing how I started to observe the desired results.
Code:
import pygame
import random
import numpy as np
ROWS, COLS = 11, 11
TILE_SIZE = 32
WIDTH, HEIGHT = COLS * TILE_SIZE, ROWS * TILE_SIZE
WHITE = (255, 255, 255)
BLACK = (0, 0, 0)
BLUE = (0, 0, 255)
maze = [[1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1],
[0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 1],
[1, 0, 1, 0, 1, 1, 1, 0, 1, 0, 1],
[1, 0, 0, 0, 0, 0, 0, 0, 1, 0, 1],
[1, 0, 1, 1, 1, 0, 1, 1, 1, 0, 1],
[1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1],
[1, 0, 1, 0, 1, 1, 1, 1, 1, 0, 1],
[1, 0, 0, 0, 0, 0, 1, 0, 0, 0, 1],
[1, 1, 0, 1, 1, 0, 1, 0, 1, 0, 1],
[1, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0],
[1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1]]
# Utility method to print a map like 'maze' variable:
def print_map(the_map, repr_per_element={}):
for row in the_map:
for element in row:
print(repr_per_element.get(element, f'{element:8d}'), end='')
print()
# Some statistics about each entity (player/monster). These only serve to
# get an idea on how each entity moves and they do not affect learning.
class EntityStatistics:
def __init__(self, name, the_map = maze):
self.name = name
self.reset(the_map)
# [Re]Initialize the entity's statistics:
def reset(self, the_map = None):
if the_map is None:
the_map = self.maze_visit_frequency # Only to maintain structure.
self.maze_visit_frequency = [[0] * len(row) for row in the_map]
self.completion_count = 0
# Called for each position the entity visits, for each time it does.
# This will be called for the same position more than once in a row in case the entity failed to move at all.
def visited(self, row, col):
self.maze_visit_frequency[row][col] += 1
# Called when the entity succeeds its goal:
def completed(self):
self.completion_count += 1
# Prints results for the entity:
def print(self):
print(f'{self.name} completed {self.completion_count} time(s) with the following heatmap:')
print_map(self.maze_visit_frequency)
# This represents the player just like Monster does for the monster.
# Nevertheless the representation is only partial, since there are more
# properties to add here (such as player's position in the map).
class Player:
def __init__(self):
self.stats = EntityStatistics('Player') # Holds only the statistics currently.
class Monster:
def __init__(self, pos):
self.pos = pos
self.stats = EntityStatistics('Monster') # Added monster statistics.
def move(self, target_pos, maze, available_actions):
row, col = self.pos
target_row, target_col = target_pos
if random.random() < 0.1:
if available_actions:
action = random.choice(available_actions)
if action == 0: # up
row -= 1
elif action == 1: # down
row += 1
elif action == 2: # left
col -= 1
elif action == 3: # right
col += 1
else:
if row < target_row and maze[row + 1][col] == 0:
row += 1
elif row > target_row and maze[row - 1][col] == 0:
row -= 1
elif col < target_col and maze[row][col + 1] == 0:
col += 1
elif col > target_col and maze[row][col - 1] == 0:
col -= 1
self.pos = (row, col)
class Maze:
def __init__(self, exit_pos = (ROWS - 2, COLS - 1), player_pos = (1, 0), monster_pos = (5, 4)):
self.action_space = [0, 1, 2, 3]
self.screen = None
self.maze = maze
self.exit_pos = exit_pos
self.player_pos = player_pos
self.monster = Monster(monster_pos)
self.player = Player() # Only for statistics currently.
def reset(self):
self.player_pos = (1, 0)
self.monster.pos = (5, 4)
# Update statistics accordingly:
#self.monster.stats.reset() # Use this if you need to reset the statistics per episode.
#self.player.stats.reset() # Use this if you need to reset the statistics per episode.
self.monster.stats.visited(*self.monster.pos) # Monster 'visited' its initial position.
self.player.stats.visited(*self.player_pos) # Player 'visited' their initial position.
return self.get_actual_state()
def get_actual_state(self):
return (self.player_pos, self.monster.pos)
def step(self, action):
self.player_pos = self.move_player(self.player_pos, action)
self.player.stats.visited(*self.player_pos) # Player visited a new (or the same again) position.
monster_actions = self.get_possible_actions(self.monster.pos)
self.monster.move(self.player_pos, self.maze, monster_actions)
self.monster.stats.visited(*self.monster.pos) # Monster visited a new (or the same again) position.
done = False
step_reward = -1.0
if self.player_pos == self.monster.pos:
# print(f"Agent zginął po spotkaniu z potworem. Koniec gry!")
step_reward = -100.0
done = True
self.monster.stats.completed() # The monster succeeded.
elif self.player_pos == self.exit_pos:
# print(f"Wygrana!")
step_reward = 100.0 # Changed reward.
done = True
self.player.stats.completed() # The player succeeded.
return self.get_actual_state(), step_reward, done
def move_player(self, player_pos, action):
row, col = player_pos
if action == 0 and row > 0 and self.maze[row - 1][col] == 0: # up
row -= 1
elif action == 1 and row < ROWS - 1 and self.maze[row + 1][col] == 0: # down
row += 1
elif action == 2 and col > 0 and self.maze[row][col - 1] == 0: # left
col -= 1
elif action == 3 and col < COLS - 1 and self.maze[row][col + 1] == 0: # right
col += 1
return row, col
def render(self):
if not self.screen:
self.screen = pygame.display.set_mode((WIDTH, HEIGHT))
self.screen.fill(BLACK)
self.draw_grid()
pygame.display.flip()
def draw_grid(self):
for row in range(ROWS):
for col in range(COLS):
# Background color (white for paths, black for walls)
color = WHITE if self.maze[row][col] == 0 else BLACK
pygame.draw.rect(self.screen, color, (col * TILE_SIZE, row * TILE_SIZE, TILE_SIZE, TILE_SIZE))
pygame.draw.rect(self.screen, BLACK, (col * TILE_SIZE, row * TILE_SIZE, TILE_SIZE, TILE_SIZE), 1)
# Draw the exit (green)
pygame.draw.rect(self.screen, (0, 255, 0),
(self.exit_pos[1] * TILE_SIZE, self.exit_pos[0] * TILE_SIZE, TILE_SIZE, TILE_SIZE))
# Draw the start (yellow)
pygame.draw.rect(self.screen, (255, 255, 0),
(0 * TILE_SIZE, 1 * TILE_SIZE, TILE_SIZE, TILE_SIZE))
# Draw the player (blue)
pygame.draw.rect(self.screen, BLUE,
(self.player_pos[1] * TILE_SIZE, self.player_pos[0] * TILE_SIZE, TILE_SIZE, TILE_SIZE))
# Draw the monster (red)
pygame.draw.rect(self.screen, (255, 0, 0),
(self.monster.pos[1] * TILE_SIZE, self.monster.pos[0] * TILE_SIZE, TILE_SIZE, TILE_SIZE))
def get_possible_actions(self, pos):
row, col = pos
actions = []
if row > 0 and self.maze[row - 1][col] == 0:
actions.append(0) # up
if row < ROWS - 1 and self.maze[row + 1][col] == 0:
actions.append(1) # down
if col > 0 and self.maze[row][col - 1] == 0:
actions.append(2) # left
if col < COLS - 1 and self.maze[row][col + 1] == 0:
actions.append(3) # right
return actions
def close(self):
pygame.quit()
def get_all_states(self):
return [(row, col) for row in range(ROWS) for col in range(COLS) if self.maze[row][col] == 0]
class ApproximateQLearningAgent:
def __init__(self, alpha, epsilon, discount, num_features, get_legal_actions, min_epsilon = 0.05, epsilon_decay = 0.995):
self.alpha = alpha
self.epsilon = epsilon
self.discount = discount
self.weights = np.random.uniform(-0.1, 0.1, (4, num_features))
self.min_epsilon = min_epsilon
self.epsilon_decay = epsilon_decay
self.get_legal_actions = get_legal_actions
def get_features(self, state):
player_pos, monster_pos = state
player_row, player_col = player_pos
exit_row, exit_col = (ROWS - 2, COLS - 1)
monster_row, monster_col = monster_pos
bias = 1.0
exit_dist = (abs(player_row - exit_row) + abs(player_col - exit_col)) / (ROWS + COLS - 2)
monster_dist = (abs(player_row - monster_row) + abs(player_col - monster_col)) / (ROWS + COLS - 2)
available_moves = len(self.get_legal_actions(state)) / 4.0
collision_risk = 1.0 if abs(player_row - monster_row) + abs(player_col - monster_col) <= 2 else 0.0
return np.array([bias, exit_dist, monster_dist, available_moves, collision_risk])
def get_qvalue(self, state, action):
features = self.get_features(state)
q_value = np.dot(self.weights[action], features)
return q_value
def get_value(self, state):
possible_actions = self.get_legal_actions(state)
if len(possible_actions) == 0:
return 0.0
return max(self.get_qvalue(state, action) for action in possible_actions)
def get_best_actions_with_loop(self, state, possible_actions):
best_actions = []
max_q = None
for action in possible_actions:
cur_q = self.get_qvalue(state, action)
if max_q is None or cur_q > max_q:
max_q = cur_q
best_actions = [action]
elif cur_q == max_q:
best_actions.append(action)
return best_actions
def get_best_action(self, state, possible_actions=None):
if possible_actions is None: # Minor optimization (if possible_actions was already calculated, don't calculate it again).
possible_actions = self.get_legal_actions(state)
if len(possible_actions) == 0:
return None
# Fixed enumerating of actions with a loop.
best_actions = self.get_best_actions_with_loop(state, possible_actions)
return random.choice(best_actions)
def get_action(self, state):
possible_actions = self.get_legal_actions(state)
if len(possible_actions) == 0:
return None
if random.random() < self.epsilon:
return random.choice(possible_actions)
return self.get_best_action(state, possible_actions)
# Note: method signature changed ('done' is mandatory).
def update(self, state, action, reward, done, next_state, next_action):
features = self.get_features(state)
# 'next_action' is the action that the agent will indeed take, but
# we instead need the best one for calculating delta in Q-learning:
next_action = self.get_best_action(next_state)
done = int(done) # Convert True and False to 1 and 0.
# Fixed delta calculation (the middle term is zeroed when done is True):
delta = (reward/100) + (self.discount * self.get_qvalue(next_state, next_action) * (1 - done)) - self.get_qvalue(state, action)
# otherwise use:
#delta = (reward/100) + (self.discount * self.get_value(next_state) * (1 - done)) - self.get_qvalue(state, action)
self.weights[action] += self.alpha * delta * features
return delta
def turn_off_learning(self):
self.alpha = 0
self.epsilon = 0
def decay_epsilon(self):
self.epsilon = max(self.min_epsilon, self.epsilon * self.epsilon_decay)
def train_agent(env, agent, num_episodes):
epsilon = []
deltas = []
total_rewards = [] # We will also maintain total_rewards for statistics purposes only.
for episode in range(num_episodes):
total_reward = 0
total_delta = 0
done = False
state = env.reset()
action = agent.get_action(state)
# Training loop is modified (next_action is the actual action the agent is going to take)...
while not done:
next_state, reward, done = env.step(action)
next_action = agent.get_action(next_state)
delta = agent.update(state, action, reward, done, next_state, next_action)
state = next_state
action = next_action
total_reward += reward
total_delta += abs(delta)
total_rewards.append(total_reward)
epsilon.append(agent.epsilon)
deltas.append(total_delta / num_episodes)
agent.decay_epsilon()
# Debug prints every 2k episodes.
if (episode % 2000) == 0:
avg_of_total = round(sum(total_rewards) / len(total_rewards), 2)
print(f"Episode {episode + 1}: Average Total Reward so far = {avg_of_total}")
return epsilon, deltas
if __name__ == '__main__':
env = Maze()
agent = ApproximateQLearningAgent(
# Hyperparameter tuning is modified.
alpha = 0.05, epsilon = 1.0, discount = 0.9995, num_features = 5,
epsilon_decay = 0.999, min_epsilon = 0.01,
get_legal_actions = lambda state: env.get_possible_actions(state[0])
)
# Training:
epsilon, deltas = train_agent(env, agent, num_episodes = 20000)
# Print training statistics:
print()
print('Given the map:')
print_map(maze, {
0: ' ' * 8,
1: u'\u2588' * 8
})
print()
env.monster.stats.print()
print()
env.player.stats.print()
print()
cnt = 0
for e in epsilon:
if e > agent.min_epsilon:
#print(f'Epsilon: {e}')
cnt += 1
print(f'{cnt} total epsilons greater than minimum.')
# Evaluation:
agent.turn_off_learning()
env = Maze()
for test_episode in range(5):
state = env.reset()
total_reward = 0
done = False
print(f"Episode {test_episode + 1}:")
while not done:
action = agent.get_best_action(state)
next_state, reward, done = env.step(action)
total_reward += reward
state = next_state
env.render()
pygame.time.delay(100)
print(f"Reward: {total_reward}")
env.close()
To clarify, I am having some issues with my GPU drivers, so at the moment I cannot run the GUI evaluation. I therefore used another (very simple) way of verifying progress: I kept a counter for each location visited by each entity (monster/player) and counted how many times the agent found the exit or ran into the monster, which was helpful, so the code above includes these debugging aids as well. I judged that there is progress because reaching the exit in ~23% of the 20k episodes already seems a lot better than never reaching it at all (which was the case initially).
Sample output:
pygame 2.6.1 (SDL 2.28.4, Python 3.13.0)
Hello from the pygame community. https://www.pygame.org/contribute.html
Episode 1: Average Total Reward so far = -104.0
Episode 2001: Average Total Reward so far = -103.48
Episode 4001: Average Total Reward so far = -89.36
Episode 6001: Average Total Reward so far = -84.3
Episode 8001: Average Total Reward so far = -81.8
Episode 10001: Average Total Reward so far = -80.87
Episode 12001: Average Total Reward so far = -78.77
Episode 14001: Average Total Reward so far = -78.13
Episode 16001: Average Total Reward so far = -78.46
Episode 18001: Average Total Reward so far = -77.94

Given the map:
[11x11 maze rendering omitted here: walls drawn as blocks of █, passages as spaces]

Monster completed 15136 time(s) with the following heatmap:
0 0 0 0 0 0 0 0 0 0 0
103 3220 2630 7513 206 0 973 6285 396 367 0
0 3553 0 6111 0 0 0 12713 0 456 0
0 4335 863 6929 6286 16947 12548 54903 0 440 0
0 4252 0 0 0 16294 0 0 0 426 0
0 4443 24014 165477 120225 18425 2990 40573 2439 705 0
0 163 0 5781 0 0 0 0 0 461 0
0 18 16 339 46 162 0 0 5 347 0
0 0 6 0 0 74 0 0 0 277 0
0 0 5 3 7 49 4 1 0 103 0
0 0 0 0 0 0 0 0 0 0 0

Player completed 4864 time(s) with the following heatmap:
0 0 0 0 0 0 0 0 0 0 0
22587 24981 21048 136097 111211 0 50 5886 5205 6647 0
0 2442 0 24277 0 0 0 34581 0 6510 0
0 1091 798 20098 11986 12375 10336 38767 0 4800 0
0 168 0 0 0 2027 0 0 0 4684 0
0 37 3 0 8 1302 322 299 333 5018 0
0 17 0 0 0 0 0 0 0 4969 0
0 15 19 7 0 0 0 18 46 5565 0
0 0 12 0 0 1 0 204 0 12295 0
0 1 9 6 5 6 6 192 0 11676 4864
0 0 0 0 0 0 0 0 0 0 0

4603 total epsilons greater than minimum.
... and then the GPU driver crash message appeared, which I did not include above.
Note:
I also tried traditional tabular Q-learning on your environment and at times got the player to reach the exit in more than 10k (>50%) of the episodes. Specifically, I commented out get_features, updated the get_qvalue method to read from a Q matrix, and updated the update method to apply the Bellman equation and store the new Q-values back in the matrix. This indicates that there certainly is a lot of room for improvement: if no other bugs can be found, then the features and/or the hyperparameter tuning may still need work. But this is as far as I am able to progress at this point.
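For reference, a minimal sketch of what such a tabular variant could look like (this is only an illustration of the idea described above, not the exact code I ran; it assumes the same Maze environment, state encoding and action numbering as before):
import random

class TabularQLearningAgent:
    def __init__(self, alpha, epsilon, discount, get_legal_actions):
        self.alpha = alpha
        self.epsilon = epsilon
        self.discount = discount
        self.get_legal_actions = get_legal_actions
        self.q = {}  # One entry per ((player_pos, monster_pos), action) pair.
    def get_qvalue(self, state, action):
        return self.q.get((state, action), 0.0)
    def get_value(self, state):
        actions = self.get_legal_actions(state)
        return max((self.get_qvalue(state, a) for a in actions), default=0.0)
    def get_best_action(self, state):
        actions = self.get_legal_actions(state)
        if not actions:
            return None
        return max(actions, key=lambda a: self.get_qvalue(state, a))
    def get_action(self, state):
        actions = self.get_legal_actions(state)
        if not actions:
            return None
        if random.random() < self.epsilon:
            return random.choice(actions)
        return self.get_best_action(state)
    def update(self, state, action, reward, done, next_state):
        # Bellman update on the stored table instead of on feature weights.
        target = reward + self.discount * self.get_value(next_state) * (1 - int(done))
        delta = target - self.get_qvalue(state, action)
        self.q[(state, action)] = self.get_qvalue(state, action) + self.alpha * delta
        return delta
Such an agent can be plugged into a training loop like the one above, as long as the update call is adapted to this signature.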
Comments on the question:
… ROWS and COLS and the pygame import). It may not be related, or for some people it may be obvious how to run this code and what ROWS and COLS are, but I am just asking for completeness. Please try to post something that we can copy-paste and run (see minimal reproducible example for more information). – gthanop, Jan 17 at 16:03
The maze definition seems to be missing, as well as other variables (WIDTH, HEIGHT, TILE_SIZE, BLACK, WHITE, etc...). Again, they may not be important for the learning of the agent (except maze), but the question is about teaching the agent, so the code should be included in full so that others can test it. – gthanop, Jan 17 at 16:15
If maze[row][col] is equal to 0, then the maze has a passage there (ie the player and monster can move to it), otherwise it's a wall... Is that correct? – gthanop, Jan 17 at 16:46