인프런 커뮤니티 질문&답변

포팃님의 프로필 이미지
포팃

작성한 질문수

실전! FastAPI 입문

(실습) 회원가입 알림 - Background Task

INFO sqlalchemy.engine.Engine ROLLBACK

해결된 질문

작성

·

55

·

수정됨

0

swagger 로 opt 생성, 검증 api 실행을 했더니
sqlalchemy.engine.Engine ROLLBACK 로그가 출력이 됩니다. 원인을 모르겠습니다...

  • 로그

INFO:     Application startup complete.
INFO:     127.0.0.1:63654 - "GET /docs HTTP/1.1" 200 OK
email-validator not installed, email fields will be treated as str.
To install, run: pip install email-validator
INFO:     127.0.0.1:63654 - "GET /openapi.json HTTP/1.1" 200 OK
2024-10-31 14:25:27,367 INFO sqlalchemy.engine.Engine SELECT DATABASE()
2024-10-31 14:25:27,367 INFO sqlalchemy.engine.Engine [raw sql] {}
2024-10-31 14:25:27,369 INFO sqlalchemy.engine.Engine SELECT @@sql_mode
2024-10-31 14:25:27,369 INFO sqlalchemy.engine.Engine [raw sql] {}
2024-10-31 14:25:27,369 INFO sqlalchemy.engine.Engine SELECT @@lower_case_table_names
2024-10-31 14:25:27,370 INFO sqlalchemy.engine.Engine [raw sql] {}
2024-10-31 14:25:27,372 INFO sqlalchemy.engine.Engine BEGIN (implicit)
2024-10-31 14:25:27,377 INFO sqlalchemy.engine.Engine SELECT user.id, user.username, user.password, todo_1.id AS id_1, todo_1.contents, todo_1.is_done, todo_1.user_id 
FROM user LEFT OUTER JOIN todo AS todo_1 ON user.id = todo_1.user_id 
WHERE user.username = %(username_1)s
2024-10-31 14:25:27,377 INFO sqlalchemy.engine.Engine [generated in 0.00018s] {'username_1': 'admin'}
INFO:     127.0.0.1:63658 - "POST /users/log-in HTTP/1.1" 200 OK
2024-10-31 14:25:27,603 INFO sqlalchemy.engine.Engine ROLLBACK
INFO:     127.0.0.1:63664 - "POST /users/email/otp HTTP/1.1" 200 OK
2024-10-31 14:26:02,514 INFO sqlalchemy.engine.Engine BEGIN (implicit)
2024-10-31 14:26:02,514 INFO sqlalchemy.engine.Engine SELECT user.id, user.username, user.password, todo_1.id AS id_1, todo_1.contents, todo_1.is_done, todo_1.user_id 
FROM user LEFT OUTER JOIN todo AS todo_1 ON user.id = todo_1.user_id 
WHERE user.username = %(username_1)s
2024-10-31 14:26:02,514 INFO sqlalchemy.engine.Engine [cached since 35.14s ago] {'username_1': 'admin'}
INFO:     127.0.0.1:63671 - "POST /users/email/verify HTTP/1.1" 200 OK
Sending email to admin@fastapi.com!
2024-10-31 14:26:12,519 INFO sqlalchemy.engine.Engine ROLLBACK

 

from typing import List, Optional

from fastapi import Depends
from sqlalchemy import select, delete
from sqlalchemy.orm import Session

from database.connection import get_db
from database.orm import ToDo, User


class ToDoRepository:
    def __init__(self, session: Session = Depends(get_db)):
        self.session = session

    def get_todos(self) -> List[ToDo]:
        return list(self.session.scalars(select(ToDo)))

    def get_todo_by_todo_id(self, todo_id: int) -> ToDo | None:
        return self.session.scalar(select(ToDo).where(ToDo.id == todo_id))

    def create_todo(self, todo: ToDo) -> ToDo:
        self.session.add(instance=todo)
        self.session.commit()    
        self.session.refresh(instance=todo) 
        return todo 

    
    def update_todo(self, todo: ToDo) -> ToDo:
        self.session.add(instance=todo)
        self.session.commit()
        self.session.refresh(instance=todo)
        return todo

    
    def delete_todo(self, todo_id: int) -> None:
        self.session.execute(delete(ToDo).where(ToDo.id == todo_id))
        self.session.commit()

class UserRepository:
    def __init__(self, session: Session = Depends(get_db)):
        self.session = session

    
    def get_user_by_username(self, username: str) -> User | None:
        return self.session.scalar(select(User).where(User.username == username))

    
    def save_user(self, user: User) -> User:
        self.session.add(instance=user)
        self.session.commit()    
        self.session.refresh(instance=user)
        return user

 

import random
import time

import bcrypt
from datetime import datetime, timedelta
from jose import jwt


class UserService:
    encoding: str = "UTF-8"
    JWT_SECRET_KEY: str = "f002393019e8776398370aa671767b860b702854724591cd0da5fc97bda3daf1"
    JWT_ALGORITHM: str = "HS256"

    def hash_password(self, plain_password: str) -> str:
        hashed_password: bytes = bcrypt.hashpw(
            plain_password.encode(self.encoding),
            salt=bcrypt.gensalt()
        )
        return hashed_password.decode(self.encoding)

    def verify_password(
            self, plain_password: str, hashed_password: str
    ) -> bool:
       
        return bcrypt.checkpw(
            plain_password.encode(self.encoding),
            hashed_password.encode(self.encoding)
        )

    def creat_jwt(self, username: str) -> str:
        return jwt.encode(
            {
                "sub": username,
                "exp": datetime.now() + timedelta(days=1), 
            }, self.JWT_SECRET_KEY, algorithm=self.JWT_ALGORITHM
        )

    def decode_jwt(self, access_token: str) -> str:
        payload: dict = jwt.decode(
            access_token,
            self.JWT_SECRET_KEY,
            algorithms=[self.JWT_ALGORITHM]
        )

        return payload["sub"] 

    @staticmethod 
    def create_otp() -> int:
        return random.randint(1000, 9999)

    @staticmethod
    def send_email_to_user(email: str) -> None:
        time.sleep(10)
        print(f"Sending email to {email}!")

 

from fastapi import APIRouter, Depends, HTTPException, BackgroundTasks

from cache import redis_client
from database.orm import User
from database.repository import UserRepository
from schema.request import SignUpRequest, LoginRequest, CreateOTPRequest, VerifyOTPRequest
from schema.response import UserSchema, JWTResponse
from security import get_access_token
from service.user import UserService

router = APIRouter(prefix="/users", tags=["USER"])

@router.post("/sign-up", status_code=201)
def user_sign_up_handler(
        request: SignUpRequest,
        user_service: UserService = Depends(),
        user_repository: UserRepository = Depends(),
):
    
    hashed_password: str = user_service.hash_password(
        plain_password=request.password
    )
    
    user: User = User.create(
        username=request.username,
        hashed_password=hashed_password
    )
    
    user: User = user_repository.save_user(user)
    return UserSchema.from_orm(user)

@router.post("/log-in", status_code=200)
def user_log_in_handler(
    request: LoginRequest,
    user_service: UserService = Depends(),
    user_repository: UserRepository = Depends(),
):
   
    user: User | None  = user_repository.get_user_by_username(
        username=request.username
    )
    if not user:
        raise HTTPException(status_code=404, detail="User Not Found")

    verified: bool = user_service.verify_password(
        plain_password=request.password, hashed_password=user.password
    )
    if not verified:
        raise HTTPException(status_code=401, detail="Not Authorized")

    access_token: str = user_service.creat_jwt(username=user.username)

    return JWTResponse(access_token=access_token)

@router.post("/email/otp")
def create_otp_handler(
    request: CreateOTPRequest,
    _: str = Depends(get_access_token), 
    user_service: UserService = Depends()
):
   
    otp: int = user_service.create_otp()

    redis_client.set(request.email, otp)
    redis_client.expire(request.email, 3 * 60) 
    return {"otp": otp}

@router.post("/email/verify")
def verify_otp_handler(
    request: VerifyOTPRequest,
    background_tasks: BackgroundTasks,
    access_token: str = Depends(get_access_token), 
    user_service: UserService = Depends(),
    user_repo: UserRepository = Depends(),
):

    otp: str | None = redis_client.get(request.email)
    if not otp:
        raise HTTPException(status_code=400, detail="Bad Request")

    if request.otp != int(otp):
        raise HTTPException(status_code=400, detail="Bad Request")

    username: str = user_service.decode_jwt(access_token=access_token)
    user: User | None = user_repo.get_user_by_username(username)
    if not user:
        raise HTTPException(status_code=404, detail="User Not Found")

    background_tasks.add_task(
        user_service.send_email_to_user,
        email="admin@fastapi.com"
    )

    return UserSchema.from_orm(user)

 

 

 

답변 1

0

신동현님의 프로필 이미지
신동현
지식공유자

안녕하세요. 별도로 rollback()을 호출하지 않더라도 sqlalchemy에 의해 session이 모두 사용되고 close()가 호출되는 시점에 자동적으로 rollback()이 호출됩니다.

https://docs.sqlalchemy.org/en/20/orm/session_basics.html#closing

포팃님의 프로필 이미지
포팃

작성한 질문수

질문하기