Kihagyás

Java - II.

A gyakorlat anyaga

Ebben az anyagben az olvasók-írók, majd az étkező filozófusok problémáján keresztül nézünk példákat a kölcsönös kizárásra. Továbbá megismerünk néhány magasabb szintű Java párhuzamos konstrukciót, amelyet a concurrent package biztosít.

Olvasók-írók probléma

A probléma definíciója szerint adott egy könyvtár, ahol egyidejűleg több olvasó is tölheti az idejét olvasással, viszont egyidőben csak egy író írhatja a könyvet (férhez hozzá az erőforráshoz). Ekkor olvasó sem olvashat. Továbbá az egyidőben olvasók száma korlátozva van. 4 esetre kell odafigyelni: mikor egy olvasó belép, mikor egy olvasó kilép, illetve mikor egy író belép majd kilép. Az egyes belépés/kilépés párok (amelyek lényegében a kritikus szekciók) között a párhuzamos futtatás megengedett (hiszen az olvasók egy limitált száma párhuzamosan hozzáférhet az erőforráshoz).

A megvalósításhoz továbbra is a Lock és a Condition interfészeket fogjuk használni. Érdekesség, hogy az olvasók-írók problémára egy speciális Lock implementációt is publikáltak: ReadWriteLock

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
package _06;

import java.util.Random;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

public class ReaderWriter {

    public static final int READERS_NUMBER = 10;
    public static final int WRITERS_NUMBER = 20;
    public static final int MAX_READERS_NUMBER = 2;

    public static void main(String[] args) {
        Book book = new Book(MAX_READERS_NUMBER);
        for (int i = 0; i<WRITERS_NUMBER; i++) {
            Thread thread = new Thread(new Writer(book));
            thread.start();
        }
        for (int i = 0; i<READERS_NUMBER; i++) {
            Thread thread = new Thread(new Reader(book));
            thread.start();
        }
    }
}

class Book {

    private int maxReaders;
    private int readersNumber;
    private boolean isWriting = false;
    private ReentrantLock lock = new ReentrantLock();
    private Condition writing = lock.newCondition();
    private Condition reading = lock.newCondition();

    public Book(int maxReaders) {
        this.maxReaders = maxReaders;
    }

    public void write() {}

    public void read() {}
}

class Writer implements Runnable {

    private Book book;

    public Writer(Book book) {
        super();
        this.book = book;
    }

    @Override
    public void run() {
        book.write();
    }

}

class Reader implements Runnable {

    private Book book;

    public Reader(Book book) {
        super();
        this.book = book;
    }

    @Override
    public void run() {
        book.read();
    }
}

Az étkező filozófusok probléma

A probléma definíciója szerint adott n db filozófus, akik gondolkodnak, majd eközben megéheznek. Adott egy asztal n db tányérral, illetve minden tányér között van egy-egy villa. Ahhoz hogy egy filozófus enni tudjon, fel kell vennie a tányérja jobb és bal oldalán található 1-1 villát. Azaz a verseny a villákért folyik, a villák reprezentálják az erőforrást. Ha egy filozófus befejezte az evést, akkor visszarakja a villákat az asztalra, amit így a szomszédos tányéroknál elhelyezkedő filozófusok használhatnak.

Ennek megvalósításához egy újabb (de már korábbról ismert) konstrukciót használunk, ez lesz a Semaphore osztály. amelynek az acquire() és a release() metódusai felelnek majd meg a wait() és signal() utasításoknak.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Semaphore;

public class EtkezoFilozofusok {

    public static final int PHILO_NUMBER = 5;

    public static void main(String[] args) {
        List<Semaphore> forks = new ArrayList<Semaphore> ();

        for (int i = 0; i<PHILO_NUMBER; i++) {
            forks.add(new Semaphore(1));
        }

        Philo philo1 = new Philo("Horatius", forks.get(0), forks.get(1));
        Philo philo2 = new Philo("Konfuciusz", forks.get(1), forks.get(2));
        Philo philo3 = new Philo("Démokritosz", forks.get(2), forks.get(3));
        Philo philo4 = new Philo("Freud", forks.get(3), forks.get(4));
        Philo philo5 = new Philo("Montesquieu", forks.get(4), forks.get(0));

        new Thread(philo1).start();
        new Thread(philo2).start();
        new Thread(philo3).start();
        new Thread(philo4).start();
        new Thread(philo5).start();
    }

}

class Philo implements Runnable {

    private Semaphore myFork;
    private Semaphore yourFork;

    private String name;

    public Philo(String name, Semaphore myFork, Semaphore yourFork) {
        this.myFork = myFork;
        this.yourFork = yourFork;
        this.name = name;
    }

    @Override
    public void run() {
        for (int i = 0; i<10; i++) {
            eat();
        }
    }

    private void eat() {}
}

A holtpont elkerülésének érdekében 2 megközelítéssel dolgozhatunk:

  • Kijelölünk egy filozófust, aki fordított sorrendben (pl. először a bal, majd a jobb oldalról) feszi fel a villákat, mindenki más a jobb, majd bal sorrendet követi.

  • Egy új, nem-bináris szemafort hozunk létre, amely guard-ként működik. Értékét N-1-re állítjuk, ahol N a filozófusok száma. Minden filozófus először a guard szemafortól kér engedélyt, hogy odalépjen az asztalhoz.

Összefoglaló táblázat

Megvalósítás WAIT SIGNAL
Implicit monitor wait() notify() / notifyAll()
Lock + Condition await() signal() / signalAll()
Semaphore acquire() release()

Atomic

Az atomic package több típusra is biztosít nekünk szálbiztos hozzáférést, többek között: AtomicBoolean, AtomicInteger, AtomicLong, AtomicReference. Cseréljük le a Counter osztályunkban lévő adattagot egy AtomicInteger objektumra, a dokumentációban pedig keressük ki a inkrementáláshoz használható függvényeket.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
import java.util.concurrent.atomic.AtomicInteger;

public class Counter {

    int count;

    public void inc() {
        count++;
        System.out.println(count);
    }

    public static void main(String[] args) {

        Counter c = new Counter();

        for(int i=0;i<10000;i++) {
            Thread myThread = new MyThread(c);
            myThread.start();
        }

        try {
            Thread.sleep(2000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        System.out.println("Final result: " +c.count);
    }
}

class MyThread extends Thread {

    Counter c;

    MyThread(Counter c){
        this.c=c;
    }

    @Override
    public void run() {

        try {
            Thread.sleep(300);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        c.inc();
    }
}

Barrier

A CyclicBarrier osztály lehetőséget biztosít, hogy több szál bevárja egymást a futásának egy bizonyos pontján. A ciklikusság arra a tulajdonságára hivatkozik, hogy a várakoztatott szálak felszabadítását követően újra használhatóvá válik a Barrier (lásd reset() metódus a dokuementációban).

Ennek segítségével szinkronizálhatjuk a szálakat egy ún. barrier pontban. A funkcionalitás különlegessége, hogy a konstruktora kiegészíthető egy Runnable() interfésszel (lásd dokumentáció a konstruktorokról), ami pontosan akkor és pontosan egyszer fut le, amikor az utolsó szál is megérkezett az ún. barrier pontba, de még mielőtt bármelyik is folytatná a futását.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;

public class Barrier {

    public static void main(String[] args) {

        CyclicBarrier cb = new CyclicBarrier(10);

        for(int i=0;i<10;i++) {
            new MyThread(cb).start();
        }

    }
}

class MyThread extends Thread {

    CyclicBarrier cb;

    MyThread(CyclicBarrier cb){
        this.cb=cb;
    }

    @Override
    public void run() {
        System.out.println("Valami műveletet végzünk..");

        try {
            cb.await();
        } catch (InterruptedException | BrokenBarrierException e) {
            e.printStackTrace();
        }

        System.out.println("A művelet után..");
    }
}

ExecutorService

Az ExecutorService segítségével feladatokat tudunk végrehajtani. Ehhez az Executors osztályt fogjuk használni, amellyel különböző stratégiákkal működő szálakat tudunk létrehozni, amelyek feldolgozzák a feladatokat. Ilyen stratégiákat, amelyeket ún. thread pool-ok valósítanak meg és újrafelhasználható szálakat biztosít, a newFixedThreadPool, newCachedThreadPool, newSingleThreadExecutor segítségével tudunk létrehozni.

Egy feladatot egy Runnable vagy a Callable osztály példányai reprezentálnak. Utóbbinak az előnye, hogy a Callable rendelkezik visszatérési értékkel, amely különösen jó lehet akkor, ha valamilyen részfeladat eredményével szeretnék visszatérni. Ehhez a [Future][future] interfészt fogjuk használni, amely aszinkron módon képes kezelni a feladatokat és visszatérni az eredményükkel.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
package _10;

import java.util.Random;
import java.util.*;
import java.util.concurrent.*;

class MyRunnable implements Runnable{

    @Override
    public void run() {
        try {
            Thread.sleep(100);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        System.out.println(Thread.currentThread().getName());   
    }
}

class MyCallable implements Callable<Integer>{

    @Override
    public Integer call() {
        try {
            Thread.sleep(new Random().nextInt(10)*100);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println(Thread.currentThread().getName());   
        return new Random().nextInt(10)*100;
    }

}

public class Executor {

    public static void main(String[] args) throws InterruptedException, ExecutionException {

        ExecutorService executor = Executors.newFixedThreadPool(5);

        for(int i = 0; i<15; i++) {
            executor.execute(new MyRunnable());
        }

        System.out.println(executor.isShutdown() + " " + executor.isTerminated());
        executor.shutdown();
        System.out.println(executor.isShutdown() + " " + executor.isTerminated());
    }   
}

Gyakorló feladat

I) Adott egy speciális nyomtató (erőforrás), amely egyidőben képes kiszolgálni két különböző típusú kérést: nyomtatni és fénymásolni. Továbbá, a nyomtató speciális felépítéséből adódóan egyszerre két dokumentumot is képes nyomtatni.

A nyomtatások minden esetben 400 ms-ig, a fénymásolás pedig 550 ms-ig tart (altatás). Összesen 20-20 nyomtatási és fénymásolási kérés érkezett be (szálak).

Amennyiben éppen két nyomtatás zajlik, a következő nyomtatási szálnak várakoznia kell. Hasonlóan, amennyiben fénymásolás zajlik, a következő fénymásolási szálnak várakoznia kell. Figyelj a kölcsönös kizárásra és a párhuzamosan futtatható részekre, csak és kizárólag a szükséges kódrészlet legyen része a kritikus szakasznak. Amennyiben van várakozó nyomtatási/fénymásolási kérés, és a megfelelő erőforrás szabad, ne várakoztasd feleslegesen a folyamatot.

A kölcsönös kizárás megvalósításához a Semaphore osztályt használd. A megoldásodat a join metódus és CyclicBarrier használata nélkül készítsd el, és csak a feladat által kért esetben használj altatást.

II) Valósítsd meg az előző összefoglaló végén részletezett Pi közelítését ExecutorService, Callable és Future segítségével. A Callable egy Point objektummal térjen vissza. Az Executor akkor kerüljön leállításra, amíg egy adott epszilon hibahatáron belülre kerül a Pi közelített értéke.

Kapcsolódó linkek


Utolsó frissítés: 2024-03-18 09:18:44