강의

멘토링

커뮤니티

인프런 커뮤니티 질문&답변

Green Lee님의 프로필 이미지
Green Lee

작성한 질문수

[매일 완독 챌린지] 저자와 함께하는 <FastAPI로 기획에서 출시까지>

8.3 ~ 8.4절 : 타임슬롯 관리와 예약하기 API 구현하기 (p299~p321)

4주 2회차 과제

해결된 질문

작성

·

30

0

안녕하세요.

 

과제를 잘 못 이해한건가 싶기도 하지만 아래와 같은 조건하에 구현을 해 봤습니다.

 

수업 과정에서 Front-end는 고정인 상황이기 때문에 기존 API는 변경이 없다는 가정하에 변경 해 봤습니다.

  1. 시간대1개와 요일n개의 설정이 하나의 row로 묶여있는 상태로는 요일별 시간대 변동에 대응하기 까다로운 것 같아 요일별 개별 row로 저장하도록 일부 수정 하였습니다. API endpoint 모델 변경을 피하기 위해서 내부적으로 여러개의 요일별 row로 나누어 저장하도록 했습니다.

     

  2. 고민중에 작가님께서 올려 놓으신 레퍼런스 코드를 봤는데, 책에서 언급하신 것 처럼 postgres 경우와 sqlite 경우로 코드가 분기 되는 것을 보았고, 지금은 교육 과정이기 때문이라고 생각 하지만 배포 코드와 개발 환경 코드가 다른것은 여러모로 좋지 않은 것 같아 현재는 sqlite 기준으로 개별 DB구현에서만 지원하는 것은 배제하는 방향으로 구현 했습니다.

     

  3. 1과 같이 변경 함으로써, 기 등록된 정보의 수정에서는 개별 날짜 별 시간대 설정 면에서 자유도가 생겼다고 생각 됩니다.

  4. 타임슬롯의 변경과 삭제에 관해서는 내부적으로는 time-slot으로 관리하지만, 외부공개 id는 아니므로, 키 로서 start-time, end-time, weekday (API형태로서는 리스트)조합으로 정의 해서 동작 하도록 구현했습니다.

  5. 특히 삭제의 경우는 부득이 delete method의 경우는 payload가 포함되는것이 받아들여지지 않는 경우도 있는것을 고려해서, 새 등록 값 두가지(new_start_time과, new_end_time) 값이 모두 제공되고 같은 경우를 특정하여 삭제 동작으로 정의하여 동작하도록 구현했습니다.

 

class TimeSlotUpdateByGroupIn(SQLModel):
    start_time: time
    end_time: time
    weekdays: Weekdays
    new_start_time: time | None = None
    new_end_time: time | None = None
    new_weekdays: Weekdays | None = None

    @model_validator(mode="after")
    def check_update_fields(self):
        update_fields = {
            "new_start_time": self.new_start_time,
            "new_end_time": self.new_end_time,
            "new_weekdays": self.new_weekdays,
        }
        if not any(value is not None for value in update_fields.values()):
            raise ValueError("최소 하나의 수정 필드는 반드시 제공되어야 합니다.")
        return self

 

@router.post("/time-slots", status_code=status.HTTP_201_CREATED, response_model=TimeSlotOut)
async def create_time_slot(
    user: CurrentUserDep,
    session: DbSessionDep,
    payload: TimeSlotCreateIn,
) -> TimeSlotOut:
    if not user.is_host:
        raise GuestPermissionError()

    weekdays = sorted(set(payload.weekdays))

    # dup. check with already exist one
    stmt = select(TimeSlot).where(
        and_(
            TimeSlot.calendar_id == user.calendar.id,
            TimeSlot.weekday.in_(weekdays),
            TimeSlot.start_time < payload.end_time,
            TimeSlot.end_time > payload.start_time,
        )
    )
    result = await session.execute(stmt)
    existing_time_slot = result.scalars().first()
    if existing_time_slot:
        raise TimeSlotOverlapError()

    time_slots = [
        TimeSlot(
            calendar_id=user.calendar.id,
            start_time=payload.start_time,
            end_time=payload.end_time,
            weekday=weekday,
        )
        for weekday in weekdays
    ]

    session.add_all(time_slots)
    await session.commit()

    if not time_slots:
        raise TimeSlotOverlapError()

    return TimeSlotOut(
        start_time=time_slots[0].start_time,
        end_time=time_slots[0].end_time,
        weekdays=weekdays,
        created_at=time_slots[0].created_at,
        updated_at=time_slots[0].updated_at,
    )


@router.get(
    "/time-slots/{host_username}",
    status_code=status.HTTP_200_OK,
    response_model=list[TimeSlotOut],
)
async def get_host_timeslots(
    host_username: str,
    session: DbSessionDep,
) -> list[TimeSlotOut]:
    stmt = (
        select(User)
        .where(User.username == host_username)
        .where(User.is_host.is_(true()))
    )
    result = await session.execute(stmt)
    host = result.scalar_one_or_none()
    if host is None or host.calendar is None:
        raise HostNotFoundError()
    
    stmt = select(TimeSlot).where(TimeSlot.calendar_id == host.calendar.id)
    result = await session.execute(stmt)
    time_slots = result.scalars().all()

    grouped: dict[tuple[time, time], TimeSlotOut] = {}
    for time_slot in time_slots:
        key = (time_slot.start_time, time_slot.end_time)
        if key not in grouped:
            grouped[key] = TimeSlotOut(
                start_time=time_slot.start_time,
                end_time=time_slot.end_time,
                weekdays=[time_slot.weekday],
                created_at=time_slot.created_at,
                updated_at=time_slot.updated_at,
            )
            continue

        grouped[key].weekdays.append(time_slot.weekday)
        if time_slot.created_at < grouped[key].created_at:
            grouped[key].created_at = time_slot.created_at
        if time_slot.updated_at > grouped[key].updated_at:
            grouped[key].updated_at = time_slot.updated_at

    if not grouped:
        raise TimeSlotNotFoundError()
    
    for time_slot in grouped.values():
        time_slot.weekdays = sorted(set(time_slot.weekdays))

    return list(grouped.values())

@router.patch(
    "/time-slots",
    status_code=status.HTTP_200_OK,
    response_model=TimeSlotOut | None,
)
async def update_time_slot(
    user: CurrentUserDep,
    session: DbSessionDep,
    payload: TimeSlotUpdateByGroupIn,
) -> TimeSlotOut | None:
    if not user.is_host:
        raise GuestPermissionError()

    current_weekdays = sorted(set(payload.weekdays))
    stmt = select(TimeSlot).where(
        and_(
            TimeSlot.calendar_id == user.calendar.id,
            TimeSlot.start_time == payload.start_time,
            TimeSlot.end_time == payload.end_time,
        )
    )
    result = await session.execute(stmt)
    current_slots = result.scalars().all()
    if not current_slots:
        raise TimeSlotNotFoundError()

    existing_weekdays = sorted({slot.weekday for slot in current_slots})
    if existing_weekdays != current_weekdays:
        raise TimeSlotNotFoundError()

    delete_only = (
        payload.new_start_time is not None
        and payload.new_end_time is not None
        and payload.new_start_time == payload.new_end_time
    )

    if delete_only:
        current_ids = [slot.id for slot in current_slots]
        await session.execute(delete(TimeSlot).where(TimeSlot.id.in_(current_ids)))
        await session.commit()
        return None

    new_start_time = payload.new_start_time or payload.start_time
    new_end_time = payload.new_end_time or payload.end_time
    new_weekdays = sorted(set(payload.new_weekdays or payload.weekdays))

    if new_start_time >= new_end_time:
        raise HTTPException(
            status_code=status.HTTP_422_UNPROCESSABLE_CONTENT,
            detail="시작 시간은 종료 시간보다 빨라야 합니다.",
        )

    current_ids = [slot.id for slot in current_slots]
    stmt = select(TimeSlot).where(
        and_(
            TimeSlot.calendar_id == user.calendar.id,
            TimeSlot.weekday.in_(new_weekdays),
            TimeSlot.start_time < new_end_time,
            TimeSlot.end_time > new_start_time,
            TimeSlot.id.not_in(current_ids),
        )
    )
    result = await session.execute(stmt)
    existing_time_slot = result.scalars().first()
    if existing_time_slot:
        raise TimeSlotOverlapError()

    await session.execute(delete(TimeSlot).where(TimeSlot.id.in_(current_ids)))
    new_time_slots = [
        TimeSlot(
            calendar_id=user.calendar.id,
            start_time=new_start_time,
            end_time=new_end_time,
            weekday=weekday,
        )
        for weekday in new_weekdays
    ]
    session.add_all(new_time_slots)
    await session.commit()

    return TimeSlotOut(
        start_time=new_start_time,
        end_time=new_end_time,
        weekdays=new_weekdays,
        created_at=new_time_slots[0].created_at,
        updated_at=new_time_slots[0].updated_at,
    )

답변 1

0

한날님의 프로필 이미지
한날
지식공유자

과제 의도는 SQL 조건문을 연습하는 것인데, 색다른 접근법을 생각하셨군요! 시간 범위에 요일들이 들어가는 의존 방향을 요일에 타임슬롯이 들어가는 방식이 흥미로워요. 🙂 월요일엔 타임슬롯 1, 2, 3, 화요일엔 타임슬롯 1, 4, 5라고 인식하게 되어 생각흐름에 자연스레 맞춰져 직관적이에요.

수고하셨습니다!

Green Lee님의 프로필 이미지
Green Lee
질문자

피드백 감사합니다.

 

아무래도 제가 너무 요일별 시간관리에 집착했나봅니다 ㅎㅎ

Green Lee님의 프로필 이미지
Green Lee

작성한 질문수

질문하기