Java源码示例:org.axonframework.eventhandling.EventMessage
示例1
private PlatformDocument getPlatformAtPointInTime(String platformId, Long timestamp) {
DomainEventStream eventStream = eventStorageEngine.readEvents(platformId).filter(domainEventMessage ->
(timestamp == null || domainEventMessage.getTimestamp().toEpochMilli() <= timestamp)
&& !domainEventMessage.getPayloadType().equals(RestoreDeletedPlatformEvent.class)
);
InmemoryPlatformRepository inmemoryPlatformRepository = new InmemoryPlatformRepository();
AnnotationEventListenerAdapter eventHandlerAdapter = new AnnotationEventListenerAdapter(new MongoPlatformProjectionRepository(inmemoryPlatformRepository));
boolean zeroEventsBeforeTimestamp = true;
while (eventStream.hasNext()) {
zeroEventsBeforeTimestamp = false;
try {
EventMessage<?> event = eventStream.next();
eventHandlerAdapter.handle(event);
} catch (Exception error) {
throw new UnreplayablePlatformEventsException(timestamp, error);
}
}
if (zeroEventsBeforeTimestamp) {
throw new InexistantPlatformAtTimeException(timestamp);
}
return inmemoryPlatformRepository.getCurrentPlatformDocument();
}
示例2
@Qualifier("trade-events")
@Bean
public SubscribableMessageSource<EventMessage<?>> tradeEvents(AMQPMessageConverter messageConverter) {
return new SpringAMQPMessageSource(messageConverter) {
@Transactional
@RabbitListener(queues = "trades")
@Override
public void onMessage(Message message, Channel channel) {
super.onMessage(message, channel);
}
};
}
示例3
/**
* Send {@code events} to the configured Kafka {@code topic}. It takes the current Unit of Work into account when
* available.
* <p>
* If {@link ProducerFactory} is configured to use:
* </p>
* <ul>
* <li>Transactions: use kafka transactions for publishing events</li>
* <li>Ack: send messages and wait for acknowledgement from Kafka. Acknowledgement timeout can be configured via
* {@link KafkaPublisher.Builder#publisherAckTimeout(long)}).</li>
* <li>None: fire and forget.</li>
* </ul>
*
* @param event the events to publish on the Kafka broker.
*/
public <T extends EventMessage<?>> void send(T event) {
UnitOfWork<?> uow = CurrentUnitOfWork.get();
MonitorCallback monitorCallback = messageMonitor.onMessageIngested(event);
Producer<K, V> producer = producerFactory.createProducer();
ConfirmationMode confirmationMode = producerFactory.confirmationMode();
if (confirmationMode.isTransactional()) {
tryBeginTxn(producer);
}
// Send's event messages to Kafka and receive a future indicating the status.
Future<RecordMetadata> publishStatus = producer.send(messageConverter.createKafkaMessage(event, topic));
uow.onPrepareCommit(u -> {
if (confirmationMode.isTransactional()) {
tryCommit(producer, monitorCallback);
} else if (confirmationMode.isWaitForAck()) {
waitForPublishAck(publishStatus, monitorCallback);
}
tryClose(producer);
});
uow.onRollback(u -> {
if (confirmationMode.isTransactional()) {
tryRollback(producer);
}
tryClose(producer);
});
}
示例4
/**
* {@inheritDoc}
* <p/>
* Any subscribed Event Processor will be placed in the same Consumer Group, defined through the (mandatory) {@link
* Builder#groupId(String)} method.
*/
@Override
public Registration subscribe(java.util.function.Consumer<List<? extends EventMessage<?>>> eventProcessor) {
if (this.eventProcessors.add(eventProcessor)) {
logger.debug("Event Processor [{}] subscribed successfully", eventProcessor);
} else {
logger.info("Event Processor [{}] not added. It was already subscribed", eventProcessor);
}
if (autoStart) {
logger.info("Starting event consumption as auto start is enabled");
start();
}
return () -> {
if (eventProcessors.remove(eventProcessor)) {
logger.debug("Event Processor [{}] unsubscribed successfully", eventProcessor);
if (eventProcessors.isEmpty() && autoStart) {
logger.info("Closing event consumption as auto start is enabled");
close();
}
return true;
} else {
logger.info("Event Processor [{}] not removed. It was already unsubscribed", eventProcessor);
return false;
}
};
}
示例5
@Override
public Optional<EventMessage<?>> readKafkaMessage(ConsumerRecord<String, byte[]> consumerRecord) {
try {
Headers headers = consumerRecord.headers();
if (isAxonMessage(headers)) {
byte[] messageBody = consumerRecord.value();
SerializedMessage<?> message = extractSerializedMessage(headers, messageBody);
return buildMessage(headers, message);
}
} catch (Exception e) {
logger.trace("Error converting ConsumerRecord [{}] to an EventMessage", consumerRecord, e);
}
return Optional.empty();
}
示例6
private Optional<EventMessage<?>> buildDomainEvent(Headers headers, SerializedMessage<?> message, long timestamp) {
return Optional.of(new GenericDomainEventMessage<>(
valueAsString(headers, AGGREGATE_TYPE),
valueAsString(headers, AGGREGATE_ID),
valueAsLong(headers, AGGREGATE_SEQ),
message,
() -> Instant.ofEpochMilli(timestamp)
));
}
示例7
/**
* Generates Kafka {@link Headers} based on an {@link EventMessage} and {@link SerializedObject}, using the given
* {@code headerValueMapper} to correctly map the values to byte arrays.
*
* @param eventMessage the {@link EventMessage} to create headers for
* @param serializedObject the serialized payload of the given {@code eventMessage}
* @param headerValueMapper function for converting {@code values} to bytes. Since {@link RecordHeader} can handle
* only bytes this function needs to define the logic how to convert a given value to
* bytes. See {@link HeaderUtils#byteMapper()} for sample implementation
* @return the generated Kafka {@link Headers} based on an {@link EventMessage} and {@link SerializedObject}
*/
public static Headers toHeaders(EventMessage<?> eventMessage,
SerializedObject<byte[]> serializedObject,
BiFunction<String, Object, RecordHeader> headerValueMapper) {
notNull(eventMessage, () -> "EventMessage may not be null");
notNull(serializedObject, () -> "SerializedObject may not be null");
notNull(headerValueMapper, () -> "Header key-value mapper function may not be null");
RecordHeaders headers = new RecordHeaders();
eventMessage.getMetaData()
.forEach((k, v) -> ((Headers) headers).add(headerValueMapper.apply(generateMetadataKey(k), v)));
defaultHeaders(eventMessage, serializedObject).forEach((k, v) -> addHeader(headers, k, v));
return headers;
}
示例8
@Test
public void testGeneratingHeadersForEventMessageShouldGenerateEventHeaders() {
String metaKey = "someHeaderKey";
EventMessage<Object> evt = asEventMessage("SomePayload").withMetaData(
MetaData.with(metaKey, "someValue")
);
SerializedObject<byte[]> so = serializedObject();
Headers headers = toHeaders(evt, so, byteMapper());
assertEventHeaders(metaKey, evt, so, headers);
}
示例9
@Test
public void testWritingEventMessageAsKafkaMessageShouldAppendEventHeaders() {
EventMessage<?> expected = eventMessage();
ProducerRecord<String, byte[]> senderMessage = testSubject.createKafkaMessage(expected, SOME_TOPIC);
SerializedObject<byte[]> serializedObject = expected.serializePayload(serializer, byte[].class);
assertEventHeaders("key", expected, serializedObject, senderMessage.headers());
}
示例10
@Test
public void testReadingMessageWithoutIdShouldReturnEmptyMessage() {
EventMessage<?> event = eventMessage();
ProducerRecord<String, byte[]> msg = testSubject.createKafkaMessage(event, SOME_TOPIC);
msg.headers().remove(MESSAGE_ID);
assertThat(testSubject.readKafkaMessage(toReceiverRecord(msg)).isPresent()).isFalse();
}
示例11
@Test
public void testReadingMessageWithoutTypeShouldReturnEmptyMessage() {
EventMessage<?> event = eventMessage();
ProducerRecord<String, byte[]> msg = testSubject.createKafkaMessage(event, SOME_TOPIC);
msg.headers().remove(MESSAGE_TYPE);
assertThat(testSubject.readKafkaMessage(toReceiverRecord(msg)).isPresent()).isFalse();
}
示例12
@SuppressWarnings("unchecked")
@Test
public void testReadingMessagePayloadDifferentThanByteShouldReturnEmptyMessage() {
EventMessage<Object> eventMessage = eventMessage();
SerializedObject serializedObject = mock(SerializedObject.class);
when(serializedObject.getType()).thenReturn(new SimpleSerializedType("foo", null));
Headers headers = toHeaders(eventMessage, serializedObject, byteMapper());
ConsumerRecord payloadDifferentThanByte = new ConsumerRecord(
"foo", 0, 0, NO_TIMESTAMP, NO_TIMESTAMP_TYPE,
-1L, NULL_SIZE, NULL_SIZE, 1, "123", headers
);
assertThat(testSubject.readKafkaMessage(payloadDifferentThanByte).isPresent()).isFalse();
}
示例13
@Test
public void testWritingEventMessageShouldBeReadAsEventMessage() {
EventMessage<?> expected = eventMessage();
ProducerRecord<String, byte[]> senderMessage = testSubject.createKafkaMessage(expected, SOME_TOPIC);
EventMessage<?> actual = receiverMessage(senderMessage);
assertEventMessage(actual, expected);
}
示例14
@Test
public void testWritingEventMessageWithNullRevisionShouldWriteRevisionAsNull() {
testSubject = DefaultKafkaMessageConverter.builder()
.serializer(XStreamSerializer.builder().build())
.build();
EventMessage<?> eventMessage = eventMessage();
ProducerRecord<String, byte[]> senderMessage = testSubject.createKafkaMessage(eventMessage, SOME_TOPIC);
assertThat(valueAsString(senderMessage.headers(), MESSAGE_REVISION)).isNull();
}
示例15
@Test
public void testWritingDomainEventMessageShouldBeReadAsDomainMessage() {
DomainEventMessage<?> expected = domainMessage();
ProducerRecord<String, byte[]> senderMessage = testSubject.createKafkaMessage(expected, SOME_TOPIC);
EventMessage<?> actual = receiverMessage(senderMessage);
assertEventMessage(actual, expected);
assertDomainMessage((DomainEventMessage<?>) actual, expected);
}
示例16
private static void assertEventMessage(EventMessage<?> actual, EventMessage<?> expected) {
assertThat(actual.getIdentifier()).isEqualTo(expected.getIdentifier());
assertEquals(actual.getPayloadType(), (expected.getPayloadType()));
assertThat(actual.getMetaData()).isEqualTo(expected.getMetaData());
assertThat(actual.getPayload()).isEqualTo(expected.getPayload());
assertThat(actual.getTimestamp().toEpochMilli()).isEqualTo(expected.getTimestamp().toEpochMilli());
}
示例17
public static void assertEventHeaders(String metaDataKey,
EventMessage<?> eventMessage,
SerializedObject<byte[]> so,
Headers headers) {
assertThat(headers.toArray().length).isGreaterThanOrEqualTo(5);
assertThat(valueAsString(headers, MESSAGE_ID)).isEqualTo(eventMessage.getIdentifier());
assertThat(valueAsLong(headers, MESSAGE_TIMESTAMP)).isEqualTo(eventMessage.getTimestamp().toEpochMilli());
assertThat(valueAsString(headers, MESSAGE_TYPE)).isEqualTo(so.getType().getName());
assertThat(valueAsString(headers, MESSAGE_REVISION)).isEqualTo(so.getType().getRevision());
assertThat(valueAsString(headers, generateMetadataKey(metaDataKey)))
.isEqualTo(eventMessage.getMetaData().get(metaDataKey));
}
示例18
@Autowired
public void config(EventProcessingConfiguration epConfig,
@Qualifier("trade-events") SubscribableMessageSource<EventMessage<?>> tradeEvents) {
epConfig.registerSubscribingEventProcessor("trading", c -> tradeEvents);
}
示例19
@Override
public Object handle(EventMessage<?> event) {
kafkaPublisher.send(event);
return null;
}
示例20
private String recordKey(EventMessage<?> eventMessage) {
Object sequenceIdentifier = sequencingPolicy.getSequenceIdentifierFor(eventMessage);
return sequenceIdentifier != null ? sequenceIdentifier.toString() : null;
}
示例21
private Optional<EventMessage<?>> buildMessage(Headers headers, SerializedMessage<?> message) {
long timestamp = valueAsLong(headers, MESSAGE_TIMESTAMP);
return headers.lastHeader(AGGREGATE_ID) != null
? buildDomainEvent(headers, message, timestamp)
: buildEvent(message, timestamp);
}
示例22
private Optional<EventMessage<?>> buildEvent(SerializedMessage<?> message, long timestamp) {
return Optional.of(new GenericEventMessage<>(message, () -> Instant.ofEpochMilli(timestamp)));
}
示例23
private static KafkaEventMessage message(int partition, int offset, int timestamp) {
EventMessage<Object> eventMessage = asEventMessage(offset + "abc" + (offset * 17 + 123));
return new KafkaEventMessage(asTrackedEventMessage(eventMessage, null), partition, offset, timestamp);
}
示例24
private static KafkaEventMessage message(int partition, int offset, int timestamp) {
EventMessage<Object> eventMessage = asEventMessage(offset + "abc" + (offset * 17 + 123));
return new KafkaEventMessage(asTrackedEventMessage(eventMessage, null), partition, offset, timestamp);
}
示例25
@Override
public ProducerRecord<String, String> createKafkaMessage(EventMessage<?> eventMessage, String topic) {
throw new UnsupportedOperationException();
}
示例26
@Override
public Optional<EventMessage<?>> readKafkaMessage(ConsumerRecord<String, String> consumerRecord) {
return Optional.of(asEventMessage(consumerRecord.value()));
}
示例27
private static EventMessage<Object> eventMessage() {
return asEventMessage("SomePayload").withMetaData(MetaData.with("key", "value"));
}
示例28
private EventMessage<?> receiverMessage(ProducerRecord<String, byte[]> senderMessage) {
return testSubject.readKafkaMessage(
toReceiverRecord(senderMessage)).orElseThrow(() -> new AssertionError("Expected valid message")
);
}
示例29
/**
* Construct a {@link KafkaEventMessage} based on the deserialized body, the {@code eventMessage}, of a {@link
* ConsumerRecord} retrieved from a Kafka topic. The {@code trackingToken} is used to change the {@code
* eventMessage} in an {@link TrackedEventMessage}.
*
* @param eventMessage the {@link EventMessage} to wrap
* @param consumerRecord the {@link ConsumerRecord} which the given {@code eventMessage} was the body of
* @param trackingToken the {@link KafkaTrackingToken} defining the position of this message
* @return the {@link KafkaEventMessage} constructed from the given {@code eventMessage}, {@code consumerRecord} and
* {@code trackingToken}
*/
public static KafkaEventMessage from(EventMessage<?> eventMessage,
ConsumerRecord<?, ?> consumerRecord,
KafkaTrackingToken trackingToken) {
return new KafkaEventMessage(
asTrackedEventMessage(eventMessage, trackingToken),
consumerRecord.partition(), consumerRecord.offset(), consumerRecord.timestamp()
);
}
示例30
/**
* {@inheritDoc}
* <p/>
* Note that the {@link ProducerRecord} created through this method sets the {@link ProducerRecord#timestamp()} to
* {@code null}. Doing so will ensure the used Producer sets a timestamp itself for the record. The
* {@link EventMessage#getTimestamp()} field is however still taken into account, but as headers.
* <p/>
* Additional note that the ProducerRecord will be given a {@code null} {@link ProducerRecord#partition()} value.
* In return, the {@link ProducerRecord#key()} field is defined by using the configured {@link SequencingPolicy} to
* retrieve the given {@code eventMessage}'s {@code sequenceIdentifier}. The combination of a {@code null}
* partition and the possibly present or empty key will define which partition the Producer will choose to dispatch
* the record on.
*
* @see ProducerRecord
*/
@Override
public ProducerRecord<String, byte[]> createKafkaMessage(EventMessage<?> eventMessage, String topic) {
SerializedObject<byte[]> serializedObject = eventMessage.serializePayload(serializer, byte[].class);
return new ProducerRecord<>(
topic, null, null, recordKey(eventMessage),
serializedObject.getData(),
toHeaders(eventMessage, serializedObject, headerValueMapper)
);
}