添加链接
link管理
链接快照平台
  • 输入网页链接,自动生成快照
  • 标签化管理网页链接
public static Scheduler newElastic(String name) { // name = "selfTrade-thread" return newElastic(name, ElasticScheduler.DEFAULT_TTL_SECONDS); // 2. public static Scheduler newElastic(String name, int ttlSeconds) { // name = "selfTrade-thread" // ttlSeconds = 60 return newElastic(name, ttlSeconds, false); // 3. public static Scheduler newElastic(String name, int ttlSeconds) { // name = "selfTrade-thread" // ttlSeconds = 60 return newElastic(name, ttlSeconds, false); // 4. public static Scheduler newElastic(String name, int ttlSeconds, boolean daemon) return newElastic( ttlSeconds, // 4.1 new ReactorThreadFactory( // selfTrade-thread name, // 统计信息 // AtomicLong COUNTER = new AtomicLong(); ElasticScheduler.COUNTER, // false daemon, false, // 异常处理方法 Schedulers::defaultUncaughtException) static volatile Factory factory = DEFAULT; static final Factory DEFAULT = new Factory() { }; public static Scheduler newElastic(int ttlSeconds, ThreadFactory threadFactory) { // ttlSeconds = 60 // threadFactory = reactor.core.scheduler.ReactorThreadFactory // fatory 指向了自己内部的一个属性:DEFAULT // factory = Schedulers.DEFAULT return factory.newElastic(ttlSeconds, threadFactory); public interface Schedulers.Factory { // 6. default Scheduler newElastic(int ttlSeconds, ThreadFactory threadFactory) { // ttlSeconds = 60 // threadFactory = reactor.core.scheduler.ReactorThreadFactory // 创建ElasticScheduler并返回(看第5点) return new ElasticScheduler(threadFactory, ttlSeconds);

(3).ReactorThreadFactory

package reactor.core.scheduler;
class ReactorThreadFactory 
       implements ThreadFactory,
                  Supplier<String>,
                  Thread.UncaughtExceptionHandler {
    // 线程名称
    final private String                        name;
	// 统计信息
   final private AtomicLong                    counterReference;
   // 是否守护线程 false
	final private boolean                       daemon;
   // 是否阻塞并拒绝 false
	final private boolean                       rejectBlocking;
    @Nullable
	final private BiConsumer<Thread, Throwable> uncaughtExceptionHandler;
	ReactorThreadFactory(String name,
			AtomicLong counterReference,
			boolean daemon,
			boolean rejectBlocking,
			@Nullable BiConsumer<Thread, Throwable> uncaughtExceptionHandler) {
		this.name = name;
		this.counterReference = counterReference;
		this.daemon = daemon;
		this.rejectBlocking = rejectBlocking;
            // 异常处理方式
		this.uncaughtExceptionHandler = uncaughtExceptionHandler;
 @Override
	public final Thread newThread(@NotNull Runnable runnable) {
           // 创建线程名称
		String newThreadName = name + "-" + counterReference.incrementAndGet();
		// rejectBlocking  = false
           Thread t = rejectBlocking
                      ? new NonBlockingThread(runnable, newThreadName)
                      : new Thread(runnable, newThreadName);
		if (daemon) {
              t.setDaemon(true);
		if (uncaughtExceptionHandler != null) {
              t.setUncaughtExceptionHandler(this);
		return t;
   // 拒绝策略
	@Override
	public void uncaughtException(Thread t, Throwable e) {
        if (uncaughtExceptionHandler == null) {
            return;
        uncaughtExceptionHandler.accept(t,e);
    // 获得线程名称
	@Override
	public final String get() {
        return name;
    static final class NonBlockingThread extends Thread implements NonBlocking {
        public NonBlockingThread(Runnable target, String name) {
            super(target, name);
    }// end NonBlockingThread

(4).Scheduler

public interface Scheduler extends Disposable {
    Disposable schedule(Runnable task);
    default long now(TimeUnit unit) {
        return unit.convert(System.currentTimeMillis(), TimeUnit.MILLISECONDS);
     Worker createWorker();
    default void dispose() {
    default void start() {
     interface Worker extends Disposable {
         Disposable schedule(Runnable task);
         default Disposable schedule(Runnable task, long delay, TimeUnit unit) {
             throw Exceptions.failWithRejectedNotTimeCapable();
          default Disposable schedulePeriodically(Runnable task, long initialDelay, long period, TimeUnit unit) {
                throw Exceptions.failWithRejectedNotTimeCapable();
     } //end Worker    

(5).ElasticScheduler

final class ElasticScheduler 
      // Reactor自定义的线程调度方法
      implements Scheduler, 
      // 返回JDK自带的线程池
      Supplier<ScheduledExecutorService>,
      Scannable {
    // 每创建一个线程 COUNTER++
    static final AtomicLong COUNTER = new AtomicLong();
    // 创建回收线程,并设置为守护线程 
    static final ThreadFactory EVICTOR_FACTORY = r -> {
		Thread t = new Thread(r, "elastic-evictor-" + COUNTER.incrementAndGet());
		t.setDaemon(true);
		return t;
   static final CachedService SHUTDOWN = new CachedService(null);
	static final int DEFAULT_TTL_SECONDS = 60;
	final ThreadFactory factory;
	final int ttlSeconds;
	final Deque<ScheduledExecutorServiceExpiry> cache;
	final Queue<CachedService> all;
	final ScheduledExecutorService evictor;
	volatile boolean shutdown;
    // 1. 
    ElasticScheduler(ThreadFactory factory, int ttlSeconds) {
           // ttlSeconds = 60 
		if (ttlSeconds < 0) {
			throw new IllegalArgumentException("ttlSeconds must be positive, was: " + ttlSeconds);
		this.ttlSeconds = ttlSeconds;
           // factory = reactor.core.scheduler.ReactorThreadFactory
		this.factory = factory;
           // *******************************
           // 创建双端队列
           // *******************************
		this.cache = new ConcurrentLinkedDeque<>();
		this.all = new ConcurrentLinkedQueue<>();
           // ***************************************
           // 创建回收线程池 core:1 max:Integer.MAX_VALUE
           // ***************************************
		this.evictor = Executors.newScheduledThreadPool(1, EVICTOR_FACTORY);
		// 启动线程回收
           this.evictor.scheduleAtFixedRate(
                           this::eviction, 
                            // 延迟60秒
				ttlSeconds,
                            // 间隔60秒
				ttlSeconds,
                            // 单位为秒
				TimeUnit.SECONDS);

(6).总结

  • 创建Schedulers.ElasticScheduler, 它的数据结构是: ThreadFactory(reactor.core.scheduler.ReactorThreadFactory) ScheduledExecutorService
  • 实际就是创建了ScheduledExecutorService的子类
  •