#REINFORCE 알고리즘 구현
#SAINT Lab. Q1 [강화학습]
#60201969 이유현 [2024.01.29]
import gym
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.distributions import Categorical
#Hyperparameters
learning_rate = 0.0002
gamma = 0.98
class Policy(nn.Module):
def __init__(self):
super(Policy, self).__init__()
self.data = []
self.fc1 = nn.Linear(4, 128)
self.fc2 = nn.Linear(128, 2)
self.optimizer = optim.Adam(self.parameters(), lr=learning_rate)
**def forward(self, x):
x = F.relu(self.fc1(x))
x = F.softmax(self.fc2(x), dim=0)
return x
def put_data(self, item):
self.data.append(item)
def train_net(self):
R = 0
self.optimizer.zero_grad()
for r, prob in self.data[::-1]:
R = r + gamma * R
loss = -torch.log(prob) * R
loss.backward() #loss에 대한 그라디언트가 계산되어 계속해서 더해짐
self.optimizer.step() #축적된 그라디언트를 이용해 뉴럴넷의 파라미터가 업데이트
self.data = []
def main():
env = gym.make('CartPole-v1')
pi = Policy() #정책 네트워크 pi
score = 0.0
print_interval = 20
for n_epi in range(10000):
s, _ = env.reset()
done = False
while not done: # CartPole-v1 forced to terminates at 500 step.
prob = pi(torch.from_numpy(s).float())
m = Categorical(prob)
a = m.sample() #하나의 액션 샘플링
s_prime, r, done, truncated, info = env.step(a.item())
pi.put_data((r,prob[a])) #보상, 확률값 저장
s = s_prime
score += r
pi.train_net() #하나의 에피소드 동안 모은 데이터를 이용해 실제 업데이트
if n_epi%print_interval==0 and n_epi!=0:
print("# of episode :{}, avg score : {}".format(n_epi, score/print_interval))
score = 0.0
env.close()
if __name__ == '__main__':
main()
<aside> 📢 파이토치 설치 실패로 실구현 보류
</aside>
#TD Actor-Critic 구현
#SAINT Lab. Q1 [강화학습]
#60201969 이유현 [2024.01.30]
import gym
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.distributions import Categorical
#Hyperparameters
learning_rate = 0.0002
gamma = 0.98
n_rollout = 10 #몇 틱의 데이터를 쌓아서 업데이트할 지(10번의 상태 전이를 모아서 업데이트)
class ActorCritic(nn.Module):
def __init__(self):
super(ActorCritic, self).__init__()
self.data = []
self.fc1 = nn.Linear(4,256)
self.fc_pi = nn.Linear(256,2)
self.fc_v = nn.Linear(256,1)
self.optimizer = optim.Adam(self.parameters(), lr=learning_rate)
def pi(self, x, softmax_dim = 0): #정책 네트워크
x = F.relu(self.fc1(x))
x = self.fc_pi(x)
prob = F.softmax(x, dim=softmax_dim)
return prob
def v(self, x): #밸류 네트워크
x = F.relu(self.fc1(x))
v = self.fc_v(x)
return v
def put_data(self, transition):
self.data.append(transition)
def make_batch(self): #n_rollout동안 모였던 데이터를 s,a,r,s'끼리 각각 따로따로 모아서 미니배치를 만들어주는 함수
s_lst, a_lst, r_lst, s_prime_lst, done_lst = [], [], [], [], []
for transition in self.data:
s,a,r,s_prime,done = transition
s_lst.append(s)
a_lst.append([a])
r_lst.append([r/100.0])
s_prime_lst.append(s_prime)
done_mask = 0.0 if done else 1.0
done_lst.append([done_mask])
s_batch, a_batch, r_batch, s_prime_batch, done_batch = torch.tensor(s_lst, dtype=torch.float), torch.tensor(a_lst), \\
torch.tensor(r_lst, dtype=torch.float), torch.tensor(s_prime_lst, dtype=torch.float), \\
torch.tensor(done_lst, dtype=torch.float)
self.data = []
return s_batch, a_batch, r_batch, s_prime_batch, done_batch
def train_net(self): #미니배치를 바탕으로 실제 학습
s, a, r, s_prime, done = self.make_batch()
td_target = r + gamma * self.v(s_prime) * done
delta = td_target - self.v(s)
pi = self.pi(s, softmax_dim=1)
pi_a = pi.gather(1,a)
#-부호를 통해 gradient ascent 구현(코드는 gradient descent 방향으로 업데이트 되기 때문)
loss = -torch.log(pi_a) * delta.detach() + F.smooth_l1_loss(self.v(s), td_target.detach()) #정책 네트워크의 손실 함수와 밸류 네트워크의 손실 함수를 더하여 한번에 업데이트를 진행, detach 중요: 상수 취급을 하여 그라디언트가 뒤로 흘러가지 않게 함
#밸류 네트워크의 loss는 TD방식으로 계산, td_target 상수 취급을 위해 detach 사용
self.optimizer.zero_grad()
loss.mean().backward() #10틱의 데이터의 loss의 평균 = 최종 loss, backward 함수로 그라디언트 계산
self.optimizer.step()
def main():
env = gym.make('CartPole-v1')
model = ActorCritic()
print_interval = 20
score = 0.0
for n_epi in range(10000):
done = False
s, _ = env.reset()
while not done:
for t in range(n_rollout):
prob = model.pi(torch.from_numpy(s).float()) #액션 선정
m = Categorical(prob)
a = m.sample().item()
s_prime, r, done, truncated, info = env.step(a)
model.put_data((s,a,r,s_prime,done)) #환경, 액션, 상태 전이와 보상 값들을 잠시 저장
s = s_prime
score += r
if done:
break
model.train_net() #학습을 진행
if n_epi%print_interval==0 and n_epi!=0:
print("# of episode :{}, avg score : {:.1f}".format(n_epi, score/print_interval))
score = 0.0
env.close()
if __name__ == '__main__':
main()
<aside> 📢 파이토치 설치 실패로 실구현 보류
</aside>