`
snake1987
  • 浏览: 71919 次
  • 性别: Icon_minigender_1
  • 来自: 北京
社区版块
存档分类
最新评论

java并发学习之二:线程池(三)

    博客分类:
  • java
阅读更多
没找到什么好工具,也没得到好建议,好技巧,只能自己摸索了

一步步优化把

先是优化测试代码

加的内容不多,但挺关键的
测试要模拟的是一个平稳的任务下发过程(如同现实生活中的),所以一下子将所有任务丢到队列中是很不对的,而且数量一大,堆就满了
经过多次测试,决定通过让主线程每发送n个,就休息一段时间来降低主线程的下发频率
以达到平稳的目的

这样设置之后,再适当调一下,就可以让cpu利用率达到一个相对稳定的值了,从而可以更好地通过工具进行观察
public class TestThreadPoolTest1 {


	public static void main(String[] args) throws InterruptedException
	{
		//testEasyRunnableThreadPool(new ThreadPoolTest1(10), 1000000, 10);//22553768552

		//testEasyRunnableThreadPool(Executors.newFixedThreadPool(10), 1000000, 10);//2946477652
		
		testEasyRunnableThreadPool(new ThreadPoolTest2(8), 10000000, 8);
	}
	
	/**
	 * 一个产生随机数的方法,防止jvm优化
	 * @param seed
	 * @return
	 */
	static int getRandomNum(int seed)
	{
		seed ^= (seed << 6);
		seed ^= (seed >>> 21);
		seed ^= (seed << 7);
		return seed;
	}
	
	/**
	 * 任务是执行一个简单的计算,只占用cpu,没有io和其他阻塞的方法
	 */
	static void testEasyRunnableThreadPool(Executor pool,int tryTime,int threadNum) throws InterruptedException
	{
		final AtomicInteger count = new AtomicInteger(0);
		
		//construct runnable
		Runnable command = new Runnable() {			
			public void run() {
				final int addTime = 10000;
				long sum = 0;
				int temp = this.hashCode() ^ (int)System.currentTimeMillis();
				for(int i = 0;i<addTime;i++)
				{
					sum += (temp = getRandomNum(temp));					
				}
			}
		};		
		testThreadPool(tryTime, pool, command);
	}
	
	static void testThreadPool(int tryNum,Executor pool,final Runnable command) throws InterruptedException
	{
		final CountDownLatch latch = new CountDownLatch(tryNum);
		final Random r = new Random();
		Runnable wrapper = new Runnable() {			
			public void run() {
				command.run();
				//想测试并发,在并发中加入适当的同步操作是无法避免的,只能减少
				//,在这,只是做了一个简单的countdown,影响不大
				latch.countDown();
			}
		};
		long startTime = System.nanoTime();
		//加一层循环
		for(int j = 0;j<100000;j++)
		{
			for(int i = 0;i<tryNum/100000;i++)
			{
				pool.execute(wrapper);							
			}
			//主线程休息
			Thread.sleep(0, r.nextInt(50));
		}
		long dispatchTime = System.nanoTime();
		//加一个全部任务全部下发完的时间,与最终时间做对比
		System.out.println(dispatchTime - startTime);
		latch.await();
		long endTime = System.nanoTime();
		System.out.println(endTime-startTime);
	}

}



然后是线程池代码

基本的改动是:
1.去掉了预启动线程
2.将wait,notify改为了用LockSupport,这样少了一个synchronized的消耗
3.将threadExecute方法改为了非阻塞
4.将command置为volatile
5.理顺了由busyThreadsNum的改变过程和改变的含义
public class ThreadPoolTest2 implements Executor {
	//等待队列
	Queue<Runnable> waitingQueue = null;
	
	ConcurrentLinkedQueue<ThreadNode> freeThread;
	//相当于一个freeThread的状态,根据状态决定行为,原则上将freeThread.size()+busyThreadsNum=MAXTHREADNUM
	private AtomicInteger busyThreadsNum = new AtomicInteger(0);
	//最大线程数
	final int MAXTHREADNUM;
	
	public ThreadPoolTest2 (int threadNum)
	{
		this.MAXTHREADNUM = threadNum;
		init(MAXTHREADNUM,new ConcurrentLinkedQueue<Runnable>());
	}	

	private void init(int threadNum,ConcurrentLinkedQueue<Runnable> queue)
	{
		freeThread = new ConcurrentLinkedQueue<ThreadNode>();
		waitingQueue = queue;		
	}
	
	//去掉了synchronized
	private void threadExecute(Runnable command)
	{
		for(;;)
		{
			//得到开始的值
			int expect = busyThreadsNum.get();
			//由于else中先执行了busyThreadsNum.incrementAndGet();才添加,所以会出现
			//busyThreadsNum为MAXTHREADNUM,但实际的运行数量少于busyThreadsNum,少1,但这种
			//现象是本应如此的
			if(expect == MAXTHREADNUM)
			{
				waitingQueue.add(command);
				return;
			}
			else
			{
				busyThreadsNum.incrementAndGet();
				ThreadNode t = freeThread.poll();
				if(t == null)
				{
					t = new ThreadNode();
					t.setCommand(command);
					t.start();
					return;
				}
				t.setCommand(command);				
				LockSupport.unpark(t);
			}
		}
	}
	
	private class ThreadNode extends Thread
	{
		//加了线程保证可见性,因为根据处理逻辑,只会有一个线程进行修改,所以只需要保证可见性就可以了
		//同时,由于之前用的不是LockSupport,所以wait或者await还有notify或者signal都是需要加锁的
		//之前没加volatile,是由于使用了额外的同步(在wait,notify之前用了synchronized),所以
		//相当于加入了一个synchronized with关系,所以command变为可见了(setCommand操作发生在同步之前)
		volatile Runnable command = null;
		Exception e = null;
		
		public ThreadNode() {
			
		}
		
		Exception getException()
		{
			return e;
		}
		
		void setCommand(Runnable c)
		{
			command = c;
		}
		
		
		@Override
		public void run() {
			try {
				for(;;)
				{
					if(command == null)
					{					
						ThreadPoolTest2.this.waitThread(this);
						continue;
					}
					command.run();				
					command = ThreadPoolTest2.this.getCommand();			
				}
			}catch (InterruptedException e) {
			}
		}
	}
	
	Runnable getCommand() throws InterruptedException
	{
		return waitingQueue.poll();
	}
	
	void waitThread(ThreadNode t) throws InterruptedException
	{
		//先往队列中放,再减少busyThreadsNum
		//因为即使busyThreadsNum已经为MAXTHREADNUM了,但其实队列中有空闲的线程
		//这也是允许的,造成的问题最严重不过往等待任务队列里面添加了不该等待,而是马上执行的任务
		//而且只是瞬间的
		freeThread.offer(t);
		busyThreadsNum.getAndDecrement();
		LockSupport.park(t);
	}
	
	protected void beforeExecute()
	{
		
	}

	public void execute(Runnable command) {
		beforeExecute();		
		threadExecute(command);
		afterExecute();

	}
	
	protected void afterExecute()
	{
		
	}
}

1
0
分享到:
评论
2 楼 snake1987 2011-04-12  
leeing.org 写道
偶然路过,发现你的文章很不错!支持一下!

谢谢
1 楼 leeing.org 2011-04-09  
偶然路过,发现你的文章很不错!支持一下!

相关推荐

Global site tag (gtag.js) - Google Analytics