end0tknr's kipple - 新web写経開発

http://d.hatena.ne.jp/end0tknr/ から移転しました

独自データセットを CNN(AlexNet) で画像分類

deep learningにおけるhello worldのMLP (Multi Layer Perceptron) から、畳込みニューラルネットワーク(CNN : Convolutional Neural Network )におけるhello worldのAlexNetへ - end0tknr's kipple - 新web写経開発

GitHub - miyamotok0105/pytorch_handbook: pytorch_handbook

先日の CNN(AlexNet) エントリの続きで、やはり上記urlの写経。

前回は、CIFAR-10 https://www.cs.toronto.edu/~kriz/cifar.html というバイナリ?で用意されたデータを使用しましたが、 今回は、 https://download.pytorch.org/tutorial/hymenoptera_data.zip にある アリとハチの画像を分類します。

画像のサンプルは以下。 f:id:end0tknr:20191019185851p:plain f:id:end0tknr:20191019185850p:plain

#!/usr/local/python3/bin/python3
# -*- coding: utf-8 -*-
import torch
import torch.nn as nn
import torch.optim as optim
from torch.optim import lr_scheduler
from torch.autograd import Variable
import numpy as np
import torchvision
from torchvision import datasets, models, transforms

# import matplotlib.pyplot as plt
import matplotlib
matplotlib.use('Agg')
import matplotlib.pylab as plt

from PIL import Image
import time
import os

import cv2
from PIL import Image


DEVICE = 'cuda' if torch.cuda.is_available() else 'cpu'

root = 'hymenoptera_data'
num_classes = 2
TRAIN_NUM_EPOCHS = 500
fc_size = 9216


def main():
    print("DEVICE:", DEVICE)
    

    (data_transforms, to_tensor_transforms) = get_pre_process()

    # 定義したDatasetとDataLoaderを使います。
    custom_train_dataset = CustomDataset(root, data_transforms["train"], train=True)
    train_loader = torch.utils.data.DataLoader(dataset=custom_train_dataset,
                                               batch_size=5, 
                                               shuffle=True)
    custom_test_dataset = CustomDataset(root, data_transforms["val"])
    test_loader = torch.utils.data.DataLoader(dataset=custom_test_dataset,
                                              batch_size=5, 
                                              shuffle=False)
    # for i, (images, labels) in enumerate(train_loader):
    #     print(images.size())
    #     print(images[0].size())
    #     print(labels[0].item())
    #     #ここに訓練などの処理をきます。
    #     break

    global fc_size
    # batch_size=10, channel=3, size=224x224
    fc_size = get_fc_size( torch.FloatTensor(10, 3, 224, 224) )
    print("fc_size:",fc_size)
    
    net = AlexNet(num_classes, fc_size).to(DEVICE)
    criterion = nn.CrossEntropyLoss()
    optimizer = optim.SGD(net.parameters(), lr=0.01, momentum=0.9, weight_decay=5e-4)

    (train_loss_list, train_acc_list, val_loss_list, val_acc_list) = \
        train_epochs(train_loader, test_loader, net, criterion, optimizer)

    output_to_file(train_loss_list,train_acc_list,val_loss_list,val_acc_list)


def train_epochs(train_loader, test_loader, net, criterion, optimizer):
    train_loss_list = []
    train_acc_list =  []
    val_loss_list =   []
    val_acc_list =    []

    for epoch in range(TRAIN_NUM_EPOCHS):
        train_loss = 0
        train_acc =  0
        val_loss =   0
        val_acc =    0
    
        #train
        net.train()
        for i, (images, labels) in enumerate(train_loader):
            images, labels = images.to(DEVICE), labels.to(DEVICE)
            
            optimizer.zero_grad()
            outputs = net(images)
            loss = criterion(outputs, labels)
            train_loss += loss.item()
            train_acc += (outputs.max(1)[1] == labels).sum().item()
            loss.backward()
            optimizer.step()
    
        avg_train_loss = train_loss / len(train_loader.dataset)
        avg_train_acc =  train_acc  / len(train_loader.dataset)
    
        #val
        net.eval()
        with torch.no_grad():
            for images, labels in test_loader:
                images = images.to(DEVICE)
                labels = labels.to(DEVICE)
                outputs = net(images)
                loss = criterion(outputs, labels)
                val_loss += loss.item()
                val_acc += (outputs.max(1)[1] == labels).sum().item()
        avg_val_loss = val_loss / len(test_loader.dataset)
        avg_val_acc = val_acc / len(test_loader.dataset)

        
        output_str_format = ', '.join(['Epoch [{}/{}]','Loss: {loss:.4f}',
                                       'val_loss: {val_loss:.4f}','val_acc: {val_acc:.4f}'])
        print (output_str_format.format(epoch+1, TRAIN_NUM_EPOCHS, i+1, loss=avg_train_loss,
                                        val_loss=avg_val_loss, val_acc=avg_val_acc))
        
        train_loss_list.append(avg_train_loss)
        train_acc_list.append(avg_train_acc)
        val_loss_list.append(avg_val_loss)
        val_acc_list.append(avg_val_acc)
        
    return train_loss_list, train_acc_list, val_loss_list, val_acc_list


def show_img(img):
    npimg = img.numpy()
#   ↓この行は理解できていません
#    plt.imshow(np.transpose(npimg, (1,2,0)), interpolation='nearest')


def get_pre_process():
    #画像の前処理
    data_transforms = {
        'train': transforms.Compose([
            transforms.RandomResizedCrop(224),  # re-size
            transforms.RandomHorizontalFlip(),  # 反転
            transforms.ToTensor(),
            transforms.Normalize([0.5, 0.5, 0.5], [0.5, 0.5, 0.5]) #正規化
        ]),
        'val': transforms.Compose([
            transforms.Resize(256),
            transforms.CenterCrop(224),
            transforms.ToTensor(),
            transforms.Normalize([0.5, 0.5, 0.5], [0.5, 0.5, 0.5])
        ]),
    }
    #正規化をしない前処理
    to_tensor_transforms = transforms.Compose([
        transforms.RandomResizedCrop(224),
        transforms.RandomHorizontalFlip(),
        transforms.ToTensor()
    ])
    return data_transforms , to_tensor_transforms

def get_nn_features():
    features = nn.Sequential(
        nn.Conv2d(3, 64, kernel_size=11, stride=4, padding=2),
        nn.ReLU(inplace=True),
        nn.MaxPool2d(kernel_size=2, stride=2),
        nn.Conv2d(64, 192, kernel_size=5, padding=2),
        nn.ReLU(inplace=True),
        nn.MaxPool2d(kernel_size=2, stride=2),
        nn.Conv2d(192, 384, kernel_size=3, padding=1),
        nn.ReLU(inplace=True),
        nn.Conv2d(384, 256, kernel_size=3, padding=1),
        nn.ReLU(inplace=True),
        nn.Conv2d(256, 256, kernel_size=3, padding=1),
        nn.ReLU(inplace=True),
        nn.MaxPool2d(kernel_size=2, stride=2),
    )
    return features

def get_fc_size( size_check ):
    features = get_nn_features()
    
    #バッチサイズ10, 6×6のフィルターが256枚
    #10バッチは残して、6×6×256を1次元に落とす=>6×6×256=9216
    print("size1:",features(size_check).size())
    #バッチ10の値を軸にして残りの次元を1次元へ落とした場合の
    #Tensorの形状をチェックすると9216。
    print("size2:",features(size_check).view(size_check.size(0), -1).size())
    #fc_sizeを全結合の形状として保持
    global fc_size
    fc_size = features(size_check).view(size_check.size(0), -1).size()[1]
    print("size3:",fc_size)
    return fc_size


def output_to_file(train_loss_list,train_acc_list,val_loss_list,val_acc_list):

    plt.figure()
    plt.plot(range(TRAIN_NUM_EPOCHS),
             train_loss_list,
             color='blue',
             linestyle='-',
             label='train_loss')
    plt.plot(range(TRAIN_NUM_EPOCHS),
             val_loss_list,
             color='green',
             linestyle='--',
             label='val_loss')
#    plt.ylim([0,0.04])
    plt.legend()
    plt.xlabel('epoch')
    plt.ylabel('loss')
    plt.title('Training and validation loss')
    plt.grid()
    plt.savefig( 'test_4_1.png' )

    plt.figure()
    plt.plot(range(TRAIN_NUM_EPOCHS),
             train_acc_list,
             color='blue',
             linestyle='-',
             label='train_acc')
    plt.plot(range(TRAIN_NUM_EPOCHS),
             val_acc_list,
             color='green',
             linestyle='--',
             label='val_acc')
    plt.ylim([0,1])
    plt.legend()
    plt.xlabel('epoch')
    plt.ylabel('acc')
    plt.title('Training and validation accuracy')
    plt.grid()
    plt.savefig( 'test_4_2.png' )


class CustomDataset(torch.utils.data.Dataset):
    classes = ['ant', 'bee']
  
    def __init__(self, root, transform=None, train=True):
        
        self.transform = transform   # 指定する場合、前処理クラスを受取り
        
        self.images = []  # 画像とlabelの保持用
        self.labels = []

        # 訓練と検証で、読込むpathを取得
        if train == True:
            root_ants_path = os.path.join(root, 'train', 'ants')
            root_bees_path = os.path.join(root, 'train', 'bees')
        else:
            root_ants_path = os.path.join(root, 'val', 'ants')
            root_bees_path = os.path.join(root, 'val', 'bees')
        
        ant_images = os.listdir(root_ants_path)  # アリの画像一覧を取得
        ant_labels = [0] * len(ant_images)       # アリをlabel=0に指定
       
        bee_images = os.listdir(root_bees_path)  # ハチの画像一覧を取得
        bee_labels = [1] * len(bee_images)       # ハチをlabel=1に指定

        # listのmerge
        for image, label in zip(ant_images, ant_labels):
            self.images.append(os.path.join(root_ants_path, image))
            self.labels.append(label)
        for image, label in zip(bee_images, bee_labels):
            self.images.append(os.path.join(root_bees_path, image))
            self.labels.append(label)
        
    def __getitem__(self, index):
        image = self.images[index]     # indexを元に画像のpathとlabel取得
        label = self.labels[index]
       
        with open(image, 'rb') as f:   # pathから画像読込み
            image = Image.open(f)
            image = image.convert('RGB')
        
        if self.transform is not None: # 前処理がある場合は入れる
            image = self.transform(image)
            
        return image, label
        
    def __len__(self):
        # ここにはデータ数を指定します。
        return len(self.images)


class AlexNet(nn.Module):
    def __init__(self, num_classes, fc_size):
        super(AlexNet, self).__init__()
        self.features = get_nn_features()

        self.classifier = nn.Sequential(
            nn.Dropout(p=0.5),
            nn.Linear(fc_size, 4096),  #fc_sizeで計算した形状を指定
            nn.ReLU(inplace=True),
            nn.Dropout(p=0.5),
            nn.Linear(4096, 4096),
            nn.ReLU(inplace=True),
            nn.Linear(4096, num_classes)
        )
        
    def forward(self, x):
        x = self.features(x)
        x = x.view(x.size(0), -1)
        x = self.classifier(x)
        return x


if __name__ == '__main__':
    main()

↑こう書くと、↓こう出力されます。

手元のcent os 8 でも実行しましたが、GPUがなく時間がかかる為、 google colaboratory でも実行しています。

$ wget https://download.pytorch.org/tutorial/hymenoptera_data.zip
$ unzip hymenoptera_data.zip
$ ./foo.py
DEVICE: cuda
size1: torch.Size([10, 256, 6, 6])
size2: torch.Size([10, 9216])
size3: 9216
fc_size: 9216
Epoch [1/500], Loss: 0.1392, val_loss: 0.1386, val_acc: 0.5061
Epoch [2/500], Loss: 0.1394, val_loss: 0.1386, val_acc: 0.5061
  :
Epoch [500/500], Loss: 0.0758, val_loss: 0.0612, val_acc: 0.8653

f:id:end0tknr:20191019185806p:plainf:id:end0tknr:20191019185810p:plain