
python - Problem with linear Q-function approximation for a maze runner game - Stack Overflow


I have a problem with an agent learning poorly using the Q-function approximation algorithm. The agent stands in one place and does not make any moves. The state in my game is the player's position and the monster's position, i.e. ((x, y), (t, w)). The features I chose for the approximation are the player's distance from the exit, the player's distance from the monster, the number of available actions, and a possible collision with the creature. The code with the game and the agent implementation is provided below (player - blue, monster - red, exit - green, start - yellow). The game looks like this:

import pygame
import random
import numpy as np

ROWS, COLS = 11, 11
TILE_SIZE = 32
WIDTH, HEIGHT = COLS * TILE_SIZE, ROWS * TILE_SIZE
WHITE = (255, 255, 255)
BLACK = (0, 0, 0)
BLUE = (0, 0, 255)

maze = [[1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1],
        [0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 1],
        [1, 0, 1, 0, 1, 1, 1, 0, 1, 0, 1],
        [1, 0, 0, 0, 0, 0, 0, 0, 1, 0, 1],
        [1, 0, 1, 1, 1, 0, 1, 1, 1, 0, 1],
        [1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1],
        [1, 0, 1, 0, 1, 1, 1, 1, 1, 0, 1],
        [1, 0, 0, 0, 0, 0, 1, 0, 0, 0, 1],
        [1, 1, 0, 1, 1, 0, 1, 0, 1, 0, 1],
        [1, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0],
        [1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1]]

class Monster:
    def __init__(self, pos):
        self.pos = pos

    def move(self, target_pos, maze, available_actions):
        row, col = self.pos
        target_row, target_col = target_pos

        if random.random() < 0.1:
            if available_actions:
                action = random.choice(available_actions)
                if action == 0:  # up
                    row -= 1
                elif action == 1:  # down
                    row += 1
                elif action == 2:  # left
                    col -= 1
                elif action == 3:  # right
                    col += 1
        else:
            if row < target_row and maze[row + 1][col] == 0:
                row += 1
            elif row > target_row and maze[row - 1][col] == 0:
                row -= 1
            elif col < target_col and maze[row][col + 1] == 0:
                col += 1
            elif col > target_col and maze[row][col - 1] == 0:
                col -= 1

        self.pos = (row, col)

class Maze:
    def __init__(self, exit_pos = (ROWS - 2, COLS - 1), player_pos = (1, 0), monster_pos = (5, 4)):
        self.action_space = [0, 1, 2, 3]
        self.screen = None
        self.maze = maze
        self.exit_pos = exit_pos
        self.player_pos = player_pos
        self.monster = Monster(monster_pos)

    def reset(self):
        self.player_pos = (1, 0)
        self.monster.pos = (5, 4)
        return self.get_actual_state()

    def get_actual_state(self):
        return (self.player_pos, self.monster.pos)

    def step(self, action):
        self.player_pos = self.move_player(self.player_pos, action)

        monster_actions = self.get_possible_actions(self.monster.pos)
        self.monster.move(self.player_pos, self.maze, monster_actions)

        done = False
        step_reward = -1.0

        if self.player_pos == self.monster.pos:
            print(f"The agent died after meeting the monster. Game over!")
            step_reward = -100.0
            done = True

        elif self.player_pos == self.exit_pos:
            print(f"You win!")
            step_reward = 1.0
            done = True

        return self.get_actual_state(), step_reward, done

    def move_player(self, player_pos, action):
        row, col = player_pos
        if action == 0 and row > 0 and self.maze[row - 1][col] == 0:  # up
            row -= 1
        elif action == 1 and row < ROWS - 1 and self.maze[row + 1][col] == 0:  # down
            row += 1
        elif action == 2 and col > 0 and self.maze[row][col - 1] == 0:  # left
            col -= 1
        elif action == 3 and col < COLS - 1 and self.maze[row][col + 1] == 0:  # right
            col += 1
        return row, col

    def render(self):
        if not self.screen:
            self.screen = pygame.display.set_mode((WIDTH, HEIGHT))
        self.screen.fill(BLACK)
        self.draw_grid()
        pygame.display.flip()

    def draw_grid(self):
        for row in range(ROWS):
            for col in range(COLS):
                # Background color (white for paths, black for walls)
                color = WHITE if self.maze[row][col] == 0 else BLACK
                pygame.draw.rect(self.screen, color, (col * TILE_SIZE, row * TILE_SIZE, TILE_SIZE, TILE_SIZE))
                pygame.draw.rect(self.screen, BLACK, (col * TILE_SIZE, row * TILE_SIZE, TILE_SIZE, TILE_SIZE), 1)

        # Draw the exit (green)
        pygame.draw.rect(self.screen, (0, 255, 0),
                         (self.exit_pos[1] * TILE_SIZE, self.exit_pos[0] * TILE_SIZE, TILE_SIZE, TILE_SIZE))

        # Draw the start (yellow)
        pygame.draw.rect(self.screen, (255, 255, 0),
                         (0 * TILE_SIZE, 1 * TILE_SIZE, TILE_SIZE, TILE_SIZE))

        # Draw the player (blue)
        pygame.draw.rect(self.screen, BLUE,
                         (self.player_pos[1] * TILE_SIZE, self.player_pos[0] * TILE_SIZE, TILE_SIZE, TILE_SIZE))

        # Draw the monster (red)
        pygame.draw.rect(self.screen, (255, 0, 0),
                         (self.monster.pos[1] * TILE_SIZE, self.monster.pos[0] * TILE_SIZE, TILE_SIZE, TILE_SIZE))

    def get_possible_actions(self, pos):
        row, col = pos
        actions = []
        if row > 0 and self.maze[row - 1][col] == 0:
            actions.append(0)  # up
        if row < ROWS - 1 and self.maze[row + 1][col] == 0:
            actions.append(1)  # down
        if col > 0 and self.maze[row][col - 1] == 0:
            actions.append(2)  # left
        if col < COLS - 1 and self.maze[row][col + 1] == 0:
            actions.append(3)  # right
        return actions

    def close(self):
        pygame.quit()

    def get_all_states(self):
        return [(row, col) for row in range(ROWS) for col in range(COLS) if self.maze[row][col] == 0]


class ApproximateQLearningAgent:
    def __init__(self, alpha, epsilon, discount, num_features, get_legal_actions, min_epsilon = 0.05, epsilon_decay = 0.995):
        self.alpha = alpha
        self.epsilon = epsilon
        self.discount = discount
        self.weights = np.random.uniform(-0.1, 0.1, (4, num_features))
        self.min_epsilon = min_epsilon
        self.epsilon_decay = epsilon_decay
        self.get_legal_actions = get_legal_actions

    def get_features(self, state):
        player_pos, monster_pos = state
        player_row, player_col = player_pos
        exit_row, exit_col = (ROWS - 2, COLS - 1)
        monster_row, monster_col = monster_pos

        bias = 1.0
        exit_dist = (abs(player_row - exit_row) + abs(player_col - exit_col)) / (ROWS + COLS - 2)
        monster_dist = (abs(player_row - monster_row) + abs(player_col - monster_col)) / (ROWS + COLS - 2)
        available_moves = len(self.get_legal_actions(state)) / 4.0
        collision_risk = 1.0 if abs(player_row - monster_row) + abs(player_col - monster_col) <= 2 else 0.0

        return np.array([bias, exit_dist, monster_dist, available_moves, collision_risk])

    def get_qvalue(self, state, action):
        features = self.get_features(state)
        q_value = np.dot(self.weights[action], features)
        return q_value

    def get_value(self, state):
        possible_actions = self.get_legal_actions(state)
        if len(possible_actions) == 0:
            return 0.0

        return max(self.get_qvalue(state, action) for action in possible_actions)

    def get_best_action(self, state):
        possible_actions = self.get_legal_actions(state)
        if len(possible_actions) == 0:
            return None

        qvalues = [self.get_qvalue(state, action) for action in possible_actions]
        max_qvalue = max(qvalues)
        best_actions = [action for action, qvalue in enumerate(qvalues) if qvalue == max_qvalue]
        return random.choice(best_actions)

    def get_action(self, state):
        possible_actions = self.get_legal_actions(state)
        if len(possible_actions) == 0:
            return None

        if random.random() < self.epsilon:
            return random.choice(possible_actions)
        return self.get_best_action(state)

    def update(self, state, action, reward, next_state):
        features = self.get_features(state)
        next_action = self.get_action(next_state)

        delta = (reward/100 + self.discount * self.get_qvalue(next_state, next_action)) - self.get_qvalue(state, action)

        self.weights[action] += self.alpha * delta * features
        return delta

    def turn_off_learning(self):
        self.alpha = 0
        self.epsilon = 0

    def decay_epsilon(self):
        self.epsilon = max(self.min_epsilon, self.epsilon * self.epsilon_decay)

Agent's learning:

def train_agent(env, agent, num_episodes):
    epsilon = []
    deltas = [] 
    for episode in range(num_episodes):
        state = env.reset()
        total_reward = 0
        total_delta = 0
        done = False
    
        while not done:

            action = agent.get_action(state)
            next_state, reward, done = env.step(action) 
            delta = agent.update(state, action, reward, next_state) 
        
            state = next_state
            total_reward += reward
            total_delta += abs(delta)
        
            if done:
                break
        epsilon.append(agent.epsilon)
        deltas.append(total_delta / num_episodes)
        agent.decay_epsilon()
        print(f"Episode {episode + 1}: Total Reward = {total_reward}")
    return epsilon, deltas

env = Maze()
agent = ApproximateQLearningAgent(alpha = 0.1, epsilon = 1.0, discount = 0.99, num_features = 5,
                             get_legal_actions = lambda state: env.get_possible_actions(state[0]))

epsilon, deltas = train_agent(env, agent, num_episodes = 2000)

Evaluation:

agent.turn_off_learning()

env = Maze()
for test_episode in range(5):
    state = env.reset()
    total_reward = 0
    done = False

    print(f"Test episode {test_episode + 1}:")
    while not done:

        action = agent.get_best_action(state)
        next_state, reward, done = env.step(action)
        total_reward += reward

        state = next_state
        env.render()
        pygame.time.delay(100)

    print(f"Reward: {total_reward}")

env.close()

  • Hello. Is this really the whole code? Some things seem to be missing (such as the entry point of the code, ROWS and COLS, and the pygame import). It may not be related, or it may be obvious to some how to run this code and what ROWS and COLS are, but I am just asking for completeness. Please try to post something that we can copy-paste and run (see minimal reproducible example for more information). – gthanop Commented Jan 17 at 16:03
  • In addition, the maze definition seems to be missing, as well as other variables (WIDTH, HEIGHT, TILE_SIZE, BLACK, WHITE, etc.). Again, they may not be important for the learning of the agent (except maze), but the question is about teaching the agent, so the full code should be present so that others can test it. – gthanop Commented Jan 17 at 16:15
  • Thanks for the advice. I have already added the missing information. – papierowka Commented Jan 17 at 16:33
  • No problem. Thanks for providing the information. I have a question, just to be sure, when maze[row][col] is equal to 0 then the maze has a passage there (ie the player and monster can move to it), otherwise it's a wall... Is that correct? – gthanop Commented Jan 17 at 16:46
  • Yes, that's right. – papierowka Commented Jan 17 at 16:58

1 Answer


Some bugs:

1. Enumeration of actions in get_best_action:

Inside the get_best_action method there are the following lines of code:

qvalues = [self.get_qvalue(state, action) for action in possible_actions]
max_qvalue = max(qvalues)
best_actions = [action for action, qvalue in enumerate(qvalues) if qvalue == max_qvalue]
return random.choice(best_actions)

possible_actions is a list of numbers, each one being an action from those currently available, which means the list does not necessarily contain all the actions. Later you enumerate the qvalues and assume that the index of each qvalue is the corresponding action, and that is the cause of the bug.

Say, for example, possible_actions contains only the actions [0, 3]. Then their Q-values are computed into a list with two elements, say [q_value_for_action_0, q_value_for_action_3] (I am just naming the Q-values for simplicity). Upon enumerating the qvalues we get pairs in the format (action, qvalue), but since enumerate is used, the "actions" will always be 0, 1, 2, etc., while the qvalues will be q_value_for_action_0 and then q_value_for_action_3. The problem is that q_value_for_action_3 gets matched with index 1 (as per the enumerate operation), even though it belongs to action 3!

To fix this, just replace enumerate(qvalues) with zip(possible_actions, qvalues) inside the method.
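
For illustration, the fixed method with zip would look roughly like this (a minimal sketch; the complete listing further below uses an explicit loop for the same purpose instead):

def get_best_action(self, state):
    possible_actions = self.get_legal_actions(state)
    if len(possible_actions) == 0:
        return None

    qvalues = [self.get_qvalue(state, action) for action in possible_actions]
    max_qvalue = max(qvalues)
    # Pair each Q-value with its actual action id instead of its positional index:
    best_actions = [action for action, qvalue in zip(possible_actions, qvalues) if qvalue == max_qvalue]
    return random.choice(best_actions)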

2. next_action when updating the agent:

When updating the agent (i.e. inside the update method) you calculate the next action with:

next_action = self.get_action(next_state)

The problem is that you are choosing the action with get_action, which may yield a random action (depending on epsilon) for the next state's estimation. Instead, according to the Q-learning algorithm, we need the maximum Q-value over all legal actions for that state, which is (as far as I understand) the same as the Q-value of the best action for that state. So you can fix this either by using self.get_value(next_state) (i.e. the next state's estimated value) directly in the temporal difference target, or by using get_best_action instead of get_action.
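
In code, the second option is essentially a one-line change inside update (a minimal sketch; the complete listing below applies the same idea):

# Bootstrap from the greedy action (off-policy Q-learning),
# not from the epsilon-greedy action that get_action may return:
next_action = self.get_best_action(next_state)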

3. Edge case for the terminal state.

There is an important detail when calculating the temporal difference (inside update) for the terminal state:

... we do not need to estimate the value of the next state because there is no next state for a terminal state.

In the code in question, there is no special handling for this case. To fix it, multiply the next state's value estimate by 1 - int(done), where done is True when we are at a terminal state and False otherwise.
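
Combined with the previous fix, the temporal-difference error roughly becomes the following (a sketch assuming done is passed into update, as it is in the full listing below):

delta = (reward / 100
         + self.discount * self.get_qvalue(next_state, next_action) * (1 - int(done))
         - self.get_qvalue(state, action))
self.weights[action] += self.alpha * delta * features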

Incentives:

I just changed the reward for successful termination from 1 to 100. With only 1 for reaching the exit, alongside -1 for each non-terminal step, the final reward merely cancels out the last non-terminal step. This may not be important at all, but it was part of my most promising tests, so I left it as such.

Hyperparameter tuning:

I increased the number of episodes, because 2k never seemed to be enough for the player to reach the exit. After fixing the above bugs and setting the episodes to 20k, I started seeing the player explore greater portions of the map, with a better distribution of visited locations, and actually make it to the exit in about 4600 (~23%) of the episodes. At the same time I also set epsilon_decay to 0.999 so that the minimum epsilon is reached within roughly the first 4.5k episodes, since with the original 0.995 it decayed too fast and I thought this would harm exploration. I also used half the learning rate (i.e. alpha = 0.05), because 0.1 seems a bit too big given that a greater portion of the episodes now has a higher epsilon (i.e. more random moves initially). Finally, I changed discount and min_epsilon (example values in the following code).
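
As a quick sanity check of that schedule (a small helper of my own for illustration, not part of the original code): with a multiplicative decay of 0.999 per episode starting from 1.0, epsilon drops below min_epsilon = 0.01 after about log(0.01)/log(0.999) ≈ 4603 episodes, which agrees with the "4603 total epsilons greater than minimum" line in the sample output further below.

import math

def episodes_until_min_epsilon(start=1.0, decay=0.999, min_epsilon=0.01):
    # Smallest n such that start * decay**n <= min_epsilon
    return math.ceil(math.log(min_epsilon / start) / math.log(decay))

print(episodes_until_min_epsilon())  # 4603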

I understand, though, that bugs have to be fixed first before moving on to hyperparameter tuning, so I am not blaming you for anything here; I am just describing how I started to observe the desired results.

Code:

import pygame
import random
import numpy as np

ROWS, COLS = 11, 11
TILE_SIZE = 32
WIDTH, HEIGHT = COLS * TILE_SIZE, ROWS * TILE_SIZE
WHITE = (255, 255, 255)
BLACK = (0, 0, 0)
BLUE = (0, 0, 255)

maze = [[1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1],
        [0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 1],
        [1, 0, 1, 0, 1, 1, 1, 0, 1, 0, 1],
        [1, 0, 0, 0, 0, 0, 0, 0, 1, 0, 1],
        [1, 0, 1, 1, 1, 0, 1, 1, 1, 0, 1],
        [1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1],
        [1, 0, 1, 0, 1, 1, 1, 1, 1, 0, 1],
        [1, 0, 0, 0, 0, 0, 1, 0, 0, 0, 1],
        [1, 1, 0, 1, 1, 0, 1, 0, 1, 0, 1],
        [1, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0],
        [1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1]]

# Utility method to print a map like 'maze' variable:
def print_map(the_map, repr_per_element={}):
    for row in the_map:
        for element in row:
            print(repr_per_element.get(element, f'{element:8d}'), end='')
        print()

# Some statistics about each entity (player/monster). These only serve to
# get an idea on how each entity moves and they do not affect learning.
class EntityStatistics:

    def __init__(self, name, the_map = maze):
        self.name = name
        self.reset(the_map)

    # [Re]Initialize the entity's statistics:
    def reset(self, the_map = None):
        if the_map is None:
            the_map = self.maze_visit_frequency # Only to maintain structure.
        self.maze_visit_frequency = [[0] * len(row) for row in the_map]
        self.completion_count = 0

    # Called for each position the entity visits, for each time it does.
    # This will be called for the same position more than once in a row in case the entity failed to move at all.
    def visited(self, row, col):
        self.maze_visit_frequency[row][col] += 1

    # Called when the entity succeeds its goal:
    def completed(self):
        self.completion_count += 1

    # Prints results for the entity:
    def print(self):
        print(f'{self.name} completed {self.completion_count} time(s) with the following heatmap:')
        print_map(self.maze_visit_frequency)

# This represents the player just like Monster does for the monster.
# Nevertheless the representation is only partial, since there are more
# properties to add here (such as player's position in the map).
class Player:
    def __init__(self):
        self.stats = EntityStatistics('Player') # Holds only the statistics currently.

class Monster:
    def __init__(self, pos):
        self.pos = pos
        self.stats = EntityStatistics('Monster') # Added monster statistics.

    def move(self, target_pos, maze, available_actions):
        row, col = self.pos
        target_row, target_col = target_pos

        if random.random() < 0.1:
            if available_actions:
                action = random.choice(available_actions)
                if action == 0:  # up
                    row -= 1
                elif action == 1:  # down
                    row += 1
                elif action == 2:  # left
                    col -= 1
                elif action == 3:  # right
                    col += 1
        else:
            if row < target_row and maze[row + 1][col] == 0:
                row += 1
            elif row > target_row and maze[row - 1][col] == 0:
                row -= 1
            elif col < target_col and maze[row][col + 1] == 0:
                col += 1
            elif col > target_col and maze[row][col - 1] == 0:
                col -= 1

        self.pos = (row, col)

class Maze:
    def __init__(self, exit_pos = (ROWS - 2, COLS - 1), player_pos = (1, 0), monster_pos = (5, 4)):
        self.action_space = [0, 1, 2, 3]
        self.screen = None
        self.maze = maze
        self.exit_pos = exit_pos
        self.player_pos = player_pos
        self.monster = Monster(monster_pos)
        self.player = Player() # Only for statistics currently.

    def reset(self):
        self.player_pos = (1, 0)
        self.monster.pos = (5, 4)

        # Update statistics accordingly:
        #self.monster.stats.reset() # Use this if you need to reset the statistics per episode.
        #self.player.stats.reset() # Use this if you need to reset the statistics per episode.
        self.monster.stats.visited(*self.monster.pos) # Monster 'visited' its initial position.
        self.player.stats.visited(*self.player_pos) # Player 'visited' their initial position.

        return self.get_actual_state()

    def get_actual_state(self):
        return (self.player_pos, self.monster.pos)

    def step(self, action):
        self.player_pos = self.move_player(self.player_pos, action)
        self.player.stats.visited(*self.player_pos) # Player visited a new (or the same again) position.

        monster_actions = self.get_possible_actions(self.monster.pos)
        self.monster.move(self.player_pos, self.maze, monster_actions)
        self.monster.stats.visited(*self.monster.pos) # Monster visited a new (or the same again) position.

        done = False
        step_reward = -1.0
        
        if self.player_pos == self.monster.pos:
            # print(f"The agent died after meeting the monster. Game over!")
            step_reward = -100.0
            done = True
            self.monster.stats.completed() # The monster succeeded.

        elif self.player_pos == self.exit_pos:
            # print(f"You win!")
            step_reward = 100.0 # Changed reward.
            done = True
            self.player.stats.completed() # The player succeeded.
        
        return self.get_actual_state(), step_reward, done

    def move_player(self, player_pos, action):
        row, col = player_pos
        if action == 0 and row > 0 and self.maze[row - 1][col] == 0:  # up
            row -= 1
        elif action == 1 and row < ROWS - 1 and self.maze[row + 1][col] == 0:  # down
            row += 1
        elif action == 2 and col > 0 and self.maze[row][col - 1] == 0:  # left
            col -= 1
        elif action == 3 and col < COLS - 1 and self.maze[row][col + 1] == 0:  # right
            col += 1
        return row, col

    def render(self):
        if not self.screen:
            self.screen = pygame.display.set_mode((WIDTH, HEIGHT))
        self.screen.fill(BLACK)
        self.draw_grid()
        pygame.display.flip()

    def draw_grid(self):
        for row in range(ROWS):
            for col in range(COLS):
                # Background color (white for paths, black for walls)
                color = WHITE if self.maze[row][col] == 0 else BLACK
                pygame.draw.rect(self.screen, color, (col * TILE_SIZE, row * TILE_SIZE, TILE_SIZE, TILE_SIZE))
                pygame.draw.rect(self.screen, BLACK, (col * TILE_SIZE, row * TILE_SIZE, TILE_SIZE, TILE_SIZE), 1)

        # Draw the exit (green)
        pygame.draw.rect(self.screen, (0, 255, 0), 
                         (self.exit_pos[1] * TILE_SIZE, self.exit_pos[0] * TILE_SIZE, TILE_SIZE, TILE_SIZE))

        # Draw the start (yellow)
        pygame.draw.rect(self.screen, (255, 255, 0), 
                         (0 * TILE_SIZE, 1 * TILE_SIZE, TILE_SIZE, TILE_SIZE))

        # Draw the player (blue)
        pygame.draw.rect(self.screen, BLUE, 
                         (self.player_pos[1] * TILE_SIZE, self.player_pos[0] * TILE_SIZE, TILE_SIZE, TILE_SIZE))

        # Draw the monster (red)
        pygame.draw.rect(self.screen, (255, 0, 0), 
                         (self.monster.pos[1] * TILE_SIZE, self.monster.pos[0] * TILE_SIZE, TILE_SIZE, TILE_SIZE))

         
    def get_possible_actions(self, pos):
        row, col = pos
        actions = []
        if row > 0 and self.maze[row - 1][col] == 0:
            actions.append(0)  # up
        if row < ROWS - 1 and self.maze[row + 1][col] == 0:
            actions.append(1)  # down
        if col > 0 and self.maze[row][col - 1] == 0:
            actions.append(2)  # left
        if col < COLS - 1 and self.maze[row][col + 1] == 0:
            actions.append(3)  # right
        return actions

    def close(self):
        pygame.quit()
        
    def get_all_states(self):
        return [(row, col) for row in range(ROWS) for col in range(COLS) if self.maze[row][col] == 0]


class ApproximateQLearningAgent:
    def __init__(self, alpha, epsilon, discount, num_features, get_legal_actions, min_epsilon = 0.05, epsilon_decay = 0.995):
        self.alpha = alpha  
        self.epsilon = epsilon  
        self.discount = discount
        self.weights = np.random.uniform(-0.1, 0.1, (4, num_features))
        self.min_epsilon = min_epsilon  
        self.epsilon_decay = epsilon_decay  
        self.get_legal_actions = get_legal_actions

    def get_features(self, state):
        player_pos, monster_pos = state
        player_row, player_col = player_pos
        exit_row, exit_col = (ROWS - 2, COLS - 1)
        monster_row, monster_col = monster_pos

        bias = 1.0
        exit_dist = (abs(player_row - exit_row) + abs(player_col - exit_col)) / (ROWS + COLS - 2)
        monster_dist = (abs(player_row - monster_row) + abs(player_col - monster_col)) / (ROWS + COLS - 2)
        available_moves = len(self.get_legal_actions(state)) / 4.0  
        collision_risk = 1.0 if abs(player_row - monster_row) + abs(player_col - monster_col) <= 2 else 0.0

        return np.array([bias, exit_dist, monster_dist, available_moves, collision_risk])

    def get_qvalue(self, state, action):
        features = self.get_features(state)
        q_value = np.dot(self.weights[action], features)
        return q_value

    def get_value(self, state):
        possible_actions = self.get_legal_actions(state)
        if len(possible_actions) == 0:
            return 0.0
        
        return max(self.get_qvalue(state, action) for action in possible_actions)

    def get_best_actions_with_loop(self, state, possible_actions):
        best_actions = []
        max_q = None
        for action in possible_actions:
            cur_q = self.get_qvalue(state, action)
            if max_q is None or cur_q > max_q:
                max_q = cur_q
                best_actions = [action]
            elif cur_q == max_q:
                best_actions.append(action)
        return best_actions

    def get_best_action(self, state, possible_actions=None):
        if possible_actions is None: # Minor optimization (if possible_actions was already calculated, don't calculate it again).
            possible_actions = self.get_legal_actions(state)
            if len(possible_actions) == 0:
                return None

        # Fixed enumerating of actions with a loop.
        best_actions = self.get_best_actions_with_loop(state, possible_actions)
        return random.choice(best_actions)

    def get_action(self, state):
        possible_actions = self.get_legal_actions(state)
        if len(possible_actions) == 0:
            return None

        if random.random() < self.epsilon:
            return random.choice(possible_actions)  
        return self.get_best_action(state, possible_actions)  

    # Note: method signature changed ('done' is mandatory).
    def update(self, state, action, reward, done, next_state, next_action):
        features = self.get_features(state)

        # 'next_action' is the action that the agent will indeed take, but
        # we instead need the best one for calculating delta in Q-learning:
        next_action = self.get_best_action(next_state)

        done = int(done) # Convert True and False to 1 and 0.

        # Fixed delta calculation (the middle term is zeroed when done is True):
        delta = (reward/100) + (self.discount * self.get_qvalue(next_state, next_action) * (1 - done)) - self.get_qvalue(state, action)
        # otherwise use:
        #delta = (reward/100) + (self.discount * self.get_value(next_state) * (1 - done)) - self.get_qvalue(state, action)

        self.weights[action] += self.alpha * delta * features
        return delta

    def turn_off_learning(self):
        self.alpha = 0
        self.epsilon = 0

    def decay_epsilon(self):
        self.epsilon = max(self.min_epsilon, self.epsilon * self.epsilon_decay)

def train_agent(env, agent, num_episodes):
    epsilon = []
    deltas = []
    total_rewards = [] # We will also maintain total_rewards for statistics purposes only.
    for episode in range(num_episodes):

        total_reward = 0
        total_delta = 0
        done = False

        state = env.reset()
        action = agent.get_action(state)

        # Training loop is modified (next_action is the actual action the agent is going to take)...
        while not done:

            next_state, reward, done = env.step(action)
            next_action = agent.get_action(next_state)

            delta = agent.update(state, action, reward, done, next_state, next_action) 

            state = next_state
            action = next_action

            total_reward += reward
            total_delta += abs(delta)

        total_rewards.append(total_reward)
        epsilon.append(agent.epsilon)
        deltas.append(total_delta / num_episodes)
        agent.decay_epsilon()

        # Debug prints every 2k episodes.
        if (episode % 2000) == 0:
            avg_of_total = round(sum(total_rewards) / len(total_rewards), 2)
            print(f"Episode {episode + 1}: Average Total Reward so far = {avg_of_total}")

    return epsilon, deltas

if __name__ == '__main__':
    env = Maze()
    agent = ApproximateQLearningAgent(
        # Hyperparameter tuning is modified.
        alpha = 0.05, epsilon = 1.0, discount = 0.9995, num_features = 5,
        epsilon_decay = 0.999, min_epsilon = 0.01,
        get_legal_actions = lambda state: env.get_possible_actions(state[0])
    )

    # Training:

    epsilon, deltas = train_agent(env, agent, num_episodes = 20000)

    # Print training statistics:

    print()
    print('Given the map:')
    print_map(maze, {
        0: ' ' * 8,
        1: u'\u2588' * 8
    })
    print()
    env.monster.stats.print()
    print()
    env.player.stats.print()
    print()

    cnt = 0
    for e in epsilon:
        if e > agent.min_epsilon:
            #print(f'Epsilon: {e}')
            cnt += 1
    print(f'{cnt} total epsilons greater than minimum.')

    # Evaluation:

    agent.turn_off_learning()

    env = Maze()
    for test_episode in range(5):
        state = env.reset()
        total_reward = 0
        done = False

        print(f"Episode {test_episode + 1}:")
        while not done:

            action = agent.get_best_action(state)
            next_state, reward, done = env.step(action)
            total_reward += reward

            state = next_state
            env.render()
            pygame.time.delay(100)

        print(f"Reward: {total_reward}")

    env.close()

To clarify, I am having some issues with my GPU drivers, so I cannot run the GUI evaluation at the moment. So I used another (very simple) means of verifying progress: I kept a counter for each location visited by each entity (monster/player) and counted how many times the agent found the exit or met the monster, which was helpful, so the code also includes these debugging aids. I judged that there is progress because reaching the exit in ~23% of the 20k episodes already seems a lot better than not reaching it at all (which was the case initially).

Sample output:

pygame 2.6.1 (SDL 2.28.4, Python 3.13.0)
Hello from the pygame community. https://www.pygame.org/contribute.html
Episode 1: Average Total Reward so far = -104.0
Episode 2001: Average Total Reward so far = -103.48
Episode 4001: Average Total Reward so far = -89.36
Episode 6001: Average Total Reward so far = -84.3
Episode 8001: Average Total Reward so far = -81.8
Episode 10001: Average Total Reward so far = -80.87
Episode 12001: Average Total Reward so far = -78.77
Episode 14001: Average Total Reward so far = -78.13
Episode 16001: Average Total Reward so far = -78.46
Episode 18001: Average Total Reward so far = -77.94

Given the map:
████████████████████████████████████████████████████████████████████████████████████████
                                        ████████                                ████████
████████        ████████        ████████████████████████        ████████        ████████
████████                                                        ████████        ████████
████████        ████████████████████████        ████████████████████████        ████████
████████                                                                        ████████
████████        ████████        ████████████████████████████████████████        ████████
████████                                        ████████                        ████████
████████████████        ████████████████        ████████        ████████        ████████
████████                                                        ████████                
████████████████████████████████████████████████████████████████████████████████████████

Monster completed 15136 time(s) with the following heatmap:
       0       0       0       0       0       0       0       0       0       0       0
     103    3220    2630    7513     206       0     973    6285     396     367       0
       0    3553       0    6111       0       0       0   12713       0     456       0
       0    4335     863    6929    6286   16947   12548   54903       0     440       0
       0    4252       0       0       0   16294       0       0       0     426       0
       0    4443   24014  165477  120225   18425    2990   40573    2439     705       0
       0     163       0    5781       0       0       0       0       0     461       0
       0      18      16     339      46     162       0       0       5     347       0
       0       0       6       0       0      74       0       0       0     277       0
       0       0       5       3       7      49       4       1       0     103       0
       0       0       0       0       0       0       0       0       0       0       0

Player completed 4864 time(s) with the following heatmap:
       0       0       0       0       0       0       0       0       0       0       0
   22587   24981   21048  136097  111211       0      50    5886    5205    6647       0
       0    2442       0   24277       0       0       0   34581       0    6510       0
       0    1091     798   20098   11986   12375   10336   38767       0    4800       0
       0     168       0       0       0    2027       0       0       0    4684       0
       0      37       3       0       8    1302     322     299     333    5018       0
       0      17       0       0       0       0       0       0       0    4969       0
       0      15      19       7       0       0       0      18      46    5565       0
       0       0      12       0       0       1       0     204       0   12295       0
       0       1       9       6       5       6       6     192       0   11676    4864
       0       0       0       0       0       0       0       0       0       0       0

4603 total epsilons greater than minimum.

... and then the crash message for the GPU driver appeared which I did not include above.

Note:

I also tried traditional tabular Q-learning on your environment and at times got the player to reach the exit in more than 10k (>50%) of the episodes. Specifically, I commented out get_features, updated the get_qvalue method to read from a Q matrix, and updated the update method to apply the Bellman equation and store the new Q-values back into that matrix. This indicates that there is certainly a lot of room for improvement. If no other bugs can be found, then it may well be that the features and/or the hyperparameter tuning need improvement. But this is as far as I am able to progress at this point.
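
For reference, here is a minimal sketch of what that tabular variant could look like (my own illustration of the approach described above, not the exact code I tested; it assumes the same Maze environment, the same ((player_pos), (monster_pos)) state tuples, and the same get_legal_actions callback):

from collections import defaultdict
import random

class TabularQLearningAgent:
    def __init__(self, alpha, epsilon, discount, get_legal_actions):
        self.alpha = alpha
        self.epsilon = epsilon
        self.discount = discount
        self.get_legal_actions = get_legal_actions
        self.q = defaultdict(float)  # Q "matrix" indexed by (state, action)

    def get_qvalue(self, state, action):
        return self.q[(state, action)]

    def get_best_action(self, state):
        actions = self.get_legal_actions(state)
        if not actions:
            return None
        max_q = max(self.get_qvalue(state, a) for a in actions)
        return random.choice([a for a in actions if self.get_qvalue(state, a) == max_q])

    def get_action(self, state):
        actions = self.get_legal_actions(state)
        if not actions:
            return None
        if random.random() < self.epsilon:
            return random.choice(actions)
        return self.get_best_action(state)

    def update(self, state, action, reward, done, next_state):
        # Bellman update, with the bootstrap term zeroed on terminal transitions.
        next_value = 0.0 if done else max(
            self.get_qvalue(next_state, a) for a in self.get_legal_actions(next_state)
        )
        target = reward + self.discount * next_value
        self.q[(state, action)] += self.alpha * (target - self.q[(state, action)])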
