- A+
RabbitMQ
RabbitMQ的消息流主要由两个部分组成:Exchange和Queue。
Exchange
Exchange其实可以类比为一个交换机?其根据Exchange的类型和一些规则,来将消息分发到特定的Queue队列中。
RabbitMQ支持以下几种Exchange类型:
Direct Exchange
将每个Message(消息)的routing_key
与其下的Queue进行匹配,若一致则将Message下发到对应的Queue
Default Exchange
由RabbitMQ预定义的一个Exchange,当一个Message的Exchange为空字符串
时,就会交由这个Default Exchange去处理。且该Message会被路由到和它routing_key
同名的Queue去。
每个Queue都与Default Exchange进行绑定,绑定的routing_key
为Queue的名称
Topic Exchange
- 将
routing_key
以.
分割成多个单词 *
匹配一个单词#
匹配一个或零个单词test.*
匹配test.fun
,不匹配test
test.#
匹配test.fun
和test
Fanout Exchange
将Message下发到其下的所有Queue里(相当于一次针对Queue的广播)
Headers Exchange
跟Topic Exchange类似,但是是按header
参数进行匹配。
在Exchange和Queue的Binding中有一个特殊参数x-match
,用来表示需要全部参数匹配还是部分参数匹配:
* x-match = all
:header
的所有参数都要和Binding的条件匹配才行,默认
* x-match = any
:header
只要有一个参数与Binding的条件匹配就行
若出现多个匹配的Queue,则将消息分发到每个匹配的Queue。
Dead Letter Exchange
用于捕捉没有匹配到Queue的所有Message。
Celery
Celery的配置有两个参数,分别是task_routes
和task_queues
。
当仅指定task_routes
,会自动合适的生成Queue和Exchange,但是很多时候我们想要对Exchange和Queue进行定制化,这就到task_queues
出场的时候了。
task_queues
task_queues
的默认值为None
,但若是我们需要对Queue进行定制化,则需要给他赋值:
app.conf.task_queues = [
Queue(...),
Queue(...)
]
task_routes
task_routes
是一个用于描述task.name
(任务名称)和Queue(队列)关系的字典。
当一个任务的task.name
符合(完全匹配、正则匹配等)某个key
时,该任务就会被分发到queue
名称相对应的Queue
app.conf.task_routes = {
'test': {'queue': 'test_queue'},
'test.*' {'queue': 'test_prefix_queue', 'routing_key': 'test.a'},
re.compile(r'\d{10}'): {'queue': '10_digits_queue'}
}
注意一点,这里的key
仅用于和Celery的task.name
进行匹配,而不是最后生成Message的routing_key
。
最后生成的routing_key
是由在task_queues
里的Queue()
对象进行指定的。
Exchange
用于生成特定类型的Exchange。
from kombu import Exchange
direct_exchange = Exchange(name='direct_exchange', type='direct')
fanout_exchange = Exchange(name='fanout_exchange', type='fanout')
topic_exchange = Exchange(name='topic_exchange', type='topic')
headers_exchange = Exchange(name='headers_exchange', type='headers')
其还支持以下参数:
* durablt
:在服务器重启后,是否保留该Exchange,默认为True
* auto_delete
:在其下的所有Queue都完成之后,自动删除Exchange,默认为False
* delivery_mode
:默认的消息分发方式
* 1
:消息仅保留在内存里,重启丢失
* 2
:消息保存在内存和硬盘里,重启不丢失,默认值
* arguments
:其他一些参数(比如Headers Exchange的x-match
参数)
Queue
用于生成特定的Queue。
from kombu import Queue
Queue(name='test', exchange=direct_exchange, routing_key='test')
Queue(name='topic.a', exchange=topic_exchange, routing_key='fanout.*')
Queue(name='fanout_queue', exchange=fanout_exchange)
Queue
主要有以下参数:
* name
:队列名称
* exchange
:队列绑定的Exchange
* routing_key
:队列绑定时采用的routing key
,根据不同类型的Exchange而不同
* durable
:服务器重启后是否保留队列,默认为True
* auto_delete
:当所有的消费者完成之后,是否自动删除队列,默认为False
* expires
:当队列不再被使用(没有任何消费者,没有被重新声明)一定时间后,自动删除队列
* message_ttl
:队列内消息的ttl
* max_length
:队列能够储存的消息总数量的上限
* mex_length_bytes
:队列能储存的消息总大小的上限
* max_priority
:队列的最大优先级
优先级相关
RabbitMQ原生支持优先级设定,但是默认不开启优先级。
我们需要在声明Queue队列的时候,指定x-max-priority
参数,才能开启优先级功能。
优先级范围是0 ~ 255
,越大优先级越高,若x-max-priority
设置越大,则越消耗服务器计算资源。
from kombu import Queue
Queue(name='test', exchange=direct_exchange, routing_key='test', max_priority=10)
这里的优先级其实指的是,在一个队列里,优先级高的消息先被消费。
若在Celery里,一个Task在没有指定priority
,则该消息的优先级为0
。
参考
- 我的微信
- 这是我的微信扫一扫
- 我的微信公众号
- 我的微信公众号扫一扫