Kihagyás

Java - II.

Ebben az anyagrészben az olvasók-írók, majd az étkező filozófusok problémáján keresztül nézünk további példákat párhuzamos konstrukciókra, amelyekkel megvalósítható a kölcsönös kizárásra. Továbbá megismerünk néhány magasabb szintű Java párhuzamos konstrukciót, amelyet a concurrent package biztosít.

Olvasók-írók probléma

Az olvasók-írók probléma egy klasszikus probléma, amely az erőforrás-megosztás optimalizálására fókuszál. Egy közös erőforráshoz, például egy adatbázishoz vagy fájlhoz, több szál férhet hozzá, de a hozzáférés típusa eltérő szabályokat igényel.

Több olvasó szál egyidejűleg férhet hozzá az erőforráshoz, mivel az olvasás nem módosítja az adatokat. Bizonyos megközelítések limitáljak az egyidőben történő olvasások számát is. Egy író szál kizárólagos hozzáférést igényel, azaz sem más író, sem olvasó nem férhet hozzá, amíg az írás zajlik. A kihívás az, hogy biztosítsuk az erőforrás konzisztenciáját (az inkonzisztens állapot adatvesztést és hibás működést eredményezhet), miközben minimalizáljuk a szálak várakozási idejét.

A probléma megoldásakor különböző stratégiák közül választhatunk, aszerint, hogy előnyben részesítjük-e valamelyik folyamatcsoportot a másikkal szemben.

  • Reader-preference: Az olvasó-preferencia azt jelenti, hogy ha olvasó van kritikus szakaszban, akkor az újonnan érkező olvasók bemehetnek mellé, még akkor is, ha van várakozó író. Előnye, hogy így maximalizálhatjuk a párhuzamosságot.

  • Writer-preference: Az író-preferencia azt jelenti, hogy az olvasók nem mehetnek be a kritikus szakaszba, ha van várakozó író, valamint írót választunk következő folyamatnak, amikor csak lehet. Emellett az írási műveletek frissítik az adatokat, így valószínűleg az alkalmazás szempontjából jobb, ha ez minél előbb megtörténik, és nem elavult adatokkal dolgozik.

Mindkét módszer hátránya, hogy éhezéshez vezethet, az előnyben részesített folyamatok bármeddig késleltethetik a másik csoport tagjait.

A ReentrantLock és a Condition osztályok segítségével hatékonyan kezelhető az olvasók és írók hozzáférése, ehhez külön feltételváltozókat érdemes létehozni az olvasók és írók számára, illetve 4 kizárólagos művelet segítségével vezéreljük a programot: olvasók belépési és kilépési műveleteivel és az írók belépési és kilépési műveletivel. A belépési műveletek fogják a várakoztatási feltételeket ellenőrizni (pl. egy író csak akkor léphessen be, ha nincs aktív olvasó vagy másik író).

A megvalósításhoz továbbra is a Lock és a Condition interfészeket fogjuk használni. Érdekesség, hogy az olvasók-írók problémára egy speciális Lock implementációt is publikáltak: ReadWriteLock

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
package _06;

import java.util.Random;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

public class ReaderWriter {

    public static final int READERS_NUMBER = 10;
    public static final int WRITERS_NUMBER = 20;
    public static final int MAX_READERS_NUMBER = 2;

    public static void main(String[] args) {
        Book book = new Book(MAX_READERS_NUMBER);
        for (int i = 0; i<WRITERS_NUMBER; i++) {
            Thread thread = new Thread(new Writer(book));
            thread.start();
        }
        for (int i = 0; i<READERS_NUMBER; i++) {
            Thread thread = new Thread(new Reader(book));
            thread.start();
        }
    }
}

class Book {

    private int maxReaders;
    private int readersNumber;
    private boolean isWriting = false;
    private ReentrantLock lock = new ReentrantLock();
    private Condition writing = lock.newCondition();
    private Condition reading = lock.newCondition();

    public Book(int maxReaders) {
        this.maxReaders = maxReaders;
    }

    public void write() {}

    public void read() {}
}

class Writer implements Runnable {

    private Book book;

    public Writer(Book book) {
        super();
        this.book = book;
    }

    @Override
    public void run() {
        book.write();
    }

}

class Reader implements Runnable {

    private Book book;

    public Reader(Book book) {
        super();
        this.book = book;
    }

    @Override
    public void run() {
        book.read();
    }
}

Az étkező filozófusok probléma

Az étkező filozófusok problémája az erőforrások megosztásának és kezelésének kihívásait szemlélteti a bináris szemaforok használatán keresztül. A probléma szerint több filozófus ül egy kör alakú asztalnál, és evéshez készülnek, amihez villákra van szükségük. Minden filozófus előtt egy tányér található, és a tányérok között egy-egy villa helyezkedik el. Ahhoz hogy egy filozófus enni tudjon, fel kell vennie a tányérja bal és jobb oldalán található 1-1 villát. Látható, hogy a verseny a villákért folyik, a villák reprezentálják az erőforrást. Ha egy filozófus befejezte az evést, akkor visszarakja a villákat az asztalra, amit így a szomszédos tányéroknál elhelyezkedő filozófusok használhatnak.

  • Minden filozófusnak szüksége van a jobb és bal oldali villájára az evéshez.

  • Egy villa egyszerre csak egy filozófus birtokában lehet (a villákat reprezentálhatjuk bináris szemaforként, így egyszerre csak egy filozófus használhatja azt).

  • Ha minden filozófus egyszerre próbálja megszerezni az azonos oldali villáját, akkor egyikük sem tudja befejezni az evést, és a futás holtpontba kerül.

Ennek megvalósításához egy újabb (de már korábbról ismert) konstrukciót használunk, ez lesz a Semaphore osztály. amelynek az acquire() és a release() metódusai felelnek majd meg a wait() és signal() utasításoknak.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Semaphore;

public class EtkezoFilozofusok {

    public static final int PHILO_NUMBER = 5;

    public static void main(String[] args) {
        List<Semaphore> forks = new ArrayList<Semaphore> ();

        for (int i = 0; i<PHILO_NUMBER; i++) {
            forks.add(new Semaphore(1));
        }

        Philo philo1 = new Philo("Horace", forks.get(0), forks.get(1));
        Philo philo2 = new Philo("Confucius", forks.get(1), forks.get(2));
        Philo philo3 = new Philo("Democritus", forks.get(2), forks.get(3));
        Philo philo4 = new Philo("Freud", forks.get(3), forks.get(4));
        Philo philo5 = new Philo("Montesquieu", forks.get(4), forks.get(0));

        new Thread(philo1).start();
        new Thread(philo2).start();
        new Thread(philo3).start();
        new Thread(philo4).start();
        new Thread(philo5).start();
    }
}

class Philo implements Runnable {

    private Semaphore myFork;
    private Semaphore yourFork;

    private String name;

    public Philo(String name, Semaphore myFork, Semaphore yourFork) {
        this.myFork = myFork;
        this.yourFork = yourFork;
        this.name = name;
    }

    @Override
    public void run() {
        for (int i = 0; i<10; i++) {
            eat();
        }
    }

    private void eat() {}
}

A holtpont elkerülésének érdekében 2 megközelítéssel dolgozhatunk:

  • Kijelölünk egy filozófust, aki fordított sorrendben (pl. először a bal, majd a jobb oldalról) feszi fel a villákat, mindenki más a jobb, majd bal sorrendet követi.

  • Egy új, nem-bináris szemafort hozunk létre, amely guard-ként működik. Értékét N-1-re állítjuk, ahol N a filozófusok száma. Minden filozófus először a guard szemafortól kér engedélyt, hogy odalépjen az asztalhoz.

CyclicBarrier

Egy szinkronizációs mechanizmust biztosít a CyclicBarrier, amely kifejezetten a szálak koordinációjára szolgál, úgy, hogy tetszőleges mennyiségű szál várja be egymást egy közös szinkronizációs pontban. Ez a szinkronizációs pont lehetővé teszi, hogy a szálak csak akkor folytathassák a munkát, ha minden szál elérte a megadott pontot. A CyclicBarrier neve arra utal, hogy a szinkronizáció ciklikusan ismételhető, azaz többször is felhasználható a szálak közötti szinkronizáció megvalósításához.

A CyclicBarrier hatékonyabb és könnyebben alkalmazható a szálak szinkronizációjára, mint a Semaphore, mivel erre a feladatra specializálódott. Ezen felül a CyclicBarrier lehetővé teszi, hogy egy opcionális műveletet is végrehajtsunk, amikor minden szál elérte a közös pontot, például egy összegzést vagy ellenőrzést, ami még rugalmasabbá teszi a használatát.

A barrier létrehozásakor meghatározzuk, hogy hány szálat kell bevárni a szinkronizációs pontban. Amikor egy szál eléri ezt a pontot, meghívja az await() metódust. Ez a metódus csökkenti a barrier számlálóját, és addig blokkolja a szálat. Amint az utolsó szál is eléri az ún. barrier pontot, azaz a barrier számlálója nullára csökken, a szálak folytathatják a végrehajtást.

A barrier objektum példányosításakor megadható egy Runnable is opcionálisan. Fontos, hogy bár a művelet ekkor fut le, amikor az utolsó szál is meghívta az await()-et. A szálak csak akkor folytathatja a végrehajtást ha a Runnable művelet befejeződött. Ez biztosítja, hogy a közös művelet (például összegzés vagy ellenőrzés) befejeződjön, mielőtt bármelyik szál folytatná a saját végrehajtását.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
import java.util.Random;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;

public class CyclicBarrierExample {
    public static void main(String[] args) {
        int numberOfThreads = 5;

        CyclicBarrier barrier = new CyclicBarrier(5, new BarrierTask());

        for (int i = 0; i < numberOfThreads; i++) {
            Thread thread = new Thread(new Task(barrier));
            thread.start();
        }
    }
}

class BarrierTask implements Runnable {
    @Override
    public void run() {
        try {
            Thread.sleep(5000);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
        System.out.println("A barrier feladata csak egyszer futott le.");
    }
}

class Task extends Thread {

    private final CyclicBarrier barrier;

    public Task(CyclicBarrier barrier) {
        this.barrier = barrier;
    }

    @Override
    public void run() {
        try {
            System.out.println(this.getName() + " dolgozik...");
            Thread.sleep(new Random().nextInt(1000)); 

            System.out.println(this.getName() + " a szinkronizációs pontban..(" +  (barrier.getNumberWaiting()+1) +")");
            barrier.await();

            System.out.println(this.getName() + " folytatja a munkát.");
        } catch (InterruptedException | BrokenBarrierException e) {
            e.printStackTrace();
        }
    }
}

Atomic változók

Az atomic változók célja, hogy szálbiztos műveleteket biztosítsanak egyszerű adattípusokon (például számlálókon), anélkül hogy kölcsönös kizárást (például synchronized blokkokat) kellene alkalmazni. Az atomic változók belső mechanizmusukkal garantálják, hogy az adott műveletek atomi módon végrehajtásra kerül. Mivel nem használnak hagyományos zárolási mechanizmusokat, hatékonyabb alternatívát kínálnak, és segítenek elkerülni a zárólás teljesítménybeli "költségét", miközben biztosítják, hogy a műveletek szálbiztos módon történjenek.

Számos atomic osztály elérhető különböző adattípusok kezelésére. Ezek közül néhány (a többiről itt lehet olvasni):

  • AtomicInteger: Szálbiztos egész számok kezelésére.

  • AtomicLong: Szálbiztos hosszú egész számok kezelésére.

  • AtomicBoolean: Szálbiztos logikai értékek kezelésére.

  • AtomicReference: Szálbiztos objektum referenciák kezelésére.

Az atomic osztályok olyan metódusokat biztosítanak, amelyek garantálják a szálbiztos műveletek végrehajtását:

  • get(): Az aktuális érték lekérdezése.

  • set(newValue): Az aktuális érték beállítása.

  • incrementAndGet(): Az érték növelése 1-gyel, majd az új érték visszaadása (csak AtomicInteger és AtomicLong esetén).

  • decrementAndGet(): Az érték csökkentése 1-gyel, majd az új érték visszaadása (csak AtomicInteger és AtomicLong esetén).

  • addAndGet(delta): Egy adott érték hozzáadása, majd az új érték visszaadása (csak AtomicInteger és AtomicLong esetén).

  • compareAndSet(expected, update): Feltételes frissítés, amely az update értékét csak akkor állítja be, ha az aktuális érték megegyezik az expected értékkel.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
import java.util.concurrent.atomic.AtomicInteger;

public class AtomicCounter {

    int number1;
    AtomicInteger number2 = new AtomicInteger();

    public void incNumberOne() {
        number1++;
    }

    public void incNumberTwo() {
        number2.incrementAndGet();
    }

    public static void main(String[] args) {
        AtomicCounter ac = new AtomicCounter();

        for (int i = 0; i < 10_000; i++) {
            Thread myThread = new AtomicThread(ac);
            myThread.start();
        }

        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        System.out.println("Final result (number1): " + ac.number1);
        System.out.println("Final result (number2): " + ac.number2.get());
    }
}

class AtomicThread extends Thread {
    AtomicCounter ac;

    AtomicThread(AtomicCounter ac) {
        this.ac = ac;
    }

    @Override
    public void run() {
        try {
            Thread.sleep(300);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        ac.incNumberOne();
        ac.incNumberTwo();
    }
}

ExecutorService

A Runnable mellett egy másik interfészt is használhatunk arra, hogy egy adott kódrészletet külön szálon futtassunk: ez lesz a Callable. Ez az interfész rugalmasabb megoldást kínál a korábban megismert megközelítésekhez képest, mivel a call() metódusa nemcsak visszatérési értékkel rendelkezhet, hanem kivételt is dobhat, amit a hívó metódusnak kell lekezelnie vagy továbbítania. Az előbbi tulajdonság különösen előnyös abban az esetben, ha a párhuzamosan végrehajtott feladat egy számítás eredményét adja vissza, amelyet később felhasználhatunk.

A Callable interfész nem használható közvetlenül egy Thread-del (hiszen az egy Runnable interfészt vár paraméterként), de demonstrációs céllal, külön szál létrehozása nélkül is tudjuk futtatni (mivel lényegében csak egy végrehajtandó kódrészletet definiálunk).

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
public class CallableExample {
    public static void main(String[] args) {
        Callable<Integer> callableTask = new ExampleCallable();
        try {
            System.out.println("Return value: " + callableTask.call() + " executed by: " + Thread.currentThread());
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }
}

class ExampleCallable implements Callable<Integer> {

    @Override
    public Integer call()  { // a run() nem dobhat kivételt
        System.out.println(Thread.currentThread().getName() + " is counting..");
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
        int result = (int) (Math.random() * 100);
        System.out.println(Thread.currentThread().getName() + "'s result: " + result);
        return result;
    }
}

Az ExecutorService egy magasabb szintű API Java-ban, amely megkönnyíti a szálkezelést. Arra használható, hogy hatékonyan kezelje és újrahasznosítsa a szálakat, elkerülve az egyes szálak kézi létrehozásának és kezelésének nehézségeit.

Thread végállapot

Ahogy azt a korábbi órák egyikén megtanultuk, egy TERMINATED állapotban lévő szálat nem tudunk ismét elindíti, helyette új szálat kellene példányosítanunk. Egy Thread létrehozása erőforrásigényes, különösen ha sok szálra van szükség a végrehajtás során.

Az ExecutorService megoldja ezt a problémát egy ún. thread pool segítségével, amelyben előre létrehozott és újrahasznosítható szálak dolgoznak. Ezek a szálak fogják megkapni a Callable feladatokat az ExecutorService által meghatározott ütemezés alapján. Ha több feladatot adunk át futtatásra az ExecutorService-nek, mint amennyi szál a thread pool-ban elérhető, akkor a feladatokat egy belső várakozási sorban tárolja. Ha egy szál végzett egy feladattal, akkor nem kerül leállításra, hanem ismét munkára fogható. Ez egy hatékonyabb és egyszerűbb megközelítés, mint manuálisan létrehozni és kezelni az egyes szálakat.

Az Executors osztály segítségével különböző típusú thread poolokat hozhatunk létre:

  • Executors.newFixedThreadPool(int threads): Fix számú szálat tartalmazó pool kerül létrehozásra.

  • Executors.newCachedThreadPool(): Dinamikusan, a feladatok számától függően változó szálkészlet kerül létrehozásra.

  • Executors.newSingleThreadExecutor(): Egyetlen szállal dolgozó pool kerül létrehozásra.

Ismétlődő feladatok

A ScheduledThreadPoolExecutor egy olyan ExecutorService implementáció, amely időzített vagy ismétlődő feladatokat tud végrehajtani. Ez a megoldás különösen hasznos, ha például időszakos karbantartási feladatokat kell végezni, vagy a feladatokat adott időpontban kell elindítani.

ExecutorService legfontosabb metódusai:

  • shutdown(): A szálak szabályos leállítására szolgál, de a már beküldött (várakozó) és a futó feladatok végrehajtódnak.

  • submit(Callable<T> task): Elindít egy feladatot és azonnal visszatér, lehetővé téve, hogy a hívó szál folytassa a végrehajtást, miközben a háttérfeladat párhuzamosan fut (azaz nem blokkolja a hívó szálat!)

  • invokeAll(Collection<Callable<T>> tasks): A kollekcióban átadott feladatokat futtatja az elérhető szálakon párhuzamosan, és blokkolja a hívó szálat, amíg mindegyik feladat be nem fejeződik.

ExecutorService + Runnable

Az ExecutorService képes Runnable feladatok fogadására és végrehajtására is, és szintén automatikusan kezeli a szálak életciklusát, így nem kell manuálisan létrehozni a szálakat.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ExecutorServiceExample {
    public static void main(String[] args) {

        ExecutorService executor = Executors.newCachedThreadPool();

        for(int i=0; i<10; i++) {
            executor.submit(new MyCallable());
            try {
                Thread.sleep(25);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
        }

        executor.shutdown();
    }
}

class MyCallable implements Callable<Integer>{

    @Override
    public Integer call() {
        try {
            Thread.sleep(100);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
        System.out.println(Thread.currentThread().getName() + "-" + this);
        return 0;
    }
}

Future

A Future interfész lehetővé teszi, hogy kezeljük az aszinkron feladatok eredményeit. Az aszinkron programozásban a feladatokat háttérszálakon indítjuk el, így a hívó szál nem várja meg azok befejeződését, és folytathatja a saját munkáját. A Future segítségével később lekérhetjük a feladat eredményét, de a lekérés maga már blokkoló hívás lesz, ha a feladat még nem fejeződött be.

Párhuzamos vs. aszinkron programozás

A párhuzamos programozás azt jelenti, hogy több feladat egyszerre fut különböző szálakon. Bár a feladatok párhuzamosan futnak, ez nem feltétlenül jelent aszinkron működést, mert a hívó szál is blokkolódhat, amíg a párhuzamos feladatok befejeződnek.

Aszinkron programozás esetén a háttérben futó aszinkron feladatok nem blokkolják a hívó szálat, tehát miközben a háttérfeladatok futnak, a hívó szál tovább végezheti a saját dolgát. Az eredmény később, szükség esetén, blokkoló módon kérhető le a feladatok befejeződése után.

A korábban ismertetett submit(..) és invokeAll(..) metódusok az ExecutorService implementációk részeként Future objektumokat adnak vissza, amelyek a futó feladatok eredményének kezelésére szolgálnak. A Future-höz tartozó legfontosabb metódusok:

  • get(): Blokkolja a hívó szálat várva a feladat befejezésére, majd visszatér az eredménnyel.

  • isDone(): Ellenőrzi, hogy a feladat befejeződött-e.

  • cancel(): Megszakítja a futó feladatot.

  • isCancelled(): Ellenőrzi, hogy a feladatot megszakították-e.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class FutureExample {
    public static void main(String[] args) {
        ExecutorService executor = Executors.newFixedThreadPool(5);

        List<RandomCallable> callableList = new ArrayList<RandomCallable>();
        List<Future<Integer>> futures = new ArrayList<Future<Integer>>();

        for(int i=0; i<10; i++) {
            callableList.add(new RandomCallable());
        }

        try {
            futures = executor.invokeAll(callableList);
            int sum = 0;
            for(Future<Integer> future : futures) {
                sum += future.get();
            }
            System.out.println("Result: " + sum);
        } catch (InterruptedException | ExecutionException e) {
            e.printStackTrace();
        }

        executor.shutdown();
    }
}

class RandomCallable implements Callable<Integer>{
    @Override
    public Integer call() throws InterruptedException {
        Thread.sleep(500);
        int res = new Random().nextInt(100);
        System.out.println(this + " is counting.. " + res);
        return res;
    }
}

Virtuális szálak

A Project Loom célja, hogy a Java-ban elérhetővé tegye az ún. virtuális szálak használatát, amelyek kevesebb erőforrást igényelnek, mint a hagyományos szálak. Ennek köszönhetően lehetőség nyílik nagyszámú virtuális szál hatékony kezelésére. A virtuális szálak kevesebb memóriahasználattal járnak, mivel nem közvetlenül az operációs rendszer által kezelt natív szálakhoz kapcsolódnak. Ezeknek a szálak ütemezése a JVM-en belül történik, így nagyságrendekkel több szál indítható el.

A virtuális szálak kiválóan alkalmasak nagy számú egyidejű kapcsolat kezelésére, például HTTP szerverek, adatbázis-kezelő rendszerek, mikroszolgáltatások és aszinkron eseménykezelő rendszerek esetén. Fő előnyeik a kisebb memóriahasználat, jobb skálázhatóság és egyszerűbb programozási modell, különösen I/O-intenzív feladatoknál. Hátrányuk, hogy a blokkoló műveletek csökkenthetik a hatékonyságukat, és CPU-intenzív feladatok esetén nem feltétlenül jobbak a hagyományos szálaknál. Emellett egyes régebbi Java könyvtárak nem támogatják őket, ami kompatibilitási problémákat okozhat.

Amikor a Java futtatókörnyezet ütemez egy virtuális szálat, akkor azt hozzárendeli egy ún. platform thread-hez (hordozó szálhoz). A hordozó szál már az operációs rendszer által ütemezett hagyományos szál. Virtuális szálakat létre tudunk hozni ExecutorService segítségével: Executors.newVirtualThreadPerTaskExecutor(). Ez minden egyes feladathoz új virtuális szálat hoz létre.

Blokkoló műveletek

Amikor a Java futtatókörnyezet ütemez egy virtuális szálat, az hozzárendelésre kerül egy platformszálhoz, amelyet az operációs rendszer a szokásos módon ütemez. Miután a virtuális szál végrehajtott valamennyi kódot, lecsatlakozhat a hordozó szálról (tipikusan amikor blokkoló műveletet hajt végre). A hordozó szál így felszabadul, és más virtuális szálakat is kiszolgálhat.

Blokkoló művelet

Olyan művelet, amely megakadályozza a szál további végrehajtását, amíg egy külső feltétel teljesül. Ilyenek például az I/O műveletek (pl. fájlolvasás vagy hálózati kommunikáció) és a zárolási mechanizmusok közbeni várakoztatás.

Például ha egy program fájlt próbál beolvasni, akkor várnia kell, amíg az operációs rendszer betölti a fájlt a háttértárból a memóriába. Ez idő alatt a program nem folytathatja a futást, várakoznia kell, ami viszont megakadályozza a további műveletek végrehajtását.

Egy virtuális szál nem tud lecsatlakozni blokkoló műveletek közben, ha rögzítve van (pinned) a hordozó szálhoz. Esetünkben a rögzítés akkor fordul elő, amikor a virtuális szál végrehajtása egy synchronized blokkban vagy metódusban van éppen és várakozásra kényszerül. Így ha a rögzített virtuális szálhoz tartozó platformszál nem tud más virtuális szálakat kiszolgálni, ami csökkenti a párhuzamosan futtatható szálak számát, és ezzel ronthatja a skálázódást.

JEP 491

A Java Enhancement Proposal 491 célja ezt a korlátozást orvosolni azáltal, hogy a monitor kezelését a platformszálak helyett a virtuális szálakhoz rendeli. Ez lehetővé teszi, hogy a virtuális szálak a synchronized blokkok vagy metódusok végrehajtása közben is szabadon lecsatlakozzanak a platformszálatakról. Ezt várhatóan a 2025 márciusában publikálásra kerülő JDK 24 fogja tartalmazni először.

A jelenlegi monitor koncepciójának korlátozása orvosolható Lock vagy Semaphore használatával, mivel ezek nem kötődnek közvetlenül a platformszálakhoz, így elkerülhető a virtuális szálak rögzítése.

Gyakorló feladatok

(1) Implementálj egy osztályt egy parkoló reprezentálására, ParkingLot néven, amely 10 kocsit képes befogadni. Készíts el az osztályhoz a következő metódusokat:

  • enter(): beenged egy kocsit a parkolóba, amennyiben van szabad hely, egyéb esetben a kocsinak várakoznia kell.

  • parking(): 300 ms-ig altatja a szál végrehajtását.

  • leave(): kiléptet egy kocsit a parkolóból, és értesíti az esetlegesen várakozókat a parkolóhely felszabadulásáról.

Egy számláló szemafor segítségével biztosítsd, hogy ne tartózkodjon egyidőben a parkolóban a megengedettnél több jármű.

(a) Készíts egy Car osztályt, amely a Thread osztályból származik. Az osztály a konstruktorában egy ParkingLot objektumot kap. A szál feladata az enter(), parking() és a leave() metódusok meghívása a megadott sorrendben.

Teszteld a programot a main metódusban! Hozz létre egy ParkingLot objektumot és 100 párhuzamosan futó Car szálat.

(b) Készíts egy Car osztályt, amely a Callable interfészt valósítja meg Void visszatérési értékkel. Az osztály a konstruktorában egy ParkingLot objektumot kap. A szál feladata az enter(), parking() és a leave() metódusok meghívása a megadott sorrendben.

Teszteld a programot a main metódusban! Hozz létre egy ParkingLot objektumot és 100 párhuzamosan futó Car feladatot, illetve egy fix méretű ExecutorService-t 5 szállal.

Ne használj a feladat által nem említett párhuzamos konstrukciót.

(2) Hozz létre egy Resource nevű osztályt, amelynek konstruktorában egy egész szám típusú paramétert fogad, amely a szálak számát jelöli. A konstruktor feladata, hogy inicializálja és feltöltse az array nevű adattagot, amely egy egész számokat tartalmazó tömb. A tömb mérete a szálak számának százszorosával lesz egyenlő (szálak száma × 100), a szálak számát paraméterként kapja meg a konstruktor.

Ezután hozz létre egy Calculator osztályt, amely a Thread osztályból származik és implementálja a run() metódust. A Calculator osztály konstruktora négy paramétert fogad:

  • Egy Resource típusú objektum referenciáját, amely az egész számokat tartalmazó tömböt tárolja,

  • Egy egész számot, amely az intervallum kezdőértékét jelöli,

  • Egy másik egész számot, amely az intervallum végértékét adja meg.

  • Végezetül egy CyclicBarrier objektumra mutató referenciát.

A run() metódus feladata, hogy végigiteráljon a megadott intervallumon belül található számokon az array tömbben, és az összeget eltárolja a Calculator osztály sum nevű adattagjában.

A fő programban ne a join() metódussal biztosítsuk a szálak bevárását, hanem egy CyclicBarrier objektum segítségével, amely példányosításakor egy Runnable objektumot is megkap. Ez a Runnable az összes létrehozott szálat tartalmazó kollekcióra mutató referenciát kap, és a feladata, hogy össszegezze a Calculator szálak részszámításait. Ne használj kölcsönös kizárást vagy altatást.

(3) Hozz létre egy AtomicReference<String> változót, kezdőértéke legyen "Idle".

Hozz létre 10 szálat, amelyek mindegyike végrehajtja a következő lépéseket:

  • Ha az érték "Idle", akkor módosítja azt "InProgress"-ra, egyébként várakozik az erőforrásra. Ezt tevékeny várakozással végzi, tehát egy ciklus segítségével folyamatosan ellenőrzi az adattag értékét.

  • "InProgress" állapotban altatás segítségével várakoztatjuk a szálat 100 ms-ig, majd visszaállítjuk az értéket "Idle"-ra.

A megoldáshoz kizárólag a compareAndSet és a set metódusokat használd.

A műveletek elvégzét követően termináljon a program.


Utolsó frissítés: 2025-03-09 14:35:35