김영한의 실전 자바 - 고급 1편(생산자 소비자 문제: Object - wait/notify, ReentarantLock - await/signal, BlockingQueue)

2025. 1. 8. 23:12·김영한의 실전 자바 - 고급 1편

생산자 소비자 문제가 있다고 가정하자. 생산자는 p1, p2, p3이 있고 소비자는 c1, c2, c3이 있으며 데이터 버퍼는 2개가 최대다. 생산자 3명이 생산 이후 소비자 3명이 소비를 시도하게 되면 버퍼 용량 부족으로 데이터가 하나 유실 된다. 이를 멀티스레딩 환경에서 해결할 수 있는 방법이 무엇이 있는가. 아래 그림을 활용해 설명하시오.

  • 멀티 스레딩 환경이므로 put과 take 함수는 synchronized로 lock을 통해서만 접근 가능하도록 설정해야 한다.
  • 버퍼가 다 찼을 때 데이터 유실을 방지하고자 버퍼가 다 찬 경우 put 함수에서 while문과 sleep을 활용해 버퍼가 빌 때 까지 대기하는 방법은 lock을 계속 잡고 있어 다른 스레드가 take를 할 수 없는 상황이 벌어진다.
  • 따라서, put 함수에서 만약 버퍼가 꽉 찬 경우 Object.wait() 함수를 호출해 lock을 획득할 때 까지 기다리고 추후 lock 획득 및 사용 완료 시 lock을 반환하고 Runnable 상태에서 Waiting 상태로 변환된 후 스레드 대기 집합으로 들어가도록 한다.
  • take 함수에서는 queue에서 데이터를 소비한 이후에는 Object.notify() 함수를 호출해 스레드 대기 집합에 있는 스레드가 깨어나도록 한다.
  • 스레드 대기 집합에서 깨어난 스레드는 Waiting에서 Runnable 상태가 되며, 다시 lock을 점유하게 되면 put을 할 수 있게 된다.

 

 

만약 위의 예제에서 소비자 3개가 먼저 실행된 이후 생산자 3개가 실행되면 발생할 수 있는 wait,  notify의 한계는 무엇인가?(take, put 각각 함수에 wait, notify 세팅은 되어있다)

  • c1, c2, c3이 먼저 실행되면 c1, c2, c3은 소비할 데이터가 버퍼에 없기 때문에 wait 함수를 호출하고 스레드 대기 집합에 들어가게 됨
  • p1이 put 을 실행하며 데이터를 버퍼에 넣고 나서 notify()함수를 호출하게 되면 c1, c2, c3중에 하나가 깨어나고 데이터를 소비하게 된다.
  • 데이터를 소비하고 나면 소비자도 notify() 함수를 호출하기 때문에 또 다른 소비자가 깨어나게 되는데, 이 소비자는 버퍼에서 소비할 데이터가 없어 다시 wait()을 호출하고 스레드 대기 집합에 들어가게 된다.
  • 이와 같이, notify() 함수는 내가 원하는 스레드만 지정해서 깨울 수 있는 기능은 없어 불필요한 스레드를 깨우는 비효율성을 야기할 수 있다는 단점이 있다.

 

 

위 예제의 비효율성을 ReentarantLock을 통해서 어떻게 해결할 수 있는가?

  • Synchronized와 함께 Object.wait(), Object.notify() 함수를 사용하며 발생하는 비효율성 문제는 결국 어떤 스레드를 깨울지 결정할 수 없다는 것이 문제임
  • ReentarantLock에서 producer, consumer에 대한 두 condition을 정의하고 await()으로 대기해야 할 때는 각자의 condition에서 대기를 하고, 상대방을 깨워야 할 때는 상대방 condition에서 signal()을 호출하면 됨.
  • 즉, 기존에 Synchronized를 사용하는 방식은 스레드들이 wait을 통해 대기하는 곳이 스레드 대기 집합 하나라서 누가 깨어날 지 알수가 없었지만 ReentarantLock을 사용하면 스레드가 대기하는 집합을 여러 개 둬 역할별로 대기 공간을 따로 둬 어디 대기 집합을 깨울지를 결정할 수 있다는 것.
public class BoundedQueueV5 implements BoundedQueue {
    private final Lock lock = new ReentrantLock();
    private final Condition producerCond = lock.newCondition();
    private final Condition consumerCond = lock.newCondition();
    private final Queue<String> queue = new ArrayDeque<>();
    private final int max;

    public BoundedQueueV5(int max) {
        this.max = max;
    }

    @Override
    public void put(String data) {
        lock.lock();
        try {
            while (queue.size() == max) {
                log("[put] 큐가 가득 참, 생산자 대기");
                try {
                    producerCond.await();
                    log("[put] 생산자 깨어남");
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
            }
            queue.offer(data);
            log("[put] 생산자 데이터 저장, consumerCond.signal() 호출");
            consumerCond.signal();
        } finally {
            lock.unlock();
        }
    }

    @Override
    public String take() {
        lock.lock();
        try {
            while (queue.isEmpty()) {
                log("[take] 큐에 데이터가 없음, 소비자 대기");
                try {
                    consumerCond.await();
                    log("[take] 소비자 깨어남");
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
            }
            String data = queue.poll();
            log("[take] 소비자 데이터 획득, producerCond.signal() 호출");
            producerCond.signal();
            return data;
        } finally {
            lock.unlock();
        }
    }

    @Override
    public String toString() {
        return queue.toString();
    }
}

 

 

위와 같은 기능을 Java에서 제공해주는 라이브러리에 대해 설명하시오.

  • java.util.concurrent.BlockingQueue 라는 인터페이스와 그 구현체들이 있으며 ArrayBlockingQueue는 배열 기반 고정 크기 버퍼, LinkedBlockingQueue는 링크 기반 무한 버퍼로 구현되어 있다.
  • Queue가 가득 찼다고 해서 계속 대기하거나 Queue가 비었다고 계속 대기하는 건 또 성능상 문제가 있으며, 이는 상황에 따라 다른 작업을 해야할 수 있다.
  • 예를 들어, 큐가 가득 찼을 때 생각할 수 있는 선택지는 4가지가 있다.
    • 예외를 던진다.
    • 예외를 받아서 처리한다.
    • 대기하지 않고 즉시 false 를 반환한다.
    • 대기한다. 특정 시간 만큼만 대기한다.
  • 따라서 BlockingQueue는 이에 대해 아래와 같은 함수들을 제공한다.

정리된 큐 메서드 분류

분류 메서드 예시
Throws Exception add(e), remove(), element()
Special Value offer(e), poll(), peek()
Blocks put(e), take()
Times Out offer(e, time, unit), poll(time, unit)

 

1. Throws Exception (예외 발생)

큐가 가득 차거나 비어 있는 상황에서 예외를 던지는 메서드들입니다.

메서드설명예외 발생 조건

add(e) 지정된 요소를 큐에 추가합니다. 큐가 가득 차면 IllegalStateException 예외 발생
remove() 큐에서 요소를 제거하고 반환합니다. 큐가 비어 있으면 NoSuchElementException 예외 발생
element() 큐의 머리 요소를 반환하지만, 큐에서 제거하지는 않습니다. 큐가 비어 있으면 NoSuchElementException 예외 발생

2. Special Value (특수 값 반환)

큐가 가득 차거나 비어 있는 상황에서 예외 대신 특수 값을 반환하는 메서드들입니다.

메서드 설명 반환 값
offer(e) 지정된 요소를 큐에 추가하려고 시도합니다. 큐가 가득 차면 false 반환
poll() 큐에서 요소를 제거하고 반환합니다. 큐가 비어 있으면 null 반환
peek() 큐의 머리 요소를 반환하지만, 큐에서 제거하지는 않습니다. 큐가 비어 있으면 null 반환

3. Blocks (대기)

큐가 가득 차거나 비어 있는 경우, 조건이 충족될 때까지 대기하는 메서드들입니다.

메서드 설명 동작 방식
put(e) 지정된 요소를 큐에 추가할 때까지 대기합니다. 큐가 가득 차면 공간이 생길 때까지 대기
take() 큐에서 요소를 제거하고 반환합니다. 큐가 비어 있으면 요소가 준비될 때까지 대기

4. Times Out (시간 대기)

지정된 시간 동안만 대기하고, 시간이 초과되면 동작을 중지하는 메서드들입니다.

메서드 설명 반환 값
offer(e, time, unit) 지정된 요소를 큐에 추가하려고 시도하며, 지정된 시간 동안 대기합니다. 시간이 초과되면 false 반환
poll(time, unit) 큐에서 요소를 제거하고 반환합니다. 시간이 초과되면 중지하고 null을 반환합니다. 시간이 초과되면 null 반환

 

BlockingQueue를 적용하면 코드는 아래와 같이 변한다.

package thread.bounded;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

/**
 * BoundedQueueV6_1 implements a bounded queue using ArrayBlockingQueue.
 */
public class BoundedQueueV6_1 implements BoundedQueue {

    private final BlockingQueue<String> queue;

    /**
     * Constructs a BoundedQueue with the specified maximum capacity.
     *
     * @param max the maximum number of elements the queue can hold
     */
    public BoundedQueueV6_1(int max) {
        queue = new ArrayBlockingQueue<>(max);
    }

    /**
     * Adds the specified element to the queue, waiting if necessary for space to become available.
     *
     * @param data the element to add to the queue
     */
    public void put(String data) {
        try {
            queue.put(data);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        }
    }

    /**
     * Retrieves and removes the head of the queue, waiting if necessary until an element becomes available.
     *
     * @return the head of the queue
     */
    public String take() {
        try {
            return queue.take();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        }
    }

    /**
     * Returns a string representation of the queue.
     *
     * @return a string representing the queue
     */
    @Override
    public String toString() {
        return queue.toString();
    }
}

 

그리고, put 함수를 예제로 들면 아래와 같이 구현되어 있다.

import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

/**
 * A simplified version of ArrayBlockingQueue implementation.
 * It uses a fixed-size array and ReentrantLock with Conditions for thread synchronization.
 */
public class ArrayBlockingQueue<E> {

    private final Object[] items; // Array to hold queue elements
    private int count;            // Current number of elements in the queue
    private int putIndex;         // Index for inserting new elements

    private final ReentrantLock lock;          // Lock for synchronizing access
    private final Condition notEmpty;          // Condition to wait when the queue is empty
    private final Condition notFull;           // Condition to wait when the queue is full

    /**
     * Constructor to initialize the queue with a fixed capacity.
     *
     * @param capacity the maximum number of elements the queue can hold
     */
    public ArrayBlockingQueue(int capacity) {
        this.items = new Object[capacity];
        this.lock = new ReentrantLock();
        this.notEmpty = lock.newCondition();
        this.notFull = lock.newCondition();
    }

    /**
     * Inserts the specified element into the queue, waiting if necessary for space to become available.
     *
     * @param e the element to add
     * @throws InterruptedException if interrupted while waiting
     */
    public void put(E e) throws InterruptedException {
        lock.lockInterruptibly();
        try {
            while (count == items.length) {
                notFull.await(); // Wait until space is available
            }
            enqueue(e);
        } finally {
            lock.unlock();
        }
    }

    /**
     * Helper method to add the element to the queue.
     *
     * @param e the element to add
     */
    private void enqueue(E e) {
        items[putIndex] = e;
        putIndex = (putIndex + 1) % items.length;
        count++;
        notEmpty.signal(); // Signal that the queue is no longer empty
    }
}

 

'김영한의 실전 자바 - 고급 1편' 카테고리의 다른 글

김영한의 실전 자바 - 고급 1편(동시성 컬렉션)  (0) 2025.01.11
김영한의 실전 자바 - 고급 1편(CAS)  (1) 2025.01.11
김영한의 실전 자바 - 고급 1편(volatile, synchronized, LockSupport, ReentrantLock)  (1) 2025.01.05
김영한의 실전 자바 - 고급 1편(스레드 생명주기, join, interrupt, yield)  (2) 2025.01.04
김영한의 실전 자바 - 고급 1편(프로세스, 스레드)  (3) 2025.01.02
'김영한의 실전 자바 - 고급 1편' 카테고리의 다른 글
  • 김영한의 실전 자바 - 고급 1편(동시성 컬렉션)
  • 김영한의 실전 자바 - 고급 1편(CAS)
  • 김영한의 실전 자바 - 고급 1편(volatile, synchronized, LockSupport, ReentrantLock)
  • 김영한의 실전 자바 - 고급 1편(스레드 생명주기, join, interrupt, yield)
5jyan5
5jyan5
  • 5jyan5
    jyan
    5jyan5
  • 전체
    오늘
    어제
    • 분류 전체보기 (242)
      • 김영한의 스프링 핵심 원리(기본편) (8)
      • 김영한의 스프링 핵심 원리 - 고급편 (11)
      • 김영한의 스프링 MVC 1편 (1)
      • 김영한의 스프링 DB 1편 (3)
      • 김영한의 스프링 MVC 2편 (3)
      • 김영한의 ORM 표준 JPA 프로그래밍(기본편) (9)
      • 김영한의 스프링 부트와 JPA 활용2 (2)
      • 김영한의 실전 자바 - 중급 1편 (1)
      • 김영한의 실전 자바 - 고급 1편 (9)
      • 김영한의 실전 자바 - 고급 2편 (9)
      • Readable Code: 읽기 좋은 코드를 작성.. (2)
      • 김영한의 실전 자바 - 고급 3편 (9)
      • CKA (118)
      • 개발 (37)
      • 경제 (4)
      • 리뷰 (1)
      • 정보 (2)
  • 블로그 메뉴

    • 링크

    • 공지사항

    • 인기 글

    • 태그

      jpq
      자바
      양방향 맵핑
      gesingleresult
      @within
      reentarantlock
      Target
      requset scope
      페치 조인
      조회 성능 최적화
      WAS
      김영한
      고급
      JPQL
      단방향 맵핑
      버퍼
      typequery
      스레드
      log trace
      @args
      @discriminatorcolumn
      jdk 동적 프록시
      락
      빈 후처리기
      @discriminatorvalue
      Thread
      cglib
      프록시
      프록시 팩토리
      hibernate5module
    • 최근 댓글

    • 최근 글

    • hELLO· Designed By정상우.v4.10.2
    5jyan5
    김영한의 실전 자바 - 고급 1편(생산자 소비자 문제: Object - wait/notify, ReentarantLock - await/signal, BlockingQueue)
    상단으로

    티스토리툴바