• 카테고리

    질문 & 답변
  • 세부 분야

    컴퓨터 비전

  • 해결 여부

    미해결

클래스 질문

22.10.04 20:19 작성 조회수 251

0

안녕하세요. 강사님. 제가 이번 실습을 기반으로 병해충 진단 모델을 구축중입니다. 이에 대해 질문사항이 있어 질문드립니다.

efficientNet모델로 딥러닝 모델을 만들고 있습니다. 총 16개의 병해충에 걸린 식물을 구분하는 모델입니다. output layer에서 최종적으로 예측하고 나온 16개의 확률값중에 가장 큰 확률값의 index가 4라고 할때 이게 어떤 식물인지 어떻게 아는건가요?..

답변 1

답변을 작성해보세요.

0

안녕하십니까,

label encoding 값을 label값으로 역변환을 해주시면 됩니다.

최초에 병해충 타겟값이 label로 해충A, 해충B, 해충C로 되어 있어서 이걸 사이킷런의 LabelEncoder로 변환한 후에 to_categorical()로 one-hot encoding 하셨다면 해당 LabelEncoder객체로 다시 역변환해주면 됩니다.

예를 들어

le = LabelEncoder()

label_encoded = le.fit_transform(병해충 타겟값 label 집합)

one_encoded = to_categorical(label_encoded)

해서 예측 결과가 index값 4가 나왔다면

le.inverse_transform([4]) 를 입력하시면 병해충 label값이 다시 도출 됩니다.

감사합니다.

원형 조님의 프로필

원형 조

2022.10.05

아 감사합니다!! 해결했습니다.

죄송한데.. 질문 두 개만 더 하겠습니다.

1)제가 처음에 one-hot-encoding을 진행할 때 말씀해주신 LabelEncoder() 말고 pd.get_dummies를 사용해서 원-핫 인코딩을 진행했었습니다. 이런 방식으로는 제가 질문한 사항에 대해 해결할 수 없을까요?

2)강사님 코드기반으로 아까 언급한 class 갯수가 16개인 efficientNetB5 모델을 구축했습니다. 하지만, 무슨 이유인지 model.evaluate로 평가한 결과 정확도가 50%밖에 되지 않았습니다. 대부분 이런 경우 정확도를 높일려면 어떻게 해야될까요? 각 클래스별 이미지 갯수는 약 1천건으로 맞춰주었습니다. 혹시 몰라 아래에 코드를 첨부하겠습니다.

def get_label(x):
  if x['질병명'] == 1:
    return "고추흰가루병"
  elif x['질병명'] == 2:
    return "고추탄저병"
  elif x['질병명'] == 5:
    return "배추검은썩음병"
  elif x['질병명'] == 6:
    return "배추노균병"
  elif x['질병명'] == 13:
    return "콩점무늬병"
  elif x['질병명'] == 14:
    return "콩불마름병"
  elif x['질병명'] == 16:
    return "파검은무늬병"
  elif x['질병명'] == 17:
    return "파노균병"
  elif x['질병명'] == 18:
    return "파녹병"
  elif x['질병명'] == 3:
    return "무검은무늬병"
  elif x['질병명'] == 4:
    return "무노균병"
  else: #정상 Label은 작물명_정상으로 바꿔준다.
    if x['작물'] == 1:
      return "정상_고추"
    elif x['작물'] == 3:
      return "정상_배추"
    elif x['작물'] == 8:
      return "정상_콩"
    elif x['작물'] == 2:
      return "정상_무"
    elif x['작물'] == 9:
      return "정상_파"
------------------------------------------------------------------------------------------------
tr_json_df['질병명'] = tr_json_df.apply(get_label,axis=1)

from sklearn.preprocessing import LabelEncoder
from tensorflow.keras.utils import to_categorical

le = LabelEncoder()

label_encoded = le.fit_transform(tr_json_df['질병명'])

one_encoded = to_categorical(label_encoded)
------------------------------------------------------------------------------------------
train_labels = pd.DataFrame(data=one_encoded,columns=le.classes_).astype('int')
train_df = pd.concat([tr_json_df,train_labels],axis=1) #최종적으로 학습시킬때 필요한 dataframe
import albumentations as A

augmentor_01 = A.Compose([
    A.HorizontalFlip(p=0.5),
    A.VerticalFlip(p=0.5),
    A.ShiftScaleRotate(scale_limit=(0.7, 0.9), p=0.3, rotate_limit=30),
    A.RandomBrightnessContrast(brightness_limit=(-0.2, 0.2), contrast_limit=(-0.2, 0.2), p=0.3),
    A.Blur(p=0.3)
])
---------------------------------------------------------------------------------------------------
from sklearn.model_selection import train_test_split
from tensorflow.keras.utils import Sequence
import sklearn 

# 입력 인자 image_filenames, labels는 모두 numpy array로 들어옴. image size는 (높이, 너비)로 수정. 
class Plant_Dataset(Sequence):
    def __init__(self, image_filenames, labels, image_size=(456, 456), batch_size=64, 
                 augmentor=None, shuffle=False, pre_func=None):
        '''
        파라미터 설명
        image_filenames: opencv로 image를 로드할 파일의 절대 경로들
        labels: 해당 image의 label들
        batch_size: __getitem__(self, index) 호출 시 마다 가져올 데이터 batch 건수
        augmentor: albumentations 객체
        shuffle: 학습 데이터의 경우 epoch 종료시마다 데이터를 섞을지 여부
        '''
        # 객체 생성 인자로 들어온 값을 객체 내부 변수로 할당. 
        self.image_filenames = image_filenames
        self.labels = labels
        self.image_size = image_size
        self.batch_size = batch_size
        self.augmentor = augmentor
        self.pre_func = pre_func
        # train data의 경우 
        self.shuffle = shuffle
        if self.shuffle:
            # 객체 생성시에 한번 데이터를 섞음. 
            #self.on_epoch_end()
            pass
    
    # Sequence를 상속받은 Dataset은 batch_size 단위로 입력된 데이터를 처리함. 
    # __len__()은 전체 데이터 건수가 주어졌을 때 batch_size단위로 몇번 데이터를 반환하는지 나타남
    def __len__(self):
        # batch_size단위로 데이터를 몇번 가져와야하는지 계산하기 위해 전체 데이터 건수를 batch_size로 나누되, 정수로 정확히 나눠지지 않을 경우 1회를 더한다. 
        return int(np.ceil(len(self.image_filenames) / self.batch_size))
    
    # batch_size 단위로 image_array, label_array 데이터를 가져와서 변환한 뒤 다시 반환함
    # 인자로 몇번째 batch 인지를 나타내는 index를 입력하면 해당 순서에 해당하는 batch_size 만큼의 데이타를 가공하여 반환
    # batch_size 갯수만큼 변환된 image_array와 label_array 반환. 
    def __getitem__(self, index):
        # index는 몇번째 batch인지를 나타냄. 
        # batch_size만큼 순차적으로 데이터를 가져오려면 array에서 index*self.batch_size:(index+1)*self.batch_size 만큼의 연속 데이터를 가져오면 됨
        image_name_batch = self.image_filenames[index*self.batch_size:(index+1)*self.batch_size]
        if self.labels is not None:
            label_batch = self.labels[index*self.batch_size:(index+1)*self.batch_size]
        
        # label_batch가 None이 될 수 있음. 
        else: 
            label_batch = None
        # 만일 객체 생성 인자로 albumentation으로 만든 augmentor가 주어진다면 아래와 같이 augmentor를 이용하여 image 변환
        # albumentations은 개별 image만 변환할 수 있으므로 batch_size만큼 할당된 image_name_batch를 한 건씩 iteration하면서 변환 수행. 
        # image_batch 배열은 float32 로 설정. 
        image_batch = np.zeros((image_name_batch.shape[0], self.image_size[0], self.image_size[1], 3), dtype='float32')
        
        # batch_size에 담긴 건수만큼 iteration 하면서 opencv image load -> image augmentation 변환(augmentor가 not None일 경우)-> image_batch에 담음. 
        for image_index in range(image_name_batch.shape[0]):
            image = cv2.cvtColor(cv2.imread(image_name_batch[image_index]), cv2.COLOR_BGR2RGB)
            if self.augmentor is not None:
                image = self.augmentor(image=image)['image']
            #원본 이미지와 다르게 resize 적용. opencv의 resize은 (가로, 세로)의 개념임. 배열은 (높이, 너비)의 개념이므로 이에 주의하여 opencv resize 인자 입력 필요.  
            image = cv2.resize(image, (self.image_size[1], self.image_size[0]))
            # 만일 preprocessing_input이 pre_func인자로 들어오면 이를 이용하여 scaling 적용. 
            if self.pre_func is not None:
                image = self.pre_func(image)
                
            image_batch[image_index] = image
        
        return image_batch, label_batch
    
    # epoch가 한번 수행이 완료 될 때마다 모델의 fit()에서 호출됨. 
    def on_epoch_end(self):
        if(self.shuffle):
            #print('epoch end')
            # 전체 image 파일의 위치와 label를 쌍을 맞춰서 섞어준다. scikt learn의 utils.shuffle에서 해당 기능 제공
            self.image_filenames, self.labels = sklearn.utils.shuffle(self.image_filenames, self.labels)
        else:
            pass
-------------------------------------------------------------------------------------------------------

from sklearn.model_selection import train_test_split

def get_train_valid(train_df, valid_size=0.2, random_state=2021):
    train_path = train_df['path'].values
    # 별도의 원핫인코딩을 하지 않고  'healthy', 'multiple_diseases', 'rust', 'scab' 컬럼들을 모두 Numpy array로 변환하는 수준으로 label을 원핫 인코딩 적용. 
    train_label = train_df[['고추탄저병','고추흰가루병','배추검은썩음병','배추노균병','정상_고추','정상_배추','정상_콩','콩불마름병','콩점무늬병','무검은무늬병','무노균병','파검은무늬병','파노균병','파녹병','정상_파','정상_무']].values
    
    tr_path, val_path, tr_label, val_label = train_test_split(train_path, train_label, test_size=valid_size, random_state=random_state)
    print('tr_path shape:', tr_path.shape, 'tr_label shape:', tr_label.shape, 'val_path shape:', val_path.shape, 'val_label shape:', val_label.shape)
    return tr_path, val_path, tr_label, val_label
----------------------------------------------------------------------------------------------------------------
from tensorflow.keras.applications.xception import preprocess_input as xcp_preprocess_input
from tensorflow.keras.applications.efficientnet import preprocess_input as eff_preprocess_input
import cv2
# image size는 224x224로 Dataset 생성. 
IMAGE_SIZE = (456, 456)
BATCH_SIZE = 64

tr_path, val_path, tr_label, val_label = get_train_valid(train_df, valid_size=0.2, random_state=2021)

tr_ds = Plant_Dataset(tr_path, tr_label, image_size=IMAGE_SIZE, batch_size=BATCH_SIZE, 
                          augmentor=augmentor_01, shuffle=True, pre_func=eff_preprocess_input)
val_ds = Plant_Dataset(val_path, val_label, image_size=IMAGE_SIZE, batch_size=BATCH_SIZE, 
                      augmentor=None, shuffle=False, pre_func=eff_preprocess_input)

tr_image_batch, tr_label_batch = next(iter(tr_ds))
val_image_batch, val_label_batch = next(iter(val_ds))
print(tr_image_batch.shape, val_image_batch.shape, tr_label_batch.shape, val_label_batch.shape)
print(tr_image_batch[0], val_image_batch[0])
-----------------------------------------------------------------------------------------------
from tensorflow.keras.models import Sequential , Model
from tensorflow.keras.layers import Input, Dense , Conv2D , Dropout , Flatten , Activation, MaxPooling2D , GlobalAveragePooling2D
from tensorflow.keras.optimizers import Adam , RMSprop 
from tensorflow.keras.regularizers import l2
from tensorflow.keras.layers import BatchNormalization
from tensorflow.keras.callbacks import ReduceLROnPlateau , EarlyStopping , ModelCheckpoint , LearningRateScheduler
from tensorflow.keras.metrics import AUC


from tensorflow.keras.applications import Xception, ResNet50V2, EfficientNetB0, EfficientNetB1, EfficientNetB2, EfficientNetB3
from tensorflow.keras.applications import EfficientNetB4, EfficientNetB5, EfficientNetB6, EfficientNetB7
from tensorflow.keras.applications import MobileNetV3Large
import tensorflow as tf


def create_model(model_type='efficientnetb0', in_shape=(224, 224, 3), n_classes=16):
    input_tensor = Input(shape=in_shape)

    if model_type == 'resnet50v2':
        base_model = tf.keras.applications.ResNet50V2(include_top=False, weights='imagenet', input_tensor=input_tensor)
    elif model_type == 'xception':
        base_model = tf.keras.applications.Xception(include_top=False, weights='imagenet', input_tensor=input_tensor)
    elif model_type == 'efficientnetb0':
        base_model = tf.keras.applications.EfficientNetB0(include_top=False, weights='imagenet', input_tensor=input_tensor)
    elif model_type == 'efficientnetb1':
        base_model = tf.keras.applications.EfficientNetB1(include_top=False, weights='imagenet', input_tensor=input_tensor)
    elif model_type == 'efficientnetb2':
        base_model = tf.keras.applications.EfficientNetB2(include_top=False, weights='imagenet', input_tensor=input_tensor)
    elif model_type == 'efficientnetb3':
        base_model = tf.keras.applications.EfficientNetB3(include_top=False, weights='imagenet', input_tensor=input_tensor)
    elif model_type == 'efficientnetb4':
        base_model = tf.keras.applications.EfficientNetB4(include_top=False, weights='imagenet', input_tensor=input_tensor)
    elif model_type == 'efficientnetb5':
        base_model = tf.keras.applications.EfficientNetB5(include_top=False, weights='imagenet', input_tensor=input_tensor)
    elif model_type == 'efficientnetb6':
        base_model = tf.keras.applications.EfficientNetB6(include_top=False, weights='imagenet', input_tensor=input_tensor)
    elif model_type == 'efficientnetb7':
        base_model = tf.keras.applications.EfficientNetB7(include_top=False, weights='imagenet', input_tensor=input_tensor)
    elif model_type == 'mobilenetV3Large':
        base_model = tf.keras.applications.MobileNetV3Large(include_top=False, weights='imagenet', input_tensor=input_tensor)
        
    x = base_model.output  
    x = GlobalAveragePooling2D()(x)
    x = Dense(1024, activation='relu')(x)
    x = Dropout(0.5)(x)    
    preds = Dense(units=n_classes, activation='softmax')(x)
    model = Model(inputs=input_tensor, outputs=preds)
    
    return model
---------------------------------------------------------------------------------------------------------------
from tensorflow.keras.applications.efficientnet import preprocess_input as eff_preprocess_input
from tensorflow.keras.applications.xception import preprocess_input as xcp_preprocess_input
import tensorflow as tf

# learning rate scheduler에 적용할 함수 선언. 
def lrfn_01(epoch):
    LR_START = 1e-5
    LR_MAX = 1e-4
    LR_RAMPUP_EPOCHS = 2
    LR_SUSTAIN_EPOCHS = 1
    LR_STEP_DECAY = 0.75
    
    def calc_fn(epoch):
        if epoch < LR_RAMPUP_EPOCHS:
            lr = (LR_MAX - LR_START) / LR_RAMPUP_EPOCHS * epoch + LR_START
        elif epoch < LR_RAMPUP_EPOCHS + LR_SUSTAIN_EPOCHS:
            lr = LR_MAX
        else:
            lr = LR_MAX * LR_STEP_DECAY**((epoch - LR_RAMPUP_EPOCHS - LR_SUSTAIN_EPOCHS)//2)
        return lr
    
    return calc_fn(epoch)

def lrfn_02(epoch):
    LR_START = 1e-6
    LR_MAX = 2e-5
    LR_RAMPUP_EPOCHS = 2
    LR_SUSTAIN_EPOCHS = 1
    LR_STEP_DECAY = 0.75
    
    def calc_fn(epoch):
        if epoch < LR_RAMPUP_EPOCHS:
            lr = (LR_MAX - LR_START) / LR_RAMPUP_EPOCHS * epoch + LR_START
        elif epoch < LR_RAMPUP_EPOCHS + LR_SUSTAIN_EPOCHS:
            lr = LR_MAX
        else:
            lr = LR_MAX * LR_STEP_DECAY**((epoch - LR_RAMPUP_EPOCHS - LR_SUSTAIN_EPOCHS)//2)
        return lr
    
    return calc_fn(epoch)

# Config에 입력할 callback 생성. 
lr01_cb = tf.keras.callbacks.LearningRateScheduler(lrfn_01, verbose=1)
lr02_cb = tf.keras.callbacks.LearningRateScheduler(lrfn_02, verbose=1)
rlr_cb = tf.keras.callbacks.ReduceLROnPlateau(monitor='val_loss', factor=0.2, patience=3, mode='min', verbose=1)

ely_cb = tf.keras.callbacks.EarlyStopping(monitor='val_loss', patience=10, mode='min', verbose=1)

# Augmentor 생성. 
augmentor_01 = A.Compose([
    A.HorizontalFlip(p=0.5),
    A.VerticalFlip(p=0.5),
    A.ShiftScaleRotate(scale_limit=(0.7, 0.9), p=0.3, rotate_limit=30),
    A.RandomBrightnessContrast(brightness_limit=(-0.2, 0.2), contrast_limit=(-0.2, 0.2), p=0.3),
    A.Blur(p=0.3)
])

# Config 생성. 
class Config:
    MODEL_TYPE = 'efficientnetb5'
    IMAGE_SIZE = (456, 456)
    BATCH_SIZE = 8
    N_EPOCHS = 10 # fine tuning이 아닐 경우 전체 수행 epoch 횟수
    IS_FINE_TUNING = False
    FIRST_EPOCHS = 15 # fine tuning 일 경우 첫번째 epoch 횟수
    SECOND_EPOCHS = 15 # fine tuning 일 경우 두번째 epoch 횟수
    FIRST_CALLBACKS = [lr01_cb, ely_cb] #모델 train시 적용될 callback 객체 리스트
    SECOND_CALLBACKS = [lr02_cb, ely_cb] #만일 Fine tuning 시 첫번째 학습과 두번째 학습의 Learning rate scheduler가 서로 다를 경우 사용. 
    AUGMENTOR = augmentor_01
    PRE_FUNC = eff_preprocess_input
    INITIAL_LR = 0.0001
    DEBUG = True
----------------------------------------------------------------------------------------------------------
def train_model(train_df, config=Config):
    # 학습과 검증 데이터 이미지/레이블로 분리하고 학습/검증 Dataset 생성. 
    tr_path, val_path, tr_label, val_label = get_train_valid(train_df, valid_size=0.2, random_state=2021)
    
    tr_ds = Plant_Dataset(tr_path, tr_label, image_size=config.IMAGE_SIZE, batch_size=config.BATCH_SIZE, 
                          augmentor=config.AUGMENTOR, shuffle=True, pre_func=config.PRE_FUNC)
    val_ds = Plant_Dataset(val_path, val_label, image_size=config.IMAGE_SIZE, batch_size=config.BATCH_SIZE, 
                          augmentor=None, shuffle=False, pre_func=config.PRE_FUNC)
    if config.DEBUG:
        tr_image_batch = next(iter(tr_ds))[0]
        val_image_batch = next(iter(val_ds))[0]
        print(tr_image_batch.shape, val_image_batch.shape)
        print(tr_image_batch[0], val_image_batch[0])
        
    # model_type인자로 들어온 모델 생성. optimizer Adam적용. 
    print('#######', config.MODEL_TYPE, ' 생성 및 학습 수행 ########')
    model = create_model(model_type=config.MODEL_TYPE, in_shape=(config.IMAGE_SIZE[0], config.IMAGE_SIZE[1], 3), n_classes=16)
    model.compile(optimizer=Adam(lr=config.INITIAL_LR), loss='categorical_crossentropy', metrics=[AUC(),'accuracy'])
    
    # 만일 Fine tuning 일 경우 아래 로직 적용. 
    if config.IS_FINE_TUNING:
        print('####### Fine tuning 학습을 시작합니다. ########')
        # 첫번째 Fine Tuning. Feature Extractor를 제외한 classification layer를 학습.(Feature Extractor layer들을 trainable=False 설정)
        for layer in model.layers[:-4]:
            layer.trainable = False
        
        print('####### Classification Layer들의 학습을 시작합니다. ########')
        history = model.fit(tr_ds, epochs=config.FIRST_EPOCHS, steps_per_epoch=int(np.ceil(tr_path.shape[0]/config.BATCH_SIZE)), 
                           validation_data=val_ds, validation_steps=int(np.ceil(val_path.shape[0]/config.BATCH_SIZE)),
                           callbacks=(config.FIRST_CALLBACKS), verbose=1)
        
        # 두번째, 전체 Layer를 학습. 전체 layer를 trainable=True로 수정. 모델이 EfficientNet 계열일 경우 Batch Normalization layer는 학습 제외. 
        for layer in model.layers:
            if config.MODEL_TYPE in 'efficientnet':
                if not isinstance(layer, layers.BatchNormalization):
                    layer.trainable = True
            else:
                layer.trainable = True
        
        print('####### 전체 Layer들의 학습을 시작합니다. ########')
        history = model.fit(tr_ds, epochs=config.SECOND_EPOCHS, steps_per_epoch=int(np.ceil(tr_path.shape[0]/config.BATCH_SIZE)), 
                           validation_data=val_ds, validation_steps=int(np.ceil(val_path.shape[0]/config.BATCH_SIZE)),
                           callbacks=(config.SECOND_CALLBACKS), verbose=1)
    
    # Fine Tuning이 아닐 경우 
    else:
        print('####### 학습을 시작합니다. ########')
        history = model.fit(tr_ds, epochs=config.N_EPOCHS, steps_per_epoch=int(np.ceil(tr_path.shape[0]/config.BATCH_SIZE)), 
                       validation_data=val_ds, validation_steps=int(np.ceil(val_path.shape[0]/config.BATCH_SIZE)),
                       callbacks=(config.FIRST_CALLBACKS), verbose=1)
        
    return model, history
--------------------------------------------------------------------------------------------------------------
efficientnetB5, history = train_model(train_df, config=Config)