Kihagyás

Go

Csővezetékes rendezés

A funkcionális dekompozíció tipikus példája az úgynevezett csővezeték. Ennél egy láncban helyezzük el a folyamatokat, melyek egy feldolgozás munkafázisait végzik el a rajtuk áthaladó adatokon. Egyidőben több adatelem van feldolgozás alatt, mindegyik másik munkafázisban. Egy adatelemnek a teljes feldolgozáshoz végig kell járnia sorrendben az összes munkafázist. Sebességnövekedést úgy érünk el, ha sok feldolgozandó adatelemünk van, ezek összesített feldolgozási ideje lényegesen lecsökkenhet csővezeték alkalmazásával. Ideális esetben n fázis esetén n-szeres gyorsulásra számíthatunk akkor, ha a fázisok időigénye megközelítőleg azonos, hiszen a leglassabb fázis határozza meg az adatok áramlási sebességét.

Az egyik legegyszerűbb alkalmazása a csővezetékes rendezés, a csővezeték hossza ebben az esetben megegyezik a rendezendő elemek számával. Minden fázis ugyanazt a munkafolyamatot hajtja végre, a rajta áthaladó sorozat elemeinek sorrendjét megváltoztatja úgy, hogy a nagyobb elemek a sorozat vége felé helyezkedjenek el.

pipeline

A send folyamat n db karaktert fog beolvasni és számmá konvertálja. Ez fogja a csővezeték első fázisára ráküldeni a rendezendő számokat. A receive utasítás pedig a csővezeték utolsó fázisáról fogja a standard output-ra küldeni a rendezett listát. A pipesort folyamat számot olvas be az input csatornáján és ugyanannyit küld tovább a kimenő csatornáján. A már látott számok közül a legnagyobbat eltárolja. Minden újabb beérkező számot összehasonlít az addigi legnagyobbal, a nagyobbikat eltárolja, a kisebbiket továbbküldi. Miután az összes számot látta, a lokálisan tárolt legnagyobbat is elküldi.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
package main

import (
    "fmt"
    "strconv"
    "os"
    "sync"
)


var wg sync.WaitGroup

func send(to chan<- int) {
    for i := 0; i < 5; i++ {
        var input string
        fmt.Scanln( & input)

        num, err := strconv.Atoi(input) // visszafele: strconv.Itoa(num)
        if err != nil {
            fmt.Println(err)
            os.Exit(0)
        }
        to<- num
    }
}

func receive(from <-chan int) {
    for i := 0; i < 5; i++ {
        fmt.Print(<-from, " ")
    }
    wg.Done()
}

func pipesort(from <-chan int, to chan<- int) {
    max := <-from
    for i := 0;i < 4;i++{
        temp := <-from
        if temp > max {
            to<- max
            max = temp
        } else {
            to<- temp
        }
    }
    to<- max
}

func main() {
    wg.Add(1)
    defer wg.Wait()

    var channels [5]chan int

    for i := range channels {
        channels[i] = make(chan int)
    }

    go send(channels[0])
    for i := 0; i < 4; i++ {
        go pipesort(channels[i], channels[i + 1])
    }
    go receive(channels[4])
}

Multiplexer

A select legtipikusabb alkalmazása az úgynevezett multiplexer folyamat megvalósítása. Ennek az a feladata, hogy több bemenő csatorna forgalmát egy kimenő csatornára irányítsa. Sokszor együtt alkalmazzák az úgynevezett demultiplexer folyamattal, ami egy bemenő csatornán érkező adatokat oszt szét több kimenő csatornán. A két folyamat együttes alkalmazásával elérhető, hogy egy fizikai csatornát több logikai csatornaként használjunk, ezt a technikát nevezik multiplexálásnak.

multi1

multi2

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
package main

import (
    "fmt"
    "os"
    "sync"
    "bufio"
    "math/rand"
)


var wg sync.WaitGroup

func demultiplexer(stdin *bufio.Reader, out [5]chan byte) {
    reading := true

    for reading {
        value, _ := stdin.ReadByte()

        if 97 <= value && value <= 122 {
            index := rand.Intn(5)
            fmt.Println(value, "kuldese a(z)", index, "csatornan..")
            out[index]<- value
        } else if value == 48 {
            reading = false
            for _, c  := range out {
                close(c)
            }
        }
    }
}

func multiplexer(in [5]chan byte, out *bufio.Writer) {
    for {
        var msg byte 
        select { 
            case msg = <-in[0]:
            case msg = <-in[1]:
            case msg = <-in[2]:
            case msg = <-in[3]:
            case msg = <-in[4]:
        }
        if(msg == 0){
            break
        }
        out.WriteByte(msg)
        out.Flush()
        fmt.Println(" fogadva..")
    }
    wg.Done()
}

func doSomething(in <-chan byte, out chan<- byte, shift int) {
    for value := range in {
        shiftedChar := value + byte(shift)
        if shiftedChar > 'z' {
            shiftedChar = 'a' + (shiftedChar - 'z' - 1)
        }
        fmt.Println(shiftedChar, "tovabbitasa..")
        out<- shiftedChar
    }
    close(out)
}

func main() {
    wg.Add(1)
    defer wg.Wait()

    var beforeDoSomething [5]chan byte
    var afterDoSomething [5]chan byte

    stdin := bufio.NewReader(os.Stdin)
    stdout := bufio.NewWriter(os.Stderr)

    for i := 0; i < 5; i++ {
        beforeDoSomething[i] = make(chan byte)
        afterDoSomething[i] = make(chan byte)
    }

    for i := 0; i < 5; i++ {
        go doSomething(beforeDoSomething[i], afterDoSomething[i], i)
    }  
    go demultiplexer(stdin, beforeDoSomething)
    go multiplexer(afterDoSomething, stdout)
}

A sync könyvtár

Korábban már láttunk példát arra, hogy a goroutine-ok hozzáférhetnek a közös változókhoz is, pl. a program idő előtti tárminálását megakadályozó WaitGroup-hoz, amit package szintén, egy globális változó segíségével értek el. Ez tehát azt jelenti, hogy ilyenformán a Go alkalmas közös memóriás párhuzamos programozásra is. Az ehhez szükséges párhuzamos konstrukciókat a sync könyvtár függvényei is biztosítják, amelyek a korábban megismert közös memóriás párhuzamos konstrukciókat valósítanak meg.

Azt láttuk eddig, hogy a goroutine-ok között csatorna segítségével megvalósítható a szinkron és aszinkron kommunikáció. De vannak esetek, amikor igazából nincs szükség kommunikációra az egyes folyamatok között, így tehát hogy biztosítsuk a goroutine-ok számára a konfliktusmentes hozzáférést 1-1 globális vagy paraméterben kapott erőforráshoz, kölcsönös kizárást kell alkalmaznunk. Fontos, hogy amennyiben goroutine-ok szinkronizációját szeretnék biztosítani, továbbra is csatornát érdemes használni.

Ezenkívül használjuk az atomic könyvtárat is, ami hasznos atomi változókat biztosít.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
package main

import (
    "fmt"
    "sync"
    "sync/atomic"
)

func main() {

    var ops1 int32
    var ops2 int

    wg := new(sync.WaitGroup)

    for i := 0; i < 50_000; i++ {
        wg.Add(1)

        go func() {
            atomic.AddInt32(&ops1, 1)
            ops2++
            wg.Done()
        }()
    }

    wg.Wait()

    fmt.Println("ops:", ops1, ops2)
}

Mutex lényegében a korábban már látott explicit zár, amivel megvalósítható a kölcsönös kizárás. Van egy impliciten tárolt értéke, ha ez 0-t vesz fel, akkor a zár nem blokkol, ellenkező esetben blokkol. 2 függvénye van:

  • Lock(): Zárol, tehát ha a Mutex már használatban van, akkor az adott goroutine blokkolódik.

  • Unlock(): Feloldja a zárt.

Önmagában a Mutex talán a Java synchronized kulcsszavára hasonlít, wait és signal műveletek nélkül.

A wait és a signal műveleteket a Cond típus biztosítja, amellyel feltételváltozót tudunk létrehozni. Feltételváltozókat egy korábban létrehozott Mutex segítségével tudunk létrehozni.

  • Wait(): Feloldja a Mutex-et, az aktuális goroutine futását pedig felfüggeszti. Hasonlóan a Java-hoz, itt is ciklusban érdemes vizsgálni a feltételt komplexebb problémák esetén, hogy elkerüljük a várakoztatási feltétel megsértését.

  • Signal(): Felébreszt egy blokkolt goroutine-t

  • Broadcast(): Felébreszti az összes blokkolt goroutine-t (Java signalAll()/notifyAll())

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
package main

import (
    "fmt"
    "sync"
    "time"
)


var wg sync.WaitGroup
var currentReaders int
var isWriting bool
const readers_writers_number = 20
const max_readers_number = 2

func reading(mutex *sync.Mutex, readers *sync.Cond, writers *sync.Cond, i int) {
    mutex.Lock()
        for currentReaders == max_readers_number || isWriting {
            readers.Wait();
        }
        currentReaders++
        fmt.Println(i, "reader is ready to ")

    mutex.Unlock()

    fmt.Println("read..")
    time.Sleep(time.Duration(200) * time.Millisecond)

    mutex.Lock()
        fmt.Println(i, "reader stopped reading")
        currentReaders--
        writers.Signal()
        readers.Broadcast()
    mutex.Unlock()  

    wg.Done()
}

func writing(mutex *sync.Mutex, readers *sync.Cond, writers *sync.Cond, i int) {
    mutex.Lock()
        for currentReaders > 0 || isWriting  {
            writers.Wait();
        }
        isWriting = true
        fmt.Println(i, "writer is ready to ")
    mutex.Unlock()

    fmt.Println("write..")
    time.Sleep(time.Duration(200) * time.Millisecond)

    mutex.Lock()
        isWriting = false
        fmt.Println(i, "reader stopped writing")
        writers.Signal()
        readers.Broadcast()
    mutex.Unlock()  

    wg.Done()
}


func main() {

    wg.Add(readers_writers_number * 2)
    defer wg.Wait()

    mutex := new(sync.Mutex)
    readers := sync.NewCond(mutex)
    writers := sync.NewCond(mutex)

    for i := 0; i < readers_writers_number; i++ {
        go writing(mutex, readers, writers, i)
        go reading(mutex, readers, writers, i)
    }
}      

Gyakorló feladat

I) Valósítsd meg az étkező filozófusok problémát a sync konstrukciók segítségével.

Kapcsolódó linkek

Go dokumentáció

Go standard könyvtárak


Utolsó frissítés: 2024-04-19 07:22:18