天天看點

使用redis的zset實作高效分頁查詢(附完整代碼)

一、需求

移動端系統裡有使用者和文章,文章可設定權限對部分使用者開放。現要實作的功能是,使用者浏覽自己能看的最新文章,并可以上滑分頁檢視。

二、資料庫表設計

 涉及到的資料庫表有:使用者表TbUser、文章表TbArticle、使用者可見文章表TbUserArticle。其中,TbUserArticle的結構和資料如下圖,字段有:自增長主鍵id、使用者編号uid、文章編号aid。

使用redis的zset實作高效分頁查詢(附完整代碼)

自增長主鍵和分布式增長主鍵如何選(題外讨論):

TbUserArticle的主鍵是自增id,它有個缺陷是,當你的資料庫有主從複制時,主從庫的自增可能因死鎖等原因導緻不同步。不過,我們可以知道,這裡的TbUserArticle的主鍵id不會用在其它表裡,是以可以是自增id。不像使用者表的主鍵,它就不能用自增id,因為使用者表主鍵(uid)會經常出現在其它表中,當主從庫自增不一緻時,很多有uid字段的表資料在從庫中就不正确了。使用者表主鍵最好是用分布式增長主鍵算法生成的id(比如Snowflake雪花算法)。

那麼你可能就要說了,TbUserArticle的主鍵為什麼不直接用雪花算法産生,不管有沒有用,先讓主從庫主鍵值一緻總是有恃無恐。要知道,雪花算法産生的id一般是18位,而redis的zset的score是double類型,隻能表達到16位"整數"部分(精确的說是9007199254740992=2的53次方)。是以,TbUserArticle的主鍵選擇自增id。那麼能不能産生一個16位(具體是53bit)的分布式增長id用于支援zset的score呢,當然也是可以的,因為目前的雪花算法是可以根據實際系統環境壓縮bit位的,怎麼壓縮bit位呢,有許多方案,以後有需要我可以把它寫出來。

建議:主鍵一般都要選自增id或分布式增長id,這種主鍵好處多多,它符合自增長(實體存儲時都是在末尾追加資料,減少資料移動)、唯一性、長度小、查詢快的特性,是聚集索引的很好選擇。

三、redis緩存設計-zset

zset的作法及其優點說明:

1.zset的score倒序取數可以很好的滿足取最新資料的需求。

2.用TbUserArticle的文章編号當value,用自增長id當score。自增id的唯一性可很友善的取下一頁資料,直接取小于上次最後一筆的score即可(用lastScore表示)。而如果用文章的時間做score,則要考慮兩筆文章的時間是同分同秒問題,當lastScore落在同分同秒的兩篇文章之間時,就尴尬了,雖然有解,但麻煩了一點。有時的場景你用不了自增id當score,隻能用文章時間,那怎麼解決呢,方案就是當是同分同秒時,再根據文章編号做比較就好了,zset的score相同時,也是再根據value排序的,這塊的代碼實作請看下文第五點,隻需稍微改點代碼即可。

3.當新增或重新添加一項時,zset也會保持score排序。而如果用的是redis的list,一般就得從db重載緩存,新增進來的資料項就算是最新的,也不敢直接添加到list第一筆,因為并發情況下,保證不了最新就是在第一筆;至于重新添加進非最新項,那更是要從db取數重新裝載緩存(一般是直接删除緩存,要用的時候才裝載)。

4.第一次從db加載資料到zset時,可隻取前N筆到zset。因為我們移動端的資料浏覽,一般是隻看最新N筆,當看到昨天浏覽過的資料一般就不會再往下浏覽。

5.控制zset為固定長度,防止一直增長,一是減少緩存開銷,二是隊列長度越短操作性能越高。而且redis服務端有兩個參數:zset-max-ziplist-entries(zset隊列長度,預設值128)和 zset-max-ziplist-value(zset每項大小,預設值64位元組),它們的作用是,當zset長度小于128,且每個元素的大小小于64位元組時,會啟用ziplist(壓縮雙向連結清單),它的記憶體空間可以減少8倍左右,而且操作性能也更快。如果不滿足這兩個條件則是普通的skiplist(跳躍表)。另,資料結構hash和list預設長度是512。如果系統有100萬個使用者,每個使用者都有自己的隊列緩存,那麼使用ziplist将節省非常大的記憶體空間,并提升很大的性能。

注意,當從zset移除一項資料,則看場景是否需要清空隊列。否則有可能添加進來了一項很舊的資料,它會跑到緩存隊列最底部,如果此舊資料比db中未進隊列的資料還舊,那麼隊列中的資料就不正确了。(此時,使用者滑到緩存最後一頁時,就有可能浏覽到這項不正确的資料,為什麼是“有可能”,因為當取到zset最後一筆,很可能不夠一頁(一頁10筆計算的話,90%會取不夠一頁),而不夠一頁就會從db直接取一頁,從db直接取就不會有這項不正确的資料。而當zset又添加進一項新資料,末端那筆舊資料就會被T出隊列(因為隊列保持固定長度),zset資料又恢複正确了。不管怎樣,這種問題幾率雖不高,也是有解決方案,可搞個臨界點處理此問題,不細說,否則又是長篇大論,最好的方案就是根據實際場景設計,比如從zset隊列移除資料的情況多不多)。而如果添加到zset的資料都是最新資料,則不會有此問題。

 當用唯一主鍵id做score時,這可是非常有用,你可以直接根據id定位到項了,至于如何大用它,我會再出篇部落格。

四、代碼實作 

從redis緩存按頁取數一般要考慮的點:

1.當根據cacheKey未取到資料時(可能是緩存過期了導緻redis無此cacheKey資料),則觸發重載資料(reload):從db取limit N筆資料,裝載到redis zset隊列中,并直接取N筆的第一頁資料傳回;

2.如果db本身也無對應資料,則添加"no_db_mark"辨別到cacheKey隊列中,下次請求則不會再觸發db重載資料;

3.當取到緩存末尾時,從db取一頁資料直接傳回。這種情況是很少的,要根據業務場景合理規劃緩存長度。

上代碼:

代碼注釋比較詳細和有用,請直接看代碼。

其中,批量添加資料到zset的函數AddItemsToZset很有用,它使用lua一次性添加多筆資料到zset(注意,使用lua時,要保證lua執行快,否則它會阻塞其它指令的執行),經測試:AddItemsToZset添加1w筆資料,隻需要39ms;10w筆需要448ms。因為我們隻取前N筆資料到緩存,是以一般不會添加超過1w筆。

另一個通用有用的函數是GetPageDataByLastScoreFromRedis,它支援從指定的score開始取pageSize筆資料,即支援了zset分頁。它是第二頁(及之後)的取數,而如果取第一頁取數,則直接用redis原生函數即可redis.GetRangeWithScoresFromSortedSetDesc(cacheKey, 0, pageSize - 1);。

/// <summary>
    /// 分頁取數幫助類
    /// </summary>
    public class PageDataHelper
    {
        public readonly static string NoDbDataMark = "no_db_data";//在zset中辨別db也無資料
        public static RedisHandle RedisClient = new RedisHandle();//redis操作對象示例
        public static DbHandleBase DbHandle = new SqlServerHandle("Data Source=.;Initial Catalog=Test;User Id=sa;Password=123ewq;");//db操作對象示例
        /// <summary>
        /// 按頁取數。傳回文章編号清單。
        /// </summary>
        /// <param name="lastInfo">上一頁最後一筆的score,如果為空,則說明是取第一頁。</param>
        /// <param name="getPast">true,使用者上滑浏覽下一頁資料;false,使用者上滑浏覽最新一頁資料</param>
        /// <returns>傳回key-value清單,key就是文章編号,value就是自增id(可用于lastScore)</returns>
        public static IDictionary<string, double> GetUserPageData(string uid, int pageSize, string lastInfo, bool getPast)
        {
            long lastScore = 0;
            //1.解析lastInfo資訊。->getPast為false,則固定取最新第一頁資料,不用解析。lastInfo為空,則也不用解析,預設第一頁
            if (getPast && !string.IsNullOrWhiteSpace(lastInfo))
            {
                lastScore = long.Parse(lastInfo);//外層有try..catch..
            }
            string cacheKey = $"usr:art:{uid}";
            bool isFirstPage = lastScore <= 0;
            using (IRedisClient redis = RedisClient.GetRedisClient())
            {
                if (isFirstPage)
                {
                    //2.第一頁取數
                    var items = redis.GetRangeWithScoresFromSortedSetDesc(cacheKey, 0, pageSize - 1);
                    if (items.Count == 0)
                    {
                        //2.1 無資料時,則從db reload資料
                        items = ReloadDataToRedis(redis, cacheKey, uid, pageSize);
                        if (items.Count == 0 && pageSize > 0)
                        {
                            //如果db中也無資料,則向zset中添加一筆NoDbDataMark辨別
                            redis.AddItemToSortedSet(cacheKey, NoDbDataMark, double.MaxValue);
                        }
                    }
                    else if (items.Count == 1 && items.ContainsKey(NoDbDataMark))
                    {
                        //2.2如果取到的是NoDbDataMark辨別,則說明是空資料,則要Clear,傳回空清單
                        items.Clear();
                    }
                    //設定緩存有效期,要根據業務場景合理設定緩存有效期,這邊以7天為例。
                    redis.ExpireEntryIn(cacheKey, new TimeSpan(7, 0, 0, 0));
                    //2.3 第一頁,有多少就傳回多少資料。資料如果不夠一頁,說明本身資料不夠。
                    return items;
                }
                else
                {
                    //3.第二頁(及之後)取數
                    var items = GetPageDataByLastScoreFromRedis(redis, cacheKey, pageSize, lastScore);
                    if (items.Count < pageSize)
                    {
                        //3.1 如果取不夠資料時,就到db取。如果db也不能取到一頁資料,前端會顯示無更多資料,不會一直db取。
                        return GetPageDataByLastScoreFromDb(uid, pageSize, lastScore);
                    }
                    //3.2 如果緩存資料足夠,則傳回緩存的資料。
                    return items;
                }
            }
        }
        public static Dictionary<string, double> ReloadDataToRedis(IRedisClient redis, string cacheKey, string uid, int pageSize, string bizId = "")
        {
            //1.db取數 取top 1000筆資料。不需要全取到緩存。
            IEnumerable<dynamic> models;
            using (var conn = DbHandle.CreateConnectionAndOpen())
            {
                var sql = $"select top 1000 id,aid from TbUserArticle where uid=@uid order by id desc;";// limit 1000;";
                models = conn.Query<dynamic>(sql, new { uid = uid });
            }
            if (models.Count() <= 0) return new Dictionary<string, double>();
            //2.資料加載到redis緩存。
            var itemsParam = new Dictionary<string, double>();
            foreach (dynamic model in models)
            {
                itemsParam.Add((string)model.aid, (double)model.id);
            }
            //使用lua一次性添加資料到緩存。lua語句要執行快,經測試添加1w筆資料,隻需要39ms;10w筆需要448ms。因為sql中有limit,是以一般不會添加超過1w筆。
            //因為是原子性操作、并且是zset結構,這邊不需要加鎖。db取到資料應第一時間加載到redis。
            AddItemsToZset(redis, cacheKey, itemsParam, true, true);
            if (pageSize <= 0) return null;
            //3.直接由models傳回第一頁資料。
            return models.Take(pageSize).ToDictionary(x => (string)x.aid, y => (double)y.id);
        }

        public static Dictionary<string, double> GetPageDataByLastScoreFromDb(string uid, int pageSize, double lastScore)
        {
            //db取一頁資料。
            var sql = $"select top {pageSize} id,aid from TbUserArticle where uid=@uid and id<{lastScore}order by id desc;";// limit {pageSize};";
            using (var conn = DbHandle.CreateConnectionAndOpen())
            {
                return conn.Query<dynamic>(sql, new { uid = uid }).ToDictionary(x => (string)x.aid, y => (double)y.id);
            }
        }
        #region 通用函數
        /// <summary>
        /// ZSet第一頁之後的取數,從lastScore開始取pageSize筆資料(第一頁之後才有lastScore)。
        /// 使用lua,保證原子性操作。
        /// </summary>
        public static Dictionary<string, double> GetPageDataByLastScoreFromRedis(IRedisClient redis, string zsetKey, int pageSize, double lastScore)
        {
            //ZREVRANGEBYSCORE: from lastScore to '-inf'.
            var luaBody = @"local sets = redis.call('ZREVRANGEBYSCORE', KEYS[1], ARGV[1], '-inf', 'WITHSCORES');
            local result = {};
            local index=0;
            local pageSize=ARGV[2]*1;
            local lastScore=ARGV[1]*1;
            for i = 1, #sets, 2 do 
                if index>=pageSize then
                    break;
                end
                if (lastScore>sets[i+1]*1) then
                    table.insert(result, sets[i]);
                    table.insert(result, sets[i+1]);
                    index=index+1;
                end
            end
            return result";
            //ARGV[1]:lastScore ARGV[2]:pageSize
            var list = redis.ExecLuaAsList(luaBody, new string[] { zsetKey }, new string[] { lastScore.ToString(), pageSize.ToString() });
            var result = new Dictionary<string, double>();
            for (var i = 0; i < list.Count; i += 2)
            {
                result.Add(list[i], Convert.ToDouble(list[i + 1]));
            }
            return result;
        }
        /// <summary>
        /// 添加一項到zset緩存中。
        /// </summary>
        /// <param name="item">要添加到zset的資料項</param>
        /// <param name="maxCount">控制zset最大長度,如果為0,則不控制。</param>
        /// <returns></returns>
        public static string AddItemToZset(IRedisClient redis, string zsetKey, KeyValuePair<string, double> item, int maxCount = 0)
        {
            var items = new Dictionary<string, double>() { { item.Key, item.Value } };
            return AddItemsToZset(redis, zsetKey, items);
        }
        /// <summary>
        /// 添加多項到zset緩存中。
        /// </summary>
        /// <param name="items">要添加到zset的資料清單</param>
        /// <param name="hasCacheExpire">緩存zsetKey是否有設定緩存有效期。如果有設定緩存有效期,則當緩存中無資料時,可能是緩存過期;而如果緩存無有效期,緩存中無資料,就是db和緩存都無資料</param>
        /// <param name="isReload">是否是reload情況,true重載情況;false追加</param>
        /// <param name="maxCount">控制zset最大長度,如果為0,則不控制。</param>
        /// <returns></returns>
        public static string AddItemsToZset(IRedisClient redis, string zsetKey, Dictionary<string, double> items, bool hasCacheExpire = true
            , bool isReload = false, int maxCount = 0)
        {
            //!isReload,是因為如果isReload=true情況無資料,則也要進來重載隊列為無資料(即,如果之前有資料要重載為無資料)
            if (!isReload && items.Count <= 0) return null;
            var argArr = new List<string>(items.Count * 2 + 2);//lua參數數組
            //var hasCacheExpire = cacheValidTime != null;
            //第一個lua參數是hasCacheExpire
            argArr.Add(hasCacheExpire ? "1" : "0");
            //第二個lua參數是maxCount
            argArr.Add(maxCount.ToString());
            //組合lua其它參數清單:ZADD的參數
            foreach (var item in items)
            {
                //Add score。 //ZADD KEY_NAME SCORE1 VALUE1
                argArr.Add(item.Value.ToString());
                argArr.Add(item.Key);
            }
            #region lua
            /*
            * 以下lua指令說明。
            * 1.ZREVRANGE從大到小取第一筆資料firstMark;
            * 2.緩存有設定有效期時(hasCacheExpire=1),如果第一筆資料firstMark為nil,則說明清單是空(失效key、未生成key),則不做任何處理,直接傳回字元串not_exist_key。因為可能是使用者失效資料,使用者長期未通路,則不添加,後繼來通路時重載資料。
            * 3.如果firstMark辨別為no_db_data,則是被api辨別為db沒資料,而此時因要ZADD資料進來,是以要把此辨別删除。其中,ZREMRANGEBYRANK從小到大删除,-1是倒數第一筆。
            * 4.ZADD資料進來
            * 5.KeepLength保持隊列長度操作。如果隊列長度(由ZCARD擷取)超過指定的maxCount,則從隊列第一筆開始删除多餘元素,即score最小開始删除。
            * 6.maxCount為>0才KeepLength。傳回數值:curCount - maxCount。(可以用傳回值簡單算出隊列目前長度curCount)。如果傳回值小于等于0則說明沒有觸發删除操作。
            * 7.maxCount為<=0時,直接傳回'no_remove'。
            */
            //清空原來,重新加載資料的情況
            const string reloadLua = "redis.call('DEL', KEYS[1]) ";
            //追加資料到zset的情況
            const string addToLua =
            @"local firstMark = redis.call('ZREVRANGE',KEYS[1],0,0);
            local hasCacheExpire=ARGV[1]*1;
            if hasCacheExpire==1 and firstMark and firstMark[1]==nil then
                return 'not_exist_key';
            end
            if firstMark and firstMark[1]=='{0}' then
                redis.call('ZREMRANGEBYRANK', KEYS[1], -1,-1);
            end";
            const string constAllLua =
            @"{0}
            for i=3, #ARGV, 2 
                do redis.call('ZADD', KEYS[1], ARGV[i], ARGV[i+1]);  
            end
            local maxCount=ARGV[2]*1;
            if maxCount>0 then
              local curCount= redis.call('ZCARD', KEYS[1]);
              local removeCount=curCount - maxCount;
              if removeCount>0 then
                redis.call('ZREMRANGEBYRANK', KEYS[1], 0,removeCount-1);    
              end  
              return removeCount;
            end
            return 'no_remove';";
            #endregion
            var luaBody = string.Format(constAllLua, isReload ? reloadLua : string.Format(addToLua, NoDbDataMark));
            var luaResult = redis.ExecLuaAsString(luaBody, new string[] { zsetKey }, argArr.ToArray());
            return luaResult;
        }
        #endregion
    }      

五、用時間做score,同分同秒問題解決

如果是用時間做score,會有同分同秒問題,比如在TbUserArticle裡增加了“時間”欄位。解決方法代碼隻需稍作微改,參數除了lastScore(此時是“時間”),還需要傳lastAid(文章編号)。

1. 緩存處理修改,隻動了以下紅色粗體字。(注:當zset的兩筆資料score相同時,是再根據value排序的):

public static Dictionary<string, double> GetPageDataByLastScoreFromRedis(IRedisClient redis, string zsetKey, int pageSize, double lastScore,string lastAid)
        {
            //ZREVRANGEBYSCORE: from lastScore to '-inf'.
            var luaBody = @"local sets = redis.call('ZREVRANGEBYSCORE', KEYS[1], ARGV[1], '-inf', 'WITHSCORES');
            local result = {};
            local index=0;
            local pageSize=ARGV[2]*1;
            local lastScore=ARGV[1]*1;
            local lastAid=ARGV[3];
            for i = 1, #sets, 2 do 
                if index>=pageSize then
                    break;
                end
                if (lastScore>sets[i+1]*1) or (lastScore==sets[i+1]*1 and lastAid>sets[i]) then
                    table.insert(result, sets[i]);
                    table.insert(result, sets[i+1]);
                    index=index+1;
                end
            end
            return result";
            //ARGV[1]:lastScore ARGV[2]:pageSize
            var list = redis.ExecLuaAsList(luaBody, new string[] { zsetKey }, new string[] { lastScore.ToString(), pageSize.ToString(), lastAid });
            var result = new Dictionary<string, double>();
            for (var i = 0; i < list.Count; i += 2)
            {
                result.Add(list[i], Convert.ToDouble(list[i + 1]));
            }
            return result;
        }      

2.db取數修改

reload sql

$"select top 1000 時間,aid from TbUserArticle where uid=@uid order by 時間 desc,aid desc;";

db中取一頁的sql

$"select top {pageSize} 時間,aid from TbUserArticle where uid=@uid and (時間<{lastScore} or (時間={lastScore} and aid<'{lastAid}')) order by 時間 desc,aid desc;";

這樣就可以了,中心思想就是:當“時間={lastScore} ”,那麼就增加文章編号比較條件。

分享、互相交流學習

繼續閱讀