天天看点

wrk源文件阅读

1:wrk是一个开源的HTTP基准测试(压力测试)工具

https://github.com/wg/wrk

2:代码详解

/*
 * Entry point of wrk (annotated walkthrough).
 *
 * Parses the command line, splits the target URL into its parts, optionally
 * switches the socket callbacks to their SSL variants, starts cfg.threads
 * worker threads running thread_main, sleeps for cfg.duration seconds, then
 * joins the workers, sums their counters and prints the summary.
 *
 * Returns 0 on success. Exits with status 1 on bad arguments, SSL
 * initialisation failure or a failed script_resolve(), and with status 2 when
 * a worker's event loop or thread cannot be created.
 */
int main(int argc, char **argv) {
    char *url, **headers = zmalloc(argc * sizeof(char *));
    struct http_parser_url parts = {};

    // Parse the command-line arguments; on failure print usage and quit.
    if (parse_args(&cfg, &url, &parts, headers, argc, argv)) {
        usage();
        exit(1);
    }

    // Original notes: `parts` records which URL fields were found, plus the
    // offset and length of each one inside `url`. copy_url_part() returns the
    // requested piece, giving us the schema, host and port.
    char *schema  = copy_url_part(url, &parts, UF_SCHEMA);
    char *host    = copy_url_part(url, &parts, UF_HOST);
    char *port    = copy_url_part(url, &parts, UF_PORT);
    // With no explicit port, the schema name is used as the service name.
    char *service = port ? port : schema;

    // For https, initialise SSL and swap in the SSL socket callbacks.
    if (!strncmp("https", schema, 5)) {
        if ((cfg.ctx = ssl_init()) == NULL) {
            fprintf(stderr, "unable to initialize SSL\n");
            ERR_print_errors_fp(stderr);
            exit(1);
        }
        sock.connect  = ssl_connect;
        sock.close    = ssl_close;
        sock.read     = ssl_read;
        sock.write    = ssl_write;
        sock.readable = ssl_readable;
    }

    // Ignore SIGPIPE and SIGINT for now; a real SIGINT handler is installed
    // further down, after the worker threads have been started.
    signal(SIGPIPE, SIG_IGN);
    signal(SIGINT,  SIG_IGN);

    // Two stats objects: one for latency, one for requests.
    // Original notes: a stats struct records count, limit, min, max and data.
    statistics.latency  = stats_alloc(cfg.timeout * 1000);
    statistics.requests = stats_alloc(MAX_THREAD_RATE_S);

    // One `thread` record per worker (original notes: it holds the
    // connections, addr and other socket-related members).
    thread *threads     = zcalloc(cfg.threads * sizeof(thread));

    // Create the main Lua state for the script and resolve host/service.
    // (The original author had not yet studied the Lua part in detail.)
    lua_State *L = script_create(cfg.script, url, headers);
    if (!script_resolve(L, host, service)) {
        char *msg = strerror(errno);
        fprintf(stderr, "unable to connect to %s:%s %s\n", host, service, msg);
        exit(1);
    }

    cfg.host = host;

    // Start as many worker threads as requested on the command line.
    for (uint64_t i = 0; i < cfg.threads; i++) {
        thread *t      = &threads[i];

        // The argument is the event loop's setsize (number of event slots).
        t->loop        = aeCreateEventLoop(10 + cfg.connections * 3);

        // Number of connections handled by this single thread.
        t->connections = cfg.connections / cfg.threads;

        // Each worker gets its own Lua state; script_init is passed the main
        // state L together with the worker record.
        t->L = script_create(cfg.script, url, headers);
        script_init(L, t, argc - optind, &argv[optind]);

        // Script-derived settings are global, so query them only once.
        if (i == 0) {
            cfg.pipeline = script_verify_request(t->L);
            cfg.dynamic  = !script_is_static(t->L);
            cfg.delay    = script_has_delay(t->L);
            if (script_want_response(t->L)) {
                parser_settings.on_header_field = header_field;
                parser_settings.on_header_value = header_value;
                parser_settings.on_body         = response_body;
            }
        }

        // The event loop must exist before the worker is started.
        if (!t->loop) {
            char *msg = strerror(errno);
            fprintf(stderr, "unable to create thread %"PRIu64": %s\n", i, msg);
            exit(2);
        }

        // Start the worker: thread function thread_main, argument t.
        // Original notes on thread_main (defined elsewhere):
        // 1: connect_socket creates a socket and registers its fd with
        //    select/epoll through aeCreateFileEvent
        // 2: aeCreateTimeEvent creates the timer events
        // 3: aeMain(loop) calls aeProcessEvents, which collects the readable
        //    and writable fds, handles them and returns how many it processed
        // 4: aeDeleteEventLoop() cleans up and frees the loop's memory
        //
        // pthread_create reports failure through its return value and does
        // not set errno, so the message must be built from that return code.
        int rc = pthread_create(&t->thread, NULL, &thread_main, t);
        if (rc != 0) {
            char *msg = strerror(rc);
            fprintf(stderr, "unable to create thread %"PRIu64": %s\n", i, msg);
            exit(2);
        }
    }

    // Now that the workers are running, install the real SIGINT handler; all
    // signals are blocked while it executes.
    struct sigaction sa = {
        .sa_handler = handler,
        .sa_flags   = 0,
    };
    sigfillset(&sa.sa_mask);
    sigaction(SIGINT, &sa, NULL);

    // Standard output: announce the start of the run.
    char *time = format_time_s(cfg.duration);
    printf("Running %s test @ %s\n", time, url);
    printf("  %"PRIu64" threads and %"PRIu64" connections\n", cfg.threads, cfg.connections);

    uint64_t start    = time_us();
    uint64_t complete = 0;
    uint64_t bytes    = 0;
    errors errors     = { 0 };

    // Let the workers run for the configured duration, then raise the stop flag.
    sleep(cfg.duration);
    stop = 1;

    // Wait for every worker to finish and add up its counters.
    for (uint64_t i = 0; i < cfg.threads; i++) {
        thread *t = &threads[i];
        pthread_join(t->thread, NULL);

        complete += t->complete;
        bytes    += t->bytes;

        errors.connect += t->errors.connect;
        errors.read    += t->errors.read;
        errors.write   += t->errors.write;
        errors.timeout += t->errors.timeout;
        errors.status  += t->errors.status;
    }

    // Standard output: the results.
    uint64_t runtime_us = time_us() - start;
    long double runtime_s   = runtime_us / 1000000.0;
    long double req_per_s   = complete   / runtime_s;
    long double bytes_per_s = bytes      / runtime_s;

    // Only correct the latency stats when each connection completed at least
    // one request on average; this also keeps the divisor below non-zero.
    if (complete / cfg.connections > 0) {
        int64_t interval = runtime_us / (complete / cfg.connections);
        stats_correct(statistics.latency, interval);
    }

    print_stats_header();
    print_stats("Latency", statistics.latency, format_time_us);
    print_stats("Req/Sec", statistics.requests, format_metric);
    if (cfg.latency) print_stats_latency(statistics.latency);

    char *runtime_msg = format_time_us(runtime_us);

    printf("  %"PRIu64" requests in %s, %sB read\n", complete, runtime_msg, format_binary(bytes));
    if (errors.connect || errors.read || errors.write || errors.timeout) {
        printf("  Socket errors: connect %d, read %d, write %d, timeout %d\n",
               errors.connect, errors.read, errors.write, errors.timeout);
    }

    if (errors.status) {
        printf("  Non-2xx or 3xx responses: %d\n", errors.status);
    }

    printf("Requests/sec: %9.2Lf\n", req_per_s);
    printf("Transfer/sec: %10sB\n", format_binary(bytes_per_s));

    // If script_has_done() reports true, pass the summary, the error counts
    // and both stats objects to the script.
    if (script_has_done(L)) {
        script_summary(L, runtime_us, complete, bytes);
        script_errors(L, &errors);
        script_done(L, statistics.latency, statistics.requests);
    }

    return 0;
}
           

继续阅读