天天看點

Spring 多線程

Spring 通過任務執行器(TaskExecutor)來實作多線程和并發程式設計。使用ThreadPoolTaskExecutor可以實作一個基于線程池的TaskExecutor。而實際上開發中的任務一般是非阻塞的,即異步的,是以我們要在配置類中通過@EnableAsync注解來開啟對異步任務的支援,并通過在實際執行的Bean的方法上使用@Async注解來聲明一個異步任務。

配置類:

package com.chenfeng.xiaolyuh.thread.config;

import java.util.concurrent.Executor;
import java.util.concurrent.ThreadPoolExecutor;

import org.springframework.aop.interceptor.AsyncUncaughtExceptionHandler;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.AsyncConfigurer;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

@Configuration // 聲明目前類是一個配置類,相當于Spring配置的XML檔案
@ComponentScan(basePackages={"com.chenfeng.xiaolyuh.thread"})
@EnableAsync// 利用@EnableAsync注解開啟異步任務的支援
// 配置類實作AsyncConfigurer接口并重寫getAsyncExecutor方法,并傳回ThreadPoolTaskExecutor,這樣我們就獲得了一個基于線程池TaskExecutor
public class ThreadConfig implements AsyncConfigurer {

  @Override
  public Executor getAsyncExecutor() {
    ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
    // 核心線程數 
    taskExecutor.setCorePoolSize(5); 
    // 最大線程數
    taskExecutor.setMaxPoolSize(50); 
    // 隊列最大長度
    taskExecutor.setQueueCapacity(1000);
    // 線程池維護線程所允許的空閑時間(機關秒)
    taskExecutor.setKeepAliveSeconds(120);
    // 線程池對拒絕任務(無線程可用)的處理政策 ThreadPoolExecutor.CallerRunsPolicy政策 ,調用者的線程會執行該任務,如果執行器已關閉,則丢棄.
    taskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.AbortPolicy());
    
    taskExecutor.initialize();
    return taskExecutor;
  }

  @Bean
  public Executor getThreadPool() {
    ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
    taskExecutor.setCorePoolSize(5);
    taskExecutor.setMaxPoolSize(1000);
    taskExecutor.setQueueCapacity(1000);
    taskExecutor.initialize();
    return taskExecutor;
  }

  @Override
  public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
    return null;
  }
    
}      

任務執行類:

package com.chenfeng.xiaolyuh.thread.service;

import java.util.concurrent.Executor;

import javax.annotation.Resource;

import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;

/**
 * 多線程的測試類
 * @ClassName DemoELService
 * @author yuhao.wang
 * @Date 2017年3月10日 下午3:41:18
 * @version 1.0.0
 */
@Service
public class DemoThreadService {
  
  @Resource(name = "getThreadPool")
  private Executor executor;
  
  @Async
  // 通過@Async注解方法表名這個方法是一個異步方法,如果注解在類級别,則表名該類的所有方法都是異步的,
  // 而這裡的方法自動被注入使用ThreadPoolTaskExecutor作為TaskExecutor
  public void executeAsyncTask(Integer i) {
    System.out.println("執行異步任務:" + i);
  }
  
  public void executeAsyncTaskPlus(Integer i) {
    executor.execute(new Runnable() {
      @Override
      public void run() {
        System.out.println("執行異步任務+1:" + i);
      }
    });
  }
}      
package com.chenfeng.xiaolyuh.test;

import org.junit.After;
import org.junit.Test;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;

import com.chenfeng.xiaolyuh.thread.config.ThreadConfig;
import com.chenfeng.xiaolyuh.thread.service.DemoThreadService;

/**
 * Created by yuhao.wang on 2017/3/9.
 */
public class SpringThreadTest {
  AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext(ThreadConfig.class);

  @Test
  public void contextTest() {
    DemoThreadService demoThreadService = context.getBean(DemoThreadService.class);
    for (int i = 0; i < 1000; i++) {
      demoThreadService.executeAsyncTaskPlus(i);
      demoThreadService.executeAsyncTask(i);
    }
  }

  @After
  public void closeContext() {
    context.close();
  }

}      

繼續閱讀