🤍 전 강의 25% 할인 중 🤍

2024년 상반기를 돌아보고 하반기에도 함께 성장해요!
인프런이 준비한 25% 할인 받으러 가기 >>

  • 카테고리

    질문 & 답변
  • 세부 분야

    딥러닝 · 머신러닝

  • 해결 여부

    해결됨

train,test 분리 MF알고리즘에서 결과가 출력이 안되요 ㅠㅠ

22.06.05 18:44 작성 조회수 254

1

안녕하세요 거친코딩님.
해당 챕터에서 코드를 작성해서 돌려봤는데요~
마지막에 하셨던 full_prediction과 get_one_prediction은 잘 출력되었는데,
result=mf.test()만 출력이 안돼요 ㅠㅠ 코랩에서 체크모양은 떴는데, 결과값 출력이 안나와요..

import os
import numpy as np
import pandas as pd

base_src = 'drive/MyDrive/RecoSys/Data'
u_data_src = os.path.join(base_src,'u.data')
r_cols = ['user_id','movie_id','rating','timestamp']
ratings = pd.read_csv(u_data_src,
                      sep='\t',
                      names = r_cols,
                      encoding='latin-1')
#timestamp 제거
ratings = ratings[['user_id','movie_id','rating']].astype(int)

# train set과 test set 분리
from sklearn.utils import shuffle #데이터가 imbalance하면 계층화추출, balance하면 셔플(계층화추출하면, 표본의 대표성을 저해할 수 있기때문에)
TRAIN_SIZE = 0.75
#(사용자-영화-평점)
ratings = shuffle(ratings,random_state=2021)
cutoff = int(TRAIN_SIZE * len(ratings))
ratings_train = ratings.iloc[:cutoff]
ratings_test = ratings.iloc[cutoff:]

class NEW_MF():
  def __init__(self,ratings,hyper_params):
    self.R = np.array(ratings)
    #사용자 수(num_users)와 아이템 수 (num_items)를 받아온다.
    self.num_users,self.num_items = np.shape(self.R)
    #아래는 MF weight 조절을 위한 하이퍼 파라미터
    # K : 잠재요인(Latent Factor)의 수
    self.K = hyper_params['K']
    # alpha : 학습률
    self.alpha = hyper_params['alpha']
    # beta : 정규화 계수
    self.beta = hyper_params['beta']
    # iterations : SGD의 계산을 할 때의 반복 횟수
    self.iterations = hyper_params['iterations']
    # verbose : SGD의 학습 과정을 중간중간에 출력할 것인지에 대한 여부
    self.verbose = hyper_params['verbose']
    # 데이터가 연속적이지 않을 때, 인덱스와 맞지 않는 것을 방지하기위해 맵핑해주는 작업
    item_id_index = []
    index_item_id = []
    for i, one_id, in enumerate(ratings): #i:enumerate의 idx, one_id : movie_id
      item_id_index.append([one_id,i])
      index_item_id.append([i,one_id])
    self.item_id_index = dict(item_id_index)
    self.index_item_id = dict(index_item_id)

    user_id_index = []
    index_user_id = []
    for i, one_id, in enumerate(ratings.T):
      user_id_index.append([one_id,i])
      index_user_id.append([i,one_id])
    self.user_id_index = dict(user_id_index)
    self.index_user_id = dict(index_user_id)

  def rmse(self):
    # self.R에서 평점이 있는(0이 아닌) 요소의 인덱스를 가져온다.
    xs,ys = self.R.nonzero()
    # prediction과 error를 담을 리스트 변수 초기화
    self.predictions = []
    self.errors = []
    # 평점이 있는 요소(사용자 x, 아이템 y) 각각에 대해서 아래의 코드를 실행
    for x,y in zip(xs,ys):
      #사용자 x, 아이템 y에 대해서 평점 예측치를 get_prediction()함수를 사용해서 계산
      prediction=self.get_prediction(x,y)
      # 예측값을 예측값 리스트에 추가
      self.predictions.append(prediction)
      # 실제값(R)과 예측값의 차이(errors) 계산해서 오차값 리스트에 추가
      self.errors.append(self.R[x,y] - prediction)
    #예측값 리스트와 오차값 리스트를 numpy array형태로 변환
    self.predictions = np.array(self.predictions)
    self.errors = np.array(self.errors)
    #error를 활용해서 RMSE 도출
    return np.sqrt(np.mean(self.errors**2))

  def sgd(self):
    for i,j,r in self.samples:
      #사용자 i 아이템 j에 대한 평점 예측치 계산
      prediction = self.get_prediction(i,j)
      # 실제 평점과 비교한 오차 계산
      e = (r-prediction)

      # 사용자 평가 경향 계산 및 업데이트
      self.b_u[i] += self.alpha *(e - (self.beta * self.b_u[i]))
      # 아이템 평가 경향 계산 및 업데이트
      self.b_d[j] += self.alpha *(e - (self.beta * self.b_d[j]))

      # P 행렬 계산 및 업데이트
      self.P[i,:] += self.alpha * ((e * self.Q[j,:]) - (self.beta * self.P[i,:]))
      # Q 행렬 계산 및 업데이트
      self.Q[j,:] += self.alpha * ((e * self.P[i,:]) - (self.beta * self.Q[j,:]))

  def get_prediction(self,i,j):#평점 예측값 구하는 함수
    #전체 평점 + 사용자 평가 경향 + 아이템에 대한 평가 경향 + i번쨰 사용자의 요인과 j번째 아이템 요인의 행렬 곱
    prediction = self.b + self.b_u[i] + self.b_d[j] + self.P[i,:].dot(self.Q[j,].T) 
    return prediction
  
  # Test Set 선정
  def set_test(self,ratings_test):
    test_set = []
    for i in range(len(ratings_test)):
      x = self.user_id_index[ratings_test.iloc[i,0]] #사용자 id
      y = self.item_id_index[ratings_test.iloc[i,1]] #영화 id
      z = ratings_test.iloc[i,2] #실제 평점
      test_set.append([x,y,z])
      self.R[x,y] = 0 #안해주면 전체 데이터에 대한 트레이닝이 이루어짐
    self.test_set = test_set
    return test_set

  # Test Set RMSE 계산
  def test_rmse(self):
    error = 0
    for one_set in self.test_set:
      predicted = self.get_prediction(one_set[0],one_set[1]) #사용자 ID, 영화 ID
      error += pow(one_set[2] - predicted,2)
    return np.sqrt(error/len(self.test_set))

  def test(self):
    self.P = np.random.normal(scale=1./self.K,
                              size=(self.num_users,self.K))
    self.Q = np.random.normal(scale=1./self.K,
                              size=(self.num_items,self.K))
    self.b_u = np.zeros(self.num_users)
    self.b_d = np.zeros(self.num_items)
    self.b = np.mean(self.R[self.R.nonzero()]) #테스트셋에서 0으로 처리하여서 트레이닝 셋만

    #트레이닝 셋에 대해서 데이터셋 구성
    rows, columns = self.R.nonzero()
    self.samples = [(i,j,self.R[i,j]) for i,j in zip(rows,columns)]

    training_process = []
    for i in range(self.iterations):
      np.random.shuffle(self.samples)
      self.sgd()
      rmse1 = self.rmse()
      rmse2 = self.test_rmse()
      training_process.append((i+1,rmse1,rmse2))
      if self.verbose:
        if (i+1) % 10 == 0 :
          print('Iteration : %d ; Train RMSE = %.4f ; Test RMSE = %.4f'% (i+1,rmse1,rmse2))
      return training_process

  def get_one_prediction(self,user_id,item_id):
    return self.get_prediction(self.user_id_index[user_id],
                               self.item_id_index[item_id])
  def full_prediction(self):
    return self.b + self.b_u[:,np.newaxis] + self.b_d[np.newaxis,:] + self.P.dot(self.Q.T)

R_temp = ratings.pivot(index='user_id',
                       columns='movie_id',
                       values='rating').fillna(0)
hyper_params = {
    'K' : 30,
    'alpha' : 0.001,
    'beta' : 0.02,
    'iterations' : 100,
    'verbose' : True
}
mf = NEW_MF(R_temp, hyper_params)
test_set = mf.set_test(ratings_test)
result = mf.test()
1:1 문의하기를 이용해주세요.

답변 1

답변을 작성해보세요.

1

안녕하세요.

거친코딩입니다.

 

강의내용대로 학습자님께서 잘 따라와주셨는데,

저도 세심하게 살펴보니 2가지 문제점이 발견되었습니다.

1) get_prediction에서 연산 식 오류 self.Q[j,]  -> self.Q[j,:]

def get_prediction(self,i,j):#평점 예측값 구하는 함수
    #전체 평점 + 사용자 평가 경향 + 아이템에 대한 평가 경향 + i번쨰 사용자의 요인과 j번째 아이템 요인의 행렬 곱
    prediction = self.b + self.b_u[i] + self.b_d[j] + self.P[i,:].dot(self.Q[j,].T) 
    return prediction

2)def test(self)절에서 return indent 오류

맨끝에 return training_process의 위치를 올바르게 수정해주세요.

training_process = []
    for i in range(self.iterations):
      np.random.shuffle(self.samples)
      self.sgd()
      rmse1 = self.rmse()
      rmse2 = self.test_rmse()
      training_process.append((i+1,rmse1,rmse2))
      if self.verbose:
        if (i+1) % 10 == 0 :
          print('Iteration : %d ; Train RMSE = %.4f ; Test RMSE = %.4f'% (i+1,rmse1,rmse2))
      return training_process

감사합니다.

-거친코딩 드림-

Code_Slave님의 프로필

Code_Slave

질문자

2022.06.06

헉.. 강의 3번 정도 돌려보면서 찾아봤고 없어서 질문 드렸는데 오타가 있었네요 ㅠㅠ

코드도 길어서 번거로우셨을텐데 감사합니다..

+말씀해주신대로 수정하니 잘나오네요!!

해결하셨다니 다행입니다😊

해당 강의뿐만 아니라 뒷 강의 내용도 유익한 내용들이 많이 있으니 꼭 완강 하시길 바래요.

감사합니다.

-거친코딩 드림-

채널톡 아이콘