本篇首先介紹了一下熔斷、降級以及AOP的基本概念,然後從兩個流行的庫Polly和AspectCore的基本使用開始了解如何在.NET Core代碼中實作熔斷機制和AOP,最後通過結合Polly+AspectCore封裝了一個Hystrix來介紹了一下如何在ASP.NET Core程式中如何做到标簽式地快速實作熔斷降級機制。後續,會将Polly與Ocelot結合實踐API網關。
Tip: 此篇已加入.NET Core微服務基礎系列文章索引
一、熔斷、降級與AOP
1.1 啥是熔斷?
在廣義的解釋中,熔斷主要是指為控制股票、期貨或其他金融衍生産品的交易風險,為其單日價格波動幅度規定區間限制,一旦成交價觸及區間上下限,交易則自動中斷一段時間(“熔即斷”),或就此“躺平”而不得超過上限或下限(“熔而不斷”)。
而對于微服務來說,熔斷就是我們常說的“保險絲”,意為當服務出現某些狀況時,切斷服務,進而防止應用程式不斷地嘗試執行可能會失敗的操作造成系統的“雪崩”,或者大量的逾時等待導緻系統卡死等情況,很多地方也将其成為“過載保護”。
1.2 啥是降級?
降級的目的就是當某個服務提供者發生故障的時候,向調用方傳回一個替代響應或者錯誤響應。
例如:假設有一個短信服務,其調用聯通接口伺服器發送短信服務(假設這裡調用聯通接口最友善,最省事也最經濟)失敗之後,會嘗試改用移動短信伺服器(假設這裡調用移動伺服器比較不友善也不經濟)發送,如果移動伺服器調用也失敗,那麼還會嘗試改用電信短信伺服器(假設這裡調用電信伺服器最不省事和最不經濟),如果還失敗,則傳回“失敗”響應;
降級的另一個概念也可以看作是服務的“選擇性放棄”,比如在雙11或618等大型的電商活動日中,在高峰值的情形下,一般的電商系統都會采用部分服務的優先級降低或者幹脆延時或停止服務,以確定主要的服務能夠使用最大化的資源為客戶提供服務。等待峰值下降之後,再通過處理恢複那些降級的服務的原有優先級。
1.3 啥是AOP?
AOP(Aspect Oriented Programming)意為面向切面程式設計,它是指在運作時,動态地将代碼切入到類的指定方法、指定位置上的程式設計思想就是面向切面的程式設計。比如說,我們在兩個類中,可能都需要在每個方法中做日志。按面向對象的設計方法,我們就必須在兩個類的方法中都加入日志的内容。也許他們是完全相同的,但就是因為面向對象的設計讓類與類之間無法聯系,而不能将這些重複的代碼統一起來。而AOP就是為了解決這個問題而生的,一般而言,我們把切入到指定類指定方法的代碼片段稱為切面,而切入到哪些類、哪些方法則叫切入點。有了AOP,我們就可以把幾個類共有的代碼,抽取到一個切片中,等到需要時再切入對象中去,進而改變其原有的行為。
AOP是OOP(Object Oriented Programming)的補充,OOP從橫向上區分出一個個的類來,而AOP則從縱向上向對象中加入特定的代碼。有了AOP,OOP變得立體了。關于AOP的更多細節和讨論,可以浏覽知乎的這篇文章:《什麼是AOP?》
二、Polly的基本使用
2.1 Polly極簡介紹
Polly是一個被.NET基金會認可的彈性和瞬态故障處理庫,允許我們以非常順暢和線程安全的方式來執諸如行重試,斷路,逾時,故障恢複等政策,其主要功能如下:
- 功能1:重試(Retry)
- 功能2:斷路器(Circuit-Breaker)
- 功能3:逾時檢測(Timeout)
- 功能4:緩存(Cache)
- 功能5:降級(Fallback)
Polly的政策主要由“故障”和“動作”兩個部分組成,“故障”可以包括異常、逾時等情況,“動作”則包括Fallback(降級)、重試(Retry)、熔斷(Circuit-Breaker)等。政策則用來執行業務代碼,當業務代碼出現了“故障”中的情況時就開始執行“動作”。
2.2 Polly基礎使用
*.這裡隻介紹幾個我們需要用到的功能,其他功能請浏覽參考資料關于Polly的部分。
(1)通過NuGet安裝,最新版本:6.0.1
NuGet>Install-Package Polly
(2)FallBack => 當出現故障,則進入降級動作
public static void Case1()
{
ISyncPolicy policy = Policy.Handle<ArgumentException>()
.Fallback(() =>
{
Console.WriteLine("Error occured");
});
policy.Execute(() =>
{
Console.WriteLine("Job Start");
throw new ArgumentException("Hello Polly!");
Console.WriteLine("Job End");
});
}
執行結果如下圖所示:這裡捕捉的是ArgumentException, 如果想捕捉所有的Exception,請設定Policy.Handle<Exception>,不過這樣就擴大了範圍。
(3)Retry => 重試,比較容易了解
public static void Case2()
{
ISyncPolicy policy = Policy.Handle<Exception>().Retry(3);
try
{
policy.Execute(() =>
{
Console.WriteLine("Job Start");
if (DateTime.Now.Second % 10 != 0)
{
throw new Exception("Special error occured");
}
Console.WriteLine("Job End");
});
}
catch (Exception ex)
{
Console.WriteLine("There's one unhandled exception : " + ex.Message);
}
}
執行結果如下圖所示:可以看到,這裡重試了三次,仍然沒有滿足條件(DateTime.Now.Second % 10 == 0),是以進入了外部的未處理異常catch塊中。
(4)CircuitBreaker => 短路保護,當一塊業務代碼/服務 出現了N次錯誤,則把“熔斷器”(保險絲)熔斷,等待一段時間後才允許再次執行,在這段等待的時間内如果再執行則直接抛出BrokenCircuitException異常。這個也很好了解,比如我們的手機螢幕密碼,如果輸錯了N次之後,手機會拒絕我們再次輸入,而是讓我們等待20 ~ 30s 之後再輸入,如果等待之後再輸錯N次,則再次進入等待。
這裡假設我們設定一個短路保護政策:當發生了故障的時候,則重試了5次還是有故障(代碼中的6代表的是在執行短路保護政策之前允許6次故障),那麼久停止服務10s鐘,10s之後再允許重試。
public static void Case3()
{
// Stop for 10s after retry 6 times
ISyncPolicy policy = Policy.Handle<Exception>()
.CircuitBreaker(6, TimeSpan.FromSeconds(10));
while (true)
{
try
{
policy.Execute(() =>
{
Console.WriteLine("Job Start");
throw new Exception("Special error occured");
Console.WriteLine("Job End");
});
}
catch (Exception ex)
{
Console.WriteLine("There's one unhandled exception : " + ex.Message);
}
Thread.Sleep(500);
}
}
執行結果如下圖所示:出現了6次故障之後,直接給我們跑出了短路保護的異常,“The circuit is now open and is not allowing calls”.
(5)Timeout 與 Wrap => Wrap是指政策封裝,可以把多個ISyncPolicy合并到一起執行。Timeout則是指逾時處理,但是逾時政策一般不能直接使用,而是其其他政策封裝到一起使用。
這裡我們封裝兩個政策,一個是基本的Fallback,另一個則是逾時政策,如果調用執行時間超過2s則觸發Fallback。
這裡涉及到Polly中關于逾時的兩個政策:一個是悲觀政策(Pessimistic),一個是樂觀政策(Optimistic)。其中,悲觀政策逾時後會直接抛異常,而樂觀政策則不會,而隻是觸發CancellationTokenSource.Cancel函數,需要等待委托自行終止操作。一般情況下,我們都會用悲觀政策。
public static void Case4()
{
try
{
ISyncPolicy policyException = Policy.Handle<TimeoutRejectedException>()
.Fallback(() =>
{
Console.WriteLine("Fallback");
});
ISyncPolicy policyTimeout = Policy.Timeout(3, Polly.Timeout.TimeoutStrategy.Pessimistic);
ISyncPolicy mainPolicy = Policy.Wrap(policyTimeout, policyException);
mainPolicy.Execute(() =>
{
Console.WriteLine("Job Start...");
Thread.Sleep(5000);
//throw new Exception();
Console.WriteLine("Job End...");
});
}
catch (Exception ex)
{
Console.WriteLine($"Unhandled exception : {ex.GetType()} : {ex.Message}");
}
}
執行結果如下圖所示:
除此之外,Polly還提供了一些異步方法供調用以實作以上介紹的功能,比如在業務代碼中有一些Http的調用或者IO操作時,不妨用用異步操作來提高一點效率,可以看下面這個例子:
public static async void Case5()
{
Policy<byte[]> policy = Policy<byte[]>.Handle<Exception>()
.FallbackAsync(async c =>
{
Console.WriteLine("Executed Error!");
return new byte[0];
}, async r =>
{
Console.WriteLine(r.Exception);
});
policy = policy.WrapAsync(Policy.TimeoutAsync(20, TimeoutStrategy.Pessimistic,
async (context, timespan, task) =>
{
Console.WriteLine("Timeout!");
}));
var bytes = await policy.ExecuteAsync(async ()=>
{
Console.WriteLine("Start Job");
HttpClient httpClient = new HttpClient();
var result = await httpClient.GetByteArrayAsync("https://images2018.cnblogs.com/blog/381412/201806/381412-20180606230929894-145212290.png");
Console.WriteLine("Finish Job");
return result;
});
Console.WriteLine($"Length of bytes : {bytes.Length}");
}
至于Polly更多的功能和用法,可以參閱官方文檔,這裡不再贅述。
三、AspectCore的基本使用
3.1 為什麼要用AOP架構
從上面的例子可以看出,如果直接使用Polly,那麼就會造成我們的業務代碼中混雜大量的業務無關的代碼。是以,我們會使用AOP的方式來封裝Polly,嗯,首先我們先找一個支援的.NET Core的AOP架構吧,目前大家都在用AspectCore(國産,作者Lemon),它采用動态動态代理/織入,并且支援異步方法的攔截。
快快通過NuGet安裝一個吧:
NuGet>Install-Package AspectCore.Core
3.2 AspectCore的極簡使用
這裡假設我們要針對一個類的某些類的某些方法進行攔截,我們一般會經過一下幾個步驟:
(1)編寫一個攔截器,一般繼承自AbstractInterceptorAttribute
/// <summary>
/// 自定義攔截器
/// </summary>
public class CustomInterceptorAttribute : AbstractInterceptorAttribute
{
/// <summary>
/// 每個被攔截的方法中執行
/// </summary>
/// <param name="context"></param>
/// <param name="next"></param>
/// <returns></returns>
public override async Task Invoke(AspectContext context, AspectDelegate next)
{
try
{
Console.WriteLine("Before service call");
await next(context); // 執行被攔截的方法
}
catch (Exception)
{
Console.WriteLine("Service threw an exception");
throw;
}
finally
{
Console.WriteLine("After service call");
}
}
}
這裡我們通過為被攔截方法增加一些處理前和處理後的logic來實作AOP。
(2)編寫需要被代理攔截的類
/// <summary>
/// 實作AoP的兩個要求:
/// 1.public 類
/// 2.virtual 方法
/// </summary>
public class Person
{
[CustomInterceptor]
public virtual void Say(string message)
{
Console.WriteLine($"Service calling ... => {message}");
}
}
可以看到我們在要攔截的方法Say()的聲明之上加了一個Attribute:CustomInterceptor,正是我們之前新增的。
(3)通過AspectCore建立代理對象實作AOP
public class Program
{
public static void Main(string[] args)
{
ProxyGeneratorBuilder proxyGeneratorBuilder = new ProxyGeneratorBuilder();
using (IProxyGenerator proxyGenerator = proxyGeneratorBuilder.Build())
{
Person p = proxyGenerator.CreateClassProxy<Person>();
p.Say("edisonchou.cnblogs.com");
}
Console.ReadKey();
}
}
代碼很清晰,不再解釋。直到這裡,我們看到了不管是Polly的使用,還是AspectCore的使用,都存在一些業務無關的聲明代碼,而且我們需要結合Polly和AspectCore才能完整地實作适合ASP.NET Core的熔斷降級元件,下面我們就來模仿Spring Cloud中的Hystrix(可以參考這一篇文章來了解Spring Cloud Hystrix是個啥玩意兒)
四、Polly+AspectCore的結合使用
4.1 封裝一個Hystrix
NuGet>Install-Package Polly
NuGet>Install-Package AspectCore.Core
NuGet>Install-Package Microsoft.Extensions.Caching.Memory
[AttributeUsage(AttributeTargets.Method)]
public class HystrixCommandAttribute : AbstractInterceptorAttribute
{
/// <summary>
/// 最多重試幾次,如果為0則不重試
/// </summary>
public int MaxRetryTimes { get; set; } = 0;
/// <summary>
/// 重試間隔的毫秒數
/// </summary>
public int RetryIntervalMilliseconds { get; set; } = 100;
/// <summary>
/// 是否啟用熔斷
/// </summary>
public bool IsEnableCircuitBreaker { get; set; } = false;
/// <summary>
/// 熔斷前出現允許錯誤幾次
/// </summary>
public int ExceptionsAllowedBeforeBreaking { get; set; } = 3;
/// <summary>
/// 熔斷多長時間(毫秒)
/// </summary>
public int MillisecondsOfBreak { get; set; } = 1000;
/// <summary>
/// 執行超過多少毫秒則認為逾時(0表示不檢測逾時)
/// </summary>
public int TimeOutMilliseconds { get; set; } = 0;
/// <summary>
/// 緩存多少毫秒(0表示不緩存),用“類名+方法名+所有參數ToString拼接”做緩存Key
/// </summary>
public int CacheTTLMilliseconds { get; set; } = 0;
private static ConcurrentDictionary<MethodInfo, Policy> policies
= new ConcurrentDictionary<MethodInfo, Policy>();
private static readonly IMemoryCache memoryCache
= new MemoryCache(new Microsoft.Extensions.Caching.Memory.MemoryCacheOptions());
/// <summary>
/// HystrixCommandAttribute
/// </summary>
/// <param name="fallBackMethod">降級的方法名</param>
public HystrixCommandAttribute(string fallBackMethod)
{
this.FallBackMethod = fallBackMethod;
}
public string FallBackMethod { get; set; }
public override async Task Invoke(AspectContext context, AspectDelegate next)
{
//一個HystrixCommand中保持一個policy對象即可
//其實主要是CircuitBreaker要求對于同一段代碼要共享一個policy對象
//根據反射原理,同一個方法的MethodInfo是同一個對象,但是對象上取出來的HystrixCommandAttribute
//每次擷取的都是不同的對象,是以以MethodInfo為Key儲存到policies中,確定一個方法對應一個policy執行個體
policies.TryGetValue(context.ServiceMethod, out Policy policy);
lock (policies)//因為Invoke可能是并發調用,是以要確定policies指派的線程安全
{
if (policy == null)
{
policy = Policy.NoOpAsync();//建立一個空的Policy
if (IsEnableCircuitBreaker)
{
policy = policy.WrapAsync(Policy.Handle<Exception>().CircuitBreakerAsync(ExceptionsAllowedBeforeBreaking, TimeSpan.FromMilliseconds(MillisecondsOfBreak)));
}
if (TimeOutMilliseconds > 0)
{
policy = policy.WrapAsync(Policy.TimeoutAsync(() => TimeSpan.FromMilliseconds(TimeOutMilliseconds), Polly.Timeout.TimeoutStrategy.Pessimistic));
}
if (MaxRetryTimes > 0)
{
policy = policy.WrapAsync(Policy.Handle<Exception>().WaitAndRetryAsync(MaxRetryTimes, i => TimeSpan.FromMilliseconds(RetryIntervalMilliseconds)));
}
Policy policyFallBack = Policy
.Handle<Exception>()
.FallbackAsync(async (ctx, t) =>
{
AspectContext aspectContext = (AspectContext)ctx["aspectContext"];
var fallBackMethod = context.ServiceMethod.DeclaringType.GetMethod(this.FallBackMethod);
Object fallBackResult = fallBackMethod.Invoke(context.Implementation, context.Parameters);
//不能如下這樣,因為這是閉包相關,如果這樣寫第二次調用Invoke的時候context指向的
//還是第一次的對象,是以要通過Polly的上下文來傳遞AspectContext
//context.ReturnValue = fallBackResult;
aspectContext.ReturnValue = fallBackResult;
}, async (ex, t) => { });
policy = policyFallBack.WrapAsync(policy);
//放入
policies.TryAdd(context.ServiceMethod, policy);
}
}
//把本地調用的AspectContext傳遞給Polly,主要給FallbackAsync中使用,避免閉包的坑
Context pollyCtx = new Context();
pollyCtx["aspectContext"] = context;
//Install-Package Microsoft.Extensions.Caching.Memory
if (CacheTTLMilliseconds > 0)
{
//用類名+方法名+參數的下劃線連接配接起來作為緩存key
string cacheKey = "HystrixMethodCacheManager_Key_" + context.ServiceMethod.DeclaringType
+ "." + context.ServiceMethod + string.Join("_", context.Parameters);
//嘗試去緩存中擷取。如果找到了,則直接用緩存中的值做傳回值
if (memoryCache.TryGetValue(cacheKey, out var cacheValue))
{
context.ReturnValue = cacheValue;
}
else
{
//如果緩存中沒有,則執行實際被攔截的方法
await policy.ExecuteAsync(ctx => next(context), pollyCtx);
//存入緩存中
using (var cacheEntry = memoryCache.CreateEntry(cacheKey))
{
cacheEntry.Value = context.ReturnValue;
cacheEntry.AbsoluteExpiration = DateTime.Now + TimeSpan.FromMilliseconds(CacheTTLMilliseconds);
}
}
}
else//如果沒有啟用緩存,就直接執行業務方法
{
await policy.ExecuteAsync(ctx => next(context), pollyCtx);
}
}
}
這個HystrixCommand并非我原創,而是引用的楊中科老師在.NET微服務中的代碼,大家也可以直接通過NuGet安裝這個封裝好的Package:
NuGet>Install-Package RuPeng.HystrixCore
這裡不再多講解代碼,因為都有注釋,大家通過一個案例調試以下就了解流程了。
4.2 在ASP.NET Core的使用
(1)為了簡化代理類對象的注入,不用在ASP.NET Core中再通過ProxyGeneratorBuilder進行注入,我們引入一個AspectCore的DI擴充包:
NuGet>Install-Package AspectCore.Extensions.DependencyInjection
(2)改寫Startup類的ConfigureService方法,把傳回值從void改為IServiceProvider
// This method gets called by the runtime. Use this method to add services to the container.
public IServiceProvider ConfigureServices(IServiceCollection services)
{
services.AddMvc();
.......
// AoP - AspectCore
RegisterServices(this.GetType().Assembly, services);
return services.BuildAspectCoreServiceProvider();
}
這裡BuildAspectCoreServiceProvider就是讓AspectCore接管注入。RegisterService方法如下所示:
private static void RegisterServices(Assembly asm, IServiceCollection services)
{
foreach (var type in asm.GetExportedTypes())
{
bool hasHystrixCommand = type.GetMethods().Any(m =>
m.GetCustomAttribute(typeof(HystrixCommandAttribute)) != null);
if (hasHystrixCommand)
{
services.AddSingleton(type);
}
}
}
這裡使用反射,篩選出那些帶有HystrixCommandAttribute的類進行注入,進而減少一行一行注入的代碼工作量。
(3)這裡假設我們需要進行熔斷保護的方法所在類是一個ProductService類,它主要的功能就是通過HttpClient去調用ProductService的某個API,它的定義如下:
public class ProductService
{
[HystrixCommand(nameof(GetAllProductsFallBackAsync),
IsEnableCircuitBreaker = true,
ExceptionsAllowedBeforeBreaking = 3,
MillisecondsOfBreak = 1000 * 5)]
public virtual async Task<string> GetAllProductsAsync(string productType)
{
Console.WriteLine($"-->>Starting get product type : {productType}");
string str = null;
str.ToString();
// to do : using HttpClient to call outer service to get product list
return $"OK {productType}";
}
public virtual async Task<string> GetAllProductsFallBackAsync(string productType)
{
Console.WriteLine($"-->>FallBack : Starting get product type : {productType}");
return $"OK for FallBack {productType}";
}
}
這裡假設我們主要針對GetAllProductsAsync這個方法進行熔斷保護,假設它會調用另一個Service的擷取産品的接口,這個接口會通路核心資料庫,其每天的通路量很大,我們對此接口進行熔斷保護,設定在啟用熔斷保護前允許兩次故障(這裡主要指異常),熔斷保護時間為5s。
在Controller中,通過構造函數注入:
[Produces("application/json")]
[Route("api/Client")]
public class ClientController : Controller
{
private readonly IClientService clientService;
private readonly ProductService productService;
public ClientController(IClientService _clientService, ProductService _productService)
{
clientService = _clientService;
productService = _productService;
}
[HttpGet("{id}")]
public async Task<string> Get(int id)
{
var product = await productService.GetAllProductsAsync("B");
return product;
}
}
為了能夠在控制台中看到熔斷的資訊,我們增加一句Console.WriteLine到HystrixCommandAttribute中:
// 啟用熔斷保護(CircuitBreaker)
if (IsEnableCircuitBreaker)
{
policy = policy.WrapAsync(Policy.Handle<Exception>()
.CircuitBreakerAsync(ExceptionsAllowedBeforeBreaking,
TimeSpan.FromMilliseconds(MillisecondsOfBreak), (ex, ts) =>
{
// assuem to do logging
Console.WriteLine($"Service API OnBreak -- ts = {ts.Seconds}s, ex.message = {ex.Message}");
}, () => {}));
}
這樣當Polly啟用熔斷時便會在控制台中輸出一段消息,實際使用中可以往日志中寫一段日志資訊。
(4)開起内置伺服器進行測試
Step1.借助指令行啟動一個WebAPI程式
Step2.借助Postman/SoapUI等API測試工具,輸入我們的URL,測試結果如下圖所示:
可以看到我們通過在Postman中通路這個URL進而觸發Service中的異常,兩次異常之後,便進入了熔斷保護時間,此後5s内的通路都沒有再進行實際代碼的執行,而直接進入了Fallback方法執行降級後的邏輯。5s保護時間之後,則再次進入實際代碼的執行。目前,這個Hystrix還存在一些問題,需繼續完善,還無法正式投入使用,後續會結合Polly和Ocelot,在API網關處做統一熔斷保護。
五、小結
本篇首先介紹了一下熔斷、降級以及AOP的基本概念,然後從兩個流行的庫Polly和AspectCore的基本使用開始了解如何在.NET Core代碼中實作熔斷機制和AOP,最後通過結合Polly+AspectCore封裝了一個Hystrix來介紹了一下如何在ASP.NET Core程式中如何做到标簽式地快速實作熔斷降級機制。後續,會将Polly與Ocelot結合實踐API網關,在Ocelot網關處做統一熔斷保護。
參考資料
楊中科,《.NET微服務直播課課件(第二版)》
guwei,《談談我對服務熔斷、服務降級的了解》
Jeffcky,《已被.NET基金會認可的彈性和瞬态故障處理庫Polly介紹》
Lemon,《Asp.Net Core輕量級Aop解決方案:AspectCore》
Sunday_Xiao,《服務熔斷保護Spring Cloud Hystrix》
Catcher Wong, 《再談Circuit Breaker之使用Polly》
Polly官方文檔,https://github.com/App-vNext/Polly
AspectCore官方文檔,https://github.com/dotnetcore/AspectCore-Framework
作者:周旭龍
出處:http://edisonchou.cnblogs.com
本文版權歸作者和部落格園共有,歡迎轉載,但未經作者同意必須保留此段聲明,且在文章頁面明顯位置給出原文連結。