問(wèn)題描述
我正在嘗試將消息發(fā)布到隊(duì)列,然后只有當(dāng)它包含某個(gè)標(biāo)頭時(shí)才讓某些消費(fèi)者使用它,如果它包含另一個(gè)標(biāo)頭,另一個(gè)消費(fèi)者會(huì)使用它.
I'm trying to publish a message to a queue and then have certain consumers consume it only if it contains a certain header and another consumer consume it if it contains another header.
到目前為止,我所做的是設(shè)置一個(gè)標(biāo)頭交換,僅當(dāng)消息包含該標(biāo)頭時(shí)才將消息路由到某個(gè)隊(duì)列.
What I've done so far is to setup a headers-exchange that routes messages to a certain queue only if it contains that header.
這是我用來(lái)設(shè)置交換、隊(duì)列和監(jiān)聽器的配置:
This is the config I'm using to setup the exchange and the queue and the listener:
<!-- Register Queue Listener Beans -->
<bean id="ActionMessageListener" class="com.mycee.Action" />
<!-- Register RabbitMQ Connections -->
<rabbit:connection-factory
id="connectionFactory"
port="${rabbit.port}"
virtual-host="${rabbit.virtual}"
host="${rabbit.host}"
username="${rabbit.username}"
password="${rabbit.password}"
connection-factory="nativeConnectionFactory" />
<!-- Register RabbitMQ Listeners -->
<rabbit:listener-container
connection-factory="connectionFactory"
channel-transacted="true"
requeue-rejected="true"
concurrency="${rabbit.consumers}">
<rabbit:listener queues="${queue.myqueue}" ref="ActionMessageListener" method="handle"/>
</rabbit:listener-container>
<!-- Setup RabbitMQ headers exchange -->
<rabbit:headers-exchange id="${exchange.myexchange}" name="${exchange.myexchange}">
<rabbit:bindings>
<rabbit:binding queue="${queue.myqueue}" key="action" value="action3" />
</rabbit:bindings>
</rabbit:headers-exchange>
<rabbit:admin connection-factory="connectionFactory"/>
<rabbit:queue name="${queue.myqueue}" />
所以我使用 action 的鍵和 action3 的值將 myqueue 綁定到 myexchange.
So I'm binding myqueue to myexchange using key of action and value of action3.
現(xiàn)在當(dāng)我在交易所發(fā)布時(shí):
Now when I publish on the exchange:
即使操作設(shè)置為 action1 而不是 action3,ChannelAwareMessageListener 仍在使用它
the ChannelAwareMessageListener is consuming it even though the action was set to action1 instead of action3
public class Action implements ChannelAwareMessageListener {
@Override
public void onMessage(Message message, Channel channel) throws Exception {
System.out.println(message.toString());
}
}
要么我沒(méi)有正確使用 headers-exchange,要么我沒(méi)有正確配置它 - 有什么建議嗎?
Either I'm not using a headers-exchange correctly or I'm not configuring it correctly - any advice ?
推薦答案
這樣不行;每個(gè)消費(fèi)者都需要一個(gè)單獨(dú)的隊(duì)列.請(qǐng)參閱教程.
It doesn't work that way; you need a separate queue for each consumer. See the tutorial.
當(dāng)多個(gè)消費(fèi)者從同一個(gè)隊(duì)列消費(fèi)時(shí),他們會(huì)競(jìng)爭(zhēng)所有消息;您不能在消費(fèi)者端選擇消息;選擇"是由交換機(jī)通過(guò)將消息路由到特定隊(duì)列來(lái)完成的.
When multiple consumers consume from the same queue they compete for all messages; you can't select messages on the consumer side; the "selection" is done by the exchange by routing messages to specific queue(s).
這篇關(guān)于僅使用 RabbitMQ 和 SpringAMQP 使用具有某些標(biāo)頭的消息的文章就介紹到這了,希望我們推薦的答案對(duì)大家有所幫助,也希望大家多多支持html5模板網(wǎng)!