文章目錄
- 學習目标
- 1. canal
- 1.1 canal簡介
- 1.2 環境部署
- 1.2.1 mysql開啟binlog模式
- 1.2.2 canal服務端安裝配置
- 1.2.3 canal常見錯誤處理
- 1.3 資料監控微服務
- 2. 首頁廣告緩存更新
- 2.1 需求分析
- 2.2 實作思路
- 2.3 代碼實作
- 2.3.1 發送消息到mq
- 2.3.2 從mq中提取消息執行更新
- 3. 商品上架索引庫導入資料
- 3.1 需求分析
- 3.2 實作思路
- 3.3 代碼實作
- 3.3.1 配置rabbitMQ
- 3.3.2 發送上架商品id到Mq
- 3.3.3 索引庫環境準備
- 3.3.4 搜尋微服務搭建
- 3.3.5 建立索引庫結構
- 3.3.6 建立ES操作的Dao接口
- 3.3.7 搜尋微服務批量導入資料邏輯
- 3.3.8 根據spuId導入索引庫資料
- 3.3.9 接收mq消息執行導入
- 4. 商品下架索引庫删除資料
- 4.1 需求分析
- 4.2 實作思路
學習目标
- 能夠完成canal環境的搭建與資料監控微服務的開發
- 能夠完成首頁廣告緩存更新的功能,掌握OkHttpClient的基本使用方法
- 能夠完成商品上架索引庫導入資料功能,能夠畫出流程圖和說出實作思路
- 能夠完成商品下架索引庫删除資料功能,能夠畫出流程圖和說出實作思路
該工程使用lua+nginx+rabbitmq+redis等技術的第一個主要目的是實作輪播圖的讀取,具體圖解:
其中nginx的作用包括了從資料庫中查詢資料,也包括了将資料庫中的資料更新到redis緩存當中去。
1. canal
1.1 canal簡介
canal可以用來監控資料庫資料的變化,進而獲得新增資料,或者修改的資料。
canal是應阿裡巴巴存在杭州和美國的雙機房部署,存在跨機房同步的業務需求而提出的。
阿裡系公司開始逐漸的嘗試基于資料庫的日志解析,擷取增量變更進行同步,由此衍生出了增量訂閱&消費的業務。
原理相對比較簡單:
- canal模拟mysql slave的互動協定,僞裝自己為mysql slave,向mysql master發送dump協定
- mysql master收到dump請求,開始推送binary log給slave(也就是canal)
- canal解析binary log對象(原始為byte流)
1.2 環境部署
1.2.1 mysql開啟binlog模式
(1)檢視目前mysql是否開啟binlog模式。
SHOW VARIABLES LIKE '%log_bin%'
- 1
如果log_bin的值為OFF是未開啟,為ON是已開啟。
(2)修改/etc/my.cnf 需要開啟binlog模式。
[mysqld]
log-bin=mysql-bin
binlog-format=ROW
server_id=1
修改完成之後,重新開機mysqld的服務。
(3) 進入mysql
mysql -h localhost -u root -p
(4)建立賬号 用于測試使用
使用root賬号建立使用者并授予權限
create user canal@'%' IDENTIFIED by 'canal';
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT,SUPER ON *.* TO 'canal'@'%';
FLUSH PRIVILEGES;
1.2.2 canal服務端安裝配置
(1)下載下傳位址canal
https://github.com/alibaba/canal/releases/tag/canal-1.0.24
(2)下載下傳之後 上傳到linux系統中,解壓縮到指定的目錄/usr/local/canal
解壓縮之後的目錄結構如下:
(3)修改 exmaple下的執行個體配置
vi conf/example/instance.properties
修改如圖所示的幾個參數。
一定要注釋掉下面這個參數,這樣就會掃描全庫
#canal.instance.defaultDatabaseName =
(3)啟動服務:
[root@localhost canal]# ./bin/startup.sh
(4)檢視日志:
cat /usr/local/canal/logs/canal/canal.log
這樣就表示啟動成功了。
在暢購項目實行的過程中使用的是docker容器化部署了canal。
1.2.3 canal常見錯誤處理
錯誤資訊如下:
2019-06-17 19:35:20.918 [New I/O server worker #1-2] ERROR c.a.otter.canal.server.netty.handler.SessionHandler - something goes wrong with channel:[id: 0x7f2e9be3, /192.168.200.56:52225 => /192.168.200.128:11111], exception=java.io.IOException: Connection reset by peer
解決辦法:
進入mysql中執行下面語句檢視binlog所在位置
mysql> show master status;
顯示如下:
+------------------+----------+--------------+------------------+-------------------+
| File | Position | Binlog_Do_DB | Binlog_Ignore_DB | Executed_Gtid_Set |
+------------------+----------+--------------+------------------+-------------------+
| mysql-bin.000001 | 120 | | | |
+------------------+----------+--------------+------------------+-------------------+
1 row in set (0.00 sec)
如果file中binlog檔案不為 mysql-bin.000001 可以重置mysql
mysql> reset master;
檢視canal配置檔案
vim usr/local/canal/conf/example/meta.dat
找到對應的binlog資訊更改一緻即可, 或者删除這個meta.dat也可以.
"journalName":"mysql-bin.000001","position":43581207,"
1.3 資料監控微服務
當使用者執行資料庫的操作的時候,binlog 日志會被canal捕獲到,并解析出資料。我們就可以将解析出來的資料進行相應的邏輯處理。
我們這裡使用的一個開源的項目,它實作了springboot與canal的內建。比原生的canal更加優雅。
https://github.com/chenqian56131/spring-boot-starter-canal
使用前需要将starter-canal安裝到本地倉庫。
我們可以參照它提供的canal-test,進行代碼實作。
(1)建立工程子產品changgou_canal,pom引入依賴
<dependency>
<groupId>com.xpand</groupId>
<artifactId>starter-canal</artifactId>
<version>0.0.1-SNAPSHOT</version>
</dependency>
(2)建立包com.changgou.canal ,包下建立啟動類
@SpringBootApplication
@EnableCanalClient
public class CanalApplication {
public static void main(String[] args) {
SpringApplication.run(CanalApplication.class, args);
}
}
(3)添加配置檔案application.properties
canal.client.instances.example.host=192.168.200.128
canal.client.instances.example.port=11111
canal.client.instances.example.batchSize=1000
(4)建立com.changgou.canal.listener包,包下建立類
@CanalEventListener
public class BusinessListener {
@ListenPoint(schema = "changgou_business", table = {"tb_ad"})
public void adUpdate(CanalEntry.EventType eventType, CanalEntry.RowData rowData) {
System.err.println("廣告資料發生變化");
rowData.getBeforeColumnsList().forEach((c) -> System.err.println("更改前資料: " + c.getName() + " :: " + c.getValue()));
rowData.getAfterColumnsList().forEach((c) -> System.err.println("更改後資料: " + c.getName() + " :: " + c.getValue()));
}
}
測試:啟動資料監控微服務,修改changgou_business的tb_ad表,觀察控制台輸出。
2. 首頁廣告緩存更新
2.1 需求分析
當tb_ad(廣告)表的資料發生變化時,更新redis中的廣告資料。
2.2 實作思路
(1)修改資料監控微服務,監控tb_ad表,當發生增删改操作時,提取position值(廣告位置key),發送到rabbitmq
(2)從rabbitmq中提取消息,通過OkHttpClient調用ad_update來實作對廣告緩存資料的更新。
2.3 代碼實作
2.3.1 發送消息到mq
(1)在rabbitmq管理背景建立隊列 ad_update_queue ,用于接收廣告更新通知
(2)引入rabbitmq起步依賴
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-rabbit</artifactId>
</dependency>
(3)配置檔案application.properties 添加内容
spring.rabbitmq.host=192.168.200.128
(4)修改BusinessListener類
@CanalEventListener
public class BusinessListener {
@Autowired
private RabbitTemplate rabbitTemplate;
@ListenPoint(schema = "changgou_business", table = {"tb_ad"})
public void adUpdate(CanalEntry.EventType eventType, CanalEntry.RowData rowData) {
System.err.println("廣告資料發生變化");
//修改前資料
for(CanalEntry.Column column: rowData.getBeforeColumnsList()) {
if(column.getName().equals("position")){
System.out.println("發送消息到mq ad_update_queue:"+column.getValue());
rabbitTemplate.convertAndSend("","ad_update_queue",column.getValue()); //發送消息到mq
break;
}
}
//修改後資料
for(CanalEntry.Column column: rowData.getAfterColumnsList()) {
if(column.getName().equals("position")){
System.out.println("發送消息到mq ad_update_queue:"+column.getValue());
rabbitTemplate.convertAndSend("","ad_update_queue",column.getValue()); //發送消息到mq
break;
}
}
}
}
(5)測試,運作資料監控微服務canal,新增、修改或删除tb_ad表資料,修改後觀察控制台輸出和rabbitmq管理界面中ad_update_queue是否接收到消息
2.3.2 從mq中提取消息執行更新
(1)changgou_service_business工程pom.xml引入依賴
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>com.squareup.okhttp3</groupId>
<artifactId>okhttp</artifactId>
<version>3.9.0</version>
</dependency>
(2)在spring節點下添加rabbitmq配置
spring:
rabbitmq:
host: 192.168.200.128
(3)com.changgou.business包下建立listener包,包下建立類
@Component
@RabbitListener(queues = "ad_update_queue")
public class AdListener {
/**
* 擷取更新廣告通知
* @param message
*/
@RabbitHandler
public void updateAd(String message){
System.out.println("接收到消息:"+message);
String url = "http://192.168.200.128/ad_update?position="+message;
OkHttpClient okHttpClient = new OkHttpClient();
final Request request = new Request.Builder()
.url(url)
.build();
Call call = okHttpClient.newCall(request);
call.enqueue(new Callback() {
@Override
public void onFailure(Call call, IOException e) {
e.printStackTrace();//顯示錯誤資訊
}
@Override
public void onResponse(Call call, Response response) throws IOException {
System.out.println("調用成功"+response.message());
}
});
}
}
(4)測試,啟動eureka和business微服務,觀察控制台輸出和資料同步效果。
3. 商品上架索引庫導入資料
3.1 需求分析
商品上架将商品的sku清單導入或更新索引庫。
3.2 實作思路
(1)當調用商品微服務中商品上架方法時候, 則商品微服務連接配接mysql資料庫根據SPU的主鍵ID更改SPU表中is_marketable狀态字段的值為1(已上架).
(2)在rabbitmq管理背景建立商品上架交換器(Exchanges)。使用分列模式(fanout)的交換器是考慮商品上架會有很多種邏輯需要處理,導入索引庫隻是其中一項,另外還有商品詳細頁靜态化等操作。這樣我們可以建立導入索引庫的隊列和商品詳細頁靜态化隊列并與商品上架交換器進行綁定。
(3)商品微服務将需要上架的SPU的主鍵ID發送給rabbitMq的商品上架交換器, 交換器則将資料根據路由規則發給對應的索引庫上架隊列和靜态頁上架隊列.
(4) 搜尋微服務從rabbitmq的索引庫上架隊列中提取spu的id,通過feign調用商品微服務得到sku的清單,并且通過調用elasticsearch的進階restAPI 将sku清單導入到索引庫。
(5) 靜态頁微服務從rabbitmq的靜态頁上架隊列中提取SPU的id, 通過feign調用商品微服務得到sku, 分類, 品牌, spu等各種資料資訊, 然後根據模闆生成靜态化頁面.
3.3 代碼實作
3.3.1 配置rabbitMQ
(1)在rabbitmq背景建立交換器goods_up_exchange(類型為fanout),建立隊列search_add_queue綁定交換器goods_up_exchange
a. 添加交換器
[外鍊圖檔轉存失敗,源站可能有防盜鍊機制,建議将圖檔儲存下來直接上傳(img-qJX0abSk-1584274503577)(images/6-1.png)]
b. 添加搜尋上架隊列
[外鍊圖檔轉存失敗,源站可能有防盜鍊機制,建議将圖檔儲存下來直接上傳(img-p2ak2lFW-1584274503577)(images/6-2.png)]
c. 添加靜态頁上架隊列
[外鍊圖檔轉存失敗,源站可能有防盜鍊機制,建議将圖檔儲存下來直接上傳(img-ig0H5I3B-1584274503577)(images/6-111.png)]
d. 交換器中綁定搜尋上架隊列和靜态頁上架隊列
[外鍊圖檔轉存失敗,源站可能有防盜鍊機制,建議将圖檔儲存下來直接上傳(img-5BjYB56q-1584274503577)(images/6-3.png)]
3.3.2 發送上架商品id到Mq
(1) changgou_service_goods工程引入依賴:
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
(2) changgou_service_goods工程的application.yml中添加rabbitmq配置
spring:
rabbitmq:
host: 192.168.200.128
(3) changgou_service_goods工程的SpuServiceImpl中更改方法
@Autowired
private RabbitMessagingTemplate rabbitTemplate;
@Override
public void put(String id) {
/**
* 1. 更改資料庫中的上架狀态
*/
Spu spu = spuMapper.selectByPrimaryKey(id);
if(!spu.getStatus().equals("1")){
throw new RuntimeException("未通過稽核的商品不能上架!");
}
spu.setIsMarketable("1");//上架狀态
spuMapper.updateByPrimaryKeySelective(spu);
/**
* 2. 将資料發送到rabbitmq中
*/
rabbitTemplate.convertAndSend("goods_up_exchange","",id);
}
3.3.3 索引庫環境準備
(1)elasticsearch 6.5.2安裝
(2)ik中文分詞器安裝
(3)kibana-6.5.2 安裝
3.3.4 搜尋微服務搭建
(1)建立changgou_service_search子產品,pom.xml引入依賴
<dependency>
<groupId>com.changgou</groupId>
<artifactId>changgou_common</artifactId>
<version>1.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-elasticsearch</artifactId>
</dependency>
<dependency>
<groupId>com.changgou</groupId>
<artifactId>changgou_service_goods_api</artifactId>
<version>1.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>com.changgou</groupId>
<artifactId>changgou_service_search_api</artifactId>
<version>1.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
(2)changgou_service_search的application.yml
server:
port: 9009
spring:
application:
name: search
rabbitmq:
host: 192.168.200.128
redis:
host: 192.168.200.128
main:
allow-bean-definition-overriding: true #當遇到同樣名字的時候,是否允許覆寫注冊
data:
elasticsearch:
cluster-name: elasticsearch
cluster-nodes: 192.168.200.128:9300
eureka:
client:
service-url:
defaultZone: http://127.0.0.1:6868/eureka
instance:
prefer-ip-address: true
feign:
hystrix:
enabled: true
client:
config:
default: #配置全局的feign的調用逾時時間 如果 有指定的服務配置 預設的配置不會生效
connectTimeout: 600000 # 指定的是 消費者 連接配接服務提供者的連接配接逾時時間 是否能連接配接 機關是毫秒
readTimeout: 600000 # 指定的是調用服務提供者的 服務 的逾時時間() 機關是毫秒
#hystrix 配置
hystrix:
command:
default:
execution:
timeout:
#如果enabled設定為false,則請求逾時交給ribbon控制
enabled: false
isolation:
strategy: SEMAPHORE
elasticSearch的配置是我們自己定義的,後邊的連接配接工廠類會用到
(3)建立com.changgou包,包下建立SearchApplication
@SpringBootApplication
@EnableEurekaClient
@EnableDiscoveryClient
@EnableFeignClients(basePackages = "com.itheima.feign")
public class SearchApplication {
public static void main(String[] args) {
SpringApplication.run(SearchApplication.class);
}
}
3.3.5 建立索引庫結構
- 在changgou_service_api項目下建立changgou_service_search_api項目
- pom.xml檔案引入依賴
<dependency>
<groupId>com.changgou</groupId>
<artifactId>changgou_common</artifactId>
<version>1.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-elasticsearch</artifactId>
</dependency>
- 建立包com.changgou.pojo
- 建立和ES索引庫映射實體類SkuInfo.java
@Document(indexName = "skuinfo", type = "docs")
public class SkuInfo implements Serializable {
//商品id,同時也是商品編号
@Id
@Field(index = true, store = true, type = FieldType.Keyword)
private Long id;
//SKU名稱
@Field(index = true, store = true, type = FieldType.Text, analyzer = "ik_smart")
private String name;
//商品價格,機關為:元
@Field(index = true, store = true, type = FieldType.Double)
private Long price;
//庫存數量
@Field(index = true, store = true, type = FieldType.Integer)
private Integer num;
//商品圖檔
@Field(index = false, store = true, type = FieldType.Text)
private String image;
//商品狀态,1-正常,2-下架,3-删除
@Field(index = true, store = true, type = FieldType.Keyword)
private String status;
//建立時間
private Date createTime;
//更新時間
private Date updateTime;
//是否預設
@Field(index = true, store = true, type = FieldType.Keyword)
private String isDefault;
//SPUID
@Field(index = true, store = true, type = FieldType.Long)
private Long spuId;
//類目ID
@Field(index = true, store = true, type = FieldType.Long)
private Long categoryId;
//類目名稱
@Field(index = true, store = true,type = FieldType.Keyword)
private String categoryName;
//品牌名稱
@Field(index = true, store = true,type = FieldType.Keyword)
private String brandName;
//規格
private String spec;
//規格參數
private Map<String, Object> specMap;
......get和set方法......
3.3.6 建立ES操作的Dao接口
public interface SearchMapper extends ElasticsearchRepository<SkuInfo,Long> {
}
- 1
- 2
- 3
3.3.7 搜尋微服務批量導入資料邏輯
(1) changgou_service_goods_api建立com.changgou.feign 包,包下建立接口
@FeignClient(name="goods")
@RequestMapping("/sku")
public interface SkuFeign {
/***
* 多條件搜尋品牌資料
* @param searchMap
* @return
*/
@GetMapping(value = "/search" )
public Result findList(@RequestParam Map searchMap);
}
(2)changgou_service_search項目下建立 com.changgou.search.service包包下建立接口EsManagerService
public interface EsManagerService {
/**
* 建立索引庫結構
*/
public void createIndexAndMapping();
/**
* 根據spuid導入資料到ES索引庫
* @param spuId 商品id
*/
public void importDataToESBySpuId(String spuId);
/**
* 導入全部資料到ES索引庫
*/
public void importAll();
}
(2)建立com.changgou.search.service包,包下建立服務實作類
@Service
public class EsManagerServiceImpl implements EsManagerService {
@Autowired
private SearchMapper searchMapper;
@Autowired
private SkuFeign skuFeign;
@Autowired
private ElasticsearchTemplate esTemplate;
/**
* 建立索引庫結構
*/
@Override
public void createIndexAndMapping() {
//建立索引
esTemplate.createIndex(SkuInfo.class);
//建立映射
esTemplate.putMapping(SkuInfo.class);
}
/**
* 根據spuid導入資料到ES索引庫
* @param spuId 商品id
*/
@Override
public void importDataToESBySpuId(String spuId) {
List<Sku> skuList = skuFeign.findSkuListBySpuId(spuId);
List<SkuInfo> skuInfos = JSON.parseArray(JSON.toJSONString(skuList), SkuInfo.class);
for (SkuInfo skuInfo : skuInfos) {
skuInfo.setSpecMap(JSON.parseObject(skuInfo.getSpec(), Map.class));
}
searchMapper.saveAll(skuInfos);
}
/**
* 導入全部資料到ES索引庫
*/
@Override
public void importAll() {
Map paramMap = new HashMap();
paramMap.put("status", "1");
Result result = skuFeign.findList(paramMap);
List<SkuInfo> skuInfos = JSON.parseArray(JSON.toJSONString(result.getData()), SkuInfo.class);
for (SkuInfo skuInfo : skuInfos) {
skuInfo.setPrice(skuInfo.getPrice());
skuInfo.setSpecMap(JSON.parseObject(skuInfo.getSpec(), Map.class));
}
searchMapper.saveAll(skuInfos);
}
}
3.3.8 根據spuId導入索引庫資料
(1)changgou_service_search項目下, 建立com.changgou.controller包, 包下建立SearchController
@RestController
@RequestMapping("/sku_search")
public class SearchController {
@Autowired
private EsManagerService esManagerService;
@Autowired
private SearchService searchService;
@GetMapping("/createIndexAndMapping")
public Result createIndexAndMapping() {
esManagerService.createIndexAndMapping();
return new Result(true, StatusCode.OK, "建立成功");
}
/**
* 導入所有稽核通過的庫存資料到ES索引庫
* @return
*/
@GetMapping("/importAll")
public Result importAllDataToES() {
esManagerService.importAll();
return new Result(true, StatusCode.OK, "導入資料成功!");
}
/**
* 全文檢索
* @return
*/
@GetMapping
public Map search(@RequestParam Map<String, String> paramMap) throws Exception {
Map resultMap = searchService.search(paramMap);
return resultMap;
}
}
3.3.9 接收mq消息執行導入
(1)changgou_service_search工程的pom.xml檔案中引入依賴包
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
(2)changgou_service_search工程建立com.changgou.listener包,包下建立類
@Component
@RabbitListener(queues = "search_add_queue")
public class SpuAddListener {
@Autowired
private EsManagerService esManagerService;
@RabbitHandler
public void addDataToES(String spuId) {
System.out.println("===接收到需要商品上架的spuId為======" + spuId);
esManagerService.importDataToESBySpuId(spuId);
}
}
測試:
注意: 測試前将ES中現有的SkuInfo索引庫删除幹淨
(1)啟動環境 eureka 、elasticsearch 、canal服務端、canal資料監控微服務、rabbitmq
(2)啟動商品微服務、搜尋微服務
(3)先通路 http://localhost:9009/sku_search/createIndexAndMapping 建立索引庫結構
( 4 ) 修改tb_spu某記錄的is_marketable值為1,觀察控制台輸出,啟動kibana查詢記錄是否導入成功
4. 商品下架索引庫删除資料
4.1 需求分析
商品下架後将商品從索引庫中移除。
4.2 實作思路
與商品上架的實作思路非常類似。
(1)當管理者操作商品微服務, 調用下架操作時, 首先更新mysql的SPU表中的is_marketable狀态為0(下架)。
(2)在rabbitmq管理背景建立商品下架交換器(Exchanges)。使用分列模式(Fanout)的交換器是考慮商品下架會有很多種邏輯需要處理,索引庫删除資料隻是其中一項,另外還有删除商品詳細頁等操作。
(3)搜尋微服務從rabbitmq的的隊列中提取spu的id,通過調用elasticsearch的進階restAPI 将相關的sku清單從索引庫删除。