天天看點

Go 語言 goroutine 並發處理

模擬並發事務處理:

package main

import (

    "fmt"

    "math/rand"

    "time"

)

type (
	// job is one unit of simulated work placed on jobQueue.
	job struct {
		jobID int
		load  int // seconds needed to finish the job
	}

	// worker is a numbered consumer of jobQueue.
	worker struct {
		id int
	}
)

// jobCount is the number of jobs the program enqueues and workerCount is the
// number of worker goroutines it starts.
const (
	jobCount    int = 20
	workerCount int = 5
)

// jobQueue carries jobs to the workers; its buffer holds every job at once.
// waitQueue carries the workers' signals back to main; its buffer has one
// slot per worker.
var (
	jobQueue  = make(chan job, jobCount)
	waitQueue = make(chan bool, workerCount)
)

func (w worker) doTheJob() {

    time.Sleep(time.Second * 5)

    for {

        select {

            case j := <- jobQueue:

                fmt.Println("Worker [", w.id, "] started doing job [", j.jobID, "], job load : ", j.load)

                time.Sleep(time.Second * time.Duration(j.load))

                fmt.Println("Worker [", w.id, "] finished job [", j.jobID, "]")

            default:

                waitQueue <- true

        }

    }

}

func main() {

    var workers [workerCount]worker

    for i := 0; i < workerCount; i++ {

        workers[i] = worker{id: i + 1}

        go workers[i].doTheJob()

    }

    for i := 0; i < jobCount; i++ {

        j := job{jobID: i + 1, load: rand.Intn(10)}

        jobQueue <- j

        time.Sleep(time.Second)

    }

    for i := 0; i < workerCount; i++ {

        <- waitQueue

    }

    close(jobQueue)

    close(waitQueue)

}

使用 sync.WaitGroup 等待所有 goroutine 傳回:

package main

import (

    "fmt"

    "math/rand"

    "time"

"sync"

)

// main launches total goroutines and blocks until all of them have finished.
// Each goroutine logs its start, sleeps for a random 0-9 seconds, and logs
// its stop.
func main() {
	const total int = 10

	var wg sync.WaitGroup

	// The counter and the loop below share the same constant, so the number
	// of Done calls always matches the number added here.
	wg.Add(total)
	for i := 0; i < total; i++ {
		go func(i int) {
			defer wg.Done()
			fmt.Println("Start job", i)
			time.Sleep(time.Second * time.Duration(rand.Intn(10)))
			fmt.Println("Stop job", i)
		}(i)
	}
	wg.Wait()
}

使用 Timer 加入等待逾時的機制,避免資源競爭導致死鎖:

package main

import (

    "fmt"

    "time"

    "math/rand"

    "sync"

)

// Timeout lengths, expressed as a number of seconds. Each value is multiplied
// by time.Second at the point where a timer is created from it.
const (
	CUSTOMER_REQUEST_TIMEOUT int = 30
	FRONTEND_REQUEST_TIMEOUT int = 60
	BACKEND_RESPONSE_TIMEOUT int = 60
)

type (
	// customerRequest is the message a customer puts on the customer
	// request queue.
	customerRequest struct {
		message string // request message
		id      int    // customer id
		load    int    // seconds the frontend worker sleeps while handling it
	}

	// frontendRequest is the message a frontend worker passes on to the
	// backend workers.
	frontendRequest struct {
		message string // modified message
		id      int    // customer id
		load    int    // seconds the backend worker sleeps while handling it
	}

	// backendResponse is the message a backend worker sends back for a
	// handled frontend request.
	backendResponse struct {
		id      int    // customer id
		message string // response msg
	}
)

type (
	// customer is a numbered producer of customer requests.
	customer struct {
		id int
	}

	// frontendWorker is a numbered worker on the frontend tier.
	frontendWorker struct {
		id int
	}

	// backendWorker is a numbered worker on the backend tier.
	backendWorker struct {
		id int
	}
)

func (c customer) sendRequests(customerRequestQueue chan customerRequest) {

    t := time.NewTimer(time.Second * time.Duration(CUSTOMER_REQUEST_TIMEOUT))

    for {

        r := customerRequest { id: c.id, message: "[Customer][" + fmt.Sprintf("%d", c.id) + "] request", load: rand.Intn(5) + 1 }

        select {

            case customerRequestQueue <- r:

                fmt.Println("Customer", c.id, "initialized a request")

            case <- t.C:

                fmt.Println("Customer request timeout, customer id:", c.id)

                t.Reset(time.Second * time.Duration(CUSTOMER_REQUEST_TIMEOUT))

        }

        time.Sleep(time.Second * time.Duration(rand.Intn(3)+1))    // every 1~4 second a customer will raise a request

    }

}

func (w frontendWorker) process(customerRequestQueue chan customerRequest, frontendRequestQueue chan frontendRequest, backendResponseQueue chan backendResponse) {

    t := time.NewTimer(time.Second * time.Duration(FRONTEND_REQUEST_TIMEOUT))

    for {

        select {

            case x := <- customerRequestQueue:

                fmt.Println("[Frontend]", w.id, ": Customer request received, customer id:", x.id)

                time.Sleep(time.Second * time.Duration(x.load))

                fmt.Println("[Frontend]", w.id, ": Customer request processed, customer id:", x.id)

                r := frontendRequest { id: x.id, message: "[Frontend]" + x.message, load: x.load + 2 }

                frontendRequestQueue <- r

            case x := <- backendResponseQueue:

                fmt.Println(x.message, "handled")

            case <- t.C:

                fmt.Println("Frontend request timeout, frontend worker id:", w.id)

                t.Reset(time.Second * time.Duration(FRONTEND_REQUEST_TIMEOUT))

        }

    }

}

func (w backendWorker) process(frontendRequestQueue chan frontendRequest, backendResponseQueue chan backendResponse) {

    t := time.NewTimer(time.Second * time.Duration(BACKEND_RESPONSE_TIMEOUT))

    for {

        select {

            case x := <- frontendRequestQueue:

                fmt.Println("[Backend]", w.id, ": Frontend request received, customer id:", x.id)

                time.Sleep(time.Second * time.Duration(x.load))

                fmt.Println("[Backend]", w.id, ": Frontend request processed, customer id:", x.id)

                r := backendResponse { id: x.id, message: "[Backend]" + x.message }

                backendResponseQueue <- r

            case <- t.C:

                fmt.Println("Backend response timeout, backend worker id:", w.id)

                t.Reset(time.Second * time.Duration(BACKEND_RESPONSE_TIMEOUT))

        }

    }

}

func main() {

    var wg sync.WaitGroup

    wg.Add(1)

    customerRequestQueue := make(chan customerRequest, 10)

    frontendRequestQueue := make(chan frontendRequest, 20)

    backendResponseQueue := make(chan backendResponse, 20)

    for i := 0; i < 100; i++ {

        c := customer { id: i+1 }

        go c.sendRequests(customerRequestQueue)

    }

    for i := 0; i < 10; i++ {

        w := frontendWorker { id: i+1 }

        go w.process(customerRequestQueue, frontendRequestQueue, backendResponseQueue)

    }

    for i := 0; i < 20; i++ {

        w := backendWorker { id: i+1 }

        go w.process(frontendRequestQueue, backendResponseQueue)

    }

    wg.Wait()    // wait forever

}

通過 close 方法關閉 channel,向所有“sub goroutine”發送“關閉消息”:

package main

import (

    "fmt"

    "time"

    "math/rand"

"sync"

)

// TASKTIME is a number of seconds; a parent process builds its timer from it.
const TASKTIME int = 10

type (
	// parentProcess is a numbered process that starts child processes.
	parentProcess struct {
		id int
	}

	// childProcess is a numbered process started by a parent.
	childProcess struct {
		id int
	}
)

func (p parentProcess) process(childCount int, w *sync.WaitGroup) {

    outQueue := make(chan string, 20)

    closeQueue := make(chan bool)

    t := time.NewTimer(time.Second * time.Duration(TASKTIME))

    var wg sync.WaitGroup

    wg.Add(childCount)

    for i := 0; i < childCount; i++ {

        var c childProcess

        c = childProcess{id: i+1}

        go c.process(outQueue, closeQueue, &wg)

    }

    for {

        select {

            case x := <- outQueue:

                fmt.Println(p.id, "=>", x)

            case <- t.C:

                t.Stop()

                close(closeQueue)

                fmt.Println(p.id, "waiting for all children to quit")

                wg.Wait()

                close(outQueue)

                w.Done()

                fmt.Println(p.id, "quit")

                return

        }

    }

}

func (c childProcess) process(outQueue chan string, closeQueue chan bool, wg *sync.WaitGroup) {

    for {

        select {

            case outQueue <- fmt.Sprintf("%d produced %d", c.id, rand.Intn(1000)):

                time.Sleep(time.Second)

            case <-closeQueue:

                wg.Done()

                return

        }

    }

}

func main() {

    var wg sync.WaitGroup

    for i := 0; i < 5; i++ {

        wg.Add(1)

        var p parentProcess

        p = parentProcess{id: i+100}    // parent Id starts from 100

        go p.process(rand.Intn(5)+5, &wg)

    }

    wg.Wait()

}