天天看點

在Rust web服務中使用Redis 言簡意赅

作者:鸨哥學Java

Redis一直是網絡生态系統的重要組成部分,它經常用作緩存、消息代理或簡單地用作資料存儲。

在這篇文章中,我們将示範如何在一個Rust web應用程式中使用Redis。

我們将探索兩種種使用Redis的方法:

  • 使用同步連接配接池
  • 使用異步連接配接池

對于同步池,我們使用基于r2d2庫的r2d2-redis。我們在異步解決方案中使用mobc,還有許多其他異步連接配接池,如deadpool和bb8,它們都以類似的方式工作。

話不多說,讓我們開始吧!

建立一個項目:

cargo new rust-redis-web-example           

在Cargo.toml中加入依賴:

[dependencies]

tokio = { version = "1.19", features = ["full"] }

warp = "0.3.2"

redis = "0.21"

r2d2_redis = "0.14"

mobc-redis = "0.7"

mobc = "0.7"

thiserror = "1.0"
           

首先,讓我們設定一些共享類型,在main.rs中:

type WebResult= std::result::Result;

type Result= std::result::Result;




const REDIS_CON_STRING: &str = "redis://127.0.0.1/";
           

定義這兩個Result類型是為了節省一些輸入,并表示内部Errors (Result)和外部Errors (WebResult)。

接下來,定義這個内部error類型并為其實作Reject,以便它可以處理從程式傳回的HTTP錯誤。

#[derive(Error, Debug)]

pub enum Error {

    #[error("mobc error: {0}")]

    MobcError(#[from] MobcError),

    #[error("r2d2 error: {0}")]

    R2D2Error(#[from] R2D2Error),

}




#[derive(Error, Debug)]

pub enum MobcError {

    #[error("could not get redis connection from pool : {0}")]

    RedisPoolError(mobc::Error),

    #[error("error parsing string from redis result: {0}")]

    RedisTypeError(mobc_redis::redis::RedisError),

    #[error("error executing redis command: {0}")]

    RedisCMDError(mobc_redis::redis::RedisError),

    #[error("error creating Redis client: {0}")]

    RedisClientError(mobc_redis::redis::RedisError),

}




#[derive(Error, Debug)]

pub enum R2D2Error {

    #[error("could not get redis connection from pool : {0}")]

    RedisPoolError(r2d2_redis::r2d2::Error),

    #[error("error parsing string from redis result: {0}")]

    RedisTypeError(r2d2_redis::redis::RedisError),

    #[error("error executing redis command: {0}")]

    RedisCMDError(r2d2_redis::redis::RedisError),

    #[error("error creating Redis client: {0}")]

    RedisClientError(r2d2_redis::redis::RedisError),

}




impl warp::reject::Reject for Error {}
           

上面定義了通用的錯誤類型和我們将實作的每一種使用Redis方法的錯誤類型。錯誤本身隻是處理連接配接、池的建立和指令執行等 錯誤。

使用r2d2(同步)

r2d2 crate是第一個被廣泛使用的連接配接池,它現在仍然被廣泛使用。Redis的連接配接池是r2d2-redis crate。

在src目錄下建立r2d2_pool.rs檔案,因為我們現在使用的是連接配接池,是以這個池的建立也需要在r2d2子產品中處理。

use crate::{R2D2Error::*, Result, REDIS_CON_STRING};

use r2d2_redis::redis::{Commands, FromRedisValue};

use r2d2_redis::{r2d2, RedisConnectionManager};

use std::time::Duration;




pub type R2D2Pool = r2d2::Pool;

pub type R2D2Con = r2d2::PooledConnection;




const CACHE_POOL_MAX_OPEN: u32 = 16;

const CACHE_POOL_MIN_IDLE: u32 = 8;

const CACHE_POOL_TIMEOUT_SECONDS: u64 = 1;

const CACHE_POOL_EXPIRE_SECONDS: u64 = 60;




pub fn connect() -> Result> {

    let manager = RedisConnectionManager::new(REDIS_CON_STRING).map_err(RedisClientError)?;

    r2d2::Pool::builder()

        .max_size(CACHE_POOL_MAX_OPEN)

        .max_lifetime(Some(Duration::from_secs(CACHE_POOL_EXPIRE_SECONDS)))

        .min_idle(Some(CACHE_POOL_MIN_IDLE))

        .build(manager)

        .map_err(|e| RedisPoolError(e).into())

}
           

定義一些常量來配置池,如打開和空閑連接配接,連接配接逾時和連接配接的生命周期,池本身是使用RedisConnectionManager建立的,傳遞給它的參數是redis連接配接字元串。

不要太擔心配置值,大多數連接配接池都有一些預設值,這些預設值将适用于基本應用程式。

我們需要一種方法來獲得連接配接池,然後向Redis設定和擷取值。

pub fn get_con(pool: &R2D2Pool) -> Result{

    pool.get_timeout(Duration::from_secs(CACHE_POOL_TIMEOUT_SECONDS))

        .map_err(|e| {

            eprintln!("error connecting to redis: {}", e);

            RedisPoolError(e).into()

        })

}




pub fn set_str(pool: &R2D2Pool, key: &str, value: &str, ttl_seconds: usize) -> Result<()> {

    let mut con = get_con(&pool)?;

    con.set(key, value).map_err(RedisCMDError)?;

    if ttl_seconds > 0 {

        con.expire(key, ttl_seconds).map_err(RedisCMDError)?;

    }

    Ok(())

}




pub fn get_str(pool: &R2D2Pool, key: &str) -> Result{

    let mut con = get_con(&pool)?;

    let value = con.get(key).map_err(RedisCMDError)?;

    FromRedisValue::from_redis_value(&value).map_err(|e| RedisTypeError(e).into())

}
           

我們嘗試從池中擷取連接配接,并配置逾時時間。在set_str和get_str中,每次調用這些函數時都會調用get_con。

使用mobc(異步)

在src目錄下建立r2d2_pool.rs檔案,讓我們定義配置并建立連接配接池。

use crate::{MobcError::*, Result, REDIS_CON_STRING};

use mobc::{Connection, Pool};

use mobc_redis::redis::{AsyncCommands, FromRedisValue};

use mobc_redis::{redis, RedisConnectionManager};

use std::time::Duration;




pub type MobcPool = Pool;

pub type MobcCon = Connection;




const CACHE_POOL_MAX_OPEN: u64 = 16;

const CACHE_POOL_MAX_IDLE: u64 = 8;

const CACHE_POOL_TIMEOUT_SECONDS: u64 = 1;

const CACHE_POOL_EXPIRE_SECONDS: u64 = 60;




pub async fn connect() -> Result{

    let client = redis::Client::open(REDIS_CON_STRING).map_err(RedisClientError)?;

    let manager = RedisConnectionManager::new(client);

    Ok(Pool::builder()

        .get_timeout(Some(Duration::from_secs(CACHE_POOL_TIMEOUT_SECONDS)))

        .max_open(CACHE_POOL_MAX_OPEN)

        .max_idle(CACHE_POOL_MAX_IDLE)

        .max_lifetime(Some(Duration::from_secs(CACHE_POOL_EXPIRE_SECONDS)))

        .build(manager))

}
           

這和r2d2非常相似,這不是巧合;許多連接配接池庫都從r2d2出色的API中獲得了靈感。

async fn get_con(pool: &MobcPool) -> Result{

    pool.get().await.map_err(|e| {

        eprintln!("error connecting to redis: {}", e);

        RedisPoolError(e).into()

    })

}




pub async fn set_str(pool: &MobcPool, key: &str, value: &str, ttl_seconds: usize) -> Result<()> {

    let mut con = get_con(&pool).await?;

    con.set(key, value).await.map_err(RedisCMDError)?;

    if ttl_seconds > 0 {

        con.expire(key, ttl_seconds).await.map_err(RedisCMDError)?;

    }

    Ok(())

}




pub async fn get_str(pool: &MobcPool, key: &str) -> Result{

    let mut con = get_con(&pool).await?;

    let value = con.get(key).await.map_err(RedisCMDError)?;

    FromRedisValue::from_redis_value(&value).map_err(|e| RedisTypeError(e).into())

}
           

現在看起來應該很熟悉了,傳入池并在開始時擷取連接配接,但這一次采用異步方式,使用async和await。

下一步就是把它們結合到一個warp web應用中,修改main.rs:

use std::convert::Infallible;




use mobc_pool::MobcPool;

use r2d2_pool::R2D2Pool;

use thiserror::Error;

use warp::{Rejection, Filter, Reply};




mod r2d2_pool;

mod mobc_pool;




type WebResult= std::result::Result;

type Result= std::result::Result;




const REDIS_CON_STRING: &str = "redis://127.0.0.1/";




#[tokio::main]

async fn main() {

    let mobc_pool = mobc_pool::connect().await.expect("can create mobc pool");

    let r2d2_pool = r2d2_pool::connect().expect("can create r2d2 pool");




    let mobc_route = warp::path!("mobc")

        .and(with_mobc_pool(mobc_pool.clone()))

        .and_then(mobc_handler);




    let r2d2_route = warp::path!("r2d2")

        .and(with_r2d2_pool(r2d2_pool.clone()))

        .and_then(r2d2_handler);




    let routes = mobc_route.or(r2d2_route);




    warp::serve(routes).run(([0, 0, 0, 0], 8080)).await;

}




fn with_mobc_pool(

    pool: MobcPool,

) -> impl Filter+ Clone {

    warp::any().map(move || pool.clone())

}




fn with_r2d2_pool(

    pool: R2D2Pool,

) -> impl Filter+ Clone {

    warp::any().map(move || pool.clone())

}




async fn mobc_handler(pool: MobcPool) -> WebResult{

    mobc_pool::set_str(&pool, "mobc_hello", "mobc_world", 60)

        .await

        .map_err(|e| warp::reject::custom(e))?;

    let value = mobc_pool::get_str(&pool, "mobc_hello")

        .await

        .map_err(|e| warp::reject::custom(e))?;

    Ok(value)

}




async fn r2d2_handler(pool: R2D2Pool) -> WebResult{

    r2d2_pool::set_str(&pool, "r2d2_hello", "r2d2_world", 60)

        .map_err(|e| warp::reject::custom(e))?;

    let value = r2d2_pool::get_str(&pool, "r2d2_hello").map_err(|e| warp::reject::custom(e))?;

    Ok(value)

}
           

用Docker啟動一個本地Redis執行個體:

docker run -p 6379:6379 redis           

接下來,運作 cargo run。

使用curl測試:

curl http://localhost:8080/r2d2

curl http://localhost:8080/mobc           

繼續閱讀