Redis一直是網絡生态系統的重要組成部分,它經常用作緩存、消息代理或簡單地用作資料存儲。
在這篇文章中,我們将示範如何在一個Rust web應用程式中使用Redis。
我們将探索兩種種使用Redis的方法:
- 使用同步連接配接池
- 使用異步連接配接池
對于同步池,我們使用基于r2d2庫的r2d2-redis。我們在異步解決方案中使用mobc,還有許多其他異步連接配接池,如deadpool和bb8,它們都以類似的方式工作。
話不多說,讓我們開始吧!
建立一個項目:
cargo new rust-redis-web-example
在Cargo.toml中加入依賴:
[dependencies]
tokio = { version = "1.19", features = ["full"] }
warp = "0.3.2"
redis = "0.21"
r2d2_redis = "0.14"
mobc-redis = "0.7"
mobc = "0.7"
thiserror = "1.0"
首先,讓我們設定一些共享類型,在main.rs中:
type WebResult= std::result::Result;
type Result= std::result::Result;
const REDIS_CON_STRING: &str = "redis://127.0.0.1/";
定義這兩個Result類型是為了節省一些輸入,并表示内部Errors (Result)和外部Errors (WebResult)。
接下來,定義這個内部error類型并為其實作Reject,以便它可以處理從程式傳回的HTTP錯誤。
#[derive(Error, Debug)]
pub enum Error {
#[error("mobc error: {0}")]
MobcError(#[from] MobcError),
#[error("r2d2 error: {0}")]
R2D2Error(#[from] R2D2Error),
}
#[derive(Error, Debug)]
pub enum MobcError {
#[error("could not get redis connection from pool : {0}")]
RedisPoolError(mobc::Error),
#[error("error parsing string from redis result: {0}")]
RedisTypeError(mobc_redis::redis::RedisError),
#[error("error executing redis command: {0}")]
RedisCMDError(mobc_redis::redis::RedisError),
#[error("error creating Redis client: {0}")]
RedisClientError(mobc_redis::redis::RedisError),
}
#[derive(Error, Debug)]
pub enum R2D2Error {
#[error("could not get redis connection from pool : {0}")]
RedisPoolError(r2d2_redis::r2d2::Error),
#[error("error parsing string from redis result: {0}")]
RedisTypeError(r2d2_redis::redis::RedisError),
#[error("error executing redis command: {0}")]
RedisCMDError(r2d2_redis::redis::RedisError),
#[error("error creating Redis client: {0}")]
RedisClientError(r2d2_redis::redis::RedisError),
}
impl warp::reject::Reject for Error {}
上面定義了通用的錯誤類型和我們将實作的每一種使用Redis方法的錯誤類型。錯誤本身隻是處理連接配接、池的建立和指令執行等 錯誤。
使用r2d2(同步)
r2d2 crate是第一個被廣泛使用的連接配接池,它現在仍然被廣泛使用。Redis的連接配接池是r2d2-redis crate。
在src目錄下建立r2d2_pool.rs檔案,因為我們現在使用的是連接配接池,是以這個池的建立也需要在r2d2子產品中處理。
use crate::{R2D2Error::*, Result, REDIS_CON_STRING};
use r2d2_redis::redis::{Commands, FromRedisValue};
use r2d2_redis::{r2d2, RedisConnectionManager};
use std::time::Duration;
pub type R2D2Pool = r2d2::Pool;
pub type R2D2Con = r2d2::PooledConnection;
const CACHE_POOL_MAX_OPEN: u32 = 16;
const CACHE_POOL_MIN_IDLE: u32 = 8;
const CACHE_POOL_TIMEOUT_SECONDS: u64 = 1;
const CACHE_POOL_EXPIRE_SECONDS: u64 = 60;
pub fn connect() -> Result> {
let manager = RedisConnectionManager::new(REDIS_CON_STRING).map_err(RedisClientError)?;
r2d2::Pool::builder()
.max_size(CACHE_POOL_MAX_OPEN)
.max_lifetime(Some(Duration::from_secs(CACHE_POOL_EXPIRE_SECONDS)))
.min_idle(Some(CACHE_POOL_MIN_IDLE))
.build(manager)
.map_err(|e| RedisPoolError(e).into())
}
定義一些常量來配置池,如打開和空閑連接配接,連接配接逾時和連接配接的生命周期,池本身是使用RedisConnectionManager建立的,傳遞給它的參數是redis連接配接字元串。
不要太擔心配置值,大多數連接配接池都有一些預設值,這些預設值将适用于基本應用程式。
我們需要一種方法來獲得連接配接池,然後向Redis設定和擷取值。
pub fn get_con(pool: &R2D2Pool) -> Result{
pool.get_timeout(Duration::from_secs(CACHE_POOL_TIMEOUT_SECONDS))
.map_err(|e| {
eprintln!("error connecting to redis: {}", e);
RedisPoolError(e).into()
})
}
pub fn set_str(pool: &R2D2Pool, key: &str, value: &str, ttl_seconds: usize) -> Result<()> {
let mut con = get_con(&pool)?;
con.set(key, value).map_err(RedisCMDError)?;
if ttl_seconds > 0 {
con.expire(key, ttl_seconds).map_err(RedisCMDError)?;
}
Ok(())
}
pub fn get_str(pool: &R2D2Pool, key: &str) -> Result{
let mut con = get_con(&pool)?;
let value = con.get(key).map_err(RedisCMDError)?;
FromRedisValue::from_redis_value(&value).map_err(|e| RedisTypeError(e).into())
}
我們嘗試從池中擷取連接配接,并配置逾時時間。在set_str和get_str中,每次調用這些函數時都會調用get_con。
使用mobc(異步)
在src目錄下建立r2d2_pool.rs檔案,讓我們定義配置并建立連接配接池。
use crate::{MobcError::*, Result, REDIS_CON_STRING};
use mobc::{Connection, Pool};
use mobc_redis::redis::{AsyncCommands, FromRedisValue};
use mobc_redis::{redis, RedisConnectionManager};
use std::time::Duration;
pub type MobcPool = Pool;
pub type MobcCon = Connection;
const CACHE_POOL_MAX_OPEN: u64 = 16;
const CACHE_POOL_MAX_IDLE: u64 = 8;
const CACHE_POOL_TIMEOUT_SECONDS: u64 = 1;
const CACHE_POOL_EXPIRE_SECONDS: u64 = 60;
pub async fn connect() -> Result{
let client = redis::Client::open(REDIS_CON_STRING).map_err(RedisClientError)?;
let manager = RedisConnectionManager::new(client);
Ok(Pool::builder()
.get_timeout(Some(Duration::from_secs(CACHE_POOL_TIMEOUT_SECONDS)))
.max_open(CACHE_POOL_MAX_OPEN)
.max_idle(CACHE_POOL_MAX_IDLE)
.max_lifetime(Some(Duration::from_secs(CACHE_POOL_EXPIRE_SECONDS)))
.build(manager))
}
這和r2d2非常相似,這不是巧合;許多連接配接池庫都從r2d2出色的API中獲得了靈感。
async fn get_con(pool: &MobcPool) -> Result{
pool.get().await.map_err(|e| {
eprintln!("error connecting to redis: {}", e);
RedisPoolError(e).into()
})
}
pub async fn set_str(pool: &MobcPool, key: &str, value: &str, ttl_seconds: usize) -> Result<()> {
let mut con = get_con(&pool).await?;
con.set(key, value).await.map_err(RedisCMDError)?;
if ttl_seconds > 0 {
con.expire(key, ttl_seconds).await.map_err(RedisCMDError)?;
}
Ok(())
}
pub async fn get_str(pool: &MobcPool, key: &str) -> Result{
let mut con = get_con(&pool).await?;
let value = con.get(key).await.map_err(RedisCMDError)?;
FromRedisValue::from_redis_value(&value).map_err(|e| RedisTypeError(e).into())
}
現在看起來應該很熟悉了,傳入池并在開始時擷取連接配接,但這一次采用異步方式,使用async和await。
下一步就是把它們結合到一個warp web應用中,修改main.rs:
use std::convert::Infallible;
use mobc_pool::MobcPool;
use r2d2_pool::R2D2Pool;
use thiserror::Error;
use warp::{Rejection, Filter, Reply};
mod r2d2_pool;
mod mobc_pool;
type WebResult= std::result::Result;
type Result= std::result::Result;
const REDIS_CON_STRING: &str = "redis://127.0.0.1/";
#[tokio::main]
async fn main() {
let mobc_pool = mobc_pool::connect().await.expect("can create mobc pool");
let r2d2_pool = r2d2_pool::connect().expect("can create r2d2 pool");
let mobc_route = warp::path!("mobc")
.and(with_mobc_pool(mobc_pool.clone()))
.and_then(mobc_handler);
let r2d2_route = warp::path!("r2d2")
.and(with_r2d2_pool(r2d2_pool.clone()))
.and_then(r2d2_handler);
let routes = mobc_route.or(r2d2_route);
warp::serve(routes).run(([0, 0, 0, 0], 8080)).await;
}
fn with_mobc_pool(
pool: MobcPool,
) -> impl Filter+ Clone {
warp::any().map(move || pool.clone())
}
fn with_r2d2_pool(
pool: R2D2Pool,
) -> impl Filter+ Clone {
warp::any().map(move || pool.clone())
}
async fn mobc_handler(pool: MobcPool) -> WebResult{
mobc_pool::set_str(&pool, "mobc_hello", "mobc_world", 60)
.await
.map_err(|e| warp::reject::custom(e))?;
let value = mobc_pool::get_str(&pool, "mobc_hello")
.await
.map_err(|e| warp::reject::custom(e))?;
Ok(value)
}
async fn r2d2_handler(pool: R2D2Pool) -> WebResult{
r2d2_pool::set_str(&pool, "r2d2_hello", "r2d2_world", 60)
.map_err(|e| warp::reject::custom(e))?;
let value = r2d2_pool::get_str(&pool, "r2d2_hello").map_err(|e| warp::reject::custom(e))?;
Ok(value)
}
用Docker啟動一個本地Redis執行個體:
docker run -p 6379:6379 redis
接下來,運作 cargo run。
使用curl測試:
curl http://localhost:8080/r2d2
curl http://localhost:8080/mobc