private MessageBuilder(Message<T> originalMessage) {
Assert.notNull(originalMessage, "Message must not be null");
this.payload = originalMessage.getPayload();
this.originalMessage = originalMessage;
this.headerAccessor = new MessageHeaderAccessor(originalMessage);
private void sendInternal(Message<?> message) {
String destination = SimpMessageHeaderAccessor.getDestination(message.getHeaders());
Assert.notNull(destination, "Destination header required");
long timeout = this.sendTimeout;
boolean sent = (timeout >= 0 ? this.messageChannel.send(message, timeout) : this.messageChannel.send(message));
if (!sent) {
throw new MessageDeliveryException(message,
"Failed to send message to destination '" + destination + "' within timeout: " + timeout);
/**
* Encodes the given STOMP {@code message} into a {@code byte[]}.
* @param message the message to encode
* @return the encoded message
public byte[] encode(Message<byte[]> message) {
return encode(message.getHeaders(), message.getPayload());
@Override protected Object convertFromInternal(Message<?> message, Class<?> targetClass, @Nullable Object conversionHint) { Charset charset = getContentTypeCharset(getMimeType(message.getHeaders())); Object payload = message.getPayload(); return (payload instanceof String ? payload : new String((byte[]) payload, charset));
@Nullable public Message<?> preHandle(Message<?> message) throws MessagingException { String destination = SimpMessageHeaderAccessor.getDestination(message.getHeaders()); if (!getBroadcastDestination().equals(destination)) { return message; SimpMessageHeaderAccessor accessor = SimpMessageHeaderAccessor.getAccessor(message, SimpMessageHeaderAccessor.class); Assert.state(accessor != null, "No SimpMessageHeaderAccessor"); if (accessor.getSessionId() == null) { // Our own broadcast return null; destination = accessor.getFirstNativeHeader(SimpMessageHeaderAccessor.ORIGINAL_DESTINATION); if (logger.isTraceEnabled()) { logger.trace("Checking unresolved user destination: " + destination); SimpMessageHeaderAccessor newAccessor = SimpMessageHeaderAccessor.create(SimpMessageType.MESSAGE); for (String name : accessor.toNativeHeaderMap().keySet()) { if (NO_COPY_LIST.contains(name)) { continue; newAccessor.setNativeHeader(name, accessor.getFirstNativeHeader(name)); if (destination != null) { newAccessor.setDestination(destination); newAccessor.setHeader(SimpMessageHeaderAccessor.IGNORE_ERROR, true); // ensure send doesn't block return MessageBuilder.createMessage(message.getPayload(), newAccessor.getMessageHeaders());
protected void doAddMessage(Message<?> message) {
Assert.notNull(message, "'message' must not be null");
UUID messageId = message.getHeaders().getId();
Assert.notNull(messageId, "Cannot store messages without an ID header");
doStoreIfAbsent(this.messagePrefix + messageId, new MessageHolder(message));
@Override @Nullable public Message<byte[]> handleErrorMessageToClient(Message<byte[]> errorMessage) { StompHeaderAccessor accessor = MessageHeaderAccessor.getAccessor(errorMessage, StompHeaderAccessor.class); Assert.notNull(accessor, "No StompHeaderAccessor"); if (!accessor.isMutable()) { accessor = StompHeaderAccessor.wrap(errorMessage); return handleInternal(accessor, errorMessage.getPayload(), null, null);
@Override @Nullable protected Object convertFromInternal(Message<?> message, Class<?> targetClass, @Nullable Object conversionHint) { Assert.notNull(this.unmarshaller, "Property 'unmarshaller' is required"); try { Source source = getSource(message.getPayload()); Object result = this.unmarshaller.unmarshal(source); if (!targetClass.isInstance(result)) { throw new TypeMismatchException(result, targetClass); return result; catch (Exception ex) { throw new MessageConversionException(message, "Could not unmarshal XML: " + ex.getMessage(), ex);
/**
* Launches a Batch Job using the provided request {@link Message}. The payload
* of the {@link Message} <em>must</em> be an instance of {@link JobLaunchRequest}.
* @param requestMessage must not be null.
* @return Generally a {@link JobExecution} will always be returned. An
* exception ({@link MessageHandlingException}) will only be thrown if there
* is a failure to start the job. The cause of the exception will be a
* {@link JobExecutionException}.
* @throws MessageHandlingException when a job cannot be launched
@Override
protected Object handleRequestMessage(Message<?> requestMessage) {
Assert.notNull(requestMessage, "The provided requestMessage must not be null.");
final Object payload = requestMessage.getPayload();
Assert.isInstanceOf(JobLaunchRequest.class, payload, "The payload must be of type JobLaunchRequest.");
final JobLaunchRequest jobLaunchRequest = (JobLaunchRequest) payload;
final JobExecution jobExecution;
try {
jobExecution = this.jobLaunchingMessageHandler.launch(jobLaunchRequest);
} catch (JobExecutionException e) {
throw new MessageHandlingException(requestMessage, e);
return jobExecution;
/**
* Extract the SiMP session attributes from the given message and
* wrap them in a {@link SimpAttributes} instance.
* @param message the message to extract session attributes from
public static SimpAttributes fromMessage(Message<?> message) {
Assert.notNull(message, "Message must not be null");
MessageHeaders headers = message.getHeaders();
String sessionId = SimpMessageHeaderAccessor.getSessionId(headers);
Map<String, Object> sessionAttributes = SimpMessageHeaderAccessor.getSessionAttributes(headers);
if (sessionId == null) {
throw new IllegalStateException("No session id in " + message);
if (sessionAttributes == null) {
throw new IllegalStateException("No session attributes in " + message);
return new SimpAttributes(sessionId, sessionAttributes);
@Override protected void doSend(String destination, Message<?> message) { Assert.notNull(destination, "Destination must not be null"); SimpMessageHeaderAccessor simpAccessor = MessageHeaderAccessor.getAccessor(message, SimpMessageHeaderAccessor.class); if (simpAccessor != null) { if (simpAccessor.isMutable()) { simpAccessor.setDestination(destination); simpAccessor.setMessageTypeIfNotSet(SimpMessageType.MESSAGE); simpAccessor.setImmutable(); sendInternal(message); return; else { // Try and keep the original accessor type simpAccessor = (SimpMessageHeaderAccessor) MessageHeaderAccessor.getMutableAccessor(message); initHeaders(simpAccessor); else { simpAccessor = SimpMessageHeaderAccessor.wrap(message); initHeaders(simpAccessor); simpAccessor.setDestination(destination); simpAccessor.setMessageTypeIfNotSet(SimpMessageType.MESSAGE); message = MessageBuilder.createMessage(message.getPayload(), simpAccessor.getMessageHeaders()); sendInternal(message);
protected final void doSend(MessageChannel channel, Message<?> message, long timeout) {
Assert.notNull(channel, "MessageChannel is required");
Message<?> messageToSend = message;
MessageHeaderAccessor accessor = MessageHeaderAccessor.getAccessor(message, MessageHeaderAccessor.class);
if (accessor != null && accessor.isMutable()) {
accessor.removeHeader(this.sendTimeoutHeader);
accessor.removeHeader(this.receiveTimeoutHeader);
accessor.setImmutable();
else if (message.getHeaders().containsKey(this.sendTimeoutHeader)
|| message.getHeaders().containsKey(this.receiveTimeoutHeader)) {
messageToSend = MessageBuilder.fromMessage(message)
.setHeader(this.sendTimeoutHeader, null)
.setHeader(this.receiveTimeoutHeader, null)
.build();
boolean sent = (timeout >= 0 ? channel.send(messageToSend, timeout) : channel.send(messageToSend));
if (!sent) {
throw new MessageDeliveryException(message,
"Failed to send message to channel '" + channel + "' within timeout: " + timeout);
@Override @Nullable protected final Message<?> doSendAndReceive(MessageChannel channel, Message<?> requestMessage) { Assert.notNull(channel, "'channel' is required"); Object originalReplyChannelHeader = requestMessage.getHeaders().getReplyChannel(); Object originalErrorChannelHeader = requestMessage.getHeaders().getErrorChannel(); long sendTimeout = sendTimeout(requestMessage); long receiveTimeout = receiveTimeout(requestMessage); TemporaryReplyChannel tempReplyChannel = new TemporaryReplyChannel(this.throwExceptionOnLateReply); requestMessage = MessageBuilder.fromMessage(requestMessage).setReplyChannel(tempReplyChannel) .setHeader(this.sendTimeoutHeader, null) .setHeader(this.receiveTimeoutHeader, null) .setErrorChannel(tempReplyChannel).build(); try { doSend(channel, requestMessage, sendTimeout); catch (RuntimeException ex) { tempReplyChannel.setSendFailed(true); throw ex; Message<?> replyMessage = this.doReceive(tempReplyChannel, receiveTimeout); if (replyMessage != null) { replyMessage = MessageBuilder.fromMessage(replyMessage) .setHeader(MessageHeaders.REPLY_CHANNEL, originalReplyChannelHeader) .setHeader(MessageHeaders.ERROR_CHANNEL, originalErrorChannelHeader) .build(); return replyMessage;
public WebSocketMessage<?> encode(Message<byte[]> message, Class<? extends WebSocketSession> sessionType) { StompHeaderAccessor accessor = MessageHeaderAccessor.getAccessor(message, StompHeaderAccessor.class); Assert.notNull(accessor, "No StompHeaderAccessor available"); byte[] payload = message.getPayload(); byte[] bytes = ENCODER.encode(accessor.getMessageHeaders(), payload); boolean useBinary = (payload.length > 0 && !(SockJsSession.class.isAssignableFrom(sessionType)) && MimeTypeUtils.APPLICATION_OCTET_STREAM.isCompatibleWith(accessor.getContentType())); return (useBinary ? new BinaryMessage(bytes) : new TextMessage(bytes));
private void assertTextMessage(Message<?> message) {
assertNotNull("message should not be null", message);
assertEquals("Wrong payload", "Hello", message.getPayload());
assertEquals("Invalid foo property", "bar", message.getHeaders().get("foo"));
private void execute(Message<byte[]> message) {
if (logger.isTraceEnabled()) {
StompHeaderAccessor accessor = MessageHeaderAccessor.getAccessor(message, StompHeaderAccessor.class);
if (accessor != null) {
logger.trace("Sending " + accessor.getDetailedLogMessage(message.getPayload()));