import org.springframework.web.context.request.RequestAttributes;

import java.util.HashMap;
import java.util.Map;

public class CustomRequestScopeAttr implements RequestAttributes {

    private final Map<String, Object> requestAttributeMap = new HashMap<>();

    @Override
    public Object getAttribute(String name, int scope) {
        if (scope == RequestAttributes.SCOPE_REQUEST) {
            return requestAttributeMap.get(name);
        }
        return null;
    }

    @Override
    public void setAttribute(String name, Object value, int scope) {
        if (scope == RequestAttributes.SCOPE_REQUEST) {
            requestAttributeMap.put(name, value);
        }
    }

    @Override
    public void removeAttribute(String name, int scope) {
        if (scope == RequestAttributes.SCOPE_REQUEST) {
            requestAttributeMap.remove(name);
        }
    }

    @Override
    public String[] getAttributeNames(int scope) {
        if (scope == RequestAttributes.SCOPE_REQUEST) {
            return requestAttributeMap.keySet().toArray(new String[0]);
        }
        return new String[0];
    }

    // todo implement other methods (not used, just return null)
}
By calling RequestContextHolder.setRequestAttributes(new CustomRequestScopeAttr()) to start the process and RequestContextHolder.resetRequestAttributes() to end it, we take control of determining when a request begins and when it concludes.
In the following sections, we will explore various scenarios to illustrate how we can determine the start and end of a request effectively.
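To make that lifecycle concrete before diving into the scenarios, here is a minimal, framework-free sketch of the underlying idea: RequestContextHolder is essentially a ThreadLocal holder, and "starting" and "ending" a request means populating and clearing it. All names below (SimpleRequestContext, begin, end) are hypothetical stand-ins, not Spring's API.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for RequestContextHolder: a ThreadLocal map of attributes.
public class SimpleRequestContext {

    private static final ThreadLocal<Map<String, Object>> ATTRIBUTES = new ThreadLocal<>();

    // Analogous to RequestContextHolder.setRequestAttributes(new CustomRequestScopeAttr())
    public static void begin() {
        ATTRIBUTES.set(new HashMap<>());
    }

    public static void setAttribute(String name, Object value) {
        ATTRIBUTES.get().put(name, value);
    }

    public static Object getAttribute(String name) {
        Map<String, Object> map = ATTRIBUTES.get();
        return map == null ? null : map.get(name);
    }

    // Analogous to RequestContextHolder.resetRequestAttributes()
    public static void end() {
        ATTRIBUTES.remove();
    }

    public static void main(String[] args) {
        begin();                                     // "request" starts
        setAttribute("userId", "42");
        System.out.println(getAttribute("userId"));  // prints 42
        end();                                       // "request" ends
        System.out.println(getAttribute("userId"));  // prints null
    }
}
```

Everything that follows is this same pattern, applied at the right points of the async and Pub/Sub lifecycles.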
Using Request Scoped Beans in @Async
Calling an @Async method from within a web-based request leads to an IllegalStateException as soon as the async thread attempts to access a request scoped bean. To overcome this, we need to set the RequestAttributes when creating the async thread. This involves creating a custom AsyncConfiguration to manage async thread creation, where we set the RequestAttributes accordingly.
Following is an example of an AsyncConfiguration:
@Configuration
@EnableAsync
@EnableScheduling
public class AsyncConfiguration implements AsyncConfigurer {

    private final Logger log = LoggerFactory.getLogger(AsyncConfiguration.class);

    private final TaskExecutionProperties taskExecutionProperties;

    public AsyncConfiguration(TaskExecutionProperties taskExecutionProperties) {
        this.taskExecutionProperties = taskExecutionProperties;
    }

    @Override
    @Bean(name = "taskExecutor")
    public Executor getAsyncExecutor() {
        log.debug("Creating Async Task Executor");
        return getExecutor(
            taskExecutionProperties.getPool(),
            taskExecutionProperties.getThreadNamePrefix()
        );
    }

    private Executor getExecutor(TaskExecutionProperties.Pool pool, String threadNamePrefix) {
        ContextAwarePoolExecutor executor = new ContextAwarePoolExecutor();
        executor.setCorePoolSize(pool.getCoreSize());
        executor.setMaxPoolSize(pool.getMaxSize());
        executor.setQueueCapacity(pool.getQueueCapacity());
        executor.setThreadNamePrefix(threadNamePrefix);
        return executor;
    }
}
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.web.context.request.RequestContextHolder;

import java.util.concurrent.Callable;
import java.util.concurrent.Future;

public class ContextAwarePoolExecutor extends ThreadPoolTaskExecutor {

    /**
     * @param task the {@code Callable} to execute (never {@code null}) - the actual method we want to call
     */
    @Override
    public <T> Future<T> submit(Callable<T> task) {
        return super.submit(
            new ContextAwareCallable<>(RequestContextHolder.currentRequestAttributes(), task)
        );
    }

    @Override
    public <T> ListenableFuture<T> submitListenable(Callable<T> task) {
        return super.submitListenable(
            new ContextAwareCallable<>(RequestContextHolder.currentRequestAttributes(), task)
        );
    }
}
import org.springframework.web.context.request.RequestAttributes;
import org.springframework.web.context.request.RequestContextHolder;

import java.util.concurrent.Callable;

public class ContextAwareCallable<T> implements Callable<T> {

    private final CustomRequestScopeAttr requestAttributes;
    private final Callable<T> task;

    public ContextAwareCallable(RequestAttributes requestAttributes, Callable<T> task) {
        this.task = task;
        this.requestAttributes = cloneRequestAttributes(requestAttributes);
    }

    @Override
    public T call() throws Exception {
        try {
            RequestContextHolder.setRequestAttributes(requestAttributes);
            return task.call();
        } finally {
            RequestContextHolder.resetRequestAttributes();
        }
    }

    /**
     * Cloning is needed because once the main thread is finished, the original
     * attributes may get garbage collected, even if the async thread is not finished.
     *
     * @param requestAttributes the attributes of the submitting thread
     * @return a copy that is safe to use on the new thread
     */
    private CustomRequestScopeAttr cloneRequestAttributes(RequestAttributes requestAttributes) {
        try {
            CustomRequestScopeAttr clonedRequestAttribute = new CustomRequestScopeAttr();
            for (String name : requestAttributes.getAttributeNames(RequestAttributes.SCOPE_REQUEST)) {
                clonedRequestAttribute.setAttribute(
                    name,
                    requestAttributes.getAttribute(name, RequestAttributes.SCOPE_REQUEST),
                    RequestAttributes.SCOPE_REQUEST
                );
            }
            return clonedRequestAttribute;
        } catch (Exception e) {
            return new CustomRequestScopeAttr();
        }
    }
}
The ContextAwareCallable class implements the Callable interface and handles the execution of the new thread. In its call() method, we set the RequestAttributes to "mark" the start of the request. Since we pass the RequestAttributes in from the main thread, we need to clone them before setting them. This prevents issues where the main thread finishes before the async thread, leading to the attributes being garbage collected. In the finally-block of the call() method, we "mark" the end of the request by resetting the RequestAttributes. This approach also allows cascading multiple async calls, because the request attributes are passed down and cloned again for each new thread.
By using this mechanism, you can seamlessly utilize @Async annotations while retaining the functionality of request scoped beans.
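The propagation pattern itself is framework-agnostic. The following plain-Java sketch (hypothetical names, no Spring dependencies) shows the same trick the ContextAwarePoolExecutor uses: capture and clone the caller's context at submit time, install it in the worker thread, and reset it in a finally block.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class ContextPropagationDemo {

    // Hypothetical per-thread "request" context, standing in for RequestContextHolder.
    static final ThreadLocal<Map<String, Object>> CONTEXT = new ThreadLocal<>();

    // Wraps a Callable so it runs with a clone of the submitter's context,
    // mirroring what ContextAwareCallable does with RequestAttributes.
    static <T> Callable<T> contextAware(Callable<T> task) {
        Map<String, Object> current = CONTEXT.get();
        // Clone at submit time, so the worker keeps a copy even if the caller's
        // context is cleared (or garbage collected) before the task runs.
        Map<String, Object> cloned = (current == null) ? new HashMap<>() : new HashMap<>(current);
        return () -> {
            try {
                CONTEXT.set(cloned);   // "start" of the request in the worker thread
                return task.call();
            } finally {
                CONTEXT.remove();      // "end" of the request
            }
        };
    }

    public static void main(String[] args) throws Exception {
        CONTEXT.set(new HashMap<>());
        CONTEXT.get().put("userId", "42");

        ExecutorService executor = Executors.newSingleThreadExecutor();
        Future<Object> result = executor.submit(contextAware(() -> CONTEXT.get().get("userId")));
        System.out.println(result.get()); // prints 42 - visible despite running on another thread
        executor.shutdown();
    }
}
```

Without the contextAware wrapper, the submitted task would see an empty context, which is exactly the situation that produces the IllegalStateException in Spring.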
Using Request Scoped Beans with Pub/Sub
When working with Pub/Sub consumer events, which are requests coming from a queue rather than from web-based interactions, using request scoped beans again requires setting the RequestAttributes ourselves. Fortunately, defining the start and end of a request in a Pub/Sub "context" is straightforward, as we have clear visibility into when a call begins and concludes. In our case, we use Spring Cloud GCP Pub/Sub.
We have a PubSubConsumer class which looks something like this:
public MessageReceiver receiver() {
    return (PubsubMessage message, AckReplyConsumer consumer) -> {
        try {
            String messageString = parseMessageToString(message, consumer);
            if (messageString == null) {
                return;
            }
            startConsumeProcess(messageString);
            consumer.ack();
        } catch (NackException e) {
            // we are fine. just nack and try again
            log.info("received nack exception. we will nack this queue entry", e);
            consumer.nack();
        } catch (AckException e) {
            // we are fine. we can ack this one
            log.info("received ack exception. we will ack this queue entry", e);
            consumer.ack();
        } catch (Exception e) {
            // we are not fine
            log.error("error while receiving message from subscription {}", subscription, e);
            consumer.nack();
        } finally {
            RequestContextHolder.resetRequestAttributes();
        }
    };
}

protected T parseStringToPayloadType(String messageString) {
    try {
        return objectMapper.readValue(messageString, payloadType);
    } catch (IOException e) {
        // parse failure; error handling omitted here for brevity
        throw new IllegalStateException(e);
    }
}

protected String parseMessageToString(PubsubMessage message, AckReplyConsumer consumer) {
    log.info("receive message from subscription {} with payload {}", subscription, message);
    if (message == null || message.getData() == null) {
        return null;
    }
    return message.getData().toStringUtf8();
}

/**
 * Actual consume process logic.
 *
 * @param messageString String content of a message.
 * @throws Exception
 */
protected void startConsumeProcess(String messageString) throws Exception {
    RequestContextHolder.setRequestAttributes(new CustomRequestScopeAttr());
    T payload = parseStringToPayloadType(messageString);
    setContextVariables(payload);
    consume(payload);
}
When starting the consume process in the startConsumeProcess() method, we set the request scope attributes. In the finally-block of the actual receiver, we reset the RequestAttributes. The entire Pub/Sub event, from the beginning of the consumer until the end, forms one complete request, ensuring that the management of request scoped beans works seamlessly within this context.
Even if you invoke an asynchronous method within the consumer, this approach remains effective. Just make sure to implement the changes from the “Async” chapter to handle asynchronous scenarios correctly. By combining these strategies, you can confidently leverage request scoped beans in Pub/Sub events, facilitating robust and efficient processing of messages within your Spring application.
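One detail worth stressing: Pub/Sub subscriber threads are pooled and reused, so the reset in the finally-block is what prevents attributes from one message leaking into the next one handled by the same thread. A minimal framework-free sketch of that guarantee (hypothetical names, standing in for the receiver and RequestContextHolder):

```java
import java.util.HashMap;
import java.util.Map;

public class MessageScopeDemo {

    // Hypothetical stand-in for RequestContextHolder's ThreadLocal storage.
    static final ThreadLocal<Map<String, Object>> CONTEXT = new ThreadLocal<>();

    // Mirrors the receiver: set attributes at the start, reset in finally -
    // even when consuming fails, the reused thread starts the next message clean.
    static void handleMessage(String payload) {
        try {
            CONTEXT.set(new HashMap<>());   // "request" starts with the message
            CONTEXT.get().put("payload", payload);
            if (payload.isEmpty()) {
                throw new IllegalArgumentException("empty payload");
            }
            // ... actual consume logic would run here ...
        } catch (IllegalArgumentException e) {
            System.out.println("consume failed: " + e.getMessage());
        } finally {
            CONTEXT.remove();               // the equivalent of resetRequestAttributes()
        }
    }

    public static void main(String[] args) {
        handleMessage("");                  // fails, but still cleans up
        System.out.println(CONTEXT.get());  // prints null - nothing leaked
        handleMessage("hello");
        System.out.println(CONTEXT.get());  // prints null
    }
}
```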
Dealing with Java ParallelStream
So far, I have sadly not found a feasible, generic solution for accessing a request scoped bean inside a Java parallel stream. The underlying issue is that Java streams use the common Fork/Join pool for parallelization. Those threads are neither created nor configured by Spring or the developer, unlike what we did in the "Async" section. While you could manually set the request attributes inside each parallel stream, this is not a practical solution, especially when dealing with multiple parallel streams, and it easily leads to oversights.
Another approach would be to create a custom ForkJoinPool and use it for the parallel stream. This may look something like this (copied from here):
ForkJoinPool forkJoinPool = null;
try {
    forkJoinPool = new ForkJoinPool(parallelism);
    final List<Integer> primes = forkJoinPool.submit(() ->
        // Parallel task here, for example
        IntStream.range(1, 1_000_000).parallel()
            .filter(PrimesPrint::isPrime)
            .boxed().collect(Collectors.toList())
    ).get();
    System.out.println(primes);
} catch (InterruptedException | ExecutionException e) {
    throw new RuntimeException(e);
} finally {
    if (forkJoinPool != null) {
        forkJoinPool.shutdown();
    }
}
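For completeness, here is a self-contained, runnable version of that snippet, with the isPrime helper filled in for illustration (PrimesPrint is replaced by a local class, and the workload is shrunk so the output is easy to check):

```java
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

public class CustomPoolStreamDemo {

    // Simple trial-division primality check, standing in for PrimesPrint::isPrime.
    static boolean isPrime(int n) {
        if (n < 2) return false;
        for (int i = 2; (long) i * i <= n; i++) {
            if (n % i == 0) return false;
        }
        return true;
    }

    // Runs the parallel stream inside a dedicated ForkJoinPool instead of the common pool.
    static List<Integer> primesBelow(int limit, int parallelism) {
        ForkJoinPool forkJoinPool = null;
        try {
            forkJoinPool = new ForkJoinPool(parallelism);
            return forkJoinPool.submit(() ->
                IntStream.range(1, limit).parallel()
                    .filter(CustomPoolStreamDemo::isPrime)
                    .boxed().collect(Collectors.toList())
            ).get();
        } catch (InterruptedException | ExecutionException e) {
            throw new RuntimeException(e);
        } finally {
            if (forkJoinPool != null) {
                forkJoinPool.shutdown();
            }
        }
    }

    public static void main(String[] args) {
        System.out.println(primesBelow(30, 4)); // prints [2, 3, 5, 7, 11, 13, 17, 19, 23, 29]
    }
}
```

Tasks submitted this way run on the dedicated pool's worker threads, so in principle you could wrap the submitted lambda the same way the "Async" chapter wraps Callables; it just has to be repeated for every parallel stream, which is exactly the maintenance burden described above.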