Hi Solace team, we are using Spring Cloud Stream Solace in our Spring Boot project. When we try to send messages simultaneously from 5 different threads using `StreamBridge.send()`, the underlying implementation "randomly" throws `org.springframework.messaging.MessagingException`. By "randomly" I mean it doesn't matter whether I try to send 1000 messages at the same time or 50.
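For context, here is a simplified sketch of the publishing pattern (topic names, payload, and the helper function are illustrative, not our actual code):

```kotlin
import org.springframework.cloud.stream.function.StreamBridge
import java.util.concurrent.CompletableFuture
import java.util.concurrent.Executors

// Simplified sketch of the scenario described above: several threads calling
// StreamBridge.send() concurrently, each resolving a destination by name.
fun publishConcurrently(streamBridge: StreamBridge) {
    val executor = Executors.newFixedThreadPool(5)
    val futures = (1..50).map { i ->
        CompletableFuture.runAsync({
            // Topic name and payload are placeholders for illustration only.
            streamBridge.send("xxx/xxx/status/updated/V2/FA/FMSRR/unit-$i", "payload-$i")
        }, executor)
    }
    CompletableFuture.allOf(*futures.toTypedArray()).join()
    executor.shutdown()
}
```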
Library version:
`implementation("com.solace.spring.cloud:spring-cloud-starter-stream-solace:4.2.0")`
Full stack trace:
org.springframework.messaging.MessagingException: Cannot send message using handler 028dfc28-458f-46d9-a669-aa22169323fd
at com.solace.spring.cloud.stream.binder.util.ErrorChannelSendingCorrelationKey.send(ErrorChannelSendingCorrelationKey.java:57)
at com.solace.spring.cloud.stream.binder.outbound.JCSMPOutboundMessageHandler.handleMessagingException(JCSMPOutboundMessageHandler.java:188)
at com.solace.spring.cloud.stream.binder.outbound.JCSMPOutboundMessageHandler.handleMessage(JCSMPOutboundMessageHandler.java:76)
at org.springframework.cloud.stream.binder.AbstractMessageChannelBinder$SendingHandler.handleMessageInternal(AbstractMessageChannelBinder.java:1185)
at org.springframework.integration.handler.AbstractMessageHandler.doHandleMessage(AbstractMessageHandler.java:105)
at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:73)
at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:132)
at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:133)
at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:106)
at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:72)
at org.springframework.integration.channel.AbstractMessageChannel.sendInternal(AbstractMessageChannel.java:378)
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:332)
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:302)
at org.springframework.cloud.stream.function.StreamBridge.send(StreamBridge.java:187)
at org.springframework.cloud.stream.function.StreamBridge.send(StreamBridge.java:146)
at org.springframework.cloud.stream.function.StreamBridge.send(StreamBridge.java:141)
at com.ingka.fms.inbound.reversereceiving.common.service.EventMeshPublisher.publish(EventMeshPublisher.kt:22)
at com.ingka.fms.inbound.reversereceiving.common.service.EventMeshPublisher.publish$default(EventMeshPublisher.kt:18)
at com.ingka.fms.inbound.reversereceiving.receiving.messaging.publisher.reverseReceivingUnitStatus.ReverseReceivingUnitStatusUpdatedPublisher.publish(ReverseReceivingUnitStatusUpdatedPublisher.kt:37)
at com.ingka.fms.inbound.reversereceiving.receiving.messaging.publisher.reverseReceivingUnitStatus.ReverseReceivingUnitStatusUpdatedPublisher.createEventAndPublish(ReverseReceivingUnitStatusUpdatedPublisher.kt:73)
at com.ingka.fms.inbound.reversereceiving.common.service.UnitService.publishEvents(UnitService.kt:281)
at com.ingka.fms.inbound.reversereceiving.common.service.UnitService.updateUnitStatusAndPublishEvents(UnitService.kt:391)
at com.ingka.fms.inbound.reversereceiving.common.service.UnitService.updateUnitStatusAndPublishEvent(UnitService.kt:253)
at java.base/jdk.internal.reflect.DirectMethodHandleAccessor.invoke(DirectMethodHandleAccessor.java:103)
at java.base/java.lang.reflect.Method.invoke(Method.java:580)
at org.springframework.aop.support.AopUtils.invokeJoinpointUsingReflection(AopUtils.java:351)
at org.springframework.aop.framework.ReflectiveMethodInvocation.invokeJoinpoint(ReflectiveMethodInvocation.java:196)
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:163)
at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.proceed(CglibAopProxy.java:765)
at org.springframework.transaction.interceptor.TransactionInterceptor$1.proceedWithInvocation(TransactionInterceptor.java:123)
at org.springframework.transaction.interceptor.TransactionAspectSupport.invokeWithinTransaction(TransactionAspectSupport.java:392)
at org.springframework.transaction.interceptor.TransactionInterceptor.invoke(TransactionInterceptor.java:119)
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:184)
at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.proceed(CglibAopProxy.java:765)
at org.springframework.aop.framework.CglibAopProxy$DynamicAdvisedInterceptor.intercept(CglibAopProxy.java:717)
at com.ingka.fms.inbound.reversereceiving.common.service.UnitService$$SpringCGLIB$$0.updateUnitStatusAndPublishEvent(<generated>)
at com.ingka.fms.inbound.reversereceiving.receiving.service.ReceivingService.completeReceivingUnit(ReceivingService.kt:563)
at com.ingka.fms.inbound.reversereceiving.receiving.service.ReceivingService.completeReceivingUnits(ReceivingService.kt:554)
at com.ingka.fms.inbound.reversereceiving.receiving.service.ReceivingService.completeReceiving$lambda$8(ReceivingService.kt:298)
at java.base/java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1768)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642)
at java.base/java.lang.Thread.run(Thread.java:1583)
Caused by: com.solace.spring.cloud.stream.binder.util.ClosedChannelBindingException: Message handler 028dfc28-458f-46d9-a669-aa22169323fd is not running
... 41 common frames omitted
Hi @siavashsoleymani,
Good to see you again! It's been a while :) And thank you for sharing. Any chance you have code you can share that easily reproduces it? Also, can I get you to open a GitHub issue on this? It sounds like it's going to be a bug. If you have official support, I'd also suggest reporting it via that avenue to help get a fix prioritized.
https://github.com/SolaceProducts/solace-spring-cloud
Also as a heads up I deleted the duplicate discussion that was started and left the one marked as a "Question".
Hi @marc, nice to see you too!
I am trying to write a simple piece of code that reproduces it, but unfortunately I have not been able to replicate it locally yet. I am going to create an issue on GitHub, and if I manage to reproduce it locally I will add that code there.
In case it gives more of a clue, I also see this in the logs:
Stopping producer to TOPIC xxx/xxx/status/updated/V2/FA/FMSRR/xx/xx/ddd/aaaa/rc <message handler ID: 088fd415-8c5c-4ea0-91af-c319403403d3>
Right before the log above, this one appears:
c.s.s.c.s.b.util.SharedResourceManager : 088fd415-8c5c-4ea0-91af-c319403403d3 is not the last user, persisting producer...
@marc
I also see this log:
c.s.s.c.s.b.util.SharedResourceManager : 088fd415-8c5c-4ea0-91af-c319403403d3 is not the last user, persisting producer...
Also, one difference I see compared with my small local project is that this opening/closing of producers doesn't happen there, whereas in our production it happens frequently.
Thanks for the update @siavashsoleymani,
I think you found the key here:
Caused by: com.solace.spring.cloud.stream.binder.util.ClosedChannelBindingException: Message handler 028dfc28-458f-46d9-a669-aa22169323fd is not running
That does indeed mean that someone or something is administratively stopping the producer binding, either through code or by a person/machine via the bindings actuator REST endpoint.
So we need to find out why it is being stopped in production. Are you deploying into K8s or something that is performing a health check and trying to restart the container if it fails?
@marc so after some more investigation I realized that the cardinality of the topics is important. Locally I had been producing those messages to the same topic; after adding a random suffix to the topic names, I was able to see in the logs that the application opens/closes the handlers.
2024-07-16T14:52:25.900+02:00 INFO 68298 --- [ool-1-thread-16] c.s.s.c.s.b.util.SharedResourceManager : b15b7319-a225-4d6f-bcb2-0111ce5ae655 is not the last user, persisting producer...
2024-07-16T14:52:25.901+02:00 INFO 68298 --- [ool-1-thread-17] .s.s.c.s.b.o.JCSMPOutboundMessageHandler : Creating producer to TOPIC ingka.ilo.fms/inboundUnit/status/receiving-completed/V1/EU/FMSRR/BE/STO/siavash3 <message handler ID: 3db42e77-8a15-4eff-a842-c39ed5163e20>
2024-07-16T14:52:25.901+02:00 INFO 68298 --- [ool-1-thread-17] o.s.c.s.m.DirectWithAttributesChannel : Channel 'unknown.channel.name' has 1 subscriber(s).
2024-07-16T14:52:25.901+02:00 INFO 68298 --- [ool-1-thread-17] .s.s.c.s.b.o.JCSMPOutboundMessageHandler : Stopping producer to TOPIC ingka.ilo.fms/inboundUnit/status/receiving-completed/V1/EU/FMSRR/BE/STO/siavash39 <message handler ID: c29b584f-2895-4d2c-9c14-0417c68b297c>
Reviewing the code in `SharedResourceManager.java` and `JCSMPOutboundMessageHandler.java` in the Solace library, a potential issue stands out related to the lifecycle management and synchronization of shared resources.
The `release(String key)` method in `SharedResourceManager.java` closes the shared resource when the last key is deregistered. However, if another thread calls `get(String key)` concurrently, just after the resource has been checked to be non-empty but before it is closed, it might end up using a resource that is about to be closed. This race condition could lead to unexpected behavior or exceptions.
The `start()` and `stop()` methods manage the lifecycle. However, there's no explicit synchronization around the checks and operations on `isRunning` state and `producer` instance creation/release. In a multi-threaded scenario, this could lead to cases where multiple threads might concurrently attempt to start or stop the handler, potentially leading to inconsistent states or exceptions like `ClosedChannelBindingException` if one thread closes the producer while another is sending a message.
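To make the suspected race easier to see, here is a minimal, self-contained sketch of the pattern I mean (illustrative only, not the library's actual code):

```kotlin
// Illustrative sketch only: a check-then-act race where release() can close a
// shared resource between another thread's lookup and its use of that resource.
class SharedResource {
    @Volatile var closed = false
        private set
    fun close() { closed = true }
    fun send() = check(!closed) { "resource is closed" } // throws once closed
}

class NaiveSharedResourceManager {
    private val users = mutableSetOf<String>()
    private var resource: SharedResource? = null

    fun get(key: String): SharedResource {   // thread A
        if (resource == null) resource = SharedResource()
        users.add(key)
        return resource!!                     // may be closed by thread B below
    }

    fun release(key: String) {                // thread B
        users.remove(key)
        if (users.isEmpty()) {                // "last user" check...
            resource?.close()                 // ...races with A's get()/send()
            resource = null
        }
    }
}
```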
Thanks @siavashsoleymani,
Out of curiosity, can you try adjusting the `spring.cloud.stream.dynamic-destination-cache-size` parameter (default 10) and see if that helps? If you know you are sending to, say, 100 different topics over your app's lifetime, try changing it to something like 110.
If this is the issue, you might consider trying out the second option for publishing to dynamic destinations, as it should also be more performant with Solace :). I covered it in section 6 here:
https://codelabs.solace.dev/codelabs/spring-cloud-stream-beyond/?index=..%2F..index#5
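For reference, here is where that property would go in a Spring Boot `application.yml` (the value 110 is just the example from above):

```yaml
# Illustrative only: raising the dynamic destination cache size as suggested above.
spring:
  cloud:
    stream:
      dynamic-destination-cache-size: 110
```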
Hey @marc, good news! The problem is resolved now by increasing the cache size! Still, I think it is worth investing in fixing the bug, especially for bigger projects using Solace where every MB of RAM counts.
All in all, many thanks for your follow-ups and prompt responses in helping us resolve the issue.
Hi @siavashsoleymani,
Glad it worked for you! Thanks for confirming. And yes, we are still going to look at it internally. Tbh, we are considering recommending NOT using StreamBridge's dynamic destination resolution when publishing to many dynamic topics because of this issue. It is more efficient, and not much harder, to just use the second option I referenced above.
So basically doing:
`streamBridge.send("some-fixed-output-binding", MessageBuilder.fromMessage(msg).setHeader(BinderHeaders.TARGET_DESTINATION, myDestination).build())`
instead of:
`streamBridge.send(myDestination, msg)`
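For completeness, a slightly fuller sketch of that second option; the binding name `some-fixed-output-binding` and the publisher class are illustrative, while `BinderHeaders.TARGET_DESTINATION`, `MessageBuilder`, and `StreamBridge.send` are standard Spring Cloud Stream / Spring Messaging APIs:

```kotlin
import org.springframework.cloud.stream.binder.BinderHeaders
import org.springframework.cloud.stream.function.StreamBridge
import org.springframework.messaging.support.MessageBuilder
import org.springframework.stereotype.Component

// Illustrative publisher using a single fixed output binding; the binding name
// must match a producer binding configured in application.yml.
@Component
class DynamicTopicPublisher(private val streamBridge: StreamBridge) {

    fun publish(payload: Any, dynamicTopic: String) {
        val message = MessageBuilder.withPayload(payload)
            // Overrides the binding's configured destination for this message only.
            .setHeader(BinderHeaders.TARGET_DESTINATION, dynamicTopic)
            .build()
        streamBridge.send("some-fixed-output-binding", message)
    }
}
```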