• 카테고리

    질문 & 답변
  • 세부 분야

    백엔드

  • 해결 여부

    미해결

테스트 동시성 관련 질문드립니다

23.07.23 01:10 작성 조회수 478

1

안녕하세요 강의 다 듣고 기능 추가해 가며 공부를 하고 있습니다. 그러다 막히는 부분이 있어 질문드리는데 강의 내용을 조금 벗어 나는거 같아 질문을 드려도 될지 모르겠는데 괜찮으시다면 답변 주시면 감사하겠습니다.

    @Transactional
    public OrderResponse createOrder(OrderCreateServiceRequest request, LocalDateTime registeredDateTime) {
        List<String> productCodes = request.getProductCodes();
        List<Product> products = findProductsBy(productCodes);

        Member member = memberRepository.findByPhoneNumber(request.getPhoneNumber()).get();

        deductStockQuantities(products);

        Order order = Order.create(products, member, registeredDateTime);
        return OrderResponse.of(orderRepository.save(order));
    }

order를 생성하는 부분에서 재고 감소 되는 부분을 동시성 처리를 해보려 하는데 테스트 코드에선 deductStockQuantities로 넘어가서 findAllByProductCodeIn 만 한번 돌고 롤백이 되더라구요.

    @Test
    public void create_order_with_concurrent_5_request() throws InterruptedException {
        //given
        createProducts();
        OrderCreateServiceRequest request1 = OrderCreateServiceRequest.builder()
                .productCodes(List.of("A002","A003"))
                .phoneNumber("010-1111-2222")
                .build();
        OrderCreateServiceRequest request2 = OrderCreateServiceRequest.builder()
                .productCodes(List.of("A002","A003"))
                .phoneNumber("010-1111-2222")
                .build();
        OrderCreateServiceRequest request3 = OrderCreateServiceRequest.builder()
                .productCodes(List.of("A002","A003"))
                .phoneNumber("010-1111-2222")
                .build();


        int numberOfThreads = 3;
        ExecutorService executorService = Executors.newFixedThreadPool(numberOfThreads);
        CountDownLatch latch = new CountDownLatch(numberOfThreads);

        //when
        executorService.submit(() -> {
            try {
                orderService.createOrder(request1, LocalDateTime.now());
            }finally {
                latch.countDown();
            }
        });

        executorService.submit(() -> {
            try {
                orderService.createOrder(request2, LocalDateTime.now());
            }finally {
                latch.countDown();
            }
        });

        executorService.submit(() -> {
            try {
                orderService.createOrder(request3, LocalDateTime.now());
            }finally {
                latch.countDown();
            }
        });

        latch.await();

        //then
        List<Stock> stocks = stockRepository.findAllByProductCodeIn(List.of("A002","A003"));
        assertThat(stocks).hasSize(2)
                .extracting("productCode", "quantity")
                .containsExactlyInAnyOrder(
                        tuple("A002", 7),
                        tuple("A003", 7)
                );

    }
    @Lock(LockModeType.PESSIMISTIC_WRITE)
    List<Stock> findAllByProductCodeIn(List<String> productCodes);

테스트 코드는 구글링해서 넣어보았는데 이런 부분 관련해서 따로 배운게 없어 잘 안되더라구요. 락도 걸어보고 했는데 어디서 안되는지 잘 모르겠어서 질문드립니다.

답변 1

답변을 작성해보세요.

0

안녕하세요, 김주희님! :)
추가적으로 학습하고 계신다니 대단하시네요 👍

일단 동시성 테스트를 할 때 스레드풀과 CountDownLatch를 활용하는 것은 맞습니다. 해당 코드에 문제가 있어 보이지는 않네요.

다만 질문 내용을 봤을 때 제가 정확히 어떤 부분이 문제이신지를 파악하지 못했습니다 🥲
제가 강의 중에 작성한 코드가 아니기 때문에 질문 내용만 보고는 쉽게 도움을 드리기가 어려운데요.

order를 생성하는 부분에서 재고 감소 되는 부분을 동시성 처리를 해보려 하는데 테스트 코드에선 deductStockQuantities로 넘어가서 findAllByProductCodeIn 만 한번 돌고 롤백이 되더라구요.

위 내용을 이해하지 못해서, 상세한 설명을 해주셔야 할 것 같습니다.

  1. 어떤 결과를 기대했고, 무엇이 문제였고 의도대로 동작하지 않았는지

  2. 동시성 처리를 위해 어디에 어떤 방법을 적용하신 것인지 (Lock에도 여러 종류가 있죠)

  3. 현재 테스트 코드나 프로덕션 코드에 어떤 부분을 추가적으로 작성하셨는지, 소스 전체를 올려주셔야 파악해 볼 수 있을 것 같아요. 올려주신 코드에는 많은 것들이 생략되어 있는 것 같습니다.

감사합니다. :)

김주희님의 프로필

김주희

질문자

2023.07.24

답변 감사합니다. 우선 테스트는 통과했습니다!

  1. 제가 테스트 해보고자 했던 부분은 동시에 여러 주문이 들어 왔을 때 재고가 감소하는 로직을 구현하려 했습니다.

  2. 동시성 처리를 위해선 StockRepository 에 select 부분에 비관적락을 걸어주었습니다

  3. 추가코드는 아래 작성하였습니다

OrderServiceTest

    private void createProducts() {
        Product product1 = createProduct("A001",1000);
        Product product2 = createProduct("A002",3000);
        Product product3 = createProduct("A003",5000);
        Product product4 = createProduct("A004",200000);
        Product product5 = createProduct("A005",500000);
        Product product6 = createProduct("A006",1000000);
        productRepository.saveAll(List.of(product1, product2, product3, product4, product5, product6));

        Stock stock1 = createStock("A001", 4);
        Stock stock2 = createStock("A002", 4);
        Stock stock3 = createStock("A003", 10);
        Stock stock4 = createStock("A004", 10);
        Stock stock5 = createStock("A005", 10);
        Stock stock6 = createStock("A006", 10);
        stockRepository.saveAll(List.of(stock1, stock2, stock3, stock4, stock5, stock6));

        Member member = createMember("010-1111-2222", "김주희", 29, GenderType.M);
        memberRepository.save(member);
    }

    @DisplayName("여러 쓰레드에서 주문 요청이 들어올 때, 재고 감소 로직을 검증한다")
    @Test
    void creat_order_with_concurrent_5_request() throws InterruptedException {
        //given
        CountDownLatch latch = new CountDownLatch (1);
        ExecutorService executorService = Executors.newFixedThreadPool(1);

        executorService.execute(() -> {
            createProducts();
            latch.countDown();
        });
        latch.await();

        OrderCreateServiceRequest request1 = OrderCreateServiceRequest.builder()
                .productCodes(List.of("A001","A002","A003"))
                .phoneNumber("010-1111-2222")
                .build();
        OrderCreateServiceRequest request2 = OrderCreateServiceRequest.builder()
                .productCodes(List.of("A001","A002","A004","A005","A006"))
                .phoneNumber("010-1111-2222")
                .build();
        OrderCreateServiceRequest request3 = OrderCreateServiceRequest.builder()
                .productCodes(List.of("A001","A002","A004","A005"))
                .phoneNumber("010-1111-2222")
                .build();

        // 테스트용 로직
        Member member = memberRepository.findByPhoneNumber("010-1111-2222");

        //when
        int threadCount = 3;
        ExecutorService executorService2 = Executors.newFixedThreadPool(threadCount);
        CountDownLatch latch2 = new CountDownLatch (threadCount);

        executorService2.execute(() -> {
            orderService.createOrder(request1, OrderStatus.FOR_HERE, LocalDateTime.now());
            latch2.countDown();
        });
        executorService2.execute(() -> {
            orderService.createOrder(request2, OrderStatus.FOR_HERE, LocalDateTime.now());
            latch2.countDown();
        });
        executorService2.execute(() -> {
            orderService.createOrder(request3, OrderStatus.FOR_HERE, LocalDateTime.now());
            latch2.countDown();
        });

        latch2.await();

        //then
        List<Stock> stocks = stockRepository.findAllByProductCodeIn(List.of("A001", "A002", "A003", "A004", "A005", "A006"));
        assertThat(stocks)
                .extracting("productCode", "quantity")
                .containsExactlyInAnyOrder(
                        tuple("A001", 1),
                        tuple("A002", 1),
                        tuple("A003", 9),
                        tuple("A004", 8),
                        tuple("A005", 8),
                        tuple("A006", 9)
                );
    }
        CountDownLatch latch = new CountDownLatch (1);
        ExecutorService executorService = Executors.newFixedThreadPool(1);

        executorService.execute(() -> {
            createProducts();
            latch.countDown();
        });
        latch.await();

이 부분에 await를 걸어 쓰레드를 멈추게 하니 테스트는 성공했습니다

OrderService

@Transactional(readOnly = true)
@RequiredArgsConstructor
@Service
public class OrderService {

    private final OrderRepository orderRepository;
    private final ProductRepository productRepository;
    private final MemberRepository memberRepository;
    private final StockRepository stockRepository;

    @Transactional
    public OrderResponse createOrder(OrderCreateServiceRequest request, OrderStatus orderStatus, LocalDateTime registeredDateTime) {
        List<String> productCodes = request.getProductCodes();
        List<Product> products = findProductsBy(productCodes);

        Member member = memberRepository.findByPhoneNumber(request.getPhoneNumber()).orElseThrow(() -> new CustomException(CodeEnum.MEMBER_NOTFOUND));

        deductStockQuantities(products);

        Order order = Order.create(products, member, orderStatus, registeredDateTime);
        return OrderResponse.of(orderRepository.save(order));
    }

    private List<Product> findProductsBy(List<String> productCodes) {
        List<Product> products = productRepository.findAllByProductCodeIn(productCodes);

        Map<String, Product> productMap = products.stream()
                .collect(Collectors.toMap(Product::getProductCode, p -> p));

        return productCodes.stream()
                .map(productMap::get)
                .collect(Collectors.toList());
    }

    private void deductStockQuantities(List<Product> products) {
        List<String> stockProductCodes = extractStockProductCodes(products);

        Map<String, Stock> stockMap = stockMapBy(stockProductCodes);
        Map<String, Long> productCountingMap = countingMapBy(stockProductCodes);

        for (String stockProductCode : new HashSet<>(stockProductCodes)) {
            Stock stock = stockMap.get(stockProductCode);
            int quantity = productCountingMap.get(stockProductCode).intValue();

            if (stock.quantityLessThan(quantity)) {
                throw new CustomException(CodeEnum.LESS_STOCK);
            }
            stock.deductQuantity(quantity);
        }
    }

    private static List<String> extractStockProductCodes(List<Product> products) {
        return products.stream()
                .map(Product::getProductCode)
                .collect(Collectors.toList());
    }

    private Map<String, Stock> stockMapBy(List<String> stockProductCodes) {
        List<Stock> stocks = stockRepository.findAllByProductCodeInWithPessimisticLock(stockProductCodes);
        return stocks.stream()
                .collect(Collectors.toMap(Stock::getCode, s -> s));
    }

    private Map<String, Long> countingMapBy(List<String> stockProductCodes) {
        return stockProductCodes.stream()
                .collect(Collectors.groupingBy(p -> p, Collectors.counting()));
    }
}
@Repository
public interface StockRepository extends JpaRepository<Stock, Long> {
    @Lock(LockModeType.PESSIMISTIC_WRITE)
    @Query("select s from Stock s where s.code IN :codes")
    List<Stock> findAllByProductCodeInWithPessimisticLock(List<String> codes);

    List<Stock> findAllByProductCodeIn(List<String> productCodes);
}

처음에 질문 드렸을 당시에는 findAllByProductCodeInWithPessimisticLock(원래는 findAllByProductCodeIn) 부분이 실행되고 테스트가 결과가 나오지 않고 계속 돌고 있었습니다. 그런데 나중에 어쩌다 에러가 나왔는데 no value present라고 나오더라구요.

확인해보니 createOrder에서 findByPhoneNumber를 실행했을 때 디비에서 값을 가져오지 못해서 에러가 발생했습니다. 테스트용 로직이라고 적혀있는 findByPhoneNumber 에서는 member가 조회되는데 execute에서 실행된 createOrder 안에 findByPhoneNumber에서는 member가 제대로 조회되지 않고 있었습니다. 위에서 롤백이라고 말한 부분은 member를 조회하지 못해 테스트를 롤백하는데 latch.countDown() 은 실행되지 않아 테스트가 끝나지 않고 계속 대기 상태 였던것 같습니다.

제 생각에는 메인 쓰레드가 실행되고 아래 쓰레드가 실행되는데 메인 쓰레드에서 디비에 값을 insert 하기 전에 다른 쓰레드들이 실행되어 데이터의 싱크가 맞지 않는다 생각해 값을 insert 하는 createProducts 에 await를 거니 아래 쓰레드 에서 값을 제대로 가져와 테스트가 성공했습니다. 혹시, 이런 방식으로 해결하는게 맞을까요..?

네 주희님! 상세한 설명 감사합니다 ㅎㅎ 상황은 이해하였어요.

일단 먼저 파악해주신 내용과 테스트 실패 사유도 맞는 것 같아 보이는데요.
제가 의문을 가진 점은 createProducts()를 Async하게 실행시킬 필요가 있는가? 하는 점입니다.

Given 단계에서 상품을 생성하는 작업은 동기적으로 실행하여 상품을 준비해두고, 실제로 동시성 테스트를 하고자 하는 '주문 등록' 로직에만 스레드풀을 만들어 다수의 요청을 만들어야 의도한 테스트 상황이 될 것 같아요.
즉, createProducts() 쪽의 ExecutorService, CountDownLatch는 필요가 없을 것 같아요.
그렇게 수정하면 문제가 되는 상황이 해결될 것 같습니다 :)

강의와 별개로 추가 학습을 진행하시는 모습이 정말 대단하시네요. 언급만 드린 동시성 테스트도 시도하시다니 최고입니다 👍
추가 학습 목표하신 부분 끝까지 화이팅하시고, 또 궁금하신 점 있으시면 편하게 질문 올려주세요.
감사합니다. ☺️