ConcurrentHashMap

ConcurrentHashMap은 동시성을 지원하는 해시맵 클래스입니다. 기본 HashMap 클래스와 마찬가지로 키-값 쌍을 저장할 수 있으나, ConcurrentHashMap은 멀티스레드 환경에서 동시성을 지원하기 위해 설계되었습니다. 멀티스레드 애플리케이션에서 여러 스레드가 동시에 맵에서 데이터를 읽고 쓰거나, 따로 동기화 블록을 사용하는 대신 ConcurrentHashMap이 제공하는 동시성 제어 메커니즘을 활용하려면 ConcurrentHashMap을 사용할 수 있습니다.

public class ConcurrentHashMap<K,V> extends AbstractMap<K,V>  
implements ConcurrentMap<K,V>, Serializable {
	// ...
	// 주어진 키에 해당하는 값이 없거나 null이면,
	// mappingFunction을 사용해서 새 값을 계산하고 맵에 저장한다.
	// 이미 값이 존재하면 현재 값을 반환한다.
	public V computeIfAbsent(K key, Function<? super K, ? extends V> mappingFunction) { /* ... */ }
	// 주어진 키에 해당하는 값이 존재하면,
	// mappingFunction을 사용해서 새 값을 계산하고 맵에 저장한다.
	// 계산된 값이 null이면 키-값 쌍이 맵에서 제거된다.
	// 주어진 키에 값이 없으면 아무 일도 하지 않는다.
	public V computeIfPresent(K key, BiFunction<? super K, ? super V, ? extends V> remappingFunction) { /* ... */ }
	// 키에 대한 현재 값을 사용하여 remappingFunction을 통해서
	// 새 값을 계산하고 맵에 저장한다.
	// 계산된 값이 null이면 키-값 쌍이 맵에서 제거된다.
	public V compute(K key,  
	BiFunction<? super K, ? super V, ? extends V> remappingFunction) { /* ... */ }
	// 주어진 키에 해당하는 값이 없거나 null이면 주어진 값이 맵에 저장된다.
	// 이미 값이 존재하면 remappingFunction을 통해 주어진 값과 현재 값을 병합하고
	// 병합된 값을 맵에 저장한다.
	// 병합된 값이 null이면 키-값 쌍이 맵에서 제거된다.
	public V merge(K key, V value, BiFunction<? super V, ? super V, ? extends V> remappingFunction) { /* ... */ }
}

지원하는 메서드는 HashMap 등과 같은 Map 인터페이스의 구현체와 거의 동일하기 때문에 새로 추가된 메서드만 살펴보도록 합시다. 기억이 안 난다면 Map 부분을 다시 보고 오셔도 좋습니다.

예시 코드

computeIfAbsent()

이 예제에서 computeIfAbsent() 메서드는 주어진 이름이 nameGroups에 없으면 해당 이름을 키로 하고 값으로 새 ArrayList를 초기화합니다. 그 후 해당 키의 값 리스트에 이름을 추가하는 예시다. 이렇게 하면 각 이름을 키로 하여 그룹화를 할 수 있습니다.

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;

public class GroupingExample {
    public static void main(String[] args) {
        String[] names = {"앨리스", "밥", "찰리", "앨리스", "밥", "찰리", "앨리스"};

        ConcurrentHashMap<String, List<String>> nameGroups = new ConcurrentHashMap<>();

        for (String name : names) {
            // computeIfAbsent를 사용하여 nameGroups에 이름이 없으면 새 ArrayList를 생성하고 추가한다.
            nameGroups.computeIfAbsent(name, key -> new ArrayList<>()).add(name);
        }

        System.out.println(nameGroups);
    }
}

computeIfPresent()

computeIfPresent() 메서드는 키가 이미 존재할 때만 remappingFunction을 적용해서 값을 업데이트하는데 사용됩니다. 아래 예시에서는 각 이름의 출현 횟수를 기록해서 출력하고 있습니다.

import java.util.concurrent.ConcurrentHashMap;

public class NameCounterExample {
    public static void main(String[] args) {
        String[] names = {"앨리스", "밥", "찰리", "앨리스", "밥", "찰리", "앨리스"};

        ConcurrentHashMap<String, Integer> nameCounts = new ConcurrentHashMap<>();

        // 먼저 초기 값을 설정한다.
        for (String name : names) {
            nameCounts.putIfAbsent(name, 0);
        }

        // computeIfPresent()를 사용하여 각 이름의 출현 횟수를 업데이트한다.
        for (String name : names) {
            nameCounts.computeIfPresent(name, (key, count) -> count + 1);
        }

        System.out.println(nameCounts);
    }
}

compute()

compute() 메서드는 키-값 쌍에 대해서 주어진 remappingFunction을 적용해 값을 업데이트합니다. 키가 존재하지 않으면 새로운 키-값 쌍을 추가하게 됩니다. 아래 예시에서는 규칙에 따라서 각 문자를 다른 문자로 치환하고 있습니다.

import java.util.concurrent.ConcurrentHashMap;

public class StringReplacer {
    public static void main(String[] args) {
        ConcurrentHashMap<Character, Character> replacements = new ConcurrentHashMap<>();
        replacements.put('A', 'T');
        replacements.put('T', 'A');
        replacements.put('C', 'G');
        replacements.put('G', 'C');

        String input = "ATCGTAGCTACGT";
        System.out.println("변경 전: " + input);

        StringBuilder output = new StringBuilder();
        for (char c : input.toCharArray()) {
            output.append(replacements.compute(c, (key, value) -> value != null ? value : key));
        }

        System.out.println("변경 후: " + output.toString());
    }
}

merge()

아래 예시에서는 merge()를 사용해서 각 후보자에 대한 투표 수를 계산하고 있습니다. 다시 한번 살펴보면 merge() 메서드는 주어진 키에 해당하는 값이 없거나 null이면 주어진 값이 맵에 저장되고, 이미 값이 존재하면 remappingFunction을 통해 주어진 값과 현재 값을 병합하고 병합된 값을 맵에 저장하게 됩니다.

import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;

public class VoteCounter {
    public static void main(String[] args) {
        ConcurrentHashMap<String, Integer> voteCounts = new ConcurrentHashMap<>();

        List<String> votes = Arrays.asList("앨리스", "밥", "앨리스", "앨리스", "찰리", "밥", "앨리스", "밥");

        for (String vote : votes) {
            voteCounts.merge(vote, 1, Integer::sum);
        }

        System.out.println("투표 횟수:");
        for (String candidate : voteCounts.keySet()) {
            System.out.println(candidate + ": " + voteCounts.get(candidate));
        }
    }
}

한 걸음 더 나아가기

ConcurrentHashMap의 주요 설계 목표는 동시 읽기 가능(보통 get() 메서드, 그리고 이터레이터 관련 메서드)을 유지하면서 업데이트 간 충돌을 최소화하는 것입니다. 부차적인 목표는 공간 소비를 HashMap과 비슷하거나 더 낮게 유지하고, 많은 스레드가 빈 테이블에 초기 삽입을 빠른 속도로 수행할 수 있도록 지원하는 것입니다. ConcurrentHashMap의 내부를 살펴보기 전에 알아야 할 개념들을 다시 한번 빠르게 짚고 넘어가도록 해봅시다. 이미 기반 개념을 잘 알고 계신다면 바로 ConcurrentHashMap의 내부 구조로 건너뛰셔도 무방합니다.

해시(Hash)

해시 함수(Hash Function)

해시 함수(짧게 줄여서 해시)는 임의의 길이를 갖는 어떤 데이터를 입력받아 고정된 길이의 데이터로 매핑하는 역할을 하며, 이 해시 함수로 나온 결과 값을 보통 해시 값이라고 부릅니다. 해시 함수를 이용하면 큰 데이터 집합이나 복잡한 데이터 구조를 간단하게 표현하면서, 일정한 규칙에 따라서 인덱싱이나 검색 등의 작업을 쉽게 처리할 수 있습니다. 자바에서는 이런 해시 함수를 사용해서 키를 해시 값으로 변환하여 빠르게 키-값 쌍을 저장하고 검색할 수 있도록 하고 있습니다. 예를 들어서 해시 함수가 간단하게 \(h(x)=x \;mod\; 11\)이라고 한다면 100을 해시에 넣었을 때 1이라는 해시 값을 얻을 수 있습니다.

출처: https://en.wikipedia.org/wiki/Hash_function

아래는 자바 String 클래스의 hashCode() 메서드를 가져온 것인데, 반복문을 통해 각 문자를 가져와서 해시 값을 갱신하는 것을 볼 수 있습니다. 직접 자바 코드에서 String 타입의 변수에 hashCode()를 호출하면 실제 해시 값을 얻을 수 있습니다.

public static int hashCode(byte[] value) {
    int h = 0;
    int length = value.length >> 1;
    for (int i = 0; i < length; i++) {
        h = 31 * h + getChar(value, i);
    }
    return h;
}

해시 테이블(Hash Table)

해시 테이블은 말 그대로 해시 함수를 사용해서 키를 해시 값으로 매핑하고, 이 해시 값을 인덱스로 삼아서 값을 저장하거나 검색하는데 효율적인 자료구조입니다. 이때 데이터가 저장되는 곳을 슬롯(slot) 혹은 버킷(bucket)이라고 부릅니다. 자바에서는 대표적으로 HashMap과 ConcurrentHashMap이 그렇습니다.

출처: https://en.wikipedia.org/wiki/Hash_table

위의 그림을 예로 들면 키 'John Smith'라는 문자열이 해시 함수를 거쳐 2라는 해시 값을 얻고 이를 버킷의 인덱스로 사용하게 됩니다. 2번 버킷에 John의 연락처인 '521-1234'가 저장된 것을 보실 수 있습니다.

해시 충돌(Hash Collision)

해시 테이블은 여러모로 유용한 자료구조이며 다양한 프로그래밍 언어에서 널리 사용되고 있지만, 해시 함수가 떠안고 있는 문제가 있습니다. 바로 임의의 길이의 데이터를 고정된 길이의 데이터로 매핑하게 되면서, 둘 이상의 키에 동일한 인덱스를 생성하는 '해시 충돌(hash collision)'을 피해 갈 수 없다는 것입니다. 만약 \(n\)개의 비둘기집이 있고, \(n+1\)마리의 비둘기가 있다면 적어도 한 비둘기집에서는 필연적으로 2마리 이상의 비둘기가 있어야 한다는 비둘기집 원리(pigeonhole principle)를 따르게 됩니다. 간단한 해시 테이블 구현 예시를 통해서 이를 확인해 보도록 하겠습니다.

public class SimpleHashTable {
    private static final int TABLE_SIZE = 5;
    private final String[] table;

    public SimpleHashTable() {
        table = new String[TABLE_SIZE];
    }

    public void put(String key, String value) {
        int index = hash(key);
        table[index] = value;
    }

    public String get(String key) {
        int index = hash(key);
        return table[index];
    }

    private int hash(String key) {
        return key.hashCode() % TABLE_SIZE;
    }

    public static void main(String[] args) {
        SimpleHashTable hashTable = new SimpleHashTable();
        hashTable.put("사과", "빨간색");
        hashTable.put("바나나", "노란색");
        hashTable.put("망고", "노란색");
        hashTable.put("포도", "보라색");

        System.out.println("사과: " + hashTable.get("사과"));
        System.out.println("바나나: " + hashTable.get("바나나"));
        System.out.println("망고: " + hashTable.get("망고"));
        System.out.println("포도: " + hashTable.get("포도"));
    }
}

여기서는 크기가 5인 해시 테이블에 4개의 키-값 쌍을 집어넣는 것을 볼 수 있습니다. 하지만 코드를 실행시키면 아래와 같이 우리의 기대와는 다른 결과를 볼 수 있습니다.

바로 '망고'와 '포도'의 해시 값이 2로 동일하여 해시 충돌이 발생해 2번 버킷에 저장된 '노란색'이란 값이 포도에 의해 '보라색'으로 변경된 것을 볼 수 있습니다. 물론 해시 함수 자체도 간단하고 인위적으로 해시 충돌을 유도하도록 작은 테이블 크기로 나눈 게 문제이지만 전하려던 바는 충분히 전해졌으리라 생각합니다. 이러한 해시 충돌을 피하기 위해서 어떤 방법을 사용할 수 있을까요? 개방 주소법(open addressing), 분리 연결법(separate chaining) 등 다양한 방법이 있지만 여기서는 ConcurrentHashMap 내부에서 사용하는 분리 연결법 위주로 보도록 하겠습니다.

좋은 해시 함수는 무엇일까요?

좋은 해시 함수는 해시 테이블의 인덱스를 고르게 분포시켜야 하며, 동일한 입력값에 대해서 항상 같은 해시 값을 반환해야 합니다. 또한 해시 테이블의 주요 장점 중 하나가 빠른 검색 속도이기 때문에 해시 함수의 계산 속도가 빠를수록 전체적인 성능이 향상됩니다. 그리고 입력값의 작은 변경에도 해시 값이 크게 바뀌어야 한다는 눈사태 효과(avalanche effect)를 확인할 수 있어야 합니다. 이를 통해서 유사한 입력값이 서로 다른 해시 값을 가지도록 만들어서 해시 충돌을 줄일 수 있습니다.

분리 연결법(Separate Chaining)

정의

분리 연결법은 해시 충돌이 발생했을 때 이를 동일한 버킷에 저장하는데 이를 (일반적으로) 링크드 리스트의 형태로 저장하는 방법을 말합니다. 충돌이 일어날 경우, 아래 그림처럼 해당 인덱스의 버킷에 있는 연결된 자료구조에 새로운 키-값 쌍이 추가되게 됩니다. 이렇게 해서 서로 다른 키-값 쌍이 동일한 인덱스를 공유하더라도 저장 및 검색을 할 수 있게 됩니다.

출처: https://en.wikipedia.org/wiki/Hash_table

위 그림에서는 'John Smith'와 'Sandra Dee'의 인덱스가 152로 충돌한 것을 확인할 수 있는데, 이때 'Sandra Dee'를 나타내는 새로운 노드를 추가하고 이를 'John Smith' 뒤에 연결함으로써 해시 충돌을 처리하는 것을 볼 수 있습니다.

예시 코드

분리 연결법을 통해서 간단하게 구현된 예시 코드를 살펴보도록 하겠습니다. 아래 코드에서는 해시 충돌을 더 잘 확인할 수 있도록 키의 해시 값을 TABLE_SIZE(여기서는 해시 테이블의 크기가 10)로 나눈 나머지의 값을 버킷의 인덱스로 사용하고 있습니다. 각 노드는 링크드 리스트 구조로 동작하도록 다음 노드에 대한 레퍼런스를 가지고 있음을 보실 수 있습니다. 

public class CustomHashTable<K, V> {
    private static final int TABLE_SIZE = 10;
    private final Node<K, V>[] table;

    @SuppressWarnings("unchecked")
    public CustomHashTable() {
        table = (Node<K, V>[]) new Node<?, ?>[TABLE_SIZE];
    }

    public void put(K key, V value) {
        int index = hash(key);
        Node<K, V> newNode = new Node<>(key, value, null);

        if (table[index] == null) {
            table[index] = newNode;
        } else {
            Node<K, V> currentNode = table[index];
            while (currentNode.next != null) {
                if (currentNode.key.equals(key)) {
                    currentNode.value = value;
                    return;
                }
                currentNode = currentNode.next;
            }

            if (currentNode.key.equals(key)) {
                currentNode.value = value;
            } else {
                currentNode.next = newNode;
            }
        }
    }

    public V get(K key) {
        int index = hash(key);
        Node<K, V> currentNode = table[index];

        while (currentNode != null) {
            if (currentNode.key.equals(key)) {
                return currentNode.value;
            }
            currentNode = currentNode.next;
        }

        return null;
    }

    private int hash(K key) {
        return key.hashCode() % TABLE_SIZE;
    }

    public void printBuckets() {
        for (int i = 0; i < TABLE_SIZE; i++) {
            System.out.print("버킷 " + i + ": ");
            Node<K, V> currentNode = table[i];
            while (currentNode != null) {
                System.out.print("(" + currentNode.key + ", " + currentNode.value + ") ");
                currentNode = currentNode.next;
            }
            System.out.println();
        }
    }

    private static class Node<K, V> {
        K key;
        V value;
        Node<K, V> next;

        Node(K key, V value, Node<K, V> next) {
            this.key = key;
            this.value = value;
            this.next = next;
        }
    }

    public static void main(String[] args) {
        CustomHashTable<String, String> hashTable = new CustomHashTable<>();
        hashTable.put("red", "#FF0000");
        hashTable.put("green", "#00FF00");
        hashTable.put("blue", "#0000FF");
        hashTable.put("white", "#FFFFFF");

        hashTable.printBuckets();
    }
}

이해를 돕기 위해서 버킷의 상태를 그림으로 살펴보도록 해봅시다. 키 'red'와 'white'의 해시 값이 동일하여 해시 충돌이 난 것을 볼 수 있으며, 이를 5번 버킷에서 확인할 수 있습니다. 만약에 get() 메서드를 통해 키 white에 매핑된 값을 가져오려면 반복문을 통한 선형 탐색으로 가져오게 됩니다.

 

여기서 충돌이 점점 더 잦아지면 어떻게 될까요? 한 버킷에 n개의 노드가 몰린 경우 n번의 루프를 돌아야 겨우 값을 가져올 수 있게 됩니다. 따라서 자바 8의 ConcurrentHashMap에서는 버킷의 노드 수에 따라서 링크드 리스트가 아닌 균형 탐색 트리(balanced search tree)로 구조를 변환하기도 합니다. 이처럼 버킷과 연결된 자료구조는 일반적으로 링크드 리스트가 사용되지만, 상황에 따라 가변 배열(dynamic array)이나 균형 탐색 트리 등이 사용될 수 있습니다.

가시성과 원자성(Visibility and Atomicity)

멀티스레드 하면 빼놓을 수 없는 가시성과 원자성에 대해서 먼저 살펴보고 그 후에 ConcurrentHashMap 내부에서 사용되는 CAS 연산이 무엇인지 살펴보도록 하겠습니다.

가시성(Visibility)

영어 단어 visibility는 '눈으로 볼 수 있는 정도, 알아볼 수 있는 정도'라는 의미를 가졌는데, 여기서 무엇을 볼 수 있다는 걸까요? 간단히 말하면 가시성은 한 스레드에서 공유 변수의 값을 변경했을 때 다른 그 스레드가 그 변경을 볼 수 있는지, 다시 말해서 그 변경된 값을 올바르게 읽어낼 수 있는지에 대한 여부를 나타냅니다. 한 스레드가 어떤 공유 변수의 값을 변경했다고 하더라도, 다른 스레드가 보는 값은 이전의 값일 수도 있다는 것입니다. 간단하게 아래의 예시를 살펴봅시다.

public class VisibilityExample {
    private static boolean stop = false;

    public static void main(String[] args) throws InterruptedException {
        Thread thread = new Thread(() -> {
            int i = 0;
            while (!stop) {
                i++;
            }
            System.out.println("최종 i 값: "+ i);
        });

        thread.start();

        Thread.sleep(1000);

        stop = true;
        System.out.println("메인 스레드가 stop 플래그를 true로 설정함");
    }
}

위의 예제를 실행하면 어떤 결과가 나타날까요? 스레드는 정상적으로 stop 플래그의 변경을 확인하고 즉각적으로 반복문을 벗어날 수 있을까요? 예제를 실행해보면 컴파일러 혹은 사용하고 있는 JVM 구현체, 시스템 환경에 따라 정상적으로 i 값을 출력하고 종료되기도, 아니면 스레드가 계속 반복문을 돌면서 죽지 않아 프로그램이 종료되지 않는 것을 확인할 수 있습니다. 이러한 문제는 왜 발생하는 걸까요?

메모리 계층 구조

그 이유는 바로 컴파일러가 모든 작업이 싱글 스레드 환경에서 실행된다고 가정하기 때문입니다. 컴파일러는 이러한 가정 하에 캐시나 메인 메모리에서 읽거나 쓰는 것보다 빠르기에 CPU 레지스터에 데이터를 로드하거나, 싱글 스레드 환경에서 동일한 결과를 보장한다면 명령어의 순서를 바꾸는 등 다양한 최적화를 시도할 수 있습니다. 이는 MESI 프로토콜을 통해 캐시 일관성을 유지하는 경우에도 동일한 문제가 일어납니다. 따라서 가시성으로 인한 문제를 피하기 위해서는 적절한 동기화나 volatile를 통해 모든 읽기/쓰기 작업이 로컬 레지스터를 건너뛰고 캐시에 바로 접근하도록 하고, 일부 컴파일러 최적화(예: 호이스팅)를 방지하여 가시성 문제를 해결해야만 합니다. 컴파일러는 실제로 루프 안에서 stop의 값을 변경하지 않으므로 !stop과 같은 식(expression)의 평가를 위로 끌어올릴 수 있습니다. 이렇게 하면 루프 내에서 해야 되는 작업이 줄어들어서 루프가 더 빠르게 실행됩니다. 따라서 다음과 같이 코드가 변경됩니다.

// ...
    if (!stop) {
    	while (true) {
        	i++;
    	}
    }
// ...

실제로 싱글 스레드 환경에서는 이렇게 최적화해도 결과는 같으며, 최적화가 일어나는 부분을 아래의 어셈블리 코드에서 확인하실 수 있습니다. 명령 프롬프트에서 실행할 때 '-Djava.compiler=NONE' 옵션을 줘서 JIT 컴파일러를 사용하지 않도록 지시하면 컴파일러 최적화가 일어나지 않아서 정상적으로 프로그램이 종료되는 것을 확인하실 수 있습니다.

volatile

이 키워드는 컴파일러의 일부 최적화(예: 재배열, 호이스팅 등)를 방지하고 어떤 이유로든 이 값을 레지스터든 캐시든 캐싱해서는 안 된다고 전할 수 있습니다. 간단하게 말하면 volatile 키워드는 모든 스레드가 해당 변수의 최신 값을 항상 확인할 수 있도록 보장해준다고 할 수 있습니다. 참고로 재배열에 관해서는 이미 스레드 2편에서 살펴봤으니 궁금하신 분들은 이곳으로 이동해 주세요.

public class VisibilityExample {
    private static volatile boolean stop = false;
	/* ... */
 }

위와 같이 수정하면 가시성으로 인한 문제가 해결된 것을 볼 수 있습니다. 하지만 volatile로 선언했다고 하더라도 i++와 같이 여러 개의 연산으로 구성된 복합 연산을 원자적으로 만들지는 않으니 주의하도록 합시다. 즉, 원자성을 보장하지 않으며 가시성만 보장합니다. 여기서 원자성(atomicity)은 스레드 1편에서 살펴봤으니 궁금하신 분들은 이곳으로 이동해 주세요.

원자성(atomicity)

원자성은 더 이상 쪼개질 수 없는 성질을 말하는데, 어떤 것이 원자성을 가지고 있다면 원자적(atomic)이라고 합니다. 덧붙여서, 원자적 연산(atomic operation)은 말 그대로 쪼갤 수 없는 연산을 말합니다.

확인 후 행동(Check-Then-Act) 패턴

이것도 스레드 1편에서 다뤘던 내용이지만 다시 한 번 짚고 넘어가도록 하겠습니다. 패턴의 이름 그대로 무언가를 검사한 뒤에 행동한다는 것인데, 코딩을 할 때면 아래와 같은 일을 빈번하게 하게 됩니다.

public void increment() {
    if (counter < 10000) { // 확인(check)
        counter++; // 행동(act)
    }
}

하지만 만약에 만약 실행 중인 두 개의 스레드가 아래와 같은 순서로 접근한다면 'count가 10000보다 작은 경우에만' 이라는 조건을 붙였음에도 불구하고 count의 값이 10001, 10002 혹은 그 이상이 될 수도 있습니다.

아래 코드를 한 번 실행해보도록 하겠습니다. 과연 계속해서 일관된 값을 출력해 낼 수 있을까요?

public class CheckThenActExample {
    public static void main(String[] args) throws InterruptedException {
        SharedCounter sharedCounter = new SharedCounter();

        Runnable incrementTask = () -> {
            for (int i = 0; i < 10000; i++) {
                sharedCounter.increment();
            }
        };

        Thread[] threads = new Thread[10];

        for (int i = 0; i < 10; i++) {
            threads[i] = new Thread(incrementTask);
            threads[i].start();
        }

        for (int i = 0; i < 10; i++) {
            threads[i].join();
        }

        System.out.println("최종 카운터 값: " + sharedCounter.getCounter()); // 10000?
    }

    public static class SharedCounter {
    	// volatile은 가시성이나 순서 규칙을 보장하지만 check-then-act 자체가 원자적이지 않다.
        private volatile int counter = 0;

        public void increment() {
            if (counter < 10000) {
                counter++;
            }
        }

        public int getCounter() {
            return counter;
        }
    }
}

위 코드를 컴파일 후 실행하면 항상 결과가 10000이 나오는 것은 아니라는 걸 확인하실 수 있습니다.

CAS(Compare and Swap) 연산

도입

이번에는 CAS 연산을 사용해서 방금 확인 후 행동 연산을 원자적 연산으로 만들어보도록 하겠습니다. 물론 아래와 같이 synchronized를 사용해서 해결을 할 수도 있겠지만, 이 경우에는 한 스레드가 해당 동기화 블록을 모두 점유하기 때문에 다른 스레드는 아무 작업을 하지 못하고 기다려야 하는 문제가 있습니다.

public static class SharedCounter {
    private volatile int counter = 0;

    public synchronized void increment() {
        if (counter < 10000) {
            counter++;
        }
    }
    // ...
}

이로 인해서 성능 저하와 자원의 낭비가 일어날 수 있습니다. 이러한 문제를 해결하기 위해서 CAS 연산과 같이 락(lock)을 사용하지 않고 동시성 문제를 처리하는 논블로킹(non-blocking) 혹은 락 프리(lock-free) 방법이 등장하게 되었습니다.

락 프리(lock-free)

말 그대로 락을 사용하지 않는 알고리즘입니다. 락은 한 번에 하나의 스레드만 특정 코드 블록에 접근할 수 있도록 하는 방법이지만, 이로 인해서 성능 저하나 데드락 등의 문제가 발생할 수 있습니다. 반면, 락 프리 알고리즘은 락을 사용하지 않고 원자적 연산(예: CAS)을 활용해서 여러 스레드 간의 동기화를 수행합니다.

동작 방식

CAS 연산은 원자적 연산으로 이를 통해 락을 사용하지 않고도 동시성 문제를 해결할 수 있습니다. 아래와 같이 세 가지의 인자를 사용하게 됩니다.

  1. 작업할 메모리 위치 (공유 변수) \(V\)
  2. 예상하고 있는 값 \(A\)
  3. 새로운 값 \(B\)

CAS 연산의 동작을 말로 풀어서 설명해보면 "\(V\)에 들어 있는 값이 \(A\)라고 생각하는데, 만약에 실제로 \(V\)의 값이 \(A\)라면 \(B\)라는 값으로 바꿔줘. 만약 \(V\)의 값이 \(A\)가 아니라면 아무 작업도 하지 말고 \(V\)의 값이 뭔지 알려줘."라는 것입니다. 이를 직접 코드로 구현하면 다음과 같습니다.

public synchronized int compareAndSwap(int expectedValue, int newValue) {
	int oldValue = value;
    if (oldValue == expectedValue)
    	value = newValue;
    return oldValue;
}

만약 여러 스레드가 동시에 CAS 연산을 이용하여 특정 변수의 값을 수정하려고 할 때, 오직 한 스레드만이 성공적으로 값을 변경할 수 있습니다. 나머지 스레드들은 변경에 실패하겠지만, 락을 획득하여 대기 상태에 머무르는 대신, 수정에 실패했음을 알리는 통보를 받고 다시 시도할 기회를 얻게 됩니다.

예시 코드

현대의 CPU에서는 원자적인 CAS 연산을 기본적으로 지원하고 있는데, 자바 5 이전부터는 네이티브 코드를 작성하지 않는 한 하드웨어 프로세서의 CAS 연산을 호출할 수 없었지만 자바 5부터 java.util.concurrent.atomic 패키지의 Atomic 클래스를 통해서 하드웨어에서 지원하는 CAS 연산을 사용할 수 있게 되었습니다. CAS 연산을 직접적으로 지원하는 플랫폼의 경우에는 자바 프로그램을 실행할 때 CAS 연산 호출 부분을 직접 해당하는 기계어 코드(lock cmpxchg)로 변환해서 실행하게 됩니다. 만약 하드웨어에서 CAS 연산을 지원하지 않는 최악의 경우에는 JVM가 자체적으로 스핀 락(spin lock)을 사용해서 CAS 연산을 구현하게 됩니다.

스핀 락(Spin Lock)

회전을 의미하는 스핀(spin)이라는 이름에서도 알 수 있듯이, 스레드는 락을 획득하기 위해서 계속해서 회전하며 대기하는 것처럼 동작하는 것을 '스핀'이라고 부릅니다. 다시 말해서, 락을 이미 다른 스레드가 가져간 경우에 현재 스레드가 락이 풀릴 때까지 기다리면서 블록되지 않고 계속해서 루프를 돌며 락을 획득하려고 시도합니다(busy waiting). 락이 풀리면 스레드는 락을 획득하고 공유 리소스에 접근할 수 있게 됩니다.

import java.util.concurrent.atomic.AtomicInteger;

public class CheckThenActExample {

    public static void main(String[] args) throws InterruptedException {
        SharedCounter sharedCounter = new SharedCounter();

        Runnable incrementTask = () -> {
            for (int i = 0; i < 10000; i++) {
                sharedCounter.increment();
            }
        };

        Thread[] threads = new Thread[10];

        for (int i = 0; i < 10; i++) {
            threads[i] = new Thread(incrementTask);
            threads[i].start();
        }

        for (int i = 0; i < 10; i++) {
            threads[i].join();
        }

        System.out.println("최종 카운터 값: " + sharedCounter.getCounter()); // 10000
    }

    public static class SharedCounter {
        private AtomicInteger counter = new AtomicInteger(0);

        public void increment() {
            int current;
            do {
                current = counter.get();
            } while (!counter.compareAndSet(current, current < 10000 ? current + 1 : current));
        }

        public int getCounter() {
            return counter.get();
        }
    }
}

프로그램을 실행해보면 항상 일관된 값을 출력하는 것을 확인하실 수 있습니다. JIT 컴파일러가 실제로 컴파일한 코드를 살펴보면 cmpxchg 명령어 앞에 lock prefix가 붙은 것을 볼 수 있는데 CPU 레벨에서 해당 연산이 원자적으로 수행되도록 지원한다는 것을 나타냅니다. 이 lock prefix는 하나의 명령어만을 보호하며, CPU 자체에서 구현되기 때문에 소프트웨어에서 추가적으로 구현할 필요가 없습니다.

AtomicInteger의 내부 살펴보기

이 AtomicInteger의 내부를 살펴보면 다음과 같습니다. value가 volatile로 선언된 것을 확인할 수 있고, compareAndSet() 메서드 내부에서는 Unsafe 클래스를 사용하고 있는 걸 볼 수 있습니다. 이 Unsafe 클래스에는 말 그대로 안전하지 않은 저수준 연산을 수행하는 네이티브 메서드들이 모여있습니다. 그 중에서 저희가 확인해 볼 부분은 compareAndSetInt() 네이티브 메서드입니다.

public class AtomicInteger extends Number implements java.io.Serializable {
	private static final long VALUE
        = U.objectFieldOffset(AtomicInteger.class, "value");
	private volatile int value;
    
	// 만약 현재 값이 예상 값과 같으면, 값이 새로운 값으로 원자적으로 설정되며,
    // 메모리 효과는 VarHandle.compareAndSet에 지정된 대로 적용된다.
    // 성공적인 경우 true를 반환하고, false가 반환되는 경우 실제 값이 예상 값과 같지 않았음을 나타낸다.
	public final boolean compareAndSet(int expectedValue, int newValue) {
        return U.compareAndSetInt(this, VALUE, expectedValue, newValue);
    }
    // ...
}

public final class Unsafe {
	// 이 메서드는 자바 변수를 원자적으로 업데이트한다.
    // 현재 값이 expected와 같다면, 변수 값을 x로 변경한다.
    // 이 연산은 volatile 읽기와 쓰기와 같은 메모리 의미론을 가지며,
    // 이 메서드는 C11의 atomic_compare_exchange_strong에 해당한다.
    @IntrinsicCandidate
    public final native boolean compareAndSetInt(Object o, long offset,
                                                 int expected,
                                                 int x);
    // ...
}

OpenJDK의 핫스팟 내부를 살펴보면 아래와 같이 cmpxchg를 사용하고 있는 것을 확인할 수 있습니다. 아래 함수의 역할은 주어진 객체의 특정 필드에 대해서 CAS 연산을 수행하고 연산의 성공 여부를 반환합니다.

// compareAndSetInt() 메서드에 대한 네이티브 구현. (openjdk/jdk/src/hotspot/share/prims/unsafe.cpp#L742)
// cmpxchg(Compare-And-Exchange)는 공유 메모리 위치의 값을 비교한 뒤에,
// 그 값이 예상 값과 동일한 경우 새로운 값으로 교체한다.
// 이 과정은 원자적으로 수행되며, 중간 상태를 다른 스레드가 관찰할 수 없도록 보장한다.
UNSAFE_ENTRY(jboolean, Unsafe_CompareAndSetInt(JNIEnv *env, jobject unsafe, jobject obj, jlong offset, jint e, jint x)) {
  oop p = JNIHandles::resolve(obj);
  volatile jint* addr = (volatile jint*)index_oop_from_field_offset_long(p, offset);
  return Atomic::cmpxchg(addr, e, x) == e;
} UNSAFE_END

CAS 연산에는 ABA 문제가 있지만 이것까지 설명하면 내용이 너무 길어질 것 같아서 생략하도록 하겠습니다.

자바 8 이후 ConcurrentHashMap의 내부 구조

생성자

initialCapacity는 맵의 초기 용량, concurrencyLevel은 맵을 동시에 업데이트하는 스레드 수를 말합니다. 여기서 loadFactor는 해시 테이블 내에 저장된 요소의 수와 테이블 크기 사이의 비율을 말하는데, 이 값이 높을수록 이 비율이 높아져 충돌 가능성이 증가하여 성능 저하가 있을 수 있으나 메모리 사용량은 감소하게 됩니다. 참고로 이 값은 오로지 초기 테이블 용량에만 영향을 줍니다.

적재율(load factor)

적재율은 위에서 언급한 대로 해시 테이블 내에 저장된 요소의 수와 테이블 크기 사이의 비율을 말합니다. 즉, 해시 테이블의 크기를 \(N\), 키의 개수를 \(K\)라고 했을 때 적재율은  \(\frac{K}{N}\)이 됩니다. 해시 테이블은 키 값을 인덱스로 사용하는 구조이기 때문에 적재율이 1보다 큰 해시 테이블의 경우에는 반드시 충돌이 발생하게 됩니다.

private static final int MAXIMUM_CAPACITY = 1 << 30;

public ConcurrentHashMap(int initialCapacity,
						 float loadFactor, int concurrencyLevel) {
	if (initialCapacity < concurrencyLevel)
		initialCapacity = concurrencyLevel;
	long size = (long)(1.0 + (long)initialCapacity / loadFactor);
	int cap = (size >= (long)MAXIMUM_CAPACITY) ?
		MAXIMUM_CAPACITY : tableSizeFor((int)size);
	this.sizeCtl = cap;
}

여기서 tableSizeFor()는 주어진 용량보다 크거나 같은 가장 작은 2의 거듭제곱 값을 반환한게 됩니다. 즉, 내부 해시 테이블의 크기를 2의 거듭제곱으로 유지하는 것입니다. MAXIMUM_CAPACITY은 말 그대로 해시 테이블의 최대 용량으로 \(2^{30}\)까지만 허용합니다. 상위 2비트를 제어 목적으로 사용하고 있기 때문입니다.

spread()

해시 값을 더 고르게 분포시키기 위해서 사용되는 보조 메서드입니다. 즉, 해시 테이블에서 해시 충돌을 줄이고 성능을 개선시키는 역할을 합니다.

// ConcurrentHashMap 내부에서 사용되는 특별한 해시 값들
static final int MOVED     = -1; // ForwardingNode
static final int TREEBIN   = -2; // TreeBin
static final int RESERVED  = -3; // ReservationNode
    
// 최상위 비트를 제외한 모든 비트가 1인 32비트 정수
static final int HASH_BITS = 0x7fffffff;

// 양수 범위 내에서 해시 값이 분포되도록 HASH_BITS와 AND 연산한다.
// ConcurrentHashMap에서 음수 해시 값은 특별한 의미를 가지기 때문이다.
static final int spread(int h) {
	// 상위 16비트와 하위 16비트를 섞어 해시 값의 분포를 개선한다.
	return (h ^ (h >>> 16)) & HASH_BITS;
}

tabAt()

ConcurrentHashMap의 내부 테이블에서 주어진 인덱스 i에 있는 노드를 안전하게 읽어 오는 역할을 합니다. 이 메서드는 원자적(atomic)으로 값을 읽어 오기 때문에 동시성 문제없이 사용할 수 있습니다.

static final <K,V> Node<K,V> tabAt(Node<K,V>[] tab, int i) {
	// 내부에서 getReferenceVolatile() 메서드를 호출하는데
	// 주어진 객체 o에서 offset 위치에 있는 참조 값을 원자적으로 가져온다.
	// volatile은 가시성(visibility)과 메모리 순서를 보장하는 역할을 한다.
	return (Node<K,V>)U.getReferenceAcquire(tab, ((long)i << ASHIFT) + ABASE);
}

put()

각 빈(bin)에는 노드의 리스트가 있으며, 대부분 리스트에는 0개 혹은 1개의 노드만 존재합니다. 여기서 핵심은 CAS(Compare And Swap) 연산을 통해서 구현되어 있으며, 각 빈마다 락 객체를 할당하는 것에 메모리 공간을 낭비하는 걸 피하기 위해 빈 리스트의 첫 번째 노드 자체를 락으로 사용하고 있습니다. 참고로 해시 버킷의 키-값 쌍 노드의 인덱스는 (n - 1) & hash로 계산할 수 있습니다.

// onlyIfAbsent가 false인 putVal().
// 키가 현재 맵에 없는 경우에도 해당 키로 값을 저장할 수 있다.
public V put(K key, V value) {  
	return putVal(key, value, false);  
}

final V putVal(K key, V value, boolean onlyIfAbsent) {
	// 키와 값이 null일 수는 없다.
	if (key == null || value == null) throw new NullPointerException();
	// 키의 해시 값을 더 고르게 분포시킨다.
	int hash = spread(key.hashCode());
	// 현재 빈(즉, 버킷)의 노드 수를 추적하는 변수다.
	int binCount = 0;
	// 중간에 동적으로 크기가 변하거나(resizing), 해시 충돌 등을 이유로
	// 반복문을 계속 실행하면서 매번 확인하고 찾게 된다.
	for (Node<K,V>[] tab = table;;) {
		Node<K,V> f; int n, i, fh; K fk; V fv;
		// 테이블이 비어있거나 아직 초기화 되지 않은 경우 초기화시킨다.
		// 크기를 별도로 지정하지 않은 경우 DEFAULT_CAPACITY(16)을
		// 초기 테이블 크기로 사용한다.
		if (tab == null || (n = tab.length) == 0)
			tab = initTable();
		// 해당 해시에 대한 버킷이 비어있으면 새 노드를 추가하고 루프를 종료한다.
		else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
			// 버킷이 비어있으면 CAS 연산을 통해 노드를 추가한다.
			if (casTabAt(tab, i, null, new Node<K,V>(hash, key, value)))
				break;
		}
		// 현재 테이블이 리사이징 중이면, 기존 테이블을 새로운 테이블로 전송시킨다.
		else if ((fh = f.hash) == MOVED)
			tab = helpTransfer(tab, f);
		// onlyIfAbsent가 참이면 해당 키가 없는 경우에만 값을 저장한다.
		else if (onlyIfAbsent
				 && fh == hash
				 && ((fk = f.key) == key || (fk != null && key.equals(fk)))
				 && (fv = f.val) != null)
			return fv;
		else {
			V oldVal = null;
			// 동기화를 통해 현재 빈(버킷)의 첫 번째 노드를 잠근다.
			synchronized (f) {
				// 노드가 잠겨 있을 때, 업데이트 전에 해당 노드가 여전히 첫 번째 노드인지 확인한다. 그렇지 않으면 다시 시도한다. (double checking)
				if (tabAt(tab, i) == f) {
					if (fh >= 0) {
						binCount = 1;
						// 현재 처리 중인 빈(버킷)의 링크드 리스트를 순회한다.
						for (Node<K,V> e = f;; ++binCount) {
							K ek;
							// 기존의 값을 새로운 값으로 교체한다.
							if (e.hash == hash &&
								((ek = e.key) == key ||
								 (ek != null && key.equals(ek)))) {
								oldVal = e.val;
								if (!onlyIfAbsent)
									e.val = value;
								break;
							}
							// 기존의 값이 없는 경우 새 노드를 체인의 끝에 추가한다(separate chaining).
							Node<K,V> pred = e;
							if ((e = e.next) == null) {
								pred.next = new Node<K,V>(hash, key, value);
								break;
							}
						}
					}
					// TreeBin인 경우, 트리에 값을 추가하거나 기존 값을 갱신한다.
					else if (f instanceof TreeBin) {
						Node<K,V> p;
						binCount = 2;
						if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
													   value)) != null) {
							oldVal = p.val;
							if (!onlyIfAbsent)
								p.val = value;
						}
					}
					// ReservationNode는 여러 스레드가 동시에 똑같은 키에 computeIfAbsent와 같은
					// 연산을 수행하려고 할 때 충돌을 방지하기 위해서 '예약된 노드'임을 나타낸다.
					else if (f instanceof ReservationNode)
						throw new IllegalStateException("Recursive update");
				}
			}
			// 빈의 노드 수가 임계값(TREEIFY_THRESHOLD, 즉 8)을 넘어서면
			// 해당 빈의 자료구조를 링크드 리스트 대신 트리로 변환한다.
			// (참고로 해시 테이블의 길이가 MIN_TREEIFY_CAPACITY[64]보다 작다면, 테이블의 크기를 두 배로 늘리고 트리 변환 작업을 뒤로 연기함)
			// 이를 통해 성능 향상을 도모하고, 긴 링크드 리스트를 통해서
			// 검색을 해야했던 복잡도가 O(n)에서 O(logN)으로 줄어들게 된다.
			// 참고로 사용되는 트리는 레드-블랙 트리의 특수한 형태다.
			if (binCount != 0) {
				if (binCount >= TREEIFY_THRESHOLD)
					treeifyBin(tab, i);
				if (oldVal != null)
					return oldVal;
				break;
			}
		}
	}
	// ConcurrentHashMap의 크기가 1씩 증가한다.
	// 필요하면 현재 테이블을 확장할 필요가 있는지도 확인한다.
	addCount(1L, binCount);
	return null;
}

아래처럼 빈(버킷)이 레드-블랙 트리(TreeBin)로 관리가 되거나 단순한 링크드 리스트로 관리가 되는 것을 확인할 수 있습니다. 탐색 효율을 높이기 위해 필요에 따라서 버킷의 링크드 리스트 구현을 레드-블랙 트리로 전환하기도 하며, 특수한 노드(ForwardingNode)로 현재 테이블이 확장 중임을 나타내거나, 아직 값은 할당되지 않았지만 계산 중인 노드라 예약된 노드(ReservationNode)도 확인할 수 있습니다.

CAS 연산(Compare and Swap)을 통해서 원자성(atomicity)을 보장하며, volatile을 통해 가시성(visibility)과 메모리 순서를 보장하게 되어 스레드 세이프하도록 구현이 된 것을 추가적으로 확인할 수 있습니다. 자바 7 이전에는 세그먼트 단위(기본적으로 16개의 세그먼트)로 독립적으로 잠겼지만, 자바 8 이후부터는 락 단위가 더 세분화되어서 해시 테이블 내에 있는 버킷의 첫 번째 노드를 잠그게 됩니다. 따라서 전보다 동시성이 증가했다고 볼 수 있습니다.

참고

  • Java Concurrency in Practice by Brian Goetz, Tim Peierls, Joshua Bloch, Joseph Bowbeer, David Holmes, and Doug Lea (Addison-Wesley Professional, 2006, ISBN: 978-0321349606)

'프로그래밍 관련 > 자바' 카테고리의 다른 글

번외편. CompletableFuture  (0) 2023.04.12
31편. 스레드(Thread) (4)  (0) 2023.04.09
30편. 스레드(Thread) (3)  (0) 2023.04.07
invokedynamic의 내부 동작  (0) 2022.05.22
38편. 레코드(Record)  (0) 2022.05.20