【Go】RabbitMQ学习笔记

1. 介绍

RabbitMQ is the most widely deployed open source message broker.

直接把 RabbitMQ 官网的介绍抄过来:它是部署最广泛的消息代理

2. 作用

消息代理中间件的作用:

  • 削峰:大量请求写入消息中间件,服务器按照自己能力去消费
  • 解耦:服务上游与下游解耦,上游只需把请求写入消息代理,下游只需要去消息中间件消费
  • 异步:实时性不高的功能放入代理,快速响应客户端请求

3. 竞品

  • ActiveMQ(已经寄了)
  • RocketMQ(阿里开源,广泛应用在电商场景)
  • Kafka(大数据利器,日志系统重要组件)

4. 系统架构

  • 生产者:消息发送方
  • channel:通信信道,一个连接里有多个可用信道(感觉是资源池),生产者和消费者都会与 broker 建立连接
  • exchange:交换机,生产者将消息发送给对应交换机(生产者只与交换机打交道),交换机通过 routingKey 将消息发送给消息队列
  • queue:消息队列,存放消息的实体
  • 消费者:从消息队列中消费消息
  • 虚拟主机:多租户的资源隔离(不想深入,也不做过多了解)

5. 工作模式

RabbitMQ 官方给出了七种工作模式

  • Hello World
  • Work Queues
  • Publish/Subscribe
  • Routing
  • Topics
  • RPC
  • Publisher confirms

6. 安装 RabbitMQ

我的虚拟机已经安装好Docker,通过 Docker 安装带 web 可视化插件的RabbitMQ,其中 5672 是服务端的端口,15672 是 web 可视化的端口

1
docker run --name rabbit --restart=always -p 15672:15672 -p 5672:5672 -d rabbitmq:management

WechatIMG75.jpeg 访问 web 管理界面,用户名密码都是guest

WechatIMG76.jpeg

7. 创建项目

  • 创建目录:mkdir rabbitmq-demo
  • 初始化项目:go mod init rabbitmq-demo
  • go 客户端选用:老规矩,最多 star 的项目,虽然已经停止维护很久了github.com/streadway/amqp

8. 发送消息:以简单模式为例

简单模式就是点对点的消息传输模型,官网称之为Hello World模式。生产者发送消息给指定队列,消费者从指定队列消费消息,其中生产者以及消费者都绑定的是默认交换机。

rabbitmq-hello-world

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
package main

import (
	"fmt"
	"github.com/streadway/amqp"
)

const url = "amqp://guest:guest@192.168.18.3:5672/"

func main() {
	// 连接服务器
	conn, err := amqp.Dial(url)
	if err != nil {
		fmt.Println(err)
	}
	defer conn.Close()
	// 获取channel
	channel, err := conn.Channel()
	defer channel.Close()
	if err != nil {
		fmt.Println(err)
	}
	// 声明队列
	queue, err := channel.QueueDeclare("test", false, false, false, false, nil)
	if err != nil {
		fmt.Println(err)
	}
	// 发布消息
	err = channel.Publish("", queue.Name, false, false, amqp.Publishing{ContentType: "text/plain", Body: []byte("hello world")})
	if err != nil {
		fmt.Println(err)
	}
}

9. 接受消息:以简单模式为例

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
package main

import (
"fmt"
"github.com/streadway/amqp"
)

const rabbit_url = "amqp://guest:guest@192.168.18.3:5672/"

func main() {
	// 链接服务器
	conn, err := amqp.Dial(rabbit_url)
	if err != nil {
		fmt.Println(err)
	}
	defer conn.Close()
	// 	建立链接
	channel, err := conn.Channel()
	if err != nil {
		fmt.Println(err)
	}
	defer channel.Close()
	// 声明队列
	queue, err := channel.QueueDeclare("test", false, false, false, false, nil)
	if err != nil {
		fmt.Println(err)
	}
	// 消费消息
	msg, err := channel.Consume(queue.Name, "", true, false, false, false, nil)
	if err != nil {
		fmt.Println(err)
	}
	// 遍历管道,读取消息
	for d := range msg {
		fmt.Println(string(d.Body))
	}
}

10. 如何保障消息不丢失

10.1 消息应答机制

RabbitMQ 有两种消息应答机制:自动应答手动应答自动应答即 NOACK,不需要接收方回应 ACK 消息,默认是这种应答方式。要开启手动应答,首先要把 channel.Consume 方法的第三个参数设置为 false

1
func (ch *Channel) Consume(queue, consumer string, autoAck, exclusive, noLocal, noWait bool, args Table) (<-chan Delivery, error)

还需要在消费消息时调用AckReject或 Nack方法

1
func (d Delivery) Ack(multiple bool) error

10.2 消息持久化

要保障消息不丢失,就要保障消息的持久化,避免服务器故障或重启时内存中的消息丢失。消息持久化分为两部分:队列持久化、消息持久化。

队列持久化需要在声明队列时把参数 durable 传为 true

1
func (ch *Channel) QueueDeclare(name string, durable, autoDelete, exclusive, noWait bool, args Table) (Queue, error)

消息持久化需要在实例化消息时把Publishing结构体的DeliveryMode字段设置为Persistent

1
2
3
4
5
6
channel.Publish("", queue.Name, false, false, amqp.Publishing{
	DeliveryMode: amqp.Persistent,
	ContentType: "text/plain",
	Body: []byte("hello world"),
	}
)

10.3 生产方发布确认

生产方发布确认又叫发布确认模式,是一种保障生产方能确认发布消息到 Broker 的机制

要开启发布确认模式需要调用 channel.Conform 方法

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
/*
Confirm puts this channel into confirm mode so that the client can ensure all
publishings have successfully been received by the server.  After entering this
mode, the server will send a basic.ack or basic.nack message with the deliver
tag set to a 1 based incremental index corresponding to every publishing
received after the this method returns.

Add a listener to Channel.NotifyPublish to respond to the Confirmations. If
Channel.NotifyPublish is not called, the Confirmations will be silently
ignored.

The order of acknowledgments is not bound to the order of deliveries.

Ack and Nack confirmations will arrive at some point in the future.

Unroutable mandatory or immediate messages are acknowledged immediately after
any Channel.NotifyReturn listeners have been notified.  Other messages are
acknowledged when all queues that should have the message routed to them have
either received acknowledgment of delivery or have enqueued the message,
persisting the message if necessary.

When noWait is true, the client will not wait for a response.  A channel
exception could occur if the server does not support this method.

*/
func (ch *Channel) Confirm(noWait bool) error {
	if err := ch.call(
		&confirmSelect{Nowait: noWait},
		&confirmSelectOk{},
	); err != nil {
		return err
}
	ch.confirmM.Lock()
	ch.confirming = true
	ch.confirmM.Unlock()
	return nil
}

再调用NotifyPublish方法或NotifyConfirm注册发布消息的回调函数

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
/*
NotifyConfirm calls NotifyPublish and starts a goroutine sending
ordered Ack and Nack DeliveryTag to the respective channels.

For strict ordering, use NotifyPublish instead.
*/
func (ch *Channel) NotifyConfirm(ack, nack chan uint64) (chan uint64, chan uint64) {
	confirms := ch.NotifyPublish(make(chan Confirmation, cap(ack)+cap(nack)))

	go func() {
		for c := range confirms {
			if c.Ack {
				ack <- c.DeliveryTag
			} else {
				nack <- c.DeliveryTag
			}
		}
		close(ack)
		if nack != ack {
			close(nack)
		}
	}()

	return ack, nack
}
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
/*
NotifyPublish registers a listener for reliable publishing. Receives from this
chan for every publish after Channel.Confirm will be in order starting with
DeliveryTag 1.

There will be one and only one Confirmation Publishing starting with the
delivery tag of 1 and progressing sequentially until the total number of
Publishings have been seen by the server.

Acknowledgments will be received in the order of delivery from the
NotifyPublish channels even if the server acknowledges them out of order.

The listener chan will be closed when the Channel is closed.

The capacity of the chan Confirmation must be at least as large as the
number of outstanding publishings.  Not having enough buffered chans will
create a deadlock if you attempt to perform other operations on the Connection
or Channel while confirms are in-flight.

It's advisable to wait for all Confirmations to arrive before calling
Channel.Close() or Connection.Close().

*/
func (ch *Channel) NotifyPublish(confirm chan Confirmation) chan Confirmation {
	ch.notifyM.Lock()
	defer ch.notifyM.Unlock()

	if ch.noNotify {
		close(confirm)
	} else {
		ch.confirms.Listen(confirm)
	}

	return confirm

}

11. 工作队列模式

11.1 公平调度的工作队列模式

工作队列模式类似于 Kafka 的消费者组,为了提高消费者的消费能力。生产者代码与简单模式一致,为了能从命令行控制要发送的消息,做一些改造。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
package main

import (
	"fmt"
	"github.com/streadway/amqp"
	"os"
	"strings"
)

const url = "amqp://guest:guest@47.99.140.12:5672/"

func main() {
	connection, err := amqp.Dial(url)
	if err != nil {
		fmt.Println(err)
	}
	defer connection.Close()
	channel, err := connection.Channel()
	if err != nil {
		fmt.Println(err)
	}
	defer channel.Close()
	queue, err := channel.QueueDeclare("work_queues", true, false, false, false, nil)
	if err != nil {
		fmt.Println(err)
	}
	body := bodyFrom(os.Args)
	err = channel.Publish("", queue.Name, false, false, amqp.Publishing{
		ContentType:  "text/plain",
		DeliveryMode: amqp.Persistent,
		Body:         []byte(body),
	})
	if err != nil {
		fmt.Println(err)
	}
	fmt.Printf("send msg %s", body)
}

func bodyFrom(args []string) string {
	var s string
	if len(args) < 2 || os.Args[1] == "" {
		s = "hello"
	} else {
		s = strings.Join(args[1:], " ")
	}
	return s
}

消费者代码也与简单模式一致,为了能从多个消费者区分差异,对消息做了一些虚假的业务处理(按照消息中字符.的个数睡眠相应秒数)

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
package main

import (
	"bytes"
	"fmt"
	"github.com/streadway/amqp"
	"time"
)

const rabbit_url = "amqp://guest:guest@47.99.140.12:5672/"

func main() {
	connection, err := amqp.Dial(rabbit_url)
	if err != nil {
		fmt.Println(err)
	}
	defer connection.Close()
	channel, err := connection.Channel()
	if err != nil {
		fmt.Println(err)
	}
	defer channel.Close()
	queue, err := channel.QueueDeclare("work_queues", true, false, false, false, nil)
	if err != nil {
		fmt.Println(err)
	}
	deliveries, err := channel.Consume(queue.Name, "", true, false, false, false, nil)
	if err != nil {
		fmt.Println(err)
	}
	for delivery := range deliveries {
		fmt.Printf("receive work: %s\n", delivery.Body)
		dotCount := bytes.Count(delivery.Body, []byte("."))
		d := time.Duration(dotCount)
		time.Sleep(d * time.Second)
		fmt.Printf("work done after %d\n", d)
	}
}

先分别在两个终端中启动两个消费者

1
go run receive.go

再启动生产者发送多条消息

1
2
3
4
5
go run send.go First Message.
go run send.go Second Message..
go run send.go Third Message...
go run send.go Fourth message....
go run send.go Fifth message.....

生产者 生产者.png

消费者 1 消费者1.png

消费者 2 消费者2.png

消费者 1 消费了消息 135,消费者 2 消费了消息 24,这样提高了消费者消费的能力,并且由于工作队列模式默认的消息分发机制是轮询,官网称之为Fair dispatch(公平调度)

当有多个消费者时,每个消费者的消费能力不一样,轮询的方式会有问题。

11.2 设置预取值的工作队列方式

为了解决公平调度带来的问题,我们可以将预取计数设置为值1。这告诉RabbitMQ不要一次向一个worker提供多条消息。

或者,换句话说,在worker处理并确认前一条消息之前,不要向worker发送新消息。相反,它会将其调度给下一个尚未繁忙的工作人员。

设置预取值需要调用 channel.Qos 方法

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
# 通常设置 prefetchCount = 1 prefetchSize = 0  global = false
func (ch *Channel) Qos(prefetchCount, prefetchSize int, global bool) error {
	return ch.call(
		&basicQos{
			PrefetchCount: uint16(prefetchCount),
			PrefetchSize:  uint32(prefetchSize),
			Global:        global,
		},
		&basicQosOk{},
	)
}
comments powered by Disqus
使用 Hugo 构建
主题 StackJimmy 设计