프로그래밍 관련/자바

30편. 스레드(Thread) (3)

LAYER6AI 2023. 4. 7. 19:54

스레드의 상태

자바의 스레드는 총 6개의 상태(NEW, RUNNABLE, WAITING, TIMED_WAITING, BLOCKED, TERMINATED)를 가지고 있습니다. 참고로 자바의 스레드는 JVM(Java Virtual Machine, 자바 가상 머신) 위에서 돌아가며, 여기에 나와 있는 상태들은 가상 머신의 상태를 말하는 것입니다. 다시 말해서, 운영체제 스레드의 상태를 나타내는 것은 아닙니다.

NEW

새로운 스레드를 만들면 NEW 상태가 됩니다. 이 상태의 스레드는 아직 시작되지 않았으며, start() 메서드를 호출하며 스레드를 시작하면 RUNNABLE 상태로 들어가게 됩니다.

Thread thread = new Thread(new ThreadA());
System.out.println(thread.getState()); // NEW

RUNNABLE

스레드를 생성하고 start() 메서드를 호출하면 NEW에서 RUNNABLE 상태로 이동합니다. 자바에서는 '실행 가능한(Runnable)' 상태와 '실행 중(Running)' 상태가 RUNNABLE로 합쳐져 있습니다.

Thread thread = new Thread(new ThreadA());
thread.start();
System.out.println(thread.getState()); // 높은 확률로 RUNNABLE

WAITING

대기 중인 스레드의 상태를 의미합니다. 아래에 있는 메서드 중 하나를 호출해서 스레드가 대기 상태에 있습니다.

  • Object.wait() (시간 제한 없음)
  • Thread.join() (시간 제한 없음)
  • LockSupport.park()

대기 중인 스레드는 다른 스레드가 특정 작업이 완료되기를 대기하고 있는 중입니다. 예를 들어서, 객체에서 Object.wait() 메서드를 호출한 후에 스레드는 해당 객체에서 Object.notify() 혹은 Object.notifyAll()을 호출하는 다른 스레드를 기다리고 있는 것입니다. Thread.join()을 호출한 스레드는 해당 스레드가 종료되기를 기다리고 있습니다.

public class WaitingStateExample {
    public static void main(String[] args) throws InterruptedException {
        Thread workingThread = new Thread(new WorkingRunnable());
        Thread waitingThread = new Thread(new WaitingRunnable(workingThread));

        workingThread.start();
        waitingThread.start();

        Thread.sleep(500); // 충분한 시간동안 스레드들이 실행될 수 있게 기다림
        System.out.println("WaitingThread 현재 상태: " + waitingThread.getState()); // WAITING

        workingThread.join();
        waitingThread.join();
    }

    public static class WorkingRunnable implements Runnable {
        @Override
        public void run() {
            try {
                Thread.sleep(3000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    public static class WaitingRunnable implements Runnable {
        private final Thread workingThread;

        public WaitingRunnable(Thread workingThread) {
            this.workingThread = workingThread;
        }

        @Override
        public void run() {
            try {
                workingThread.join();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

TIMED_WAITING

스레드가 다른 스레드로부터 작업이 완료되기를 지정된 시간 동안 기다리는 상태를 말합니다. WAITING과 비슷한데 무한정 기다리는게 아니라 지정한 시간 만큼을 기다리고 있는 상태를 의미합니다. 아래의 메서드 중 하나를 호출해서 스레드가 TIMED_WAITING 상태에 있는 중일 수 있습니다.

  • Thread.sleep()
  • Object.wait() (시간 제한 있음)
  • Thread.join() (시간 제한 있음)
  • LockSupport.parkNanos()
  • LockSupport.parkUntil()
public class TimedWaitingStateExample {

    public static void main(String[] args) throws InterruptedException {
        Thread timedWaitingThread = new Thread(new TimedWaitingTask());

        timedWaitingThread.start();

        Thread.sleep(500); // 충분한 시간동안 스레드가 실행될 수 있게 기다림
        System.out.println("TimedWaitingThread state: " + timedWaitingThread.getState()); // TIMED_WAITING

        timedWaitingThread.join();
    }

    public static class TimedWaitingTask implements Runnable {
        @Override
        public void run() {
            try {
                Thread.sleep(5000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

BLOCKED

락을 기다리며 블록된(차단된, blocked) 스레드 상태를 의미합니다. 블록된 상태의 스레드는 synchronized 블록 혹은 메서드에 진입하려고 했으나 다른 스레드가 이미 해당 락을 보유하고 있거나, Object.wait()을 호출한 후에 락을 포기하고 깨어난 뒤 다시 synchronized 블록 혹은 메서드에 진입하기 위해서 락을 기다리고 있는 상태를 말합니다.

public class BlockedStateExample {

    private static final Object lock = new Object();

    public static void main(String[] args) throws InterruptedException {
        Thread thread1 = new Thread(new BlockedTask(), "스레드 1");
        Thread thread2 = new Thread(new BlockedTask(), "스레드 2");

        synchronized (lock) {
            thread1.start();
            Thread.sleep(100);
            
            thread2.start();
            Thread.sleep(1000); // 충분한 시간동안 스레드들이 실행될 수 있게 기다림

            System.out.println("스레드 1 상태: " + thread1.getState()); // BLOCKED
            System.out.println("스레드 2 상태: " + thread2.getState()); // BLOCKED
        }

        thread1.join();
        thread2.join();
    }

    public static class BlockedTask implements Runnable {
        @Override
        public void run() {
            synchronized (lock) {
                System.out.println(Thread.currentThread().getName() + "이(가) 락을 얻었습니다.");
            }
        }
    }
}

TERMINATED

TERMINATED 상태는 예외로 인해 스레드가 중단됐거나, 실행을 모두 끝낸 상태를 말합니다.

public class TerminatedStateExample {

    public static class SimpleRunnable implements Runnable {
        @Override
        public void run() {
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    public static void main(String[] args) throws InterruptedException {
        Thread simpleThread = new Thread(new SimpleRunnable());
        simpleThread.start();

        System.out.println("join() 전: " + simpleThread.getState()); // 아마도 RUNNABLE, TIMED_WAITING 중 하나

        simpleThread.join();

        System.out.println("join() 후: " + simpleThread.getState()); // TERMINATED
    }
}

스레드의 동기화

Private 락(Private Lock)

아래는 private 락의 예입니다. 조금 다른게 있다면 락 전용 객체를 하나 만들고, 이 객체를 외부에서 접근할 수 없도록 private로 지정한 것입니다.

class Counter {
    private int count = 0;
    private Object lock = new Object();

    public void increment() {
    	synchronized (lock) {
    		count++;
    	}
    }
    
    /* ... */
}

여기서 동기화 메서드나 동기화 블록으로 충분한 것 같은데 왜 굳이 private 락 객체를 만들까요? 아래를 보면 그 이유를 알 수 있습니다.

class Counter {
    private int count = 0;

    public synchronized void increment() {
    	count++;
    }
    
    public int getValue() {
    	return count;
    }
}

class Worker implements Runnable {
    private Counter counter;

    public Worker(Counter counter) {
        this.counter = counter;
    }

    public void run() {
        for (int i = 0; i < 10000; i++) {
            counter.increment();
        }
    }
}

public class PrivateLockExample {
    public static void main(String[] args) throws InterruptedException {
        Counter counter = new Counter();
        Thread worker = new Thread(new Worker(counter));

        worker.start();
        // 내부에서 무한정 대기하므로 객체 counter의 락이 풀리지 않음
        synchronized (counter) {
        	while (true) {
        		Thread.sleep(Integer.MAX_VALUE);
        	}
        }
    }
}

메인 스레드에서 객체 counter의 락을 획득하고 무한정 대기하면 자식 스레드는 시작되었음에도 불구하고 동기화 메서드인 increment()에 접근할 수 없습니다. 여기서 private 락 객체를 사용하면 외부에서 접근할 수 없기 때문에 이런 문제를 해결할 수 있습니다. 하지만 내부에서 프로그래머의 실수로 락 객체를 새로 할당할 수도 있으므로 아래와 같이 private final로 선언하는 것을 권장합니다.

private final Object lock = new Object();

Lock

synchronized와의 비교

동기화를 위해서 기존에 synchronized 키워드를 사용했었는데, 이 synchronized는 락을 블록 단위로 획득하거나 해제할 수 있었습니다. 

public synchronized void methodA() {
	// ...
}

public void methodB() {
	synchronized (this) {
		// ...
	}
}

그리고 여러 개의 락을 얻은 경우에는 락의 획득 순서와 반대로 락을 해제해야 된다는 특징이 있었습니다. 다시 말해서, 가장 마지막에 획득한 락을 가장 먼저 해제하고, 가장 먼저 획득한 락을 가장 마지막에 해제해야 했습니다. 그리고 락을 획득한 코드 블록이 끝날 때, 자동으로 모든 락이 해제되어야 했었습니다. 하지만 A의 락을 획득한 다음 B의 락을 획득하고, A의 락을 해제한 다음 C를 획득하고 B를 해제하는 식으로 락을 좀 더 유연하게 다뤄야 할 때도 있는데 이럴 때 Lock을 사용할 수 있습니다.

그뿐만이 아니라 Lock은 락을 확보할 때 시간 제한을 두거나, 스레드가 가지고 있는 락의 갯수를 확인하거나, 현재 스레드가 락을 소유하고 있는지 등 다양한 메서드를 제공하고 있습니다.

public interface Lock {
	// 락을 획득한다. 만약 락을 얻을 수 없으면 현재 스레드는 
	// 블록 상태(BLOCKED)가 되고 락을 획득할 때까지 대기한다.
    void lock();

	// 현재 스레드가 인터럽트 상태가 아닐 때 락을 획득할 수 있다.
	// 만약에, 현재 스레드가 이 메서드에 진입할 때 인터럽트 상태가 설정된 경우
	// 혹은 락을 획득하는 동안 인터럽트가 발생하면 InterruptedException
	// 이 발생하고 현재 스레드의 인터럽트 상태가 지워진다.
    void lockInterruptibly() throws InterruptedException;

	// 바로 락을 시도하고 성공 여부를 boolean 타입으로 반환한다.
	// 예를 들어, 락을 사용할 수 있으면 즉시 획득하고 true를 반환한다.
    boolean tryLock();

	// 지정한 시간 내에 락을 얻을 수 있으면 현재 스레드가 락을 획득하고
	// true를 반환한다. 만약 지정한 대기 시간이 지난 경우에는 false를 반환한다.
    boolean tryLock(long time, TimeUnit unit) throws InterruptedException;

	// 락을 해제한다.
    void unlock();

	// 현재 락 인스턴스와 연결된 Condition 객체를 반환한다.
    Condition newCondition();
}

주의사항

하지만 여기서 주의해야 할 점은 synchronized와는 달리 예외가 발생하면 락을 풀지 못하고 크리티컬 섹션(critical section)을 빠져나가는 일이 있을 수 있으므로 try-catch문이나 try-finally문을 사용하여 예외가 발생하더라도 락을 풀 수 있도록 만들어야 한다는 것입니다. 만약에 프로그래머의 실수로 이를 잊는다면 관련된 문제가 발생했을 때 락이 언제 어디서 풀렸는지 기록이 남는 게 아니므로 문제의 원인을 찾기가 힘들어집니다.

Lock lock = new ReentrantLock();
/* ... */
lock.lock();
try {
	// 크리티컬 섹션
} finally {
	lock.unlock();
}

tryLock()

lock()과 unlock()은 대충 짐작이 가셨을테니, 여기서는 tryLock()만 살펴보도록 하겠습니다. tryLock()은 위에서도 살펴봤지만, 바로 락을 시도하고 그에 대한 성공 여부를 반환하는 메서드입니다. 보통은 아래와 같은 형태로 작성할 수 있습니다.

Lock lock = ...;
if (lock.tryLock()) {
	try {
		// 크리티컬 섹션
	} finally {
		lock.unlock();
	}
} else {
	// 락을 획득할 수 없으면
}

아래는 예시 코드입니다. 매 실행마다 10이란 결과를 얻을 수도 있고 10 미만의 결과를 얻을 수도 있는 걸 확인할 수 있습니다.

public class LockExample {
    private static final Lock lock = new ReentrantLock();
    public static int count = 0;

    public static Thread createNewThread() {
        return new Thread(() -> {
            if (lock.tryLock()) {
                try {
                    count++;
                } finally {
                    lock.unlock();
                }
            } else {
                System.out.println("스레드 " + Thread.currentThread().getName() + "(이)가 락을 얻는 데 실패했습니다.");
            }
        });
    }

    public static void main(String[] args) {
        int threadCount = 10;
        Thread[] threads = new Thread[threadCount];

        for (int i = 0; i < threads.length; i++)
            threads[i] = createNewThread();

        for (Thread value : threads)
            value.start();

        for (Thread thread : threads) {
            try {
                thread.join();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }

        System.out.println("count = " + count);
    }
}

ReentrantLock

ReentrantLock 클래스는 자바 5 이후로 java.util.concurrent.locks 패키지에 추가되었습니다. synchronized를 이용한 동기화는 한계가 있을 때 보다 확장된 기능들을 갖춘 ReentrantLock을 사용하는 걸 고려해볼 수 있습니다. 사용법은 아래와 같이 lock() 메서드를 호출하여 락을 얻은 뒤에, unlock() 메서드를 호출하여 락을 해제합니다.

Lock lock = new ReentrantLock();
/* ... */
lock.lock();
try {
	// 크리티컬 섹션
} finally {
	lock.unlock();
}

공평성(fairness)

그리고 ReentrantLock 클래스의 생성자를 살펴보면 fair라는 boolean형 매개변수를 받는데, synchronized와는 달리 공평성(fairness)을 지원하기 때문입니다. 공정한 순서를 유지하기 위한 추가적인 작업이나 관리에 들어가는 오버헤드 부담과 같은 성능 저하 문제로 기본값은 false, 즉 불공평(non-fair) 모드로 되어 있습니다. 따라서 꼭 공평성이 필요한 경우가 아니라면 이를 true로 지정해 성능을 떨어뜨리는 결과를 얻을 필요는 없습니다.

공평성(fairness)

공평성의 의미는 이름 그대로 락의 획득 순서가 스레드들 사이에서 공평하게 이루어지는가를 나타냅니다. 즉, 기다리는 스레드들이 먼저 온 순서대로 자원에 액세스할 수 있다는 보장을 받게 됩니다. 반대로 공평성을 고려하지 않는 불공평(non-fair) 모드에서는 락의 획득 순서가 불규칙적으로 이루어질 수 있고, 어떤 스레드는 락을 오랫동안 기다려야 할 수도 있습니다.

public ReentrantLock(boolean fair) {
    sync = fair ? new FairSync() : new NonfairSync();
}

언제 사용하는가?

ReentrantLock은 복잡한 동기화 요구 사항이 있을 때 사용할 수 있으나, synchronized와는 달리 락의 관리를 프로그래머에게 쥐여줌으로써 자칫 잘못하면 더 위험하다는 단점이 있습니다. 보통은 ReentrantLock에서 제공하는 기능이 필요한 게 아니면 간편하고 가독성이 좋은 synchronized를 사용하는 것을 권장합니다. ReentrantLock의 사용을 고려해볼 수 있는 것에는 아래와 같은 경우들이 있습니다.

  1. 락을 확보할 때 타임아웃을 지정해야 하는 경우
  2. 보호해야 하는 범위가 블록 단위를 벗어날 경우
  3. 기다리는 스레드들이 먼저 온 순서대로 자원에 액세스해야 함이 보장되어야 하는 경우
  4. 락을 확보하느라 대기 상태에 들어가 있을 때 인터럽트를 걸 수 있어야 하는 경우
  5. 주기적으로 락 상태를 확인하면서 락을 획득하려고 시도하는 경우. 즉, 락 획득에 실패한 경우 스레드가 다른 작업을 수행하거나 주기적으로 락 상태를 확인하는 경우를 말한다.

CountDownLatch

CountDownLatch는 이름 그대로 걸쇠(latch)는 걸쇠인데, 이 걸쇠에 카운터가 붙어있어서 카운터에 0이 찍힐 때까지 호출 스레드를 블록시킬 수 있습니다. 다르게 비유하면 CountDownLatch가 관문의 역할을 한다고 할 수 있습니다. 먼저 D라는 작업을 하기 위해서는 다른 스레드가 작업 중인 A, B, C가 완료된 상태여야 하는데 이런 경우에 CountDownLatch를 사용할 수 있습니다(어느 순서로 완료되었는지는 중요하지 않음).

좀 더 자세하게 말하면, CountDownLatch는 다른 스레드에서 수행 중인 작업들을 모두 완료할 때까지 대기할 수 있도록 해줍니다. 작업의 수를 미리 지정하고, 해당 작업이 모두 완료될 때까지 대기하는 스레드를 만들어서 CountDownLatch에 등록해 놓으면, 작업이 완료될 때마다 카운트가 감소하게 됩니다. 이 때 카운트가 0이 되면 대기 중인 스레드들은 모두 동시에 실행될 수 있게 됩니다. 

public class CountDownLatch {
	// ...
	// 현재 스레드가 인터럽트되지 않는 한,
	// 카운트가 0이 될 때까지 현재 스레드를 블록시킨다.
	// 현재 카운트가 0이면 이 메서드는 즉시 반환된다.
    public void await() throws InterruptedException {
        sync.acquireSharedInterruptibly(1);
    }

	// 현재 스레드가 인터럽트 되거나 지정한 대기 시간이 경과할 때까지 혹은
	// 해당 래치의 카운트가 0이 될 때까지 현재 스레드를 블록시킨다.
	// 현재 카운트가 0이면 이 메서드는 즉시 true를 반환한다.
    public boolean await(long timeout, TimeUnit unit)
        throws InterruptedException {
        return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
    }

	// 래치의 카운트를 1 감소시킨다. 래치의 카운트가 0이 되면 
	// await()을 호출하여 래치를 획득한 모든 스레드가 블록 상태에서 풀려난다.
	// 현재 카운트가 0이라면 아무 일도 일어나지 않는다.
    public void countDown() {
        sync.releaseShared(1);
    }

	// 래치의 현재 카운트를 반환한다.
    public long getCount() {
        return sync.getCount();
    }
}

전체적인 흐름을 살펴봅시다. 먼저 래치의 초기 카운트를 설정해야 합니다. 참고로 이 값은 한 번만 설정할 수 있고, 이 값을 리셋시키는 다른 메서드는 존재하지 않습니다. 다시 말해서 래치는 재사용이 불가능합니다.

CountDownLatch latch = new CountDownLatch(4);

await() 메서드를 호출한 모든 스레드는 이 카운트가 0에 도달하거나 다른 스레드에 의해서 인터럽트 될 때까지 기다립니다. 작업 스레드는 작업을 모두 끝냈거나 사전 준비를 모두 마쳤으면 countDown() 메서드를 호출해서 카운트다운을 수행합니다. 카운트가 0에 도달하면 await()을 호출하고 블록됐던 스레드들이 실행되기 시작합니다.

아래 예시에서는 카운트가 4로 시작하며 메인 메서드를 실행하는 메인 스레드는 다른 스레드의 작업이 모두 끝날 때까지 대기하게 됩니다.

import java.util.concurrent.CountDownLatch;

public class CountDownLatchExample {
    public static void main(String[] args) throws InterruptedException {
        int numOfTasks = 4;
        CountDownLatch latch = new CountDownLatch(numOfTasks);

        for (int i = 1; i <= numOfTasks; i++) {
            Task task = new Task(i, latch);
            new Thread(task).start();
        }

        latch.await();
        System.out.println("모든 작업이 완료되었습니다!");
    }
}

class Task implements Runnable {
    private final int taskId;
    private final CountDownLatch latch;

    public Task(int taskId, CountDownLatch latch) {
        this.taskId = taskId;
        this.latch = latch;
    }

    @Override
    public void run() {
        System.out.println("작업 #" + taskId + "이 실행 중입니다...");
        // 시간이 오래 걸리는 작업을 수행하는 코드
        try {
            // 시뮬레이션을 위해 무작위로 스레드를 잠시 중지한다.
            Thread.sleep((int) (Math.random() * 3000));
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("작업 #" + taskId + "이 완료되었습니다!");
        latch.countDown();
    }
}

CyclicBarrier

CyclicBarrier를 사용하면 어떤 집합에 있는 스레드들이 서로가 공통 배리어 포인트에 도달할 때까지 기다릴 수 있도록 할 수 있습니다. 여기서 cyclic 은 스레드의 블록 상태가 풀린 후에 래치(latch)와는 다르게 다시 사용할 수 있기 때문에 순환(cyclic)이라는 단어가 붙은 것입니다. 그리고 barrier는 말 그대로 '장벽'이나 '벽'이라는 뜻을 가지고 있습니다. 따라서 CyclicBarrier는 스레드들이 서로를 기다리는 '벽'이라는 의미를 가진다고 할 수 있습니다.

이러한 기능은 병렬 처리나 멀티 스레드 환경에서 작업의 순서를 조절하거나 동기화하는 데에 유용하게 사용할 수 있습니다. 예를 들어, 모든 스레드가 특정 작업을 마치기 전까지 다음 작업으로 진행하지 않도록 하거나, 여러 개의 스레드가 각자 독립적으로 작업을 수행한 후에 모든 결과를 합치는 작업이 필요할 때 사용할 수 있습니다. 

public class CyclicBarrier {
	// ...
	// 파티(스레드)의 수를 반환한다.
	// 즉, 배리어에 걸리기 전에 await() 메서드를 호출해야 하는 스레드의 수를 말한다.
	public int getParties() {
        return parties;
    }

	// 현재 배리어에 대해서 모든 파티(parties)가 await()을 호출할 때까지 대기한다. 
	// 현재 스레드가 마지막으로 도착하지 않은 경우, 아래 중 하나가
	// 일어날 때까지 현재 스레드가 블록된다.
	// (1) 마지막 스레드가 도착함
	// (2) 다른 스레드가 현재 스레드를 인터럽트함 
	// (3) 똑같이 배리어로 인해 블록 상태에 있는 다른 스레드 중 하나가 인터럽트됨
	// (4) 다른 스레드가 이 배리어의 reset() 메서드를 호출함
	// 여기서 (2), (3), (4)가 발생하면 BrokenBarrierException이 발생한다.
    public int await() throws InterruptedException, BrokenBarrierException {
        try {
            return dowait(false, 0L);
        } catch (TimeoutException toe) {
            throw new Error(toe); // cannot happen
        }
    }

	// 위와 동일하나 모든 파티가 배리어에 도착하면 지정한 대기 시간이
	// 지날 때까지 대기한다.
	// 만약에 대기 시간이 경과해도 마지막 스레드가 도착하지 않으면
	// TimeoutException이 발생한다.
    public int await(long timeout, TimeUnit unit)
        throws InterruptedException,
               BrokenBarrierException,
               TimeoutException {
        return dowait(true, unit.toNanos(timeout));
    }

	// 이 배리어가 중단되었는지에 대한 여부를 반환한다.
	// 즉, 한 개 이상의 스레드가 생성 이후나 마지막 리셋 이후
	// 인터럽트나 타임아웃으로 인해 이 배리어를 벗어났거나,
	// 작업 도중 예외가 발생하여 실패했다면 true를 반환한다.
    public boolean isBroken() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return generation.broken;
        } finally {
            lock.unlock();
        }
    }

	// 배리어를 초기 상태로 되돌린다.
	// 현재 배리어에서 대기 중인 스레드가 있으면 해당 스레드에
	// BrokenBarrierException이 발생한다.
    public void reset() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            breakBarrier();   // break the current generation
            nextGeneration(); // start a new generation
        } finally {
            lock.unlock();
        }
    }
}

parties는 CyclicBarrier를 사용하는 스레드의 수를 의미합니다. await() 메서드는 다른 모든 스레드가 '벽'에 도달할 때까지 기다리게 하며, 모든 스레드가 도달하면 '벽'이 허물어지고 모든 스레드가 블록 상태에서 벗어나 다음 작업으로 진행할 수 있게 됩니다. 그 후, 배리어는 다시 초기 상태로 돌아가 다음 배리어 포인트를 준비하게 됩니다. 

이러한 방식으로 CyclicBarrier는 여러 개의 작업을 동시에 처리하는데 유용하게 사용할 수 있습니다. 아래 예시에서는 3개의 스레드가 "안녕하세요!"를 출력한 뒤 무작위로 1초에서 5초 사이의 대기 시간을 갖습니다. 다른 스레드들이 작업을 모두 끝내야 배리어를 통과하는 것에 주목합시다.

import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ThreadLocalRandom;

public class SimpleCyclicBarrierExample {

    public static void main(String[] args) {
        int numOfThreads = 3;
        CyclicBarrier barrier = new CyclicBarrier(numOfThreads, new BarrierAction());

        for (int i = 0; i < numOfThreads; i++) {
            new Thread(new Worker(barrier)).start();
        }
    }

    public static class BarrierAction implements Runnable {
        @Override
        public void run() {
            System.out.println("모든 스레드가 작업을 완료했습니다!");
        }
    }

    static class Worker implements Runnable {
        private final CyclicBarrier barrier;

        public Worker(CyclicBarrier barrier) {
            this.barrier = barrier;
        }

        @Override
        public void run() {
            System.out.println(Thread.currentThread().getName() + ": 안녕하세요!");

			// 시간이 오래 걸리는 작업을 수행하는 코드
            try {
	            // 시뮬레이션을 위해 무작위로 스레드를 잠시 중지한다.
                int randomDelay = ThreadLocalRandom.current().nextInt(1, 6) * 1000;
                Thread.sleep(randomDelay);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }

            System.out.println(Thread.currentThread().getName() + "이(가) 작업을 완료했습니다.");

            try {
                barrier.await();
                System.out.println(Thread.currentThread().getName() + "이(가) 배리어를 통과했습니다.");
            } catch (InterruptedException | BrokenBarrierException e) {
                e.printStackTrace();
            }
        }
    }
}

세마포어(Semaphore)

카운팅 세마포어(counting semaphore)는 특정 리소스를 동시에 사용하려는 스레드의 수를 제한하고자 할 때 사용합니다. 카운트가 0이 아닌 동안에는 스레드는 세마포어(semaphore)를 획득하고 작업을 진행할 수 있습니다. 스레드가 세마포어를 해제하면 카운트가 증가합니다. 기존에 봤던 동기화 기법들은 크리티컬 섹션에 하나의 스레드만 접근할 수 있었으나, 세마포어를 사용하면 여러 스레드가 크리티컬 섹션에 진입하도록 제한할 수 있습니다.

public Semaphore(int permits) {
	sync = new NonfairSync(permits);
}

세마포어를 사용할 때는 처음에 퍼밋(permits)을 지정해야 하는데, 여기서 퍼밋은 리소스에 대한 동시 접근을 제한하는 데 사용되는 개념입니다. 이름 그대로 일종의 가상 토큰 혹은 허가서라고 이해하면 됩니다. 예를 들어, 퍼밋 값이 1인 세마포어는 동시에 하나의 스레드만 리소스에 접근할 수 있도록 제한하며, 이를 바이너리 세마포어(항상 0과 1의 카운트를 가짐, binary semaphore)라고 부르고 우리가 기존에 봤던 동기화 기법(뮤텍스)과 유사하다고 할 수 있습니다. 만약에 퍼밋 값이 3이라면 동시에 최대 3개의 스레드가 리소스에 접근할 수 있도록 허용할 수 있습니다.

예시 살펴보기

이해를 돕기 위해서 예를 살펴봅시다. 예를 들어서, 공용 화장실에 3개의 칸이 있다고 가정해봅시다. 이 경우, 카운팅 세마포어의 퍼밋 값은 3이라고 할 수 있습니다. 이렇게 설정하면, 한 번에 최대 3명의 사람이 화장실을 이용할 수 있게 됩니다.

세마포어의 acquire() 메서드는 화장실의 칸을 사용하려는 사람이 도착했을 때 호출됩니다. 사용 가능한 퍼밋(현재는 3)이 있으면 카운터가 감소하고 사용자는 화장실 칸을 사용할 수 있게 됩니다. 위와 같은 상황에서는 사용자가 화장실(크리티컬 섹션)로 진입해서 남은 퍼밋수가 2로 떨어진 것을 확인하실 수 있습니다. 

만약 화장실이 가득 차 있으면(카운터가 0이면), 다른 사용자는 누군가 화장실을 나올 때까지 기다려야 합니다. 세마포어의 release() 메서드는 화장실에서 나온 사용자가 호출합니다. 이 메서드가 호출되면 카운터가 증가하고, 기다리고 있는 다른 사용자가 화장실을 사용할 수 있게 됩니다.

비록 비유긴 했지만, 이처럼 카운팅 세마포어는 공유 리소스(혹은 공유 자원, 화장실 칸)의 동시 사용을 제한하고, 동시 사용자 수를 조절할 수 있게 해줍니다. 이를 통해서 화장실 내부에서의 동시 사용자 수를 안전하게 관리할 수 있게 됩니다. 이제 세마포어 클래스 내부를 살펴봅시다. 보통 acquire()와 release() 메서드가 주로 사용됩니다.

public class Semaphore implements java.io.Serializable {
	// 세마포어로부터 퍼밋(permit)을 획득하고, 사용 가능한 퍼밋이 없으면
	// 인터럽트 되거나 퍼밋을 얻을 수 있을 때까지 현재 스레드는 블록된다.
	// 퍼밋을 얻을 수 있으면 퍼밋의 수를 1만큼 감소시킨다.
    public void acquire() throws InterruptedException {
        sync.acquireSharedInterruptibly(1);
    }

	// 퍼밋을 해제하고 세마포어로 반환한다.
    public void release() {
        sync.releaseShared(1);
    }

	// 호출 시 사용 가능한 퍼밋이 있는 경우에만 퍼밋을 얻는다.
	// 퍼밋이 없는 경우 이 메서드는 즉시 false 값을 반환한다.
    public boolean tryAcquire() {
        return sync.nonfairTryAcquireShared(1) >= 0;
    }

	// 호출 시 사용 가능한 퍼밋이 있는 경우에 퍼밋을 얻는다.
	// 퍼밋이 없는 경우에는 지정한 시간 만큼 퍼밋을 얻기 위해 대기하다가,
	// 얻으면 true를 반환하고 얻지 못하면 false를 반환한다.
    public boolean tryAcquire(long timeout, TimeUnit unit)
        throws InterruptedException {
        return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
    }

	// 세마포어에서 현재 사용 가능한 퍼밋의 수를 반환한다.
	// 이 메서드는 일반적으로 디버깅이나 테스트 목적으로 사용된다.
    public int availablePermits() {
        return sync.getPermits();
    }
    // ...
}

소유권(ownership)

여기서 추가로 알아두어야 할 점은 기존에 살펴봤던 뮤텍스(mutex)와는 달리 세마포어에는 공유 리소스에 대한 소유권에 대한 개념이 없기 때문에, 다른 스레드가 락을 해제할 수도 있습니다. synchronized 같은 경우에는 리소스에 대한 소유권을 가진 스레드만이 해당 리소스에 대한 락을 해제할 수 있었습니다. release() 메서드의 자바독을 살펴보면 다음과 같은 문장을 볼 수 있습니다.

public class Semaphore implements java.io.Serializable {
	// ...
	// ... 세마포어에서 퍼밋을 해제하려는 스레드가 반드시 acquire() 메서드를 호출하여
	// 해당 퍼밋을 해제할 필요는 없습니다. ...
	public void release() {
        sync.releaseShared(1);
    }
}

예시 코드

아래 코드에서는 Resource 클래스가 Semaphore를 사용하여 동시에 사용 가능한 리소스의 수를 제한합니다. 여기서는 최대 3개의 스레드만이 동시에 리소스를 사용할 수 있습니다. 메인 메서드에서는 10개의 스레드를 생성하여 리소스를 사용하도록 요청하지만, 세마포어로 인해서 동시에 3개의 스레드만이 리소스를 사용할 수 있습니다. 나머지 스레드들은 사용 가능한 리소스가 생길 때까지 기다리게 됩니다.

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Semaphore;

public class SemaphoreExample {

    public static class Resource {
        private final Semaphore semaphore;

        public Resource(int permits) {
            semaphore = new Semaphore(permits);
        }

        public void use(CountDownLatch latch) {
            try {
                semaphore.acquire();
                System.out.println(Thread.currentThread().getName() + "가 리소스를 사용하고 있습니다.");
                Thread.sleep(2000); // 실제 작업을 수행하는 코드
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                System.out.println(Thread.currentThread().getName() + "가 리소스를 해제했습니다.");
                semaphore.release();
                latch.countDown();
            }
        }
    }

    public static void main(String[] args) throws InterruptedException {
        Resource resource = new Resource(3);
        CountDownLatch latch = new CountDownLatch(10);

        for (int i = 0; i < 10; i++) {
            Thread thread = new Thread(() -> resource.use(latch), i + "번 스레드");
            thread.start();
        }

        // 모든 스레드가 종료될 때까지 기다립니다.
        latch.await();
    }
}

번외

말 그대로 번외입니다. 아래의 클래스들은 관심이 있으신 분들만 읽어보시고, 다음 편으로 넘어가셔도 무방합니다.

ReadWriteLock

기존의 락들은 동시성을 보장할 수는 있었으나, 공유 자원에 대한 접근을 하나의 스레드만 허용했기 때문에 여러 스레드가 동시에 읽기 작업을 수행하는 상황에서는 성능 문제가 일어날 수 있습니다. 그러나 대부분의 경우에는 데이터가 간간이 변경되기는 하지만 상대적으로 읽기 작업이 많이 일어나게 되는데, 이런 상황에서는 락의 조건을 조금 풀어서 읽기 연산은 여러 스레드에서 동시에 실행할 수 있도록 하면 성능이 향상되지 않을까요?

이러한 요구사항에 대응하기 위해 등장한 것이 공유 자원에 대한 동시성 접근을 지원하는 새로운 락인 ReadWriteLock입니다. ReadWriteLock은 읽기 작업은 여러 스레드가 동시에 수행할 수 있지만, 쓰기 작업은 하나의 스레드만 수행할 수 있도록 제어하는 락입니다. 즉, 읽기 작업은 서로 간에 영향을 주지 않으므로 동시에 수행해도 문제가 없지만, 쓰기 작업은 하나의 스레드가 끝날 때까지 다른 스레드는 대기해야 합니다.

ReadWriteLock은 ReentrantLock 클래스와 마찬가지로 java.util.concurrent 패키지에 속해 있으며, 아래와 같은 메소드를 제공합니다. 데이터를 변경하는 스레드가 없는 한 여러 스레드가 동시에 읽을 수 있으며, 한 번에 하나의 스레드만 데이터를 변경할 수 있기 때문에 쓰기 작업용 락이 해제될 때까지 다른 스레드(읽기 쓰레드와 쓰기 스레드 모두)는 블록됩니다. 반대로 다른 스레드가 읽는 동안에 쓰기 스레드가 데이터를 변경하려고 하면 쓰기 스레드도 읽기 작업용 락이 풀릴 때까지 블록됩니다.

public interface ReadWriteLock {
    // 읽기 작업용 락을 반환한다.
    Lock readLock();

    // 쓰기 작업용 락을 반환한다.
    Lock writeLock();
}

참고로 읽기 작업용 락이랑 쓰기 작업용 락이 있으니 내부적으로 두 개의 락 객체를 쓰고 있는건가 싶지만 내부적으로는 하나의 ReadWriteLock 객체가 사용됩니다. Lock과 동일하게 보통은 아래와 같이 사용하게 됩니다.

ReadWriteLock lock = new ReentrantReadWriteLock();
lock.readLock().lock();
try {
	// 크리티컬 섹션
} finally {
	lock.readLock().unlock();
}

언제 사용하는가?

ReadWriteLock은 캐시나 사전 같이 읽기 작업이 쓰기 작업보다 훨씬 많은 특정 상황에서 병렬 프로그램의 성능을 크게 높일 수 있도록 설계되었습니다(예: 초기에 데이터가 채워지고 이후에 드물게 수정되는 경우). 반대로 읽기 작업이 쓰기 작업보다 다소 적은 경우 혹은 읽기 작업이 너무 짧은 경우에는 읽기-쓰기 잠금 구현의 오버헤드 때문에 synchronized를 사용하는 게 더 간단하고 효율적일 수 있습니다. 실제로 ReadWriteLock이 현재 상황에 적합한지 아닌지는 성능 분석을 통해 판단해야 합니다.

Exchanger

Exchanger는 두 개의 스레드가 연결되는 배리어(barrier)이며, 배리어 포인트에 도달하면 양쪽의 스레드가 서로 갖고 있던 값을 교환합니다. 이를 통해서 스레드 사이에 안전한 데이터 교환을 보장할 수 있습니다. Exchanger를 사용하는 예시로는 데이터를 처리하는 스레드와 결과를 출력하는 스레드 사이에서 데이터 교환을 할 때 사용할 수 있습니다.

public class Exchanger<V> {
	// 다른 스레드가 교환 지점(exchange point)에 도달할 때까지 기다리고,
	// 그 후 상대 스레드에게 주어진 객체를 전달하고 상대 스레드가 보내온 객체를 수신한다.
    @SuppressWarnings("unchecked")
    public V exchange(V x) throws InterruptedException {
        // ...
    }

	// 위 메서드와 동일하지만 대기하다가 지정한 시간이 지나면,
	// TimeoutException이 발생한다.
    @SuppressWarnings("unchecked")
    public V exchange(V x, long timeout, TimeUnit unit)
        throws InterruptedException, TimeoutException {
        // ...
    }
}

스레드가 exchange() 메서드를 호출하면 다른 스레드가 exchange() 메서드를 호출할 때까지 해당 스레드는 블록됩니다. 

두 개의 스레드가 exchange() 메서드를 호출하면 각 스레드는 상대방의 값과 자신의 값을 교환하고 반환합니다. 이러한 방식으로 두 개의 스레드가 서로 값을 교환할 수 있습니다.
아래 예시에서는 Exchanger를 통해서 두 개의 스레드가 문자열을 교환하는 것을 볼 수 있습니다. 각 스레드에서 exchanger.exchange() 메서드를 호출하면 해당 스레드는 블로킹되어 다른 스레드가 동일한 메서드를 호출할 때까지 대기하게 됩니다. 두 스레드 모두 메서드를 호출하면 서로의 데이터를 교환하고 결과를 출력합니다.

public class ExchangerExample {
    public static void main(String[] args) {
        Exchanger<String> exchanger = new Exchanger<>();

        Thread threadA = new Thread(new ThreadA(exchanger));
        Thread threadB = new Thread(new ThreadB(exchanger));

        threadA.start();
        threadB.start();
    }

    public static class ThreadA implements Runnable {
        private final Exchanger<String> exchanger;

        public ThreadA(Exchanger<String> exchanger) {
            this.exchanger = exchanger;
        }

        @Override
        public void run() {
            String message = "안녕, 스레드 B!";
            try {
                String exchangedMessage = exchanger.exchange(message);
                System.out.println("스레드 A가 받음: " + exchangedMessage);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    public static class ThreadB implements Runnable {
        private final Exchanger<String> exchanger;

        public ThreadB(Exchanger<String> exchanger) {
            this.exchanger = exchanger;
        }

        @Override
        public void run() {
            String message = "안녕, 스레드 A!";
            try {
                String exchangedMessage = exchanger.exchange(message);
                System.out.println("스레드 B가 받음: " + exchangedMessage);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

Phaser

Phaser는 다시 사용할 수 있는 동기화 배리어(barrier)로, CyclicBarrier와 CountDownLatch와 비슷한 기능을 하지만 보다 더 유연한 기능을 제공합니다. Phaser는 여러 단계로 나뉘어진 작업을 수행할 때 유용합니다. 

final int numberOfThreads = 4;
Phaser phaser = new Phaser(numberOfThreads);

public class Phaser {
	// ...
	// phase - 다음 단계(phase)로 진행하기 위해 필요한 파티(스레드)의 수
	public Phaser(int parties) {  
		this(null, parties);  
	}
	// ...
}

Phaser를 사용해서 특정 단계가 완료될 때까지 스레드가 대기하도록 할 수 있으며, 해당 단계가 끝나면 다음 단계로 넘어갈 수 있다는 점에서 CyclicBarrier와 유사합니다. 하지만 Phaser는 CyclicBarrier와 같이 고정된 수의 스레드를 기다리는 게 아니라, Phaser는 동기화에 참여하는 스레드의 수를 동적으로 조절할 수 있다는 점에서 차이가 있습니다.

public class Phaser {
	// ...
	// 동기화에 참여할 수 있는 스레드의 수를 늘린다.
    public int register() { /* ... */ }

	// 등록된 스레드가 현재 단계를 완료했음을 알리는 데 사용한다.
	// 모든 등록된 스레드가 arrive()를 호출하면 Phaser는 다음 단계로 진행한다.
	// 마치 CyclicBarrier의 await()과 비슷하다고 생각하면 된다.
	public int arrive() { /* ... */ }

	// 현재 스레드가 해당 단계를 완료했음을 알리고,
	// Phaser에서 스레드를 등록 해제한다. 즉, 스레드가 이후의 단계에
	// 참여하지 않게 된다. 이 메서드를 호출하면 Phaser에서 관리하는
	// 스레드의 수가 하나 감소하게 된다.
    public int arriveAndDeregister() { /* ... */ }

	// 이 메서드는 현재 스레드가 해당 단계를 완료했음을 알리고,
	// 다음 단계로 진행되기 전에 모든 등록된 스레드가 해당 단계를 완료할 때까지
	// 기다린다. 즉, 스레드는 다음 단계로 진행하기 전에 다른 스레드들이 현재
	// 단계를 완료할 때까지 블록된다.
    public int arriveAndAwaitAdvance() { /* ... */ }
}

여기서 arrive() 메서드는 파티(보통 실행 스레드)가 어떤 작업(혹은 그 작업의 일부)을 완료했음을 의미합니다. 하지만 이 메서드는 현재 스레드가 단계가 끝날 때까지 다른 스레드를 기다리지 않습니다. 만약에 현재 단계(phase)를 마무리하고 다른 모든 스레드도 해당 단계를 완료할 때까지 기다리려면 arriveAndAwaitAdvance() 메서드를 호출해야 합니다.

우선은 이해를 돕기 위해서 아래의 예시 코드를 살펴봅시다.

예시 코드

public class PhaserExample {
    public static void main(String[] args) {
        Phaser phaser = new Phaser(1); // 초기 파티 수를 1로 설정 (메인 스레드 포함)
        int numThreads = 3;

        for (int i = 0; i < numThreads; i++) {
            phaser.register(); // 새로운 스레드를 등록
            new Thread(new Task(phaser), (i + 1) + "번 스레드").start();
        }
        System.out.println("현재 파티 수: " + phaser.getRegisteredParties());

        // 모든 스레드가 각 단계를 완료할 때까지 메인 스레드가 기다림
        for (int phase = 0; phase < 3; phase++) {
            phaser.arriveAndAwaitAdvance();
            System.out.println((phase + 1) + "번 단계가 완료되었습니다.");
        }

        System.out.println("현재 파티 수: " + phaser.getRegisteredParties());
    }

    static class Task implements Runnable {
        private final Phaser phaser;

        Task(Phaser phaser) {
            this.phaser = phaser;
        }

        @Override
        public void run() {
            for (int phase = 0; phase < 3; phase++) {
                System.out.println(Thread.currentThread().getName() + "는 " + (phaser.getPhase() + 1) + "번 단계에서 작업 중입니다...");
                try {
                    Thread.sleep((long) (Math.random() * 1000));
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }

                if (phase == 2) {
                    phaser.arriveAndDeregister(); // 마지막 단계에서 스레드를 해제
                } else {
                    phaser.arriveAndAwaitAdvance(); // 다른 스레드가 해당 단계를 완료할 때까지 기다림
                }
            }
        }
    }
}

여기서 초기 파티 수는 1로 설정되어 있습니다. 일반적으론 초기에 파티 수를 지정하지만, register() 메서드를 통해서 동기화에 참여할 파티 수를 동적으로 조절할 수 있습니다.

Phaser phaser = new Phaser(1); // 초기 파티 수를 1로 설정 (메인 스레드 포함)

phaser.register() 메서드를 통해서 새로운 파티(스레드)를 등록한 것을 볼 수 있습니다. 당연히 getRegisteredParties()로 현재 등록된 파티 수를 얻어오면 메인 스레드와 추가된 스레드 3개를 포함해서 4가 나올 것입니다.

int numThreads = 3;

for (int i = 0; i < numThreads; i++) {
	phaser.register(); // 새로운 스레드를 등록
	new Thread(new Task(phaser), (i + 1) + "번 스레드").start();
}
System.out.println("현재 파티 수: " + phaser.getRegisteredParties()); // 4

모든 스레드가 단계를 각각 마무리 할 때까지 메인 스레드는 기다리게 됩니다. Phaser 내부에는 단계(phase)를 나타내는 내부 변수가 있으며, 이 값은 0 부터 시작하며 단계가 올라갈수록 이 값도 1씩 증가하게 됩니다. 이는 getPhase() 메서드로 얻어올 수 있습니다.

for (int phase = 0; phase < 3; phase++) {
	phaser.arriveAndAwaitAdvance();
	System.out.println((phase + 1) + "번 단계가 완료되었습니다.");
}

그 다음 크게 주목할 부분은 아래 부분입니다. 0~1 단계에서는 arriveAndAwaitAdvance() 메서드를 호출하여 작업을 일찍 마친 스레드는 다른 스레드를 기다렸다가, 마지막 단계에서 모든 스레드는 도착했음을 알리고 arriveAndDeregister() 메서드를 호출하여 파티에서 빠져나가게 됩니다. 

if (phase == 2) {
	phaser.arriveAndDeregister(); // 마지막 단계에서 스레드를 해제
} else {
	phaser.arriveAndAwaitAdvance(); // 다른 스레드가 해당 단계를 완료할 때까지 기다림
}

따라서 마지막에 현재 파티 수를 찍어보면 4가 아닌 1(메인 스레드)이 찍힌 것을 볼 수가 있습니다.

System.out.println("현재 파티 수: " + phaser.getRegisteredParties()); // 1

Condition

Condition은 여러 스레드가 동시에 특정 상태를 기다리는 동안, 각각 다른 '기다리는 줄(대기 집합)'에 속할 수 있도록 해주는 도구라고 할 수 있습니다. 

예시 살펴보기

예를 들어서, 극장에서 영화 표를 구매하는 상황에 이를 비유해볼 수 있습니다. 극장에서 각기 다른 영화를 상영하고 있다고 가정해봅시다. 대기열을 하나만 사용한다면, 모든 영화를 보려는 사람들이 한 줄로 서서 기다려야 합니다. 이렇게 되면 영화별로 표를 구매하기 어렵고 비효율적일 수 있습니다.

이때 Condition이 도입되면, 각 영화에 대해서 따로 대기열을 만들 수 있게 됩니다. 이렇게 하면 각 영화를 보려는 사람들이 해당 영화의 대기열에 서서 기다리게 할 수 있습니다. 이렇게 여러 대기열이 있을 때, 특정 영화의 표를 사려는 사람들은 그 영화에 대한 대기열에만 집중할 수 있으며, 이를 통해 효율적으로 표를 구매할 수 있게 됩니다.

이와 마찬가지로 Condition은 여러 스레드가 특정 상태를 기다리는 동안 서로 다른 대기 집합(wait-sets)에 속할 수 있도록 하여, 효율적인 상호 작용과 작업 처리를 가능하도록 만들 수 있습니다. 예를 들어서 아래와 같은 예시 코드를 잠깐 살펴봅시다.

class MovieTheater {
	// Condition 객체는 lock 객체와 함께 사용된다.
    private final Lock lock = new ReentrantLock();
    // 아래 movie1Queue는 스레드들을 하나의 대기 집합으로 묶는 역할을 함
    private final Condition movie1Queue = lock.newCondition();
    // 현재 영화관에 1번 영화에 대한 남은 티켓 수
    private int movie1Tickets = 10;
    // ...
    
	public void buyTicketForMovie1() throws InterruptedException {
        lock.lock();
        try {
	        // 1번 영화의 티켓이 모자르다면
            while (movie1Tickets == 0) {
	            // 티켓이 새로 생길 때까지 무한정 대기한다. (signal)
                movie1Queue.await();
            }
            movie1Tickets--;
            System.out.println("영화 1 티켓 구매됨. 남은 티켓: " + movie1Tickets);
        } finally {
            lock.unlock();
        }
    }

	// ...
    public void addTicketForMovie1() {
        lock.lock();
        try {
	        // 티켓을 추가하고
            movie1Tickets++;
            // 1번 영화 티켓 구매를 기다리는 사용자를 한 명 깨운다.
            movie1Queue.signal();
        } finally {
            lock.unlock();
        }
    }
}

메서드 살펴보기

Condition 객체는 Lock 객체와 함께 사용되며, Lock 객체의 newCondition() 메서드를 호출하여 Condition 객체를 생성하게 됩니다. 이렇게 하면 하나의 락 객체에 대한 여러 대기 집합(wait-sets)을 가질 수 있게 되어서, 특정 상황에서 여러 스레드를 독립적으로 깨울 수 있게 됩니다.

Lock lock = new ReentrantLock();
Condition condition = lock.newCondition();

생성된 Condition 객체는 await(), signal(), signalAll() 메서드를 제공하며, 각각 대기 상태로 들어가기, 대기 중인 스레드 중 하나를 깨우기, 모든 대기 중인 스레드를 깨우는 역할을 합니다.

public interface Condition {
	// 신호(signal)가 오거나 인터럽트가 발생할 때까지
	// 현재 스레드를 대기 상태로 만든다.
    void await() throws InterruptedException;

	// 신호(signal)가 오기 전까지 대기 상태로 만든다.
    void awaitUninterruptibly();

	// 신호(signal)가 오거나 인터럽트가 발생하거나 혹은
	// 지정한 대기 시간이 경과할 때까지 현재 스레드를 대기 상태로 만든다.
    long awaitNanos(long nanosTimeout) throws InterruptedException;
    boolean await(long time, TimeUnit unit) throws InterruptedException;
    boolean awaitUntil(Date deadline) throws InterruptedException;

	// 대기 중인 스레드 중 하나를 깨운다.
    void signal();

	// 대기 중인 모든 스레드를 깨운다.
	// 현재 조건(condition)에 대기 중인 스레드들이 있으면 모두 깨어난다.
    void signalAll();
}

예시 코드

다시 한 번 생산자-소비자 문제를 살펴보도록 하겠습니다. 여기에서는 Condition을 사용해서 문제를 해결합니다. 생산자는 물건을 버퍼에 넣고, 소비자는 버퍼에서 물건을 가져옵니다. 버퍼가 가득 찼을 때 생산자는 블록 상태로 전환되고, 버퍼가 비어 있을 때는 소비자가 블록 상태로 전환됩니다.

public class ProducerConsumerExample {
    public static void main(String[] args) {
        Buffer buffer = new Buffer(5);
        new Thread(new Producer(buffer)).start();
        new Thread(new Consumer(buffer)).start();
    }
}

class Buffer {
    private final Queue<Integer> queue;
    private final int maxSize;
    private final Lock lock;
    private final Condition notFull;
    private final Condition notEmpty;

    public Buffer(int maxSize) {
        this.queue = new LinkedList<>();
        this.maxSize = maxSize;
        this.lock = new ReentrantLock();
        this.notFull = lock.newCondition();
        this.notEmpty = lock.newCondition();
    }

    public void put(int item) throws InterruptedException {
        lock.lock();
        try {
            while (queue.size() == maxSize) {
                notFull.await();
            }
            queue.add(item);
            System.out.println("생산됨: " + item);
            notEmpty.signalAll();
        } finally {
            lock.unlock();
        }
    }

    public int get() throws InterruptedException {
        lock.lock();
        try {
            while (queue.isEmpty()) {
                notEmpty.await();
            }
            int item = queue.poll();
            System.out.println("소비됨: " + item);
            notFull.signalAll();
            return item;
        } finally {
            lock.unlock();
        }
    }
}

class Producer implements Runnable {
    private final Buffer buffer;

    public Producer(Buffer buffer) {
        this.buffer = buffer;
    }

    @Override
    public void run() {
        try {
            for (int i = 0; i < 10; i++) {
                buffer.put(i);
                Thread.sleep(500);
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

class Consumer implements Runnable {
    private final Buffer buffer;

    public Consumer(Buffer buffer) {
        this.buffer = buffer;
    }

    @Override
    public void run() {
        try {
            for (int i = 0; i < 10; i++) {
                buffer.get();
                Thread.sleep(1000);
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}