Java source code examples: org.axonframework.eventhandling.EventMessage

Example 1
private PlatformDocument getPlatformAtPointInTime(String platformId, Long timestamp) {
    // Replay only the events recorded at or before the requested timestamp, skipping restore events.
    DomainEventStream eventStream = eventStorageEngine.readEvents(platformId).filter(domainEventMessage ->
            (timestamp == null || domainEventMessage.getTimestamp().toEpochMilli() <= timestamp)
                    && !domainEventMessage.getPayloadType().equals(RestoreDeletedPlatformEvent.class)
    );
    // Rebuild the point-in-time state by replaying the filtered events into a fresh in-memory projection.
    InmemoryPlatformRepository inmemoryPlatformRepository = new InmemoryPlatformRepository();
    AnnotationEventListenerAdapter eventHandlerAdapter = new AnnotationEventListenerAdapter(new MongoPlatformProjectionRepository(inmemoryPlatformRepository));
    boolean zeroEventsBeforeTimestamp = true;
    while (eventStream.hasNext()) {
        zeroEventsBeforeTimestamp = false;
        try {
            EventMessage<?> event = eventStream.next();
            eventHandlerAdapter.handle(event);
        } catch (Exception error) {
            throw new UnreplayablePlatformEventsException(timestamp, error);
        }
    }
    if (zeroEventsBeforeTimestamp) {
        throw new InexistantPlatformAtTimeException(timestamp);
    }
    return inmemoryPlatformRepository.getCurrentPlatformDocument();
}
 
Example 2
@Qualifier("trade-events")
@Bean
public SubscribableMessageSource<EventMessage<?>> tradeEvents(AMQPMessageConverter messageConverter) {
    return new SpringAMQPMessageSource(messageConverter) {
        @Transactional
        @RabbitListener(queues = "trades")
        @Override
        public void onMessage(Message message, Channel channel) {
            super.onMessage(message, channel);
        }
    };
}
 
Example 3
/**
 * Send the given {@code event} to the configured Kafka {@code topic}. It takes the current Unit of Work into
 * account when available.
 * <p>
 * If the {@link ProducerFactory} is configured to use:
 * </p>
 * <ul>
 * <li>Transactions: use Kafka transactions for publishing events.</li>
 * <li>Ack: send messages and wait for acknowledgement from Kafka. The acknowledgement timeout can be configured
 * via {@link KafkaPublisher.Builder#publisherAckTimeout(long)}.</li>
 * <li>None: fire and forget.</li>
 * </ul>
 *
 * @param event the event to publish to the Kafka broker
 */
public <T extends EventMessage<?>> void send(T event) {
    UnitOfWork<?> uow = CurrentUnitOfWork.get();

    MonitorCallback monitorCallback = messageMonitor.onMessageIngested(event);
    Producer<K, V> producer = producerFactory.createProducer();
    ConfirmationMode confirmationMode = producerFactory.confirmationMode();

    if (confirmationMode.isTransactional()) {
        tryBeginTxn(producer);
    }

    // Sends the event message to Kafka and receives a future indicating the publish status.
    Future<RecordMetadata> publishStatus = producer.send(messageConverter.createKafkaMessage(event, topic));

    uow.onPrepareCommit(u -> {
        if (confirmationMode.isTransactional()) {
            tryCommit(producer, monitorCallback);
        } else if (confirmationMode.isWaitForAck()) {
            waitForPublishAck(publishStatus, monitorCallback);
        }
        tryClose(producer);
    });

    uow.onRollback(u -> {
        if (confirmationMode.isTransactional()) {
            tryRollback(producer);
        }
        tryClose(producer);
    });
}
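
Which branch runs in the method above depends on how the ProducerFactory was built. Below is a minimal sketch of a transactional setup, assuming the DefaultProducerFactory builder of the Axon Kafka extension (configuration(...), confirmationMode(...) and transactionalIdPrefix(...)); the broker address and id prefix are illustrative.

public ProducerFactory<String, byte[]> transactionalProducerFactory() {
    // Standard Kafka producer properties; the broker address is illustrative.
    Map<String, Object> configs = new HashMap<>();
    configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
    // TRANSACTIONAL drives send(...) into the tryBeginTxn/tryCommit branch above;
    // WAIT_FOR_ACK would make it wait for the broker acknowledgement instead.
    return DefaultProducerFactory.<String, byte[]>builder()
                                 .configuration(configs)
                                 .confirmationMode(ConfirmationMode.TRANSACTIONAL)
                                 .transactionalIdPrefix("kafka-publisher")
                                 .build();
}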
 
Example 4
/**
 * {@inheritDoc}
 * <p/>
 * Any subscribed Event Processor will be placed in the same Consumer Group, defined through the (mandatory) {@link
 * Builder#groupId(String)} method.
 */
@Override
public Registration subscribe(java.util.function.Consumer<List<? extends EventMessage<?>>> eventProcessor) {
    if (this.eventProcessors.add(eventProcessor)) {
        logger.debug("Event Processor [{}] subscribed successfully", eventProcessor);
    } else {
        logger.info("Event Processor [{}] not added. It was already subscribed", eventProcessor);
    }

    if (autoStart) {
        logger.info("Starting event consumption as auto start is enabled");
        start();
    }

    return () -> {
        if (eventProcessors.remove(eventProcessor)) {
            logger.debug("Event Processor [{}] unsubscribed successfully", eventProcessor);
            if (eventProcessors.isEmpty() && autoStart) {
                logger.info("Closing event consumption as auto start is enabled");
                close();
            }
            return true;
        } else {
            logger.info("Event Processor [{}] not removed. It was already unsubscribed", eventProcessor);
            return false;
        }
    };
}
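
For context, a minimal sketch of how a caller might use the Registration returned above; the logging consumer and helper method are illustrative.

void attachLoggingProcessor(SubscribableMessageSource<EventMessage<?>> source) {
    // Subscribing registers the processor; with autoStart enabled the source begins consuming.
    Registration registration = source.subscribe(
            events -> events.forEach(event -> System.out.println(event.getPayload()))
    );
    // cancel() executes the lambda returned by subscribe(...): the processor is removed and,
    // when it was the last subscriber and autoStart is enabled, consumption is closed.
    registration.cancel();
}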
 
Example 5
@Override
public Optional<EventMessage<?>> readKafkaMessage(ConsumerRecord<String, byte[]> consumerRecord) {
    try {
        Headers headers = consumerRecord.headers();
        if (isAxonMessage(headers)) {
            byte[] messageBody = consumerRecord.value();
            SerializedMessage<?> message = extractSerializedMessage(headers, messageBody);
            return buildMessage(headers, message);
        }
    } catch (Exception e) {
        logger.trace("Error converting ConsumerRecord [{}] to an EventMessage", consumerRecord, e);
    }

    return Optional.empty();
}
 
Example 6
private Optional<EventMessage<?>> buildDomainEvent(Headers headers, SerializedMessage<?> message, long timestamp) {
    return Optional.of(new GenericDomainEventMessage<>(
            valueAsString(headers, AGGREGATE_TYPE),
            valueAsString(headers, AGGREGATE_ID),
            valueAsLong(headers, AGGREGATE_SEQ),
            message,
            () -> Instant.ofEpochMilli(timestamp)
    ));
}
 
Example 7
/**
 * Generates Kafka {@link Headers} based on an {@link EventMessage} and {@link SerializedObject}, using the given
 * {@code headerValueMapper} to correctly map the values to byte arrays.
 *
 * @param eventMessage      the {@link EventMessage} to create headers for
 * @param serializedObject  the serialized payload of the given {@code eventMessage}
 * @param headerValueMapper function for converting {@code values} to bytes. Since a {@link RecordHeader} can
 *                          only contain bytes, this function defines how a given value is converted to bytes.
 *                          See {@link HeaderUtils#byteMapper()} for a sample implementation
 * @return the generated Kafka {@link Headers} based on an {@link EventMessage} and {@link SerializedObject}
 */
public static Headers toHeaders(EventMessage<?> eventMessage,
                                SerializedObject<byte[]> serializedObject,
                                BiFunction<String, Object, RecordHeader> headerValueMapper) {
    notNull(eventMessage, () -> "EventMessage may not be null");
    notNull(serializedObject, () -> "SerializedObject may not be null");
    notNull(headerValueMapper, () -> "Header key-value mapper function may not be null");

    RecordHeaders headers = new RecordHeaders();
    eventMessage.getMetaData()
                .forEach((k, v) -> headers.add(headerValueMapper.apply(generateMetadataKey(k), v)));
    defaultHeaders(eventMessage, serializedObject).forEach((k, v) -> addHeader(headers, k, v));
    return headers;
}
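
The headerValueMapper is simply a BiFunction supplied by the caller. As an illustration, a hypothetical value-to-bytes mapper could look like the sketch below; this is not necessarily the implementation behind HeaderUtils#byteMapper().

// Hypothetical mapper: pass byte[] values through unchanged, otherwise fall back to
// the UTF-8 bytes of toString(); null values become null header values.
BiFunction<String, Object, RecordHeader> mapper = (key, value) -> {
    if (value instanceof byte[]) {
        return new RecordHeader(key, (byte[]) value);
    }
    return new RecordHeader(key, value == null ? null : value.toString().getBytes(StandardCharsets.UTF_8));
};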
 
Example 8
@Test
public void testGeneratingHeadersForEventMessageShouldGenerateEventHeaders() {
    String metaKey = "someHeaderKey";
    EventMessage<Object> evt = asEventMessage("SomePayload").withMetaData(
            MetaData.with(metaKey, "someValue")
    );
    SerializedObject<byte[]> so = serializedObject();
    Headers headers = toHeaders(evt, so, byteMapper());

    assertEventHeaders(metaKey, evt, so, headers);
}
 
Example 9
@Test
public void testWritingEventMessageAsKafkaMessageShouldAppendEventHeaders() {
    EventMessage<?> expected = eventMessage();
    ProducerRecord<String, byte[]> senderMessage = testSubject.createKafkaMessage(expected, SOME_TOPIC);
    SerializedObject<byte[]> serializedObject = expected.serializePayload(serializer, byte[].class);

    assertEventHeaders("key", expected, serializedObject, senderMessage.headers());
}
 
Example 10
@Test
public void testReadingMessageWithoutIdShouldReturnEmptyMessage() {
    EventMessage<?> event = eventMessage();
    ProducerRecord<String, byte[]> msg = testSubject.createKafkaMessage(event, SOME_TOPIC);
    msg.headers().remove(MESSAGE_ID);

    assertThat(testSubject.readKafkaMessage(toReceiverRecord(msg)).isPresent()).isFalse();
}
 
Example 11
@Test
public void testReadingMessageWithoutTypeShouldReturnEmptyMessage() {
    EventMessage<?> event = eventMessage();
    ProducerRecord<String, byte[]> msg = testSubject.createKafkaMessage(event, SOME_TOPIC);
    msg.headers().remove(MESSAGE_TYPE);

    assertThat(testSubject.readKafkaMessage(toReceiverRecord(msg)).isPresent()).isFalse();
}
 
Example 12
@SuppressWarnings("unchecked")
@Test
public void testReadingMessagePayloadDifferentThanByteShouldReturnEmptyMessage() {
    EventMessage<Object> eventMessage = eventMessage();
    SerializedObject serializedObject = mock(SerializedObject.class);
    when(serializedObject.getType()).thenReturn(new SimpleSerializedType("foo", null));
    Headers headers = toHeaders(eventMessage, serializedObject, byteMapper());
    ConsumerRecord payloadDifferentThanByte = new ConsumerRecord(
            "foo", 0, 0, NO_TIMESTAMP, NO_TIMESTAMP_TYPE,
            -1L, NULL_SIZE, NULL_SIZE, 1, "123", headers
    );

    assertThat(testSubject.readKafkaMessage(payloadDifferentThanByte).isPresent()).isFalse();
}
 
Example 13
@Test
public void testWritingEventMessageShouldBeReadAsEventMessage() {
    EventMessage<?> expected = eventMessage();
    ProducerRecord<String, byte[]> senderMessage = testSubject.createKafkaMessage(expected, SOME_TOPIC);
    EventMessage<?> actual = receiverMessage(senderMessage);

    assertEventMessage(actual, expected);
}
 
Example 14
@Test
public void testWritingEventMessageWithNullRevisionShouldWriteRevisionAsNull() {
    testSubject = DefaultKafkaMessageConverter.builder()
                                              .serializer(XStreamSerializer.builder().build())
                                              .build();
    EventMessage<?> eventMessage = eventMessage();
    ProducerRecord<String, byte[]> senderMessage = testSubject.createKafkaMessage(eventMessage, SOME_TOPIC);

    assertThat(valueAsString(senderMessage.headers(), MESSAGE_REVISION)).isNull();
}
 
Example 15
@Test
public void testWritingDomainEventMessageShouldBeReadAsDomainMessage() {
    DomainEventMessage<?> expected = domainMessage();
    ProducerRecord<String, byte[]> senderMessage = testSubject.createKafkaMessage(expected, SOME_TOPIC);
    EventMessage<?> actual = receiverMessage(senderMessage);

    assertEventMessage(actual, expected);
    assertDomainMessage((DomainEventMessage<?>) actual, expected);
}
 
Example 16
private static void assertEventMessage(EventMessage<?> actual, EventMessage<?> expected) {
    assertThat(actual.getIdentifier()).isEqualTo(expected.getIdentifier());
    assertThat(actual.getPayloadType()).isEqualTo(expected.getPayloadType());
    assertThat(actual.getMetaData()).isEqualTo(expected.getMetaData());
    assertThat(actual.getPayload()).isEqualTo(expected.getPayload());
    assertThat(actual.getTimestamp().toEpochMilli()).isEqualTo(expected.getTimestamp().toEpochMilli());
}
 
Example 17
public static void assertEventHeaders(String metaDataKey,
                                      EventMessage<?> eventMessage,
                                      SerializedObject<byte[]> so,
                                      Headers headers) {
    assertThat(headers.toArray().length).isGreaterThanOrEqualTo(5);
    assertThat(valueAsString(headers, MESSAGE_ID)).isEqualTo(eventMessage.getIdentifier());
    assertThat(valueAsLong(headers, MESSAGE_TIMESTAMP)).isEqualTo(eventMessage.getTimestamp().toEpochMilli());
    assertThat(valueAsString(headers, MESSAGE_TYPE)).isEqualTo(so.getType().getName());
    assertThat(valueAsString(headers, MESSAGE_REVISION)).isEqualTo(so.getType().getRevision());
    assertThat(valueAsString(headers, generateMetadataKey(metaDataKey)))
            .isEqualTo(eventMessage.getMetaData().get(metaDataKey));
}
 
Example 18
@Autowired
public void config(EventProcessingConfiguration epConfig,
                   @Qualifier("trade-events") SubscribableMessageSource<EventMessage<?>> tradeEvents) {
    epConfig.registerSubscribingEventProcessor("trading", c -> tradeEvents);
}
 
Example 19
@Override
public Object handle(EventMessage<?> event) {
    kafkaPublisher.send(event);
    return null;
}
 
Example 20
private String recordKey(EventMessage<?> eventMessage) {
    Object sequenceIdentifier = sequencingPolicy.getSequenceIdentifierFor(eventMessage);
    return sequenceIdentifier != null ? sequenceIdentifier.toString() : null;
}
 
Example 21
private Optional<EventMessage<?>> buildMessage(Headers headers, SerializedMessage<?> message) {
    long timestamp = valueAsLong(headers, MESSAGE_TIMESTAMP);
    return headers.lastHeader(AGGREGATE_ID) != null
            ? buildDomainEvent(headers, message, timestamp)
            : buildEvent(message, timestamp);
}
 
Example 22
private Optional<EventMessage<?>> buildEvent(SerializedMessage<?> message, long timestamp) {
    return Optional.of(new GenericEventMessage<>(message, () -> Instant.ofEpochMilli(timestamp)));
}
 
Example 23
private static KafkaEventMessage message(int partition, int offset, int timestamp) {
    EventMessage<Object> eventMessage = asEventMessage(offset + "abc" + (offset * 17 + 123));
    return new KafkaEventMessage(asTrackedEventMessage(eventMessage, null), partition, offset, timestamp);
}
 
Example 24
@Override
public ProducerRecord<String, String> createKafkaMessage(EventMessage<?> eventMessage, String topic) {
    throw new UnsupportedOperationException();
}
 
Example 25
@Override
public Optional<EventMessage<?>> readKafkaMessage(ConsumerRecord<String, String> consumerRecord) {
    return Optional.of(asEventMessage(consumerRecord.value()));
}
 
Example 26
private static EventMessage<Object> eventMessage() {
    return asEventMessage("SomePayload").withMetaData(MetaData.with("key", "value"));
}
 
Example 27
private EventMessage<?> receiverMessage(ProducerRecord<String, byte[]> senderMessage) {
    return testSubject.readKafkaMessage(toReceiverRecord(senderMessage))
                      .orElseThrow(() -> new AssertionError("Expected valid message"));
}
 
Example 28
/**
 * Construct a {@link KafkaEventMessage} based on the deserialized body, the {@code eventMessage}, of a {@link
 * ConsumerRecord} retrieved from a Kafka topic. The {@code trackingToken} is used to turn the {@code
 * eventMessage} into a {@link TrackedEventMessage}.
 *
 * @param eventMessage   the {@link EventMessage} to wrap
 * @param consumerRecord the {@link ConsumerRecord} which the given {@code eventMessage} was the body of
 * @param trackingToken  the {@link KafkaTrackingToken} defining the position of this message
 * @return the {@link KafkaEventMessage} constructed from the given {@code eventMessage}, {@code consumerRecord} and
 * {@code trackingToken}
 */
public static KafkaEventMessage from(EventMessage<?> eventMessage,
                                     ConsumerRecord<?, ?> consumerRecord,
                                     KafkaTrackingToken trackingToken) {
    return new KafkaEventMessage(
            asTrackedEventMessage(eventMessage, trackingToken),
            consumerRecord.partition(), consumerRecord.offset(), consumerRecord.timestamp()
    );
}
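
For context, a minimal sketch of how this factory method is typically combined with a message converter such as the one in Example 5; the KafkaMessageConverter parameter and the way the KafkaTrackingToken is obtained are illustrative and depend on the consuming side.

private Optional<KafkaEventMessage> convert(KafkaMessageConverter<String, byte[]> converter,
                                            ConsumerRecord<String, byte[]> record,
                                            KafkaTrackingToken token) {
    // Deserialize the record body first; when that succeeds, enrich the event with the
    // record's partition, offset and timestamp plus the tracking token.
    return converter.readKafkaMessage(record)
                    .map(eventMessage -> KafkaEventMessage.from(eventMessage, record, token));
}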
 
Example 29
/**
 * {@inheritDoc}
 * <p/>
 * Note that the {@link ProducerRecord} created through this method sets the {@link ProducerRecord#timestamp()} to
 * {@code null}, which ensures that the Producer in use assigns a timestamp to the record itself. The
 * {@link EventMessage#getTimestamp()} value is still taken into account, but as a header.
 * <p/>
 * Note additionally that the ProducerRecord is given a {@code null} {@link ProducerRecord#partition()} value.
 * Instead, the {@link ProducerRecord#key()} field is defined by using the configured {@link SequencingPolicy} to
 * retrieve the given {@code eventMessage}'s {@code sequenceIdentifier}. The combination of a {@code null}
 * partition and the possibly present or empty key determines which partition the Producer dispatches the
 * record to.
 *
 * @see ProducerRecord
 */
@Override
public ProducerRecord<String, byte[]> createKafkaMessage(EventMessage<?> eventMessage, String topic) {
    SerializedObject<byte[]> serializedObject = eventMessage.serializePayload(serializer, byte[].class);
    return new ProducerRecord<>(
            topic, null, null, recordKey(eventMessage),
            serializedObject.getData(),
            toHeaders(eventMessage, serializedObject, headerValueMapper)
    );
}
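
Because the record key comes from the configured SequencingPolicy (see recordKey in Example 20), keying per aggregate keeps an aggregate's events on a single partition. A minimal sketch, assuming the converter builder exposes a sequencingPolicy(...) method alongside serializer(...) and using Axon's SequentialPerAggregatePolicy:

DefaultKafkaMessageConverter converter =
        DefaultKafkaMessageConverter.builder()
                                    .serializer(XStreamSerializer.builder().build())
                                    .sequencingPolicy(SequentialPerAggregatePolicy.instance())
                                    .build();
// For a DomainEventMessage the policy yields the aggregate identifier, so the record key
// is stable per aggregate and Kafka's default partitioner maps that aggregate's events
// to one partition, preserving their order for consumers.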