import tensorflow as tf
from keras import Sequential
from keras.layers import Activation, Dense
from keras.optimizers import Adam
import numpy as np
import math
import random
from collections import deque
!pip install gym
import matplotlib.pyplot as plt
import plotly.graph_objs as go
from plotly.offline import download_plotlyjs, init_notebook_mode, plot, iplot
init_notebook_mode(connected = True)
class DQNN:
def __init__(self,state_size,action_size):
self.state_size = state_size
self.action_size = action_size
self.model = self._build()
self.memory = deque(maxlen=1000)
self.gamma = 0.95
self.epsilon = 0.5
self.epsilon_min = 0.2
self.epsilon_decay = 0.995
def _build(self):
model = Sequential()
model.add(Dense(24, input_dim=self.state_size, activation='relu'))
model.add(Dense(24, activation='relu'))
model.add(Dense(self.action_size, activation='linear'))
        model.compile(loss='mse', optimizer=Adam(learning_rate=0.01))
return model
    def act(self, state):
        # epsilon-greedy: explore with probability epsilon
        if np.random.rand() < self.epsilon:
            return np.random.randint(self.action_size)
        # give the state an explicit batch dimension before querying the network
        state = np.reshape(state, [1, self.state_size])
        q_values = self.model.predict(state)
        return np.argmax(q_values[0])
def remember(self, state, action, reward, next_state, done):
self.memory.append((state, action, reward, next_state, done))
    def revise(self, minibatchsize):
        minibatch = random.sample(self.memory, minibatchsize)
        for state, action, reward, next_state, done in minibatch:
            # if done, the target is just the immediate reward
            target = reward
            if not done:
                # predict the future discounted reward (Bellman equation)
                next_state = np.reshape(next_state, [1, self.state_size])
                target = reward + self.gamma * \
                    np.amax(self.model.predict(next_state)[0])
            # make the agent approximately map the current state
            # to the future discounted reward; we'll call that target_f
            state = np.reshape(state, [1, self.state_size])
            target_f = self.model.predict(state)
            target_f[0][action] = target
            # train the neural net with the state and target_f
            self.model.fit(state, target_f, epochs=1, verbose=0)
        # reduce the randomness (exploration) of the agent gradually
        if self.epsilon > self.epsilon_min:
            self.epsilon *= self.epsilon_decay
import gym
from gym import error, spaces, utils
from gym.utils import seeding
class ArrivalSim(gym.Env):
metadata = {'render.modes': ['human']}
def __init__(self, price):
self.price = price
        self.action_space = spaces.Discrete(2)  # 0: decrease price, 1: increase price
        self.observation_space = spaces.Box(low=-np.inf, high=np.inf, shape=(1,), dtype=np.float32)
def _state(self):
return self.price
def _step(self,action):
state = self.price
self._take_action(action)
next_state = self.price
reward = self._get_reward()
        # the episode ends if the price goes negative or no revenue is generated
        done = self.price < 0 or reward == 0
return state, action, reward, next_state, done
def _reset(self):
self.price = np.random.rand()
def _take_action(self, action):
if(action == 1):
self.price+= 0.25
elif(action==0):
self.price-=0.25
    def sigmoid(self, x, maxcust, gamma):
        # decreasing logistic demand curve: maxcust / (1 + e^(gamma * x)) for x >= 0
        return maxcust / (1 + math.exp(gamma * max(0, x)))
    def _get_reward(self):
        # revenue this timestep: a Poisson-distributed number of arrivals times the price
        return max(np.random.poisson(self.sigmoid(self.price, 50, 0.5)) * self.price, 0)
The above environment draws a Poisson-distributed number of customer arrivals at every time step, where the arrival rate depends on the price of the product. The price elasticity is simulated by:
$$\lambda = \frac{maxcust}{1 + e^{\gamma x}} = maxcust \cdot \mathrm{sigmoid}(-\gamma x), \quad x \ge 0$$ where $\lambda$ is the rate of arrival of customers, $\gamma$ is the "elasticity", and $x$ is the current price.
This decreases the arrival rate $\lambda$ of the Poisson distribution as the price increases, and vice versa. The reward at each timestep is the revenue generated through sales, and this is what the model is trained to maximize.
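As a quick sanity check on this demand model, the short standalone sketch below (not part of the environment; the helper name arrival_rate is just for illustration) evaluates the same logistic curve that ArrivalSim uses, with maxcust = 50 and gamma = 0.5, and prints the arrival rate together with the expected revenue $\lambda \times price$ at a few prices:
import math

def arrival_rate(price, maxcust=50, gamma=0.5):
    # same form as ArrivalSim.sigmoid: maxcust / (1 + e^(gamma * max(0, price)))
    return maxcust / (1 + math.exp(gamma * max(0, price)))

for p in [0.25, 0.5, 1.0, 2.0, 4.0]:
    lam = arrival_rate(p)
    print("price={:.2f}  lambda={:5.2f}  expected revenue={:5.2f}".format(p, lam, lam * p))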
env = ArrivalSim(0.4)
agent = DQNN(1,2)
plotdata = []
for step in range(0,750):
    agent.memory.clear()  # start each episode with a fresh replay memory
env._reset()
state = env._state()
total_profit = 0
for time_t in range(500):
# turn this on if you want to render
# env.render()
# Decide action
action = agent.act(np.array([state]))
        # Advance the simulation to the next step based on the action.
        # The reward is the revenue generated at this timestep
state, action, reward, next_state,done = env._step(action)
# Remember the previous state, action, reward, and done
agent.remember(state, action, reward, next_state,done)
# make next_state the new current state for the next frame.
state = next_state
        total_profit += reward
print("Final Price (End Of Episode {}): {}".format(step,env.price))
print("Total Profit: {}".format(total_profit))
plotdata.append([step,total_profit])
agent.revise(32)
The model has two choices: increase the price by 0.25 (action 1) or decrease it by 0.25 (action 0).
It makes this choice by outputting an array of two Q-values, of which the larger one is chosen as the action, e.g. for [50, 1] the action is 0.
The model makes a choice, the environment simulates the outcome of that action, and returns a reward given by:
$$reward = \mathrm{Po}(\lambda) \times price = \mathrm{Po}\big(maxcust \cdot \mathrm{sigmoid}(-\gamma \cdot price)\big) \times price$$
If the reward is high, the chosen action gets reinforced: the entry of the array corresponding to that action is replaced by the reward plus a discounted estimate of the best value obtainable from the next state (the Bellman update).
E.g. if the reward is 5, the target array becomes roughly [~55, 1]; approximate, because the discounted future estimate is added on top of the immediate reward.
Over time the correct actions get reinforced.
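To make that update concrete, here is a small worked example of the target construction in revise, using made-up Q-values in place of real model predictions:
import numpy as np

gamma = 0.95
reward = 5.0
action = 0                            # the action that was actually taken
q_current = np.array([[50.0, 1.0]])   # stand-in for model.predict(state)
q_next = np.array([[52.0, 3.0]])      # stand-in for model.predict(next_state)

# Bellman target: immediate reward plus the discounted best value of the next state
target = reward + gamma * np.amax(q_next[0])   # 5 + 0.95 * 52 = 54.4

# only the entry for the taken action is overwritten; the other entry stays as predicted,
# so fitting on target_f only nudges the network's estimate for the chosen action
target_f = q_current.copy()
target_f[0][action] = target
print(target_f)                       # [[54.4  1. ]]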
trace = go.Scatter(
x =[x[0] for x in plotdata],
y = [x[1] for x in plotdata]
)
data = [trace]
layout = go.Layout(
xaxis=dict(
title = 'EPISODE',
autorange=True,
showgrid=False,
zeroline=False,
showline=False,
ticks='',
showticklabels=False
),
yaxis=dict(
title = 'TOTAL PROFIT PER EPISODE',
autorange=True,
showgrid=False,
zeroline=False,
showline=False
)
)
fig = go.Figure(data=data, layout=layout)
iplot(fig)
We can see above that the total profit per episode gradually increases as training progresses.