Posts Tags Categories About
Golang并发

生产者消费者模型

Q: 写一个队列, 让生产者和消费者解耦, 并且队列长度可以调整.

针对这个问题, 首先可以先考虑定长的队列, 因为当队列大小变化时可能需要进行数据拷贝.

关于循环队列当head == tail时表示队列为空, 当nextHead == tail时表示队列已满.

在下面的实现中提供了两种接口, 一种当操作失败时返回错误; 另一种当操作失败时阻塞, 并在被唤醒后重试.

type Channel struct {
	lock        *sync.Mutex
	buf         []int
	head        int
	tail        int
	consumeCond *sync.Cond
	produceCond *sync.Cond
}

func NewChannel(size int) *Channel {
	channel := &Channel{
		buf:  make([]int, size+1),
		head: 0,
		tail: 0,
	}

	channel.lock = new(sync.Mutex)
	channel.consumeCond = sync.NewCond(channel.lock)
	channel.produceCond = sync.NewCond(channel.lock)

	return channel
}

func (c *Channel) nextHead() int {
	return (c.head + 1) % len(c.buf)
}

func (c *Channel) nextTail() int {
	return (c.tail + 1) % len(c.buf)
}

func (c *Channel) empty() bool {
	return c.head == c.tail
}

func (c *Channel) full() bool {
	return c.nextHead() == c.tail
}

func (c *Channel) TryProduce(data int) error {
	c.lock.Lock()
	defer c.lock.Unlock()

	if c.full() {
		return errors.New("channel is full when produce data")
	}

	c.buf[c.head] = data
	c.head = c.nextHead()

	return nil
}

func (c *Channel) TryConsume() (int, error) {
	c.lock.Lock()
	defer c.lock.Unlock()

	if c.empty() {
		return 0, errors.New("channel is empty when consume data")
	}

	data := c.buf[c.tail]
	c.tail = c.nextTail()

	return data, nil
}

func (c *Channel) Produce(data int) {
	c.lock.Lock()
	defer c.lock.Unlock()

	for c.full() {
		c.produceCond.Wait()
	}

	c.buf[c.head] = data
	c.head = c.nextHead()

	c.consumeCond.Signal()
}

func (c *Channel) Consume() int {
	c.lock.Lock()
	defer c.lock.Unlock()

	for c.empty() {
		c.consumeCond.Wait()
	}

	data := c.buf[c.tail]
	c.tail = c.nextTail()

	c.produceCond.Signal()

	return data
}

另外队列还可以加入topic的功能, 这样就实现了消息总线, 在锁方面应该还有优化空间.

在调整队列长度时, 我们也可以借鉴kafka, 设计成有分区概念的队列.