千家信息网

JAVA多线程怎么实现用户任务排队并预估排队时长

发表于:2024-11-24 作者:千家信息网编辑
千家信息网最后更新 2024年11月24日,这篇文章主要介绍"JAVA多线程怎么实现用户任务排队并预估排队时长",在日常操作中,相信很多人在JAVA多线程怎么实现用户任务排队并预估排队时长问题上存在疑惑,小编查阅了各式资料,整理出简单好用的操作
千家信息网最后更新 2024年11月24日JAVA多线程怎么实现用户任务排队并预估排队时长

这篇文章主要介绍"JAVA多线程怎么实现用户任务排队并预估排队时长",在日常操作中,相信很多人在JAVA多线程怎么实现用户任务排队并预估排队时长问题上存在疑惑,小编查阅了各式资料,整理出简单好用的操作方法,希望对大家解答"JAVA多线程怎么实现用户任务排队并预估排队时长"的疑惑有所帮助!接下来,请跟着小编一起来学习吧!

实现流程

初始化一定数量的任务处理线程和缓存线程池,用户每次调用接口,开启一个线程处理。

假设初始化5个处理器,代码执行 BlockingQueue.take 时候,每次take都会处理器队列就会减少一个,当处理器队列为空时,take就是阻塞线程,当用户处理某某任务完成时候,调用资源释放接口,在处理器队列put 一个处理器对象,原来阻塞的take ,就继续执行。

排队论简介

排队论是研究系统随机聚散现象和随机系统工作工程的数学理论和方法,又称随机服务系统理论,为运筹学的一个分支。我们下面对排队论做下简化处理,先看下图:

代码具体实现

任务队列初始化 TaskQueue

import com.baomidou.mybatisplus.core.toolkit.CollectionUtils;import org.springframework.stereotype.Component; import javax.annotation.PostConstruct;import java.util.Optional;import java.util.concurrent.BlockingQueue;import java.util.concurrent.ExecutorService;import java.util.concurrent.Executors;import java.util.concurrent.LinkedBlockingQueue;import java.util.concurrent.atomic.AtomicInteger; /** * 初始化队列及线程池 * @author tarzan * */@Componentpublic class TaskQueue {    //处理器队列    public static BlockingQueue taskProcessors;    //等待任务队列    public static BlockingQueue waitTasks;    //处理任务队列    public static BlockingQueue executeTasks;    //线程池    public static ExecutorService exec;    //初始处理器数(计算机cpu可用线程数)    public static Integer processorNum=Runtime.getRuntime().availableProcessors();     /**     * 初始化处理器、等待任务、处理任务队列及线程池     */    @PostConstruct    public static void initEquipmentAndUsersQueue(){        exec = Executors.newCachedThreadPool();        taskProcessors =new LinkedBlockingQueue(processorNum);        //将空闲的设备放入设备队列中        setFreeDevices(processorNum);        waitTasks =new LinkedBlockingQueue();        executeTasks=new LinkedBlockingQueue(processorNum);    }      /**     * 将空闲的处理器放入处理器队列中     */    private static void setFreeDevices(int num) {        //获取可用的设备        for (int i = 0; i < num; i++) {            TaskProcessor dc=new TaskProcessor();            try {                taskProcessors.put(dc);            } catch (InterruptedException e) {                e.printStackTrace();            }        }    }       public static CompileTask getWaitTask(Long clazzId) {        return get(TaskQueue.waitTasks,clazzId);    }     public static CompileTask getExecuteTask(Long clazzId) {        return get(TaskQueue.executeTasks,clazzId);    }      private static CompileTask get(BlockingQueue users, Long clazzId) {        CompileTask compileTask =null;        if (CollectionUtils.isNotEmpty(users)){            Optional optional=users.stream().filter(e->e.getClazzId().longValue()==clazzId.longValue()).findFirst();            if(optional.isPresent()){                compileTask =  optional.get();            }        }        return compileTask;    }     public static Integer getSort(Long clazzId) {        AtomicInteger index = new AtomicInteger(-1);        BlockingQueue compileTasks = TaskQueue.waitTasks;        if (CollectionUtils.isNotEmpty(compileTasks)){            compileTasks.stream()                    .filter(e -> {                        index.getAndIncrement();                        return e.getClazzId().longValue() == clazzId.longValue();                    })                    .findFirst();        }        return index.get();    }     //单位秒    public static int estimatedTime(Long clazzId){        return  estimatedTime(60,getSort(clazzId)+1);    }     //单位秒    public static int estimatedTime(int cellMs,int num){         int a= (num-1)/processorNum;         int b= cellMs*(a+1);        return  b;    }

编译任务类 CompileTask

import lombok.Data;import org.springblade.core.tool.utils.SpringUtil;import org.springblade.gis.common.enums.DataScheduleEnum;import org.springblade.gis.dynamicds.service.DynamicDataSourceService;import org.springblade.gis.modules.feature.schedule.service.DataScheduleService; import java.util.Date;  @Datapublic class CompileTask implements Runnable {    //当前请求的线程对象    private Long clazzId;    //用户id    private Long userId;    //当前请求的线程对象    private Thread thread;    //绑定处理器    private TaskProcessor taskProcessor;    //任务状态    private Integer status;    //开始时间    private Date startTime;    //结束时间    private Date endTime;     private DataScheduleService dataScheduleService= SpringUtil.getBean(DataScheduleService.class);     private DynamicDataSourceService dataSourceService= SpringUtil.getBean(DynamicDataSourceService.class);     @Override    public void run() {        compile();    }     /**     * 编译     */    public void compile() {        try {            //取出一个设备            TaskProcessor taskProcessor = TaskQueue.taskProcessors.take();            //取出一个任务            CompileTask compileTask = TaskQueue.waitTasks.take();            //任务和设备绑定            compileTask.setTaskProcessor(taskProcessor);            //放入            TaskQueue.executeTasks.put(compileTask);            System.out.println(DataScheduleEnum.DEAL_WITH.getName()+" "+userId);            //切换用户数据源            dataSourceService.switchDataSource(userId);            //添加进度            dataScheduleService.addSchedule(clazzId, DataScheduleEnum.DEAL_WITH.getState());        } catch (InterruptedException e) {            System.err.println( e.getMessage());        }    } }

任务处理器 TaskProcessor

import lombok.Data; import java.util.Date; @Datapublic class TaskProcessor {     /**     * 释放     */    public  static Boolean release(CompileTask task)  {        Boolean flag=false;        Thread thread=task.getThread();        synchronized (thread) {            try {                if(null!=task.getTaskProcessor()){                    TaskQueue.taskProcessors.put(task.getTaskProcessor());                    TaskQueue.executeTasks.remove(task);                    task.setEndTime(new Date());                    long intervalMilli = task.getEndTime().getTime() - task.getStartTime().getTime();                    flag=true;                    System.out.println("用户"+task.getClazzId()+"耗时"+intervalMilli+"ms");                }            } catch (InterruptedException e) {                e.printStackTrace();            }            return flag;        }    } }

Controller控制器接口实现

import io.swagger.annotations.Api;import io.swagger.annotations.ApiOperation;import org.springblade.core.tool.api.R;import org.springblade.gis.multithread.TaskProcessor;import org.springblade.gis.multithread.TaskQueue;import org.springblade.gis.multithread.CompileTask;import org.springframework.web.bind.annotation.*; import java.util.Date;  @RestController@RequestMapping("task")@Api(value = "数据编译任务", tags = "数据编译任务")public class CompileTaskController {     @ApiOperation(value = "添加等待请求 @author Tarzan Liu")    @PostMapping("compile/{clazzId}")    public R compile(@PathVariable("clazzId") Long clazzId) {        CompileTask checkUser=TaskQueue.getWaitTask(clazzId);        if(checkUser!=null){            return  R.fail("已经正在排队!");        }        checkUser=TaskQueue.getExecuteTask(clazzId);        if(checkUser!=null){            return  R.fail("正在执行编译!");        }        //获取当前的线程        Thread thread=Thread.currentThread();        //创建当前的用户请求对象        CompileTask compileTask =new CompileTask();        compileTask.setThread(thread);        compileTask.setClazzId(clazzId);        compileTask.setStartTime(new Date());        //将当前用户请求对象放入队列中        try {            TaskQueue.waitTasks.put(compileTask);        } catch (InterruptedException e) {            e.printStackTrace();        }        TaskQueue.exec.execute(compileTask);        return R.data(TaskQueue.waitTasks.size()-1);    }     @ApiOperation(value = "查询当前任务前还有多少任务等待 @author Tarzan Liu")    @PostMapping("sort/{clazzId}")    public R sort(@PathVariable("clazzId") Long clazzId) {        return R.data(TaskQueue.getSort(clazzId));    }     @ApiOperation(value = "查询当前任务预估时长 @author Tarzan Liu")    @PostMapping("estimate/time/{clazzId}")    public R estimatedTime(@PathVariable("clazzId") Long clazzId) {        return R.data(TaskQueue.estimatedTime(clazzId));    }     @ApiOperation(value = "任务释放 @author Tarzan Liu")    @PostMapping("release/{clazzId}")    public R release(@PathVariable("clazzId") Long clazzId) {        CompileTask task=TaskQueue.getExecuteTask(clazzId);        if(task==null){            return  R.fail("资源释放异常");        }        return R.status(TaskProcessor.release(task));    }     @ApiOperation(value = "执行 @author Tarzan Liu")    @PostMapping("exec")    public R exec() {        Long start=System.currentTimeMillis();        for (Long i = 1L; i < 100; i++) {            compile(i);        }        System.out.println("消耗时间:"+(System.currentTimeMillis()-start)+"ms");        return R.status(true);    }}

接口测试

根据任务id查询该任务前还有多少个任务待执行

根据任务id查询该任务预估执行完成的剩余时间,单位秒

补充知识

BlockingQueue

BlockingQueue即阻塞队列,它是基于ReentrantLock,依据它的基本原理,我们可以实现Web中的长连接聊天功能,当然其最常用的还是用于实现生产者与消费者模式,大致如下图所示:

在Java中,BlockingQueue是一个接口,它的实现类有ArrayBlockingQueue、DelayQueue、 LinkedBlockingDeque、LinkedBlockingQueue、PriorityBlockingQueue、SynchronousQueue等,它们的区别主要体现在存储结构上或对元素操作上的不同,但是对于take与put操作的原理,却是类似的。

阻塞与非阻塞

入队

offer(E e):如果队列没满,立即返回true; 如果队列满了,立即返回false-->不阻塞

put(E e):如果队列满了,一直阻塞,直到队列不满了或者线程被中断-->阻塞

offer(E e, long timeout, TimeUnit unit):在队尾插入一个元素,,如果队列已满,则进入等待,直到出现以下三种情况:-->阻塞

被唤醒

等待时间超时

当前线程被中断

出队

poll():如果没有元素,直接返回null;如果有元素,出队

take():如果队列空了,一直阻塞,直到队列不为空或者线程被中断-->阻塞

poll(long timeout, TimeUnit unit):如果队列不空,出队;如果队列已空且已经超时,返回null;如果队列已空且时间未超时,则进入等待,直到出现以下三种情况:

被唤醒

等待时间超时

当前线程被中断

到此,关于"JAVA多线程怎么实现用户任务排队并预估排队时长"的学习就结束了,希望能够解决大家的疑惑。理论与实践的搭配能更好的帮助大家学习,快去试试吧!若想继续学习更多相关知识,请继续关注网站,小编会继续努力为大家带来更多实用的文章!

任务 队列 线程 处理 处理器 用户 阻塞 时间 时长 对象 接口 设备 编译 元素 学习 查询 单位 数据 理论 系统 数据库的安全要保护哪些东西 数据库安全各自的含义是什么 生产安全数据库录入 数据库的安全性及管理 数据库安全策略包含哪些 海淀数据库安全审计系统 建立农村房屋安全信息数据库 易用的数据库客户端支持安全管理 连接数据库失败ssl安全错误 数据库的锁怎样保障安全 c 判断数据库是否存在 徐州游戏软件开发公司 软件开发团队管理类书籍推荐 方舟端游不用服务器可以联机吗 2022年网络安全工作计划 西安邮电大学数据库应用报告 萧山税务局网络安全领导小组 上海琰瑢互联网科技有限公司 淘宝客软件开发视频教程 中国物种信息数据库 第三张电子商务网络技术 博雅数据库2020浙江 汉字王软件开发 sll证书位置在服务器那里 心动网络安全工程师 服务器为什么这么便宜又好用 软件开发免征营业税 与web服务器安全有关的 微信自带浏览器带数据库 网络安全会议纪要 红线意识 河南第三方软件开发多少钱 应用宝怎么看服务器 洛阳市商通网络技术有限公司 如何将电脑作为家用服务器 修改数据库的引擎 投资项目数据库app 增设网络技术等培训内容 轻量应用服务器控制台查看及管理 网络安全应该怎么设置 B2B电子商务从网络技术
0