Куда текут абстракции многопоточности

Или казалось бы, причём здесь Java








Глеб Смирнов, me@gvsmirnov.ru

Disclaimer

На самом деле, всё не так. Изложенный на данном семинаре материал является общим и поверхностным описанием того, как и почему работает многопоточность. Во многих реальных случаях происходящее может отличаться от описанного здесь. Многие из теоретических аспектов также опущены или упрощены.

Just another brick in the memory wall

Moore's law

Число транзисторов, умещающихся на интегральной схеме, удваивается каждые два года

Частота процессоров почти не растёт

За последние 5-10 лет практически не увеличилась частота процессора, поскольку возникли физические ограничения

Частота vs размер

Частота vs энергоэффективность

Параллельные алгоритмы в массы! Нужно больше блоков исполнения! Даёшь больше нативных потоков! Не допустим простоя процессора! Нужно использовать модель акторов!

Cache coherency

"А что, если у потоков есть общая память?"

Протокол MESI

Сообщения протокола MESI

Store Buffers

Не дожидаясь Invalidate Acknowledge, поместить
новое значение в буфер и продолжить выполнять инструкции

Когда придут все acknowledge, произвести запись

CPU 1 CPU 2
value = 10;
finished = true;
while (!finished);
assert value == 10;
Казалось бы, что может пойти не так?
finished: E
value: I
finished: I
value: E
> value = 10;
< store_buffer(value)
< read_inv(value)
> while (!finished);
< read(finished)
> finished = true;
finished: M
< read_resp(finished)
finished: S



finished: S
> assert value == 10;

Assertion FAILS!

< read_resp(value)
< invalidate_ack(value)
value: I
                            
< writeback(value)

Invalidate queues

Invalidate Acknowledge будет отправлен моментально,
а фактически инвалидацию процессор сделает тогда,
когда ему это будет удобно. При этом до того, как он это сделает,
не будет отправлено ни одно сообщение по соответствующей ячейке памяти

CPU 1 CPU 2
value = 10;
finished = true;
while (!finished);
assert value == 10;
Казалось бы, что ещё может пойти не так?
finished: E
value: S
finished: I
value: S
> value = 10;
< store_buffer(value)
< invalidate(value)
> while (!finished);
< read(finished)
> finished = true;
finished: M
< read_resp(finished)
finished: S
invalidate_ack(value)
invalidate_queue(value)

finished: S
> assert value == 10;

Assertion fails AGAIN!

< writeback(value)
value: I

Trade-off

Performance vs "free" coherence

Есть грань между производительностью и "простотой" написания многопоточного приложения

Memory Model

Описание того, как потоки взаимодействуют с памятью в некоторой среде исполнения. Позволяет "автору" кода (разработчику, компилятору, ...) понять, как их код всё же будет выполнен

Memory Barrier

Инструкция для процессора, запрещающая допускать reordering инструкций чтения и/или записи. В простейшем случае различают

Store Memory Barrier

Требование у процессора выполнить все store, уже находящиеся
в буфере, прежде чем выполнять те, что попадут туда
после этой инструкции

Load Memory Barrier

Требование у процессора применить все invalidate, уже находящиеся в очереди, прежде чем выполнять какие-либо инструкции load

CPU 1 CPU 2
value = 10;
storeMemoryBarrier();
finished = true;
while (!finished);
loadMemoryBarrier();
assert value == 10;
finished: E
value: S
finished: I
value: S
> value = 10;
< store_buffer(value)
< invalidate(value)
> while (!finished);
< read(finished)
> storeMemoryBarrier();
value: M
< writeback(value)
value: E
invalidate_ack(value)
invalidate_queue(value)


> finished = true;
finished: M
< read_resp(finished)
finished: S



finished: S


< read_resp(value)
value: S

> storeMemoryBarrier();
value: I
< read(value)

value: S

> assert value == 10;

Assertion passed

Membar ‐ инструмент HMM

Различных HMM очень много: они варьируются не только от вендора к вендору, но даже и внутри одной линейки одного производителя

Write Once

Run Anywhere

Java Memory Model

Модель памяти, не зависящая от реального железа, на котором запущена JVM

Отношение
happens-before

Пусть есть поток X и поток Y (не обязательно отличающийся
от потока X). И пусть есть операции A (выполняющаяся в потоке X)
и B (выполняющаяся в потоке Y)

В таком случае, A happens-before B означает, что все изменения, выполненные потоком X до момента операции A и изменения, которые повлекла эта операция, видны потоку Y в момент выполнения операции B и после выполнения этой операции

Отношение happens-before транзитивно

operationA happens-before operationB

Где есть happens-before

Visibility

Гарантия того, что изменения, выполненные одним потоком, будут видны другому потоку

Пример с volatile

volatile boolean finished = false;

// Thread 1
value = 10;
finished = true;

// Thread 2
while (!finished);
assert value == 10;

Пример с final

class ValueHolder {
  int first;
  final int second;
  ValueHolder(int first, int second) {
    this.first = first;
    this.second = second;
  }
}

//Thread 1
value = new ValueHolder(4, 2);

//Thread 2
while(value == null);
assert value.first == 4;
                        // May fail!

С final всё не так просто

Использование final даёт особое, не транзитивное, happens-before

Не транзитивное? А так?

class ValueHolder {
  int first;
  final int second;
  ValueHolder(int val) {
    this.first = val;
    this.second = this.first;
  }
}

//Thread 1
value = new ValueHolder(1334);

//Thread 2
while(value == null);
assert value.first == 1334;
                             // Still may fail!

С final всё не так просто

happens-before есть от любой записи, случившейся-до заморозки сконструированного объекта (freeze), но только до тех чтений, что обращаются к этому объекту или его dereference chain

На одном happens-before далеко не уедешь

volatile int someInt;

// ...

someInt++; // not atomic!

Atomicity

Compare And Set (CAS, Compare And Swap) ‐ операция, неделимо проверяющая значение ячейки и, если оно соответствует ожиданиям, меняет его на указанное.

boolean compareAndSet(Wrapper<T> target, T expected,
                                         T newValue) {
    if(target.getValue().equals(expected)) {
        target.setValue(newValue);
        return true;
    } else {
        return false;
    }
}

Atomicity

Используйте пакет java.util.concurrent.atomic, если необходима только атомарность. Атомарные операции нативно поддерживаются большинством современных процессоров.

Пример с AtomicInteger

AtomicInteger visitors = new AtomicInteger();

//...

logger.debug("Visitor #" + visitors.incrementAndGet());

Ещё пример с AtomicInteger

AtomicInteger masterId = new AtomicInteger();

//...

if(masterId.compareAndSet(NO_MASTER, myId)) {
    logger.info("I am the master, performing operation");
    performOperation();
} else {
    logger.info("Other master detected");
}

Mutual Exclusion

Исключение влияния потоков друг на друга. В критической секции
в один момент времени может находиться лишь один поток

Следует использовать, если поток должен изменить несколько значений, и между этими изменениями состояние системы неконсистентно

Mutual Exclusion: основные инструменты

Пример с synchronized

synchronized(this) {
    lastUserId = user.id;
    lastAction = currentAction;
}

Пример с ReentrantLock

ReentrantLock lock = new ReentrantLock();

//...

lock.lock();

try {
    lastUserId = user.id;
    lastAction = currentAction;
} finally {
    lock.unlock();
}

ReentrantLock vs synchronized

© Java Platform Performance BoF
by Aleksey Shipilev and Sergey Kuksenko
http://shipilev.net/pub/talks/jeeconf-May2011-performanceBoF.pdf

Synchronization

Согласованность очерёдности действий потоков. Например, поток-вычислитель не может ничего вычислить, пока не подгрузилась в фоне свежая порция данных

Synchronization: основные инструменты

Пример с wait/notify + synchronized

boolean finished = false;
// Thread 1
value = 10;
synchronized(this) {
    finished = true;
    notifyAll();
}


// Thread 2
while(!finished) {
    synchronized(this) {
        wait();                                             //BROKEN!
    }
}

assert value == 10;

Пример с wait/notify + synchronized

boolean finished = false;
                                                         //MESSY!
// Thread 1
value = 10;
synchronized(this) {
    finished = true;
    notifyAll();
}



// Thread 2
while(true) {
    synchronized(this) {
        if(!finished) wait();
        if(finished) break;
    }
}

assert value == 10;

Пример с wait/notify + synchronized

boolean finished = false;
// Thread 1
value = 10;
synchronized(this) {
    finished = true;
    notifyAll();
}


// Thread 2
synchronized(this) {
    while(!finished) {
        wait(); // monitorexit
    }
}

assert value == 10;

Пример с CountDownLatch

CountDownLatch latch = new CountDownLatch(1);

//Thread 1
value = 10;
latch.countDown();

//Thread 2
latch.await();
assert value == 10;

Ещё полезные инструменты

Summary

Полезно почитать: