让我们一起了解如何自定义异步并行处理框架 原创
2018-11-06 16:43
我们这一节教程,就为大家介绍如何自定义异步并行处理框架,希望大家能够好好学。
代码说明:任务由工作组组成,每个工作组有自己的任务线程,每个工作组完成后有自己的处理接口,所有工作组完成后还有统一的处理接口。
1、CompletableFuture相当于一个容器,容器里面运行的是工作组,通过new关键字创建容器
2、为容器添加工作组:工作组由容器的getWorkGroup()方法获取,再通过addWorkGroup()方法加入容器。
3、为工作组添加任务,这里添加任务有三种方式。第一种:直接通过workGroup.addTask添加Callable。第二种:通过addTaskInteface添加WorkGroupTask。第三种:通过addTaskInteface添加WorkGroupForAround,这种方式支持前后通知,可以自定义任务执行前后需要处理的事情。
4、CompletableInterface接口是每个工作组或容器执行完后的汇总接口,
completable是容器完成任务后的方法,
workerCompletable是工作组完成任务后的回调方法。下面的例子演示了一个加法运算:创建三个工作组,每个工作组添加若干任务,每个任务计算不同区间的加法;每个工作组完成后对该工作组内的结果做汇总,当所有工作组完成后,容器再对所有工作组的结果做一次总汇总。
package com.sgcc.sgd5000.utils;

import java.math.BigDecimal;
import java.util.ArrayList;
import java.util.List;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;

/**
 * Container that runs registered work groups on thread pools and reports their
 * results through {@link CompletableInterface} callbacks.
 *
 * @param <T> result type produced by the tasks of the registered work groups
 */
public final class CompletableFuture<T> {

/** Wait limit in seconds; replaced by the value passed to {@code awaits(int)}. */
private int seconds = 60;

/**
 * Demo: sums consecutive integer ranges in three work groups, reduces each
 * group to a single subtotal, then prints the grand total.
 *
 * @param args unused
 */
public static void main(String[] args) {
    // Per-group callback: collapses a group's partial sums into one subtotal.
    CompletableInterface<BigDecimal> groupSummary = new CompletableInterface<BigDecimal>() {
        @Override
        public void completable(List<List<BigDecimal>> resultSet) {
            // Not used: this instance is only registered as a per-group callback.
        }

        @Override
        public void workerCompletable(List<BigDecimal> resultSet) {
            // A single pass is required; nesting a second loop over the same list
            // would multiply the subtotal by the number of entries.
            BigDecimal total = BigDecimal.ZERO;
            for (BigDecimal partial : resultSet) {
                total = total.add(partial);
            }
            resultSet.clear();
            resultSet.add(total);
        }
    };

    CompletableFuture<BigDecimal> container = new CompletableFuture<BigDecimal>();
    CompletableFuture<BigDecimal>.WorkGroup<BigDecimal> firstGroup =
            container.<BigDecimal>getWorkGroup();
    firstGroup.addWorkCompletableInterface(groupSummary);
    CompletableFuture<BigDecimal>.WorkGroup<BigDecimal> secondGroup =
            container.<BigDecimal>getWorkGroup();
    secondGroup.addWorkCompletableInterface(groupSummary);
    CompletableFuture<BigDecimal>.WorkGroup<BigDecimal> thirdGroup =
            container.<BigDecimal>getWorkGroup();
    thirdGroup.addWorkCompletableInterface(groupSummary);

    // Way 1: plain Callable tasks.
    firstGroup.addTask(rangeSum(0, 300000000))
            .addTask(rangeSum(300000000, 600000000))
            .addTask(rangeSum(600000000, 800000000))
            .addTask(rangeSum(800000000, 1000000000));
    secondGroup.addTask(rangeSum(1000000000, 1300000000))
            .addTask(rangeSum(1300000000, 1600000000))
            .addTask(rangeSum(1600000000, 1800000000))
            .addTask(rangeSum(1800000000, 2000000000));

    // Way 2: a WorkGroupTask, which declares no checked exceptions.
    thirdGroup.addTaskInteface(new WorkGroupTask<BigDecimal>() {
        @Override
        public BigDecimal call() {
            BigDecimal sum = BigDecimal.ZERO;
            for (int i = 9; i < 11; i++) {
                sum = sum.add(BigDecimal.valueOf(i));
            }
            return sum;
        }
    });

    // All three groups must be registered, otherwise the third never runs.
    container.addWorkGroup(firstGroup).addWorkGroup(secondGroup).addWorkGroup(thirdGroup);
    container.awaits(1000);
    container.forkJoin(new CompletableInterface<BigDecimal>() {
        @Override
        public void completable(List<List<BigDecimal>> resultSet) {
            BigDecimal total = BigDecimal.ZERO;
            for (List<BigDecimal> groupResult : resultSet) {
                for (BigDecimal value : groupResult) {
                    total = total.add(value);
                }
            }
            System.out.println(total);
        }

        @Override
        public void workerCompletable(List<BigDecimal> resultSet) {
            // Not used: this instance is only passed as the container-level callback.
        }
    });
}

/**
 * Builds a task that sums every integer in {@code [fromInclusive, toExclusive)}.
 *
 * @param fromInclusive first integer to add
 * @param toExclusive   upper bound, not added
 * @return a task returning the sum as a {@link BigDecimal}
 */
private static Callable<BigDecimal> rangeSum(final int fromInclusive, final int toExclusive) {
    return new Callable<BigDecimal>() {
        @Override
        public BigDecimal call() {
            BigDecimal sum = BigDecimal.ZERO;
            for (int i = fromInclusive; i < toExclusive; i++) {
                sum = sum.add(BigDecimal.valueOf(i));
            }
            return sum;
        }
    };
}

/**
 * Callbacks invoked when a work group, or the whole container, has finished.
 *
 * @param <T> element type of the result lists
 */
public interface CompletableInterface<T> {
    /**
     * Called with one result list per work group.
     *
     * @param resultSet results of all work groups
     */
    void completable(List<List<T>> resultSet);

    /**
     * Called with the results of a single work group.
     *
     * @param resultSet results of that group's tasks
     */
    void workerCompletable(List<T> resultSet);
}

/** Pool on which the work groups themselves are executed. */
public ScheduledExecutorService schedulePool = Executors.newScheduledThreadPool(3);

/** Pool on which the individual tasks of the work groups are executed. */
public ScheduledExecutorService sch = Executors.newScheduledThreadPool(6);
/** Work groups registered with this container, in registration order. */
List<WorkGroup<T>> workGroups = new ArrayList<WorkGroup<T>>();

/**
 * Records the wait limit and then starts every registered work group.
 *
 * @param seconds wait limit stored in the {@code seconds} field
 */
public void awaits(int seconds) {
    this.seconds = seconds;
    this.invoke();
}

/**
 * Appends a work group to this container.
 *
 * @param w the work group to register
 * @return this container, so calls can be chained
 */
public CompletableFuture<T> addWorkGroup(WorkGroup<T> w) {
    this.workGroups.add(w);
    return this;
}
/** Hands every registered work group to {@code schedulePool} for execution. */
private void invoke() {
    java.util.Iterator<WorkGroup<T>> pending = workGroups.iterator();
    while (pending.hasNext()) {
        schedulePool.execute(pending.next());
    }
}
/**
 * Timer task that counts how many times it has been run.
 *
 * <p>When scheduled on a {@link java.util.Timer}, {@link #run()} executes on the
 * timer's thread while {@link #getTime()} is polled from another thread, so the
 * counter is declared {@code volatile} to make each update visible to the reader.
 */
class ListenerTaskFinshed extends TimerTask {
    /** Number of completed {@link #run()} invocations. */
    public volatile int time = 0;

    /** Increments the counter by one. */
    @Override
    public void run() {
        // Only the thread running this task writes the counter, so the
        // non-atomic read-modify-write cannot lose updates.
        time++;
    }

    /**
     * @return how many times {@link #run()} has completed so far
     */
    public int getTime() {
        return time;
    }
}
/**
 * Waits until every registered work group reports that it has finished, or until
 * the tick counter reaches the configured {@code seconds} limit, then passes one
 * result list per group to the callback.
 *
 * <p>The callback is invoked only when all groups finished; if the limit is
 * reached first, nothing is reported. Both thread pools are shut down before
 * this method returns, so the container cannot be reused afterwards.
 *
 * @param completableInterface receives the per-group results via {@code completable}
 * @param <R> element type the callback expects in the result lists
 */
public <R> void forkJoin(CompletableInterface<R> completableInterface) {
    final long pollMillis = 10L;
    final Timer timer = new Timer();
    try {
        ListenerTaskFinshed listener = this.new ListenerTaskFinshed();
        // The listener ticks once per second; its counter is the elapsed-time clock.
        timer.schedule(listener, 0, 1000);

        int finishedGroups = 0;
        while (true) {
            for (WorkGroup<?> group : workGroups) {
                if (group.getFinshed()) {
                    finishedGroups++;
                    // Clear the flag so the same group is not counted again on the next pass.
                    group.setFinshed(false);
                }
            }
            if (finishedGroups >= workGroups.size() || listener.getTime() >= seconds) {
                break;
            }
            // Yield the CPU between polls instead of spinning against the worker threads.
            Thread.sleep(pollMillis);
        }

        if (finishedGroups == workGroups.size()) {
            List<List<R>> allResults = new ArrayList<List<R>>();
            for (WorkGroup<?> group : workGroups) {
                // The groups' element type is not tied to R by the signature; the
                // caller is responsible for passing a callback of the matching type.
                @SuppressWarnings("unchecked")
                List<R> groupResults = (List<R>) (List<?>) group.getFutures();
                allResults.add(groupResults);
            }
            completableInterface.completable(allResults);
        }
    } catch (InterruptedException e) {
        // Preserve the interrupt for the caller and stop the outstanding work.
        Thread.currentThread().interrupt();
        schedulePool.shutdownNow();
        sch.shutdownNow();
    } catch (Exception e) {
        e.printStackTrace();
        schedulePool.shutdownNow();
        sch.shutdownNow();
    } finally {
        timer.cancel();
        sch.shutdown();
        schedulePool.shutdown();
    }
}

/**
 * Creates a new, empty work group bound to this container. The group is not
 * registered; pass it to {@code addWorkGroup} to have it executed.
 *
 * @param <W> result type of the tasks the group will hold
 * @return a new work group
 */
public <W> WorkGroup<W> getWorkGroup() {
    return this.new WorkGroup<W>() { };
}
/**
 * A unit of work that yields a result and declares no checked exceptions.
 *
 * @param <T> type of the produced result
 */
public interface WorkGroupTask<T> {

    /**
     * Performs the work.
     *
     * @return the result of the work
     */
    T call();
}
/**
 * A {@link WorkGroupTask} with hooks that run immediately before and after the work.
 *
 * @param <T> type of the produced result
 */
public abstract class WorkGroupForAround<T> implements WorkGroupTask<T> {

    /** Hook invoked before {@code call()}. */
    public abstract void beforeCall();

    /**
     * Hook invoked after {@code call()} has returned.
     *
     * @param result the value {@code call()} returned
     */
    public abstract void afterCall(T result);

    /**
     * Runs {@code beforeCall()}, then {@code call()}, then {@code afterCall(result)}.
     *
     * @return the value produced by {@code call()}
     */
    public final T runTask() {
        beforeCall();
        final T outcome = call();
        afterCall(outcome);
        return outcome;
    }
}
//工作组
public abstract class WorkGroup<T> implements Runnable{ private List<Callable<T>> taskGroup=new ArrayList<Callable<T>>(); private List<T> listFuture; private boolean finshed=false; private CompletableInterface workCompletableInterface; public WorkGroup<T> addTask(Callable<T> call){
taskGroup.add(call); return this;
}
public WorkGroup<T> addTaskInteface(final WorkGroupTask<T> task){
taskGroup.add(new Callable<T>() {
@Override public T call() throws Exception { // TODO Auto-generated method stub
return task.call();
}
}); return this;
}
public WorkGroup<T> addTaskInteface(final WorkGroupForAround<T> task){
taskGroup.add(new Callable<T>() {
@Override public T call() throws Exception { // TODO Auto-generated method stub
return task.runTask();
}
}); return this;
}
public void addWorkCompletableInterface(CompletableInterface workInterface){
workCompletableInterface=workInterface;
}
@Override public void run() { try { if(taskGroup.size() ==0){
listFuture=new ArrayList<T>();
setFinshed(true); return ;
}
List<Future<T>> reList=sch.invokeAll(taskGroup);
listFuture=new ArrayList<T>(); for(Future<T> fu:reList){
listFuture.add(fu.get());
}
if(null != workCompletableInterface){
workCompletableInterface.workerCompletable(listFuture);
}
setFinshed(true);
} catch (Exception e) { // TODO: handle exception }
}
public synchronized void setFinshed(boolean bl){
finshed=bl;
} public synchronized boolean getFinshed(){ return finshed;
} public List<T> getFutures(){ return listFuture;
}
}
}
小编结语:相信大家在看完了这一段代码之后,都知道如何自定义异步并行处理了吧,感谢大家的支持。原文出处:550
版权声明:原创作品,允许转载,转载时请务必以超链接形式标明文章原始出处、作者信息和本声明,否则将追究法律责任。https://m.blog.kokojia.com/love15200922/b-1974.html
阅读 11339 / 评论 1
- 上一篇: 了解Go中函数特性简介






