Kihagyás

Java - I.

Ebben a fejezetben folytatjuk a közös memóriás programozást és megismerkedünk a Java programozási nyelv alapvető párhuzamos konstrukcióival.

A JDK 21 hivatalos Javadoc-ja itt érthető el.

Szálak létrehozása

A Java-ban a végrehajtási egységet szálnak nevezzük. Szekvenciális programok esetében az összes utasítás a main metódusban fut le, amelyet a main thread hajt végre. A fő szál által létrehozott újabb szálak használatával lehetőség nyílik arra, hogy a program különböző részei párhuzamosan hajtsanak végre műveleteket, ezzel felgyorsítva a feldolgozást és növelve a teljesítményt.

Általunk definiált működésű szálakat a Thread osztály származtatásával, vagy a Runnable interfész implementálásával tudunk létrehozni. Ahhoz, hogy a szál működési logikáját (azaz az elvégzendő feladatot) meghatározzuk, felül kell írnunk a run() metódust. Ezt követően a start() metódussal fogjuk tudni elindítani a szálat.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
public class HelloWorld {

    public static void main(String[] args) {
        Thread thread = new ExampleThread();
        Thread runnable = new Thread(new ExampleRunnable());

        thread.start();
        runnable.start();
    }
}

class ExampleThread extends Thread {
    @Override
    public void run() {
        System.out.println("Hello World - Thread");
    }
}

class ExampleRunnable implements Runnable {
    @Override
    public void run() {
        System.out.println("Hello World - Runnable");
    }
}

Háttérben futó szálak

A main thread által végreahajtott szekvenciális programokat tekinthetjük egyszálú vezérlésnek, de ez nem jelenti azt, hogy nincsenek további, a háttérben futó szálak. Például a Java rendszer által kezelt daemon szálak olyan háttérfolyamatokat futtatnak, mint például a garbage collector vagy a finalizer, és automatikusan megszűnnek, amikor az összes non-daemon szál befejeződött.

A main thread terminálása nem jelenti a teljes program végét, ha van még futó, nem-démon szál. A JVM aktívan tartja az alkalmazást mindaddig, amíg legalább egy ilyen szál fut.

A Thread osztály számos hasznos metódust biztosít, amelyek segítségével kezelhetjük és monitorozhatjuk a szálak működését, állapotát. Az alábbiakban néhány fontosabb metódust emelünk csak ki:

  • Thread.currentThread(): Az aktuálisan futó szálra mutató referencia lekérése.

  • Thread.sleep(long millis): A szál futását milliszekundumban megadott időre felfüggeszti.

  • threadId(): A szál egyedi azonosítóját adja vissza. A getId() 19-es verzió óta deprecated, használata hosszútávon váratlan viselkedést eredményezhet!

  • getName(): A szál nevét adja vissza. Ez a szálnak adott tetszőleges név, nem egyedi.

  • isAlive(): Ellenőrzi, hogy az adott szál éppen végrehajtás alatt van-e.

  • isDaemon(): Ellenőrzi, hogy az adott szál daemon szál-e.

  • getPriority() és setPriority(): A szál prioritásának lekérdezése vagy beállítása. A nagyobb prioritású szál nagyobb eséllyel kap processzoridőt.

  • yield(): Egy jelzés az ütemezőnek, hogy a jelenlegi szál hajlandó átadni a vezérlést más szálaknak, de nem garantálja, hogy ez a váltás valóban meg is történik.

  • getState(): Lekérdezi a szál aktuális állapotát.

  • join(): Biztosítja, hogy egy szál befejezze a futását, mielőtt a program továbblépne a következő utasításra.

InterruptedException

Van arra is lehetőség, hogy egy másik szál megszakítson egy szálat, jelezve neki, hogy le kell állnia a végrehajtással, amint lehetséges. Erre akkor lehet szükség, ha egy szál blokkoló műveletet végez, például túl hosszú ideig várakozik (sleep(..)). Ahhoz, hogy a szál megfelelően kezelhesse a megszakítást, bizonyos metódusok InterruptedException kivételt dobnak.

Szálállapotok

Egy szál életciklusa során különböző állapotokat vehet fel, amit a fent említett Thread.getState() metódussal hívhatunk le.

  • NEW: A szálat létrehozták, de még nem indították el a start() metódussal. Ebben az állapotban a szál nem hajt végre semmilyen műveletet.

  • RUNNABLE: A szál készen áll a futásra, és a JVM megpróbálja ütemezni a processzoron. Bár "futásra kész", nem feltétlenül fut azonnal, mivel más szálak is versenyezhetnek az erőforrásokért.

  • TERMINATED: A szál a run() metódus végrehajtásával kerül ebbe az állapotba. Végállapot, a szál nem indítható el ismét.

  • TIMED_WAITING: A szál egy meghatározott ideig vár, például a sleep(..) metódus hívásakor. Az idő lejárta után automatikusan visszakerülhet a RUNNABLE állapotba.

  • WAITING: A szál határozatlan ideig vár egy másik szál műveletére vár, hogy folytathassa a végrehajtást. Tipikusan valamilyen feltétel teljesülésére van szükség a folytatáshoz, amihez explicit értesítés szükséges.

  • BLOCKED: Amikor egy másik szál jelenleg használja a szükséges erőforrást, és a várakozó szál nem tudja folytatni a végrehajtást, amíg az erőforrás zárolása fel nem oldódik.

Szálállapotok

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
public class ThreadStates {

    public static void main(String[] args) {
        Thread thread1 = new Thread();
        System.out.println(thread1.getState()); 

        Thread thread2 = new RunnableThread();
        thread2.start();

        Thread thread3 = new TimedWaitingThread();
        thread3.start();

        try {
            Thread.sleep(100);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }

        System.out.println(thread3.getState()); 
        System.out.println(thread2.getState()); 
    }
}

class RunnableThread extends Thread {
    @Override
    public void run() {
        System.out.println(this.getState()); 
    }
}

class TimedWaitingThread extends Thread {
    @Override
    public void run() {
        try {
            Thread.sleep(2000);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    }
}

Egy szál terminálásnak bevárása

Annak érdekében, hogy a speciális szinkronizációt megvalósítsuk a szálak között, a join() metódust alkalmazzuk: az utasítást hívó szál addig blokkolásra kerül, amíg a metódust végrehajtó szál nem terminál. A metódus esetén szintén gondoskodnunk kell az InterruptedException kivétel kezeléséről. Egy időkorlát is megadható paraméterként, amely meghatározza, hogy mennyi ideig várakozzon a hívó szál a befejezére (join(long millis)).

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
public class Join {

    public static void main(String[] args) throws InterruptedException {
        System.out.println("Hello");

        Thread.currentThread().join();

        System.out.println("World!");
    }
}

Kölcsönös kizárás

A közös memóriát használó párhuzamos programozás fő kihívása a folyamatok által közösen használt változók kezelése. Amikor több szál ugyanazon a változón vagy adatszerkezeten (erőforráson) osztozik, elkerülhetetlenül felmerülnek olyan problémák, amelyek nem megfelelő kezelés esetén hibás programvégrehajtáshoz vagy helytelen eredményhez vezethetnek. A versenyhelyzetek akkor jelentkeznek, amikor több szál egy időben próbál hozzáférni és módosítani egy közös erőforrást (pl. egy számlálót), és a végrehajtás sorrendje meghatározza az eredményt.

A párhuzamos programozás során azokat a kódrészleteket, amelyek hozzáférnek és módosítanak közös változókat (tipikusan az erőforrásokat kezelő programrészeket) kritikus szakaszoknak nevezzük. Ezeket a szakaszokat kell megfelelő módon jelölni, hogy a végrehajtás során egy időpillanatban csak egy szál férhessen hozzájuk.

Ahhoz, hogy kölcsönös kizárást megvalósítsunk Java-ban, szükségünk van egy másik, folyamatinterakciók szabályozására szolgáló eszközre, a monitorra. A monitort úgy kell elképzelni, mintha egy erőforrást zártunk volna a belsejébe (pl. privát adattagok), az erőforrásra vonatkozó kritikus szekciókat pedig a publikus eljárások törzsében helyeztük volna el. Mivel tehát a monitor egy osztály jellegű konstrukció, így Java esetében bármelyik Java osztály monitorrá tehető. Nézzük is meg pontosabban, milyen tulajdonságokat és műveleteket fogunk használni monitor használatának esetén Java-ban:

  • privát adattagok, konstansok

  • inicializáló eljárás (konstruktor)

  • monitor eljárás (synchronized blokk és metódus)

  • implicit zár és feltételváltozó (osztály és objektumszintű)

  • wait(), notify()/notifyAll() metódusok

    • wait(): A hívó folyamat a várakozási sorra kerül, a monitor zárolását feloldja, így egy másik folyamatnak lehetősége van belépni

    • notify(), notifyAll(): egy / az összes várakozó folyamat felélesztése, ha a sor nem üres

Fontos megemlíteni, hogy most megismert synchronized kulcsszót Java-ban a kölcsönös kizárás megvalósításához használjuk és nem a korábbi órákon megismert szálak közötti szinkronizáció megvalósításához.

Tehát a korábban említett implicit zárat a synchronized kulcsszó kezeli. Ennek két fő megvalósítása van: metódusszintű és blokkszintű szinkronizáció. Ha egy másik szál is végre szeretné hajtani ugyanazt a műveletet, várakoznia kell, amíg a jelenleg futó szál be nem fejezi azt, és fel nem oldja az objektum zárolását. Tipikusan a szálak olyan objektumokat zárolnak, amelyek valamilyen erőforrást reprezentálnak.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
public class KritikusSzakasz {

    public static final int N = Runtime.getRuntime().availableProcessors();

    public static void main(String[] args) {

        Buffer buffer = new Buffer();
        for (int i = 0; i<N; i++) {
            Thread t = new Thread(new MyRunnable(buffer));
            t.start();
        }
    }
}

class MyRunnable implements Runnable {

    private Buffer buffer;

    public MyRunnable(Buffer buffer) {
        super();
        this.buffer = buffer;
    }

    @Override
    public void run() {
        buffer.kritikusSzakasz();
        buffer.kritikusSzakaszBlokk();
    }

}

class Buffer {
    public synchronized void kritikusSzakasz() {
        System.out.println("Kritikus szakaszba lepett: " + Thread.currentThread().getName());
        System.out.println("Kritikus szakaszbol kilepett: " + Thread.currentThread().getName());
    }

    public void kritikusSzakaszBlokk() {
        synchronized(this) {
            System.out.println("Kritikus szakaszba (blokk) lepett: " + Thread.currentThread().getName());
            System.out.println("Kritikus szakaszbol (blokk) kilepett: " + Thread.currentThread().getName());
        }
    }
}

Egy szál az életciklusa során különböző állapotokat vehet fel. Most nézzük meg az alábbi példa segítségével, hogy kölcsönös kizárást alkalmazva milyen módon kerül egy szál a BLOCKED állapotba:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
public class ThreadStateBlocked {
    public static void main(String[] args) throws InterruptedException {
        Object lock = new Object();
        Thread thread1 = new BlockedThread(lock);
        Thread thread2 = new BlockedThread(lock);

        thread1.start();
        Thread.sleep(100);
        thread2.start();

        Thread.sleep(100); 
        System.out.println(thread1.getState()); 
        System.out.println(thread2.getState());
    }
}

class BlockedThread extends Thread {
    final Object lock;

    public BlockedThread(Object lock) {
        this.lock = lock;
    }

    @Override
    public void run() {
        synchronized (lock) {
            try {
                Thread.sleep(500);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
        }
    }
}

Termelő-fogyasztó probléma

A probléma általános definíciója szerint kettő különböző típusú szállal dolgozunk: termelő és fogyasztó. Továbbá van benne egy tároló / rekesz (tipikus elnevezése a buffer), ez lesz az erőforrás, amit zárolunk (illetve pontosabban a buffert reprezentáló objektumot). A termelő feladata elemek létrehozása, és behelyezése a bufferba. A fogyasztó feladata az elemek kivétele és felhasználása. A problémát a tároló véges kapacitása okozza, azaz nem tud tetszőleges mennyiségű elemet tárolni. Így ha kiürül, vagy megtelik a raktár, akkor valamelyik típusú folyamatoknak biztosan várakoznia kell.

Az tisztán látható, hogy az elem tárolóba való elhelyezése illetve kivétele lesz az a művelet, aminek kizárólagos módon kell lennie, azaz ezt a két folyamatot kell majd ellátnunk synchronized kulcsszóval (azaz zárolni az erőforrást). Abban az esetban, ha termelő teli tárolóval találkozik, vagy ha a fogyasztó szál üres tárolóval találkozik, akkor várakozniuk kell (wait()). Amennyiben elem került a tárolóba vagy ha terméket vettek ki a tárolóból, akkor az adott szálnak fel kell ébresztenie a várakozó szálakat (notifyAll()), hiszen lehetőséget kell biztosítani a várakozó szálaknak, hogy folytathassák a futást.

Fontos megjegyezni, hogy a várakozási feltételt (wait()) minden esetben while ciklusba kell tenni! Amikor egy várakoztatott folyamat felélesztésre kerül, akkor folyamat egyszerűen beáll a monitort zárolni szándékozó folyamatok közé. Ha egy másik várakozó folyamat kapja meg előbb a monitort, akkor érvénytelenítheti a feltételt, ezért azt újra meg kell vizsgálni: ha a feltétel továbbra sem teljesül, akkor ismételten várakoztatnunk kell a szálat. Egyszerű esetvizsgálat esetén (if ) a buffer méretére adódó követelmény sérülhet.

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
import java.util.ArrayList;
import java.util.Collections;
import java.util.Random;

public class ProducerConsumerProblem {
    public static void main(String[] args) {
        int numberOfProducers = 20;
        int numberOfConsumers = 10;
        Buffer buffer = new Buffer(10);

        ArrayList<Thread> threads = new ArrayList<>();
        for (int i = 0; i < numberOfProducers; i++) {
            Producer producer = new Producer("Producer-" + i, buffer);
            threads.add(producer);
        }
        for (int i = 0; i < numberOfConsumers; i++) {
            Consumer consumer = new Consumer("Consumer-" + i, buffer);
            threads.add(consumer);
        }

        Collections.shuffle(threads);
        for (Thread thread : threads) {
            thread.start();
        }
    }
}

class Buffer {
    private final int maxSize;
    private final ArrayList<Integer> list;

    Buffer(int maxSize) {
        this.maxSize = maxSize;
        this.list = new ArrayList<Integer>();
    }

    synchronized void deposit() {
        while (list.size() == maxSize) { 
            System.out.println(Thread.currentThread().getName() + " is waiting for free space.");
            try {
                wait();
                // System.out.println(thread.getState()); // WAITING
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
        }
        int item = new Random().nextInt(100); 
        try {
            Thread.sleep(100); 
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
        list.add(item);
        System.out.println(Thread.currentThread().getName() + " buffer: " + list);
        notifyAll();
    }

    synchronized void extract() {
        while (list.isEmpty()) {
            System.out.println(Thread.currentThread().getName() + " is waiting for an item.");
            try {
                wait();
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
        }
        list.removeFirst(); 
        try {
            Thread.sleep(100); 
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
        System.out.println(Thread.currentThread().getName() + " buffer: " + list);
        notifyAll();
    }
}

class Producer extends Thread {
    private final Buffer buffer;

    Producer(String name, Buffer buffer) {
        super(name);
        this.buffer = buffer;
    }

    @Override
    public void run() {
        buffer.deposit();
    }
}

class Consumer extends Thread {
    private final Buffer buffer;

    Consumer(String name, Buffer buffer) {
        super(name);
        this.buffer = buffer;
    }

    @Override
    public void run() {
        buffer.extract();
    }
}

Lock+Condition interfészek

A korábbi megvalósítás hátránya (wait(), notifyAll()), hogy csak a Java osztály implicit zárjával és feltételváltozójával dolgozhatunk.

Komplexebb problémák esetén ez erős megkötés lehet, és a teljesítményre is kihathat (egy folyamat felélesztése kisebb overhead-del jár, mint az összes folyamat felélesztése). Ezt orvosolja a Lock és a Condition interfész, melyek segítségével tetszőleges mennyiségű feltételváltozó (várakozási sor) alkalmazható. Vegyük egy szélsőséges(nek tűnő) példát: ha a buffer tele van, akkor miért értesítsünk egy (vagy több) termelőt, hiszen úgy sem fognak tudni elemet belerakni. Ugyanígy, ha a buffer üres, akkor miért élesszünk fel (egy vagy több) fogyasztót, nem fognak tudni elemet kivenni.

A ReentrantLock implementálja a Lock interfészt, amihez aztán tetszőleges számú feltételváltozót fogunk tudni hozzárendelni (Condition). Ezzel tehát egy újabb módszert kapunk a kölcsönös kizárás megvalósításához. A zárhoz tartozó műveletek ebben az esetben a await() és a signal(),signalAll() lesz. A zárolást a lock() és az unlock() metódusokkal tudjuk elvégezni. Új feltételváltozó létrehozásához a lock.newCondition() metódus szükséges.

Fontos megjegyezni, hogy a különböző megvalósításokat (monitor koncepció és a Lock interfész) nem javasolt egyszerre használni, mivel nehezen áttekinthető kódot eredményez és könnyebben kerülhet holtpontba a program.

Gyakorló feladatok

(1) Hozz létre egy Resource nevű osztályt. Az osztály konstruktorának egyetlen egész szám típusú paramétere van, amely a szálak számát jelöli. A konstruktor feladata, hogy inicializálja és feltöltse az array elnevezésű adattagot, amely egy egész számokat tartalmazó tömb. A tömb mérete a megadott szálak számának százszorosával egyenlő (szálak száma × 100), a szálak számát paraméterként kapja meg a konstruktor.

Ezután hozz létre egy Calculator osztályt, amely a Thread osztályból származik és implementálja a run() metódust. A Calculator osztály konstruktora három paramétert fogad:

  • egy Resource típusú objektum referenciáját,

  • egy egész számot, amely egy intervallum kezdőértékét jelöli,

  • egy másik egész számot, amely az intervallum végértékét adja meg.

A run() metódus feladata, hogy végigiteráljon a megadott intervallumon belül található számokon az array tömbben, és az összeget eltárolja a Calculator osztály sum elnevezésű adattagjában.

Teszteld a programot a main metódusban. A metódus szeletelje fel a szálak számával megegyező intervallumra a tömböt és minden intervallumhoz példányosítson egy új Calculator szálat az összegzés elvégzésére. A szálak elindítása után a join() metódus segítségével biztosítani kell, hogy a főprogram csak akkor folytatódjon, ha minden szál befejezte a számításait. Ezt követően a main szál iteráljon végig a Calculator osztály sum adattagjain és írja ki az összegzett eredményt.

(2) Írj egy párhuzamosan futó programot a Pi értékének geometriai közelítésére.

A Pi közelítését a geometriai valószínűség felhasználásval végezzük el. Ha egy (egység)négyzetbe egy minden oldalát érintő kört rajzolunk, majd az így kapott alakzat pontjait véletlenül választjuk ki elég sokszor, akkor a körre eső pontok száma úgy aránylik a négyzetre eső pontok számához, mint a kör területe a négyzet területéhez. Ebből a tört egyszerűsítésével megkaphatjuk a Pi közelített értékét.

Tehát nincs más feladatunk, mint folyamatosan véletlenül választani a négyzet területéről egy pontot (párhuzamos esetben szálak segítségével úgy, hogy egy szál több pontot is generál), a generált pontok számát nyilvántartani (legyen N), majd megnézni, hogy a kör területére esett-e, és ha igen akkor azt külön is nyilvántartjuk (K). A programmal folyamatosan kiírathatjuk a 4K/N értéket, hogy lássuk hogyan közelíti a Pi értékét.

A program futása addig tartson, amíg egy adott epszilon hibahatáron belülre kerül a Pi közelített értéke (ne használd a join metódust). A K és az N értékének módosítása legyen a kritikus szakasz része. Valószítsd meg a műveleteket a Monitor és a Lock+Condition segítségével is.

(3) Implementálj egy osztályt egy parkoló reprezentálására, ParkingLot néven, amely 10 kocsit képes befogadni. Készíts el a következő metódusokat:

  • enter(): kizárólagos módon beenged egy kocsit a parkolóba, amennyiben van szabad hely, egyéb esetben a kocsinak várakoznia kell.

  • leave(): kizárólagos módon kiléptet egy kocsit a parkolóból, és értesíti az esetlegesen várakozókat a parkolóhely felszabadulásáról.

Csak és kizárólag a szükséges kódrészlet legyen része a kritikus szakasznak. Valószítsd meg a metódusokat a Monitor és a Lock+Condition segítségével is.

Készíts egy Car osztályt, amely a Thread osztályból származik. Az osztály a konstruktorában egy ParkingLot objektumot kap. A szál feladata az enter() és a leave() metódusok meghívása ebben a sorrendben.

Teszteld a programot a main metódusban! Hozz létre egy ParkingLot-ot, majd indíts 100 párhuzamosan futó Car szálat. Ne használd a megvalósításhoz a join és a sleep metódusokat.


Utolsó frissítés: 2025-03-09 14:35:35