
A Look at dapr's Limiter

go4it · Juejin · 2021-03-06 22:11

This article takes a look at dapr's Limiter.

Limiter

dapr/pkg/concurrency/limiter.go

const (
	// DefaultLimit is the default concurrency limit
	DefaultLimit = 100
)

// Limiter object
type Limiter struct {
	limit         int
	tickets       chan int
	numInProgress int32
}

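Limiter defines three fields: limit (the maximum concurrency), tickets (a channel of tickets), and numInProgress (a counter of jobs currently running).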

NewLimiter

dapr/pkg/concurrency/limiter.go

// NewLimiter allocates a new ConcurrencyLimiter
func NewLimiter(limit int) *Limiter {
	if limit <= 0 {
		limit = DefaultLimit
	}

	// allocate a limiter instance
	c := &Limiter{
		limit:   limit,
		tickets: make(chan int, limit),
	}

	// allocate the tickets:
	for i := 0; i < c.limit; i++ {
		c.tickets <- i
	}

	return c
}

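NewLimiter creates a Limiter for the given limit (falling back to DefaultLimit when limit <= 0) and fills the tickets channel with one ticket per slot.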
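To make the ticket allocation concrete, here is a minimal sketch that copies the constructor into a standalone program and checks how many tickets end up in the buffer. The trimmed Limiter struct and the availableTickets helper are assumptions made for illustration; they are not part of dapr.

```go
package main

import "fmt"

// DefaultLimit mirrors the constant quoted in the article.
const DefaultLimit = 100

// Limiter is a local, trimmed copy of the struct shown above
// (numInProgress is omitted because this sketch never runs jobs).
type Limiter struct {
	limit   int
	tickets chan int
}

// NewLimiter follows the constructor quoted above.
func NewLimiter(limit int) *Limiter {
	if limit <= 0 {
		limit = DefaultLimit
	}
	c := &Limiter{
		limit:   limit,
		tickets: make(chan int, limit),
	}
	// The buffer capacity equals limit, so these sends never block.
	for i := 0; i < c.limit; i++ {
		c.tickets <- i
	}
	return c
}

// availableTickets is a hypothetical helper for this sketch: it reports
// how many tickets are currently sitting in the channel buffer.
func availableTickets(c *Limiter) int {
	return len(c.tickets)
}

func main() {
	fmt.Println(availableTickets(NewLimiter(3)))  // prints 3
	fmt.Println(availableTickets(NewLimiter(0)))  // prints 100
	fmt.Println(availableTickets(NewLimiter(-5))) // prints 100
}
```

The buffered channel acts as a counting semaphore here: a full buffer means every slot is free, and an empty buffer means every slot is taken.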

Execute

dapr/pkg/concurrency/limiter.go

// Execute adds a function to the execution queue.
// if num of go routines allocated by this instance is < limit
// launch a new go routine to execute job
// else wait until a go routine becomes available
func (c *Limiter) Execute(job func(param interface{}), param interface{}) int {
	ticket := <-c.tickets
	atomic.AddInt32(&c.numInProgress, 1)
	go func(param interface{}) {
		defer func() {
			c.tickets <- ticket
			atomic.AddInt32(&c.numInProgress, -1)
		}()

		// run the job
		job(param)
	}(param)
	return ticket
}

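Execute first takes a ticket (blocking while none is available), increments numInProgress, and then runs the job asynchronously in a new goroutine; once the job finishes, the ticket is returned and numInProgress is decremented.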
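The effect of the ticket handshake can be observed with a small experiment. The sketch below uses a local copy of the Limiter quoted above and a hypothetical peakConcurrency helper (not part of dapr) that records the highest number of jobs seen running at the same time. A sync.WaitGroup is used only so the sketch knows when every job is done.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
	"time"
)

const DefaultLimit = 100

// Limiter, NewLimiter and Execute are local copies of the code quoted above.
type Limiter struct {
	limit         int
	tickets       chan int
	numInProgress int32
}

func NewLimiter(limit int) *Limiter {
	if limit <= 0 {
		limit = DefaultLimit
	}
	c := &Limiter{limit: limit, tickets: make(chan int, limit)}
	for i := 0; i < c.limit; i++ {
		c.tickets <- i
	}
	return c
}

func (c *Limiter) Execute(job func(param interface{}), param interface{}) int {
	ticket := <-c.tickets // blocks while all tickets are in use
	atomic.AddInt32(&c.numInProgress, 1)
	go func(param interface{}) {
		defer func() {
			c.tickets <- ticket
			atomic.AddInt32(&c.numInProgress, -1)
		}()
		job(param)
	}(param)
	return ticket
}

// peakConcurrency is a hypothetical helper: it pushes `jobs` short jobs
// through a limiter of size `limit` and returns the largest number of
// jobs that were observed running simultaneously.
func peakConcurrency(limit, jobs int) int32 {
	c := NewLimiter(limit)
	var wg sync.WaitGroup
	var running, peak int32
	for i := 0; i < jobs; i++ {
		wg.Add(1)
		c.Execute(func(param interface{}) {
			defer wg.Done()
			n := atomic.AddInt32(&running, 1)
			// Raise the recorded peak if this job pushed it higher.
			for {
				old := atomic.LoadInt32(&peak)
				if n <= old || atomic.CompareAndSwapInt32(&peak, old, n) {
					break
				}
			}
			time.Sleep(5 * time.Millisecond)
			atomic.AddInt32(&running, -1)
		}, i)
	}
	wg.Wait()
	return peak
}

func main() {
	// With limit 2 the reported peak is never greater than 2.
	fmt.Println(peakConcurrency(2, 10))
}
```

Because a job only runs while it holds a ticket, the loop that calls Execute is itself throttled: the caller blocks on the third submission until one of the first two jobs hands its ticket back.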

Wait

dapr/pkg/concurrency/limiter.go

// Wait will block all the previously Executed jobs completed running.
//
// IMPORTANT: calling the Wait function while keep calling Execute leads to
//            un-desired race conditions
func (c *Limiter) Wait() {
	for i := 0; i < c.limit; i++ {
		<-c.tickets
	}
}

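Wait loops limit times, receiving one ticket from tickets on each iteration, so it returns only after every ticket has been handed back.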
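One consequence of this design, read directly from the code quoted above, is that Wait keeps the tickets it receives: after it returns, the channel is empty, so a further Execute on the same Limiter would block. The sketch below illustrates both points with a local copy of the Limiter; the runAndWait helper is an assumption made for illustration and is not part of dapr.

```go
package main

import (
	"fmt"
	"sync/atomic"
	"time"
)

// Limiter and its methods are local copies of the code quoted above
// (the DefaultLimit fallback is left out to keep the sketch short).
type Limiter struct {
	limit         int
	tickets       chan int
	numInProgress int32
}

func NewLimiter(limit int) *Limiter {
	c := &Limiter{limit: limit, tickets: make(chan int, limit)}
	for i := 0; i < c.limit; i++ {
		c.tickets <- i
	}
	return c
}

func (c *Limiter) Execute(job func(param interface{}), param interface{}) int {
	ticket := <-c.tickets
	atomic.AddInt32(&c.numInProgress, 1)
	go func(param interface{}) {
		defer func() {
			c.tickets <- ticket
			atomic.AddInt32(&c.numInProgress, -1)
		}()
		job(param)
	}(param)
	return ticket
}

func (c *Limiter) Wait() {
	for i := 0; i < c.limit; i++ {
		<-c.tickets
	}
}

// runAndWait is a hypothetical helper: it submits `jobs` jobs, calls Wait,
// and reports how many jobs completed and how many tickets remain buffered.
func runAndWait(limit, jobs int) (int32, int) {
	c := NewLimiter(limit)
	var completed int32
	for i := 0; i < jobs; i++ {
		c.Execute(func(param interface{}) {
			time.Sleep(time.Millisecond)
			atomic.AddInt32(&completed, 1)
		}, i)
	}
	// All Execute calls have returned, so Wait can only collect every
	// ticket once each job has finished and returned its own.
	c.Wait()
	return atomic.LoadInt32(&completed), len(c.tickets)
}

func main() {
	done, left := runAndWait(3, 8)
	fmt.Println(done, left) // prints "8 0"
}
```

Under this reading, a Limiter is best treated as single-use once Wait has been called, which also fits the warning in the source comment about mixing Wait with ongoing Execute calls.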

Summary

dapr's Limiter defines the limit, tickets, and numInProgress fields; it provides the Execute and Wait methods, along with the NewLimiter factory function.
