Note
You can download this example as a Jupyter notebook or try it out directly in Google Colab.
4.2 Reinforcement learning tutorial#
This tutorial introduces users to ASSUME and how it uses reinforcement learning (RL). The main objective is to ensure participants grasp the steps required to equip a new unit with RL strategies or to modify the action dimensions. Our emphasis lies on the bidding strategy, with less focus on the algorithm and role; the latter are usable as a plug-and-play solution in the framework. The following coding tasks will highlight the key aspects to be adjusted, as already outlined in the learning_strategies.py file.
The outline of this tutorial is as follows. We start with a basic summary of the implementation of reinforcement learning (RL) in ASSUME and its architecture (1. ASSUME & Learning Basics), which also serves as a brief refresher. If you need a more thorough refresher on RL in general, please visit our readthedocs (Reinforcement Learning Overview & Reinforcement Learning Algorithms). Afterwards, we install ASSUME in this Google Colab (2. Get ASSUME running) and then dive into the learning_strategies.py file to explain how conventional bidding strategies need to be adjusted to incorporate RL (3. Make your agents learn).
As a whole, this tutorial covers the following coding tasks:
How to define a step function in the ASSUME framework.
How to get observations from the simulation framework.
How to define actions based on the output of the actor neural network, considering the necessary exploration.
How to define the reward.
1. ASSUME & Learning Basics#
ASSUME in general is intended for researchers, planners, utilities and everyone seeking to understand the market dynamics of energy markets. It provides an easy-to-use toolbox as free software that can be tailored to the specific use case of the user.
The following figure depicts the architecture of the framework. It can be roughly divided into two parts: the markets are located on the left side of the world class, and the market participants, here called units, on the right side. Both sides are connected via the orders that market participants place on the markets. The learning capability is sketched out with the yellow classes on the right, i.e. the units side.
[ ]:
# this cell is used to display the image in the notebook when using colab
# or running the notebook locally
import os
from IPython.display import SVG, display
image_path = "assume-repo/docs/source/img/architecture.svg"
alt_image_path = "../../docs/source/img/architecture.svg"
if os.path.exists(image_path):
display(SVG(image_path))
elif os.path.exists(alt_image_path):
display(SVG(alt_image_path))
Let’s focus on the bright yellow part of the architecture, namely the learning algorithm, the actor and the critic. We start with some reinforcement learning background. In the current implementation of ASSUME, we model the electricity market as a partially observable Markov game, which is an extension of MDPs for multi-agent setups. In the following, a brief summary of the more detailed documentation on reinforcement learning is provided.
Multi-agent DRL involves multiple agents learning simultaneously while interacting in the same environment. In a Markov game, agents exist in a set of states and can take actions, receive observations, and transition between states. Each agent follows a policy aimed at maximizing its expected reward based on individual reward functions and private observations.
To adapt from a single-agent algorithm like TD3 to a multi-agent version (MATD3), certain modifications are necessary. The learning process begins by understanding single-agent learning and then extends to multi-agent scenarios.
Single-Agent Learning#
The approach uses an actor-critic method with two neural networks: an actor network and a critic network. The actor network selects actions, while the critic network evaluates the quality of those actions. Both networks are trained simultaneously through an iterative process.
This approach allows the agent to learn an optimal policy by continuously improving its understanding of which actions lead to the highest rewards in different states.
Multi-Agent Learning#
In a multi-agent setup, state transitions and rewards depend on the actions of all learning agents, unlike single-agent scenarios. This creates a non-stationary environment that violates the Markov property, invalidating traditional single-agent reinforcement learning convergence guarantees.
To address this challenge, the approach uses a centralized training and decentralized execution framework, expanding on the MADDPG algorithm. During training, a centralized critic has access to the entire state and all agents’ actions, which helps explain state transition changes. However, during both training and execution, each agent’s actor uses only its local observations.
For each agent, two centralized critics are trained alongside target critic networks to address Overestimation Bias. Similar to TD3, the approach uses the smaller value of two critics and adds target action noise to calculate the target value. The critics are trained using the mean squared Bellman error loss.
The actor policy for each agent is updated using the deterministic policy gradient algorithm. Each actor uses only one critic network to update its policy. These modifications to the original DDPG algorithm aim to increase stability and convergence, particularly in multi-agent reinforcement learning scenarios. The key innovation is enabling agents to learn effectively in complex, interactive environments by using centralized information during training while maintaining decentralized decision-making during execution.
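To make the critic update described above more tangible, the following cell sketches how a TD3-style target value can be computed from two target critics with clipped noise on the target action. This is a simplified, illustrative snippet with assumed network signatures (actor(obs) and critic(obs, action)); it is not the actual ASSUME/MATD3 implementation.
[ ]:
import torch as th

def td3_target_value(
    reward, next_obs, target_actor, target_critic_1, target_critic_2,
    gamma=0.99, noise_std=0.2, noise_clip=0.5,
):
    # add clipped noise to the target action (target policy smoothing)
    with th.no_grad():
        next_action = target_actor(next_obs)
        noise = (th.randn_like(next_action) * noise_std).clamp(-noise_clip, noise_clip)
        next_action = (next_action + noise).clamp(-1, 1)
        # clipped double-Q: take the smaller of the two target critic estimates
        q_next = th.min(
            target_critic_1(next_obs, next_action),
            target_critic_2(next_obs, next_action),
        )
        return reward + gamma * q_next

# both critics are then trained on the mean squared Bellman error against this target,
# e.g. th.nn.functional.mse_loss(critic(obs, action), target)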
2. Get ASSUME running#
Here we just install the ASSUME core package via pip. In general, the installation instructions can be found here: https://ASSUME.readthedocs.io/en/latest/installation.html. All the required steps are executed here, and since we are working in Colab, creating a venv is not necessary.
As we will be working with learning agents, we need to install ASSUME with all learning dependencies such as torch. For this, we use the [learning] extra.
You don’t need to execute the following code cell if you already have the ASSUME framework installed including learning dependencies
[ ]:
import importlib.util
# Check if 'google.colab' is available
IN_COLAB = importlib.util.find_spec("google.colab") is not None
if IN_COLAB:
!pip install 'assume-framework[learning]'
# Colab currently has issues with pyomo version 6.8.2, causing the notebook to crash
# Installing an older version resolves this issue. This should only be considered a temporary fix.
!pip install pyomo==6.8.0
And as easy as that, ASSUME is installed. Now we can let it run. Please note, though, that we cannot use the functionalities tied to Docker and, hence, cannot access the predefined dashboards in Colab. For this, please install Docker and ASSUME on your personal machine.
Furthermore, we would like to access the predefined scenarios in ASSUME, which are stored in the Git repository. Hence, we clone the repository.
You don’t need to execute the following code cell if you already have the ASSUME repository cloned.
[ ]:
if IN_COLAB:
!git clone --depth=1 https://github.com/assume-framework/assume.git assume-repo
Let the magic happen. Now you can run your first ever simulation in ASSUME. The following code navigates to the respective ASSUME folder and starts the simulation example example_01b using the local database here in Colab.
When running locally, you can also just run assume -s example_01b -db "sqlite:///./examples/local_db/assume_db_example_01b.db"
in a shell
[ ]:
if IN_COLAB:
!cd assume-repo && assume -s example_01b -db "sqlite:///./examples/local_db/assume_db_example_01b.db"
Select input files path:
We also need to differentiate between the input file paths when using this tutorial in Google Colab and a local environment. The code snippets will include both options for your convenience.
[ ]:
colab_inputs_path = "assume-repo/examples/inputs"
local_inputs_path = "../inputs"
inputs_path = colab_inputs_path if IN_COLAB else local_inputs_path
3. Make your agents learn#
Now it is time to get your hands dirty and actually dive into coding in ASSUME. The main objective of this session is to ensure participants grasp the steps required to equip a new unit with RL strategies or modify the action dimensions. Our emphasis lies on the bidding strategy, with less focus on the algorithm and role. Coding tasks will highlight the key aspects to be adjusted, as already outlined in the learning_strategies.py file. Subsequent sections will present the tasks and provide the correct answers for the coding exercises.
We start by initializing the class of our Learning Strategy. This is very closely related to the general structure of a bidding strategy.
But first some imports:
[ ]:
import logging
import os
from datetime import datetime, timedelta
from pathlib import Path
import numpy as np
import pandas as pd
import torch as th
import yaml
from assume import World
from assume.common.base import LearningStrategy, SupportsMinMax
from assume.common.market_objects import MarketConfig, Orderbook, Product
from assume.reinforcement_learning.algorithms import actor_architecture_aliases
from assume.reinforcement_learning.learning_utils import NormalActionNoise
from assume.scenario.loader_csv import load_scenario_folder, run_learning
[ ]:
class RLStrategy(LearningStrategy):
"""
Reinforcement Learning Strategy
"""
def __init__(self, *args, **kwargs):
super().__init__(obs_dim=50, act_dim=2, unique_obs_dim=2, *args, **kwargs)
self.unit_id = kwargs["unit_id"]
# defines bounds of actions space
self.max_bid_price = kwargs.get("max_bid_price", 100)
self.max_demand = kwargs.get("max_demand", 10e3)
# tells us whether we are training the agents or just executing pre-learned strategies
self.learning_mode = kwargs.get("learning_mode", False)
self.perform_evaluation = kwargs.get("perform_evaluation", False)
# based on learning config define algorithm configuration
self.algorithm = kwargs.get("algorithm", "matd3")
actor_architecture = kwargs.get("actor_architecture", "mlp")
# define the architecture of the actor neural network
# if you use many time series inputs you might want to use the LSTM instead of the MLP, for example
if actor_architecture in actor_architecture_aliases.keys():
self.actor_architecture_class = actor_architecture_aliases[
actor_architecture
]
else:
raise ValueError(
f"Policy '{actor_architecture}' unknown. Supported architectures are {list(actor_architecture_aliases.keys())}"
)
# sets the device of the actor network
device = kwargs.get("device", "cpu")
self.device = th.device(device if th.cuda.is_available() else "cpu")
if not self.learning_mode:
self.device = th.device("cpu")
# future: add option to choose between float16 and float32
# float_type = kwargs.get("float_type", "float32")
self.float_type = th.float
# for definition of observation space
self.foresight = kwargs.get("foresight", 24)
if self.learning_mode:
self.learning_role = None
self.collect_initial_experience_mode = kwargs.get(
"episodes_collecting_initial_experience", True
)
self.action_noise = NormalActionNoise(
mu=0.0,
sigma=kwargs.get("noise_sigma", 0.1),
action_dimension=self.act_dim,
scale=kwargs.get("noise_scale", 1.0),
dt=kwargs.get("noise_dt", 1.0),
)
elif Path(kwargs["trained_policies_save_path"]).is_dir():
self.load_actor_params(load_path=kwargs["trained_policies_save_path"])
3.1 The “Step Function”#
The key function in an RL problem is the step that is taken in the so-called environment. It consists of the following parts:
Get an observation
Choose an action
Get a reward
Update your policy
In ASSUME we do not have such a straightforward step function. Steps 1 and 2 are combined in the calculate_bids() function, which is called as soon as an offer is placed on the market. Step 3, however, can only happen after we get the market feedback from the simulation run and, hence, lives in the calculate_reward() function. Step 4 is solely handled by the learning_role, as it schedules the policy update, manages the buffer, and so on. Hence, it is not included in this notebook, since we only focus on transforming the bidding strategy into a learning one.
Steps 1-3 will be implemented in the following sections 3.2 to 3.4. If there is a coding task for you, it will be marked accordingly.
[ ]:
# we define the class again and inherit from the initial class just to add the additional method to the original class
# this is a workaround to have different methods of the class in different cells
# which is good for the purpose of this tutorial
# however, you should have all functions in a single class when using this example in .py files
class RLStrategy(RLStrategy):
def calculate_bids(
self,
unit: SupportsMinMax,
market_config: MarketConfig,
product_tuples: list[Product],
**kwargs,
) -> Orderbook:
"""
Calculate bids for a unit -> STEP 1 & 2
"""
start = product_tuples[0][0]
end = product_tuples[0][1]
# get technical bounds for the unit output from the unit
min_power, max_power = unit.calculate_min_max_power(start, end)
min_power = min_power[start]
max_power = max_power[start]
# =============================================================================
# 1. Get the Observations, which are the basis of the action decision
# =============================================================================
next_observation = self.create_observation(
unit=unit,
market_id=market_config.market_id,
start=start,
end=end,
)
# =============================================================================
# 2. Get the Actions, based on the observations
# =============================================================================
actions, noise = self.get_actions(next_observation)
bids = actions
bids = self.remove_empty_bids(bids)
return bids
[ ]:
# we define the class again and inherit from the initial class just to add the additional method to the original class
# this is a workaround to have different methods of the class in different cells
# which is good for the purpose of this tutorial
# however, you should have all functions in a single class when using this example in .py files
class RLStrategy(RLStrategy):
def calculate_reward(
self,
unit,
marketconfig: MarketConfig,
orderbook: Orderbook,
):
"""
Calculate reward
"""
return None
3.2 Get an observation#
The decision about the observations received by each agent plays a crucial role when designing a multi-agent RL setup. The following describes the task of learning agents representing profit-maximizing electricity market participants who either sell a generating unit’s output or optimize a storage unit’s operation. They are represented through their plants’ techno-economic parameters, such as minimal operational capacity \(P^{min}\), start-up \(c^{su}\), and shut-down \(c^{sd}\) costs. This information is all known by the unit itself and, hence, also accessible in the bidding strategy.
During the training phase, the centralized critic receives observations from all agents, resulting in an input size that grows linearly with the number of agents. This can lead to unstable training behavior of the critic networks, which limits the maximum number of agents in the simulation. This effect is known as the curse of dimensionality and likely contributed to the small number of learning agents in existing approaches. To address it, we use a single observation that is the same for all agents and add a small set of unique observations for each agent to improve their performance. This modification allows the use of only one shared observation in the centralized critic, decoupled from the number of learning agents, significantly reducing the observation size and enabling simultaneous training of hundreds of learning agents with stable training behavior. The only limiting factor is the available working memory.
At time-step \(t\), agent \(i\) receives the observation \(o_{i,t}\) consisting of vectors \([L_{\mathrm{h},t}, L_{\mathrm{f},t}, M_{\mathrm{h},t}, M_{\mathrm{f},t}, mc_{i,t}]\). Here \(L_{\mathrm{h},t}, L_{\mathrm{f},t}\) and \(M_{\mathrm{h},t}, M_{\mathrm{f},t}\) are the past and the forecast residual loads and market prices, respectively. This information stems from the world, where an overall forecasting role generates them. The price forecast is calculated ahead of the simulation run using a simple merit order model based on the residual load forecast and the marginal cost of power plants. This part of the observation is the same for all agents. In addition, each agent receives its current marginal cost \(mc_{i,t}\). Information about the marginal cost is shared with a centralized critic during the training phase. Still, it is not shared with other agents during the execution phase. All the inputs are normalized to improve the performance of the training process.
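As a quick plausibility check (an illustrative calculation, assuming the default foresight of 24 time steps used in the strategy below): the two forecast vectors plus the two unit-specific values add up exactly to the obs_dim=50 that is passed to the base class.
[ ]:
# observation vector = residual load forecast + price forecast + 2 unit-specific values
foresight = 24        # forecast window per time series
unique_obs_dim = 2    # scaled dispatch volume and marginal cost
obs_dim = 2 * foresight + unique_obs_dim
print(obs_dim)        # 50, matching obs_dim=50 in RLStrategy.__init__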
Task 1#
Goal: With the help of the unit, the start time and the end time, we want to create the observations for the unit.
There are four different observations:
- residual load forecast
- price forecast
- total capacity of the unit
- marginal costs of the unit
For all observations we need scaling factors. Why do you think it is important to scale the input? How would you define the scaling factors?
[ ]:
# we define the class again and inherit from the initial class just to add the additional method to the original class
# this is a workaround to have different methods of the class in different cells
# which is good for the purpose of this tutorial
# however, you should have all functions in a single class when using this example in .py files
class RLStrategy(RLStrategy):
def create_observation(
self,
unit: SupportsMinMax,
market_id: str,
start: datetime,
end: datetime,
):
"""
Create observation
"""
end_excl = end - unit.index.freq
# get the forecast length depending on the time unit considered in the modelled unit
forecast_len = pd.Timedelta((self.foresight - 1) * unit.index.freq)
# =============================================================================
# 1.1 Get the Observations, which are the basis of the action decision
# =============================================================================
# residual load forecast
scaling_factor_res_load = None # TODO
# price forecast
scaling_factor_price = None # TODO
# total capacity
scaling_factor_total_capacity = None # TODO
# marginal cost
scaling_factor_marginal_cost = None # TODO
# checks if we are at the end of the simulation horizon, since we need to change the forecast then
# for residual load and price forecast and scale them
if (
end_excl + forecast_len
> unit.forecaster[f"residual_load_{market_id}"].index[-1]
):
scaled_res_load_forecast = (
unit.forecaster[f"residual_load_{market_id}"].loc[start:]
/ scaling_factor_res_load
)
scaled_res_load_forecast = np.concatenate(
[
scaled_res_load_forecast,
unit.forecaster[f"residual_load_{market_id}"].iloc[
: self.foresight - len(scaled_res_load_forecast)
],
]
)
else:
scaled_res_load_forecast = (
unit.forecaster[f"residual_load_{market_id}"].loc[
start : end_excl + forecast_len
]
/ scaling_factor_res_load
)
if end_excl + forecast_len > unit.forecaster[f"price_{market_id}"].index[-1]:
scaled_price_forecast = (
unit.forecaster[f"price_{market_id}"].loc[start:] / scaling_factor_price
)
scaled_price_forecast = np.concatenate(
[
scaled_price_forecast,
unit.forecaster[f"price_{market_id}"].iloc[
: self.foresight - len(scaled_price_forecast)
],
]
)
else:
scaled_price_forecast = (
unit.forecaster[f"price_{market_id}"].loc[
start : end_excl + forecast_len
]
/ scaling_factor_price
)
# get last accepted bid volume and the current marginal costs of the unit
current_volume = unit.get_output_before(start)
current_costs = unit.calculate_marginal_cost(start, current_volume)
# scale unit outputs
scaled_total_capacity = current_volume / scaling_factor_total_capacity
scaled_marginal_cost = current_costs / scaling_factor_marginal_cost
# concatenate all observations into one array
observation = np.concatenate(
[
scaled_res_load_forecast,
scaled_price_forecast,
np.array([scaled_total_capacity, scaled_marginal_cost]),
]
)
# transfer array to GPU for NN processing
observation = (
th.tensor(observation, dtype=self.float_type)
.to(self.device, non_blocking=True)
.view(-1)
)
return observation.detach().clone()
Solution 1#
First why do we scale?
Scaling observations is a crucial preprocessing step in machine learning, including reinforcement learning. It involves transforming the features so that they all fall within a similar numerical range. This is important for several reasons. Firstly, it aids in numerical stability during training. Large input values can lead to numerical precision issues, potentially causing the algorithm to perform poorly or even fail to converge. By scaling the features, we mitigate this risk, ensuring a more stable and reliable learning process.
Additionally, scaling promotes uniformity in the learning process. Many optimization algorithms, like gradient descent, adjust model parameters based on the magnitude of gradients. When features have vastly different scales, some may dominate the learning process, while others receive less attention. This imbalance can hinder convergence and result in a suboptimal model. Scaling addresses this issue, allowing the algorithm to treat all features equally and progress more efficiently towards an optimal solution. This not only expedites the learning process but also enhances the model’s ability to generalize to new, unseen data. In essence, scaling observations is a fundamental practice that enhances the performance and robustness of machine learning models across a wide array of applications.
According to this, the scaling should ensure a similar range for all input parameters. You can achieve that by choosing the following scaling factors. If you add new observations, choose your scaling factors wisely.
[ ]:
"""
#scaling factors for all observations
#residual load forecast
scaling_factor_res_load = self.max_demand
# price forecast
scaling_factor_price = self.max_bid_price
# total capacity
scaling_factor_total_capacity = unit.max_power
# marginal cost
scaling_factor_marginal_cost = self.max_bid_price
"""
3.3 Choose an action#
To differentiate between the inflexible and flexible parts of a plant’s generation capacity, we split the bids into two parts. The first bid part allows agents to bid a very low or even negative price for the inflexible capacity; this reflects the agent’s motivation to stay infra-marginal during periods of very low net load (e.g., in periods of high solar and wind power generation) to avoid the cost of a shut-down and subsequent start-up of the plant. The flexible part of the capacity can be offered at a higher price to provide chances for higher profits. The actions of agent \(i\) at time-step \(t\) are defined as \(a_{i,t} = [ep^\mathrm{inflex}_{i,t}, ep^\mathrm{flex}_{i,t}] \in [ep^{min},ep^{max}]\), where \(ep^\mathrm{inflex}_{i,t}\) and \(ep^\mathrm{flex}_{i,t}\) are bid prices for the inflexible and flexible capacities, and \(ep^{min},ep^{max}\) are minimal and maximal bid prices, respectively.
How do we learn to make good decisions? Basically by trial and error, also known as exploration. Exploration is a fundamental concept in reinforcement learning, representing the strategy by which an agent interacts with its environment to gather information about the consequences of its actions. This is crucial because without exploration, the agent might settle for suboptimal policies based on its initial knowledge, limiting its ability to discover more rewarding states or actions.
In the initial stages of training, also often called initial exploration, it’s imperative to employ almost random actions. This means having the agent take actions purely by chance. This seemingly counterintuitive approach serves a critical purpose. Initially, the agent lacks any meaningful information about the environment, making it impossible to make informed decisions. By taking random actions, it can quickly gather a broad range of experiences, allowing it to grasp the fundamental structure of the environment. These random actions serve as a kind of “baseline exploration,” providing a starting point from which the agent can refine its policy through learning. With our domain knowledge we can even guide the initial exploration process, to enhance learning capabilities.
Following up on these concepts, the following tasks will: 1. obtain the action values from the neural network in the bidding strategy and 2. transform these values into the actual bids of an order.
Task 2.1#
Goal: With the observations and noise, we generate actions.
In the following task we define the actions for the initial exploration mode. As described before, we can guide it by not letting the agent choose purely random actions but by defining a base bid to which we add a good amount of noise. In this way the initial strategy starts from a solution that we know works reasonably well. Define the respective base bid in the following code. Remember: we are defining bids for a conventional power plant bidding in an energy-only market with a uniform pricing auction.
[ ]:
# we define the class again and inherit from the initial class just to add the additional method to the original class
# this is a workaround to have different methods of the class in different cells
# which is good for the purpose of this tutorial
# however, you should have all functions in a single class when using this example in .py files
class RLStrategy(RLStrategy):
def get_actions(self, next_observation):
"""
Get actions
"""
# distinction whether we are in learning mode or not to handle exploration realised with noise
if self.learning_mode:
# if we are in learning mode, the first x episodes we want to explore the entire action space
# to get a good initial experience in the area around the costs of the agent
if self.collect_initial_experience_mode:
# define current action as solely noise
noise = (
th.normal(
mean=0.0, std=0.2, size=(1, self.act_dim), dtype=self.float_type
)
.to(self.device)
.squeeze()
)
# =============================================================================
# 2.1 Get Actions and handle exploration
# =============================================================================
# ==> YOUR CODE HERE
base_bid = None # TODO
# add noise to the last dimension of the observation
# needs to be adjusted if the observation space is changed, because it only makes sense
# if the last dimension of the observation space is the marginal cost
curr_action = noise + base_bid.clone().detach()
else:
# if we are not in the initial exploration phase we choose the action with the actor neural network
# and add noise to the action
curr_action = self.actor(next_observation).detach()
noise = th.tensor(
self.action_noise.noise(), device=self.device, dtype=self.float_type
)
curr_action += noise
else:
# if we are not in learning mode we just use the actor neural network to get the action without adding noise
curr_action = self.actor(next_observation).detach()
noise = th.zeros(self.act_dim, dtype=self.float_type)
curr_action = curr_action.clamp(-1, 1)
return curr_action, noise
Solution 2.1#
So how do we define the base bid?
Assuming the described auction is an efficient market with full information and competition, we know that bidding the marginal costs of the power plant is the economically best bid. With the RL strategy we can recreate the abuse of market power and incomplete information, which enables us to model different market settings. Yet, starting off with the theoretically stylized optimal solution guides our RL agents properly. As the marginal costs of the power plant are part of the observations, we can define the base bid in the following way.
[ ]:
"""
#base_bid = marginal costs
base_bid = next_observation[-1] # = marginal_costs
"""
Task 2.2#
Goal: Define the actual bids with the outputs of the actors
Like every other output of a neural network, the actions are given in a normalized range (here they are clamped to \([-1, 1]\)). These values need to be translated into the actual bids \(a_{i,t} = [ep^\mathrm{inflex}_{i,t}, ep^\mathrm{flex}_{i,t}] \in [ep^{min},ep^{max}]\). This can be done in a way that further helps the RL agent to learn, if we put some thought into it.
For this we go back to the calculate_bids() function and, instead of just defining bids = actions, which was just a placeholder, we actually turn them into bids. Think about a smart way to transform them and fill the gaps in the following code. Remember:
bid_quantity_inflex represents the inflexible part of the bid, i.e. the minimum run capacity of the unit.
bid_quantity_flex represents the flexible part of the bid, i.e. the remaining flexible capacity of the unit.
[ ]:
# we define the class again and inherit from the initial class just to add the additional method to the original class
# this is a workaround to have different methods of the class in different cells
# which is good for the purpose of this tutorial
# however, you should have all functions in a single class when using this example in .py files
class RLStrategy(RLStrategy):
def calculate_bids(
self,
unit: SupportsMinMax,
market_config: MarketConfig,
product_tuples: list[Product],
**kwargs,
) -> Orderbook:
"""
Calculate bids for a unit
"""
bid_quantity_inflex, bid_price_inflex = 0, 0
bid_quantity_flex, bid_price_flex = 0, 0
start = product_tuples[0][0]
end = product_tuples[0][1]
# get technical bounds for the unit output from the unit
min_power, max_power = unit.calculate_min_max_power(start, end)
min_power = min_power[0]
max_power = max_power[0]
# =============================================================================
# 1. Get the Observations, which are the basis of the action decision
# =============================================================================
next_observation = self.create_observation(
unit=unit,
market_id=market_config.market_id,
start=start,
end=end,
)
# =============================================================================
# 2. Get the Actions, based on the observations
# =============================================================================
actions, noise = self.get_actions(next_observation)
# =============================================================================
# 3.2 Transform Actions into bids
# =============================================================================
# ==> YOUR CODE HERE
# actions are in the range [0,1], we need to transform them into actual bids
# we can use our domain knowledge to guide the bid formulation
bid_prices = None # TODO
# calculate inflexible part of the bid
bid_quantity_inflex = None # TODO
bid_price_inflex = None # TODO
# calculate flexible part of the bid
bid_quantity_flex = None # TODO
bid_price_flex = None # TODO
# actually formulate bids in orderbook format
bids = [
{
"start_time": start,
"end_time": end,
"only_hours": None,
"price": bid_price_inflex,
"volume": bid_quantity_inflex,
"node": unit.node,
},
{
"start_time": start,
"end_time": end,
"only_hours": None,
"price": bid_price_flex,
"volume": bid_quantity_flex,
"node": unit.node,
},
]
# store results in unit outputs as lists to be written to the buffer for learning
unit.outputs["rl_observations"].append(next_observation)
unit.outputs["rl_actions"].append(actions)
# store results in unit outputs as series to be written to the database by the unit operator
unit.outputs["actions"].at[start] = actions
unit.outputs["exploration_noise"].at[start] = noise
return bids
Solution 2.2#
So how do we define the actual bid from the action?
We have the bid price for the minimum power (inflex) and for the rest of the power. As the power plant needs to run at least at its minimum power in order to offer any generation at all, it makes sense to offer this part of the generation at a lower price than the rest. Hence, we can allocate the actions to the bid prices in the following way. In addition, the actions of course need to be rescaled.
[ ]:
"""
#calculate actual bids
#rescale actions to actual prices
bid_prices = actions * self.max_bid_price
#calculate inflexible part of the bid
bid_quantity_inflex = min_power
bid_price_inflex = min(bid_prices)
#calculate flexible part of the bid
bid_quantity_flex = max_power - bid_quantity_inflex
bid_price_flex = max(bid_prices)
"""
3.4 Get a reward#
This step is done in the calculate_reward() function, which is called after the market is cleared and we get the market feedback, so we can calculate the profit. In RL, the design of a reward function is as important as the choice of the correct algorithm. During the initial phase of this work, a pure economic reward in the form of the agent’s profit was used. Typically, electricity market models consider only a single restart cost. Still, in the case of using RL, the split into shut-down and start-up costs allows the agents to better differentiate between these two events and learn a better policy.
\begin{equation} \pi_{i,t} = \begin{cases} P^\text{conf}_{i,t} (M_t - mc_{i,t}) dt - c^{su}_i & \text{if $P^\text{conf}_{i,t}$ $\geq P^{min}_i$} \\ & \text{and $P_{i,t-1}$ $= 0$} \\ P^\text{conf}_{i,t} (M_t - mc_{i,t}) dt & \text{if $P^\text{conf}_{i,t}$ $\geq P^{min}_i$} \\ & \text{and $P_{i,t-1}$ $\neq 0$} \\ - c^{sd}_i & \text{if $P^\text{conf}_{i,t}$ $\leq P^{min}_i$} \\ & \text{and $P_{i,t-1}$ $\neq 0$} \\ 0 & \text{otherwise} \\ \end{cases} \end{equation}
In this equation, the variables are:
* \(P^\text{conf}\) the confirmed capacity on the market
* \(P^{min}\) the minimal stable capacity
* \(M\) the market clearing price
* \(mc\) the marginal generation cost
* \(dt\) the market time resolution
* \(c^{su}, c^{sd}\) the start-up and shut-down costs, respectively
The profit-driven reward function was sufficient for a few agents, but the learning performance decreased significantly with more agents. Therefore, we add an additional regret term \(cm\).
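To make the piecewise definition above concrete, here is a direct, simplified transcription of the profit equation into a small Python function. It is purely illustrative; the actual calculate_reward() below works on the orderbook and accounts for start-up and shut-down costs slightly differently.
[ ]:
def profit_per_step(p_conf, p_prev, p_min, market_price, marginal_cost, dt, c_su, c_sd):
    # direct transcription of the profit cases above (the first matching case applies)
    if p_conf >= p_min and p_prev == 0:
        # unit starts up: market income minus start-up cost
        return p_conf * (market_price - marginal_cost) * dt - c_su
    elif p_conf >= p_min and p_prev != 0:
        # unit keeps running: pure market income
        return p_conf * (market_price - marginal_cost) * dt
    elif p_conf <= p_min and p_prev != 0:
        # unit shuts down: only the shut-down cost remains
        return -c_sd
    else:
        return 0.0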
Task 3#
Goal: Define the reward guiding the learning process of the agent.
As the reward plays such a crucial role in the learning process, think of ways to integrate further signals beyond the monetary profit. One example could be integrating a regret term, namely the opportunity costs. Your task is to define the reward using the opportunity costs and to scale it.
[ ]:
# we define the class again and inherit from the initial class just to add the additional method to the original class
# this is a workaround to have different methods of the class in different cells
# which is good for the purpose of this tutorial
# however, you should have all functions in a single class when using this example in .py files
class RLStrategy(RLStrategy):
def calculate_reward(
self,
unit,
marketconfig: MarketConfig,
orderbook: Orderbook,
):
"""
Calculate reward
"""
# =============================================================================
# 3. Calculate Reward
# =============================================================================
# function is called after the market is cleared and we get the market feedback,
# so we can calculate the profit
product_type = marketconfig.product_type
profit = 0
reward = 0
opportunity_cost = 0
costs = 0
# iterate over all orders in the orderbook, to calculate order specific profit
for order in orderbook:
start = order["start_time"]
end = order["end_time"]
# end includes the end of the last product, to get the last products' start time we deduct the frequency once
end_excl = end - unit.index.freq
# depending on how the unit calculates marginal costs, we take the costs
marginal_cost = unit.calculate_marginal_cost(
start, unit.outputs[product_type].at[start]
)
duration = (end - start) / timedelta(hours=1)
# calculate profit as income - running_cost from this event
order_profit = order["accepted_price"] * order["accepted_volume"] * duration
order_cost = marginal_cost * order["accepted_volume"] * duration
# collect profit and opportunity cost for all orders
profit += order_profit
costs += order_cost
# calculate opportunity cost
# as the loss of income we have because we are not running at full power
opportunity_cost = (
(order["accepted_price"] - marginal_cost)
* (unit.max_power - unit.outputs[product_type].loc[start:end_excl]).sum()
* duration
)
# if our opportunity costs are negative, we did not miss an opportunity to earn money and we set them to 0
opportunity_cost = max(opportunity_cost, 0)
# consideration of start-up costs, which are evenly divided between the
# upward and downward regulation events
if (
unit.outputs[product_type].at[start] != 0
and unit.outputs[product_type].loc[start - unit.index.freq] == 0
):
costs += unit.hot_start_cost / 2
elif (
unit.outputs[product_type].at[start] == 0
and unit.outputs[product_type].loc[start - unit.index.freq] != 0
):
costs += unit.hot_start_cost / 2
profit = profit - costs
# =============================================================================
# =============================================================================
# ==> YOUR CODE HERE
# The straightforward implementation would be reward = profit, yet we would like to give the agent more guidance
# in the learning process, so we add a regret term to the reward, which is the opportunity cost
# define the reward and scale it
scaling = 0.1 / unit.max_power
regret_scale = None # TODO
reward = None # TODO
# store results in unit outputs which are written to database by unit operator
unit.outputs["profit"].loc[start:end_excl] += profit
unit.outputs["reward"].loc[start:end_excl] = reward
unit.outputs["regret"].loc[start:end_excl] = regret_scale * opportunity_cost
unit.outputs["total_costs"].loc[start:end_excl] = costs
unit.outputs["rl_rewards"].append(reward)
Solution 3#
So how do we define the actual reward?
We use the opportunity costs for further guidance, which quantify the expected contribution margin, as defined by the following equation, with \(P^{max}\) as the maximal available capacity.
\begin{equation} cm_{i,t} = \max[(P^{max}_i - P^\text{conf}_{i,t}) (M_t - mc_{i,t}) dt, 0] \end{equation}
The regret term gives a negative signal to the agent when there is opportunity cost due to the unsold capacity, thus correcting the agent’s actions. This term also introduces an increased influence of the competition between agents in learning. By minimizing the regret, the agents drive the bid prices closer to the marginal generation cost, which drives the market price down.
The reward of agent \(i\) at time-step \(t\) is defined by the equation below.
\begin{equation} R_{i,t} = \pi_{i,t} + \beta cm_{i,t} \end{equation}
Here, \(\beta\) is the regret scaling factor to adjust the ratio between profit-maximizing and regret-minimizing learning. \(\beta = 0.2\) was found to work well empirically.
The described reward function has proven to perform well even with many agents and to accelerate learning convergence. This is because minimizing the regret term drives the overall system to equilibrium. At a point close to the equilibrium point, the average reward of all agents would converge to a constant value since further policy changes would not lead to an additional reduction in regrets or an increase in profits. Therefore, the average reward value can also be a good indicator of learning performance and convergence.
[ ]:
"""
scaling = 0.1 / unit.max_power
regret_scale = 0.2
reward = float(profit - regret_scale * opportunity_cost) * scaling
"""
3.5 Start the simulation#
We are almost done with all the changes needed to actually make ASSUME learn here in Google Colab. If you would rather load our pretrained strategies, we also need a function for loading parameters, which can be found below.
[ ]:
# we define the class again and inherit from the initial class just to add the additional method to the original class
# this is a workaround to have different methods of the class in different cells
# which is good for the purpose of this tutorial
# however, you should have all functions in a single class when using this example in .py files
class RLStrategy(RLStrategy):
def load_actor_params(self, load_path):
"""
Load actor parameters
"""
directory = f"{load_path}/actors/actor_{self.unit_id}.pt"
params = th.load(directory, map_location=self.device)
self.actor = self.actor_architecture_class(
obs_dim=self.obs_dim,
act_dim=self.act_dim,
float_type=self.float_type,
unique_obs_dim=self.unique_obs_dim,
num_timeseries_obs_dim=self.num_timeseries_obs_dim,
).to(self.device)
self.actor.load_state_dict(params["actor"])
if self.learning_mode:
self.actor_target = self.actor_architecture_class(
obs_dim=self.obs_dim,
act_dim=self.act_dim,
float_type=self.float_type,
unique_obs_dim=self.unique_obs_dim,
num_timeseries_obs_dim=self.num_timeseries_obs_dim,
).to(self.device)
self.actor_target.load_state_dict(params["actor_target"])
self.actor_target.eval()
self.actor.optimizer.load_state_dict(params["actor_optimizer"])
To control the learning process, the config file determines the parameters of the learning algorithm. As we want to adjust these values in the notebook, we will overwrite the learning config in the next cell and then load it into our world.
[ ]:
learning_config = {
"continue_learning": False,
"trained_policies_save_path": None,
"max_bid_price": 100,
"algorithm": "matd3",
"learning_rate": 0.001,
"training_episodes": 10,
"episodes_collecting_initial_experience": 3,
"train_freq": "24h",
"gradient_steps": -1,
"batch_size": 256,
"gamma": 0.99,
"device": "cpu",
"noise_sigma": 0.1,
"noise_scale": 1,
"noise_dt": 1,
"validation_episodes_interval": 5,
}
[ ]:
# Read the YAML file
with open(f"{inputs_path}/example_02a/config.yaml") as file:
data = yaml.safe_load(file)
# store our modifications to the config file
data["base"]["learning_mode"] = True
data["base"]["learning_config"] = learning_config
# Write the modified data back to the file
with open(f"{inputs_path}/example_02a/config.yaml", "w") as file:
yaml.safe_dump(data, file)
In order to let the simulation run with the integrated learning, we need to adjust the main file that runs it in the following way.
In the following cell, we let the example run case 1 of [1], where one large reinforcement learning power plant exists that can technically exert market power.
[1] Harder, N.; Qussous, R.; Weidlich, A. Fit for purpose: Modeling wholesale electricity markets realistically with multi-agent deep reinforcement learning. Energy and AI 2023. 14. 100295. https://doi.org/10.1016/j.egyai.2023.100295.
[ ]:
log = logging.getLogger(__name__)
csv_path = "outputs"
os.makedirs("local_db", exist_ok=True)
if __name__ == "__main__":
db_uri = "sqlite:///local_db/assume_db.db"
scenario = "example_02a"
study_case = "base"
# create world
world = World(database_uri=db_uri, export_csv_path=csv_path)
# we import our defined bidding strategy class including the learning into the world bidding strategies
# in the example files we provided the name of the learning bidding strategies in the input csv in "pp_learning"
# hence we define this strategy to be the one of the learning class
world.bidding_strategies["pp_learning"] = RLStrategy
# then we load the scenario specified above from the respective input files
load_scenario_folder(
world,
inputs_path=inputs_path,
scenario=scenario,
study_case=study_case,
)
# run learning if learning mode is enabled
# needed as we simulate the modelling horizon multiple times to train the reinforcement learning agents
if world.learning_config.get("learning_mode", False):
run_learning(
world,
inputs_path=inputs_path,
scenario=scenario,
study_case=study_case,
)
# after the learning is done we make a normal run of the simulation, which equals a test run
world.run()
In comparison, the following cell executes case 2 of [1], where the same capacity as the reinforcement learning power plant in case 1 is divided into five reinforcement learning power plants, which hence cannot exert market power anymore.
[ ]:
log = logging.getLogger(__name__)
csv_path = "outputs"
os.makedirs("local_db", exist_ok=True)
if __name__ == "__main__":
db_uri = "sqlite:///local_db/assume_db.db"
scenario = "example_02b"
study_case = "base"
# create world
world = World(database_uri=db_uri, export_csv_path=csv_path)
# we import our defined bidding strategy class including the learning into the world bidding strategies
# in the example files we provided the name of the learning bidding strategies in the input csv in "pp_learning"
# hence we define this strategy to be the one of the learning class
world.bidding_strategies["pp_learning"] = RLStrategy
# then we load the scenario specified above from the respective input files
load_scenario_folder(
world,
inputs_path=inputs_path,
scenario=scenario,
study_case=study_case,
)
# run learning if learning mode is enabled
# needed as we simulate the modelling horizon multiple times to train the reinforcement learning agents
if world.learning_config.get("learning_mode", False):
run_learning(
world,
inputs_path=inputs_path,
scenario=scenario,
study_case=study_case,
)
# after the learning is done we make a normal run of the simulation, which equals a test run
world.run()
The following simulation runs case 3 of [1] accordingly.
[ ]:
log = logging.getLogger(__name__)
csv_path = "outputs"
os.makedirs("local_db", exist_ok=True)
if __name__ == "__main__":
db_uri = "sqlite:///local_db/assume_db.db"
scenario = "example_02c"
study_case = "base"
# create world
world = World(database_uri=db_uri, export_csv_path=csv_path)
# we import our defined bidding strategy class including the learning into the world bidding strategies
# in the example files we provided the name of the learning bidding strategies in the input csv in "pp_learning"
# hence we define this strategy to be the one of the learning class
world.bidding_strategies["pp_learning"] = RLStrategy
# then we load the scenario specified above from the respective input files
load_scenario_folder(
world,
inputs_path=inputs_path,
scenario=scenario,
study_case=study_case,
)
# run learning if learning mode is enabled
# needed as we simulate the modelling horizon multiple times to train the reinforcement learning agents
if world.learning_config.get("learning_mode", False):
run_learning(
world,
inputs_path=inputs_path,
scenario=scenario,
study_case=study_case,
)
# after the learning is done we make a normal run of the simulation, which equals a test run
world.run()
Result Plotting#
[ ]:
!pip install matplotlib
[ ]:
import os
from functools import partial
import matplotlib.pyplot as plt
from sqlalchemy import create_engine
os.makedirs("outputs", exist_ok=True)
db_uri = "sqlite:///local_db/assume_db.db"
engine = create_engine(db_uri)
sql = """
SELECT ident, simulation,
sum(round(CAST(value AS numeric), 2)) FILTER (WHERE variable = 'total_cost') as total_cost,
sum(round(CAST(value AS numeric), 2)*1000) FILTER (WHERE variable = 'total_volume') as total_volume,
sum(round(CAST(value AS numeric), 2)) FILTER (WHERE variable = 'avg_price') as average_cost
FROM kpis
where variable in ('total_cost', 'total_volume', 'avg_price')
and simulation in ('example_02a_base', 'example_02b_base', 'example_02c_base')
group by simulation, ident ORDER BY simulation
"""
kpis = pd.read_sql(sql, engine)
kpis
[ ]:
# sort the dataframe so the simulation cases appear in a consistent order
kpis = kpis.sort_values(
by="simulation",
# key=lambda x: x.map({"example_02a": 1, "example_02b": 2, "example_02c": 3}),
)
kpis["total_volume"] /= 1e9
kpis["total_cost"] /= 1e6
savefig = partial(plt.savefig, transparent=False, bbox_inches="tight")
xticks = kpis["simulation"].unique()
plt.style.use("seaborn-v0_8")
fig, ax = plt.subplots(1, 1, figsize=(10, 6))
ax2 = ax.twinx() # Create another axes that shares the same x-axis as ax.
width = 0.4
kpis.total_volume.plot(kind="bar", ax=ax, width=width, position=1, color="royalblue")
kpis.total_cost.plot(kind="bar", ax=ax2, width=width, position=0, color="green")
# set x-axis limits
ax.set_xlim(-0.6, len(kpis["simulation"]) - 0.4)
# set y-axis limits
ax.set_ylim(0, max(kpis.total_volume) * 1.1 + 0.1)
ax2.set_ylim(0, max(kpis.total_cost) * 1.1 + 0.1)
ax.set_ylabel("Total Volume (GWh)")
ax2.set_ylabel("Total Cost (M€)")
ax.set_xticklabels(xticks, rotation=45)
ax.set_xlabel("Simulation")
ax.legend(["Total Volume"], loc="upper left")
ax2.legend(["Total Cost"], loc="upper right")
plt.title("Total Volume and Total Cost for each Simulation")
plt.show()
[ ]:
sql = """
SELECT
product_start AS "time",
price AS "Price",
simulation AS "simulation",
node
FROM market_meta
WHERE simulation in ('example_02a_base', 'example_02b_base', 'example_02c_base') AND market_id in ('EOM')
GROUP BY market_id, simulation, product_start, price, node
ORDER BY product_start, node
"""
df = pd.read_sql(sql, engine)
df
[ ]:
# Convert the 'time' column to datetime
df["time"] = pd.to_datetime(df["time"])
# Plot the data
plt.figure(figsize=(14, 7))
# Loop through each simulation and plot
for simulation in df["simulation"].unique():
subset = df[df["simulation"] == simulation]
plt.plot(subset["time"], subset["Price"], label=simulation)
plt.title("Price over Time for Different Simulations")
plt.xlabel("Time")
plt.ylabel("Price")
plt.legend(title="Simulation")
plt.show()
[ ]:
# @title Complete notebook code with tasks already filled in
# this cell is used to display the image in the notebook when using colab
# or running the notebook locally
import importlib.util
import os
# Check if 'google.colab' is available
IN_COLAB = importlib.util.find_spec("google.colab") is not None
if IN_COLAB:
!pip install 'assume-framework[learning]'
# Colab currently has issues with pyomo version 6.8.2, causing the notebook to crash
# Installing an older version resolves this issue. This should only be considered a temporary fix.
!pip install pyomo==6.8.0
!git clone --depth=1 https://github.com/assume-framework/assume.git assume-repo
!cd assume-repo && assume -s example_01b -db "sqlite:///./examples/local_db/assume_db_example_01b.db"
colab_inputs_path = "assume-repo/examples/inputs"
local_inputs_path = "../inputs"
inputs_path = colab_inputs_path if IN_COLAB else local_inputs_path
import logging
import os
from datetime import datetime, timedelta
from pathlib import Path
import numpy as np
import pandas as pd
import torch as th
import yaml
from assume import World
from assume.common.base import LearningStrategy, SupportsMinMax
from assume.common.market_objects import MarketConfig, Orderbook, Product
from assume.reinforcement_learning.algorithms import actor_architecture_aliases
from assume.reinforcement_learning.learning_utils import NormalActionNoise
from assume.scenario.loader_csv import load_scenario_folder, run_learning
class RLStrategy(LearningStrategy):
"""
Reinforcement Learning Strategy
"""
def __init__(self, *args, **kwargs):
super().__init__(obs_dim=50, act_dim=2, unique_obs_dim=2, *args, **kwargs)
self.unit_id = kwargs["unit_id"]
# defines bounds of actions space
self.max_bid_price = kwargs.get("max_bid_price", 100)
self.max_demand = kwargs.get("max_demand", 10e3)
# tells us whether we are training the agents or just executing pre-learned strategies
self.learning_mode = kwargs.get("learning_mode", False)
self.perform_evaluation = kwargs.get("perform_evaluation", False)
# based on learning config define algorithm configuration
self.algorithm = kwargs.get("algorithm", "matd3")
actor_architecture = kwargs.get("actor_architecture", "mlp")
# define the architecture of the actor neural network
# if you use many time series inputs you might want to use the LSTM instead of the MLP, for example
if actor_architecture in actor_architecture_aliases.keys():
self.actor_architecture_class = actor_architecture_aliases[
actor_architecture
]
else:
raise ValueError(
f"Policy '{actor_architecture}' unknown. Supported architectures are {list(actor_architecture_aliases.keys())}"
)
# sets the device of the actor network
device = kwargs.get("device", "cpu")
self.device = th.device(device if th.cuda.is_available() else "cpu")
if not self.learning_mode:
self.device = th.device("cpu")
# future: add option to choose between float16 and float32
# float_type = kwargs.get("float_type", "float32")
self.float_type = th.float
# for definition of observation space
self.foresight = kwargs.get("foresight", 24)
if self.learning_mode:
self.learning_role = None
self.collect_initial_experience_mode = kwargs.get(
"episodes_collecting_initial_experience", True
)
self.action_noise = NormalActionNoise(
mu=0.0,
sigma=kwargs.get("noise_sigma", 0.1),
action_dimension=self.act_dim,
scale=kwargs.get("noise_scale", 1.0),
dt=kwargs.get("noise_dt", 1.0),
)
elif Path(kwargs["trained_policies_save_path"]).is_dir():
self.load_actor_params(load_path=kwargs["trained_policies_save_path"])
# we define the class again and inherit from the initial class just to add the additional method to the original class
# this is a workaround to have different methods of the class in different cells
# which is good for the purpose of this tutorial
# however, you should have all functions in a single class when using this example in .py files
class RLStrategy(RLStrategy):
def calculate_bids(
self,
unit: SupportsMinMax,
market_config: MarketConfig,
product_tuples: list[Product],
**kwargs,
) -> Orderbook:
"""
Calculate bids for a unit -> STEP 1 & 2
"""
start = product_tuples[0][0]
end = product_tuples[0][1]
# get technical bounds for the unit output from the unit
min_power, max_power = unit.calculate_min_max_power(start, end)
min_power = min_power[start]
max_power = max_power[start]
# =============================================================================
# 1. Get the Observations, which are the basis of the action decision
# =============================================================================
next_observation = self.create_observation(
unit=unit,
market_id=market_config.market_id,
start=start,
end=end,
)
# =============================================================================
# 2. Get the Actions, based on the observations
# =============================================================================
actions, noise = self.get_actions(next_observation)
bids = actions
bids = self.remove_empty_bids(bids)
return bids
# we define the class again and inherit from the initial class just to add the additional method to the original class
# this is a workaround to have different methods of the class in different cells
# which is good for the purpose of this tutorial
# however, you should have all functions in a single class when using this example in .py files
class RLStrategy(RLStrategy):
def calculate_reward(
self,
unit,
marketconfig: MarketConfig,
orderbook: Orderbook,
):
"""
Calculate reward
"""
return None
# we define the class again and inherit from the initial class just to add the additional method to the original class
# this is a workaround to have different methods of the class in different cells
# which is good for the purpose of this tutorial
# however, you should have all functions in a single class when using this example in .py files
class RLStrategy(RLStrategy):
def create_observation(
self,
unit: SupportsMinMax,
market_id: str,
start: datetime,
end: datetime,
):
"""
Create observation
"""
# end includes the end of the last product, to get the last products' start time we deduct the frequency once
end_excl = end - unit.index.freq
# get the forecast length depending on the time unit considered in the modelled unit
forecast_len = pd.Timedelta((self.foresight - 1) * unit.index.freq)
# =============================================================================
# 1.1 Get the Observations, which are the basis of the action decision
# =============================================================================
# residual load forecast
scaling_factor_res_load = self.max_demand
# price forecast
scaling_factor_price = self.max_bid_price
# total capacity
scaling_factor_total_capacity = unit.max_power
# marginal cost
scaling_factor_marginal_cost = self.max_bid_price
# checks if we are at the end of the simulation horizon, since we need to change the forecast then
# for residual load and price forecast and scale them
if (
end_excl + forecast_len
> unit.forecaster[f"residual_load_{market_id}"].index[-1]
):
scaled_res_load_forecast = (
unit.forecaster[f"residual_load_{market_id}"].loc[start:]
/ scaling_factor_res_load
)
scaled_res_load_forecast = np.concatenate(
[
scaled_res_load_forecast,
unit.forecaster[f"residual_load_{market_id}"].iloc[
: self.foresight - len(scaled_res_load_forecast)
],
]
)
else:
scaled_res_load_forecast = (
unit.forecaster[f"residual_load_{market_id}"].loc[
start : end_excl + forecast_len
]
/ scaling_factor_res_load
)
if end_excl + forecast_len > unit.forecaster[f"price_{market_id}"].index[-1]:
scaled_price_forecast = (
unit.forecaster[f"price_{market_id}"].loc[start:] / scaling_factor_price
)
scaled_price_forecast = np.concatenate(
[
scaled_price_forecast,
unit.forecaster[f"price_{market_id}"].iloc[
: self.foresight - len(scaled_price_forecast)
],
]
)
else:
scaled_price_forecast = (
unit.forecaster[f"price_{market_id}"].loc[
start : end_excl + forecast_len
]
/ scaling_factor_price
)
# get the unit's dispatched volume before the current time step and the corresponding marginal costs
current_volume = unit.get_output_before(start)
current_costs = unit.calculate_marginal_cost(start, current_volume)
# scale unit outputs
scaled_total_capacity = current_volume / scaling_factor_total_capacity
scaled_marginal_cost = current_costs / scaling_factor_marginal_cost
# concat all observations into one array
observation = np.concatenate(
[
scaled_res_load_forecast,
scaled_price_forecast,
np.array([scaled_total_capacity, scaled_marginal_cost]),
]
)
# convert the array into a tensor on the configured device (CPU or GPU) for NN processing
observation = (
th.tensor(observation, dtype=self.float_type)
.to(self.device, non_blocking=True)
.view(-1)
)
return observation.detach().clone()
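The observation is simply the concatenation of the two scaled forecast windows and the two unit-specific scalars. As a quick, purely illustrative sanity check (the foresight of 24 time steps is an assumption here, adjust it to your strategy), the length of this vector should equal 2 * foresight + 2 and match the obs_dim the strategy was initialised with:
[ ]:
# purely illustrative sanity check: the observation consists of `foresight` residual-load
# values, `foresight` price values and two unit-specific scalars
foresight = 24  # assumed value, adjust to the foresight used by your strategy
expected_obs_dim = 2 * foresight + 2
print(expected_obs_dim)  # should match the obs_dim the RLStrategy was initialised with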
# we define the class again and inherit from the initial class just to add the additional method to the original class
# this is a workaround to have different methods of the class in different cells
# which is good for the purpose of this tutorial
# however, you should have all functions in a single class when using this example in .py files
class RLStrategy(RLStrategy):
def get_actions(self, next_observation):
"""
Get actions
"""
# distinction whether we are in learning mode or not to handle exploration realised with noise
if self.learning_mode:
# if we are in learning mode, during the first episodes we want to explore the entire action space
# to collect good initial experience in the region around the agent's costs
if self.collect_initial_experience_mode:
# define current action as solely noise
noise = (
th.normal(
mean=0.0, std=0.2, size=(1, self.act_dim), dtype=self.float_type
)
.to(self.device)
.squeeze()
)
# =============================================================================
# 2.1 Get Actions and handle exploration
# =============================================================================
# ==> YOUR CODE HERE
base_bid = next_observation[-1] # = marginal_costs
# add noise to the last entry of the observation
# this needs to be adjusted if the observation space is changed, because it only makes sense
# as long as the last entry of the observation is the marginal cost
curr_action = noise + base_bid.clone().detach()
else:
# if we are not in the initial exploration phase we choose the action with the actor neural network
# and add noise to the action
curr_action = self.actor(next_observation).detach()
noise = th.tensor(
self.action_noise.noise(), device=self.device, dtype=self.float_type
)
curr_action += noise
else:
# if we are not in learning mode we just use the actor neural network to get the action, without adding noise
curr_action = self.actor(next_observation).detach()
noise = th.zeros(self.act_dim, dtype=self.float_type)
curr_action = curr_action.clamp(-1, 1)
return curr_action, noise
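To get a feeling for how the exploration noise shapes the action, the following stand-alone sketch mimics the initial-exploration branch above with plain torch calls; all numbers are assumptions for illustration and are not taken from the simulation.
[ ]:
import torch as th

# illustrative only: mimic the initial-exploration branch of get_actions()
base_bid = th.tensor([0.35, 0.35])  # assumed scaled marginal cost, repeated for a 2-dim action
noise = th.normal(mean=0.0, std=0.2, size=(1, 2)).squeeze()
curr_action = (base_bid + noise).clamp(-1, 1)  # actions are kept within [-1, 1]
print(curr_action)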
# we define the class again and inherit from the initial class just to add the additional method to the original class
# this is a workaround to have different methods of the class in different cells
# which is good for the purpose of this tutorial
# however, you should have all functions in a single class when using this example in .py files
class RLStrategy(RLStrategy):
def calculate_bids(
self,
unit: SupportsMinMax,
market_config: MarketConfig,
product_tuples: list[Product],
**kwargs,
) -> Orderbook:
"""
Calculate bids for a unit -> complete version including step 3.2 (transform actions into bids)
"""
bid_quantity_inflex, bid_price_inflex = 0, 0
bid_quantity_flex, bid_price_flex = 0, 0
start = product_tuples[0][0]
end = product_tuples[0][1]
# get technical bounds for the unit output from the unit
min_power, max_power = unit.calculate_min_max_power(start, end)
min_power = min_power[0]
max_power = max_power[0]
# =============================================================================
# 1. Get the Observations, which are the basis of the action decision
# =============================================================================
next_observation = self.create_observation(
unit=unit,
market_id=market_config.market_id,
start=start,
end=end,
)
# =============================================================================
# 2. Get the Actions, based on the observations
# =============================================================================
actions, noise = self.get_actions(next_observation)
# =============================================================================
# 3.2 Transform Actions into bids
# =============================================================================
# ==> YOUR CODE HERE
# actions are in the range [-1, 1], we need to transform them into actual bid prices
# we can use our domain knowledge to guide the bid formulation
# calculate actual bids
# rescale actions to actual prices
bid_prices = actions * self.max_bid_price
# calculate inflexible part of the bid
bid_quantity_inflex = min_power
bid_price_inflex = min(bid_prices)
# calculate flexible part of the bid
bid_quantity_flex = max_power - bid_quantity_inflex
bid_price_flex = max(bid_prices)
# actually formulate bids in orderbook format
bids = [
{
"start_time": start,
"end_time": end,
"only_hours": None,
"price": bid_price_inflex,
"volume": bid_quantity_inflex,
"node": unit.node,
},
{
"start_time": start,
"end_time": end,
"only_hours": None,
"price": bid_price_flex,
"volume": bid_quantity_flex,
"node": unit.node,
},
]
# store results in unit outputs as lists to be written to the buffer for learning
unit.outputs["rl_observations"].append(next_observation)
unit.outputs["rl_actions"].append(actions)
# store results in unit outputs as series to be written to the database by the unit operator
unit.outputs["actions"].at[start] = actions
unit.outputs["exploration_noise"].at[start] = noise
return bids
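As a small worked example of step 3.2 (all numbers are assumptions for illustration): with max_bid_price = 100 and an action vector of [-0.2, 0.6], the rescaled bid prices are [-20, 60]; the inflexible volume min_power is offered at the lower price and the remaining flexible volume at the higher one.
[ ]:
import torch as th

# worked example with assumed numbers, mirroring the action-to-bid transformation above
max_bid_price = 100  # assumed, same value as in the learning_config used later
actions = th.tensor([-0.2, 0.6])  # a possible (clamped) output of get_actions()
bid_prices = actions * max_bid_price
min_power, max_power = 200, 1000  # assumed technical bounds of the unit in MW
bid_quantity_inflex, bid_price_inflex = min_power, min(bid_prices)
bid_quantity_flex, bid_price_flex = max_power - min_power, max(bid_prices)
print(bid_price_inflex.item(), bid_quantity_inflex, bid_price_flex.item(), bid_quantity_flex)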
# we define the class again and inherit from the initial class just to add the additional method to the original class
# this is a workaround to have different methods of the class in different cells
# which is good for the purpose of this tutorial
# however, you should have all functions in a single class when using this example in .py files
class RLStrategy(RLStrategy):
def calculate_reward(
self,
unit,
marketconfig: MarketConfig,
orderbook: Orderbook,
):
"""
Calculate reward
"""
# =============================================================================
# 3. Calculate Reward
# =============================================================================
# function is called after the market is cleared and we get the market feedback,
# so we can calculate the profit
product_type = marketconfig.product_type
profit = 0
reward = 0
opportunity_cost = 0
costs = 0
# iterate over all orders in the orderbook, to calculate order specific profit
for order in orderbook:
start = order["start_time"]
end = order["end_time"]
# end includes the end of the last product, to get the last product's start time we subtract the frequency once
end_excl = end - unit.index.freq
# get the marginal cost of the unit at the dispatched output level (the calculation depends on the unit type)
marginal_cost = unit.calculate_marginal_cost(
start, unit.outputs[product_type].at[start]
)
duration = (end - start) / timedelta(hours=1)
# calculate profit as income - running_cost from this event
order_profit = order["accepted_price"] * order["accepted_volume"] * duration
order_cost = marginal_cost * order["accepted_volume"] * duration
# collect profit and opportunity cost for all orders
profit += order_profit
costs += order_cost
# calculate opportunity cost
# as the loss of income we have because we are not running at full power
opportunity_cost = (
(order["accepted_price"] - marginal_cost)
* (unit.max_power - unit.outputs[product_type].loc[start:end_excl]).sum()
* duration
)
# if our opportunity costs are negative, we did not miss an opportunity to earn money and we set them to 0
opportunity_cost = max(opportunity_cost, 0)
# consideration of start-up costs, which are split evenly between the
# start-up and shut-down events
if (
unit.outputs[product_type].at[start] != 0
and unit.outputs[product_type].loc[start - unit.index.freq] == 0
):
costs += unit.hot_start_cost / 2
elif (
unit.outputs[product_type].at[start] == 0
and unit.outputs[product_type].loc[start - unit.index.freq] != 0
):
costs += unit.hot_start_cost / 2
profit = profit - costs
# =============================================================================
# =============================================================================
# ==> YOUR CODE HERE
# The straightforward implementation would be reward = profit, yet we would like to give the agent more guidance
# in the learning process, so we add a regret term to the reward, which is the opportunity cost
# define the reward and scale it
scaling = 0.1 / unit.max_power
regret_scale = 0.2
reward = float(profit - regret_scale * opportunity_cost) * scaling
# store results in unit outputs which are written to database by unit operator
unit.outputs["profit"].loc[start:end_excl] += profit
unit.outputs["reward"].loc[start:end_excl] = reward
unit.outputs["regret"].loc[start:end_excl] = regret_scale * opportunity_cost
unit.outputs["total_costs"].loc[start:end_excl] = costs
unit.outputs["rl_rewards"].append(reward)
# we define the class again and inherit from the initial class just to add the additional method to the original class
# this is a workaround to have different methods of the class in different cells
# which is good for the purpose of this tutorial
# however, you should have all functions in a single class when using this example in .py files
class RLStrategy(RLStrategy):
def load_actor_params(self, load_path):
"""
Load actor parameters
"""
directory = f"{load_path}/actors/actor_{self.unit_id}.pt"
params = th.load(directory, map_location=self.device)
self.actor = self.actor_architecture_class(
obs_dim=self.obs_dim,
act_dim=self.act_dim,
float_type=self.float_type,
unique_obs_dim=self.unique_obs_dim,
num_timeseries_obs_dim=self.num_timeseries_obs_dim,
).to(self.device)
self.actor.load_state_dict(params["actor"])
if self.learning_mode:
self.actor_target = self.actor_architecture_class(
obs_dim=self.obs_dim,
act_dim=self.act_dim,
float_type=self.float_type,
unique_obs_dim=self.unique_obs_dim,
num_timeseries_obs_dim=self.num_timeseries_obs_dim,
).to(self.device)
self.actor_target.load_state_dict(params["actor_target"])
self.actor_target.eval()
self.actor.optimizer.load_state_dict(params["actor_optimizer"])
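If you want to look into a saved checkpoint yourself, it is a plain dictionary containing at least the keys 'actor', 'actor_target' and 'actor_optimizer' that are accessed above. The path in the sketch below is hypothetical; adjust it to wherever your trained policies were saved.
[ ]:
import os
import torch as th

# illustrative only: inspect a saved actor checkpoint (hypothetical path, adjust as needed)
ckpt_path = "learned_strategies/example_02a/actors/actor_pp_learning_01.pt"
if os.path.exists(ckpt_path):
    params = th.load(ckpt_path, map_location="cpu")
    print(list(params.keys()))  # should include 'actor', 'actor_target' and 'actor_optimizer'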
learning_config = {
"continue_learning": False,
"trained_policies_save_path": None,
"max_bid_price": 100,
"algorithm": "matd3",
"learning_rate": 0.001,
"training_episodes": 2,
"episodes_collecting_initial_experience": 1,
"train_freq": "24h",
"gradient_steps": -1,
"batch_size": 256,
"gamma": 0.99,
"device": "cpu",
"noise_sigma": 0.1,
"noise_scale": 1,
"noise_dt": 1,
"validation_episodes_interval": 5,
}
# Read the YAML file
with open(f"{inputs_path}/example_02a/config.yaml") as file:
data = yaml.safe_load(file)
# store our modifications to the config file
data["base"]["learning_mode"] = True
data["base"]["learning_config"] = learning_config
# Write the modified data back to the file
with open(f"{inputs_path}/example_02a/config.yaml", "w") as file:
yaml.safe_dump(data, file)
# Read the YAML file
with open(f"{inputs_path}/example_02b/config.yaml") as file:
data = yaml.safe_load(file)
# store our modifications to the config file
data["base"]["learning_mode"] = True
data["base"]["learning_config"] = learning_config
# Write the modified data back to the file
with open(f"{inputs_path}/example_02b/config.yaml", "w") as file:
yaml.safe_dump(data, file)
# Read the YAML file
with open(f"{inputs_path}/example_02c/config.yaml") as file:
data = yaml.safe_load(file)
# store our modifications to the config file
data["base"]["learning_mode"] = True
data["base"]["learning_config"] = learning_config
# Write the modified data back to the file
with open(f"{inputs_path}/example_02c/config.yaml", "w") as file:
yaml.safe_dump(data, file)
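Before starting the simulations, it can be worth reading one of the modified files back to confirm that the learning settings were written as intended (this quick check relies on yaml and inputs_path already being defined above):
[ ]:
# optional check: read one of the modified configs back and print the learning settings
with open(f"{inputs_path}/example_02a/config.yaml") as file:
    check = yaml.safe_load(file)
print(check["base"]["learning_mode"])
print(check["base"]["learning_config"]["algorithm"])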
log = logging.getLogger(__name__)
csv_path = "outputs"
os.makedirs("local_db", exist_ok=True)
if __name__ == "__main__":
db_uri = "sqlite:///local_db/assume_db.db"
scenario = "example_02a"
study_case = "base"
# create world
world = World(database_uri=db_uri, export_csv_path=csv_path)
# we register our learning bidding strategy class in the world's bidding strategies
# in the provided example input csv files the learning bidding strategy is referenced as "pp_learning"
# hence we map this name to our RLStrategy class
world.bidding_strategies["pp_learning"] = RLStrategy
# then we load the scenario specified above from the respective input files
load_scenario_folder(
world,
inputs_path=inputs_path,
scenario=scenario,
study_case=study_case,
)
# run learning if learning mode is enabled
# needed as we simulate the modelling horizon multiple times to train the reinforcement learning agents
if world.learning_config.get("learning_mode", False):
run_learning(
world,
inputs_path=inputs_path,
scenario=scenario,
study_case=study_case,
)
# after the learning is done we make a normal run of the simulation, which equals a test run
world.run()
log = logging.getLogger(__name__)
csv_path = "outputs"
os.makedirs("local_db", exist_ok=True)
if __name__ == "__main__":
db_uri = "sqlite:///local_db/assume_db.db"
scenario = "example_02b"
study_case = "base"
# create world
world = World(database_uri=db_uri, export_csv_path=csv_path)
# we register our learning bidding strategy class in the world's bidding strategies
# in the provided example input csv files the learning bidding strategy is referenced as "pp_learning"
# hence we map this name to our RLStrategy class
world.bidding_strategies["pp_learning"] = RLStrategy
# then we load the scenario specified above from the respective input files
load_scenario_folder(
world,
inputs_path=inputs_path,
scenario=scenario,
study_case=study_case,
)
# run learning if learning mode is enabled
# needed as we simulate the modelling horizon multiple times to train the reinforcement learning agents
if world.learning_config.get("learning_mode", False):
run_learning(
world,
inputs_path=inputs_path,
scenario=scenario,
study_case=study_case,
)
# after the learning is done we make a normal run of the simulation, which equals a test run
world.run()
log = logging.getLogger(__name__)
csv_path = "outputs"
os.makedirs("local_db", exist_ok=True)
if __name__ == "__main__":
db_uri = "sqlite:///local_db/assume_db.db"
scenario = "example_02c"
study_case = "base"
# create world
world = World(database_uri=db_uri, export_csv_path=csv_path)
# we register our learning bidding strategy class in the world's bidding strategies
# in the provided example input csv files the learning bidding strategy is referenced as "pp_learning"
# hence we map this name to our RLStrategy class
world.bidding_strategies["pp_learning"] = RLStrategy
# then we load the scenario specified above from the respective input files
load_scenario_folder(
world,
inputs_path=inputs_path,
scenario=scenario,
study_case=study_case,
)
# run learning if learning mode is enabled
# needed as we simulate the modelling horizon multiple times to train the reinforcement learning agents
if world.learning_config.get("learning_mode", False):
run_learning(
world,
inputs_path=inputs_path,
scenario=scenario,
study_case=study_case,
)
# after the learning is done we make a normal run of the simulation, which equals a test run
world.run()
!pip install matplotlib
import os
from functools import partial
import matplotlib.pyplot as plt
from sqlalchemy import create_engine
os.makedirs("outputs", exist_ok=True)
db_uri = "sqlite:///local_db/assume_db.db"
engine = create_engine(db_uri)
sql = """
SELECT ident, simulation,
sum(round(CAST(value AS numeric), 2)) FILTER (WHERE variable = 'total_cost') as total_cost,
sum(round(CAST(value AS numeric), 2)*1000) FILTER (WHERE variable = 'total_volume') as total_volume,
sum(round(CAST(value AS numeric), 2)) FILTER (WHERE variable = 'avg_price') as average_cost
FROM kpis
where variable in ('total_cost', 'total_volume', 'avg_price')
and simulation in ('example_02a_base', 'example_02b_base', 'example_02c_base')
group by simulation, ident ORDER BY simulation
"""
kpis = pd.read_sql(sql, engine)
# sort the dataframe so that the example_02a, example_02b and example_02c cases appear in a consistent order
kpis = kpis.sort_values(
by="simulation",
# key=lambda x: x.map({"example_02a": 1, "example_02b": 2, "example_02c": 3}),
)
kpis["total_volume"] /= 1e9
kpis["total_cost"] /= 1e6
savefig = partial(plt.savefig, transparent=False, bbox_inches="tight")
xticks = kpis["simulation"].unique()
plt.style.use("seaborn-v0_8")
fig, ax = plt.subplots(1, 1, figsize=(10, 6))
ax2 = ax.twinx() # Create another axes that shares the same x-axis as ax.
width = 0.4
kpis.total_volume.plot(kind="bar", ax=ax, width=width, position=1, color="royalblue")
kpis.total_cost.plot(kind="bar", ax=ax2, width=width, position=0, color="green")
# set x-axis limits
ax.set_xlim(-0.6, len(kpis["simulation"]) - 0.4)
# set y-axis limits
ax.set_ylim(0, max(kpis.total_volume) * 1.1 + 0.1)
ax2.set_ylim(0, max(kpis.total_cost) * 1.1 + 0.1)
ax.set_ylabel("Total Volume (GWh)")
ax2.set_ylabel("Total Cost (M€)")
ax.set_xticklabels(xticks, rotation=45)
ax.set_xlabel("Simulation")
ax.legend(["Total Volume"], loc="upper left")
ax2.legend(["Total Cost"], loc="upper right")
plt.title("Total Volume and Total Cost for each Simulation")
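The savefig helper defined above is not called in the plotting cell itself; if you want to keep the figure, you can store it right after plotting (the file name below is just an example):
[ ]:
# optional: persist the bar chart with the savefig helper defined above (example file name)
savefig("outputs/kpi_total_volume_and_cost.png")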
sql = """
SELECT
product_start AS "time",
price AS "Price",
simulation AS "simulation",
node
FROM market_meta
WHERE simulation in ('example_02a_base', 'example_02b_base', 'example_02c_base') AND market_id in ('EOM')
GROUP BY market_id, simulation, product_start, price, node
ORDER BY product_start, node
"""
df = pd.read_sql(sql, engine)
df
# Convert the 'time' column to datetime
df["time"] = pd.to_datetime(df["time"])
# Plot the data
plt.figure(figsize=(14, 7))
# Loop through each simulation and plot
for simulation in df["simulation"].unique():
subset = df[df["simulation"] == simulation]
plt.plot(subset["time"], subset["Price"], label=simulation)
plt.title("Price over Time for Different Simulations")
plt.xlabel("Time")
plt.ylabel("Price")
plt.legend(title="Simulation")
plt.show()
[ ]: