博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
Java 消息队列-Java并发编程 阻塞队列
阅读量:4599 次
发布时间:2019-06-09

本文共 3681 字,大约阅读时间需要 12 分钟。

自从Java 1.5之后,在java.util.concurrent包下提供了若干个阻塞队列,主要有以下几个:  ArrayBlockingQueue:基于数组实现的一个阻塞队列,在创建ArrayBlockingQueue对象时必须制定容量大小。并且可以指定公平性与非公平性,默认情况下为非公平的,即不保证等待时间最长的队列最优先能够访问队列。  LinkedBlockingQueue:基于链表实现的一个阻塞队列,在创建LinkedBlockingQueue对象时如果不指定容量大小,则默认大小为Integer.MAX_VALUE。  PriorityBlockingQueue:以上2种队列都是先进先出队列,而PriorityBlockingQueue却不是,它会按照元素的优先级对元素进行排序,按照优先级顺序出队,每次出队的元素都是优先级最高的元素。注意,此阻塞队列为无界阻塞队列,即容量没有上限(通过源码就可以知道,它没有容器满的信号标志),前面2种都是有界队列。  DelayQueue:基于PriorityQueue,一种延时阻塞队列,DelayQueue中的元素只有当其指定的延迟时间到了,才能够从队列中获取到该元素。DelayQueue也是一个无界队列,因此往队列中插入数据的操作(生产者)永远不会被阻塞,而只有获取数据的操作(消费者)才会被阻塞。

功能说明:由于部署阿里云的SLS日志服务,写了个日志接口,考虑到系统日志量提交有可能会阻塞,这里引入了线程池(生存者与消费者模式)来维护日志队列的发送。一个发送消息模块将消息发送到消息队列中,无需等待返回结果,发送模块继续执行其他任务。消息队列中的指令由线程池中的线程来处理。使用一个Queue来存放线程池溢出时的任务

 

public class ThreadPoolManager {		private static ThreadPoolManager tpm = new ThreadPoolManager(); 		// 线程池维护线程的最少数量 	private final static int CORE_POOL_SIZE = 4; 		// 线程池维护线程的最大数量 	private final static int MAX_POOL_SIZE = 10; 		// 线程池维护线程所允许的空闲时间 	private final static int KEEP_ALIVE_TIME = 0; 		// 线程池所使用的缓冲队列大小	private final static int WORK_QUEUE_SIZE = 10; 		// 消息缓冲队列 	Queue
msgQueue = new LinkedList
(); // 访问消息缓存的调度线程 // 查看是否有待定请求,如果有,则创建一个新的AccessDBThread,并添加到线程池中 final Runnable accessBufferThread = new Runnable() { @Override public void run() { if(hasMoreAcquire()){ String msg = ( String ) msgQueue.poll(); Runnable task = new AccessDBThread( msg ); threadPool.execute( task ); } } }; final RejectedExecutionHandler handler = new RejectedExecutionHandler(){ @Override public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { System.out.println(((AccessDBThread )r).getMsg()+"消息放入队列中重新等待执行"); msgQueue.offer((( AccessDBThread ) r ).getMsg() ); } }; // 管理数据库访问的线程池 @SuppressWarnings({ "rawtypes", "unchecked" }) final ThreadPoolExecutor threadPool = new ThreadPoolExecutor( CORE_POOL_SIZE, MAX_POOL_SIZE, KEEP_ALIVE_TIME, TimeUnit.SECONDS,new ArrayBlockingQueue( WORK_QUEUE_SIZE ), this.handler); // 调度线程池 final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool( 100 ); @SuppressWarnings("rawtypes") final ScheduledFuture taskHandler = scheduler.scheduleAtFixedRate(accessBufferThread, 0, 1, TimeUnit.SECONDS); public static ThreadPoolManager newInstance() { return tpm; } private ThreadPoolManager(){} private boolean hasMoreAcquire(){ return !msgQueue.isEmpty(); } public void addLogMsg( String msg ) { Runnable task = new AccessDBThread( msg ); threadPool.execute( task ); }}

  


//线程池中工作的线程public class AccessDBThread implements Runnable {	private String msg; 		public AccessDBThread() {		super();	}	public AccessDBThread(String msg) {		this.msg = msg;	}	public String getMsg() {		return msg;	}	public void setMsg(String msg) {		this.msg = msg;	}	@Override	public void run() {				// 向数据库中添加Msg变量值 		System.out.println("Added the message: "+msg+" into the Database"); 	}}

//TestDriver.java是一个驱动测试,sendMsg方法不间断的向ThreadPoolManager发送数据public class TestDriver {	ThreadPoolManager tpm = ThreadPoolManager.newInstance(); 		public void sendMsg( String msg ) {				tpm.addLogMsg( msg + "记录一条日志 " ); 	}		public static void main(String[] args) {		for(int i=0;i<1000;i++){			new TestDriver().sendMsg( Integer.toString( i ) ); 		}				//new TestDriver().sendMsg("发起一条对象" ); 	}}

 

参考资料:

http://www.cnblogs.com/dolphin0520/p/3958019.html

http://www.cnblogs.com/dolphin0520/p/3932906.html
http://www.360doc.com/content/11/1126/19/1332348_167587893.shtml
http://www.360doc.com/content/06/0812/16/8473_179457.shtml
http://ifeve.com/java-blocking-queue/

 

转载于:https://www.cnblogs.com/molao-doing/articles/4445045.html

你可能感兴趣的文章
链式队列
查看>>
mapreduce 多种输入
查看>>
从JDBC到commons-dbutils
查看>>
数据访问:Implementing Efficient Transactions
查看>>
csapp读书笔记
查看>>
Android 框架揭秘 --读书笔记
查看>>
对于Dubbo一些面试题自己的答案
查看>>
In Place Algorithm
查看>>
iOS真机调试步骤
查看>>
[国嵌攻略][169][嵌入式播放器移植]
查看>>
vue-v-for
查看>>
易游数据劫持获取和易游取数据工具
查看>>
剑指offer——最小的K个数
查看>>
python将PNG格式的图片转化成为jpg
查看>>
C++学习笔记-const
查看>>
C学习笔记-小程序(长期更新)
查看>>
关于listView 中的聚焦问题
查看>>
Shiro学习(总结)
查看>>
C指针解析 ------ 指针的算术运算
查看>>
rabbitmq安装与高可用集群配置
查看>>