
本文作者:微笑面對生活
上一章節講了基本的整合和各種Exchange的使用,這章主要來實作一個單機的簡單的搶票系統,麻雀雖小但五髒俱全,為什麼用它做搶票系統大家應該也懂,為了削峰和異步處理。
在這個項目裡我用的是
springboot
的2版本,ORM選用
JPA
快速開發,JSON工具使用阿裡的
fastjson
,當然,mq用的是
rabbitMQ
。導入的是
springboot
內建的依賴。
1. 配置部分
1.1 pom.xml
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-jpa</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.16.18</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>RELEASE</version>
<scope>compile</scope>
</dependency>
</dependencies>
1.2 application.properties
server.port=10000
spring.datasource.url=jdbc:mysql://xxxxx/xxxxx?characterEncoding=utf-8
spring.datasource.username=xxx
spring.datasource.password=xxxx
spring.datasource.driver-class-name=com.mysql.jdbc.Driver
spring.jpa.properties.hibernate.hbm2ddl.auto=update
spring.jpa.show-sql=true
spring.rabbitmq.host=localhost
spring.rabbitmq.username=root
spring.rabbitmq.password=root
spring.rabbitmq.port=5672
我隻是很有針對性的對
mq
和
datasource
進行了配置。
1.3 資料表
create table if not result
(
id int auto_increment primary key,
ticket_id int null,
user_id int null
);
create table if not exists ticket
(
id int auto_increment primary key,
name varchar(255) null,
content varchar(255) null,
user_name varchar(20) null,
count int default '6666' not null
);
根據資料表可以Generate出JavaBean,不貼JavaBean了。
1.4 項目架構
├── src
│ ├── main
│ │ ├── java
│ │ │ └── com
│ │ │ └── fantj
│ │ │ └── springbootjpa
│ │ │ ├── AMQP.java
│ │ │ ├── controller
│ │ │ │ └── TicketController.java
│ │ │ ├── mq
│ │ │ │ ├── Message.java
│ │ │ │ ├── MQConstants.java
│ │ │ │ ├── MQReceiver.java
│ │ │ │ └── MQSender.java
│ │ │ ├── pojo
│ │ │ │ ├── Result.java
│ │ │ │ └── Ticket.java
│ │ │ ├── repostory
│ │ │ │ ├── ResultRepository.java
│ │ │ │ └── TicketRepository.java
│ │ │ └── service
│ │ │ ├── ResultServiceImpl.java
│ │ │ ├── ResultService.java
│ │ │ ├── TicketServiceImpl.java
│ │ │ └── TicketService.java
│ │ └── resources
│ │ ├── application.properties
│ │ └── rebel.xml
2. 啟動類
@SpringBootApplication
@EntityScan("com.fantj.springbootjpa.pojo")
@EnableRabbit
public class AMQP {
public static void main(String[] args) {
SpringApplication.run(AMQP.class, args);
}
}
注意這個
@EnableRabbit
注解,它會開啟對rabbit注解的支援。
3. controller
很簡單的一個controller類,實作查詢和搶票功能。
@RestController
@RequestMapping("/ticket")
public class TicketController {
@Autowired
private TicketService ticketService;
@Autowired
private MQSender mqSender;
@RequestMapping("/get/{id}")
public Ticket getByid(@PathVariable Integer id){
return ticketService.findById(id);
}
@RequestMapping("/reduce/{id}/{userId}")
public String reduceCount(@PathVariable Integer id,
@PathVariable Integer userId){
Message message = new Message(id,userId);
ticketService.reduceCount(id);
mqSender.sendMessage(new Message(message.getTicketId(),message.getUserId()));
return "搶票成功!";
}
}
注意
private MQSender mqSender;
這是我的
rabbit
發送消息的類。
4. Service
接口我就不再這裡貼出,直接貼實作類。
4.1 ResultServiceImpl.java
@Service
public class ResultServiceImpl implements ResultService{
@Autowired
private ResultRepository resultRepository;
@Override
public void add(Result result) {
resultRepository.add(result.getTicketId(), result.getUserId());
}
@Override
public Result findOneByUserId(Integer userId) {
return resultRepository.findByUserId(userId);
}
}
4.2 TicketServiceImpl.java
@Service
public class TicketServiceImpl implements TicketService{
@Autowired
private TicketRepository repository;
@Override
public Ticket findById(Integer id) {
return repository.findTicketById(id);
}
@Override
public Ticket reduceCount(Integer id) {
repository.reduceCount(id);
return findById(id);
}
}
這兩個都是很普通的service實作類,沒有新加入的東西。
5. Dao
5.1 ResultRepository.java
@Repository
public interface ResultRepository extends JpaRepository<Result,Integer> {
@Transactional
@Modifying
@Query(value = "insert into result(ticket_id,user_id) values(?1,?2) ",nativeQuery = true)
void add(@Param("ticketId") Integer ticketId,@Param("userId") Integer userId);
Result findByUserId(Integer userId);
}
5.2 TicketRepository.java
@Repository
public interface TicketRepository extends JpaRepository<Ticket,Integer>{
/**
* 減少庫存
*/
@Transactional
@Modifying
@Query(value = "update ticket t set t.count=t.count+(-1) where id=?1",nativeQuery = true)
int reduceCount(Integer id);
/**
* 查詢資訊
*/
Ticket findTicketById(Integer id);
}
到了這裡,你會發現,md哪裡有用mq的痕迹…
6. MQ
剩下的全是mq的處理。
6.1 Message.java
這個類用來封裝mq傳輸的消息對象,我們使用它來對傳輸的byte進行編解碼,得到我們想要的資料。
@Data
public class Message implements Serializable {
private Integer ticketId;
private Integer userId;
public Message() {
}
public Message(Integer ticketId, Integer userId) {
this.ticketId = ticketId;
this.userId = userId;
}
}
6.2 MQConstants.java
這是一個常量類,用來定義和儲存 queue
的名字,雖然裡面隻有一個常量,好習慣要從小事做起。
public class MQConstants {
public static final String QUEUE= "qiangpiao";
}
6.3 MQSender.java
這是消息發送類,用來給queue發送資料。
@Service
@Slf4j
public class MQSender {
@Autowired
private AmqpTemplate amqpTemplate;
public void sendMessage(Message message){
String msg = JSONObject.toJSONString(message);
log.info("send message : "+msg);
amqpTemplate.convertAndSend(MQConstants.QUEUE,msg);
}
}
AmqpTemplate
是springboot架構提供給我們使用的amqp操作模闆,利用它我們能更友善的調用和處理業務。
我們在Controller層調用它,來完成消息入隊的操作,完成削峰和異步處理,大大增加了系統并發和強健性。
6.4 MQReceiver.java
這是消息接收類,用來從queue裡擷取資料。
@Service
@Slf4j
public class MQReceiver {
@Autowired
private TicketService ticketService;
@Autowired
private ResultService resultService;
@RabbitListener(queues = MQConstants.QUEUE)
public void receive(String message){
log.info("receive msg : "+message);
JSONObject jsonObject = JSONObject.parseObject(message);
System.out.println(jsonObject);
Message msg = JSONObject.toJavaObject(jsonObject, Message.class);
Integer ticketId = msg.getTicketId();
Integer userId = msg.getUserId();
// 減庫存
Ticket ticket = ticketService.reduceCount(ticketId);
if (ticket.getCount() <= 0){
return;
}
// 判斷是否已經搶過
Result oneByUserId = resultService.findOneByUserId(userId);
if (oneByUserId != null){
return;
}
resultService.add(new Result(ticketId,userId));
}
}
在這個類中,
@RabbitListener(queues = MQConstants.QUEUE)
标記的是監聽方法,該方法會從queue中擷取到String資料。
之後我們需要将其複原為JavaBean,取出我們該要的屬性,繼續處理業務:
查詢票剩餘量
->
判斷是否已搶到過
->
減庫存
->
增加搶票資料
。 (我這裡寫的有點草率,應該先查餘量…,不過不重要,本章重點在過一遍springboot與rabbitmq的整合)。
運作效果
我對該搶票功能做了一個9999請求,我本來做3k并發,電腦沒那麼多句柄,實作不了,最後做了1k并發的壓測。
這是rabbitMQ 自帶Managerment模闆上的截圖:
壓測報告:
Server Software:
Server Hostname: 127.0.0.1
Server Port: 10000
Document Path: /ticket/reduce/1/10
Document Length: 13 bytes
Concurrency Level: 1000
Time taken for tests: 423.101 seconds
Complete requests: 9999
Failed requests: 0
Total transferred: 1459854 bytes
HTML transferred: 129987 bytes
Requests per second: 23.63 [#/sec] (mean)
Time per request: 42314.334 [ms] (mean)
Time per request: 42.314 [ms] (mean, across all concurrent requests)
Transfer rate: 3.37 [Kbytes/sec] received
Connection Times (ms)
min mean[+/-sd] median max
Connect: 0 2 6.8 0 29
Processing: 217 40197 7390.7 41984 58488
Waiting: 217 40197 7390.8 41984 58488
Total: 246 40199 7384.8 41985 58488
Percentage of the requests served within a certain time (ms)
50% 41984
66% 42670
75% 42744
80% 42758
90% 42801
95% 42828
98% 42850
99% 42868
100% 58488 (longest request)
注意
- 本項目沒有考慮線程安全的問題,事實上線程是不安全的,線程安全問題後面會說。
- 本項目隻是為了mq的削峰和異步處理,最直覺的就是資料庫可以稱住高并發,一般來講,資料庫連接配接這塊是稱不住的。
- mq在分布式下的問題後面會說。