Java源码示例:org.apache.nifi.jms.processors.JMSConsumer.JMSResponse

示例1
/**
 * At the moment the only two supported message types are TextMessage and
 * BytesMessage which is sufficient for the type if JMS use cases NiFi is
 * used. The may change to the point where all message types are supported
 * at which point this test will no be longer required.
 */
@Test(expected = IllegalStateException.class)
public void validateFailOnUnsupportedMessageType() throws Exception {
    final String destinationName = "testQueue";
    JmsTemplate jmsTemplate = CommonTest.buildJmsTemplateForDestination(false);

    jmsTemplate.send(destinationName, new MessageCreator() {
        @Override
        public Message createMessage(Session session) throws JMSException {
            return session.createObjectMessage();
        }
    });

    JMSConsumer consumer = new JMSConsumer(jmsTemplate, mock(ComponentLog.class));
    try {
        consumer.consume(destinationName, new ConsumerCallback() {
            @Override
            public void accept(JMSResponse response) {
                // noop
            }
        });
    } finally {
        ((CachingConnectionFactory) jmsTemplate.getConnectionFactory()).destroy();
    }
}
 
示例2
/**
 * At the moment the only two supported message types are TextMessage and
 * BytesMessage which is sufficient for the type if JMS use cases NiFi is
 * used. The may change to the point where all message types are supported
 * at which point this test will no be longer required.
 */
@Test(expected = IllegalStateException.class)
public void validateFailOnUnsupportedMessageType() throws Exception {
    final String destinationName = "testQueue";
    JmsTemplate jmsTemplate = CommonTest.buildJmsTemplateForDestination(false);

    jmsTemplate.send(destinationName, new MessageCreator() {
        @Override
        public Message createMessage(Session session) throws JMSException {
            return session.createObjectMessage();
        }
    });

    JMSConsumer consumer = new JMSConsumer(jmsTemplate, mock(ComponentLog.class));
    try {
        consumer.consume(destinationName, new ConsumerCallback() {
            @Override
            public void accept(JMSResponse response) {
                // noop
            }
        });
    } finally {
        ((CachingConnectionFactory) jmsTemplate.getConnectionFactory()).destroy();
    }
}
 
示例3
public void validateFailOnUnsupportedMessageTypeOverJNDI() throws Exception {
    final String destinationName = "testQueue";
    JmsTemplate jmsTemplate = CommonTest.buildJmsJndiTemplateForDestination(false);

    jmsTemplate.send(destinationName, new MessageCreator() {
        @Override
        public Message createMessage(Session session) throws JMSException {
            return session.createObjectMessage();
        }
    });

    JMSConsumer consumer = new JMSConsumer(jmsTemplate, mock(ComponentLog.class));
    try {
        consumer.consume(destinationName, new ConsumerCallback() {
            @Override
            public void accept(JMSResponse response) {
                // noop
            }
        });
    } finally {
        ((CachingConnectionFactory) jmsTemplate.getConnectionFactory()).destroy();
    }
}
 
示例4
/**
 * At the moment the only two supported message types are TextMessage and
 * BytesMessage which is sufficient for the type if JMS use cases NiFi is
 * used. The may change to the point where all message types are supported
 * at which point this test will no be longer required.
 */
@Test
public void validateFailOnUnsupportedMessageType() throws Exception {
    final String destinationName = "validateFailOnUnsupportedMessageType";
    JmsTemplate jmsTemplate = CommonTest.buildJmsTemplateForDestination(false);

    try {
        jmsTemplate.send(destinationName, new MessageCreator() {
            @Override
            public Message createMessage(Session session) throws JMSException {
                return session.createObjectMessage();
            }
        });

        JMSConsumer consumer = new JMSConsumer((CachingConnectionFactory) jmsTemplate.getConnectionFactory(), jmsTemplate, mock(ComponentLog.class));
        consumer.consume(destinationName, null, false, false, null, "UTF-8", new ConsumerCallback() {
            @Override
            public void accept(JMSResponse response) {
                // noop
            }
        });
    } finally {
        ((CachingConnectionFactory) jmsTemplate.getConnectionFactory()).destroy();
    }
}
 
示例5
/**
 * Will construct a {@link FlowFile} containing the body of the consumed JMS
 * message (if {@link GetResponse} returned by {@link JMSConsumer} is not
 * null) and JMS properties that came with message which are added to a
 * {@link FlowFile} as attributes, transferring {@link FlowFile} to
 * 'success' {@link Relationship}.
 */
@Override
protected void rendezvousWithJms(final ProcessContext context, final ProcessSession processSession) throws ProcessException {
    final String destinationName = context.getProperty(DESTINATION).evaluateAttributeExpressions().getValue();
    this.targetResource.consume(destinationName, new ConsumerCallback(){
        @Override
        public void accept(final JMSResponse response) {
            if (response != null){
                FlowFile flowFile = processSession.create();
                flowFile = processSession.write(flowFile, new OutputStreamCallback() {
                    @Override
                    public void process(final OutputStream out) throws IOException {
                        out.write(response.getMessageBody());
                    }
                });
                Map<String, Object> jmsHeaders = response.getMessageHeaders();
                Map<String, Object> jmsProperties = Collections.<String, Object>unmodifiableMap(response.getMessageProperties());
                flowFile = ConsumeJMS.this.updateFlowFileAttributesWithJMSAttributes(jmsHeaders, flowFile, processSession);
                flowFile = ConsumeJMS.this.updateFlowFileAttributesWithJMSAttributes(jmsProperties, flowFile, processSession);
                flowFile = processSession.putAttribute(flowFile, JMS_SOURCE_DESTINATION_NAME, destinationName);
                processSession.getProvenanceReporter().receive(flowFile, destinationName);
                processSession.transfer(flowFile, REL_SUCCESS);
                processSession.commit();
            } else {
                context.yield();
            }
        }
    });
}
 
示例6
@Test
public void validateConsumeWithCustomHeadersAndProperties() throws Exception {
    final String destinationName = "testQueue";
    JmsTemplate jmsTemplate = CommonTest.buildJmsTemplateForDestination(false);

    jmsTemplate.send(destinationName, new MessageCreator() {
        @Override
        public Message createMessage(Session session) throws JMSException {
            TextMessage message = session.createTextMessage("hello from the other side");
            message.setStringProperty("foo", "foo");
            message.setBooleanProperty("bar", false);
            message.setJMSReplyTo(session.createQueue("fooQueue"));
            return message;
        }
    });

    JMSConsumer consumer = new JMSConsumer(jmsTemplate, mock(ComponentLog.class));
    final AtomicBoolean callbackInvoked = new AtomicBoolean();
    consumer.consume(destinationName, new ConsumerCallback() {
        @Override
        public void accept(JMSResponse response) {
            callbackInvoked.set(true);
            assertEquals("hello from the other side", new String(response.getMessageBody()));
            assertEquals("fooQueue", response.getMessageHeaders().get(JmsHeaders.REPLY_TO));
            assertEquals("foo", response.getMessageProperties().get("foo"));
            assertEquals("false", response.getMessageProperties().get("bar"));
        }
    });
    assertTrue(callbackInvoked.get());

    ((CachingConnectionFactory) jmsTemplate.getConnectionFactory()).destroy();
}
 
示例7
/**
 * Will construct a {@link FlowFile} containing the body of the consumed JMS
 * message (if {@link GetResponse} returned by {@link JMSConsumer} is not
 * null) and JMS properties that came with message which are added to a
 * {@link FlowFile} as attributes, transferring {@link FlowFile} to
 * 'success' {@link Relationship}.
 */
@Override
protected void rendezvousWithJms(final ProcessContext context, final ProcessSession processSession) throws ProcessException {
    final String destinationName = context.getProperty(DESTINATION).evaluateAttributeExpressions().getValue();
    this.targetResource.consume(destinationName, new ConsumerCallback(){
        @Override
        public void accept(final JMSResponse response) {
            if (response != null){
                FlowFile flowFile = processSession.create();
                flowFile = processSession.write(flowFile, new OutputStreamCallback() {
                    @Override
                    public void process(final OutputStream out) throws IOException {
                        out.write(response.getMessageBody());
                    }
                });
                Map<String, Object> jmsHeaders = response.getMessageHeaders();
                Map<String, Object> jmsProperties = Collections.<String, Object>unmodifiableMap(response.getMessageProperties());
                flowFile = ConsumeJMS.this.updateFlowFileAttributesWithJMSAttributes(jmsHeaders, flowFile, processSession);
                flowFile = ConsumeJMS.this.updateFlowFileAttributesWithJMSAttributes(jmsProperties, flowFile, processSession);
                flowFile = processSession.putAttribute(flowFile, JMS_SOURCE_DESTINATION_NAME, destinationName);
                processSession.getProvenanceReporter().receive(flowFile, destinationName);
                processSession.transfer(flowFile, REL_SUCCESS);
                processSession.commit();
            } else {
                context.yield();
            }
        }
    });
}
 
示例8
@Test
public void validateConsumeWithCustomHeadersAndProperties() throws Exception {
    final String destinationName = "testQueue";
    JmsTemplate jmsTemplate = CommonTest.buildJmsTemplateForDestination(false);

    jmsTemplate.send(destinationName, new MessageCreator() {
        @Override
        public Message createMessage(Session session) throws JMSException {
            TextMessage message = session.createTextMessage("hello from the other side");
            message.setStringProperty("foo", "foo");
            message.setBooleanProperty("bar", false);
            message.setJMSReplyTo(session.createQueue("fooQueue"));
            return message;
        }
    });

    JMSConsumer consumer = new JMSConsumer(jmsTemplate, mock(ComponentLog.class));
    final AtomicBoolean callbackInvoked = new AtomicBoolean();
    consumer.consume(destinationName, new ConsumerCallback() {
        @Override
        public void accept(JMSResponse response) {
            callbackInvoked.set(true);
            assertEquals("hello from the other side", new String(response.getMessageBody()));
            assertEquals("fooQueue", response.getMessageHeaders().get(JmsHeaders.REPLY_TO));
            assertEquals("foo", response.getMessageProperties().get("foo"));
            assertEquals("false", response.getMessageProperties().get("bar"));
        }
    });
    assertTrue(callbackInvoked.get());

    ((CachingConnectionFactory) jmsTemplate.getConnectionFactory()).destroy();
}
 
示例9
@Test
public void validateConsumeWithCustomHeadersAndPropertiesOverJNDI() throws Exception {
    final String destinationName = "testQueue";
    JmsTemplate jmsTemplate = CommonTest.buildJmsJndiTemplateForDestination(false);

    jmsTemplate.send(destinationName, new MessageCreator() {
        @Override
        public Message createMessage(Session session) throws JMSException {
            TextMessage message = session.createTextMessage("hello from the other side");
            message.setStringProperty("foo", "foo");
            message.setBooleanProperty("bar", false);
            message.setJMSReplyTo(session.createQueue("fooQueue"));
            return message;
        }
    });

    JMSConsumer consumer = new JMSConsumer(jmsTemplate, mock(ComponentLog.class));
    final AtomicBoolean callbackInvoked = new AtomicBoolean();
    consumer.consume(destinationName, new ConsumerCallback() {
        @Override
        public void accept(JMSResponse response) {
            callbackInvoked.set(true);
            assertEquals("hello from the other side", new String(response.getMessageBody()));
            assertEquals("fooQueue", response.getMessageHeaders().get(JmsHeaders.REPLY_TO));
            assertEquals("foo", response.getMessageProperties().get("foo"));
            assertEquals("false", response.getMessageProperties().get("bar"));
        }
    });
    assertTrue(callbackInvoked.get());

    ((CachingConnectionFactory) jmsTemplate.getConnectionFactory()).destroy();
}
 
示例10
/**
 * Will construct a {@link FlowFile} containing the body of the consumed JMS
 * message (if {@link JMSResponse} returned by {@link JMSConsumer} is not
 * null) and JMS properties that came with message which are added to a
 * {@link FlowFile} as attributes, transferring {@link FlowFile} to
 * 'success' {@link Relationship}.
 */
@Override
protected void rendezvousWithJms(final ProcessContext context, final ProcessSession processSession, final JMSConsumer consumer) throws ProcessException {
    final String destinationName = context.getProperty(DESTINATION).evaluateAttributeExpressions().getValue();
    final String errorQueueName = context.getProperty(ERROR_QUEUE).evaluateAttributeExpressions().getValue();
    final boolean durable = isDurableSubscriber(context);
    final boolean shared = isShared(context);
    final String subscriptionName = context.getProperty(SUBSCRIPTION_NAME).evaluateAttributeExpressions().getValue();
    final String charset = context.getProperty(CHARSET).evaluateAttributeExpressions().getValue();

    try {
        consumer.consume(destinationName, errorQueueName, durable, shared, subscriptionName, charset, new ConsumerCallback() {
            @Override
            public void accept(final JMSResponse response) {
                if (response == null) {
                    return;
                }

                FlowFile flowFile = processSession.create();
                flowFile = processSession.write(flowFile, out -> out.write(response.getMessageBody()));

                final Map<String, String> jmsHeaders = response.getMessageHeaders();
                final Map<String, String> jmsProperties = response.getMessageProperties();

                flowFile = ConsumeJMS.this.updateFlowFileAttributesWithJMSAttributes(jmsHeaders, flowFile, processSession);
                flowFile = ConsumeJMS.this.updateFlowFileAttributesWithJMSAttributes(jmsProperties, flowFile, processSession);
                flowFile = processSession.putAttribute(flowFile, JMS_SOURCE_DESTINATION_NAME, destinationName);

                processSession.getProvenanceReporter().receive(flowFile, destinationName);
                processSession.putAttribute(flowFile, JMS_MESSAGETYPE, response.getMessageType());
                processSession.transfer(flowFile, REL_SUCCESS);
                processSession.commit();
            }
        });
    } catch(Exception e) {
        consumer.setValid(false);
        context.yield();
        throw e; // for backward compatibility with exception handling in flows
    }
}
 
示例11
@Test
public void validateConsumeWithCustomHeadersAndProperties() throws Exception {
    final String destinationName = "validateConsumeWithCustomHeadersAndProperties";
    JmsTemplate jmsTemplate = CommonTest.buildJmsTemplateForDestination(false);

    try {
        jmsTemplate.send(destinationName, new MessageCreator() {
            @Override
            public Message createMessage(Session session) throws JMSException {
                TextMessage message = session.createTextMessage("hello from the other side");
                message.setStringProperty("foo", "foo");
                message.setBooleanProperty("bar", false);
                message.setJMSReplyTo(session.createQueue("fooQueue"));
                return message;
            }
        });

        JMSConsumer consumer = new JMSConsumer((CachingConnectionFactory) jmsTemplate.getConnectionFactory(), jmsTemplate, mock(ComponentLog.class));
        final AtomicBoolean callbackInvoked = new AtomicBoolean();
        consumer.consume(destinationName, null, false, false, null, "UTF-8", new ConsumerCallback() {
            @Override
            public void accept(JMSResponse response) {
                callbackInvoked.set(true);
                assertEquals("hello from the other side", new String(response.getMessageBody()));
                assertEquals("fooQueue", response.getMessageHeaders().get(JmsHeaders.REPLY_TO));
                assertEquals("foo", response.getMessageProperties().get("foo"));
                assertEquals("false", response.getMessageProperties().get("bar"));
            }
        });
        assertTrue(callbackInvoked.get());

    } finally {
        ((CachingConnectionFactory) jmsTemplate.getConnectionFactory()).destroy();
    }
}
 
示例12
@Test(timeout = 20000)
public void testMultipleThreads() throws Exception {
    String destinationName = "testMultipleThreads";
    JmsTemplate publishTemplate = CommonTest.buildJmsTemplateForDestination(false);
    final CountDownLatch consumerTemplateCloseCount = new CountDownLatch(4);

    try {
        JMSPublisher publisher = new JMSPublisher((CachingConnectionFactory) publishTemplate.getConnectionFactory(), publishTemplate, mock(ComponentLog.class));
        for (int i = 0; i < 4000; i++) {
            publisher.publish(destinationName, String.valueOf(i).getBytes(StandardCharsets.UTF_8));
        }

        final AtomicInteger msgCount = new AtomicInteger(0);

        final ConsumerCallback callback = new ConsumerCallback() {
            @Override
            public void accept(JMSResponse response) {
                msgCount.incrementAndGet();
            }
        };

        final Thread[] threads = new Thread[4];
        for (int i = 0; i < 4; i++) {
            final Thread t = new Thread(() -> {
                JmsTemplate consumeTemplate = CommonTest.buildJmsTemplateForDestination(false);

                try {
                    JMSConsumer consumer = new JMSConsumer((CachingConnectionFactory) consumeTemplate.getConnectionFactory(), consumeTemplate, mock(ComponentLog.class));

                    for (int j = 0; j < 1000 && msgCount.get() < 4000; j++) {
                        consumer.consume(destinationName, null, false, false, null, "UTF-8", callback);
                    }
                } finally {
                    ((CachingConnectionFactory) consumeTemplate.getConnectionFactory()).destroy();
                    consumerTemplateCloseCount.countDown();
                }
            });

            threads[i] = t;
            t.start();
        }

        int iterations = 0;
        while (msgCount.get() < 4000) {
            Thread.sleep(10L);
            if (++iterations % 100 == 0) {
                System.out.println(msgCount.get() + " messages received so far");
            }
        }
    } finally {
        ((CachingConnectionFactory) publishTemplate.getConnectionFactory()).destroy();

        consumerTemplateCloseCount.await();
    }
}