https://lightning.ai/docs/pytorch/stable/
See the official docs.
pip install lightning
Create a new virtual environment with miniconda and install PyTorch Lightning there.
Alternatively, list the required modules in a requirements.txt file and install them all at once:
pip install -r requirements.txt
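A minimal requirements.txt for this notebook might look like the following; the package list is my guess from the imports used below (versions left unpinned on purpose), not something the docs prescribe:
lightning
torch
numpy
matplotlib
tqdm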
import time
import matplotlib.pyplot as plt
%matplotlib inline
import matplotlib_inline.backend_inline
import numpy as np
import torch
import torch.nn as nn
import torch.utils.data as data
from matplotlib.colors import to_rgba
from torch import Tensor
from tqdm.notebook import tqdm
matplotlib_inline.backend_inline.set_matplotlib_formats("svg", "pdf")
print("Using torch", torch.__version__)
Using torch 2.1.2+cu121
torch.manual_seed(42)  # fix the seed for reproducibility
<torch._C.Generator at 0x7f162daad950>
x = Tensor(2, 3, 4)  # allocates a [2, 3, 4] tensor without initializing it; the values are whatever was in memory
print(x)
tensor([[[-1.4961e-36,  4.5588e-41, -1.4943e-36,  4.5588e-41],
         [-1.4943e-36,  4.5588e-41, -1.4961e-36,  4.5588e-41],
         [-1.4943e-36,  4.5588e-41, -1.4943e-36,  4.5588e-41]],

        [[-1.4961e-36,  4.5588e-41, -1.4960e-36,  4.5588e-41],
         [-1.4962e-36,  4.5588e-41, -1.4960e-36,  4.5588e-41],
         [-1.4960e-36,  4.5588e-41, -1.4960e-36,  4.5588e-41]]])
x = Tensor([[1, 2], [3, 4]])
print(x)
tensor([[1., 2.],
        [3., 4.]])
x = torch.rand(2, 3, 4)
print(x)
tensor([[[0.8823, 0.9150, 0.3829, 0.9593],
         [0.3904, 0.6009, 0.2566, 0.7936],
         [0.9408, 0.1332, 0.9346, 0.5936]],

        [[0.8694, 0.5677, 0.7411, 0.4294],
         [0.8854, 0.5739, 0.2666, 0.6274],
         [0.2696, 0.4414, 0.2969, 0.8317]]])
shape = x.shape
print("Shape:", x.shape)
size = x.size()
print("Size:", size)
dim1, dim2, dim3 = x.size()
print("Size:", dim1, dim2, dim3)
Shape: torch.Size([2, 3, 4])
Size: torch.Size([2, 3, 4])
Size: 2 3 4
np_arr = np.array([[1, 2], [3, 4]])
tensor = torch.from_numpy(np_arr)
print("Numpy array:", np_arr)
print("PyTorch tensor:", tensor)
Numpy array: [[1 2]
 [3 4]]
PyTorch tensor: tensor([[1, 2],
        [3, 4]])
tensor = torch.arange(4)
np_arr = tensor.numpy()
print("PyTorch tensor:", tensor)
print("Numpy array:", np_arr)
PyTorch tensor: tensor([0, 1, 2, 3])
Numpy array: [0 1 2 3]
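One detail worth knowing: torch.from_numpy and Tensor.numpy share the underlying memory rather than copying it, so an in-place change on one side shows up on the other. A quick sketch:
np_arr = np.array([1, 2, 3])
t = torch.from_numpy(np_arr)
np_arr[0] = 99  # in-place change to the numpy array
print(t)        # tensor([99,  2,  3]) -- the tensor sees the change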
x1 = torch.rand(2, 3)
x2 = torch.rand(2, 3)
y = x1 + x2
print("X1", x1)
print("X2", x2)
print("Y", y)
X1 tensor([[0.1053, 0.2695, 0.3588],
        [0.1994, 0.5472, 0.0062]])
X2 tensor([[0.9516, 0.0753, 0.8860],
        [0.5832, 0.3376, 0.8090]])
Y tensor([[1.0569, 0.3448, 1.2448],
        [0.7826, 0.8848, 0.8151]])
x1 = torch.rand(2, 3)
x2 = torch.rand(2, 3)
print("X1 (before)", x1)
print("X2 (before)", x2)
x2.add_(x1)
print("X1 (after)", x1)
print("X2 (after)", x2)
X1 (before) tensor([[0.5779, 0.9040, 0.5547],
        [0.3423, 0.6343, 0.3644]])
X2 (before) tensor([[0.7104, 0.9464, 0.7890],
        [0.2814, 0.7886, 0.5895]])
X1 (after) tensor([[0.5779, 0.9040, 0.5547],
        [0.3423, 0.6343, 0.3644]])
X2 (after) tensor([[1.2884, 1.8504, 1.3437],
        [0.6237, 1.4230, 0.9539]])
x = torch.arange(6)
print("X", x)
X tensor([0, 1, 2, 3, 4, 5])
x = x.view(2, 3)
print("X", x)
X tensor([[0, 1, 2],
        [3, 4, 5]])
x = x.permute(1, 0)
print("X", x)
X tensor([[0, 3],
        [1, 4],
        [2, 5]])
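A related pitfall (my addition, not from the source): permute returns a non-contiguous view, so a subsequent .view() call fails; .reshape() or .contiguous() handles that case.
x = torch.arange(6).view(2, 3).permute(1, 0)
print(x.is_contiguous())  # False
# x.view(6) would raise a RuntimeError here
print(x.reshape(6))       # works: tensor([0, 3, 1, 4, 2, 5])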
x = torch.arange(6)
x = x.view(2, 3)
print("X", x)
X tensor([[0, 1, 2],
        [3, 4, 5]])
W = torch.arange(9).view(3, 3)
print("W", W)
W tensor([[0, 1, 2],
        [3, 4, 5],
        [6, 7, 8]])
h = torch.matmul(x, W)
print("h", h)
h tensor([[15, 18, 21],
        [42, 54, 66]])
x = torch.arange(12).view(3, 4)
print("X", x)
X tensor([[ 0,  1,  2,  3],
        [ 4,  5,  6,  7],
        [ 8,  9, 10, 11]])
print(x[:, 1])
tensor([1, 5, 9])
print(x[0])
tensor([0, 1, 2, 3])
print(x[:2, -1])
tensor([3, 7])
print(x[1:3, :])
tensor([[ 4,  5,  6,  7],
        [ 8,  9, 10, 11]])
x = torch.ones((3,))
print(x.requires_grad)
False
x.requires_grad_(True)
print(x.requires_grad)
True
x = torch.arange(3, dtype=torch.float32, requires_grad=True) # Only float tensors can have gradients
print("X", x)
X tensor([0., 1., 2.], requires_grad=True)
a = x + 2
b = a**2
c = b + 3
y = c.mean()
print("Y", y)
Y tensor(12.6667, grad_fn=<MeanBackward0>)
y.backward()
print(x.grad)
tensor([1.3333, 2.0000, 2.6667])
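As a sanity check, the gradient can be derived by hand: y = mean((x + 2)^2 + 3), so dy/dx_i = 2*(x_i + 2)/3, which matches the printed x.grad. A quick numerical verification:
manual_grad = (2 * (x + 2) / 3).detach()    # dy/dx_i = 2*(x_i + 2) / 3
print(torch.allclose(x.grad, manual_grad))  # True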
gpu_avail = torch.cuda.is_available()
print(f"Is the GPU available? {gpu_avail}")
Is the GPU available? True
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")
print("Device", device)
Device cuda
x = torch.zeros(2, 3)
x = x.to(device)
print("X", x)
X tensor([[0., 0., 0.],
[0., 0., 0.]], device='cuda:0')
x = torch.randn(5000, 5000)

# CPU version
start_time = time.time()
_ = torch.matmul(x, x)
end_time = time.time()
print(f"CPU time: {(end_time - start_time):6.5f}s")

# GPU version
if torch.cuda.is_available():
    x = x.to(device)
    start = torch.cuda.Event(enable_timing=True)
    end = torch.cuda.Event(enable_timing=True)
    start.record()
    _ = torch.matmul(x, x)
    end.record()
    torch.cuda.synchronize()
    print(f"GPU time: {0.001 * start.elapsed_time(end):6.5f}s")
# Why is the GPU slower here? The first CUDA op pays one-time device-initialization
# and kernel-loading costs, so it should be excluded with a warm-up run.
CPU time: 0.22116s
GPU time: 1.89578s
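To confirm that the slowdown is just first-call overhead, the usual fix is a warm-up run before timing; a sketch reusing the x and device from above:
if torch.cuda.is_available():
    _ = torch.matmul(x, x)    # warm-up: triggers CUDA init and kernel loading
    torch.cuda.synchronize()  # wait for the warm-up to finish
    start = torch.cuda.Event(enable_timing=True)
    end = torch.cuda.Event(enable_timing=True)
    start.record()
    _ = torch.matmul(x, x)
    end.record()
    torch.cuda.synchronize()
    print(f"GPU time (after warm-up): {0.001 * start.elapsed_time(end):6.5f}s")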
if torch.cuda.is_available():
    torch.cuda.manual_seed(42)
    torch.cuda.manual_seed_all(42)
    torch.backends.cudnn.deterministic = True
    torch.backends.cudnn.benchmark = False
import torch.nn as nn
import torch.nn.functional as F
class MyModule(nn.Module):
    def __init__(self):
        super().__init__()
        # Define the submodules and parameters here

    def forward(self, x):
        # Define the module's computation here
        pass
class SimpleClassifier(nn.Module):
    def __init__(self, num_inputs, num_hidden, num_outputs):
        super().__init__()
        self.linear1 = nn.Linear(num_inputs, num_hidden)
        self.act_fn = nn.Tanh()
        self.linear2 = nn.Linear(num_hidden, num_outputs)

    def forward(self, x):
        x = self.linear1(x)
        x = self.act_fn(x)
        x = self.linear2(x)
        return x
model = SimpleClassifier(num_inputs=2, num_hidden=4, num_outputs=1)
print(model)
SimpleClassifier(
  (linear1): Linear(in_features=2, out_features=4, bias=True)
  (act_fn): Tanh()
  (linear2): Linear(in_features=4, out_features=1, bias=True)
)
for name, param in model.named_parameters():
    print(f"Parameter {name}, shape {param.shape}")
Parameter linear1.weight, shape torch.Size([4, 2])
Parameter linear1.bias, shape torch.Size([4])
Parameter linear2.weight, shape torch.Size([1, 4])
Parameter linear2.bias, shape torch.Size([1])
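If only the total parameter count matters, a one-liner (my addition, not from the tutorial) sums the element counts:
num_params = sum(p.numel() for p in model.parameters())
print(f"Total parameters: {num_params}")  # 4*2 + 4 + 1*4 + 1 = 17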
import torch.utils.data as data

class XORDataset(data.Dataset):
    def __init__(self, size, std=0.1):
        super().__init__()
        self.size = size
        self.std = std
        self.generate_continuous_xor()

    def generate_continuous_xor(self):
        # Binary inputs in {0,1}^2; the label is 1 iff exactly one input is 1 (XOR)
        data = torch.randint(low=0, high=2, size=(self.size, 2), dtype=torch.float32)
        label = (data.sum(dim=1) == 1).to(torch.long)
        # Add Gaussian noise to make the points continuous
        data += self.std * torch.randn(data.shape)
        self.data = data
        self.label = label

    def __len__(self):
        return self.size

    def __getitem__(self, idx):
        data_point = self.data[idx]
        data_label = self.label[idx]
        return data_point, data_label
dataset = XORDataset(size=200)
print("Size of dataset:", len(dataset))
print("Data point 0:", dataset[0])
Size of dataset: 200
Data point 0: (tensor([0.9632, 0.1117]), tensor(1))
def visualize_samples(data, label):
    if isinstance(data, Tensor):
        data = data.cpu().numpy()
    if isinstance(label, Tensor):
        label = label.cpu().numpy()
    data_0 = data[label == 0]
    data_1 = data[label == 1]

    plt.figure(figsize=(4, 4))
    plt.scatter(data_0[:, 0], data_0[:, 1], edgecolor="#333", label="Class 0")
    plt.scatter(data_1[:, 0], data_1[:, 1], edgecolor="#333", label="Class 1")
    plt.title("Dataset samples")
    plt.ylabel(r"$x_2$")
    plt.xlabel(r"$x_1$")
    plt.legend()
visualize_samples(dataset.data, dataset.label)
plt.show()
data_loader = data.DataLoader(dataset, batch_size=8, shuffle=True)
data_inputs, data_labels = next(iter(data_loader))
print("Data inputs", data_inputs.shape, "\n", data_inputs)
print("Data labels", data_labels.shape, "\n", data_labels)
Data inputs torch.Size([8, 2])
 tensor([[-0.0890,  0.8608],
        [ 1.0905, -0.0128],
        [ 0.7967,  0.2268],
        [-0.0688,  0.0371],
        [ 0.8732, -0.2240],
        [-0.0559, -0.0282],
        [ 0.9277,  0.0978],
        [ 1.0150,  0.9689]])
Data labels torch.Size([8])
 tensor([1, 1, 1, 0, 1, 0, 1, 0])
import torch.nn as nn
loss_module = nn.BCEWithLogitsLoss()
optimizer = torch.optim.SGD(model.parameters(), lr=0.1)
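nn.BCEWithLogitsLoss fuses the sigmoid and the binary cross-entropy into one numerically more stable operation. A quick sketch (my addition) checking that it matches a sigmoid followed by nn.BCELoss on the same logits:
logits = torch.randn(4)
targets = torch.randint(0, 2, (4,)).float()
fused = nn.BCEWithLogitsLoss()(logits, targets)
manual = nn.BCELoss()(torch.sigmoid(logits), targets)
print(torch.allclose(fused, manual))  # True, up to floating-point error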
def train_model(model, optimizer, data_loader, loss_module, num_epochs=100):
    # Set the model to training mode
    model.train()

    # Training loop
    for epoch in tqdm(range(num_epochs)):
        for data_inputs, data_labels in data_loader:
            # Step 1: Move the input data to the device (only strictly necessary when using a GPU)
            data_inputs = data_inputs.to(device)
            data_labels = data_labels.to(device)

            # Step 2: Run the model on the input data
            preds = model(data_inputs)
            preds = preds.squeeze(dim=1)  # output is [batch size, 1]; squeeze to [batch size]

            # Step 3: Calculate the loss
            loss = loss_module(preds, data_labels.float())

            # Step 4: Perform backpropagation
            # Zero the gradients before computing new ones,
            # since gradients are otherwise added onto the existing ones instead of overwriting them
            optimizer.zero_grad()
            # Run backpropagation
            loss.backward()

            # Step 5: Update the parameters
            optimizer.step()
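The notebook never shows the actual training call before reading out the state dict; presumably a cell like the following ran in between (a sketch; the dataset size and batch size are my assumptions, and the model must sit on the same device as the inputs):
train_dataset = XORDataset(size=2500)
train_data_loader = data.DataLoader(train_dataset, batch_size=128, shuffle=True)
model.to(device)
train_model(model, optimizer, train_data_loader, loss_module)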
state_dict = model.state_dict()
print(state_dict)
OrderedDict([('linear1.weight', tensor([[-0.3851, -0.1043],
        [-0.0225, -0.5951],
        [-0.5302, -0.0411],
        [-0.1559, -0.1731]])), ('linear1.bias', tensor([-0.1501, -0.6921,  0.3138, -0.0009])), ('linear2.weight', tensor([[-0.3575,  0.0298,  0.4821, -0.4590]])), ('linear2.bias', tensor([-0.2364]))])
torch.save(state_dict, "our_model.tar")
# Load the state dict from disk (same name as above)
state_dict = torch.load("our_model.tar")

# Create a new model and load the state
new_model = SimpleClassifier(num_inputs=2, num_hidden=4, num_outputs=1)
new_model.load_state_dict(state_dict)

# Verify that the parameters are the same
print("Original model\n", model.state_dict())
print("\nLoaded model\n", new_model.state_dict())
Original model
 OrderedDict([('linear1.weight', tensor([[-0.3851, -0.1043],
        [-0.0225, -0.5951],
        [-0.5302, -0.0411],
        [-0.1559, -0.1731]])), ('linear1.bias', tensor([-0.1501, -0.6921,  0.3138, -0.0009])), ('linear2.weight', tensor([[-0.3575,  0.0298,  0.4821, -0.4590]])), ('linear2.bias', tensor([-0.2364]))])

Loaded model
 OrderedDict([('linear1.weight', tensor([[-0.3851, -0.1043],
        [-0.0225, -0.5951],
        [-0.5302, -0.0411],
        [-0.1559, -0.1731]])), ('linear1.bias', tensor([-0.1501, -0.6921,  0.3138, -0.0009])), ('linear2.weight', tensor([[-0.3575,  0.0298,  0.4821, -0.4590]])), ('linear2.bias', tensor([-0.2364]))])
test_dataset = XORDataset(size=500)
# drop_last=False -> keep the final batch even if it has fewer than 128 samples
test_data_loader = data.DataLoader(test_dataset, batch_size=128, shuffle=False, drop_last=False, pin_memory=True)
def eval_model(model, data_loader):
    model.eval()  # Set the model to evaluation mode
    true_preds, num_preds = 0.0, 0.0

    with torch.no_grad():  # Deactivate gradients for the following code
        for data_inputs, data_labels in data_loader:
            # Move the data to the model's device
            data_inputs, data_labels = data_inputs.to(device), data_labels.to(device)
            # Move the model to CPU or GPU (idempotent; it could also be done once before the loop)
            model.to(device)
            preds = model(data_inputs)
            preds = preds.squeeze(dim=1)
            preds = torch.sigmoid(preds)  # map the predictions between 0 and 1 with a sigmoid
            pred_labels = (preds >= 0.5).long()  # binarize the predictions to 0 and 1

            # Record the predictions for the accuracy metric (true_preds=TP+TN, num_preds=TP+TN+FP+FN)
            true_preds += (pred_labels == data_labels).sum().item()
            num_preds += data_labels.shape[0]

    acc = true_preds / num_preds
    print(f"Accuracy of the model: {100.0*acc:4.2f}%")
# Call eval_model
eval_model(model, test_data_loader)
Accuracy of the model: 76.40%
@torch.no_grad()  # decorator; has the same effect as wrapping the whole function body in "with torch.no_grad(): ..."
def visualize_classification(model, data, label):
    if isinstance(data, Tensor):
        data = data.cpu().numpy()
    if isinstance(label, Tensor):
        label = label.cpu().numpy()
    data_0 = data[label == 0]
    data_1 = data[label == 1]

    plt.figure(figsize=(4, 4))
    plt.scatter(data_0[:, 0], data_0[:, 1], edgecolor="#333", label="class 0")
    plt.scatter(data_1[:, 0], data_1[:, 1], edgecolor="#333", label="class 1")
    plt.title("Dataset samples")
    plt.ylabel(r"$x_2$")
    plt.xlabel(r"$x_1$")
    plt.legend()

    # Reuse the trained model's operations
    model.to(device)
    c0 = Tensor(to_rgba("C0")).to(device)
    c1 = Tensor(to_rgba("C1")).to(device)
    x1 = torch.arange(-0.5, 1.5, step=0.01, device=device)
    x2 = torch.arange(-0.5, 1.5, step=0.01, device=device)
    xx1, xx2 = torch.meshgrid(x1, x2, indexing="ij")  # same role as numpy's meshgrid ("ij" is already the default; passing it silences a warning)
    model_inputs = torch.stack([xx1, xx2], dim=-1)
    preds = model(model_inputs)
    preds = torch.sigmoid(preds)
    # Indexing with "None" creates a new axis
    output_image = (1 - preds) * c0[None, None] + preds * c1[None, None]
    output_image = (
        output_image.cpu().numpy()
    )  # convert to a numpy array; this only works for tensors on the CPU, hence the .cpu() call first
    plt.imshow(output_image, origin="lower", extent=(-0.5, 1.5, -0.5, 1.5))
    plt.grid(False)
visualize_classification(model, dataset.data, dataset.label)
plt.show()