sanhutrees 发表于 2018-9-20 11:43:17

sync.WaitGroup golang并发调度器

func (wg *WaitGroup) Add(delta int) {  statep := wg.state()
  if race.Enabled {
  _ = *statep // trigger nil deref early
  if delta < 0 {
  // Synchronize decrements with Wait.
  race.ReleaseMerge(unsafe.Pointer(wg))
  }
  race.Disable()
  defer race.Enable()
  }
  state := atomic.AddUint64(statep, uint64(delta) 32)
  w := uint32(state)
  if race.Enabled {
  if delta > 0 && v == int32(delta) {
  // The first increment must be synchronized with Wait.
  // Need to model this as a read, because there can be
  // several concurrent wg.counter transitions from 0.
  race.Read(unsafe.Pointer(&wg.sema))
  }
  }
  if v < 0 {
  panic("sync: negative WaitGroup counter")
  }
  if w != 0 && delta > 0 && v == int32(delta) {
  panic("sync: WaitGroup misuse: Add called concurrently with Wait")
  }
  if v > 0 || w == 0 {
  return
  }
  // This goroutine has set counter to 0 when waiters > 0.
  // Now there can't be concurrent mutations of state:
  // - Adds must not happen concurrently with Wait,
  // - Wait does not increment waiters if it sees counter == 0.
  // Still do a cheap sanity check to detect WaitGroup misuse.
  if *statep != state {
  panic("sync: WaitGroup misuse: Add called concurrently with Wait")
  }
  // Reset waiters count to 0.
  *statep = 0
  for ; w != 0; w-- {
  runtime_Semrelease(&wg.sema, false)
  }
  }
  // Done marks one unit of work as finished by lowering the WaitGroup
  // counter by one. It is shorthand for calling Add with a delta of -1.
  func (wg *WaitGroup) Done() {
  	const decrement = -1
  	wg.Add(decrement)
  }
  // Wait blocks until the WaitGroup counter is zero.
  //
  // The 64-bit word behind wg.state() packs two values: the high 32 bits are
  // the counter (read below as v) and the low 32 bits are the number of
  // waiters (read below as w). If the counter is already zero, Wait returns
  // immediately. Otherwise it registers itself as a waiter with a
  // compare-and-swap and then calls runtime_Semacquire on wg.sema; Add
  // releases that semaphore once per waiter when the counter reaches zero.
  //
  // Wait panics if the state word is non-zero after the semaphore has been
  // acquired, which indicates the WaitGroup was reused too early.
  func (wg *WaitGroup) Wait() {
  statep := wg.state()
  if race.Enabled {
  _ = *statep // trigger nil deref early
  race.Disable()
  }
  // Loop until either the counter is observed as zero or the CAS that
  // registers this goroutine as a waiter succeeds.
  for {
  state := atomic.LoadUint64(statep)
  v := int32(state >> 32) // counter: high 32 bits
  w := uint32(state) // waiter count: low 32 bits
  if v == 0 {
  // Counter is 0, no need to wait.
  if race.Enabled {
  race.Enable()
  race.Acquire(unsafe.Pointer(wg))
  }
  return
  }
  // Increment waiters count.
  // state+1 only touches the low 32 bits; if another goroutine changed the
  // word since the load, the CAS fails and the loop retries.
  if atomic.CompareAndSwapUint64(statep, state, state+1) {
  if race.Enabled && w == 0 {
  // Wait must be synchronized with the first Add.
  // Need to model this as a write to race with the read in Add.
  // As a consequence, can do the write only for the first waiter,
  // otherwise concurrent Waits will race with each other.
  race.Write(unsafe.Pointer(&wg.sema))
  }
  // Presumably blocks here until Add calls runtime_Semrelease on wg.sema.
  runtime_Semacquire(&wg.sema)
  // Add resets the state word to 0 before releasing waiters, so a
  // non-zero value here means the WaitGroup was reused too early.
  if *statep != 0 {
  panic("sync: WaitGroup is reused before previous Wait has returned")
  }
  if race.Enabled {
  race.Enable()
  race.Acquire(unsafe.Pointer(wg))
  }
  return
  }
  }
  }
  因此可以简单地解释为:
  1. wg.Add(n):将计数器增加 n,即登记 n 个需要等待完成的任务(n 也可以为负数)
  2. wg.Done():等价于 wg.Add(-1),即将计数器减 1,表示有一个任务已完成(不指定是哪一个)
  3. wg.Wait():阻塞当前 goroutine,直到计数器归零(所有并发任务结束)后解除阻塞
  因此可以利用这三个方法建立我们简单的并发模型:


页: [1]
查看完整版本: sync.WaitGroup golang并发调度器