Java源码示例:org.springframework.amqp.core.MessageListener

示例1
/**
 * Declares a listener bean that, for every received message, prints the
 * message and then publishes a fixed JSON-serialized {@code MyPojo} to the
 * exchange returned by {@code issue178OutputExchange()} with the routing key
 * {@code "routingkey"}.
 *
 * @param rabbitTemplate template used to send the outgoing message
 * @return the listener
 */
@Bean
MessageListener exampleListener(final RabbitTemplate rabbitTemplate) {
	return new MessageListener() {
		// Created once and reused for every message instead of once per message.
		private final ObjectMapper objectMapper = new ObjectMapper();

		@Override
		public void onMessage(Message message) {
			System.out.println("received: " + message);
			try {
				String payload = this.objectMapper.writeValueAsString(new MyPojo(
						"992e46d8-ab05-4a26-a740-6ef7b0daeab3", "CREATED"));
				// Encode explicitly as UTF-8 so the bytes on the wire do not depend
				// on the platform default charset of the JVM.
				Message outputMessage = MessageBuilder
						.withBody(payload.getBytes(java.nio.charset.StandardCharsets.UTF_8))
						.build();
				rabbitTemplate.send(issue178OutputExchange().getName(), "routingkey",
						outputMessage);
			}
			catch (JsonProcessingException e) {
				// Serialization failure is not recoverable here; surface it unchecked
				// with the original cause preserved.
				throw new RuntimeException(e);
			}
		}
	};
}
 
示例2
/**
 * Delivers the message directly to every listener container registered for
 * the given destination and the message's received routing key.
 *
 * @param message the message to hand to the listeners
 * @param destination the destination used to look up listener containers
 * @throws IllegalStateException if no listener container matches
 * @throws RuntimeException wrapping any exception raised while invoking a
 * channel-aware listener
 */
@Override
public void send(Message message, String destination) {
	final String receivedKey = message.getMessageProperties().getReceivedRoutingKey();
	final List<SimpleMessageListenerContainer> containers = this.messageListenerAccessor
			.getListenerContainersForDestination(destination, receivedKey);
	if (containers.isEmpty()) {
		throw new IllegalStateException(
				"no listeners found for destination " + destination);
	}
	for (SimpleMessageListenerContainer container : containers) {
		final Object listener = container.getMessageListener();
		if (!isChannelAwareListener(container, listener)) {
			// Plain listener: invoke it with the message only.
			((MessageListener) listener).onMessage(message);
			continue;
		}
		try {
			// The cast stays inside the try so a failure is wrapped like any
			// other exception raised during channel-aware dispatch.
			ChannelAwareMessageListener channelAware = (ChannelAwareMessageListener) listener;
			channelAware.onMessage(message,
					createChannel(container, transactionalChannel()));
		}
		catch (Exception e) {
			throw new RuntimeException(e);
		}
	}
}
 
示例3
/**
 * Creates a proxy that stores the given listener as its delegate together
 * with a job name.
 *
 * @param listener the listener kept as this proxy's delegate
 * @param jobName the job name kept on this proxy
 */
public MessageListenerProxy(MessageListener listener, String jobName) {
    this.jobName = jobName;
    this.delegate = listener;
}
 
示例4
/**
 * Creates the message listener for an inbound channel by wrapping the given
 * arguments in a {@code RabbitChannelMessageListenerAdapter}.
 *
 * @param eventRegistry the event registry passed to the adapter
 * @param inboundChannelModel the inbound channel model passed to the adapter
 * @return the newly created adapter
 */
protected MessageListener createMessageListener(EventRegistry eventRegistry, InboundChannelModel inboundChannelModel) {
    final MessageListener adapter =
        new RabbitChannelMessageListenerAdapter(eventRegistry, inboundChannelModel);
    return adapter;
}
 
示例5
/**
 * Streams messages received on the configured queue as server-sent events.
 *
 * <p>The destination is looked up by name in the configured queues. If it is
 * not configured, a single-element flux holding a "not found" response entity
 * is returned. Otherwise a listener container is created for the destination's
 * routing key; it is started when the subscriber requests data and stopped when
 * the sink is disposed. Each message body is emitted as a string. The result is
 * merged with a keepalive message emitted every 5 seconds.
 *
 * @param name the key of the queue in the destinations configuration
 * @return a flux of message payloads interleaved with keepalive messages, or a
 * flux with a single "not found" response entity when the queue is unknown
 */
@GetMapping(value = "/queue/{name}", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<?> receiveMessagesFromQueue(@PathVariable String name) {

    DestinationsConfig.DestinationInfo d = destinationsConfig.getQueues()
        .get(name);

    if (d == null) {
        return Flux.just(ResponseEntity.notFound()
            .build());
    }

    MessageListenerContainer mlc = messageListenerContainerFactory.createMessageListenerContainer(d.getRoutingKey());

    Flux<String> f = Flux.<String> create(emitter -> {

        log.info("[I168] Adding listener, queue={}", d.getRoutingKey());
        mlc.setupMessageListener((MessageListener) m -> {

            String qname = m.getMessageProperties()
                .getConsumerQueue();

            log.info("[I137] Message received, queue={}", qname);

            // The subscriber is gone: stop consuming instead of emitting.
            if (emitter.isCancelled()) {
                log.info("[I166] cancelled, queue={}", qname);
                mlc.stop();
                return;
            }

            // Decode explicitly as UTF-8 so the result does not depend on the
            // platform default charset of the JVM.
            String payload = new String(m.getBody(), java.nio.charset.StandardCharsets.UTF_8);
            emitter.next(payload);

            log.info("[I176] Message sent to client, queue={}", qname);

        });

        // The container is only started once the subscriber signals demand.
        emitter.onRequest(v -> {
            log.info("[I171] Starting container, queue={}", d.getRoutingKey());
            mlc.start();
        });

        emitter.onDispose(() -> {
            log.info("[I176] onDispose: queue={}", d.getRoutingKey());
            mlc.stop();
        });

        log.info("[I171] Container started, queue={}", d.getRoutingKey());

    });


    // Periodic keepalive messages are merged with the actual payload stream.
    return Flux.interval(Duration.ofSeconds(5))
        .map(v -> {
            log.info("[I209] sending keepalive message...");
            return "No news is good news";
        })
        .mergeWith(f);
}
 
示例6
/**
 * Streams messages received on the configured topic as server-sent events.
 *
 * <p>The destination is looked up by name in the configured topics. If it is
 * not configured, a single-element flux holding a "not found" response entity
 * is returned. Otherwise a queue is obtained from {@code createTopicQueue} and
 * a listener container is created for that queue's name; it is started when
 * the subscriber requests data. On disposal the container is stopped and the
 * queue is deleted. Each message body is emitted as a string. The result is
 * merged with a keepalive message emitted every 5 seconds.
 *
 * @param name the key of the topic in the destinations configuration
 * @return a flux of message payloads interleaved with keepalive messages, or a
 * flux with a single "not found" response entity when the topic is unknown
 */
@GetMapping(value = "/topic/{name}", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<?> receiveMessagesFromTopic(@PathVariable String name) {

    DestinationsConfig.DestinationInfo d = destinationsConfig.getTopics()
        .get(name);

    if (d == null) {
        return Flux.just(ResponseEntity.notFound()
            .build());
    }

    // NOTE(review): presumably a per-subscriber queue bound to the topic;
    // confirm against createTopicQueue.
    Queue topicQueue = createTopicQueue(d);
    String qname = topicQueue.getName();

    MessageListenerContainer mlc = messageListenerContainerFactory.createMessageListenerContainer(qname);

    Flux<String> f = Flux.<String> create(emitter -> {

        log.info("[I168] Adding listener, queue={}", qname);

        mlc.setupMessageListener((MessageListener) m -> {

            log.info("[I137] Message received, queue={}", qname);

            // The subscriber is gone: stop consuming instead of emitting.
            if (emitter.isCancelled()) {
                log.info("[I166] cancelled, queue={}", qname);
                mlc.stop();
                return;
            }

            // Decode explicitly as UTF-8 so the result does not depend on the
            // platform default charset of the JVM.
            String payload = new String(m.getBody(), java.nio.charset.StandardCharsets.UTF_8);
            emitter.next(payload);

            log.info("[I176] Message sent to client, queue={}", qname);

        });

        // The container is only started once the subscriber signals demand.
        emitter.onRequest(v -> {
            log.info("[I171] Starting container, queue={}", qname);
            mlc.start();
        });

        emitter.onDispose(() -> {
            log.info("[I176] onDispose: queue={}", qname);
            // Stop consuming before removing the queue, so the container is not
            // left attached to a queue that no longer exists. The queue is
            // deleted even if stopping the container fails.
            try {
                mlc.stop();
            }
            finally {
                amqpAdmin.deleteQueue(qname);
            }
        });

        log.info("[I171] Container started, queue={}", qname);

    });

    // Periodic keepalive messages are merged with the actual payload stream.
    return Flux.interval(Duration.ofSeconds(5))
        .map(v -> {
            log.info("[I209] sending keepalive message...");
            return "No news is good news";
        })
        .mergeWith(f);

}