Akshay Ballal

# Reinforcement Learning from Scratch - Part 2 - Deep Q Learning

Hey everyone. This is the second part of my Reinforcement Learning Series, where we look at the different RL algorithms and discuss their implementation details. In the last part, we saw a basic RL algorithm that uses tabular Q learning. One disadvantage of that technique was that it required us to discretize the observation and action spaces. Discretization makes the Q-table grow quickly with the number of bins per dimension, which makes learning harder and slower.

To overcome this problem, at least partially, in this part we look at Deep Q Learning (DQN), a prominent technique that brought reinforcement learning into the mainstream. With this technique we can use the continuous observation space directly, but we still need a discrete action space. First, we will put together a simple DQN algorithm and then see how we can use an LSTM architecture as the network in DQN. Let’s dive in.

## DQN Theory

If you are not interested in the theory of DQN, that is fine. The code implementation below explains a lot. However, I am including some of the main driving equations below for completeness.

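From the previous part, we know that the q-value of a state-action pair is the expected total return from that state if a certain action is taken. Using the Bellman equation, we can write it out as:

$$Q(s, a) = \mathbb{E}\left[r + \gamma \, Q(s', a')\right]$$

If we are following the policy π that greedily picks the action with the highest q-value, we can say that:

$$Q(s, a) = \mathbb{E}\left[r + \gamma \max_{a'} Q(s', a')\right]$$

From this, we can say that the q-value of a state-action pair is equal to the average (expectation) of the sum of the reward and γ times the maximum q-value of the next state. Using our own estimate of the next state's q-value to build the target is called bootstrapping.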

For example, if we have a transition (s, 2, r, s’), we take action 2. Then, we do the following.

1. Pass the state through the q-network and get the q-values of all the actions.
2. Pick the q-value of the action taken, which is 2 here.
3. Pass the next state s’ through the target q-network and get q-values of all the actions of s’.
4. Pick the maximum q-value.

We can then take a Mean Square Error Loss between the target and the prediction and perform backward propagation.
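The four steps above can be sketched with plain NumPy arrays standing in for the outputs of the two networks. This is only an illustration: the numbers are made up, and the `terminated` mask (no bootstrapping when the episode has ended) follows the convention used in the code later in this post.

```python
import numpy as np

gamma = 0.99

# Hypothetical batch of two transitions; the arrays stand in for network outputs
q_online = np.array([[0.1, 0.5, -0.2],       # Q(s, .) from the q-network
                     [0.0, -0.3, 0.4]])
q_target_next = np.array([[0.2, 0.1, 0.6],   # Q(s', .) from the target q-network
                          [0.3, 0.0, -0.1]])
actions = np.array([2, 0])                   # actions taken in each transition
rewards = np.array([-0.5, -0.1])
terminated = np.array([0.0, 1.0])            # second transition ends the episode

# Steps 1-2: q-value of the action that was actually taken
prediction = q_online[np.arange(len(actions)), actions]

# Steps 3-4: maximum q-value of the next state, masked for terminal transitions
target = rewards + gamma * q_target_next.max(axis=1) * (1.0 - terminated)

# Mean squared error between prediction and target
mse = np.mean((prediction - target) ** 2)
print(prediction, target, mse)
```

In the real agent the gradient only flows through `prediction`; the target is treated as a constant.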

The image below makes this clearer.

### The Algorithm

Below you can see the pseudocode for the Deep Q Learning. It looks more complex than it actually is. You can follow along with the code to get a better understanding.

## Code Implementation

We will again use the Pendulum environment we used in the last part. We will also log some metrics to TensorBoard to see how the learning is going.

You can check out the code at: GitHub Repo

``````python
import gymnasium as gym
import numpy as np
from tqdm import tqdm
import torch
from torch import nn
from copy import deepcopy
from torch.utils.tensorboard import SummaryWriter
import torch.nn.functional as F
``````

### Pendulum Environment Wrapper

We define the Gaussian Function that I introduced in Part 1, which creates the set of possible actions.

``````python
def gaussian(x, mu, sigma, max_amp):
    return max_amp * np.exp(-0.5 * ((x - mu) / sigma)**2)
``````

Then, we create the wrapper. It is very similar to the previous wrapper, and in fact simpler, because we can use the observation space of the Pendulum environment as it is and don't have to do any discretization.

``````python
class PendulumDiscreteStateAction(gym.Wrapper):

    def __init__(self, env: gym.Env, nvec_u: int, sigma: float):

        super(PendulumDiscreteStateAction, self).__init__(env)

        self.env = env
        self.nvec_u = nvec_u

        # Create a Discrete action space
        self.action_space = gym.spaces.Discrete(nvec_u)

        # Map each discrete index to a torque; the values are denser near zero
        kernel = gaussian(np.linspace(0, 1, nvec_u//2), 0, sigma,
                          self.env.action_space.high[0])
        self.actions = (-kernel).tolist() + [0] + np.flip(kernel).tolist()

    def step(self, action: int) -> tuple[np.ndarray, float, bool, bool, dict]:

        action = self.actions[action]
        obs, reward, terminated, truncated, info = self.env.step([action])
        reward = reward/16.2736044 # normalize the reward between -1 and 0
        obs = obs/self.env.observation_space.high # normalize the observation between -1 and 1
        return obs, reward, terminated, truncated, info

    def reset(self, **kwargs) -> tuple[np.ndarray, dict]:
        """
        Resets the environment.

        Returns:
        - The initial normalized observation and additional information.
        """
        obs, info = self.env.reset(**kwargs)
        obs = obs/self.env.observation_space.high
        return obs, info
``````

Notable Implementation Details

• Reward Scaling: The rewards are min-max scaled to make learning smoother. This is done by dividing the returned reward by the magnitude of the minimum possible reward: r_scaled = r/|r_min|, which maps every reward into the range [-1, 0]. The value r_min ≈ -16.27 is given in the documentation of the Pendulum environment in Gymnasium. The intuition behind this is that the network does not have to output large q-values, so the weights can remain smaller, which makes learning smoother.
• Scaled Observations: The observations are scaled by dividing by the maximum value of the observation space, so each component lies between -1 and 1.
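To get a feel for what the wrapper produces, here is a small standalone sketch that builds the discrete torque set and checks the reward scaling. It assumes the values used later in this post (11 actions, `sigma = 0.4`) and Pendulum's maximum torque of 2; the variable names are just for illustration.

```python
import numpy as np

def gaussian(x, mu, sigma, max_amp):
    return max_amp * np.exp(-0.5 * ((x - mu) / sigma) ** 2)

# Assumed settings: 11 discrete actions, sigma = 0.4, maximum torque of 2
nvec_u, sigma, max_torque = 11, 0.4, 2.0

kernel = gaussian(np.linspace(0, 1, nvec_u // 2), 0, sigma, max_torque)
actions = (-kernel).tolist() + [0] + np.flip(kernel).tolist()
print(np.round(actions, 3))  # torques from -2 to 2, packed more densely near zero

# Reward scaling: the worst possible reward maps to -1, the best (0) stays 0
r_min = -16.2736044
scaled_worst = r_min / abs(r_min)
print(scaled_worst)
```

The Gaussian spacing gives the agent fine control close to zero torque, which is where it needs it when balancing the pendulum upright.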

### Define the Q-Network

Here we define a simple Fully Connected Network which maps the state to the q-values of all possible actions.

``````python
class QNetwork(nn.Module):
    def __init__(self, nvec_s, nvec_u):
        super(QNetwork, self).__init__()

        self.fc1 = nn.Linear(nvec_s, 64)
        self.fc2 = nn.Linear(64, 64)
        self.fc3 = nn.Linear(64, nvec_u)

    def forward(self, x:torch.Tensor) -> torch.Tensor:
        x = torch.relu(self.fc1(x))
        x = torch.relu(self.fc2(x))
        x = self.fc3(x)
        return x
``````

### Create ReplayMemory Class

``````python
class ReplayMemory:
    def __init__(self, capacity, env: gym.Env, device: torch.device):

        self.position = 0  # Index where the next transition is written
        self.size = 0  # Number of transitions currently stored
        self.capacity = capacity
        self.device = device

        self.n_states = env.observation_space.shape  # Shape of a single observation
        self.n_actions = env.action_space.n  # Number of discrete actions

        # Initialize arrays to store the replay memory
        self.states = np.zeros((capacity, *self.n_states))
        self.actions = np.zeros((capacity))
        self.rewards = np.zeros(capacity)
        self.next_states = np.zeros((capacity, *self.n_states))
        self.terminated = np.zeros(capacity)
        self.truncated = np.zeros(capacity)

    def push(self, state:np.ndarray, action:int, next_state:np.ndarray, reward:float, terminated: bool, truncated:bool):

        # Reshape to the observation shape so 2D (history) observations also fit
        self.states[self.position] = np.asarray(state).reshape(self.n_states)
        self.actions[self.position] = action
        self.next_states[self.position] = np.asarray(next_state).reshape(self.n_states)
        self.rewards[self.position] = reward
        self.terminated[self.position] = terminated
        self.truncated[self.position] = truncated

        # Ring buffer: overwrite the oldest transition once the memory is full
        self.position = (self.position + 1) % self.capacity
        self.size = min(self.size + 1, self.capacity)

    def sample(self, batch_size):

        # Sample without replacement; requires at least batch_size stored transitions
        indices = np.random.choice(self.size, batch_size, replace=False)

        states = torch.tensor(self.states[indices], dtype = torch.float32, device=self.device)
        actions = torch.tensor(self.actions[indices], dtype = torch.int64, device=self.device)
        next_states = torch.tensor(self.next_states[indices], dtype = torch.float32, device=self.device)
        rewards = torch.tensor(self.rewards[indices], dtype = torch.float32, device=self.device)
        terminated = torch.tensor(self.terminated[indices], dtype = torch.float32, device=self.device)
        truncated = torch.tensor(self.truncated[indices], dtype = torch.float32, device=self.device)

        return states, actions, next_states, rewards, terminated, truncated

    def __len__(self):
        return self.size
``````

### Create Agent Class

Initialization and some helper functions

Here we define the agent initialization function that sets up the various variables and hyperparameters. We also have a `get_action` function that takes a random action with a probability of ϵ and otherwise takes an action by passing the state through the q-network and taking the argmax, i.e., selecting the action with the highest q-value.

``````python
class Agent:
    def __init__(
        self,
        env: gym.Env,
        gamma=0.99,
        alpha=0.0003,
        initial_epsilon=1,
        min_epsilon=0.1,
        decay_rate=0.9999,
        batch_size=64,
        n_rollouts=2000,
        capacity=100000,
        device: torch.device = torch.device("cpu"),
    ):
        self.env = env  # Environment
        self.device = device  # Computation device (CPU or GPU)
        self.gamma = gamma  # Discount factor
        self.alpha = alpha  # Learning rate
        self.batch_size = batch_size  # Batch size for training
        self.n_rollouts = n_rollouts  # Number of rollouts to collect

        self.epsilon = initial_epsilon  # Initial epsilon value for exploration
        self.min_epsilon = min_epsilon  # Minimum epsilon value
        self.decay_rate = decay_rate  # Epsilon decay rate

        # Replay memory to store experiences
        self.replay_memory = ReplayMemory(capacity, env, device)
        # Q-network and target network for Q-learning
        self.q_network = QNetwork(
            env.observation_space.shape[0], env.action_space.n
        ).to(device)
        self.target_network = deepcopy(self.q_network)
        # Optimizer for Q-network
        # (this line is missing from the listing; Adam is an assumption)
        self.optimizer = torch.optim.Adam(self.q_network.parameters(), lr=self.alpha)

        # Number of dimensions in the state space
        self.n_states = env.observation_space.shape[0]

        # For metrics
        self.n_time_steps = 0  # Number of time steps
        self.n_updates = 0  # Number of gradient updates
        self.episodes = 0  # Number of episodes
        self.best_reward = -np.inf  # Best reward seen so far

    def get_action(self, obs, greedy=False):

        if not greedy and np.random.rand() < self.epsilon:  # Epsilon-greedy exploration
            return np.random.randint(self.env.action_space.n)  # Random action
        obs = torch.tensor(obs, dtype=torch.float32, device=self.device).unsqueeze(0)  # Convert observation to tensor
        self.q_network.eval()  # Set Q-network to evaluation mode
        with torch.no_grad():  # No gradients are needed to pick an action
            q_values: torch.Tensor = self.q_network(obs)  # Get Q-values for the observation
        return q_values.argmax().item()  # Return action with highest Q-value

    def sample_experience(self):

        return self.replay_memory.sample(self.batch_size)  # Sample experiences from replay memory

    def update_target(self):

        # Hard update: copy the Q-network weights into the target network
        self.target_network.load_state_dict(self.q_network.state_dict())
``````

Collect Rollouts:

The `collect_rollouts` function collects transitions by simulating the environment with the epsilon-greedy policy that we defined in the `get_action` function. Each transition is stored in the replay memory, and we decay epsilon after every step to gradually reduce exploration.

``````python
    def collect_rollouts(self):
        obs, info = self.env.reset()  # Reset environment
        terminated = False
        truncated = False
        rewards = 0  # Total rewards
        episodes = 0  # Total episodes
        for _ in range(self.n_rollouts):
            action = self.get_action(obs, greedy=False)  # Get action
            next_obs, reward, terminated, truncated, _ = self.env.step(action)  # Step environment
            self.replay_memory.push(
                obs, action, next_obs, reward, terminated, truncated
            )  # Save the transition
            obs = next_obs  # Update observation
            rewards += reward  # Accumulate reward
            self.n_time_steps += 1  # Increment time steps
            if terminated or truncated:  # Check if episode ended
                episodes += 1
                self.episodes += 1
                obs, info = self.env.reset()  # Reset environment

            self.epsilon = max(
                self.min_epsilon, self.decay_rate * self.epsilon
            )  # Decrease epsilon

        return rewards / max(episodes, 1)  # Return the average reward per episode
``````
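A quick back-of-the-envelope check of the exploration schedule, assuming epsilon is multiplied by `decay_rate` once per environment step and using the default hyperparameters from the `Agent` class (1.0 → 0.1 with a decay of 0.9999, and 2000 steps per collection phase):

```python
import math

# Default hyperparameters from the Agent class
initial_epsilon, min_epsilon, decay_rate = 1.0, 0.1, 0.9999
n_rollouts = 2000

# Solve initial_epsilon * decay_rate**n <= min_epsilon for n
steps_to_floor = math.ceil(
    math.log(min_epsilon / initial_epsilon) / math.log(decay_rate)
)

# Number of collect_rollouts calls needed to reach the floor
iterations_to_floor = math.ceil(steps_to_floor / n_rollouts)

print(steps_to_floor, iterations_to_floor)
```

So with these settings the agent explores heavily for roughly the first 23,000 steps, or about a dozen collection phases, before epsilon settles at its minimum.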

Learn

The `learn` function samples the experiences of `batch_size` from the replay memory, predicts the q-values and gets the target values. We then get the loss and perform backward propagation. We do this for the defined number of `epochs`.

``````python
    def learn(self, epochs):

        self.q_network.train()  # Set Q-network to training mode

        average_loss = 0
        for i in range(epochs):
            obs, action, next_obs, reward, terminated, truncated = (
                self.sample_experience()
            )  # Sample a batch of experiences
            q_values: torch.Tensor = self.q_network(obs)  # Get Q-values for the batch
            with torch.no_grad():  # The target is treated as a constant
                next_q_values = self.target_network(next_obs)  # Get Q-values for the next states

            q_value = q_values.gather(1, action.unsqueeze(1)).squeeze(1)  # Gather Q-values for taken actions
            next_q_value = next_q_values.max(1).values  # Get max Q-value for next states
            target = reward + self.gamma * next_q_value * (1 - terminated) * (
                1 - truncated
            )  # Compute target Q-value

            loss = F.smooth_l1_loss(q_value, target)  # Compute loss

            self.optimizer.zero_grad()  # Clear gradients from the previous update
            loss.backward()  # Backpropagate
            nn.utils.clip_grad_norm_(self.q_network.parameters(), 10)  # Clip gradient L2 norm to 10
            self.optimizer.step()  # Update Q-network

            average_loss += (loss.item() - average_loss) / (i + 1)  # Update average loss
            self.n_updates += 1  # Increment update count

            if self.n_updates % 1000 == 0:  # Update target network periodically
                self.update_target()

        return average_loss  # Return average loss
``````

Some Implementation Details

• Smooth L1 Loss: This was inspired by the Stable Baselines implementation of DQN. Smooth L1 loss limits the gradient contribution of outliers by behaving like L2 loss for small errors and like L1 loss for large ones. This is useful in off-policy algorithms because transitions sampled from the replay buffer can belong to an old, much different policy. It is not desirable to fit to these “outliers.”
• Gradient Clipping: Before performing the optimizer step, the L2 norm of the gradients is clipped to 10 to counteract the influence of outliers further. This prevents exploding gradients by keeping the total gradient norm at or below 10.
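Both ideas are easy to see on a few numbers. The sketch below is a NumPy re-implementation for illustration only, not the PyTorch code itself: `smooth_l1` follows the element-wise definition with the default threshold of 1, and `clip_by_global_norm` is a hypothetical helper that rescales gradients the way norm clipping does.

```python
import numpy as np

def smooth_l1(error, beta=1.0):
    # Quadratic for small errors, linear for large ones
    abs_err = np.abs(error)
    return np.where(abs_err < beta, 0.5 * abs_err**2 / beta, abs_err - 0.5 * beta)

def clip_by_global_norm(grads, max_norm):
    # Rescale all gradients together so their combined L2 norm is at most max_norm
    total_norm = np.sqrt(sum(np.sum(g**2) for g in grads))
    scale = min(1.0, max_norm / (total_norm + 1e-6))
    return [g * scale for g in grads], total_norm

errors = np.array([0.1, 1.0, 5.0])
losses = smooth_l1(errors)     # grows linearly for the large error
squared = 0.5 * errors**2      # grows quadratically for the large error
print(losses, squared)

grads = [np.array([30.0, 40.0])]   # made-up gradient with L2 norm 50
clipped, norm = clip_by_global_norm(grads, 10.0)
print(norm, clipped[0])
```

For the error of 5, the squared loss is almost three times larger than the smooth L1 loss, which is exactly the outlier sensitivity we want to avoid.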

Evaluate the current Policy

According to Zhenyi Wang, in many cases DQN starts to forget and then relearn once it has reached near the maximum reward. This can also be seen in the figure below, where the agent sometimes drops to a low reward. This can happen because even when the agent has found an optimal greedy policy, the rollouts are still collected with the ϵ-greedy policy. A random sub-optimal action can then cause a large change in the network and make the agent unlearn. This can sometimes be beneficial, since it makes the agent more robust to perturbations. Nevertheless, the policy is evaluated after every learning cycle, and the best model is saved. This makes sure that, in the end, the best-performing model is retrievable.

``````python
    def evaluate(self, n_steps):
        self.q_network.eval()  # Set Q-network to evaluation mode
        rewards = 0  # Total rewards
        episodes = 0  # Total episodes
        obs, info = self.env.reset()  # Reset environment
        for _ in range(n_steps):
            action = self.get_action(obs, greedy=True)  # Get action
            obs, reward, terminated, truncated, _ = self.env.step(action)  # Step environment
            rewards += reward  # Accumulate reward

            if terminated or truncated:  # Check if episode ended
                episodes += 1
                obs, info = self.env.reset()  # Reset environment and start from the new observation

        rewards /= max(episodes, 1)  # Compute average reward per episode
        if rewards > self.best_reward:  # Save best model if improved
            self.best_reward = rewards
            torch.save(self.q_network.state_dict(), "dqn_best_model.pth")
            print("New best model saved!")
        return rewards
``````

Train

Finally we have the `train` function that runs the `collect_rollouts`, `learn` and `evaluate` functions in a loop for a number of `epochs`.

``````python
    def train(self, epochs):
        self.writer = SummaryWriter(log_dir="dqn_logs/DQN_2")  # TensorBoard writer

        pbar = tqdm(range(epochs))  # Progress bar
        for i in pbar:
            rewards = self.collect_rollouts()  # Collect rollouts
            loss = self.learn(int(self.n_rollouts/2))  # Perform learning
            eval_reward = self.evaluate(1000)  # Evaluate agent

            pbar.set_description(
                f"Iteration {i+1} || Reward: {rewards:.3f} || Eval Reward: {eval_reward :.3f} || Loss: {loss:.3f} || Epsilon: {self.epsilon:.2f} || Time steps: {self.n_time_steps} || N updates: {self.n_updates}"
            )  # Update progress bar
            self.writer.add_scalar(
                "Training/Rollout: Mean Episode Reward", rewards, self.n_updates
            )  # Log mean episode reward during training
            self.writer.add_scalar(
                "Training/Evaluation: Mean Episode Reward", eval_reward, self.n_updates
            )  # Log mean episode reward during evaluation (tag name is an assumption)
``````

Hyperparameters:

• For learning and exploration: `gamma`, `alpha`, `initial_epsilon`, `min_epsilon`, `decay_rate`
• For experience replay and training: `batch_size`, `n_rollouts`, `capacity`

### Start Training

``````python
# Limit each episode to 500 steps through the TimeLimit wrapper
env = gym.make("Pendulum-v1", max_episode_steps=500)
env = PendulumDiscreteStateAction(env, 11, 0.4)

agent = Agent(env)
agent.train(30)
``````

### Testing

``````python
import matplotlib.pyplot as plt

env = gym.make("Pendulum-v1", max_episode_steps=500,
            #    render_mode='human'
               )
env = PendulumDiscreteStateAction(env, 11, 0.4)

agent = Agent(env)
# Load the best weights saved by evaluate() during training
agent.q_network.load_state_dict(torch.load("dqn_best_model.pth"))

rewards = []
thetas = []
tot_reward = 0
n_episodes = 100
n_steps= 0
for _ in range(n_episodes):
    obs, info = env.reset()
    terminated = False
    truncated = False
    while not terminated and not truncated:
        action = agent.get_action(obs, greedy = True)
        # print(action)
        obs, reward, terminated,  truncated, info = env.step(action)
        # Undo the normalization to recover cos(theta) and sin(theta)
        x = obs[0] * env.observation_space.high[0]
        y = obs[1] * env.observation_space.high[1]
        theta = np.arctan2(y, x)
        thetas.append(theta)
        rewards.append(reward)
        tot_reward += reward
        n_steps+=1
        # env.render()
print("Average reward per episode: ", tot_reward/n_episodes)
``````

### Results

Below, we can see the results of our trained agent. The agent can reach an angle of zero no matter where it starts, and once it is there the per-step reward is also zero.

The average episodic reward across 100 episodes is 9.8

## LSTM Implementation

The transitions we collect are temporal. That is, there is a correlation between nearby transitions. To capture this temporal relation, we can change the state of the environment to be a history of past (state, action) pairs. So we need to add the previous action to the observation and maintain a list of the last `nhist` observations. Let’s see if this improves the agent, since now we have more information and a more expressive network.

In the figure below, you can see the LSTM network architecture. There are several ways to implement this. Apart from this, another option is to take a linear output from the last hidden layer directly. But let’s proceed with this for now.

``````python
# Qnetwork with LSTM
class QNetwork(nn.Module):
    def __init__(self, nvec_s, nvec_u):
        # nvec_s is the feature size of one time step (observation + previous action),
        # i.e. the last dimension of the history observation
        super(QNetwork, self).__init__()

        self.lstm = nn.LSTM(nvec_s, 64, 1, batch_first=True)
        self.fc1 = nn.Linear(64, 64)
        self.fc2 = nn.Linear(64, nvec_u)

    def forward(self, x:torch.Tensor) -> torch.Tensor:
        x, _ = self.lstm(x)  # (batch, nhist, 64)
        x = F.relu(self.fc1(x))
        x = torch.mean(x, dim=1)  # Average over the time dimension
        x = self.fc2(x)
        return x
``````

Also, we need to change the wrapper to get the history of observations.

``````python
from collections import deque

class PendulumDiscreteStateAction(gym.Wrapper):

    def __init__(self, env: gym.Env, nvec_u: int, sigma: float, nhist: int):

        super(PendulumDiscreteStateAction, self).__init__(env)

        self.env = env
        self.nvec_u = nvec_u

        # Check if the observation space is of type Box
        assert isinstance(
            env.observation_space, gym.spaces.Box
        ), "Error: observation space is not of type Box"

        # Create a Discrete action space
        self.action_space = gym.spaces.Discrete(nvec_u)

        # Define the possible actions
        self.max_torque = float(self.env.action_space.high[0])
        kernel:np.ndarray = gaussian(np.linspace(0, 1, nvec_u//2), 0, sigma, self.max_torque)
        self.actions = (-kernel).tolist() + [0] + np.flip(kernel).tolist()

        # Each row of the observation is one time step: [observation..., previous action]
        low = np.array([list(self.env.observation_space.low) + [min(self.actions)] for _ in range(nhist)], dtype=np.float32)
        high = np.array([list(self.env.observation_space.high) + [max(self.actions)] for _ in range(nhist)], dtype=np.float32)

        self.observation_space = gym.spaces.Box(
            low=low,
            high=high,
            dtype=np.float32
        )

        self.prev_action = None
        self.history = deque(maxlen = nhist)

        # Initialize the history with zeros (3 observation values + 1 action)
        for i in range(nhist):
            self.history.append(np.zeros(4))

    def step(self, action: int) -> tuple[np.ndarray, float, bool, bool, dict]:

        action = self.actions[action]
        obs, reward, terminated, truncated, info = self.env.step([action])
        reward = reward/16.2736044 # normalize the reward between -1 and 0
        obs = obs/self.env.observation_space.high # normalize the observation between -1 and 1
        obs=np.append(obs, action/self.max_torque) # append the normalized action
        self.history.append(obs) # the oldest entry is dropped automatically
        obs = np.array(list(self.history))
        return obs, reward, terminated, truncated, info

    def reset(self, **kwargs) -> tuple[np.ndarray, dict]:

        obs, info = self.env.reset(**kwargs)
        obs = obs/self.env.observation_space.high
        obs=np.append(obs, 0)

        # Initialize the history with the same observation
        for i in range(self.history.maxlen):
            self.history.append(obs)
        obs = np.array(list(self.history))
        return obs, info
``````

Here, you can see that we are using a `deque` to keep a fixed-size history of the last `nhist` observations. We also added the action to the observation. Now, we can train the agent as usual. Below, we can see how the training compares to the fully connected network (FCN). It is pretty similar, which means the LSTM does not have much of an effect here. In more complex environments, however, it can be useful and worth trying.
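If the `deque` behaviour is new to you, here is a tiny standalone sketch of how the history window rolls forward. The observation values are made up, and each row follows the layout used in the wrapper: three observation values plus the previous action.

```python
from collections import deque
import numpy as np

nhist = 3
history = deque(maxlen=nhist)

# On reset, the window is filled with copies of the first observation
first = np.array([1.0, 0.0, 0.0, 0.0])  # hypothetical [cos, sin, angular velocity, previous action]
for _ in range(nhist):
    history.append(first)

# On every step, the newest row is appended and the oldest one is dropped
history.append(np.array([0.9, 0.1, 0.2, -1.0]))

stacked = np.array(list(history))  # shape (nhist, 4), oldest row first
print(stacked.shape)
print(stacked)
```

The stacked array is what gets passed to the LSTM, with the time dimension first and the newest step in the last row.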

That's all for this part. We saw how to implement a deep Q learning algorithm from scratch and how to use any network as the Q-network, not just an FCN. We used an LSTM, but you could do the same with transformers for an even longer context history. In the next part, we will look into how to remove the limitation of a discrete action space by implementing the REINFORCE algorithm.
