天天看點

一個并行搜尋算法

抄自《實戰Java高并發程式設計》

package understanding;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;

//并行查找算法
public class ParallelSearch {

	//要找的數組
	static int[] arr = {1,2,4,5,6,3,4,5,6,7,8};
	
	//cachedThreadPool,按需建立線程池,有就用,沒有就建立,過了一段時間還空閑就撤銷線程
	//Creates a thread pool that creates new threads as needed, 
	//but will reuse previously constructed threads when they are available. 
	//These pools will typically improve the performance of programs that execute many short-lived asynchronous tasks. 
	//Calls to execute will reuse previously constructed threads if available. 
	//If no existing thread is available, a new thread will be created and added to the pool. 
	//Threads that have not been used for sixty seconds are terminated and removed from the cache. 
	//Thus, a pool that remains idle for long enough will not consume any resources. 
	//Note that pools with similar properties but different details (for example, timeout parameters) 
	//may be created using ThreadPoolExecutor constructors.
	static ExecutorService pool = Executors.newCachedThreadPool();
	static final int threadNum = 2;
	
	//存放下标,表示要找的值在這個位置
	static AtomicInteger result = new AtomicInteger(-1);
	
	static class SearchTask implements Callable<Integer>{

		//分别表示開始下标,結束下标,要搜尋的值
		int begin,end,searchValue;
		
		//構造方法
		SearchTask(int b,int e,int searchValue){
			this.begin = b;
			this.end = e;
			this.searchValue = searchValue;
		}
		
		@Override
		public Integer call() throws Exception {
			return search(begin, end, searchValue);
		}
		
	}

	//搜尋任務要調用的搜尋方法,這個方法是關鍵。
	public static int search(int b,int e,int val){
		
		for(int i=b;i<e;i++){
			if(result.get()>=0){	///已經找到結果了,無需再找,直接傳回
				return result.get();
			}
			//沒有找到,找找看
			if(arr[i]==val){	//找到了,利用CAS操作設定result的值
				if(!result.compareAndSet(-1, i)){  //設定失敗,表示原來的值已經不是-1,說明已經找到了一個結果
					return result.get();
				}
				return i;	//設定成功,傳回i
			}
		}
		
		return -1;
	}
	
	public static int pSearch(int searchValue) throws InterruptedException, ExecutionException{
		int subArrSize = arr.length/threadNum+1;
		List<Future<Integer>> re = new ArrayList<Future<Integer>>();
		for(int i=0;i<arr.length;i+=subArrSize){
			int end = i+subArrSize;
			if(end>=arr.length)
				end = arr.length;
			re.add(pool.submit(new SearchTask(i, end, searchValue)));
		}
		for(Future<Integer> futrue:re){
			if(futrue.get()>=0){
				return futrue.get();
			}
		}
		return -1;
	}
	
	public static void main(String[] args) {
		try {
			int res = pSearch(3);
			System.out.println(res);
		} catch (InterruptedException | ExecutionException e) {
			e.printStackTrace();
		}
	}

}