添加链接
link管理
链接快照平台
  • 输入网页链接,自动生成快照
  • 标签化管理网页链接
相关文章推荐
活泼的铁链  ·  连接器 | EMQX文档·  3 天前    · 
飘逸的花卷  ·  MQTT v5 Feedbacks · ...·  4 天前    · 
豪情万千的铁板烧  ·  Identifier 'Buffer' ...·  4 天前    · 
慈祥的消防车  ·  paho-mqtt · PyPI·  4 天前    · 
直爽的马克杯  ·  Windows server 2016, ...·  1 年前    · 
谈吐大方的小蝌蚪  ·  React-脚本之家·  1 年前    · 
  • 问题原因 :SpringBoot后端服务中集成的mqtt客户端,如果mqtt客户端无法及时处理消息队列,当长时间处于等待状态后会导致超时mqtt连接断开,即使重连,又会因为等待超时的原因而再次掉线。
  • 解决方案 :从上面的问题原因可知,解决此问题的核心是加快mqtt客户端处理消息的能力,因SpringBoot自带的线程池和异步方法处理多线程的性能很好,且在SpringBoot项目中集成简单方便,故作为解决此问题的核心技术方案。
  • 核心要点
  • 容量足够大的线程池:这样有利于提高并发量,充分利用CPU的时间分片。
  • 异步方法:主线程调用异步方法后直接进入创建子线程去处理异步方法中的流程,主线程跳过等待子线程执行结束直接进行后面的流程,这样可以快速结束当前主线程,然后接收下一个mqtt消息并处理。
  • 依赖注入:使用Spring的IoC机制对类的对象创建进行管理,所有要使用Spring的 @Async 注解的类都要通过@Service或@Component之类的注解来进入Spring的bean对象管理,只有这样才能使用AOP的机制,从而使用异步方法。
  • AOP机制:使用 @Async 注解的方法A 不能调用在同一个类中的同样用 @Async 修饰的方法B,这样会导致方法B的异步失效变成同步,解决方法是将方法B放进另外一个类中。简述原因,因为AOP的动态代理机制导致无法在同一个类中异步方法调用异步方法。
  • SpringBoot的启动类添加 @EnableAsync 注解
  • 1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    package com.urbmn.deviceCommunity;

    import lombok.extern.slf4j.Slf4j;
    import org.apache.catalina.connector.Connector;
    import org.apache.coyote.ProtocolHandler;
    import org.apache.coyote.http11.AbstractHttp11Protocol;
    import org.mybatis.spring.annotation.MapperScan;
    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.autoconfigure.SpringBootApplication;
    import org.springframework.boot.web.embedded.tomcat.TomcatConnectorCustomizer;
    import org.springframework.boot.web.embedded.tomcat.TomcatServletWebServerFactory;
    import org.springframework.boot.web.server.WebServerFactoryCustomizer;
    import org.springframework.boot.web.servlet.server.ConfigurableServletWebServerFactory;
    import org.springframework.cloud.client.discovery.EnableDiscoveryClient;
    import org.springframework.cloud.openfeign.EnableFeignClients;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.scheduling.annotation.EnableAsync;
    import org.springframework.session.data.redis.config.annotation.web.http.EnableRedisHttpSession;

    /**
    *
    */
    @EnableAsync
    @EnableRedisHttpSession
    @EnableFeignClients
    @EnableDiscoveryClient
    @SpringBootApplication
    @MapperScan(basePackages = "com.urbmn.deviceCommunity.mapper")
    @Configuration
    @Slf4j
    public class DeviceCommunityApplication implements WebServerFactoryCustomizer<ConfigurableServletWebServerFactory> {

    public static void main(String[] args) {
    SpringApplication.run(DeviceCommunityApplication.class, args);
    }

    @Override
    public void customize(ConfigurableServletWebServerFactory factory) {
    /*TomcatServletWebServerFactory f = (TomcatServletWebServerFactory) factory;
    f.setProtocol("org.apache.coyote.http11.Http11Nio2Protocol");
    public static void main(String[] args) {
    SpringApplication.run(ManagerApplication.class, args);
    }

    f.addConnectorCustomizers(c -> {
    Http11NioProtocol protocol = (Http11NioProtocol) c.getProtocolHandler();
    protocol.setMaxConnections(200);
    protocol.setMaxThreads(200);
    protocol.setSelectorTimeout(3000);
    protocol.setSessionTimeout(3000);
    protocol.setConnectionTimeout(3000);
    });*/
    ((TomcatServletWebServerFactory) factory).setProtocol("org.apache.coyote.http11.Http11Nio2Protocol");
    ((TomcatServletWebServerFactory) factory).addConnectorCustomizers(new TomcatConnectorCustomizer() {
    @Override
    public void customize(Connector connector) {
    ProtocolHandler protocol = connector.getProtocolHandler();
    log.info("########### Tomcat({}) -- MaxConnection:{};MaxThreads:{};MinSpareThreads:{}", //
    protocol.getClass().getName(), //
    ((AbstractHttp11Protocol<?>) protocol).getMaxConnections(), //
    ((AbstractHttp11Protocol<?>) protocol).getMaxThreads(), //
    ((AbstractHttp11Protocol<?>) protocol).getMinSpareThreads());

    }
    });
    }
    }

  • 添加SpringBoot异步方法配置类,配置异步方法的线程池
  • 1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    package com.urbmn.deviceCommunity.config;


    import lombok.extern.slf4j.Slf4j;
    import org.springframework.aop.interceptor.AsyncUncaughtExceptionHandler;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.scheduling.annotation.AsyncConfigurer;
    import org.springframework.scheduling.annotation.EnableAsync;
    import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

    import java.lang.reflect.Method;
    import java.util.concurrent.Executor;
    import java.util.concurrent.ThreadPoolExecutor;

    @Slf4j
    @Configuration
    @EnableAsync
    public class AsyncConfig implements AsyncConfigurer {
    @Override
    public Executor getAsyncExecutor() {
    int corePoolSize = Runtime.getRuntime().availableProcessors() * 100;
    int maxPoolSize = Runtime.getRuntime().availableProcessors() * 1200;
    int queueCapacity = Runtime.getRuntime().availableProcessors() * 2400;
    log.info("\n### [AsyncConfig] corePoolSize: {}, maxPoolSize: {}, queueCapacity: {}", corePoolSize, maxPoolSize, queueCapacity);
    ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
    // 核心线程池数量,方法: 返回可用处理器的Java虚拟机的数量。
    executor.setCorePoolSize(corePoolSize);
    // 最大线程数量
    executor.setMaxPoolSize(maxPoolSize);
    // 线程池的队列容量
    executor.setQueueCapacity(queueCapacity);
    // 线程名称的前缀
    executor.setThreadNamePrefix("Executor-");
    // setRejectedExecutionHandler:当pool已经达到max size的时候,如何处理新任务
    // CallerRunsPolicy:不在新线程中执行任务,而是由调用者所在的线程来执行
    // executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
    // 设置拒绝策略
    executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
    // 等待所有任务结束后再关闭线程池
    //executor.setWaitForTasksToCompleteOnShutdown(true);
    executor.initialize();
    return executor;
    }

    /* 异步任务中异常处理 */
    @Override
    public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
    return (Throwable exception, Method method, Object... params) -> {
    log.error("ClassName: {}, MethodName: {}", method.getDeclaringClass().getName(), method.getName());
    log.error("exceptionName: {}", exception.getClass().getName());
    log.error("exceptionMessage: {}", exception.getMessage());
    };
    }
    }

    MQTT配置

  •