打开APP
userphoto
未登录

开通VIP,畅享免费电子书等14项超值服

开通VIP
Go并发处理

写了一个web接口,想高并发的请求这个接口,进行压力测试,所以服务端就实现了一个线程池。

代码从网上理解了之后写的。代码实例

简单的介绍:

  首先实现一个Job接口,只要有方法实现了Do方法即可

  定义个分发器结构体,主要是WorkPool线程池,用于存储Worker的JobChannel

  init的时候,先初始化一个JobQueue队列,其他的函数调用这个线程池的时候,把任务放在这个队列即可。

  然后Run的时候,创建多个Worker,起初的时候,woker会把自身的JobChannel先注册到线程池workerPool中

  然后worker.start就是for{select } 阻塞等待JobChannel中的job任务

  此时又启一个go d.Dispatcher() ,将JobQueue中的job任务放在worker的Jobchannle中。这样上面的for{select} 就可以拿到任务去执行

  

注: maxWorkers 是内核CPU数量,本机4核,就是线程池可以放4个JobChannel,所以,在newWorker的时候,就创建了4个Worker来并发的处理job任务。

 

任务处理

package workPoolimport "fmt"type Worker struct {WorkerPool chan chan JobJobChannel chan JobQuit chan bool}func NewWorker(workpool chan chan Job) *Worker {return &Worker{WorkerPool: workpool,JobChannel: make(chan Job),Quit: make(chan bool)}}func (w *Worker) Start()  {go func() {for{w.WorkerPool <-w.JobChannelselect {case job := <-w.JobChannel:if err := job.Do();err !=nil{fmt.Println("exec some failed ....")}case <-w.Quit:return}}}()}func (w *Worker) Stop()  {go func() {w.Quit <-true}()}

  

 

实现一个分发器

package workPoolimport "runtime"var(MaxWorkers = runtime.NumCPU()MaxQueue = 512)type Job interface {Do() error}var JobQueue chan Jobtype Dispatcher struct {MaxWorkers intWorkerPool chan chan JobQuit chan bool}func init()  {runtime.GOMAXPROCS(MaxWorkers)JobQueue  = make(chan Job,MaxQueue)dispatcher := NewDispatcher(MaxWorkers)dispatcher.Run()}func NewDispatcher(maxWorkers int) *Dispatcher {pool := make(chan chan Job,maxWorkers)return &Dispatcher{MaxWorkers: maxWorkers,WorkerPool: pool,Quit: make(chan bool)}}func (d *Dispatcher) Run()  {for i:=0;i<d.MaxWorkers;i++{worker := NewWorker(d.WorkerPool)worker.Start()}go d.Dispatcher()}func (d *Dispatcher) Dispatcher() {for  {select {case job := <-JobQueue:jobChannel := <-d.WorkerPooljobChannel <- jobcase <-d.Quit:return}}}

  

main函数中可以这样使用
package mainimport ("context_http/workPool""fmt""net/http")type Msg struct {mobile string}func (m *Msg) Do() error {m.mobile = m.mobile+"_test"fmt.Println(m.mobile)return nil}func getMobile(w http.ResponseWriter,r *http.Request)  {defer r.Body.Close()r.ParseForm()mobile := r.PostForm.Get("mobile")var work workPool.Jobm := Msg{mobile: mobile}work = &mworkPool.JobQueue <- workstatus := `{"status":"ok"}`w.Write([]byte(status))}func main() {http.HandleFunc("/test",getMobile)err := http.ListenAndServe(":8081",nil)if err !=nil{fmt.Println("server failure :",err)return}}

  

本站仅提供存储服务,所有内容均由用户发布,如发现有害或侵权内容,请点击举报
打开APP,阅读全文并永久保存 查看更多类似文章
猜你喜欢
类似文章
【热】打开小程序,算一算2024你的财运
Golang百万级高并发实践
Golang Channel用法简编 | Tony Bai
定时器,打点器,工作池,速率限制,原子计数器
22 Go常见的并发模式和并发模型
Go 并发可视化解释 — 通道
深入Eclipse多线程机制
更多类似文章 >>
生活服务
热点新闻
分享 收藏 导长图 关注 下载文章
绑定账号成功
后续可登录账号畅享VIP特权!
如果VIP功能使用有故障,
可点击这里联系客服!

联系客服