Kihagyás

3. gyakorlat

Az előző gyakorlaton megismert monitor koncepció bizonyos esetekben erős megkötést jelent, mivel csak az adott objektum implicit zárjával és egyetlen feltételváltozójával dolgozhatunk ami korlátozza a párhuzamos programozás rugalmasságát. Például a vezérlés könnyebben holtpontba kerülhet, vagy bizonyos szinkronizációs problémamegoldás során nem érvényesül a természetesebb problémaleírás és kifejezésmód, ami így kevésbé áttekinthető és hatékony programot eredményez. Ezzel szemben ebben a gyakorlati anyagban megismert Lock és Condition interfészek nagyobb rugalmasságot és hatékonyságot kínálnak, segítségükkel például különálló feltételváltozókat hozhatunk létre (azaz több várakozási sort), és explicit módon kezelhetjük a zárolásokat.

Lock és Condition interfészek

Emlékeztető

Vegyük az előző órán vett termelő-fogyasztó probléma megoldását: mind a termelők, mind a fogyasztók ugyanazt az egyetlen feltételváltozót (várakozási sort) kénytelenek használni, ezért mindkét metódusban a notifyAll() hívásnak kell szerepelnie. Ez nem hatékony, hiszen a felélesztett folyamatok közül jó eséllyel egy kivételével mind visszakerül a várakozási sorra.

A ReentrantLock implementálja a Lock interfészt, amihez aztán tetszőleges számú feltételváltozót fogunk tudni hozzárendelni (Condition). Ezzel tehát egy újabb módszert kapunk a kölcsönös kizárás megvalósításához.

Lock implementációk

A gyakorlaton bemutatott Lock implementáció csak egy kis szeletét tárgyalja a különböző megoldásoknak.

Például, a ReentrantLock lehetőséget ad arra, hogy külső hatására a zárra várakozó blokkolt szál megszakításra (interrupt) kerüljön vagy akár időkorlátot is beállíthatunk a zárolásra várakozó szálak számára. Továbbá akár az éhezés (starvation) is elkerülhető vele ha fair stratégiával kerül példányosításra (a legrégebben várakozó szál kapja meg a zárat felélesztéskor). A ReentrantReadWriteLock specializálva van az olvasók-írók problémára, míg a StampedLock nem támogatja a beágyazott kritikus szakaszokba való belépést, ha ezek a szakaszok ugyanazzal a zárral vannak védve.

  • Új zár létrehozása: ReentrantLock lock = new ReentrantLock();

  • A zárolást a lock.lock() és az lock.unlock() metódusokkal tudjuk elvégezni.

  • A lock-hoz tartozó condition variable létrehozása: Condition condition = lock.newCondition();

  • A szál várakoztatása: condition.await();

  • Szál(ak) ébresztése: condition.signal(); vagy condition.signalAll();

Ne keverjük a synchronized és a Lock+Condition mechanizmusokat!

Ugyan hasonló célt szolgálnak, azonban ezeket nem szabad egyszerre használni egy adott kritikus szakaszban vagy osztályon belül, mert összeférhetetlenséget és nehezen észlelhető hibákat okozhatnak.

Termelő-fogyasztó probléma

Nézzük meg, hogy az előző órán vett termelő-fogyasztó problémát hogyan tudjuk megvalósítani a ReentrantLock osztály, és a Condition segítségével:

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
import java.util.ArrayList;
import java.util.Collections;
import java.util.Random;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class ProducerConsumerProblemWithLock {
    public static void main(String[] args) {
        int numberOfProducers = 20;
        int numberOfConsumers = 10;
        Buffer buffer = new Buffer(10);

        ArrayList<Thread> threads = new ArrayList<>();
        for (int i = 0; i < numberOfProducers; i++) {
            Producer producer = new Producer("Producer-" + i, buffer);
            threads.add(producer);
        }
        for (int i = 0; i < numberOfConsumers; i++) {
            Consumer consumer = new Consumer("Consumer-" + i, buffer);
            threads.add(consumer);
        }

        // Szálak indítása véletlenszerű sorrendben
        Collections.shuffle(threads);
        for (Thread thread : threads) {
            thread.start();
        }
    }
}

class Buffer {
    private final int maxSize;
    private final ArrayList<Integer> list;
    private final Lock lock;
    private final Condition notFull;
    private final Condition notEmpty;

    Buffer(int maxSize) {
        this.maxSize = maxSize;
        this.list = new ArrayList<Integer>();
        this.lock = new ReentrantLock();
        this.notFull = lock.newCondition();
        this.notEmpty = lock.newCondition();
    }

    void deposit(Thread thread) {
        lock.lock();
        while (list.size() == maxSize) { // Ha a buffer tele van, várakozunk
            System.out.println(thread.getName() + " is waiting for free space.");
            try {
                notFull.await();
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
        }

        int item = new Random().nextInt(100); // Elem hozzáadása
        try {
            Thread.sleep(100); // Szimuláljuk a munkát
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
        list.add(item);
        System.out.println(thread.getName() + " buffer: " + list);
        notEmpty.signal();
        lock.unlock();
    }

    void extract(Thread thread) {
        lock.lock();
        while (list.isEmpty()) { // Ha a buffer üres, várakozunk
            System.out.println(thread.getName() + " is waiting for an item.");
            try {
                notEmpty.await();
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
        }

        list.removeFirst(); // Elem kivétele a pufferből
        try {
            Thread.sleep(100); // Szimuláljuk a munkát
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
        System.out.println(thread.getName() + " buffer: " + list);
        notFull.signal();
        lock.unlock();
    }
}

class Producer extends Thread {
    private final Buffer buffer;

    Producer(String name, Buffer buffer) {
        super(name);
        this.buffer = buffer;
    }

    @Override
    public void run() {
        buffer.deposit(this);
    }
}

class Consumer extends Thread {
    private final Buffer buffer;

    Consumer(String name, Buffer buffer) {
        super(name);
        this.buffer = buffer;
    }

    @Override
    public void run() {
        buffer.extract(this);
    }
}

Olvasók-írók probléma

Az olvasók-írók probléma egy klasszikus probléma, amely az erőforrás-megosztás optimalizálására fókuszál. Egy közös erőforráshoz, például egy adatbázishoz vagy fájlhoz, több szál férhet hozzá, de a hozzáférés típusa eltérő szabályokat igényel.

Több olvasó szál egyidejűleg férhet hozzá az erőforráshoz, mivel az olvasás nem módosítja az adatokat. Bizonyos megközelítések limitáljak az egyidőben történő olvasások számát is. Egy író szál kizárólagos hozzáférést igényel, azaz sem más író, sem olvasó nem férhet hozzá, amíg az írás zajlik. A kihívás az, hogy biztosítsuk az erőforrás konzisztenciáját (az inkonzisztens állapot adatvesztést és hibás működést eredményezhet), miközben minimalizáljuk a szálak várakozási idejét.

A probléma megoldásakor különböző stratégiák közül választhatunk, aszerint, hogy előnyben részesítjük-e valamelyik folyamatcsoportot a másikkal szemben.

  • Reader-preference: Az olvasó-preferencia azt jelenti, hogy ha olvasó van kritikus szakaszban, akkor az újonnan érkező olvasók bemehetnek mellé, még akkor is, ha van várakozó író. Előnye, hogy így maximalizálhatjuk a párhuzamosságot.

  • Writer-preference: Az író-preferencia azt jelenti, hogy az olvasók nem mehetnek be a kritikus szakaszba, ha van várakozó író, valamint írót választunk következő folyamatnak, amikor csak lehet. Emellett az írási műveletek frissítik az adatokat, így valószínűleg az alkalmazás szempontjából jobb, ha ez minél előbb megtörténik, és nem elavult adatokkal dolgozik.

Mindkét módszer hátránya, hogy éhezéshez vezethet, az előnyben részesített folyamatok bármeddig késleltethetik a másik csoport tagjait.

A ReentrantLock és a Condition osztályok segítségével hatékonyan kezelhető az olvasók és írók hozzáférése, ehhez külön feltételváltozókat érdemes létehozni az olvasók és írók számára, illetve 4 kizárólagos művelet segítségével vezéreljük a programot: olvasók belépési és kilépési műveleteivel és az írók belépési és kilépési műveletivel. A belépési műveletek fogják a várakoztatási feltételeket ellenőrizni (pl. egy író csak akkor léphessen be, ha nincs aktív olvasó vagy másik író).

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
import java.util.ArrayList;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

public class ReadersWritersProblem {
    public static void main(String[] args) {
        Book book  =  new Book(2);

        ArrayList<Thread> threads = new ArrayList<>();
        for(int i = 0; i<10; i++) {
            Writer writer = new Writer("Writer-" + i, book);
            writer.start();
            Reader reader = new Reader("Reader-" + i, book);
            reader.start();
        }
    }
}

class Book {
    ReentrantLock lock;
    Condition readers;
    Condition writers;

    int maxReaders; // maximum ennyi párhuzamos olvasás történhet
    int currentReaders; // az aktuális olvasók számát tárolja
    boolean isWriting; // igaz, ha éppen írás történik

    public Book(int maxReaders) {
        this.maxReaders = maxReaders;
        this.isWriting = false;
        lock = new ReentrantLock();
        readers = lock.newCondition();
        writers = lock.newCondition();
    }

    void read() {
        // az olvasó megpróbál hozzáférni az erőforráshoz
        lock.lock();
        while (isWriting == true || currentReaders == maxReaders) {
            try {
                // System.out.println(Thread.currentThread().getName() + " is waiting to read.");
                readers.await();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        currentReaders++;
        lock.unlock();

        // ez nem része a kritikus szakasznak, az olvasás mehet párhuzamosan
        System.out.println(Thread.currentThread().getName() + " is reading.. (" + currentReaders + ")");

        // az olvasó befejezte az erőforrás használatát
        lock.lock();
        currentReaders--;
        readers.signalAll();
        writers.signal();
        lock.unlock();
    }

    void write() {
        // az író megpróbál hozzáférni az erőforráshoz
        lock.lock();
        while (isWriting == true || currentReaders > 0) {
            try {
                // System.out.println(Thread.currentThread().getName() + " is waiting to write.");
                writers.await();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        isWriting = true;
        lock.unlock();

        // ez nem része a kritikus szakasznak, de biztosan csak egy író fogja futtatni egy időpillanatban
        System.out.println(Thread.currentThread().getName() + " is writing..");

        // az író befejezte az erőforrás használatát
        lock.lock();
        isWriting = false;
        writers.signal();
        readers.signalAll();
        lock.unlock();
    }
}

class Reader extends Thread{
    Book book;

    public Reader(String name, Book book) {
        super(name);
        this.book = book;
    }

    @Override
    public void run() {
        this.book.read();
    }
}

class Writer extends Thread{
    Book book;

    public Writer(String name, Book book) {
        super(name);
        this.book = book;
    }

    @Override
    public void run() {
        this.book.write();
    }
}

Az olvasókat nem akadályozza semmi, hogy párhuzamosan olvassanak, illetve a kritikus szakaszban lévő olvasó mellé korlátozott számú új olvasó is bemehet, még akkor is, ha van várakozó író, így a fenti kódrészlet preferálja az olvasókat.

Semaphore osztály

A szemafor (semaphore) egy olyan objektum, amelynek segítségével megvalósítható a kölcsönös kizárás. Két privát adattagot tartalmaz, az egyik egy egész számot tárol, melyet a szemafor értékének nevezünk. A másik egy várakozási sor (queue), amely a folyamatok késleltetésekor kap szerepet. A szemafor létrehozásakor megadhatjuk a kezdőértékét (a várakozósor kezdetben mindig üres). Ha a tárolt érték csak 0 vagy 1 lehet, akkor a szemafort nevezhetjük bináris szemafornak (binary semaphore). A nem bináris szemafort pedig szokás számláló szemafornak (counting semaphore) is nevezni. Az utóbbi hasznos lehet, ha egy időben több, de korlátozott számú szálnak akarunk hozzáférést biztosítani egy adott erőforráshoz.

Szemafor értéke = erőforrás

A szemaforoknál az "erőforrás" inkább absztrakt fogalom, ami azt jelenti, hogy nem egy konkrét objektumra vagy adattagra vonatkozik, hanem inkább tekinthetjük egy hozzáférési jognak, amit a szálak kérhetnek.

Monitor szemaforként használva

A bináris szemafor nem más, mint egy olyan monitor, amelynek egy boolean változó tárolja a foglaltsági állapotát, a folyamatok egy feltételváltozó várakozási során várakozhatnak, két műveletét pedig két monitoreljárás valósítja meg. Szerencsére nem kell minden esetben egy ilyen működésű osztályt megvalósítanunk, mivel a Java-ban beépített Semaphore osztály is. A két művelete:

  • acquire(): Ez a művelet lefoglal egy erőforrást. Ha nincs elérhető erőforrás (pl. ha a számláló értéke 0), akkor a szál várakozni fog, amíg más szál fel nem szabadít egy erőforrást.

  • release(): Ez a művelet felszabadít egy erőforrást. Amikor egy szál befejezi a munkáját, és felszabadítja az erőforrást, a szemafor növeli a számlálót, és ha vannak várakozó szálak, azok közül egy folytathatja a munkáját.

Étkező filozófusok probléma

Az étkező filozófusok problémája az erőforrások megosztásának és kezelésének kihívásait szemlélteti a bináris szemaforok használatán keresztül. A probléma szerint több filozófus ül egy kör alakú asztalnál, és evéshez készülnek, amihez villákra van szükségük. Minden filozófus előtt egy tányér található, és a tányérok között egy-egy villa helyezkedik el. Ahhoz hogy egy filozófus enni tudjon, fel kell vennie a tányérja bal és jobb oldalán található 1-1 villát. Látható, hogy a verseny a villákért folyik, a villák reprezentálják az erőforrást. Ha egy filozófus befejezte az evést, akkor visszarakja a villákat az asztalra, amit így a szomszédos tányéroknál elhelyezkedő filozófusok használhatnak.

dining_philo

  • Minden filozófusnak szüksége van a jobb és bal oldali villájára az evéshez.

  • Egy villa egyszerre csak egy filozófus birtokában lehet (a villákat reprezentálhatjuk bináris szemaforként, így egyszerre csak egy filozófus használhatja azt).

  • Ha minden filozófus egyszerre próbálja megszerezni az azonos oldali villáját, akkor egyikük sem tudja befejezni az evést, és a futás holtpontba kerül.

1. megoldás

A holtpont elkerüléséhez alkalmazhatjuk például azt a megoldást, hogy az egyik filozófus fordított sorrendben veszi fel a villákat. Így mindenki ugyanabban a sorrendben veszik fel és teszik le a villákat, kivéve az egyiket, ezzel garantálva az erőforrások folyamatos körforgását.

2. megoldás

Egy másik megközelítés lehet a holtpont elkerüléséhez, hogy egy számláló szemafort, egy úgynevezett „guard”-ot vezethetünk be. Ezzel korlátozhatjuk az asztalnál a filozófusok számát, akik éppen próbálják megszerezni a villákat.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
import java.util.concurrent.Semaphore;

public class DiningPhilosophers {
    public static void main(String[] args) {
        Semaphore s1  = new Semaphore(1);
        Semaphore s2  = new Semaphore(1);
        Semaphore s3  = new Semaphore(1);
        Semaphore s4  = new Semaphore(1);
        Semaphore s5  = new Semaphore(1);
        // Semaphore guard = new Semaphore(4);

        new Philosopher(s1, s2).start();
        new Philosopher(s2, s3).start();
        new Philosopher(s3, s4).start();
        new Philosopher(s4, s5).start();
        new Philosopher(s5, s1).start();
    }
}

class Philosopher extends Thread{
    Semaphore leftFork;
    Semaphore rightFork;

    public Philosopher(Semaphore leftFork, Semaphore rightFork) {
        this.leftFork = leftFork;
        this.rightFork = rightFork;
    }

    @Override
    public void run() {
        for(int i = 0; i < 5; i++) {
            eat();
        }
    }

    private void eat() {
        try {
            this.leftFork.acquire();
            System.out.println(Thread.currentThread().threadId() + " picked up its left fork");
            this.rightFork.acquire();
            System.out.println(Thread.currentThread().threadId() + " picked up its right fork");

            Thread.sleep(100);
            System.out.println(Thread.currentThread().threadId() + " nom-nom...");


            System.out.println(Thread.currentThread().threadId() + " put down its right fork");
            this.rightFork.release();
            System.out.println(Thread.currentThread().threadId() + " put down its left fork");
            this.leftFork.release();

        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

Monitor vagy Lock segítségével is megoldható a feladat

1
2
3
4
5
synchronized (leftfork) {
    synchronized (rightfork) {
        ...
    }
}
1
2
3
4
5
leftfork.lock();
rightfork.lock();
        ...
rightfork.unlock();
leftfork.unlock();

Parkolóház számláló szemaforral

A acquire(int permits) metódus azt jelenti, hogy a szál az adott számú engedélyt próbálja megszerezni. Ha a szemafor nem rendelkezik elegendő engedéllyel, akkor a szál várakozni fog, amíg az engedélyek száma elégséges nem lesz. A release(int permits) metódus lehetővé teszi, hogy a szál egyszerre több engedélyt adjon vissza, növelve ezzel a szemafor számlálóját.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
import java.util.Random;
import java.util.concurrent.Semaphore;

public class ParkingLot {
    public static void main(String[] args) {
        Semaphore parkingLot = new Semaphore(4);

        for (int i = 1; i <= 10; i++) {
            new Vehicle(parkingLot).start();
        }
    }
}

class Vehicle extends Thread {
    private final Semaphore parkingLot;

    public Vehicle(Semaphore parkingLot) {
        this.parkingLot = parkingLot;
    }

    @Override
    public void run() {
        try {
            System.out.println("Vehicle " + Thread.currentThread().threadId() + " needs 2 spaces.");
            parkingLot.acquire(2);
            System.out.println("Vehicle " + Thread.currentThread().threadId() + " has parked." );

            Thread.sleep(100);

            System.out.println("Vehicle " + Thread.currentThread().threadId() + " has left.");
            parkingLot.release(2);
        } catch (InterruptedException e) {
            System.err.println("Vehicle " + Thread.currentThread().threadId() + " was interrupted.");
        }
    }
}

Gyakorló feladatok

Feladat

Készíts egy SharedInt nevű osztályt, amely endelkezik egy privát adataggal egész számok tárolására illetve a következő metódusokkal:

  • setValue(int newValue) - kizárólagos módon beállítja a value változó értékét
  • decrement() - kizárólagos módon csökkenti a value változó értékét

Készíts egy Modifier nevű osztályt, amely megvalósítja a Runnable interfészt. A szál feladata, hogy véletlenszerűen meghívja a konstruktorában átadott SharedInt objektum két metódusának egyikét.

Teszteld a programot a main metódusban! Hozz létre egy SharedInt objektumot, majd indíts el 20 párhuzamosan futó szálat. A szálak befejeződése után írd ki az objektum értékét.

A feladatot old meg külön-külön a Lock interfész és a Semaphore osztály használatával!

Feladat

Implementálj egy osztályt egy parkoló reprezentálására, ParkingLot néven, amely 10 kocsit képes befogadni. Készíts el a következő metódusokat:

  • enter(): beenged egy kocsit a parkolóba, amennyiben van szabad hely, egyéb esetben a kocsinak várakoznia kell.

  • leave(): kiléptet egy kocsit a parkolóból, és értesíti az esetlegesen várakozókat a parkolóhely felszabadulásáról.

A szükséges metódusokat lásd el kölcsönös kizárással. Csak és kizárólag a szükséges kódrészlet legyen része a kritikus szakasznak.

Készíts egy Car osztályt, amely a Thread osztályból származik. Az osztály a konstruktorában egy ParkingLot objektumot kap. A szál feladata az enter() és a leave() metódusok meghívása ebben a sorrendben.

Teszteld a programot a main metódusban! Hozz létre egy ParkingLot-ot, majd indíts 100 párhuzamosan futó Car szálat. Ne használd a megvalósításhoz a join és a sleep metódusokat.

A feladatot old meg külön-külön a Lock+Condition interfészek és a Semaphore osztály használatával!


Utolsó frissítés: 2025-03-23 20:14:57