{
"cells": [
{
"cell_type": "markdown",
"source": [
"Some imports just to render the results in colab"
],
"metadata": {
"id": "DvGm0V_AWiEw"
}
},
{
"cell_type": "code",
"source": [
"%pip install gymnasium"
],
"metadata": {
"colab": {
"base_uri": "https://localhost:8080/"
},
"id": "m3Fcej5GUnbX",
"outputId": "f5e15855-1e2f-4b06-fb41-cc86007d0f93"
},
"execution_count": 1,
"outputs": [
{
"output_type": "stream",
"name": "stdout",
"text": [
"Requirement already satisfied: gymnasium in /usr/local/lib/python3.10/dist-packages (0.29.1)\n",
"Requirement already satisfied: numpy>=1.21.0 in /usr/local/lib/python3.10/dist-packages (from gymnasium) (1.25.2)\n",
"Requirement already satisfied: cloudpickle>=1.2.0 in /usr/local/lib/python3.10/dist-packages (from gymnasium) (2.2.1)\n",
"Requirement already satisfied: typing-extensions>=4.3.0 in /usr/local/lib/python3.10/dist-packages (from gymnasium) (4.10.0)\n",
"Requirement already satisfied: farama-notifications>=0.0.1 in /usr/local/lib/python3.10/dist-packages (from gymnasium) (0.0.4)\n"
]
}
]
},
{
"cell_type": "code",
"source": [
"from gymnasium.wrappers import RecordVideo\n",
"import glob\n",
"import io\n",
"import base64\n",
"from IPython.display import HTML\n",
"from IPython import display\n",
"\n",
"\"\"\"\n",
"Utility functions to enable video recording of gym environment\n",
"and displaying it.\n",
"To enable video, just do \"env = wrap_env(env)\"\n",
"\"\"\"\n",
"\n",
"def show_video():\n",
"    \"\"\"Display the first mp4 found in ./video as an inline HTML5 video.\n",
"\n",
"    Reads the raw mp4 bytes and embeds them as a base64 data URI so the\n",
"    video renders inside the notebook (e.g. in Colab). Prints a message\n",
"    if no video file is found.\n",
"    \"\"\"\n",
"    mp4list = glob.glob('video/*.mp4')\n",
"    if len(mp4list) > 0:\n",
"        mp4 = mp4list[0]\n",
"        video = io.open(mp4, 'r+b').read()\n",
"        encoded = base64.b64encode(video)\n",
"        # Bug fix: the HTML template was empty (''''''), so nothing was\n",
"        # ever rendered. Restore the <video> tag with the base64 payload.\n",
"        display.display(HTML(data='''<video alt=\"test\" autoplay loop controls style=\"height: 400px;\">\n",
"            <source src=\"data:video/mp4;base64,{0}\" type=\"video/mp4\" />\n",
"        </video>'''.format(encoded.decode('ascii'))))\n",
"    else:\n",
"        print(\"Could not find video\")"
],
"metadata": {
"id": "Q5PGqOlHWjC5"
},
"execution_count": 2,
"outputs": []
},
{
"cell_type": "markdown",
"metadata": {
"id": "ATzH4Ql0Lhvo"
},
"source": [
"# Function Approximation (1)\n",
"## Tile Coding in the Mountain Car problem\n",
"\n",
"In this notebook we will show benefits of FA in the Mountain Car problem\n",
"\n",
"The goal of the Mountain Car problem is to reach the top of a hill, where the obvious solution of simply accelerating forward does not work when starting from the bottom of the valley.\n",
"\n",
"https://gymnasium.farama.org/environments/classic_control/mountain_car/"
]
},
{
"cell_type": "code",
"execution_count": 3,
"metadata": {
"id": "NaYswgHeLhvq"
},
"outputs": [],
"source": [
"import gymnasium as gym\n",
"import numpy as np\n",
"import matplotlib.pyplot as plt\n",
"\n",
"env = gym.make(\"MountainCar-v0\")"
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "k9w5IYGJLhvt"
},
"source": [
"From the previous notebook, we can obtain the action and state space"
]
},
{
"cell_type": "code",
"execution_count": 4,
"metadata": {
"colab": {
"base_uri": "https://localhost:8080/"
},
"id": "U_QvDHr8Lhvv",
"outputId": "4c955cc4-b948-4794-be61-c168f479cc1f"
},
"outputs": [
{
"output_type": "stream",
"name": "stdout",
"text": [
"Actions : Discrete(3)\n",
"Variables: Box([-1.2 -0.07], [0.6 0.07], (2,), float32)\n",
"Max. var: [0.6 0.07]\n",
"Min. var: [-1.2 -0.07]\n"
]
}
],
"source": [
"# Inspect the environment's action and observation spaces\n",
"for label, value in [('Actions : ', env.action_space),\n",
"                     ('Variables: ', env.observation_space),\n",
"                     ('Max. var: ', env.observation_space.high),\n",
"                     ('Min. var: ', env.observation_space.low)]:\n",
"    print(label, value)"
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "DH_8llutLhvw"
},
"source": [
"Variables correspond to position in x axis and speed. Actions correspond to forward and backward acceleration respectively.\n",
"\n",
"Let's see performance of random behaviour"
]
},
{
"cell_type": "code",
"execution_count": 5,
"metadata": {
"colab": {
"base_uri": "https://localhost:8080/",
"height": 508
},
"id": "FCqI1P6hLhvw",
"outputId": "92e5e125-3fce-4e66-daa0-621775a0869a"
},
"outputs": [
{
"output_type": "stream",
"name": "stdout",
"text": [
"Moviepy - Building video /content/video/rl-video-episode-0.mp4.\n",
"Moviepy - Writing video /content/video/rl-video-episode-0.mp4\n",
"\n"
]
},
{
"output_type": "stream",
"name": "stderr",
"text": []
},
{
"output_type": "stream",
"name": "stdout",
"text": [
"Moviepy - Done !\n",
"Moviepy - video ready /content/video/rl-video-episode-0.mp4\n"
]
},
{
"output_type": "display_data",
"data": {
"text/plain": [
""
],
"text/html": [
""
]
},
"metadata": {}
}
],
"source": [
"# Record up to 300 steps of a purely random policy and display the video.\n",
"env.close()\n",
"env = RecordVideo(gym.make('MountainCar-v0', render_mode='rgb_array'), video_folder='video')\n",
"observation, _ = env.reset()\n",
"\n",
"for _ in range(300):\n",
"    env.render()\n",
"    random_action = env.action_space.sample()  # uniformly random action\n",
"    observation, reward, terminated, truncated, info = env.step(random_action)\n",
"    if terminated or truncated:\n",
"        break\n",
"env.close()\n",
"show_video()"
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "WJcrwn39Lhvx"
},
"source": [
"\n",
"For this problem we will define Tile Coding. The following class defines a TileCoding. Each variable will be discretized using *numTilings* grids, each one with *tilesPerTiling x tilesPerTiling* dimensions. Tiles are overlapping in the usual way:\n",
"\n",
"\n",
"\n"
]
},
{
"cell_type": "code",
"execution_count": 6,
"metadata": {
"id": "OsXBvwYaLhvy"
},
"outputs": [],
"source": [
"class Tilecoder:\n",
"    \"\"\"Tile-coding feature encoder for a continuous state space.\n",
"\n",
"    Builds `numTilings` overlapping grids, each with\n",
"    `tilesPerTiling x tilesPerTiling` tiles, over the observation space\n",
"    of the notebook-global `env`. A state activates exactly one tile per\n",
"    tiling; Q-values are linear in the resulting binary features.\n",
"    \"\"\"\n",
"\n",
"    def __init__(self, numTilings, tilesPerTiling):\n",
"        # Set max value for normalization of inputs\n",
"        self.maxNormal = 1\n",
"        self.maxVal = env.observation_space.high\n",
"        self.minVal = env.observation_space.low\n",
"        self.numTilings = numTilings\n",
"        self.tilesPerTiling = tilesPerTiling\n",
"        self.dim = len(self.maxVal)\n",
"        self.numTiles = (self.tilesPerTiling**self.dim) * self.numTilings\n",
"        self.actions = env.action_space.n\n",
"        # One weight per (tile, action) pair\n",
"        self.n = self.numTiles * self.actions\n",
"        self.tileSize = np.divide(np.ones(self.dim)*self.maxNormal, self.tilesPerTiling-1)\n",
"\n",
"    def getFeatures(self, variables):\n",
"        \"\"\"Return the index of the active tile in each tiling (length numTilings).\"\"\"\n",
"        # Normalize each state variable into [0, maxNormal]\n",
"        values = np.zeros(self.dim)\n",
"        for i in range(self.dim):  # was range(len(env.observation_space.shape)+1): equivalent here but fragile\n",
"            values[i] = self.maxNormal * ((variables[i] - self.minVal[i])/(self.maxVal[i]-self.minVal[i]))\n",
"        tileIndices = np.zeros(self.numTilings)\n",
"        matrix = np.zeros([self.numTilings, self.dim])\n",
"        # Each tiling i is offset by i/numTilings of a tile width\n",
"        for i in range(self.numTilings):\n",
"            for i2 in range(self.dim):\n",
"                matrix[i, i2] = int(values[i2] / self.tileSize[i2] + i / self.numTilings)\n",
"        # Flatten per-dimension tile coordinates into a single index\n",
"        for i in range(1, self.dim):\n",
"            matrix[:, i] *= self.tilesPerTiling**i\n",
"        for i in range(self.numTilings):\n",
"            tileIndices[i] = (i * (self.tilesPerTiling**self.dim) + sum(matrix[i, :]))\n",
"        return tileIndices\n",
"\n",
"    def oneHotVector(self, features, action):\n",
"        \"\"\"Return a length-n binary vector marking the active (tile, action) weights.\"\"\"\n",
"        oneHot = np.zeros(self.n)\n",
"        for i in features:\n",
"            index = int(i + (self.numTiles*action))\n",
"            oneHot[index] = 1\n",
"        return oneHot\n",
"\n",
"    def getVal(self, theta, features, action):\n",
"        \"\"\"Approximate Q(s, a): sum of the weights of the active tiles for `action`.\"\"\"\n",
"        val = 0\n",
"        for i in features:\n",
"            index = int(i + (self.numTiles*action))\n",
"            val += theta[index]\n",
"        return val\n",
"\n",
"    def getQ(self, features, theta):\n",
"        \"\"\"Return the vector of Q-values for every action in the given state.\"\"\"\n",
"        Q = np.zeros(self.actions)\n",
"        for i in range(self.actions):\n",
"            # Bug fix: use self, not the notebook-global `tile` instance,\n",
"            # so this works for any Tilecoder object.\n",
"            Q[i] = self.getVal(theta, features, i)\n",
"        return Q\n"
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "7RwwqDVMLhvz"
},
"source": [
"## Q-learning with TileCoding\n",
"\n",
"Let's start defining one function to implement the epsilon-greedy procedure and another one to sum the long-term reward of the current episode from position *t*"
]
},
{
"cell_type": "code",
"execution_count": 7,
"metadata": {
"id": "KfDwNCIDLhv1"
},
"outputs": [],
"source": [
"def e_greedy_policy(Qs):\n",
"    \"\"\"Epsilon-greedy action selection over the Q-value vector `Qs`.\n",
"\n",
"    With probability `epsilon` (notebook global) returns a random action,\n",
"    otherwise the greedy action argmax(Qs).\n",
"    \"\"\"\n",
"    # Bug fix: use the Qs argument instead of the notebook-global Q\n",
"    return env.action_space.sample() if (np.random.random() <= epsilon) else np.argmax(Qs)\n"
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "3d6nnayNLhv1"
},
"source": [
"Definition of a function to collect the scores of an episode with a completely greedy policy. Just to compare scores"
]
},
{
"cell_type": "code",
"execution_count": 8,
"metadata": {
"id": "CYbG98YULhv2"
},
"outputs": [],
"source": [
"def rollout(niter):\n",
"    \"\"\"Run `niter` fully greedy episodes (no exploration) and return the mean return.\n",
"\n",
"    Uses the notebook globals `env`, `tile` and `theta`; each episode is\n",
"    capped at 1000 steps.\n",
"    \"\"\"\n",
"    total_return = 0\n",
"    for _ in range(niter):\n",
"        state, _ = env.reset()\n",
"        for _ in range(1000):\n",
"            features = tile.getFeatures(state)\n",
"            q_values = tile.getQ(features, theta)\n",
"            greedy_action = np.argmax(q_values)\n",
"            state, reward, terminated, truncated, info = env.step(greedy_action)\n",
"            total_return += reward\n",
"            if terminated or truncated:\n",
"                break\n",
"    return total_return / niter\n"
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "NdfIqqhwLhv3"
},
"source": [
"Now, we define a TileCoder with 7 tilings of 14x14 tiles each and apply the Q-learning procedure"
]
},
{
"cell_type": "code",
"execution_count": 9,
"metadata": {
"colab": {
"base_uri": "https://localhost:8080/"
},
"id": "Mw6qKMb3Lhv3",
"outputId": "b89056b7-e0b9-41c9-da81-b87e19780251"
},
"outputs": [
{
"output_type": "stream",
"name": "stdout",
"text": [
"Average reward = -193.92\n",
"Average reward = -155.38\n",
"Average reward = -141.22\n",
"Average reward = -128.45\n",
"Average reward = -133.28\n",
"Average reward = -128.48\n",
"Average reward = -128.71\n",
"Average reward = -127.485\n",
"Average reward = -121.315\n",
"Average reward = -125.105\n"
]
}
],
"source": [
"env = gym.make(\"MountainCar-v0\")\n",
"tile = Tilecoder(7,14)  # 7 tilings, each a 14x14 grid -> 1372 tiles\n",
"theta = np.random.uniform(-0.001, 0, size=(tile.n))  # FA weights: 1372 tiles x 3 actions = 4116 parameters\n",
"\n",
"# Parameters of learning\n",
"alpha = 0.05       # learning rate\n",
"gamma = 1          # discount factor (undiscounted episodic task)\n",
"numEpisodes = 2000\n",
"epsilon = 0.05     # exploration rate for the behaviour policy\n",
"\n",
"# Variables to collect scores\n",
"rewardTracker = []   # return per training episode (epsilon-greedy behaviour)\n",
"rewardTracker2 = []  # return per greedy test episode (epsilon = 0)\n",
"episodeSum = 0\n",
"\n",
"for episodeNum in range(1,numEpisodes+1):\n",
"    G = 0\n",
"    state,_ = env.reset()\n",
"    while True:\n",
"        F = tile.getFeatures(state)   # active-tile encoding of the state\n",
"        Q = tile.getQ(F, theta)       # Q-values for all actions in this state\n",
"        action = e_greedy_policy(Q)   # select action with epsilon-greedy procedure\n",
"        Qs = Q[action]\n",
"        state2, reward, terminated, truncated, info = env.step(action)\n",
"        done = truncated or terminated\n",
"        G += reward\n",
"        if done:\n",
"            # Terminal update: TD target is just the reward (no bootstrap).\n",
"            # NOTE(review): truncation (time limit) is also treated as terminal here.\n",
"            theta += np.multiply((alpha*(reward - Qs)), tile.oneHotVector(F,action))\n",
"            episodeSum += G\n",
"            rewardTracker.append(G)             # Store reward collected\n",
"            rewardTracker2.append(rollout(1))   # Store reward collected TESTING with epsilon = 0\n",
"            if episodeNum %200 == 0:\n",
"                print('Average reward = {}'.format(episodeSum / 200))\n",
"                episodeSum = 0\n",
"            break\n",
"        # Q-learning update: TD target bootstraps from max_a Q(s2, a)\n",
"        Q = tile.getQ(tile.getFeatures(state2), theta)\n",
"        theta += np.multiply((alpha*(reward - Qs+gamma*np.max(Q))), tile.oneHotVector(F,action))\n",
"        state = state2\n"
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "2EbGU_OmLhv4"
},
"source": [
"Let's see behaviour learnt"
]
},
{
"cell_type": "code",
"source": [
"# Record one fully greedy episode with the learned weights and show the video.\n",
"env.close()\n",
"env = RecordVideo(gym.make('MountainCar-v0', render_mode='rgb_array'), video_folder='video')\n",
"\n",
"state, _ = env.reset()\n",
"done = False\n",
"while not done:\n",
"    env.render()\n",
"    features = tile.getFeatures(state)     # active-tile encoding of the state\n",
"    q_values = tile.getQ(features, theta)  # Q-values for every action\n",
"    state, reward, terminated, truncated, info = env.step(np.argmax(q_values))\n",
"    done = terminated or truncated\n",
"env.close()\n",
"show_video()"
],
"metadata": {
"colab": {
"base_uri": "https://localhost:8080/",
"height": 562
},
"id": "gN14SiexanrI",
"outputId": "d6f411d9-6298-467b-9909-bb4a4350dce7"
},
"execution_count": 10,
"outputs": [
{
"output_type": "stream",
"name": "stderr",
"text": [
"/usr/local/lib/python3.10/dist-packages/gymnasium/wrappers/record_video.py:94: UserWarning: \u001b[33mWARN: Overwriting existing videos at /content/video folder (try specifying a different `video_folder` for the `RecordVideo` wrapper if this is not desired)\u001b[0m\n",
" logger.warn(\n"
]
},
{
"output_type": "stream",
"name": "stdout",
"text": [
"Moviepy - Building video /content/video/rl-video-episode-0.mp4.\n",
"Moviepy - Writing video /content/video/rl-video-episode-0.mp4\n",
"\n"
]
},
{
"output_type": "stream",
"name": "stderr",
"text": [
" "
]
},
{
"output_type": "stream",
"name": "stdout",
"text": [
"Moviepy - Done !\n",
"Moviepy - video ready /content/video/rl-video-episode-0.mp4\n"
]
},
{
"output_type": "stream",
"name": "stderr",
"text": [
"\r"
]
},
{
"output_type": "display_data",
"data": {
"text/plain": [
""
],
"text/html": [
""
]
},
"metadata": {}
}
]
},
{
"cell_type": "code",
"execution_count": 11,
"metadata": {
"colab": {
"base_uri": "https://localhost:8080/",
"height": 430
},
"id": "rOP3nXGeLhv5",
"outputId": "a5bbf2d5-90ba-421d-bdaf-e7d320fd75ec"
},
"outputs": [
{
"output_type": "display_data",
"data": {
"text/plain": [
"