天天看点

go-cron源码分析(一、由初始化引起的一些其他事)

最近在实现go的定时功能,所以先来了解一波cron的实现,看看是否可以借鉴。

1.1 Cron用法

我们先来看看用法:

c := cron.New()     // 新建一个定时器任务
c.AddFunc("30 * * * *", func() { fmt.Println("Every hour on the half hour") })   // 绑定定时任务
c.AddFunc("30 3-6,20-23 * * *", func() { fmt.Println(".. in the range 3-6am, 8-11pm") })
c.AddFunc("CRON_TZ=Asia/Tokyo 30 04 * * *", func() { fmt.Println("Runs at 04:30 Tokyo time every day") })
c.AddFunc("@hourly",      func() { fmt.Println("Every hour, starting an hour from now") })
c.AddFunc("@every 1h30m", func() { fmt.Println("Every hour thirty, starting an hour thirty from now") })
c.Start()    // 开始调度
..
// Funcs are invoked in their own goroutine, asynchronously.
...
// Funcs may also be added to a running Cron
c.AddFunc("@daily", func() { fmt.Println("Every day") })
..
// Inspect the cron job entries' next and previous run times.
inspect(c.Entries())  // 看一下下一个任务和上一个任务
..
c.Stop()  // Stop the scheduler (does not stop any jobs already running).      

1.2 cron.new()

创建一个cron的对象,没看源码之前,一直觉得new()就可以创建一个对象了,没想到看了源码之后才发现,这个是需要带参的。

// New returns a new Cron job runner, modified by the given options.
// New返回一个新的Cron作业运行器,由给定选项修改。
//
// Available Settings
// 可用的设置
//
//   Time Zone
//     Description: The time zone in which schedules are interpreted
//     描述:解释时间表的时区
//     Default:     time.Local
//
//   Parser
//     Description: Parser converts cron spec strings into cron.Schedules.
//     描述:解析器将cron规范字符串转换为cron. schedules。
//     Default:     Accepts this spec: https://en.wikipedia.org/wiki/Cron
//
//   Chain
//     Description: Wrap submitted jobs to customize behavior.
//     描述:对提交的作业进行包装以定制行为。
//     Default:     A chain that recovers panics and logs them to stderr.
//
// See "cron.With*" to modify the default behavior.
func New(opts ...Option) *Cron {
  c := &Cron{
    entries:   nil,
    chain:     NewChain(),
    add:       make(chan *Entry),
    stop:      make(chan struct{}),
    snapshot:  make(chan chan []Entry),
    remove:    make(chan EntryID),
    running:   false,
    runningMu: sync.Mutex{},
    logger:    DefaultLogger,
    location:  time.Local,
    parser:    standardParser,
  }
  for _, opt := range opts {
    opt(c)      // 可变参数都是函数,这样执行
  }
  return c
}      

这个函数接受可变参数,这个可变参数其实是函数,由下面的opt©来执行。

1.2.1 time zone

下面我们先看简单的time zone:

// 代码路径 option_test.go
func TestWithLocation(t *testing.T) { // 其实这是一个测试用例
  c := New(WithLocation(time.UTC))  
  if c.location != time.UTC {
    t.Errorf("expected UTC, got %v", c.location)
  }
}

// WithLocation overrides the timezone of the cron instance.
func WithLocation(loc *time.Location) Option {
  return func(c *Cron) {    // 这个函数也比较简单,就是返回一个匿名函数,这个匿名函数去修改c的location的值
    c.location = loc
  }
}      

1.2.2 Parser

func TestWithParser(t *testing.T) {
  var parser = NewParser(Dow)     // 主动申请一个解析器,这个比较难,之后在看
  c := New(WithParser(parser))        // 通过这个函数修改默认的解析器
  if c.parser != parser {
    t.Error("expected provided parser")
  }
}

// WithParser overrides the parser used for interpreting job schedules.
func WithParser(p ScheduleParser) Option {    // 这个解析器也比较简单
  return func(c *Cron) {
    c.parser = p
  }
}

// 不过在这里我看到一个比较有意思的解析器,有带秒的解析器
// WithSeconds overrides the parser used for interpreting job schedules to
// include a seconds field as the first one.
func WithSeconds() Option {           // 我们上面的例子,并没有秒的定时任务,有了这个自定义解析器,我们就可以实现秒定时了
  return WithParser(NewParser(
    Second | Minute | Hour | Dom | Month | Dow | Descriptor,
  ))
}      

实现秒定时:

import (
    "fmt"
    "github.com/robfig/cron/v3"
    "time"
)

func main() {
    fmt.Println("time ", time.Now())
    c := cron.New(cron.WithSeconds())   // 指定自定义解析器
    // 这种绑定就是1秒执行一次
    c.AddFunc("*/1 * * * * *", func() { fmt.Println("Every hour on the half hour", time.Now()) })
    c.Start()

    select {}

}      

1.2.3 Chain

这个chain并没有简单的例子,可以等到分析Chain再详细说明

// WithChain specifies Job wrappers to apply to all jobs added to this cron.
// Refer to the Chain* functions in this package for provided wrappers.
func WithChain(wrappers ...JobWrapper) Option {   // 只是提供了一个简单的修改chain
  return func(c *Cron) {
    c.chain = NewChain(wrappers...)
  }
}      

1.2.4 logger

修改日志,这个可以好好分析分析了

func TestWithVerboseLogger(t *testing.T) {
  var buf syncWriter
  var logger = log.New(&buf, "", log.LstdFlags)     // 自己自定义了一个log
  c := New(WithLogger(VerbosePrintfLogger(logger)))   // 绑定自己指定以的log
  if c.logger.(printfLogger).logger != logger {
    t.Error("expected provided logger")
  }

  c.AddFunc("@every 1s", func() {})
  c.Start()
  time.Sleep(OneSecond)
  c.Stop()
  out := buf.String()
  if !strings.Contains(out, "schedule,") ||
    !strings.Contains(out, "run,") {
    t.Error("expected to see some actions, got:", out)
  }
}

// VerbosePrintfLogger wraps a Printf-based logger (such as the standard library
// "log") into an implementation of the Logger interface which logs everything.
func VerbosePrintfLogger(l interface{ Printf(string, ...interface{}) }) Logger {  // 需要自己使用Printf函数
  return printfLogger{l, true}   // 这是返回一个Logger对象
}

// WithLogger uses the provided logger.
func WithLogger(logger Logger) Option {
  return func(c *Cron) {        // 实际调用的也是这样修改
    c.logger = logger   
  }
}      

那我们实现一下自己的日志库,能不能打印出来

package main

// 需要拉取项目:go get github.com/robfig/cron/[email protected]

import (
  "fmt"
  "github.com/robfig/cron/v3"
  "time"
)

type MyLog struct {
  buff string
}

func (this *MyLog)Printf(format string, v ...interface{}) {  // 看了下日志源码,只要实现这个接口,就可以传参
  this.buff = fmt.Sprintf("mylog ", time.Now(), "hahahh")
}


func main() {
  fmt.Println("time ", time.Now())
  var mylog MyLog
  // cron.VerbosePrintfLogger(&mylog) 这个就是把自己的日志,转成cron的日志
  c := cron.New(cron.WithSeconds(), cron.WithLogger(cron.VerbosePrintfLogger(&mylog)))
  //c.AddFunc("*/1 * * * * *", func() { fmt.Println("Every hour on the half hour", time.Now()) })
  c.AddFunc("*/1 * * * * *", func() { fmt.Println("log ", mylog.buff) })  // 1秒打一个
  c.Start()


  select {}

}      

看看输出:

time  2022-11-03 20:22:31.0890216 +0800 CST m=+0.001725601
log  mylog %!(EXTRA time.Time=2022-11-03 20:22:32.0028412 +0800 CST m=+0.915545201, string=hahahh)
log  mylog %!(EXTRA time.Time=2022-11-03 20:22:33.0008532 +0800 CST m=+1.913557201, string=hahahh)
log  mylog %!(EXTRA time.Time=2022-11-03 20:22:34.0109866 +0800 CST m=+2.923690601, string=hahahh)
log  mylog %!(EXTRA time.Time=2022-11-03 20:22:35.0028254 +0800 CST m=+3.915529401, string=hahahh)
log  mylog %!(EXTRA time.Time=2022-11-03 20:22:36.0023282 +0800 CST m=+4.915032201, string=hahahh)
log  mylog %!(EXTRA time.Time=2022-11-03 20:22:37.0020753 +0800 CST m=+5.914779301, string=hahahh)
log  mylog %!(EXTRA time.Time=2022-11-03 20:22:38.0028339 +0800 CST m=+6.915537901, string=hahahh)
log  mylog %!(EXTRA time.Time=2022-11-03 20:22:39.0023824 +0800 CST m=+7.915086401, string=hahahh)
log  mylog %!(EXTRA time.Time=2022-11-03 20:22:40.0028723 +0800 CST m=+8.915576301, string=hahahh)
log  mylog %!(EXTRA time.Time=2022-11-03 20:22:41.0037012 +0800 CST m=+9.916405201, string=hahahh)
log  mylog %!(EXTRA time.Time=2022-11-03 20:22:42.0035357 +0800 CST m=+10.916239701, string=hahahh)
log  mylog %!(EXTRA time.Time=2022-11-03 20:22:43.0024844 +0800 CST m=+11.915188401, string=hahahh)
log  mylog %!(EXTRA time.Time=2022-11-03 20:22:44.0039952 +0800 CST m=+12.916699201, string=hahahh)
log  mylog %!(EXTRA time.Time=2022-11-03 20:22:45.002594 +0800 CST m=+13.915298001, string=hahahh)      

虽然日志没有什么实际用处,但可以熟悉一个cron的日志使用。

1.3 Logger.go

看着字数很少,就跟着分析一下日志吧,上面说过只要实现Printf函数即可,那我们就来看看。

1.3.1 DefaultLogger

首先来看看缺省日志

// DefaultLogger is used by Cron if none is specified.
var DefaultLogger Logger = PrintfLogger(log.New(os.Stdout, "cron: ", log.LstdFlags))

// 分析PrintfLogger这个函数
// PrintfLogger wraps a Printf-based logger (such as the standard library "log")
// into an implementation of the Logger interface which logs errors only.
// PrintfLogger将基于printf的记录器(如标准库“日志”)封装到logger接口的实现中,该实现只记录错误。
// 这个函数接受一个Printf函数作为接口l,我们上面的log对象中就有Printf函数,可以追踪进去看看
func PrintfLogger(l interface{ Printf(string, ...interface{}) }) Logger {
  return printfLogger{l, false}   // 这是一个结构体,返回结构题的对象
}

// 这里还差了一个Logger对象是啥?
// Logger is the interface used in this package for logging, so that any backend
// can be plugged in. It is a subset of the github.com/go-logr/logr interface.
type Logger interface {
  // Info logs routine messages about cron's operation.
  Info(msg string, keysAndValues ...interface{})
  // Error logs an error condition.
  Error(err error, msg string, keysAndValues ...interface{})
}      

看到这里就明白了,原来搞了这么半天,原来是为了实现Logger接口,是通过那两个函数来实现的

1.3.2 printfLogger结构体

// 通过分析Logger接口,这个必然会出Info Error两个函数,才能显示Logger接口
type printfLogger struct {
  logger  interface{ Printf(string, ...interface{}) }
  logInfo bool
}

func (pl printfLogger) Info(msg string, keysAndValues ...interface{}) {  // 果然有
  if pl.logInfo {
    keysAndValues = formatTimes(keysAndValues)
    pl.logger.Printf(
      formatString(len(keysAndValues)),
      append([]interface{}{msg}, keysAndValues...)...)
  }
}

func (pl printfLogger) Error(err error, msg string, keysAndValues ...interface{}) {  // 果然有
  keysAndValues = formatTimes(keysAndValues)
  pl.logger.Printf(
    formatString(len(keysAndValues)+2),
    append([]interface{}{msg, "error", err}, keysAndValues...)...)
}      

自此我们日志系统分析完成。

1.3.3 cron支持zap日志

因为我们项目一般都是使用zap做为日志管理,那zap能不能作为cron的日志呢?

答案明显是可以的,下面我来写一个demo:

package main

// 需要拉取项目:go get github.com/robfig/cron/[email protected]

import (
  "fmt"
  "github.com/robfig/cron/v3"
  "go.uber.org/zap"
  "time"
)

type MyZap struct {
  logger *zap.Logger
}

// 自己把zap封装了一层,然后自己实现了Info接口
func (this *MyZap)Info(format string,  keysAndValues ...interface{}) {
  this.logger.Info(fmt.Sprintf(format, keysAndValues))
}

// 这也是自己实现了Error接口
func (this *MyZap)Error(err error, format string,  keysAndValues ...interface{}) {
  this.logger.Error(fmt.Sprintf(format, keysAndValues))
}

// new了一个对象
func NewMyzap() *MyZap{
  mzap := &MyZap{}
  mzap.logger, _ = zap.NewProduction()
  return mzap
}

func main() {
  fmt.Println("time ", time.Now())

  mzap := NewMyzap()
  c := cron.New(cron.WithSeconds(), cron.WithLogger(mzap))  // 直接把mzap对象指针传入,不用那个转函数函数了,因为我直接实现了Logger接口
  c.AddFunc("*/1 * * * * *", func() { fmt.Println("log ", mylog.buff) })
  c.Start()

  select {}

}      

1.4 总结