老虎机支付宝充值多线程高并发编程(8) -- Fork/Join源码分析

一.概念

本文来源:http://www.ssb52.com/www_xs8_cn/

老虎机支付宝充值,问题来了,韩国企业的成本价是多少呢据韩国最大券商新韩金融投资(ShinhanInvestmentCorp)的分析报告指出,去年OCI的多晶硅生产成本是19.7美元/公斤,现金成本16.1美元/公斤。  本网所呈现的所有内容,包括但不限于文字、图像、图片、照片、图表、音频、视频、标志、标识、广告、商标、商号、域名、程序、版面设计、专栏目录与名称、内容分类标准以及为注册用户提供的任何信息,均受到《中华人民共和国著作权法》、《中华人民共和国商标法》、《中华人民共和国专利法》、《中华人民共和国反不正当竞争法》等相关法律法规的保护,为本网或权利人所有。  陈健康提醒,市民在购买汽车支付定金时,就应当对交车时间、是否二手翻新车等信息在合同里进行备注,保护自己的合法权益。  实际上,前海人寿的表态代表了“资产荒”背景下绝大多数险资投资上市公司的初衷。

至少有1个亿才能成为服务于多个家族的联合家族办公室的服务对象,目前中国大陆有8.9万名亿万超高净值人士达到这一门槛,五年后将增至18万名。  也门总理达格尔发布命令,要求组建一个专门委员会跟踪事件进展情况,并与海岸警备队合作进行搜救。  扎哈罗娃表示,如果这项法案最终生效,俄外交官就得在每次“越界”远行前数天通知美国务院,然而俄外交官需每天面对不断变化的国际局势,他们经常无法提前数日确定自己的行程。那作为中央深改组组长的习近平是如何殚精竭力指导这场全面深化改革的呢?习近平曾说,“改革开放是前无古人的崭新事业,必须坚持正确的方法论,在不断实践探索中前进”,可见他对改革方法论的重视。

  新华社北京12月5日电国家主席习近平12月5日致信米尔济约耶夫,祝贺他当选总统。我愿继续为增进美中相互了解和交往合作积极发挥作用。赵平的弟弟赵兵不愿意回忆起那个惨烈的画面。要结合推进供给侧结构性改革,加快推动绿色、循环、低碳发展,形成节约资源、保护环境的生产生活方式。

  Fork/Join就是将一个大任务分解(fork)成许多个独立的小任务,然后多线程并行去处理这些小任务,每个小任务处理完得到结果再进行合并(join)得到最终的结果。

  流程:任务继承RecursiveTask,重写compute方法,使用ForkJoinPool的submit提交任务,任务在某个线程中运行,工作任务中的compute方法的代码开始对任务进行分析,如果符合条件就进行任务拆分,拆分成多个子任务,每个子任务进行数据的计算或操作,得到结果返回给上一层任务开启线程进行合并,最终通过get获取整体处理结果。【只能将任务1个切分为两个,不能切分为3个或其他数量

  • ForkJoinTask:代表fork/join里面的任务类型,一般用它的两个子类RecursiveTask(任务有返回值)和RecursiveAction(任务没有返回值),任务的处理逻辑包括任务的切分都是在重写compute方法里面进行处理。只有ForkJoinTask任务可以被拆分运行和合并运行。可查看上篇Future源码分析的类图结构】【ForkJoinTask使用了模板模式进行设计,将ForkJoinTask的执行相关代码进行隐藏,通过提供抽象类(即子类RecursiveTask、RecursiveAction)暴露用户的实际业务处理。】
    • RecursiveTask:在进行exec之后会使用一个result的变量进行接受返回的结果;
      public abstract class RecursiveTask<V> extends ForkJoinTask<V> {
          V result;
          protected abstract V compute();
      
          public final V getRawResult() {
              return result;
          }
      
          protected final void setRawResult(V value) {
              result = value;
          }
          protected final boolean exec() {
              result = compute();
              return true;
          }
      
      }
    • RecursiveAction:在进行exec之后没有返回结果;
      public abstract class RecursiveAction extends ForkJoinTask<Void> {
         
          protected abstract void compute();
      
          public final Void getRawResult() { return null; }
      
          protected final void setRawResult(Void mustBeNull) { }
      
          protected final boolean exec() {
              compute();
              return true;
          }
      
      } 
  • ForkJoinPool:fork/join框架的管理者,最原始的任务都要交给它来处理。它负责控制整个fork/join有多少个工作线程,工作线程的创建、机会都是由它来控制。它还负责workQueue队列的创建和分配,每当创建一个工作线程,它负责分配对应的workQueue,然后它把接到的活都交给工作线程去处理。是整个fork/join的容器。
    • ForkJoinPool.WorkQueue:双端队列,负责存储接收的任务;
  • ForkJoinWorkerThread:fork/join里面真正干活的”工人“,它继承了Thread,所以本质是一个线程。它有一个ForkJoinPool.WorkQueue的队列存放着它要干的活,接活之前它要向ForkJoinPool注册(registerWorker),拿到相应的workQueue,然后就从workQueue里面拿任务出来处理。它是依附于ForkJoinPool而存活,如果ForkJoinPool销毁了,它也会跟着结束。【每一个ForkJoinWorkerThread线程都具有一个独立的任务等待队列workQueue。】
    • 当使用ForkJoinPool进行submit任务提交时,创建1个workQueue将任务放进去,然后进行fork任务切分,如果切分后的任务放的进去之前的workQueue就放进去,不行就随机选取workQueue放进去,如果还放不了就创建一个新的workQueue放进去;
      public class ForkJoinWorkerThread extends Thread {
          final ForkJoinPool pool;
          final ForkJoinPool.WorkQueue workQueue;
          protected ForkJoinWorkerThread(ForkJoinPool pool) {
              super("aForkJoinWorkerThread");
              this.pool = pool;
              this.workQueue = pool.registerWorker(this);/向ForkJoinPool执行池注册当前工作线程,ForkJoinPool为其分配一个工作队列
          }
      }

二.用法

  以前1+2+3+...+100这样的处理可以用for循环处理,现在使用fork/join来处理:从下面结果可以看到,大任务被不断的拆分成小任务,然后添加到工作线程的队列中,每个小任务都会被工作线程从队列中取出进行运行,然后每个小任务的结果的合并也由工作线程执行,然后不断的汇总成最终结果。【task通过ForkJoinPool来执行,分割的子任务添加到当前工作线程的队列中,进入队列的头部,当一个工作线程中没有任务时,会从其他工作线程的队列尾部获取一个任务。(工作窃取:当前工作线程对应的队列中没有任务了,从其他工作线程对应的队列中取出任务进行操作,然后将操作结果返还给对应队列的线程。)】

public class MyFrokJoinTask extends RecursiveTask<Integer> {
    private int begin;
    private int end;

    public MyFrokJoinTask(int begin, int end) {
        this.begin = begin;
        this.end = end;
    }

    public static void main(String[] args) throws Exception {
        ForkJoinPool pool = new ForkJoinPool();
        ForkJoinTask<Integer> result = pool.submit(new MyFrokJoinTask(1, 100));/提交任务
        System.out.println("计算的值:"+result.get());/得到最终的结果

    }

    @Override
    protected Integer compute() {
        int sum = 0;
        if (end - begin <= 2) {
            for (int i = begin; i <= end; i++) {
                sum += i;
                System.out.println("i:"+i);
            }
        } else {
            MyFrokJoinTask d1 = new MyFrokJoinTask(begin, (begin + end) / 2);
            MyFrokJoinTask d2 = new MyFrokJoinTask((begin + end) / 2+1, end);
            d1.fork();/任务拆分
            d2.fork();/任务拆分
            Integer a = d1.join();/每个任务的结果
            Integer b = d2.join();/每个任务的结果
            sum = a + b;/汇总任务结果
            System.out.println("sum:" + sum + ",a:" + a + ",b:" + b);
        }
        System.out.println("name:"+Thread.currentThread().getName());
        return sum;
    }
}
/=========结果============
i:1
i:2
name:ForkJoinPool-1-worker-1
i:3
i:4
name:ForkJoinPool-1-worker-1
sum:10,a:3,b:7
name:ForkJoinPool-1-worker-1
i:5
i:6
i:7
name:ForkJoinPool-1-worker-1
sum:28,a:10,b:18
name:ForkJoinPool-1-worker-1
...............
...............
sum:91,a:28,b:63
sum:99,a:45,b:54
name:ForkJoinPool-1-worker-3
name:ForkJoinPool-1-worker-1
i:23
i:24
i:25
name:ForkJoinPool-1-worker-2
sum:135,a:63,b:72
name:ForkJoinPool-1-worker-2
sum:234,a:99,b:135
name:ForkJoinPool-1-worker-3
sum:325,a:91,b:234
name:ForkJoinPool-1-worker-1
sum:1275,a:325,b:950
name:ForkJoinPool-1-worker-1
sum:5050,a:1275,b:3775
name:ForkJoinPool-1-worker-1
计算的值:5050

三.分析

  ForkJoinPool

ForkJoinPool forkJoinPool = new ForkJoinPool();
/Runtime.getRuntime().availableProcessors()当前操作系统可以使用的CPU内核数量
public ForkJoinPool() {
    this(Math.min(MAX_CAP, Runtime.getRuntime().availableProcessors()),
         defaultForkJoinWorkerThreadFactory, null, false);
}
/this调用到下面这段代码
public ForkJoinPool(int parallelism,
                    ForkJoinWorkerThreadFactory factory,
                    UncaughtExceptionHandler handler,
                    boolean asyncMode) {
    this(checkParallelism(parallelism), /并行度
            checkFactory(factory), /工作线程创建工厂
            handler, /异常处理handler
            asyncMode ? FIFO_QUEUE : LIFO_QUEUE, /任务队列出队模式 异步:先进先出,同步:后进先出
            "ForkJoinPool-" + nextPoolId() + "-worker-");
    checkPermission();
}
/上面的this最终调用到下面这段代码
private ForkJoinPool(int parallelism,
                     ForkJoinWorkerThreadFactory factory,
                     UncaughtExceptionHandler handler,
                     int mode,
                     String workerNamePrefix) {
    this.workerNamePrefix = workerNamePrefix;
    this.factory = factory;
    this.ueh = handler;
    this.config = (parallelism & SMASK) | mode;
    long np = (long)(-parallelism); / offset ctl counts
    this.ctl = ((np << AC_SHIFT) & AC_MASK) | ((np << TC_SHIFT) & TC_MASK);
}
  • parallelism:可并行数量,fork/join框架将依据这个并行数量的设定,决定框架内并行执行的线程数量。并行的每一个任务都会有一个线程进行处理;
  • factory当fork/join创建一个新的线程时,同样会用到线程创建工厂。它实现了ForkJoinWorkerThreadFactory接口,使用默认的的接口实现类DefaultForkJoinWorkerThreadFactory来实现newThread方法创建一个新的工作线程;
    public static interface ForkJoinWorkerThreadFactory {
            /**
             * Returns a new worker thread operating in the given pool.
             */
            public ForkJoinWorkerThread newThread(ForkJoinPool pool);
        }
    
        static final class DefaultForkJoinWorkerThreadFactory
            implements ForkJoinWorkerThreadFactory {
            public final ForkJoinWorkerThread newThread(ForkJoinPool pool) {
                return new ForkJoinWorkerThread(pool);
            }
        }
  • handler:异常捕获处理器。当执行的任务出现异常,并从任务中被抛出时,就会被handler捕获;
  • asyncMode:fork/join为每一个独立的工作线程准备了对应的待执行任务队列,这个任务队列是使用数组进行组合的双向队列。即可以使用先进先出的工作模式,也可以使用后进先出的工作模式;

   Fork()和Join()

  fork/join框架中提供的fork()和join()是最重要的两个方法,它们和parallelism(”可并行任务数量“)配合工作,可以导致拆分的子任务T1.1、T1.2甚至TX在fork/join中不同的运行效果(上面1+2....+100的每次运行的子任务都是不同的)。即TX子任务或等待其他已存在的线程运行关联的子任务(sum操作),或在运行TX的线程中”递归“执行其他任务(将1-50进行拆分后的子任务递归运行),或启动一个新的线程执行子任务(运行1-50另一边拆分的任务,即50-100的子任务)。

  fork()用于将新创建的子任务放入当前线程的workQueue队列中,fork/join框架将根据当前正在并发执行ForkJoinTask任务的ForkJoinWorkerThread线程状态,决定是让这个任务在队列中等待,还是创建一个新的ForkJoinWorkedThread线程运行它,又或者是唤起其他正在等待任务的ForkJoinWorkerThread线程运行它。

  join()用于让当前线程阻塞,直到对应的子任务完成运行并返回执行结果。或者,如果这个子任务存在于当前线程的任务等待队列workQueue中,则取出这个子任务进行”递归“执行,其目的是尽快得到当前子任务的运行结果,然后继续执行。

  提交任务:

  1.  sumbit的第一次提交:ForkJoinPool.submit(ForkJoinTask<T> task) -> externalPush(task) -> externalSubmit(task)

    1. submit:

      public <T> ForkJoinTask<T> submit(ForkJoinTask<T> task) {
              if (task == null)
                  throw new NullPointerException();
              externalPush(task);
              return task;
          }
      
          public <T> ForkJoinTask<T> submit(Callable<T> task) {
              ForkJoinTask<T> job = new ForkJoinTask.AdaptedCallable<T>(task);
              externalPush(job);
              return job;
          }
      
          public <T> ForkJoinTask<T> submit(Runnable task, T result) {
              ForkJoinTask<T> job = new ForkJoinTask.AdaptedRunnable<T>(task, result);
              externalPush(job);
              return job;
          }
      
          public ForkJoinTask<?> submit(Runnable task) {
              if (task == null)
                  throw new NullPointerException();
              ForkJoinTask<?> job;
              if (task instanceof ForkJoinTask<?>) / avoid re-wrap
                  job = (ForkJoinTask<?>) task;
              else
                  job = new ForkJoinTask.AdaptedRunnableAction(task);
              externalPush(job);
              return job;
          }
    2. externalPush:将任务添加到随机选取的队列中或新创建的队列中;
      final void externalPush(ForkJoinTask<?> task) {
              WorkQueue[] ws; WorkQueue q; int m;
              int r = ThreadLocalRandom.getProbe();/当前线程的一个随机数
              int rs = runState;/当前容器的状态
              /如果随机选取的队列还有空位置可以存放、队列加锁锁定成功,任务就放入队列中
              if ((ws = workQueues) != null && (m = (ws.length - 1)) >= 0 &&
                  (q = ws[m & r & SQMASK]) != null && r != 0 && rs > 0 &&
                  U.compareAndSwapInt(q, QLOCK, 0, 1)) {
                  ForkJoinTask<?>[] a; int am, n, s;
                  if ((a = q.array) != null &&
                      (am = a.length - 1) > (n = (s = q.top) - q.base)) {
                      int j = ((am & s) << ASHIFT) + ABASE;
                      U.putOrderedObject(a, j, task);/任务加入队列中
                      U.putOrderedInt(q, QTOP, s + 1);/挪动下次任务存放的槽的位置
                      U.putIntVolatile(q, QLOCK, 0);/队列解锁
                      if (n <= 1)/当前数组元素少时,进行唤醒当前线程;或者当没有活动线程或线程数较少时,添加新的线程
                          signalWork(ws, q);
                      return;
                  }
                  U.compareAndSwapInt(q, QLOCK, 1, 0);/队列解锁
              }
              externalSubmit(task);/升级版的externalPush
          }
      
      
          volatile int runState;               / lockable status锁定状态
          / runState: SHUTDOWN为负数,其他的为2的次幂
          private static final int  RSLOCK     = 1;
          private static final int  RSIGNAL    = 1 << 1;/唤醒
          private static final int  STARTED    = 1 << 2;/启动
          private static final int  STOP       = 1 << 29;/停止
          private static final int  TERMINATED = 1 << 30;/结束
          private static final int  SHUTDOWN   = 1 << 31;/关闭
    3. externalSubmit:队列添加任务失败,进行升级版操作,即创建队列数组和创建队列后,将任务放入新创建的队列中;
      private void externalSubmit(ForkJoinTask<?> task) {
          int r;                                    / initialize caller's probe
          if ((r = ThreadLocalRandom.getProbe()) == 0) {
              ThreadLocalRandom.localInit();
              r = ThreadLocalRandom.getProbe();
          }
          for (;;) {/自旋
              WorkQueue[] ws; WorkQueue q; int rs, m, k;
              boolean move = false;
              /**
              *ForkJoinPool执行器停止工作了,抛出异常
              *ForkJoinPool extends AbstractExecutorService
              *abstract class AbstractExecutorService implements ExecutorService
              *interface ExecutorService extends Executor
              *interface Executor执行提交的对象Runnable任务
              */
              if ((rs = runState) < 0) {
                  tryTerminate(false, false);    / help terminate
                  throw new RejectedExecutionException();
              }
              /第一次遍历,队列数组未创建,进行创建
              else if ((rs & STARTED) == 0 ||     / initialize初始化
                       ((ws = workQueues) == null || (m = ws.length - 1) < 0)) {
                  int ns = 0;
                  rs = lockRunState();
                  try {
                      if ((rs & STARTED) == 0) {
                          U.compareAndSwapObject(this, STEALCOUNTER, null,
                                                 new AtomicLong());
                          / create workQueues array with size a power of two
                          int p = config & SMASK; / ensure at least 2 slots,config是CPU核数
                          int n = (p > 1) ? p - 1 : 1;
                          n |= n >>> 1; n |= n >>> 2;  n |= n >>> 4;
                          n |= n >>> 8; n |= n >>> 16; n = (n + 1) << 1;
                          workQueues = new WorkQueue[n];/创建
                          ns = STARTED;
                      }
                  } finally {
                      unlockRunState(rs, (rs & ~RSLOCK) | ns);
                  }
              }
              /第三次遍历,把任务放入队列中
              else if ((q = ws[k = r & m & SQMASK]) != null) {
                  if (q.qlock == 0 && U.compareAndSwapInt(q, QLOCK, 0, 1)) {
                      ForkJoinTask<?>[] a = q.array;
                      int s = q.top;
                      boolean submitted = false; / initial submission or resizing
                      try {                      / locked version of push
                          if ((a != null && a.length > s + 1 - q.base) ||
                              (a = q.growArray()) != null) {
                              int j = (((a.length - 1) & s) << ASHIFT) + ABASE;
                              U.putOrderedObject(a, j, task);
                              U.putOrderedInt(q, QTOP, s + 1);
                              submitted = true;
                          }
                      } finally {
                          U.compareAndSwapInt(q, QLOCK, 1, 0);
                      }
                      if (submitted) {
                          signalWork(ws, q);
                          return;
                      }
                  }
                  move = true;                   / move on failure
              }
              /第二次遍历,队列数组为空,创建队列
              else if (((rs = runState) & RSLOCK) == 0) { / create new queue
                  q = new WorkQueue(this, null);
                  q.hint = r;
                  q.config = k | SHARED_QUEUE;
                  q.scanState = INACTIVE;
                  rs = lockRunState();           / publish index
                  if (rs > 0 &&  (ws = workQueues) != null &&
                      k < ws.length && ws[k] == null)
                      ws[k] = q;                 / else terminated
                  unlockRunState(rs, rs & ~RSLOCK);
              }
              else
                  move = true;                   / move if busy
              if (move)
                  r = ThreadLocalRandom.advanceProbe(r);
          }
      }
  2. fork任务切分的提交:ForkJoinTask.fork() -> ForkJoinWorkerThread.workQueue.push(task)/ForkJoinPool.common.externalPush(task) -> ForkJoinPool.push(task)/externalPush(task)

    1. fork:
      public final ForkJoinTask<V> fork() {
              Thread t;
              if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread)/当前线程是workerThread,任务直接放入workerThread当前的workQueue
                  ((ForkJoinWorkerThread)t).workQueue.push(this);
              else
                  ForkJoinPool.common.externalPush(this);/将任务添加到随机选取的队列中或新创建的队列中
              return this;
          }
    2.  push:

      public class ForkJoinPool extends AbstractExecutorService {
              static final class WorkQueue {
                  final void push(ForkJoinTask<?> task) {
                      ForkJoinTask<?>[] a; ForkJoinPool p;
                      int b = base, s = top, n;
                      if ((a = array) != null) {    / ignore if queue removed,队列被移除忽略
                          int m = a.length - 1;     / fenced write for task visibility
                          U.putOrderedObject(a, ((m & s) << ASHIFT) + ABASE, task);/任务加入队列中
                          U.putOrderedInt(this, QTOP, s + 1);/挪动下次任务存放的槽的位置
                          if ((n = s - b) <= 1) {/当前数组元素少时,进行唤醒当前线程;或者当没有活动线程或线程数较少时,添加新的线程
                              if ((p = pool) != null)
                                  p.signalWork(p.workQueues, this);
                          }
                          else if (n >= m)/数组所有元素都满了进行2倍扩容
                              growArray();
                      }
                  }
                  final ForkJoinTask<?>[] growArray() {
                      ForkJoinTask<?>[] oldA = array;
                      int size = oldA != null ? oldA.length << 1 : INITIAL_QUEUE_CAPACITY;/2倍扩容或初始化
                      if (size > MAXIMUM_QUEUE_CAPACITY)
                          throw new RejectedExecutionException("Queue capacity exceeded");
                      int oldMask, t, b;
                      ForkJoinTask<?>[] a = array = new ForkJoinTask<?>[size];
                      if (oldA != null && (oldMask = oldA.length - 1) >= 0 &&
                          (t = top) - (b = base) > 0) {
                          int mask = size - 1;
                          do { / emulate poll from old array, push to new array遍历从旧数组中取出放到新数组中
                              ForkJoinTask<?> x;
                              int oldj = ((b & oldMask) << ASHIFT) + ABASE;
                              int j    = ((b &    mask) << ASHIFT) + ABASE;
                              x = (ForkJoinTask<?>)U.getObjectVolatile(oldA, oldj);/从旧数组中取出
                              if (x != null &&
                                  U.compareAndSwapObject(oldA, oldj, x, null))/将旧数组取出的位置的对象置为null
                                  U.putObjectVolatile(a, j, x);/放入新数组
                          } while (++b != t);
                      }
                      return a;
                  }
              }
          }

  任务的消费

  任务的消费的执行链路是ForkJoinTask.doExec() -> RecursiveTask.exec()/RecursiveAction.exec() -> 覆盖重写的compute()

  1.  doExec:任务的执行入口

    final int doExec() {
            int s; boolean completed;
            if ((s = status) >= 0) {
                try {
                    completed = exec();/消费任务
                } catch (Throwable rex) {
                    return setExceptionalCompletion(rex);
                }
                if (completed)
                    s = setCompletion(NORMAL);/任务执行完设置状态为NORMAL,并唤醒其他等待任务
            }
            return s;
        }
        protected abstract boolean exec();
        private int setCompletion(int completion) {
            for (int s;;) {
                if ((s = status) < 0)
                    return s;
                if (U.compareAndSwapInt(this, STATUS, s, s | completion)) {/任务状态修改为NORMAL
                    if ((s >>> 16) != 0)/状态不是SMASK
                        synchronized (this) { notifyAll(); }/唤醒其他等待任务
                    return completion;
                }
            }
        }
        /** The run status of this task 任务的运行状态*/
        volatile int status; / accessed directly by pool and workers由ForkJoinPool池或ForkJoinWorkerThread控制
        static final int DONE_MASK   = 0xf0000000;  / mask out non-completion bits
        static final int NORMAL      = 0xf0000000;  / must be negative
        static final int CANCELLED   = 0xc0000000;  / must be < NORMAL
        static final int EXCEPTIONAL = 0x80000000;  / must be < CANCELLED
        static final int SIGNAL      = 0x00010000;  / must be >= 1 << 16
        static final int SMASK       = 0x0000ffff;  / short bits for tags

  任务真正执行处理逻辑

  任务提交到ForkJoinPool,最终真正的是由继承Thread的ForkJoinWorkerThread的run方法来执行消费任务的,ForkJoinWorkerThread处理哪个任务是由join来出队的;

    1. ForkJoinTask.join()

          public final V join() {
              int s;
              if ((s = doJoin() & DONE_MASK) != NORMAL)
                  reportException(s);
              return getRawResult();/得到返回结果
          }
          private int doJoin() {
              int s; Thread t; ForkJoinWorkerThread wt; ForkJoinPool.WorkQueue w;
              /**
               * (s = status) < 0 判断任务是否已经完成,完成直接返回s
               * 任务未完成:
               *          1)线程是ForkJoinWorkerThread,tryUnpush任务出队然后消费任务doExec
               *              1.1)出队或消费失败,执行awaitJoin进行自旋,如果任务状态是完成就退出,否则继续尝试出队,直到任务完成或超时为止;
               *          2)如果线程不是ForkJoinWorkerThread,执行externalAwaitDone进行出队消费
               */
              return (s = status) < 0 ? s :
                  ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ?
                  (w = (wt = (ForkJoinWorkerThread)t).workQueue).
                  tryUnpush(this) && (s = doExec()) < 0 ? s :
                  wt.pool.awaitJoin(w, this, 0L) :
                  externalAwaitDone();
          }
          private void reportException(int s) {
              if (s == CANCELLED)/取消
                  throw new CancellationException();
              if (s == EXCEPTIONAL)/异常
                  rethrow(getThrowableException());
          }
      1. awaitJoin:
            public class ForkJoinPool{
                final int awaitJoin(WorkQueue w, ForkJoinTask<?> task, long deadline) {
                    int s = 0;
                    if (task != null && w != null) {
                        ForkJoinTask<?> prevJoin = w.currentJoin;
                        U.putOrderedObject(w, QCURRENTJOIN, task);
                        CountedCompleter<?> cc = (task instanceof CountedCompleter) ?
                            (CountedCompleter<?>)task : null;
                        for (;;) {
                            if ((s = task.status) < 0)/任务完成退出
                                break;
                            if (cc != null)/当前任务即将完成,检查是否还有其他的等待任务,如果有
                                /运行当前队列的其他任务,若当前的队列中没有任务了,则窃取其他队列的任务并运行
                                helpComplete(w, cc, 0);
                            /当前队列没有任务了,或遍历当前队列有没有任务,如果有且在top端取出来运行,或在队列中间使用EmptyTask替代原位置取出来运行,如果没有,执行helpStealer
                            else if (w.base == w.top || w.tryRemoveAndExec(task))
                                helpStealer(w, task);/窃取其他队列的任务
                            if ((s = task.status) < 0)
                                break;
                            long ms, ns;
                            if (deadline == 0L)
                                ms = 0L;
                            else if ((ns = deadline - System.nanoTime()) <= 0L)/超时退出
                                break;
                            else if ((ms = TimeUnit.NANOSECONDS.toMillis(ns)) <= 0L)
                                ms = 1L;
                            if (tryCompensate(w)) {/当前队列阻塞了
                                task.internalWait(ms);/进行等待
                                U.getAndAddLong(this, CTL, AC_UNIT);
                            }
                        }
                        U.putOrderedObject(w, QCURRENTJOIN, prevJoin);
                    }
                    return s;
                }
            }
      2. externalAwaitDone:
            private int externalAwaitDone() {
                /**
                *   当前任务是CountedCompleter
                *   1)是则执行ForkJoinPool.common.externalHelpComplete()
                *   2)否则执行ForkJoinPool.common.tryExternalUnpush(this)进行任务出队
                *       2.1)出队成功,进行doExec()消费,否则进行阻塞等待
                */
                int s = ((this instanceof CountedCompleter) ? / try helping
                         ForkJoinPool.common.externalHelpComplete(
                             (CountedCompleter<?>)this, 0) :
                         ForkJoinPool.common.tryExternalUnpush(this) ? doExec() : 0);
                if (s >= 0 && (s = status) >= 0) {/任务未完成
                    boolean interrupted = false;
                    do {
                        if (U.compareAndSwapInt(this, STATUS, s, s | SIGNAL)) {/任务状态标记为SIGNAL
                            synchronized (this) {
                                if (status >= 0) {
                                    try {
                                        wait(0L);/阻塞等待
                                    } catch (InterruptedException ie) {/有中断异常
                                        interrupted = true;/设置中断标识为true
                                    }
                                }
                                else
                                    notifyAll();/任务完成唤醒其他任务
                            }
                        }
                    } while ((s = status) >= 0);
                    if (interrupted)
                        Thread.currentThread().interrupt();/当前线程进行中断
                }
                return s;
            }
            final int externalHelpComplete(CountedCompleter<?> task, int maxTasks) {
                WorkQueue[] ws; int n;
                int r = ThreadLocalRandom.getProbe();
                /没有任务直接结束,有任务则执行helpComplete
                /helpComplete:运行随机选取的队列的任务,若选取的队列中没有任务了,则窃取其他队列的任务并运行
                return ((ws = workQueues) == null || (n = ws.length) == 0) ? 0 :
                    helpComplete(ws[(n - 1) & r & SQMASK], task, maxTasks);
            } 
  1. run和工作窃取

  任务是由workThread来窃取的,workThread是一个线程。线程的所有逻辑都是由run()方法执行:

public class ForkJoinWorkerThread extends Thread {
    public void run() {
        if (workQueue.array == null) { / only run once
            Throwable exception = null;
            try {
                onStart();/初始化状态
                pool.runWorker(workQueue);/处理任务队列
            } catch (Throwable ex) {
                exception = ex;/记录异常
            } finally {
                try {
                    onTermination(exception);
                } catch (Throwable ex) {
                    if (exception == null)
                        exception = ex;
                } finally {
                    pool.deregisterWorker(this, exception);/注销工作线程
                }
            }
        }
    }
}
    public class ForkJoinPool{
        final void runWorker(WorkQueue w) {
            w.growArray();                   / allocate queue,队列初始化
            int seed = w.hint;               / initially holds randomization hint
            int r = (seed == 0) ? 1 : seed;  / avoid 0 for xorShift
            for (ForkJoinTask<?> t;;) {/自旋
                if ((t = scan(w, r)) != null)/从队列中窃取任务成功,scan()进行任务窃取
                    w.runTask(t);/执行任务,内部方法调用了doExec()进行任务的消费
                else if (!awaitWork(w, r))/队列没有任务了则结束
                    break;
                r ^= r << 13; r ^= r >>> 17; r ^= r << 5; / xorshift
            }
        }
    }
    1. scan:
      private ForkJoinTask<?> scan(WorkQueue w, int r) {
              WorkQueue[] ws; int m;
              if ((ws = workQueues) != null && (m = ws.length - 1) > 0 && w != null) {
                  int ss = w.scanState;                     / initially non-negative
                  for (int origin = r & m, k = origin, oldSum = 0, checkSum = 0;;) {
                      WorkQueue q; ForkJoinTask<?>[] a; ForkJoinTask<?> t;
                      int b, n; long c;
                      if ((q = ws[k]) != null) {   /随机选中了非空队列 q
                          if ((n = (b = q.base) - q.top) < 0 &&
                              (a = q.array) != null) {      / non-empty
                              long i = (((a.length - 1) & b) << ASHIFT) + ABASE;  /从尾部出队,b是尾部下标
                              if ((t = ((ForkJoinTask<?>)
                                        U.getObjectVolatile(a, i))) != null &&
                                  q.base == b) {
                                  if (ss >= 0) {
                                      if (U.compareAndSwapObject(a, i, t, null)) { /利用cas出队
                                          q.base = b + 1;
                                          if (n < -1)       / signal others
                                              signalWork(ws, q);
                                          return t;  /出队成功,成功窃取一个任务!
                                      }
                                  }
                                  else if (oldSum == 0 &&   / try to activate 队列没有激活,尝试激活
                                           w.scanState < 0)
                                      tryRelease(c = ctl, ws[m & (int)c], AC_UNIT);
                              }
                              if (ss < 0)                   / refresh
                                  ss = w.scanState;
                              r ^= r << 1; r ^= r >>> 3; r ^= r << 10;
                              origin = k = r & m;           / move and rescan
                              oldSum = checkSum = 0;
                              continue;
                          }
                          checkSum += b;
                      }
         /k = k + 1表示取下一个队列 如果(k + 1) & m == origin表示已经遍历完所有队列了 if ((k = (k + 1) & m) == origin) { / continue until stable if ((ss >= 0 || (ss == (ss = w.scanState))) && oldSum == (oldSum = checkSum)) { if (ss < 0 || w.qlock < 0) / already inactive break; int ns = ss | INACTIVE; / try to inactivate long nc = ((SP_MASK & ns) | (UC_MASK & ((c = ctl) - AC_UNIT))); w.stackPred = (int)c; / hold prev stack top U.putInt(w, QSCANSTATE, ns); if (U.compareAndSwapLong(this, CTL, c, nc)) ss = ns; else w.scanState = ss; / back out } checkSum = 0; } } } return null; }
    2. ForkJoinPool.runTask:
              volatile int scanState;    / versioned, <0: inactive; odd:scanning,版本标记,小于0暂停,奇数进行扫描其他任务
              static final int SCANNING     = 1;             / false when running tasks,有任务执行是false
              /**
               * Executes the given task and any remaining local tasks.
               * 执行给定的任务和任何剩余的本地任务
               */
              final void runTask(ForkJoinTask<?> task) {
                  if (task != null) {
                      scanState &= ~SCANNING; / mark as busy,暂停扫描,当前有任务执行
                      (currentSteal = task).doExec();/执行窃取的任务
                      U.putOrderedObject(this, QCURRENTSTEAL, null); / release for GC,窃取的任务执行完置为null
                      execLocalTasks();/执行本地的任务,即自己workQueue的任务,调用doExec执行到workQueue空为止
                      ForkJoinWorkerThread thread = owner;
                      if (++nsteals < 0)      / collect on overflow,窃取计数溢出
                          transferStealCount(pool);/重置窃取计数
                      scanState |= SCANNING;/继续扫描队列
                      if (thread != null)
                          thread.afterTopLevelExec();
                  }
              }
              static final class InnocuousForkJoinWorkerThread extends ForkJoinWorkerThread {
                      @Override / to erase ThreadLocals,清除threadLocals
                      void afterTopLevelExec() {
                          eraseThreadLocals();
                      }
                      /**
                       * Erases ThreadLocals by nulling out Thread maps.
                       */
                      final void eraseThreadLocals() {
                          U.putObject(this, THREADLOCALS, null);/threadLocals置为null
                          U.putObject(this, INHERITABLETHREADLOCALS, null);/inheritablethreadlocals置为null
                      }
              } 

四.总结

  对于fork/join来说,在使用时还是存在下面的一些问题的:

  • 在使用JVM的时候我们要考虑OOM的问题,如果我们的任务处理时间非常耗时,并且处理的数据非常大的时候,会造成OOM;
  • ForkJoin是通过多线程的方式进行处理任务,那么我们不得不考虑是否应该使用ForkJoin。因为当数据量不是特别大的时候,我们没有必要使用ForkJoin。因为多线程会涉及到上下文的切换,所以数据量不大的时候使用串行比使用多线程快;
    • 项目中进行本地测试发现,业务层Service进行excel表数据(数据量几百)的复杂处理,进行单线程for循环统计消耗时间,然后与使用fork/join进行处理统计消耗时间,发现fork/join的消耗时间是单线程for的2倍;
 
posted @ 2020-05-11 18:11  老虎机支付宝充值码猿手  阅读(...)  评论(...老虎机支付宝充值编辑  收藏
申博手机投注登入 电子游戏支付宝充值 旧版申博开户直营网 申博代理有限公司登入 申博游戏下载登入 申博游戏网址
菲律宾申博娱乐官网 菲律宾申博开户登入 菲律宾太阳娱乐网址登入 www.sb61.com www.11msc.com 申博登录不了
www.tyc33.com 太阳城申博开户登入 菲律宾申博官网登入 菲律宾太阳成娱乐管理网 www.msc66.com 申博桌面版下载直营网