Java源码示例:org.springframework.amqp.core.MessageListener
示例1
@Bean
MessageListener exampleListener(final RabbitTemplate rabbitTemplate) {
return new MessageListener() {
public void onMessage(Message message) {
System.out.println("received: " + message);
try {
String payload = new ObjectMapper().writeValueAsString(new MyPojo(
"992e46d8-ab05-4a26-a740-6ef7b0daeab3", "CREATED"));
Message outputMessage = MessageBuilder.withBody(payload.getBytes())
.build();
rabbitTemplate.send(issue178OutputExchange().getName(), "routingkey",
outputMessage);
}
catch (JsonProcessingException e) {
throw new RuntimeException(e);
}
}
};
}
示例2
@Override
public void send(Message message, String destination) {
final String routingKey = message.getMessageProperties().getReceivedRoutingKey();
List<SimpleMessageListenerContainer> listenerContainers = this.messageListenerAccessor
.getListenerContainersForDestination(destination, routingKey);
if (listenerContainers.isEmpty()) {
throw new IllegalStateException(
"no listeners found for destination " + destination);
}
for (SimpleMessageListenerContainer listenerContainer : listenerContainers) {
Object messageListener = listenerContainer.getMessageListener();
if (isChannelAwareListener(listenerContainer, messageListener)) {
try {
((ChannelAwareMessageListener) messageListener).onMessage(message,
createChannel(listenerContainer, transactionalChannel()));
}
catch (Exception e) {
throw new RuntimeException(e);
}
}
else {
((MessageListener) messageListener).onMessage(message);
}
}
}
示例3
public MessageListenerProxy(MessageListener listener, String jobName) {
this.delegate = listener;
this.jobName = jobName;
}
示例4
protected MessageListener createMessageListener(EventRegistry eventRegistry, InboundChannelModel inboundChannelModel) {
return new RabbitChannelMessageListenerAdapter(eventRegistry, inboundChannelModel);
}
示例5
/**
* Receive messages for the given queue
* @param name
* @param errorHandler
* @return
*/
@GetMapping(value = "/queue/{name}", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<?> receiveMessagesFromQueue(@PathVariable String name) {
DestinationsConfig.DestinationInfo d = destinationsConfig.getQueues()
.get(name);
if (d == null) {
return Flux.just(ResponseEntity.notFound()
.build());
}
MessageListenerContainer mlc = messageListenerContainerFactory.createMessageListenerContainer(d.getRoutingKey());
Flux<String> f = Flux.<String> create(emitter -> {
log.info("[I168] Adding listener, queue={}", d.getRoutingKey());
mlc.setupMessageListener((MessageListener) m -> {
String qname = m.getMessageProperties()
.getConsumerQueue();
log.info("[I137] Message received, queue={}", qname);
if (emitter.isCancelled()) {
log.info("[I166] cancelled, queue={}", qname);
mlc.stop();
return;
}
String payload = new String(m.getBody());
emitter.next(payload);
log.info("[I176] Message sent to client, queue={}", qname);
});
emitter.onRequest(v -> {
log.info("[I171] Starting container, queue={}", d.getRoutingKey());
mlc.start();
});
emitter.onDispose(() -> {
log.info("[I176] onDispose: queue={}", d.getRoutingKey());
mlc.stop();
});
log.info("[I171] Container started, queue={}", d.getRoutingKey());
});
return Flux.interval(Duration.ofSeconds(5))
.map(v -> {
log.info("[I209] sending keepalive message...");
return "No news is good news";
})
.mergeWith(f);
}
示例6
@GetMapping(value = "/topic/{name}", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<?> receiveMessagesFromTopic(@PathVariable String name) {
DestinationsConfig.DestinationInfo d = destinationsConfig.getTopics()
.get(name);
if (d == null) {
return Flux.just(ResponseEntity.notFound()
.build());
}
Queue topicQueue = createTopicQueue(d);
String qname = topicQueue.getName();
MessageListenerContainer mlc = messageListenerContainerFactory.createMessageListenerContainer(qname);
Flux<String> f = Flux.<String> create(emitter -> {
log.info("[I168] Adding listener, queue={}", qname);
mlc.setupMessageListener((MessageListener) m -> {
log.info("[I137] Message received, queue={}", qname);
if (emitter.isCancelled()) {
log.info("[I166] cancelled, queue={}", qname);
mlc.stop();
return;
}
String payload = new String(m.getBody());
emitter.next(payload);
log.info("[I176] Message sent to client, queue={}", qname);
});
emitter.onRequest(v -> {
log.info("[I171] Starting container, queue={}", qname);
mlc.start();
});
emitter.onDispose(() -> {
log.info("[I176] onDispose: queue={}", qname);
amqpAdmin.deleteQueue(qname);
mlc.stop();
});
log.info("[I171] Container started, queue={}", qname);
});
return Flux.interval(Duration.ofSeconds(5))
.map(v -> {
log.info("[I209] sending keepalive message...");
return "No news is good news";
})
.mergeWith(f);
}