Implementing Policy Gradient in Python — Full article with line-by-line code explanations

Understanding Policy Gradients

A policy is a mapping from states to (possibly stochastic) actions. Policy gradient methods parametrize the policy (usually a neural network) and update parameters θ to maximize expected reward. The key formula:

\nabla_\theta J(\theta) = \mathbb{E}\left[\nabla_\theta \log \pi_\theta(a \mid s) \cdot R\right]

This means we push up the probability of actions that lead to high returns. Now — implementation.

Explanation of the REINFORCE Algorithm

The key idea is to maximize the expected cumulative reward

J(\theta) = \mathbb{E}_{\pi_\theta}\left[R(\tau)\right]

where \theta are the policy parameters, \pi_\theta is the policy, and \tau is a trajectory (a sequence of states, actions, and rewards).

  • The policy gradient is derived as:

\nabla_\theta J(\theta) = \mathbb{E}_{\pi_\theta}\left[\sum_{t=0}^{T-1} \nabla_\theta \log \pi_\theta(a_t \mid s_t) \cdot G_t\right],

where

G_t = \sum_{k=t}^{T-1} \gamma^{k-t} r_k

is the discounted return (the sum of discounted future rewards) from timestep t, and \gamma is the discount factor. The gradient is estimated via Monte Carlo sampling of episodes: for each trajectory, compute

\sum_t \nabla_\theta \log \pi_\theta(a_t \mid s_t) \cdot G_t

and perform a gradient ascent step \theta \leftarrow \theta + \alpha \nabla_\theta J(\theta), with learning rate \alpha.

To reduce variance (a common issue in pure REINFORCE), one can subtract a baseline (e.g., average return), but the basic version uses no baseline.

This derivation relies on the log-derivative trick and the fact that the environment dynamics are independent of \theta, which is what makes REINFORCE model-free.
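
For reference, here is a brief sketch of that standard derivation (often called the log-derivative or likelihood-ratio trick):

\nabla_\theta J(\theta) = \nabla_\theta \int p_\theta(\tau)\, R(\tau)\, d\tau = \int p_\theta(\tau)\, \nabla_\theta \log p_\theta(\tau)\, R(\tau)\, d\tau = \mathbb{E}_{\pi_\theta}\left[\nabla_\theta \log p_\theta(\tau)\, R(\tau)\right]

Since p_\theta(\tau) = p(s_0) \prod_t \pi_\theta(a_t \mid s_t)\, p(s_{t+1} \mid s_t, a_t), the dynamics terms do not depend on \theta and vanish when the log is differentiated, leaving \nabla_\theta \log p_\theta(\tau) = \sum_t \nabla_\theta \log \pi_\theta(a_t \mid s_t).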

Implementing Policy Gradient in Python — step-by-step (detailed code + line-by-line explanations)

Note: the code below is robust to different gym versions (some return different shapes from reset() / step()).

Setup — imports & environment creation

				
import gym
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim

Code Explanation

The first step in Implementing Policy Gradient in Python is setting up the environment.

  • import gym — imports OpenAI Gym to create RL environments (CartPole in our example).

  • import numpy as np — imports NumPy for numerical operations and random sampling.

  • import torch — imports PyTorch main package (tensor operations).

  • import torch.nn as nn — imports PyTorch neural-network helper classes (layers, modules).

  • import torch.optim as optim — imports optimizers like Adam used for parameter updates.

				
env = gym.make("CartPole-v1")

  • env = gym.make("CartPole-v1") — creates the CartPole environment; an agent must balance a pole on a cart. This environment has discrete actions (left/right).
				
# Optional: reproducibility
torch.manual_seed(42)
np.random.seed(42)

  • torch.manual_seed(42) & np.random.seed(42) — set random seeds for reproducibility so training is more deterministic across runs (useful while debugging).
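
Depending on your Gym version, you may also want to seed the environment itself. The snippet below is a minimal optional sketch assuming a newer Gym (>= 0.26) or Gymnasium install, where reset() accepts a seed and returns (obs, info); skip it on older versions.

# Optional: seed the environment and its action space (newer Gym / Gymnasium API; assumption about your install)
obs, info = env.reset(seed=42)
env.action_space.seed(42)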

Policy Network — define the neural policy (PyTorch)

				
class PolicyNetwork(nn.Module):
    def __init__(self, state_dim, action_dim, hidden_dim=128):
        super(PolicyNetwork, self).__init__()
        self.fc1 = nn.Linear(state_dim, hidden_dim)
        self.fc2 = nn.Linear(hidden_dim, action_dim)
        self.softmax = nn.Softmax(dim=-1)

    def forward(self, x):
        x = torch.relu(self.fc1(x))
        x = self.fc2(x)
        return self.softmax(x)

Code Explanation

Here, we define a Policy Network using PyTorch. In Implementing Policy Gradient in Python, this network represents our policy function \pi_\theta(a \mid s).

  • class PolicyNetwork(nn.Module): — defines a neural network class that inherits from torch.nn.Module. This will represent our policy πθ(a|s).

  • def __init__(self, state_dim, action_dim, hidden_dim=128): — constructor; state_dim is input size (CartPole: 4), action_dim is number of discrete actions (CartPole: 2), hidden_dim is size of hidden layer.

  • super(PolicyNetwork, self).__init__() — initializes the base nn.Module.

  • self.fc1 = nn.Linear(state_dim, hidden_dim) — first fully-connected layer mapping state → hidden features.

  • self.fc2 = nn.Linear(hidden_dim, action_dim) — final linear layer mapping hidden features → action logits (unnormalized scores).

  • self.softmax = nn.Softmax(dim=-1) — softmax layer to convert logits into action probabilities summing to 1.

  • def forward(self, x): — forward pass function used when calling the network.

  • x = torch.relu(self.fc1(x)) — apply first linear layer then ReLU non-linearity.

  • x = self.fc2(x) — compute logits for each action.

  • return self.softmax(x) — return probabilities for each action.

				
state_dim = env.observation_space.shape[0]
action_dim = env.action_space.n
policy = PolicyNetwork(state_dim, action_dim)
optimizer = optim.Adam(policy.parameters(), lr=0.01)

  • state_dim = env.observation_space.shape[0] — CartPole observation is a vector; this grabs its size (4).

  • action_dim = env.action_space.n — number of discrete actions (2).

  • policy = PolicyNetwork(state_dim, action_dim) — instantiate the policy network.

  • optimizer = optim.Adam(policy.parameters(), lr=0.01) — Adam optimizer to update the policy parameters. Learning rate is 0.01 (tuneable).
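
As a quick, purely illustrative sanity check (not needed for training), you can run a forward pass on a dummy all-zeros state and confirm the network outputs a valid probability distribution:

# Illustrative sanity check: output should be two non-negative probabilities summing to ~1.0
dummy_state = torch.zeros(state_dim)
probs = policy(dummy_state)
print(probs, probs.sum())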

Collecting one episode — sampling actions & storing log-probs

				
def run_episode(env, policy):
    rewards = []
    log_probs = []
    reset_output = env.reset()
    state = reset_output[0] if isinstance(reset_output, tuple) else reset_output

    done = False
    while not done:
        state_tensor = torch.from_numpy(np.array(state)).float()
        action_probs = policy(state_tensor)
        dist = torch.distributions.Categorical(action_probs)
        action = dist.sample()
        log_prob = dist.log_prob(action)

        step_output = env.step(action.item())
        if len(step_output) == 5:
            next_state, reward, terminated, truncated, _ = step_output
            done = bool(terminated or truncated)
        else:
            next_state, reward, done, _ = step_output

        rewards.append(reward)
        log_probs.append(log_prob)
        state = next_state

    return rewards, log_probs

Code Explanation

In Implementing Policy Gradient in Python, the agent interacts with the environment to collect trajectories.

  • def run_episode(env, policy): — function to run one episode and collect per-step rewards and action log-probabilities.

  • rewards = [] — list to store rewards collected at each timestep.

  • log_probs = [] — list to store log_prob of the action chosen at each timestep (used later in loss).

  • reset_output = env.reset() — reset environment to initial state. Newer gym versions return (obs, info); older return obs.

  • state = reset_output[0] if isinstance(reset_output, tuple) else reset_output — robust extraction of the observation (state) whether reset() returned tuple or not.

  • done = False — episode termination flag.

  • while not done: — loop until episode ends.

  • state_tensor = torch.from_numpy(np.array(state)).float() — convert the state (NumPy array) to a torch.FloatTensor. We use np.array() to ensure consistent type.

  • action_probs = policy(state_tensor) — forward pass through policy to get action probabilities πθ(a|s).

  • dist = torch.distributions.Categorical(action_probs) — create a categorical distribution with the policy probabilities (convenient for sampling and log-probs).

  • action = dist.sample() — sample an action from the distribution (tensor).

  • log_prob = dist.log_prob(action) — get log probability of the sampled action; store for gradient calculation later.

  • step_output = env.step(action.item()) — take the action in environment (use .item() to convert tensor->int).

  • if len(step_output) == 5: — gym v0.26+ returns 5 items (obs, reward, terminated, truncated, info) while older returns 4; we branch accordingly.

  • next_state, reward, terminated, truncated, _ = step_output — unpack the five return values; done = bool(terminated or truncated) ends the episode on either termination or truncation (time limit).

  • else: branch handles older gym: next_state, reward, done, _ = step_output.

  • rewards.append(reward) — store reward for this timestep.

  • log_probs.append(log_prob) — store log-probability for later.

  • state = next_state — update state and continue loop.

  • return rewards, log_probs — provide the collected rewards and log-probs to the trainer.
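
For illustration only, you can call the function once with the untrained policy and inspect what it returns:

# Illustrative usage: collect a single episode
rewards, log_probs = run_episode(env, policy)
print(len(rewards), sum(rewards))  # episode length and total reward (equal in CartPole: +1 reward per step)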

Computing discounted returns (and normalization)

				
def compute_returns(rewards, gamma=0.99):
    returns = []
    G = 0.0
    for r in reversed(rewards):
        G = r + gamma * G
        returns.insert(0, G)
    returns = torch.tensor(returns, dtype=torch.float32)
    returns = (returns - returns.mean()) / (returns.std() + 1e-9)
    return returns

Code Explanation

To train the policy-gradient agent, we must calculate the discounted return for each timestep.

  • We loop backward through the rewards to compute

    G_t = r_t + \gamma r_{t+1} + \gamma^2 r_{t+2} + \dots

  • Normalization ensures training stability.

This return value acts as a weight in our policy gradient loss, guiding the network to favor actions leading to higher rewards.

  • def compute_returns(rewards, gamma=0.99): — compute discounted cumulative returns for each timestep. gamma is the discount factor.

  • returns = [] — will hold returns for each timestep, same length as rewards.

  • G = 0.0 — running total for discounted future reward.

  • for r in reversed(rewards): — iterate rewards from last to first to compute discounted sums efficiently.

  • G = r + gamma * G — update running discounted reward: current reward r plus discounted future G.

  • returns.insert(0, G) — insert at front to maintain original time order.

  • returns = torch.tensor(returns, dtype=torch.float32) — convert list to PyTorch tensor.

  • returns = (returns - returns.mean()) / (returns.std() + 1e-9) — normalize returns to zero mean and unit variance for training stability (small epsilon avoids division by zero).

  • return returns — return normalized returns as torch.FloatTensor.

Why normalize? Normalization reduces variance of gradients and stabilizes learning, especially early in training.
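
As a small worked example (hand-computed, before normalization): with rewards [1, 1, 1] and gamma = 0.99, the backward loop gives G_2 = 1.0, G_1 = 1 + 0.99 * 1.0 = 1.99, and G_0 = 1 + 0.99 * 1.99 = 2.9701; after normalization the function returns approximately [1.0, 0.0, -1.0].

print(compute_returns([1.0, 1.0, 1.0]))  # approximately tensor([ 1.0,  0.0, -1.0])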

Policy update (loss & optimizer step)

				
def update_policy(log_probs, returns, optimizer):
    loss = 0
    for log_prob, G in zip(log_probs, returns):
        loss += -log_prob * G
    optimizer.zero_grad()
    loss.backward()
    optimizer.step()

Code Explanation

This is the heart of Implementing Policy Gradient in Python.

  • def update_policy(log_probs, returns, optimizer): — apply policy gradient update using stored log-probs and computed returns.

  • loss = 0 — initialize scalar loss. In practice we will sum step losses.

  • for log_prob, G in zip(log_probs, returns): — iterate over each timestep’s log probability and its corresponding return.

  • loss += -log_prob * G — accumulate negative log-probability times return. We minimize loss, so using negative converts the gradient descent to gradient ascent on expected reward (we want to increase probability of actions that resulted in high returns).

  • optimizer.zero_grad() — clear previous gradients.

  • loss.backward() — compute gradients of loss w.r.t. policy parameters.

  • optimizer.step() — perform gradient descent step (which in effect performs gradient ascent on expected return because of negative sign in loss).

Note: loss is a scalar PyTorch tensor (auto-differentiable). Optionally you can average over steps or over episodes.
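
If you prefer the averaging option mentioned in the note, here is a minimal equivalent sketch (hypothetical helper name update_policy_mean; averaging only rescales the gradient by a constant factor 1/T):

def update_policy_mean(log_probs, returns, optimizer):
    # Stack per-step log-probs into one tensor and average the weighted terms instead of summing
    loss = -(torch.stack(log_probs) * returns).mean()
    optimizer.zero_grad()
    loss.backward()
    optimizer.step()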

Training loop — bring it all together

				
num_episodes = 1000
reward_history = []

for episode in range(1, num_episodes + 1):
    rewards, log_probs = run_episode(env, policy)
    returns = compute_returns(rewards, gamma=0.99)
    update_policy(log_probs, returns, optimizer)

    total_reward = sum(rewards)
    reward_history.append(total_reward)

    if episode % 50 == 0:
        avg_reward = np.mean(reward_history[-50:])
        print(f"Episode {episode}, Average reward (last 50): {avg_reward:.2f}")

        # optional: stopping condition for CartPole-v1 (replace threshold as needed)
        if avg_reward >= 475:
            print("Solved CartPole-v1!")
            break

Code Explanation

The training loop ties everything together in Implementing Policy Gradient in Python.

  • num_episodes = 1000 — number of training episodes (tune as desired).

  • reward_history = [] — list to record the total reward per episode for plotting/analysis (see the plotting sketch after this list).

  • for episode in range(1, num_episodes + 1): — training loop over episodes.

  • rewards, log_probs = run_episode(env, policy) — collect one episode’s rewards and log-probs.

  • returns = compute_returns(rewards, gamma=0.99) — compute normalized discounted returns.

  • update_policy(log_probs, returns, optimizer) — update policy network parameters using the collected trajectory.

  • total_reward = sum(rewards) — total reward for this episode (useful metric).

  • reward_history.append(total_reward) — append for tracking/trends.

  • if episode % 50 == 0: — every 50 episodes print progress.

  • avg_reward = np.mean(reward_history[-50:]) — compute mean of last 50 episodes.

  • print(...) — display progress.

  • if avg_reward >= 475: — optional solved condition for CartPole-v1 (CartPole-v1 max score is 500).

  • break — exit early if solved.
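
To visualize progress after (or during) training, here is a minimal plotting sketch, assuming matplotlib is installed (pip install matplotlib):

import matplotlib.pyplot as plt

# Plot the per-episode total reward recorded in reward_history
plt.plot(reward_history)
plt.xlabel("Episode")
plt.ylabel("Total reward")
plt.title("REINFORCE on CartPole-v1")
plt.show()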

Full combined code (copy-paste run-ready)

				
import gym
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim

# Environment & seeds
env = gym.make("CartPole-v1")
torch.manual_seed(42)
np.random.seed(42)

# Policy network
class PolicyNetwork(nn.Module):
    def __init__(self, state_dim, action_dim, hidden_dim=128):
        super(PolicyNetwork, self).__init__()
        self.fc1 = nn.Linear(state_dim, hidden_dim)
        self.fc2 = nn.Linear(hidden_dim, action_dim)
        self.softmax = nn.Softmax(dim=-1)

    def forward(self, x):
        x = torch.relu(self.fc1(x))
        x = self.fc2(x)
        return self.softmax(x)

state_dim = env.observation_space.shape[0]
action_dim = env.action_space.n
policy = PolicyNetwork(state_dim, action_dim)
optimizer = optim.Adam(policy.parameters(), lr=0.01)

def run_episode(env, policy):
    rewards = []
    log_probs = []
    reset_output = env.reset()
    state = reset_output[0] if isinstance(reset_output, tuple) else reset_output

    done = False
    while not done:
        state_tensor = torch.from_numpy(np.array(state)).float()
        action_probs = policy(state_tensor)
        dist = torch.distributions.Categorical(action_probs)
        action = dist.sample()
        log_prob = dist.log_prob(action)

        step_output = env.step(action.item())
        if len(step_output) == 5:
            next_state, reward, terminated, truncated, _ = step_output
            done = bool(terminated or truncated)
        else:
            next_state, reward, done, _ = step_output

        rewards.append(reward)
        log_probs.append(log_prob)
        state = next_state

    return rewards, log_probs

def compute_returns(rewards, gamma=0.99):
    returns = []
    G = 0.0
    for r in reversed(rewards):
        G = r + gamma * G
        returns.insert(0, G)
    returns = torch.tensor(returns, dtype=torch.float32)
    returns = (returns - returns.mean()) / (returns.std() + 1e-9)
    return returns

def update_policy(log_probs, returns, optimizer):
    loss = 0
    for log_prob, G in zip(log_probs, returns):
        loss += -log_prob * G
    optimizer.zero_grad()
    loss.backward()
    optimizer.step()

# Training loop
num_episodes = 1000
reward_history = []
for episode in range(1, num_episodes + 1):
    rewards, log_probs = run_episode(env, policy)
    returns = compute_returns(rewards)
    update_policy(log_probs, returns, optimizer)

    total_reward = sum(rewards)
    reward_history.append(total_reward)

    if episode % 50 == 0:
        avg_reward = np.mean(reward_history[-50:])
        print(f"Episode {episode}, Average reward (last 50): {avg_reward:.2f}")
        if avg_reward >= 475:
            print("Solved CartPole-v1!")
            break

What to run & dependencies

  • Install the required packages if you haven’t: pip install gym torch numpy. CartPole needs no extras; variants like gym[box2d] are only required for other environment families.

  • Save code to a file (e.g., pg_cartpole.py) and run with python pg_cartpole.py.

  • The training prints average reward every 50 episodes.

Results & quick notes

  • With the basic REINFORCE implementation above, you should see average reward improve over episodes. CartPole may be solved after a few hundred episodes depending on randomness and hyperparameters.

  • If training is unstable: try lowering the learning rate (e.g., to 1e-3), increasing the hidden layer size, or adding a baseline.

Possible improvements (brief)

One can improve the basic REINFORCE implementation (we suggest these for next steps):

  • Baseline subtraction: subtract a value baseline (e.g., average return or learned value) to reduce variance.

  • Actor-Critic: jointly learn a value function (critic) to estimate baseline — usually much more sample efficient.

  • Generalized Advantage Estimation (GAE): for lower variance and bias.

  • Better optimizers / learning rate schedules, gradient clipping.

  • Batching multiple episodes before each update to stabilize gradients (see the sketch below).
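
As one concrete example, below is a minimal sketch of the batching idea, using a hypothetical helper name update_policy_batched and reusing run_episode and compute_returns from above:

def update_policy_batched(env, policy, optimizer, batch_size=8, gamma=0.99):
    # Accumulate the REINFORCE loss over several episodes before taking one optimizer step
    batch_loss = 0.0
    for _ in range(batch_size):
        rewards, log_probs = run_episode(env, policy)
        returns = compute_returns(rewards, gamma)
        batch_loss = batch_loss + sum(-lp * G for lp, G in zip(log_probs, returns))
    batch_loss = batch_loss / batch_size  # average so the step size stays comparable to the single-episode update
    optimizer.zero_grad()
    batch_loss.backward()
    optimizer.step()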

Conclusion

You now have a complete, line-by-line explained implementation of REINFORCE for CartPole. This article is centered on Implementing Policy Gradient in Python and shows how to: build a policy network, sample actions, compute discounted returns, and update the policy using log-probabilities and returns.
