让我们一起了解如何自定义异步并行处理框架 原创

2018-11-06 16:43

我们这一节教程,就为大家介绍如何自定义异步并行处理框架,希望大家能够好好学。

代码说明:任务有工作组组成,每个工作组有自己的任务线程,每个工作组有完成后的处理接口,所有工作组完成后有处理接口

1、CompletableFuture相当于一个容器,容器里面运行的是工作组,通过new关键字创建容器

2、为容器添加工作组,工作组有容器的getWorkGroup()方法获取,

3、为工作组添加任务:workGroup.addTask,这里添加任务有三种方式,第一种:直接通过jqueryCallable添加。第二种:通过WorkGroupTask添加,第三种:mysql通过WorkGroupForAround添加任务,这种方式支持前后通知,可以自定义任务完成前后需要处理的事情。

4、CompletableInterface接口是每个工作组或容器执行完后的汇总接口,

completable是容器完成任务后的方法,
workerCompletable是工作组完成任务的方法,也是方法之一。
下面的例子是演示了一个加法的运算,通过创建三个工作组,每个工作组创建三个任务,每个任务计算不同长度的加法,然后每个工作组完成后对该工作组内的加法做汇总,当所有工作组完成后,
容器再对所有工作组内的结果再汇总。


package com.sgcc.sgd5000.utils;import java.math.BigDecimal;import java.util.ArrayList;import java.util.List;import java.util.Timer;import java.util.TimerTask;import java.util.concurrent.Callable;import java.util.concurrent.ExecutionException;import java.util.concurrent.Executors;import java.util.concurrent.Future;import java.util.concurrent.ScheduledExecutorService;public final class CompletableFuture<T>{        private int seconds=60;        public static void main(String[] args) {
            CompletableInterface comp=new CompletableFuture.CompletableInterface<BigDecimal>() {
                @Override                public void completable(List<List<BigDecimal>> resultSet) {
                }
                @Override                public void workerCompletable(List<BigDecimal> resultSet) {
                    BigDecimal total=new BigDecimal(0);                    for(BigDecimal set:resultSet){                        for(BigDecimal i:resultSet){
                            total=total.add(i);
                        }
                    }
                    resultSet.clear();
                    resultSet.add(total);
                }
            };
            CompletableFuture futur=new CompletableFuture<Integer>();
            CompletableFuture.WorkGroup workGroup = futur.getWorkGroup();
            workGroup.addWorkCompletableInterface(comp);
            
            CompletableFuture.WorkGroup workGroup1 = futur.getWorkGroup();
            workGroup1.addWorkCompletableInterface(comp);
            
            CompletableFuture.WorkGroup workGroup2 = futur.getWorkGroup();
            workGroup2.addWorkCompletableInterface(comp);
            
            workGroup.addTask(new Callable<BigDecimal>() {
                @Override                public BigDecimal call() throws Exception {
                    BigDecimal b=new BigDecimal(0);                    for(int i=0;i<300000000;i++){
                        b=b.add(new BigDecimal(i));
                    }                    return b;
                }
            }).addTask(new Callable<BigDecimal>() {
                @Override                public BigDecimal call() throws Exception {
                    BigDecimal b=new BigDecimal(0);                    for(int i=300000000;i<600000000;i++){
                        b=b.add(new BigDecimal(i));
                    }                    return b;
                }
            }).addTask(new Callable<BigDecimal>() {
                @Override                public BigDecimal call() throws Exception {
                    BigDecimal b=new BigDecimal(0);                    for(int i=600000000;i<800000000;i++){
                        b=b.add(new BigDecimal(i));
                    }                    return b;
                }
            });
            workGroup.addTask(new Callable<BigDecimal>() {
                @Override                public BigDecimal call() throws Exception {
                    BigDecimal b=new BigDecimal(0);                    for(int i=800000000;i<1000000000;i++){
                        b=b.add(new BigDecimal(i));
                    }                    return b;
                }
            });
            
            
            workGroup1.addTask(new Callable<BigDecimal>() {
                @Override                public BigDecimal call() throws Exception {
                    BigDecimal b=new BigDecimal(0);                    for(int i=1000000000;i<1300000000;i++){
                        b=b.add(new BigDecimal(i));
                    }                    return b;
                }
            });
            workGroup1.addTask(new Callable<BigDecimal>() {
                @Override                public BigDecimal call() throws Exception {
                    BigDecimal b=new BigDecimal(0);                    for(int i=1300000000;i<1600000000;i++){
                        b=b.add(new BigDecimal(i));
                    }                    return b;
                }
            });
            workGroup1.addTask(new Callable<BigDecimal>() {
                @Override                public BigDecimal call() throws Exception {
                    BigDecimal b=new BigDecimal(0);                    for(int i=1600000000;i<1800000000;i++){
                        b=b.add(new BigDecimal(i));
                    }                    return b;
                }
            });
            workGroup1.addTask(new Callable<BigDecimal>() {
                @Override                public BigDecimal call() throws Exception {
                    BigDecimal b=new BigDecimal(0);                    for(int i=1800000000;i<2000000000;i++){
                        b=b.add(new BigDecimal(i));
                    }                    return b;
                }
            });
            workGroup2.addTaskInteface(new WorkGroupTask<BigDecimal>() {
                @Override                public BigDecimal call() {
                    BigDecimal b=new BigDecimal(0);                    for(int i=9;i<11;i++){
                        b=b.add(new BigDecimal(i));
                    }                    return b;
                }
            });
            futur.addWorkGroup(workGroup).addWorkGroup(workGroup1);
            futur.awaits(1000);
            futur.forkJoin(new CompletableFuture.CompletableInterface<BigDecimal>() {
                @Override                public void completable(List<List<BigDecimal>> resultSet) {
                    BigDecimal total=new BigDecimal(0);                    for(List<BigDecimal> set:resultSet){                        for(BigDecimal i:set){
                            total=total.add(i);
                        }
                    }
                    System.out.println(total);
                }

                @Override                public void workerCompletable(List<BigDecimal> resultSet) {                    // TODO Auto-generated method stub                    
                }
            });
        }        public interface CompletableInterface<T>{            public void completable(List<List<T>> resultSet);            public void workerCompletable(List<T> resultSet);
            
        }        public ScheduledExecutorService schedulePool = Executors.newScheduledThreadPool(3);        public ScheduledExecutorService sch = Executors.newScheduledThreadPool(6);
        List<WorkGroup<T>> workGroups=new ArrayList<WorkGroup<T>>();        
        public void awaits(int seconds){            this.seconds=seconds;
            invoke();
        }        public CompletableFuture<T> addWorkGroup(WorkGroup<T> w){
            workGroups.add(w);            return this;
        }        
        private void invoke(){            for(WorkGroup<T> group:workGroups){
                schedulePool.execute(group);
            }
        }        
        class ListenerTaskFinshed extends TimerTask{            public int time=0;
            @Override            public void run() {
                time++;
            }            public int getTime(){                return time;
            }
        }        
        
        public <T> void forkJoin(CompletableInterface<T> completableInterface){            final Timer timer=new Timer();            try {
                ListenerTaskFinshed listener=this.new ListenerTaskFinshed();
                timer.schedule(listener, 0, 1000);                int workGroupsCount=0;                do{                    for(WorkGroup<?> group:workGroups){                        if(group.getFinshed()){
                            workGroupsCount++;
                            group.setFinshed(false);
                        }                        if(workGroupsCount ==workGroups.size()){                            break;
                        }
                    }
                }while(workGroupsCount <workGroups.size()&& listener.getTime()<seconds);
                timer.cancel();                if(workGroupsCount ==workGroups.size()){
                    List<List<T>> rset=new ArrayList<List<T>>();                    for(WorkGroup group:workGroups){
                        List<T> rList=group.getFutures();
                        rset.add(rList);
                    }
                    
                    completableInterface.completable(rset);
                }
            } catch (Exception e) {
                e.printStackTrace();
                schedulePool.shutdownNow();
                sch.shutdownNow();
            }finally{
                timer.cancel();
                sch.shutdown();
                schedulePool.shutdown();
            }
        }        public <T> WorkGroup<T> getWorkGroup(){            return this.new WorkGroup<T>(){};
        }        
        
        public interface WorkGroupTask<T>{            public T call();
        }        
        public abstract class WorkGroupForAround<T> implements WorkGroupTask<T>{            public abstract void beforeCall();            public final T runTask(){
                beforeCall();
                T t=call();
                afterCall(t);                return t;
            };            public abstract void afterCall(T result);
        }        
        //工作组
        public abstract class  WorkGroup<T> implements Runnable{            private List<Callable<T>> taskGroup=new ArrayList<Callable<T>>();            private List<T> listFuture;            private boolean finshed=false;            private CompletableInterface workCompletableInterface;            public WorkGroup<T> addTask(Callable<T> call){
                taskGroup.add(call);                return this;
            }            
            public WorkGroup<T> addTaskInteface(final WorkGroupTask<T> task){
                taskGroup.add(new Callable<T>() {
                    @Override                    public T call() throws Exception {                        // TODO Auto-generated method stub
                        return task.call();
                    }
                });                return this;
            }            
            public WorkGroup<T> addTaskInteface(final WorkGroupForAround<T> task){
                taskGroup.add(new Callable<T>() {
                    @Override                    public T call() throws Exception {                        // TODO Auto-generated method stub
                        return task.runTask();
                    }
                });                return this;
            }            
            
            public void addWorkCompletableInterface(CompletableInterface workInterface){
                workCompletableInterface=workInterface;
            }
            @Override            public void run() {                try {                    if(taskGroup.size() ==0){
                        listFuture=new ArrayList<T>();
                        setFinshed(true);                        return ;
                    }
                    List<Future<T>> reList=sch.invokeAll(taskGroup);
                    listFuture=new ArrayList<T>();                    for(Future<T> fu:reList){
                        listFuture.add(fu.get());
                    }                    
                    if(null != workCompletableInterface){
                        workCompletableInterface.workerCompletable(listFuture);
                    }
                    setFinshed(true);
                } catch (Exception e) {                    // TODO: handle exception                }
            
            }            
            public synchronized void setFinshed(boolean bl){
                finshed=bl;
            }            public synchronized boolean getFinshed(){                return finshed;
            }            public List<T> getFutures(){                return listFuture;
            }
        }
}
小编结语:相信大家在看完了这一段代码之后,都知道如何自定义异步并行处理了吧,感谢大家的支持。

 原文出处:550


 版权声明:原创作品,允许转载,转载时请务必以超链接形式标明文章原始出处、作者信息和本声明,否则将追究法律责任。https://m.blog.kokojia.com/love15200922/b-1974.html

阅读 11108 / 评论 1

 相关视频教程更多课程