今天看啥  ›  专栏  ›  LitC

Channel(二)channel写

LitC  · 简书  ·  · 2021-03-14 14:29

2.1、channel创建

channel := make(chan int, 6)

channel创建,本质上就是初始化一个hchan结构体,使用make()创建channel,是如何调用到具体的实例化hchan的函数makechan()的?

在编译期根据make()中的不同类型,将其转换为不同类型的节点,

见/usr/local/go/src/cmd/compile/internal/gc/typecheck.go:327 OMAKE类型节点

func typecheck1(n *Node, top int) (res *Node) {
    // ...
    switch n.Op{
    case OMAKE:
        switch t.Etype {
        case TCHAN:
            l = nil
            if i < len(args){
                // ... 对缓冲区大小进行检测
                n.Left = l  // 带缓冲区,赋值缓冲区大小
            }else{
                n.Left = nodintconst(0) // 不带缓冲区
            }
            n.Op = OMAKECHAN
        }
    }
}

然后OMAKECHAN节点会/usr/local/go/src/cmd/compile/internal/gc/walk.go:416中转换成调用makechan或者makechan64的函数

// src/cmd/compile/internal/gc/walk.go
func walkexpr(n *Node, init *Nodes) *Node {
    switch n.Op {
    case OMAKECHAN:
        size := n.Left
        fnname := "makechan64"
        argtype := types.Types[TINT64]

        if size.Type.IsKind(TIDEAL) || maxintval[size.Type.Etype].Cmp(maxintval[TUINT]) <= 0 {
            fnname = "makechan"
            argtype = types.Types[TINT]
        }
        n = mkcall1(chanfn(fnname, 1, n.Type), n.Type, init, typename(n.Type), conv(size, argtype))
    }
}

makechan函数

func makechan(t *chantype, size int) *hchan {
   elem := t.elem

   // compiler checks this but be safe.
   if elem.size >= 1<<16 {
      throw("makechan: invalid channel element type")
   }
   if hchanSize % maxAlign != 0 || elem.align > maxAlign {
      throw("makechan: bad alignment")
   }

   mem, overflow := math.MulUintptr(elem.size, uintptr(size))
   if overflow || mem > maxAlloc-hchanSize || size < 0 {
      panic(plainError("makechan: size out of range"))
   }

   // Hchan does not contain pointers interesting for GC when elements stored in buf do not contain pointers.
   // buf points into the same allocation, elemtype is persistent.
   // SudoG's are referenced from their owning thread so they can't be collected.
   // TODO(dvyukov,rlh): Rethink when collector can move allocated objects.
   var c *hchan
   switch {
   case mem == 0:
      // Queue or element size is zero.
      c = (*hchan)(mallocgc(hchanSize, nil, true))
      // Race detector uses this location for synchronization.
      c.buf = c.raceaddr()
   case elem.ptrdata == 0:
      // Elements do not contain pointers.
      // Allocate hchan and buf in one call.
      c = (*hchan)(mallocgc(hchanSize+mem, nil, true))
      c.buf = add(unsafe.Pointer(c), hchanSize)
   default:
      // Elements contain pointers.
      c = new(hchan)
      c.buf = mallocgc(mem, elem, true)
   }

   c.elemsize = uint16(elem.size)
   c.elemtype = elem
   c.dataqsiz = uint(size)
   lockInit(&c.lock, lockRankHchan)

   if debugChan {
      print("makechan: chan=", c, "; elemsize=", elem.size, "; dataqsiz=", size, "\n")
   }
   return c
}

2.2、写channel

image.jpeg

流程:

  • 首先判断recvq接收队列是否为空,若不为空,说明有消费者goroutine阻塞在队列里,即channel为空或者channel没有缓冲区。直接从recvq中取出一个G,写入数据,唤醒该G,结束写入流程。
  • 若recvq为空,判断当前环形队列是否有空位,如果有,将数据写入队列尾部,如果队列已满,则将当前写数据的goroutine加入到sendq队列中,等待被唤醒
channel := make(chan int, 6) channel <- 10

<- 发送语句实际会被转换为chansend1

// entry point for c <- x from compiled code
//go:nosplit
func chansend1(c *hchan, elem unsafe.Pointer) {
   chansend(c, elem, true, getcallerpc())
}
/*
 * generic single channel send/recv
 * If block is not nil,
 * then the protocol will not
 * sleep but return if it could
 * not complete.
 *
 * sleep can wake up with g.param == nil
 * when a channel involved in the sleep has
 * been closed.  it is easiest to loop and re-run
 * the operation; we'll see that it's now closed.
 */
 /*
 *c: 操作的channel
  ep:指针,指向发送的数据ch<-i
  block:是否阻塞调用,在select case中才会设置为false
  
  return:代表是否发送成功
  */
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
    //channel为nil的情况,
   if c == nil {
       //block为false,直接返回false,表示发送失败
      if !block {
         return false
      }
      //对于nil channel,直接挂起当前goroutine,永久阻塞
      gopark(nil, nil, waitReasonChanSendNilChan, traceEvGoStop, 2)
      throw("unreachable")
   }

   if debugChan {
      print("chansend: chan=", c, "\n")
   }

   if raceenabled {
      racereadpc(c.raceaddr(), callerpc, funcPC(chansend))
   }

   // Fast path: check for failed non-blocking operation without acquiring the lock.
   //
   // After observing that the channel is not closed, we observe that the channel is
   // not ready for sending. Each of these observations is a single word-sized read
   // (first c.closed and second full()).
   // Because a closed channel cannot transition from 'ready for sending' to
   // 'not ready for sending', even if the channel is closed between the two observations,
   // they imply a moment between the two when the channel was both not yet closed
   // and not ready for sending. We behave as if we observed the channel at that moment,
   // and report that the send cannot proceed.
   //
   // It is okay if the reads are reordered here: if we observe that the channel is not
   // ready for sending and then observe that it is not closed, that implies that the
   // channel wasn't closed during the first observation. However, nothing here
   // guarantees forward progress. We rely on the side effects of lock release in
   // chanrecv() and closechan() to update this thread's view of c.closed and full().
   if !block && c.closed == 0 && full(c) {
      return false
   }

   var t0 int64
   if blockprofilerate > 0 {
      t0 = cputicks()
   }

    //加锁
   lock(&c.lock)

    //如果channel关闭,panic
   if c.closed != 0 {
      unlock(&c.lock)
      panic(plainError("send on closed channel"))
   }

    //从待接收队列中获取等待的goroutine
   if sg := c.recvq.dequeue(); sg != nil {
      // Found a waiting receiver. We pass the value we want to send
      // directly to the receiver, bypassing the channel buffer (if any).
      //只要可以从待接收队列中获取到goroutine,那么发送操作都是只需要copy一次,见code1:send
      send(c, sg, ep, func() { unlock(&c.lock) }, 3)
      return true
   }

   if c.qcount < c.dataqsiz {
      // Space is available in the channel buffer. Enqueue the element to send.
      qp := chanbuf(c, c.sendx)
      if raceenabled {
         raceacquire(qp)
         racerelease(qp)
      }
      typedmemmove(c.elemtype, qp, ep)
      //递增存放发送数据的索引
      c.sendx++
      if c.sendx == c.dataqsiz {
          //调整索引
         c.sendx = 0
      }
      c.qcount++
      unlock(&c.lock)
      return true
   }
    //如果是非阻塞发送,那么可以直接解锁返回,未发送成功
   if !block {
      unlock(&c.lock)
      return false
   }

   // Block on the channel. Some receiver will complete our operation for us.
   //阻塞发送,挂起当前goroutine
   gp := getg()
   //生成配置sudo
   mysg := acquireSudog()
   mysg.releasetime = 0
   if t0 != 0 {
      mysg.releasetime = -1
   }
   // No stack splits between assigning elem and enqueuing mysg
   // on gp.waiting where copystack can find it.
   mysg.elem = ep
   mysg.waitlink = nil
   mysg.g = gp
   mysg.isSelect = false
   mysg.c = c
   gp.waiting = mysg
   gp.param = nil
   //入队待发送队列
   c.sendq.enqueue(mysg)
   // Signal to anyone trying to shrink our stack that we're about
   // to park on a channel. The window between when this G's status
   // changes and when we set gp.activeStackChans is not safe for
   // stack shrinking.
   atomic.Store8(&gp.parkingOnChan, 1)
   //挂起goroutine,等待唤醒
   //chanparkcommit函数会解锁,ch.lock
   gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanSend, traceEvGoBlockSend, 2)
   // Ensure the value being sent is kept alive until the
   // receiver copies it out. The sudog has a pointer to the
   // stack object, but sudogs aren't considered as roots of the
   // stack tracer.
   KeepAlive(ep)

   // someone woke us up.
   if mysg != gp.waiting {
      throw("G waiting list is corrupted")
   }
   gp.waiting = nil
   gp.activeStackChans = false
   if gp.param == nil {
      if c.closed == 0 {
         throw("chansend: spurious wakeup")
      }
      panic(plainError("send on closed channel"))
   }
   gp.param = nil
   if mysg.releasetime > 0 {
      blockevent(mysg.releasetime-t0, 2)
   }
   mysg.c = nil
   releaseSudog(mysg)
   return true
}

code1:send

func send(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
   if raceenabled {
      if c.dataqsiz == 0 {
         racesync(c, sg)
      } else {
         // Pretend we go through the buffer, even though
         // we copy directly. Note that we need to increment
         // the head/tail locations only when raceenabled.
         qp := chanbuf(c, c.recvx)
         raceacquire(qp)
         racerelease(qp)
         raceacquireg(sg.g, qp)
         racereleaseg(sg.g, qp)
         c.recvx++
         if c.recvx == c.dataqsiz {
            c.recvx = 0
         }
         c.sendx = c.recvx // c.sendx = (c.sendx+1) % c.dataqsiz
      }
   }
   //sg.elem是指向接收goroutine中接收数据的指针 s<-ch
   //如果待接收goroutine需要接收具体的数据,那么直接将数据copy到sg.elem
   if sg.elem != nil {
      sendDirect(c.elemtype, sg, ep)
      sg.elem = nil
   }
   gp := sg.g
   unlockf()
   //赋值param,待接收者被唤醒后会根据 param 来判断是否是被发送至唤醒的
   gp.param = unsafe.Pointer(sg)
   if sg.releasetime != 0 {
      sg.releasetime = cputicks()
   }
   goready(gp, skip+1) //唤醒待接收者
}
func sendDirect(t *_type, sg *sudog, src unsafe.Pointer) {
   // src is on our stack, dst is a slot on another stack.
    //src 是发送的数据源地址,dst是接收数据的地址
    //src 在当前的goroutine栈中,dst在其他栈上
   // Once we read sg.elem out of sg, it will no longer
   // be updated if the destination's stack gets copied (shrunk).
   // So make sure that no preemption points can happen between read & use.
   dst := sg.elem
   // No need for cgo write barrier checks because dst is always
   // Go memory.
   //使用memove直接进行内存copy
   //因为dst指向其他goroutine的栈,如果它发生了栈收缩,那就没有修改真正的dst位置。所以需要加一个写屏障
   typeBitsBulkBarrier(t, uintptr(dst), uintptr(src), t.size)
   memmove(dst, src, t.size)
}

写channel的特性:

  • 向nil channel发送数据会被永久阻塞,并且不会被select语句选中
  • 如果channel未关闭,非缓冲区没有待接收的goroutine,或者缓冲区已满,那么不会被select语句选中
  • 向关闭的channel发送数据,会panic,并且可以被select语句选中,这也就意味着select语句中可能会panic
  • 如果有待接收者,那么会将发送的数据直接copy到待接收者的接收位置,然后唤醒接收者
  • 如果有缓冲区,并且缓冲区未满,那么就直接把发送的数据copy到缓冲区中
  • 如果channel未关闭,缓冲区为空并且没有待接收者,那么直接阻塞当前goroutine,等待被唤醒
  • 发送者被阻塞后,可以被关闭channel操作或者被接收者操作唤醒,关闭channel导致发送者被唤醒后,会panic
  • 当channel中有待接收goroutine,那么channel的状态必然是费缓冲或者缓冲区为空

发送数据,可以被select选中的情况:

  • channel已关闭
  • channel未关闭,channel有待接收的goroutine,或者缓冲区不为空且缓冲区未满



原文地址:访问原文地址
快照地址: 访问文章快照