實作.Net程式中OpenTracing采樣和上報配置的自動更新
前言
OpenTracing是一個鍊路跟蹤的開放協定,已經有開源的.net實作:opentracing-csharp,同時支援.net framework和.net core,Github位址:
https://github.com/opentracing/opentracing-csharp。
這個庫支援多種鍊路跟蹤模式,不過僅提供了最基礎的功能,想用在實際項目中還需要做很多增強,還好也有人做了開源項目:opentracing-contrib,Github位址:
https://github.com/opentracing-contrib/csharp-netcore
opentracing-contrib中內建了一個名為Jaeger的類庫,這個庫實作了鍊路跟蹤資料的采樣和上報,支援将資料上傳到Jaeger進行分析統計。
為了同時保障性能和跟蹤關鍵資料,能夠遠端調整采樣率是很重要的,Jaeger本身也提供了遠端配置采樣率的支援。
不過我這裡用的阿裡雲鍊路跟蹤不支援,配置的設計也和想要的不同,是以自己做了一個采樣和上報配置的動态更新,也才有了這篇文章。
思路
使用Jaeger初始化Tracer大概是這樣的:
// Configure a tracer builder for this service with the sampler and the reporter.
var tracerBuilder = new Tracer.Builder(serviceName)
    .WithSampler(sampler)
    .WithReporter(reporter);
var tracer = tracerBuilder.Build();
// Register the built tracer as the global tracer.
GlobalTracer.Register(tracer);
首先是提供目前服務的名字,然後需要提供一個采樣器,再提供一個上報器,Build下生成ITracer的一個執行個體,最後注冊到全局。
可以分析得出,采樣和上報配置的更新就是更新采樣器和上報器。
不過Tracer并沒有提供UpdateSampler和UpdateReporter的方法,被卡住了,怎麼辦呢?
前文提到Jaeger是支援采樣率的動态調整的,看看它怎麼做的:
// Private constructor that receives its settings from the Builder.
// The rest of the initialization is elided ("...") in this excerpt.
private RemoteControlledSampler(Builder builder)
{
...
// Start the polling timer: the callback invokes UpdateSampler(); the due time is
// TimeSpan.Zero and the period is builder.PollingInterval.
// NOTE(review): presumably System.Threading.Timer (first tick fires immediately) — confirm.
_pollTimer = new Timer(_ => UpdateSampler(), null, TimeSpan.Zero, builder.PollingInterval);
}
/// <summary>
/// Updates <see cref="Sampler"/> to a new sampler when it is different.
/// </summary>
/// <remarks>
/// Fetches the sampling strategy for <c>_serviceName</c> from <c>_samplingManager</c> and passes
/// the response to <c>UpdateRateLimitingOrProbabilisticSampler</c>. Parts of the body, including
/// the exception handling, are elided ("...") in this excerpt.
/// </remarks>
internal void UpdateSampler()
{
try
{
// Synchronously wait for the asynchronous strategy lookup to complete.
SamplingStrategyResponse response = _samplingManager.GetSamplingStrategyAsync(_serviceName)
.ConfigureAwait(false).GetAwaiter().GetResult();
...
UpdateRateLimitingOrProbabilisticSampler(response);
}
catch (Exception ex)
{
// Handler body is elided in this excerpt.
...
}
}
// Replaces the current Sampler with a new one when the two are not equal.
// NOTE(review): the local `sampler` is declared in the elided ("...") part; presumably it is
// built from `response` — confirm against the full source.
private void UpdateRateLimitingOrProbabilisticSampler(SamplingStrategyResponse response)
{
...
// Compare and swap under _lock.
lock (_lock)
{
if (!Sampler.Equals(sampler))
{
// Close the sampler being replaced, then install the new one.
Sampler.Close();
Sampler = sampler;
...
}
}
}
這裡隻留下關鍵代碼,可以看到核心就是:通過一個Timer定時擷取采樣政策,然後替換原來的Sampler。
這是一個很好了解的辦法,下邊就按照這個思路來搞。
方案
分别提供一個可更新的Sampler和可更新的Reporter,Build Tracer時使用這兩個可更新的類。這裡延續開源項目中Sampler和Reporter的建立方式,給出這兩個類。
可更新的Sampler:
/// <summary>
/// An <see cref="ISampler"/> that delegates to an inner sampler which can be replaced at
/// runtime through <see cref="UpdateSampler"/>. Reads of the inner sampler take a read lock;
/// replacing or closing it takes the write lock.
/// </summary>
internal class UpdatableSampler : ValueObject, ISampler
{
    public const string Type = "updatable";

    private readonly ReaderWriterLockSlim _lock = new ReaderWriterLockSlim();
    private readonly string _serviceName;
    private readonly ILoggerFactory _loggerFactory;
    private readonly ILogger _logger;
    private readonly IMetrics _metrics;

    /// <summary>The sampler that currently makes the sampling decisions.</summary>
    internal ISampler Sampler { get; private set; }

    private UpdatableSampler(Builder builder)
    {
        _serviceName = builder.ServiceName;
        _loggerFactory = builder.LoggerFactory;
        _logger = _loggerFactory.CreateLogger<UpdatableSampler>();
        _metrics = builder.Metrics;
        Sampler = builder.InitialSampler;
    }

    /// <summary>
    /// Updates <see cref="Sampler"/> to a new sampler when it is different.
    /// </summary>
    /// <param name="sampler">The sampler to install. A null value is rejected and the current
    /// sampler is kept.</param>
    /// <remarks>
    /// This method does not throw: failures are logged as warnings. The new sampler is installed
    /// first and the replaced one is closed afterwards, outside the lock, so a failing
    /// <c>Close()</c> can neither leave a closed sampler installed nor block <see cref="Sample"/>.
    /// </remarks>
    public void UpdateSampler(ISampler sampler)
    {
        if (sampler == null)
        {
            // Installing null would make every later Sample() call throw.
            _logger.LogWarning("Updating sampler failed: the new sampler is null");
            _metrics.SamplerQueryFailure.Inc(1);
            return;
        }

        ISampler replaced = null;

        // Acquire the lock before the try block so that the finally block only ever
        // releases a lock that is actually held.
        _lock.EnterWriteLock();
        try
        {
            if (!Sampler.Equals(sampler))
            {
                replaced = Sampler;
                Sampler = sampler;
            }
        }
        catch (System.Exception ex)
        {
            _logger.LogWarning(ex, "Updating sampler failed");
            _metrics.SamplerQueryFailure.Inc(1);
        }
        finally
        {
            _lock.ExitWriteLock();
        }

        if (replaced == null)
        {
            return;
        }

        _metrics.SamplerUpdated.Inc(1);

        // The write lock guaranteed no Sample() call was still using the replaced sampler,
        // so it can be closed without holding the lock.
        try
        {
            replaced.Close();
        }
        catch (System.Exception ex)
        {
            _logger.LogWarning(ex, "Closing the replaced sampler failed");
        }
    }

    /// <summary>
    /// Delegates the sampling decision to the current <see cref="Sampler"/>.
    /// </summary>
    /// <param name="operation">The operation name passed through to the inner sampler.</param>
    /// <param name="id">The trace id passed through to the inner sampler.</param>
    /// <returns>The status returned by the inner sampler.</returns>
    public SamplingStatus Sample(string operation, TraceId id)
    {
        _lock.EnterReadLock();
        try
        {
            return Sampler.Sample(operation, id);
        }
        finally
        {
            _lock.ExitReadLock();
        }
    }

    /// <summary>Returns the type name together with the current inner sampler.</summary>
    public override string ToString()
    {
        _lock.EnterReadLock();
        try
        {
            return $"{nameof(UpdatableSampler)}(Sampler={Sampler})";
        }
        finally
        {
            _lock.ExitReadLock();
        }
    }

    /// <summary>Closes the current inner sampler while holding the write lock.</summary>
    public void Close()
    {
        _lock.EnterWriteLock();
        try
        {
            Sampler.Close();
        }
        finally
        {
            _lock.ExitWriteLock();
        }
    }

    /// <summary>
    /// Yields the current inner sampler as the only atomic value.
    /// NOTE(review): presumably consumed by ValueObject for equality/hash code — confirm.
    /// </summary>
    protected override IEnumerable<object> GetAtomicValues()
    {
        yield return Sampler;
    }

    /// <summary>
    /// Fluent builder for <see cref="UpdatableSampler"/>. Anything not supplied is replaced
    /// by a default in <see cref="Build"/>.
    /// </summary>
    public sealed class Builder
    {
        internal string ServiceName { get; }
        internal ILoggerFactory LoggerFactory { get; private set; }
        internal ISampler InitialSampler { get; private set; }
        internal IMetrics Metrics { get; private set; }

        /// <exception cref="ArgumentNullException"><paramref name="serviceName"/> is null.</exception>
        public Builder(string serviceName)
        {
            ServiceName = serviceName ?? throw new ArgumentNullException(nameof(serviceName));
        }

        /// <exception cref="ArgumentNullException"><paramref name="loggerFactory"/> is null.</exception>
        public Builder WithLoggerFactory(ILoggerFactory loggerFactory)
        {
            LoggerFactory = loggerFactory ?? throw new ArgumentNullException(nameof(loggerFactory));
            return this;
        }

        /// <exception cref="ArgumentNullException"><paramref name="initialSampler"/> is null.</exception>
        public Builder WithInitialSampler(ISampler initialSampler)
        {
            InitialSampler = initialSampler ?? throw new ArgumentNullException(nameof(initialSampler));
            return this;
        }

        /// <exception cref="ArgumentNullException"><paramref name="metrics"/> is null.</exception>
        public Builder WithMetrics(IMetrics metrics)
        {
            Metrics = metrics ?? throw new ArgumentNullException(nameof(metrics));
            return this;
        }

        /// <summary>
        /// Creates the sampler. Defaults: <c>NullLoggerFactory.Instance</c>, a
        /// default-constructed <c>ProbabilisticSampler</c>, and metrics backed by
        /// <c>NoopMetricsFactory.Instance</c>.
        /// </summary>
        public UpdatableSampler Build()
        {
            LoggerFactory = LoggerFactory ?? NullLoggerFactory.Instance;
            InitialSampler = InitialSampler ?? new ProbabilisticSampler();
            Metrics = Metrics ?? new MetricsImpl(NoopMetricsFactory.Instance);
            return new UpdatableSampler(this);
        }
    }
}
可更新的Reporter:
/// <summary>
/// An <see cref="IReporter"/> that delegates to an inner reporter which can be replaced at
/// runtime through <see cref="UpdateReporter"/>. Reads of the inner reporter take a read lock;
/// replacing it takes the write lock.
/// </summary>
internal class UpdatableReporter : IReporter
{
    public const string Type = "updatable";

    private readonly string _serviceName;
    private readonly ILoggerFactory _loggerFactory;
    private readonly ILogger _logger;
    private readonly IMetrics _metrics;
    private readonly ReaderWriterLockSlim _lock = new ReaderWriterLockSlim();

    /// <summary>The reporter that currently receives finished spans.</summary>
    internal IReporter Reporter { get; private set; }

    private UpdatableReporter(Builder builder)
    {
        _serviceName = builder.ServiceName;
        _loggerFactory = builder.LoggerFactory;
        _logger = _loggerFactory.CreateLogger<UpdatableReporter>();
        _metrics = builder.Metrics;
        Reporter = builder.InitialReporter;
    }

    /// <summary>
    /// Updates <see cref="Reporter"/> to a new reporter when it is different.
    /// </summary>
    /// <param name="reporter">The reporter to install. A null value is rejected and the current
    /// reporter is kept.</param>
    /// <remarks>
    /// This method does not throw: failures are logged as warnings. The new reporter is installed
    /// under the write lock; the replaced reporter is then closed (blocking the caller) after the
    /// lock has been released, so <see cref="Report"/> is not stalled while the old reporter
    /// shuts down.
    /// </remarks>
    public void UpdateReporter(IReporter reporter)
    {
        if (reporter == null)
        {
            // Installing null would make every later Report() call throw.
            _logger.LogWarning("Updating reporter failed: the new reporter is null");
            _metrics.ReporterFailure.Inc(1);
            return;
        }

        IReporter replaced = null;

        // Acquire the lock before the try block so that the finally block only ever
        // releases a lock that is actually held.
        _lock.EnterWriteLock();
        try
        {
            if (!Reporter.Equals(reporter))
            {
                replaced = Reporter;
                Reporter = reporter;
                // The sampler-specific SamplerUpdated counter is intentionally not touched here:
                // a reporter swap is not a sampler update.
            }
        }
        catch (System.Exception ex)
        {
            _logger.LogWarning(ex, "Updating reporter failed");
            _metrics.ReporterFailure.Inc(1);
        }
        finally
        {
            _lock.ExitWriteLock();
        }

        if (replaced == null)
        {
            return;
        }

        // The write lock guaranteed no Report() call was still using the replaced reporter,
        // so it can be closed without holding the lock.
        try
        {
            replaced.CloseAsync(CancellationToken.None).ConfigureAwait(false).GetAwaiter().GetResult();
        }
        catch (System.Exception ex)
        {
            _logger.LogWarning(ex, "Updating reporter failed");
            _metrics.ReporterFailure.Inc(1);
        }
    }

    /// <summary>Forwards the span to the current <see cref="Reporter"/>.</summary>
    /// <param name="span">The span passed through to the inner reporter.</param>
    public void Report(Span span)
    {
        _lock.EnterReadLock();
        try
        {
            Reporter.Report(span);
        }
        finally
        {
            _lock.ExitReadLock();
        }
    }

    /// <summary>Returns the type name together with the current inner reporter.</summary>
    public override string ToString()
    {
        _lock.EnterReadLock();
        try
        {
            return $"{nameof(UpdatableReporter)}(Reporter={Reporter})";
        }
        finally
        {
            _lock.ExitReadLock();
        }
    }

    /// <summary>Closes the current inner reporter.</summary>
    /// <param name="cancellationToken">Token passed through to the inner reporter.</param>
    /// <remarks>
    /// ReaderWriterLockSlim has thread affinity, so the lock must not be held across an await:
    /// the continuation may resume on another thread, where releasing the lock throws. The write
    /// lock is therefore taken only to wait for in-flight <see cref="Report"/> calls and to read
    /// the current reporter; the asynchronous close itself runs without the lock.
    /// </remarks>
    public async Task CloseAsync(CancellationToken cancellationToken)
    {
        IReporter current;

        _lock.EnterWriteLock();
        try
        {
            current = Reporter;
        }
        finally
        {
            _lock.ExitWriteLock();
        }

        await current.CloseAsync(cancellationToken).ConfigureAwait(false);
    }

    /// <summary>
    /// Fluent builder for <see cref="UpdatableReporter"/>. Anything not supplied is replaced
    /// by a default in <see cref="Build"/>.
    /// </summary>
    public sealed class Builder
    {
        internal string ServiceName { get; }
        internal ILoggerFactory LoggerFactory { get; private set; }
        internal IReporter InitialReporter { get; private set; }
        internal IMetrics Metrics { get; private set; }

        /// <exception cref="ArgumentNullException"><paramref name="serviceName"/> is null.</exception>
        public Builder(string serviceName)
        {
            ServiceName = serviceName ?? throw new ArgumentNullException(nameof(serviceName));
        }

        /// <exception cref="ArgumentNullException"><paramref name="loggerFactory"/> is null.</exception>
        public Builder WithLoggerFactory(ILoggerFactory loggerFactory)
        {
            LoggerFactory = loggerFactory ?? throw new ArgumentNullException(nameof(loggerFactory));
            return this;
        }

        /// <exception cref="ArgumentNullException"><paramref name="initialReporter"/> is null.</exception>
        public Builder WithInitialReporter(IReporter initialReporter)
        {
            InitialReporter = initialReporter ?? throw new ArgumentNullException(nameof(initialReporter));
            return this;
        }

        /// <exception cref="ArgumentNullException"><paramref name="metrics"/> is null.</exception>
        public Builder WithMetrics(IMetrics metrics)
        {
            Metrics = metrics ?? throw new ArgumentNullException(nameof(metrics));
            return this;
        }

        /// <summary>
        /// Creates the reporter. Defaults: <c>NullLoggerFactory.Instance</c>, a
        /// <c>NoopReporter</c>, and metrics backed by <c>NoopMetricsFactory.Instance</c>.
        /// </summary>
        public UpdatableReporter Build()
        {
            LoggerFactory = LoggerFactory ?? NullLoggerFactory.Instance;
            InitialReporter = InitialReporter ?? new NoopReporter();
            Metrics = Metrics ?? new MetricsImpl(NoopMetricsFactory.Instance);
            return new UpdatableReporter(this);
        }
    }
}
注意這裡邊用到了讀寫鎖,因為要做到不停止服務的更新,而且大部分情況下都是讀,使用lock就有點大材小用了。
現在初始化Tracer大概是這樣的:
// Wrap the configured sampler so that it can be replaced at runtime.
sampler = new UpdatableSampler.Builder(serviceName)
    .WithInitialSampler(BuildSampler(configuration))
    .Build();
// Wrap the configured reporter the same way. Build() is required here: without it the
// statement is unterminated and no UpdatableReporter is created.
reporter = new UpdatableReporter.Builder(serviceName)
    .WithInitialReporter(BuildReporter(configuration))
    .Build();
// Build the tracer on top of the two updatable wrappers.
var tracer = new Tracer.Builder(serviceName)
    .WithSampler(sampler)
    .WithReporter(reporter)
    .Build();
當配置發生改變時,調用sampler和reporter的更新方法:
// Handles a tracing-configuration change: builds a reporter and a sampler from
// newConfiguration and passes them to the update methods of the current reporter and sampler.
// The surrounding logic is elided ("...") in this excerpt; changedInfo is not used in the
// lines shown.
private void OnTracingConfigurationChanged(TracingConfiguration newConfiguration, TracingConfigurationChangedInfo changedInfo)
{
...
// The direct casts require _reporter and _sampler to be the updatable wrapper types;
// otherwise they throw InvalidCastException.
((UpdatableReporter)_reporter).UpdateReporter(BuildReporter(newConfiguration));
((UpdatableSampler)_sampler).UpdateSampler(BuildSampler(newConfiguration));
...
}
這裡就不寫如何監聽配置的改變了,使用Timer或者阻塞查詢等等都可以。
後記
opentracing-contrib這個項目隻支援.net core,如果想用在.net framework中還需要自己搞,這個方法會單獨寫一篇文章,這裡就不做介紹了。
原文位址
https://www.cnblogs.com/bossma/p/dotnet-opentracing-sampler-reporter-remote-configuration-autoupdate.html