The following examples show how to use
org.springframework.messaging.Message
.
You can vote up the ones you like or vote down the ones you don't like,
and go to the original project or source file by following the links above each example. You may check out the related API usage on the sidebar.
@Test // SPR-14690
public void handleMessageFromClientWithTokenAuthentication() {
ExecutorSubscribableChannel channel = new ExecutorSubscribableChannel();
channel.addInterceptor(new AuthenticationInterceptor("[email protected]"));
channel.addInterceptor(new ImmutableMessageChannelInterceptor());
TestMessageHandler messageHandler = new TestMessageHandler();
channel.subscribe(messageHandler);
StompSubProtocolHandler handler = new StompSubProtocolHandler();
handler.afterSessionStarted(this.session, channel);
TextMessage wsMessage = StompTextMessageBuilder.create(StompCommand.CONNECT).build();
handler.handleMessageFromClient(this.session, wsMessage, channel);
assertEquals(1, messageHandler.getMessages().size());
Message<?> message = messageHandler.getMessages().get(0);
Principal user = SimpMessageHeaderAccessor.getUser(message.getHeaders());
assertNotNull(user);
assertEquals("[email protected]", user.getName());
@Test
public void clientInboundChannelSendMessage() throws Exception {
ApplicationContext config = createConfig(TestChannelConfig.class, TestConfigurer.class);
TestChannel channel = config.getBean("clientInboundChannel", TestChannel.class);
SubProtocolWebSocketHandler webSocketHandler = config.getBean(SubProtocolWebSocketHandler.class);
List<ChannelInterceptor> interceptors = channel.getInterceptors();
assertEquals(ImmutableMessageChannelInterceptor.class, interceptors.get(interceptors.size()-1).getClass());
TestWebSocketSession session = new TestWebSocketSession("s1");
session.setOpen(true);
webSocketHandler.afterConnectionEstablished(session);
webSocketHandler.handleMessage(session,
StompTextMessageBuilder.create(StompCommand.SEND).headers("destination:/foo").build());
Message<?> message = channel.messages.get(0);
StompHeaderAccessor accessor = StompHeaderAccessor.getAccessor(message, StompHeaderAccessor.class);
assertNotNull(accessor);
assertFalse(accessor.isMutable());
assertEquals(SimpMessageType.MESSAGE, accessor.getMessageType());
assertEquals("/foo", accessor.getDestination());
@Test
public void replyWithCustomTimeToLive() throws JMSException {
Session session = mock(Session.class);
Queue replyDestination = mock(Queue.class);
given(session.createQueue("queueOut")).willReturn(replyDestination);
MessageProducer messageProducer = mock(MessageProducer.class);
TextMessage responseMessage = mock(TextMessage.class);
given(session.createTextMessage("Response")).willReturn(responseMessage);
given(session.createProducer(replyDestination)).willReturn(messageProducer);
MessagingMessageListenerAdapter listener = getPayloadInstance("Response", "replyPayloadToQueue", Message.class);
QosSettings settings = new QosSettings();
settings.setTimeToLive(6000);
listener.setResponseQosSettings(settings);
listener.onMessage(mock(javax.jms.Message.class), session);
verify(session).createQueue("queueOut");
verify(session).createTextMessage("Response");
verify(messageProducer).send(responseMessage, javax.jms.Message.DEFAULT_DELIVERY_MODE,
javax.jms.Message.DEFAULT_PRIORITY, 6000);
verify(messageProducer).close();
private Object convertPayload(Message<?> message, MethodParameter parameter, Class<?> targetPayloadType) {
Object result = null;
if (this.converter instanceof SmartMessageConverter) {
SmartMessageConverter smartConverter = (SmartMessageConverter) this.converter;
result = smartConverter.fromMessage(message, targetPayloadType, parameter);
else if (this.converter != null) {
result = this.converter.fromMessage(message, targetPayloadType);
if (result == null) {
throw new MessageConversionException(message, "No converter found from actual payload type '" +
ClassUtils.getDescriptiveType(message.getPayload()) + "' to expected payload type '" +
ClassUtils.getQualifiedName(targetPayloadType) + "'");
return result;
public void ack() {
this.session.afterConnected(this.connection);
assertTrue(this.session.isConnected());
String messageId = "123";
this.session.acknowledge(messageId, true);
Message<byte[]> message = this.messageCaptor.getValue();
StompHeaderAccessor accessor = MessageHeaderAccessor.getAccessor(message, StompHeaderAccessor.class);
assertEquals(StompCommand.ACK, accessor.getCommand());
StompHeaders stompHeaders = StompHeaders.readOnlyStompHeaders(accessor.getNativeHeaders());
assertEquals(stompHeaders.toString(), 1, stompHeaders.size());
assertEquals(messageId, stompHeaders.getId());
@Test
public void attemptToReadDisallowedUserPropertyIsNotFatal() throws JMSException {
javax.jms.Message jmsMessage = new StubTextMessage() {
@Override
public Object getObjectProperty(String name) throws JMSException {
if (name.equals("fail")) {
throw new JMSException("illegal property");
else {
return super.getObjectProperty(name);
jmsMessage.setBooleanProperty("fail", true);
assertAttemptReadDisallowedPropertyIsNotFatal(jmsMessage, "fail");
@Test
public void clientOutboundChannelUsedByAnnotatedMethod() {
ApplicationContext context = loadConfig(SimpleBrokerConfig.class);
TestChannel channel = context.getBean("clientOutboundChannel", TestChannel.class);
SimpAnnotationMethodMessageHandler messageHandler =
context.getBean(SimpAnnotationMethodMessageHandler.class);
StompHeaderAccessor headers = StompHeaderAccessor.create(StompCommand.SUBSCRIBE);
headers.setSessionId("sess1");
headers.setSessionAttributes(new ConcurrentHashMap<>());
headers.setSubscriptionId("subs1");
headers.setDestination("/foo");
Message<?> message = MessageBuilder.withPayload(new byte[0]).setHeaders(headers).build();
messageHandler.handleMessage(message);
message = channel.messages.get(0);
headers = StompHeaderAccessor.wrap(message);
assertEquals(SimpMessageType.MESSAGE, headers.getMessageType());
assertEquals("/foo", headers.getDestination());
assertEquals("bar", new String((byte[]) message.getPayload()));
@Test
public void leaveMutableDefaultBehavior() {
MessageHeaderAccessor accessor = new MessageHeaderAccessor();
accessor.setHeader("foo", "bar");
MessageHeaders headers = accessor.getMessageHeaders();
Message<?> message = MessageBuilder.createMessage("payload", headers);
this.thrown.expect(IllegalStateException.class);
this.thrown.expectMessage("Already immutable");
accessor.setLeaveMutable(true);
this.thrown.expect(IllegalStateException.class);
this.thrown.expectMessage("Already immutable");
accessor.setHeader("foo", "baz");
assertEquals("bar", headers.get("foo"));
assertSame(accessor, MessageHeaderAccessor.getAccessor(message, MessageHeaderAccessor.class));
@Override
public Subscription subscribe(StompHeaders headers, StompFrameHandler handler) {
Assert.hasText(headers.getDestination(), "Destination header is required");
Assert.notNull(handler, "StompFrameHandler must not be null");
String subscriptionId = headers.getId();
if (!StringUtils.hasText(subscriptionId)) {
subscriptionId = String.valueOf(DefaultStompSession.this.subscriptionIndex.getAndIncrement());
headers.setId(subscriptionId);
checkOrAddReceipt(headers);
Subscription subscription = new DefaultSubscription(headers, handler);
StompHeaderAccessor accessor = createHeaderAccessor(StompCommand.SUBSCRIBE);
accessor.addNativeHeaders(headers);
Message<byte[]> message = createMessage(accessor, EMPTY_PAYLOAD);
execute(message);
return subscription;
public void handleMessageToOtherUser() {
TestSimpUser otherSimpUser = new TestSimpUser("anna");
otherSimpUser.addSessions(new TestSimpSession("456"));
given(this.registry.getUser("anna")).willReturn(otherSimpUser);
TestPrincipal user = new TestPrincipal("joe");
TestPrincipal otherUser = new TestPrincipal("anna");
String sourceDestination = "/user/anna/queue/foo";
Message<?> message = createMessage(SimpMessageType.MESSAGE, user, "456", sourceDestination);
UserDestinationResult actual = this.resolver.resolveDestination(message);
assertEquals(sourceDestination, actual.getSourceDestination());
assertEquals(1, actual.getTargetDestinations().size());
assertEquals("/queue/foo-user456", actual.getTargetDestinations().iterator().next());
assertEquals("/user/queue/foo", actual.getSubscribeDestination());
assertEquals(otherUser.getName(), actual.getUser());
@Test
public void attemptToWriteDisallowedTypePropertyIsNotFatal() throws JMSException {
Message<String> message = initBuilder()
.setHeader(JmsHeaders.TYPE, "someType")
.setHeader("foo", "bar")
.build();
javax.jms.Message jmsMessage = new StubTextMessage() {
@Override
public void setJMSType(String type) throws JMSException {
throw new JMSException("illegal property");
mapper.fromHeaders(message.getHeaders(), jmsMessage);
assertNull(jmsMessage.getJMSType());
assertNotNull(jmsMessage.getStringProperty("foo"));
assertEquals("bar", jmsMessage.getStringProperty("foo"));
@Test
public void toMessage() {
MappingJackson2MessageConverter converter = new MappingJackson2MessageConverter();
MyBean payload = new MyBean();
payload.setString("Foo");
payload.setNumber(42);
payload.setFraction(42F);
payload.setArray(new String[]{"Foo", "Bar"});
payload.setBool(true);
payload.setBytes(new byte[]{0x1, 0x2});
Message<?> message = converter.toMessage(payload, null);
String actual = new String((byte[]) message.getPayload(), StandardCharsets.UTF_8);
assertTrue(actual.contains("\"string\":\"Foo\""));
assertTrue(actual.contains("\"number\":42"));
assertTrue(actual.contains("fraction\":42.0"));
assertTrue(actual.contains("\"array\":[\"Foo\",\"Bar\"]"));
assertTrue(actual.contains("\"bool\":true"));
assertTrue(actual.contains("\"bytes\":\"AQI=\""));
assertEquals("Invalid content-type", new MimeType("application", "json", StandardCharsets.UTF_8),
message.getHeaders().get(MessageHeaders.CONTENT_TYPE, MimeType.class));
protected void processHandlerMethodException(HandlerMethod handlerMethod, Exception exception, Message<?> message) {
InvocableHandlerMethod invocable = getExceptionHandlerMethod(handlerMethod, exception);
if (invocable == null) {
logger.error("Unhandled exception from message handler method", exception);
return;
invocable.setMessageMethodArgumentResolvers(this.argumentResolvers);
if (logger.isDebugEnabled()) {
logger.debug("Invoking " + invocable.getShortLogMessage());
try {
Throwable cause = exception.getCause();
Object returnValue = (cause != null ?
invocable.invoke(message, exception, cause, handlerMethod) :
invocable.invoke(message, exception, handlerMethod));
MethodParameter returnType = invocable.getReturnType();
if (void.class == returnType.getParameterType()) {
return;
this.returnValueHandlers.handleReturnValue(returnValue, returnType, message);
catch (Throwable ex2) {
logger.error("Error while processing handler method exception", ex2);
@Override
public Object resolveArgument(MethodParameter parameter, Message<?> message) throws Exception {
Class<?> targetMessageType = parameter.getParameterType();
Class<?> targetPayloadType = getPayloadType(parameter);
if (!targetMessageType.isAssignableFrom(message.getClass())) {
throw new MethodArgumentTypeMismatchException(message, parameter, "Actual message type '" +
ClassUtils.getDescriptiveType(message) + "' does not match expected type '" +
ClassUtils.getQualifiedName(targetMessageType) + "'");
Object payload = message.getPayload();
if (targetPayloadType.isInstance(payload)) {
return message;
if (isEmptyPayload(payload)) {
throw new MessageConversionException(message, "Cannot convert from actual payload type '" +
ClassUtils.getDescriptiveType(payload) + "' to expected payload type '" +
ClassUtils.getQualifiedName(targetPayloadType) + "' when payload is empty");
payload = convertPayload(message, parameter, targetPayloadType);
return MessageBuilder.createMessage(payload, message.getHeaders());
public void send() {
this.session.afterConnected(this.connection);
assertTrue(this.session.isConnected());
String destination = "/topic/foo";
String payload = "sample payload";
this.session.send(destination, payload);
Message<byte[]> message = this.messageCaptor.getValue();
StompHeaderAccessor accessor = MessageHeaderAccessor.getAccessor(message, StompHeaderAccessor.class);
assertEquals(StompCommand.SEND, accessor.getCommand());
StompHeaders stompHeaders = StompHeaders.readOnlyStompHeaders(accessor.getNativeHeaders());
assertEquals(stompHeaders.toString(), 2, stompHeaders.size());
assertEquals(destination, stompHeaders.getDestination());
assertEquals(new MimeType("text", "plain", StandardCharsets.UTF_8), stompHeaders.getContentType());
assertEquals(-1, stompHeaders.getContentLength()); // StompEncoder isn't involved
assertEquals(payload, new String(message.getPayload(), StandardCharsets.UTF_8));
@Override
public ListenableFuture<Void> forward(Message<?> message, StompHeaderAccessor accessor) {
try {
ListenableFuture<Void> future = super.forward(message, accessor);
if (message.getHeaders().get(SimpMessageHeaderAccessor.IGNORE_ERROR) == null) {
future.get();
return future;
catch (Throwable ex) {
throw new MessageDeliveryException(message, ex);
@Test
public void decodeFrameWithContentLengthZero() {
Message<byte[]> frame = decode("SEND\ncontent-length:0\n\n\0");
StompHeaderAccessor headers = StompHeaderAccessor.wrap(frame);
assertEquals(StompCommand.SEND, headers.getCommand());
assertEquals(1, headers.toNativeHeaderMap().size());
assertEquals(Integer.valueOf(0), headers.getContentLength());
String bodyText = new String(frame.getPayload());
assertEquals("", bodyText);
private void logMessage(Message<?> message) {
if (logger.isDebugEnabled()) {
SimpMessageHeaderAccessor accessor = MessageHeaderAccessor.getAccessor(message, SimpMessageHeaderAccessor.class);
accessor = (accessor != null ? accessor : SimpMessageHeaderAccessor.wrap(message));
logger.debug("Processing " + accessor.getShortLogMessage(message.getPayload()));
@Override
public Object fromMessage(javax.jms.Message message) throws JMSException, MessageConversionException {
if (this.called) {
throw new java.lang.IllegalStateException("Converter called twice");
this.called = true;
TextMessage textMessage = (TextMessage) message;
return Long.parseLong(textMessage.getText());
@Aggregator(inputChannel = Sink.INPUT, outputChannel = AggregatorApplication.TEMP_INPUT)
public String receive(List<Message> messageList) {
Random random = new Random();
int batch = random.nextInt(100000);
for (Message msg : messageList) {
System.out.println(batch + " === " + msg);
return "aggregator result[" + batch + "]";
@Test
public void decodeFrameWithContentLengthZero() {
Message<byte[]> frame = decode("SEND\ncontent-length:0\n\n\0");
StompHeaderAccessor headers = StompHeaderAccessor.wrap(frame);
assertEquals(StompCommand.SEND, headers.getCommand());
assertEquals(1, headers.toNativeHeaderMap().size());
assertEquals(Integer.valueOf(0), headers.getContentLength());
String bodyText = new String(frame.getPayload());
assertEquals("", bodyText);
@Override
public javax.jms.Message toMessage(Object object, Session session) throws JMSException, MessageConversionException {
if (!(object instanceof Message)) {
throw new IllegalArgumentException("Could not convert [" + object + "] - only [" +
Message.class.getName() + "] is handled by this converter");
Message<?> input = (Message<?>) object;
MessageHeaders headers = input.getHeaders();
Object conversionHint = headers.get(AbstractMessagingTemplate.CONVERSION_HINT_HEADER);
javax.jms.Message reply = createMessageForPayload(input.getPayload(), session, conversionHint);
this.headerMapper.fromHeaders(headers, reply);
return reply;
@Override
public ListenableFuture<Void> forward(Message<?> message, StompHeaderAccessor accessor) {
try {
ListenableFuture<Void> future = super.forward(message, accessor);
if (message.getHeaders().get(SimpMessageHeaderAccessor.IGNORE_ERROR) == null) {
future.get();
return future;
catch (Throwable ex) {
throw new MessageDeliveryException(message, ex);
public void triggerAfterReceiveCompletion(
@Nullable Message<?> message, MessageChannel channel, @Nullable Exception ex) {
for (int i = this.receiveInterceptorIndex; i >= 0; i--) {
ChannelInterceptor interceptor = interceptors.get(i);
try {
interceptor.afterReceiveCompletion(message, channel, ex);
catch (Throwable ex2) {
if (logger.isErrorEnabled()) {
logger.error("Exception from afterReceiveCompletion in " + interceptor, ex2);
@Nullable
public Object fromMessage(Message<?> message, Class<?> targetClass) {
for (MessageConverter converter : getConverters()) {
Object result = converter.fromMessage(message, targetClass);
if (result != null) {
return result;
return null;
@Test
public void resolveNonAnnotatedParameterFailValidation() throws Exception {
// See testValidator()
Message<?> message = MessageBuilder.withPayload("invalidValue".getBytes()).build();
thrown.expect(MethodArgumentNotValidException.class);
thrown.expectMessage("invalid value");
assertEquals("invalidValue", this.resolver.resolveArgument(this.paramValidatedNotAnnotated, message));
@Test
public void twoMessagesInOneChunk() throws InterruptedException {
BufferingStompDecoder stompDecoder = new BufferingStompDecoder(STOMP_DECODER, 128);
String chunk = "SEND\na:alpha\n\nPayload1\0" + "SEND\na:alpha\n\nPayload2\0";
List<Message<byte[]>> messages = stompDecoder.decode(toByteBuffer(chunk));
assertEquals(2, messages.size());
assertEquals("Payload1", new String(messages.get(0).getPayload()));
assertEquals("Payload2", new String(messages.get(1).getPayload()));
assertEquals(0, stompDecoder.getBufferSize());
assertNull(stompDecoder.getExpectedContentLength());
@Test
public void sendToDefaultDestinationWhenUsingDotPathSeparator() throws Exception {
given(this.messageChannel.send(any(Message.class))).willReturn(true);
Message<?> inputMessage = createMessage("sess1", "sub1", "/app/", "dest.foo.bar", null);
this.handler.handleReturnValue(PAYLOAD, this.sendToDefaultDestReturnType, inputMessage);
verify(this.messageChannel, times(1)).send(this.messageCaptor.capture());
SimpMessageHeaderAccessor accessor = getCapturedAccessor(0);
assertEquals("/topic/dest.foo.bar", accessor.getDestination());
@Nullable
@SuppressWarnings("unchecked")
protected Object resolveArgumentInternal(MethodParameter parameter, Message<?> message, String name) {
MessageHeaders headers = message.getHeaders();
Map<String, String> vars = (Map<String, String>) headers.get(DESTINATION_TEMPLATE_VARIABLES_HEADER);
return vars != null ? vars.get(name) : null;
@StreamListener("index-message-in")
public void indexTorrent(Message<Torrent> message) {
try {
Acknowledgment acknowledgment = message.getHeaders().get(KafkaHeaders.ACKNOWLEDGMENT, Acknowledgment.class);
Torrent torrent = message.getPayload();
log.debug("Index torrent to elasticsearch, info hash is {}", torrent.getInfoHash());
torrentService.index(torrent);
//no error, execute acknowledge
if (acknowledgment != null) {
acknowledgment.acknowledge();
} catch (Exception e) {
log.error("Index torrent error: {}", e);