[Spring] 경매 입찰 & 작품 구매 동시성 문제 해결 3) Redis Lock

개요

 

[Spring] 경매 입찰/작품 구매 동시성 문제 해결 1) DB Lock

개요 GitHub - sjiwon/Advanced-Another-Art: AI 기반 작품 경매 플랫폼 (Refactoring) AI 기반 작품 경매 플랫폼 (Refactoring). Contribute to sjiwon/Advanced-Another-Art development by creating an account on GitHub. github.com 현재 리팩

sjiwon-dev.tistory.com

 

[Spring] 경매 입찰/작품 구매 동시성 문제 해결 2) MySQL Named Lock

개요 [Spring] 경매 입찰/작품 구매 동시성 문제 해결 1) DB Lock 개요 GitHub - sjiwon/Advanced-Another-Art: AI 기반 작품 경매 플랫폼 (Refactoring) AI 기반 작품 경매 플랫폼 (Refactoring). Contribute to sjiwon/Advanced-Anoth

sjiwon-dev.tistory.com

앞선 포스팅에서 Pessimistic Write Lock & MySQL Named Lock을 통해서 경매 입찰 & 작품 구매에 대한 동시성 문제를 해결하였다


Pessimistic Write Lock의 DB Record Lock으로 인해 다른 로직에 영향을 주는 것을 해결하고자 MySQL Named Lock을 적용해보았고 그 결과는 성공적이였다

하지만 MySQL Named Lock을 통해서 제어할 경우 아래와 같은 요소들을 적절히 고려해야 한다

  1. 적절한 DBCP 사이즈는 어떻게 결정할지
  2. Lock과 관련된 Connection을 어떻게 기존 로직과 분리해서 처리할지
  3. Lock을 얻고난 후 반드시 명시적 해제를 적용하였는지


휴먼 에러가 발생할 가능성이 존재하고 값비싼 DB Connection에 대한 제어를 잘못하면 전체 서비스 성능에 영향을 미칠 수 있기 때문에 더 효율적인 방안이 없나 고민을 할 필요가 있어보인다


고민을 하다가 현재 프로젝트에서는 인증번호 TTL을 관리하기 위해서 Redis를 활용하고 있다

따라서 MySQL Named Lock의 접근 방법과 동일하게 외부 어디선가 Lock을 관리해야 하고 이 영역을 Redis로 생각하고 구현해보자

 

 

Redis를 활용한 동시성 제어 1) Lettuce

implementation("org.springframework.boot:spring-boot-starter-data-redis")

spring-boot-starter-data-redis의 기본 Client로 제공되는 Lettuce는 특정 명령어를 통해서 Lock이라는 개념을 관리할 수 있다
결국 Thread가 Lock을 얻고 핵심 로직에 진입하기 위해서는 아래 2가지 절차를 Atomic하게 진행해야 한다

  1. Lock이 존재하는지 확인 (다른 쓰레드가 이미 점유하고 있는지)
  2. Lock이 프리하면 획득하고 진입
이 2가지 연산을 Atomic하게 처리하기 위해서 활용하는 명령어는 setnx(set if not exists)이다

 

@Component
@RequiredArgsConstructor
public class BidFacade {
    private static final String ACQUIRE = "1";
 
    private final StringRedisTemplate redisTemplate;
    private final BidUseCase target;
 
    public void invoke(final BidCommand command) throws InterruptedException {
        final String key = "AUCTION:" + command.auctionId();
 
        while (!tryLock(key)) {
            Thread.sleep(100); // Redis에 가하는 Spin Lock 부하를 줄이기 위해서 잠시 Sleep
        }
 
        try {
            target.invoke(command);
        } finally {
            unlock(key);
        }
    }
 
    private boolean tryLock(final String key) {
        return redisTemplate.opsForValue().setIfAbsent(key, ACQUIRE);
    }
 
    private void unlock(final String key) {
        redisTemplate.delete(key);
    }
}

  • 동시성 제어에 성공하였다

하지만 Lettuce를 활용한 위와 같은 제어 방식은 아래 2가지 단점이 존재한다

 

1. 무한 루프 대기

위의 코드를 보면 while(!tryLock(key))를 통해서 lock을 획득하려고 시도한다


그런데 만약 특정 Thread가 lock을 획득하는데 성공하고 비즈니스 로직을 진행하던 도중 Application의 의도치 않은 오류 때문에 lock을 해제하지 못했다고 가정하자

이렇게 된 순간 해당 Thread가 아닌 나머지 Thread들은 영원히 Lock을 얻지 못하고 무한 루프에 빠지게 되는 것이다


이 경우는 maxRetry 정책을 직접 구현해주면 어느정도 해결이 될듯하다

private static final int MAX_RETRY = 3; // 최대 재시도 횟수
 
public void invoke(final BidCommand command) throws InterruptedException {
    final String key = "AUCTION:" + command.auctionId();
    int retry = 0;
 
    while (!tryLock(key)) {
        if (++retry == MAX_RETRY) { // 최대 횟수 확인
            throw new RuntimeException();
        }
 
        Thread.sleep(100); // Redis에 가하는 Spin Lock 부하를 줄이기 위해서 잠시 Sleep
    }
 
    try {
        target.invoke(command);
    } finally {
        unlock(key);
    }
}

 

2. Spin Lock

무한 루프에 빠지게 되는 위험은 최대 재시도 횟수 제한을 적용함으로써 어느정도 커버가 가능하다
하지만 근본적으로 위와 같은 구현을 하게 되면 Lock을 획득할때까지 Redis에 가해지는 지속적인 부하가 Redis 서버에 부담이 될 수 있다는 것이다


위의 코드는 이러한 부하를 약간은 방지하기 위해서 Thread.sleep(100)을 통해서 재시도를 하기 위한 텀을 제공하였다
그래도 이 텀 간격은 언제든지 수정될 수 있고 요청이 많을수록, 작업이 오래 걸릴수록 이러한 부하는 점점 Redis 서버에 큰 부담이 될 것이다

 

 

Redis를 활용한 동시성 제어 2) Redisson

implementation("org.redisson:redisson-spring-boot-starter:${property("redissonVersion")}")

Lettuce와 비슷하게 Redisson도 Netty를 활용한 Non-Blocking I/O 메커니즘이다
하지만 Lettuce와 다른 점은 Lock을 관리하기 위한 구현체 자체를 제공해준다는 점이다

Redisson은 위에서 지적한 Lettuce의 2가지 단점 (무한 대기, Spin Lock)을 어떻게 해결함으로써 더 효율적으로 동작하는지 확인해보자

 

1. 무한 대기? → Timeout

RedissonLock은 획득 시점에 다음과 같은 필드 정보를 추적한다

  • waitTime = Lock을 획득할때까지 대기하는 시간
  • leaseTime = Lock이 만료되는 시간
Lock을 얻는 시점에서부터 이러한 대기 & 만료 시간을 함께 관리하기 때문에 Thread가 Lock을 얻기 위해서 무한 대기하는 상황이 발생하지 않는다

 

2. Spin Lock? → pub/sub

 

What is pub/sub? | Redisson

The pub/sub pattern is a key concept in software architecture and design for performing asynchronous communication between parties. But what is pub/sub, and how does pub/sub work in practice?

redisson.org

lock을 얻는 프로세스를 따라가보면 subscribe가 보인다

  • CompletableFuture를 활용해서 비동기적으로 구독을 진행한다
    • 이후 구독한 채널에 Lock Release와 관련된 메시지가 도달하면 그 시점에 Lock을 얻으려는 시도를 다시 진행한다
  • waitTime을 고려해서 timeout이 발생하면 예외를 발생시키고 구독하고 있는 Channel에 대한 구독을 취소한다

Redisson의 pub/sub 기반 Lock을 획득하는 프로세스는 다음과 같다

  1. tryLock을 통해서 대기 없이 Lock을 요구할 수 있고 다른 Thread와 경합이 없다면 그대로 Lock을 획득
  2. Lock을 획득하지 못했으면 이후 subscribe한 channel로부터 "Lock이 해제되었습니다"라는 메시지를 받으면 재시도
  3. waitTime이 끝날때까지 Lock을 획득하지 못하면 return false


이러한 pub/sub 기반 Lock 요청 방식으로 인해 이전 Lettuce에서 보았던 Spin Lock보다 훨씬 Redis 서버에 부하를 덜 주게 된다

 

Scripting with Lua

Executing Lua in Redis

redis.io

  • 또한 Lock을 획득하려는 시도는 루아 스크립트를 통해서 Atomic하게 연산된다

 

RedissonLock 적용

@Component
@RequiredArgsConstructor
public class BidFacade {
    private final RedissonClient redissonClient;
    private final BidUseCase target;
 
    public void invoke(final BidCommand command) throws InterruptedException {
        final String key = "AUCTION:" + command.auctionId();
        final RLock lock = redissonClient.getLock(key);
 
        try {
            if (!tryLock(lock)) {
                return; // 획득 못하면 return -> 재시도 정책 추가 가능
            }
 
            target.invoke(command);
        } finally {
            unlock(lock);
        }
    }
 
    private boolean tryLock(final RLock lock) throws InterruptedException {
        return lock.tryLock(5, 1, TimeUnit.SECONDS);
    }
 
    private void unlock(final RLock lock) {
        if (lock.isLocked() && lock.isHeldByCurrentThread()) {
            lock.unlock();
        } else {
            throw new RuntimeException("anonymous try unlock or timeout...");
        }
    }
}

 

 

재사용을 위한 AOP 메커니즘 활용

Redisson을 활용한 동시성 제어 로직은 경매 입찰뿐만 아니라 작품 구매에서도 활용할 예정이다
따라서 재사용성을 높이기 위해서 애노테이션 + AOP 메커니즘을 적용하는 것이 낫다고 판단된다

 

@DistributedLock

@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
public @interface DistributedLock {
    String keyPrefix();
 
    String keySuffix();
 
    long waitTime() default 5000L;
 
    long leaseTime() default 3000L;
 
    TimeUnit timeUnit() default TimeUnit.MILLISECONDS;
 
    boolean withInTransaction() default false;
}

keyPrefix & keySuffix로 나눈 이유는 물론 key하나로 SpEL을 통해서 문자열을 완성시킬 수 있지만 내부 문자열 구조자체가 깔끔하지 않다고 판단해서 분리하였다

  • keyPrefix = 고정적인 값
  • keySuffix = 메소드에 넘어온 인자를 활용한 가변적인 값

 

ExpressionParser

public class DistributedLockNameGenerator {
    private static final ExpressionParser expressionParser = new SpelExpressionParser();
 
    public static Object generate(
            final String prefix,
            final String key,
            final String[] parameterNames,
            final Object[] args
    ) {
        final StandardEvaluationContext context = new StandardEvaluationContext();
 
        for (int i = 0; i < parameterNames.length; i++) {
            context.setVariable(parameterNames[i], args[i]);
        }
 
        return prefix + parseKey(key, context);
    }
 
    private static Object parseKey(final String key, final StandardEvaluationContext context) {
        return expressionParser.parseExpression(key).getValue(context, Object.class);
    }
}

  • Key Prefix로 들어가는 값은 동적으로 적용되는 auctionId이기 때문에 이를 파싱하기 위한 커스텀한 Parser를 구현할 필요가 있다

 

AOP 로직

@Component
public class AopWithTransactional {
    @Transactional(propagation = Propagation.REQUIRES_NEW)
    public Object proceed(final ProceedingJoinPoint joinPoint) throws Throwable {
        return joinPoint.proceed();
    }
}
  • target 로직을 Transaction Scope에서 처리하기 위한 Helper
@Slf4j
@Aspect
@Component
@RequiredArgsConstructor
public class DistributedLockAop {
    private final RedissonClient redissonClient;
    private final AopWithTransactional aopWithTransactional;
 
    @Around("@annotation(DistributedLock)")
    public Object lock(final ProceedingJoinPoint joinPoint) {
        final MethodSignature signature = (MethodSignature) joinPoint.getSignature();
        final Method method = signature.getMethod();
        final DistributedLock distributedLock = method.getAnnotation(DistributedLock.class);
 
        final String key = (String) DistributedLockNameGenerator.generate(
                distributedLock.keyPrefix(),
                distributedLock.keySuffix(),
                signature.getParameterNames(),
                joinPoint.getArgs()
        );
        final RLock lock = redissonClient.getLock(key);
 
        try {
            acquireLock(lock, distributedLock);
 
            log.info(
                    "Thread[{}] -> [{}] lock acquired with in transaction = {}",
                    Thread.currentThread().getName(),
                    lock.getName(),
                    distributedLock.withInTransaction()
            );
 
            if (distributedLock.withInTransaction()) {
                return aopWithTransactional.proceed(joinPoint);
            }
            return joinPoint.proceed();
        } catch (final InterruptedException e) {
            throw new RuntimeException("Interrupt occurred when acquire lock...", e);
        } catch (final Throwable e) {
            throw new RuntimeException(e);
        } finally {
            release(lock);
        }
    }
 
    private void acquireLock(final RLock lock, final DistributedLock distributedLock) throws InterruptedException {
        if (!tryLock(lock, distributedLock)) {
            throw new RuntimeException("Failed to acquire lock...");
        }
    }
 
    private boolean tryLock(final RLock lock, final DistributedLock distributedLock) throws InterruptedException {
        return lock.tryLock(distributedLock.waitTime(), distributedLock.leaseTime(), distributedLock.timeUnit());
    }
 
    private void release(final RLock lock) {
        if (lock.isLocked() && lock.isHeldByCurrentThread()) {
            try {
                lock.unlock();
                log.info("Thread[{}] -> [{}] lock released", Thread.currentThread().getName(), lock.getName());
            } catch (final Throwable e) {
                log.error("Failed to release lock", e);
            }
        } else {
            log.error(
                    "[{}] Alreay unlock or timeout... -> isLocked = {} || isHeldByCurrentThread = {}",
                    Thread.currentThread().getName(),
                    lock.isLocked(),
                    lock.isHeldByCurrentThread()
            );
            throw new RuntimeException("anonymous try unlock or timeout...");
        }
    }
}

 

최종 적용

1) 입찰 로직

@UseCase
@RequiredArgsConstructor
public class BidUseCase {
    private final AuctionReader auctionReader;
    private final ArtReader artReader;
    private final MemberReader memberReader;
    private final BidInspector bidInspector;
    private final BidProcessor bidProcessor;
 
    @DistributedLock(
            keyPrefix = "AUCTION:",
            keySuffix = "#command.auctionId",
            withInTransaction = true
    )
    public void invoke(final BidCommand command) {
        final Auction auction = auctionReader.getById(command.auctionId());
        final Art art = artReader.getById(auction.getArtId());
        final Member bidder = memberReader.getById(command.memberId());
 
        bidInspector.checkBidCanBeProceed(auction, art, bidder, command.bidPrice());
        bidProcessor.execute(auction, bidder, command.bidPrice());
    }
}

2) 구매 로직

@UseCase
@RequiredArgsConstructor
public class PurchaseArtUseCase {
    private final ArtReader artReader;
    private final MemberReader memberReader;
    private final PurchaseProcessor purchaseProcessor;
 
    @DistributedLock(
            keyPrefix = "ART:",
            keySuffix = "#command.artId",
            withInTransaction = true
    )
    public void invoke(final PurchaseArtCommand command) {
        final Art art = artReader.getById(command.artId());
        final Member owner = memberReader.getById(art.getOwnerId());
        final Member buyer = memberReader.getById(command.memberId());
 
        if (art.isAuctionType()) {
            purchaseProcessor.purchaseAuctionArt(art, owner, buyer);
        } else {
            purchaseProcessor.purchaseGeneralArt(art, owner, buyer);
        }
    }
}
 
@Service
@RequiredArgsConstructor
public class PurchaseProcessor {
    private final AuctionReader auctionReader;
    private final PurchaseInspector purchaseInspector;
    private final PurchaseWriter purchaseWriter;
    private final PointRecordWriter pointRecordWriter;
    private final AssociatedPointTransactionProcessor associatedPointTransactionProcessor;
 
    @AnotherArtWritableTransactional
    public void purchaseAuctionArt(
            final Art art,
            final Member owner,
            final Member buyer
    ) {
        final Auction auction = auctionReader.getByArtId(art.getId());
        purchaseInspector.checkAuctionArt(auction, art, buyer);
        art.closeSale();
 
        final Purchase purchase = Purchase.purchaseAuctionArt(art, buyer, auction.getHighestBidPrice());
        proceedPurchase(purchase, owner, buyer);
        associatedPointTransactionProcessor.executeWithPurchaseAuction(owner, buyer, purchase.getPrice());
    }
 
    @AnotherArtWritableTransactional
    public void purchaseGeneralArt(
            final Art art,
            final Member owner,
            final Member buyer
    ) {
        purchaseInspector.checkGeneralArt(art, buyer);
        art.closeSale();
 
        final Purchase purchase = Purchase.purchaseGeneralArt(art, buyer);
        proceedPurchase(purchase, owner, buyer);
        associatedPointTransactionProcessor.executeWithPurchaseGeneral(owner, buyer, purchase.getPrice());
    }
 
    private void proceedPurchase(
            final Purchase purchase,
            final Member owner,
            final Member buyer
    ) {
        purchaseWriter.save(purchase);
        pointRecordWriter.save(
                PointRecord.addArtSoldRecord(owner, purchase.getPrice()),
                PointRecord.addArtPurchaseRecord(buyer, purchase.getPrice())
        );
    }
}
 
@Service
@RequiredArgsConstructor
public class AssociatedPointTransactionProcessor {
    @AnotherArtWritableTransactional
    public void executeWithPurchaseAuction(
            final Member owner,
            final Member buyer,
            final int price
    ) {
        // 1. 구매자 포인트 차감
        buyer.increaseAvailablePoint(price); // 입찰 시 소모한 포인트 누적 차감 문제 해결
        buyer.decreaseTotalPoint(price);
 
        // 2. 판매자 포인트 적립
        owner.increaseTotalPoint(price);
    }
 
    @AnotherArtWritableTransactional
    public void executeWithPurchaseGeneral(
            final Member owner,
            final Member buyer,
            final int price
    ) {
        // 1. 구매자 포인트 차감
        buyer.decreaseTotalPoint(price);
 
        // 2. 판매자 포인트 적립
        owner.increaseTotalPoint(price);
    }
}

 

 

Redis 분산락 Timeout & Optimistic Lock

Redis 분산락은 Timeout이 발생했지만 Application Transaction은 여전히 유효하다면?

 


이러한 케이스에서는 다시 동시성 문제가 발생할 수 있기 때문에 위와 같은 문제를 방지하기 위해서 2차적인 방어 로직으로 Optimistic Lock을 활용해서 데이터 정합성을 보장하는 것이 좋아보인다

  • CAS(Compare & Set) 연산 - OptimisticLocking을 통해서 여러 트랜잭션에서의 데이터 정합성을 보장하는 메커니즘

 

@Getter
@NoArgsConstructor(access = PROTECTED)
@Entity
@Table(name = "auction")
public class Auction extends BaseEntity<Auction> {
    @Column(name = "art_id", nullable = false, updatable = false, unique = true)
    private Long artId;
 
    @Embedded
    private Period period;
 
    @Column(name = "highest_bidder_id")
    private Long highestBidderId;
 
    @Column(name = "highest_bid_price", nullable = false)
    private int highestBidPrice;
 
    @Version
    private long version;
 
    @OneToMany(mappedBy = "auction", cascade = CascadeType.PERSIST)
    private final List<AuctionRecord> auctionRecords = new ArrayList<>();
    
    ...
}
@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
public @interface DistributedLock {
    String keyPrefix();
 
    String keySuffix();
 
    long waitTime() default 5000L;
 
    long leaseTime() default 3000L;
 
    TimeUnit timeUnit() default TimeUnit.MILLISECONDS;
 
    boolean withInTransaction() default false;
 
    /**
     * 분산락 Timeout을 고려한 Optimistic Lock Retry
     */
    int withRetry() default -1;
}
@Slf4j
@Aspect
@Component
@RequiredArgsConstructor
public class DistributedLockAop {
    private final RedissonClient redissonClient;
    private final AopWithTransactional aopWithTransactional;
 
    @Around("@annotation(DistributedLock)")
    public Object lock(final ProceedingJoinPoint joinPoint) {
        final MethodSignature signature = (MethodSignature) joinPoint.getSignature();
        final Method method = signature.getMethod();
        final DistributedLock distributedLock = method.getAnnotation(DistributedLock.class);
 
        final String key = (String) DistributedLockNameGenerator.generate(
                distributedLock.keyPrefix(),
                distributedLock.keySuffix(),
                signature.getParameterNames(),
                joinPoint.getArgs()
        );
        final RLock lock = redissonClient.getLock(key);
 
        int currentRetry = 0;
        while (true) {
            try {
                if (tryMaximum(currentRetry++, distributedLock, method)) {
                    throw new RuntimeException("Retry Exception...");
                }
 
                acquireLock(lock, distributedLock);
 
                log.info(
                        "Thread[{}] -> [{}] lock acquired with in transaction = {}",
                        Thread.currentThread().getName(),
                        lock.getName(),
                        distributedLock.withInTransaction()
                );
 
                if (distributedLock.withInTransaction()) {
                    return aopWithTransactional.proceed(joinPoint);
                }
                return joinPoint.proceed();
            } catch (final ObjectOptimisticLockingFailureException e) {
                log.info(
                        "[{}] Optimistic Lock Version Miss... -> retry = {}, maxRetry = {}, withInTransaction = {}",
                        Thread.currentThread().getName(),
                        currentRetry,
                        distributedLock.withRetry(),
                        distributedLock.withInTransaction()
                );
                try {
                    Thread.sleep(100);
                } catch (final InterruptedException ex) {
                    throw new RuntimeException(ex);
                }
            } catch (final InterruptedException e) {
                throw new RuntimeException("Interrupt occurred when acquire lock...", e);
            } catch (final Throwable e) {
                throw new RuntimeException(e);
            } finally {
                release(lock);
            }
        }
    }
 
    private boolean tryMaximum(final int currentRetry, final DistributedLock distributedLock, final Method method) {
        if (distributedLock.withRetry() != -1 && distributedLock.withRetry() == currentRetry) {
            log.error("Retry Exception... -> method = {}", method);
            return true;
        }
        return false;
    }
 
    private void acquireLock(final RLock lock, final DistributedLock distributedLock) throws InterruptedException {
        if (!tryLock(lock, distributedLock)) {
            throw new RuntimeException("Failed to acquire lock...");
        }
    }
 
    private boolean tryLock(final RLock lock, final DistributedLock distributedLock) throws InterruptedException {
        return lock.tryLock(distributedLock.waitTime(), distributedLock.leaseTime(), distributedLock.timeUnit());
    }
 
    private void release(final RLock lock) {
        if (lock.isLocked() && lock.isHeldByCurrentThread()) {
            try {
                lock.unlock();
                log.info("Thread[{}] -> [{}] lock released", Thread.currentThread().getName(), lock.getName());
            } catch (final Throwable e) {
                log.error("Failed to release lock", e);
            }
        } else {
            log.error(
                    "[{}] Alreay unlock or timeout... -> isLocked = {} || isHeldByCurrentThread = {}",
                    Thread.currentThread().getName(),
                    lock.isLocked(),
                    lock.isHeldByCurrentThread()
            );
            throw new RuntimeException("anonymous try unlock or timeout...");
        }
    }
}
@UseCase
@RequiredArgsConstructor
public class BidUseCase {
    private final AuctionReader auctionReader;
    private final ArtReader artReader;
    private final MemberReader memberReader;
    private final BidInspector bidInspector;
    private final BidProcessor bidProcessor;
 
    @DistributedLock(
            keyPrefix = "AUCTION:",
            keySuffix = "#command.auctionId",
            withInTransaction = true,
            withRetry = 3
    )
    public void invoke(final BidCommand command) {
        final Auction auction = auctionReader.getById(command.auctionId());
        final Art art = artReader.getById(auction.getArtId());
        final Member bidder = memberReader.getById(command.memberId());
 
        bidInspector.checkBidCanBeProceed(auction, art, bidder, command.bidPrice());
        bidProcessor.execute(auction, bidder, command.bidPrice());
    }
}