天天看點

wrk源檔案閱讀

1:wrk是一個開源的基準測試工具

https://github.com/wg/wrk

2:代碼詳解

/*
 * wrk entry point.
 *
 * Parses the command line, prepares the (optionally SSL) socket callbacks and
 * the Lua script state, starts cfg.threads worker threads running
 * thread_main, sleeps for cfg.duration seconds, then joins the workers,
 * aggregates their counters and prints the report.
 *
 * Exit status: 1 on bad arguments, SSL initialisation failure or a failed
 * script_resolve(); 2 when an event loop or a worker thread cannot be
 * created; 0 otherwise.
 */
int main(int argc, char **argv) {
    char *url, **headers = zmalloc(argc * sizeof(char *));
    struct http_parser_url parts = {0};

    // Parse the command-line arguments; a non-zero result prints usage and exits.
    if (parse_args(&cfg, &url, &parts, headers, argc, argv)) {
        usage();
        exit(1);
    }

    // Original notes: `parts` records which URL fields were set, plus the
    // offset and length of each one inside the URL; copy_url_part returns
    // that piece of the URL. Here it yields schema, host and port.
    // NOTE(review): both helpers are defined elsewhere - confirm.
    char *schema  = copy_url_part(url, &parts, UF_SCHEMA);
    char *host    = copy_url_part(url, &parts, UF_HOST);
    char *port    = copy_url_part(url, &parts, UF_PORT);
    // Fall back to the schema as the service name when no port was given.
    char *service = port ? port : schema;

    // For an "https" schema, initialise SSL and swap every socket callback
    // for its ssl_* counterpart.
    if (!strncmp("https", schema, 5)) {
        if ((cfg.ctx = ssl_init()) == NULL) {
            fprintf(stderr, "unable to initialize SSL\n");
            ERR_print_errors_fp(stderr);
            exit(1);
        }
        sock.connect  = ssl_connect;
        sock.close    = ssl_close;
        sock.read     = ssl_read;
        sock.write    = ssl_write;
        sock.readable = ssl_readable;
    }

    // Ignore SIGPIPE and, for now, SIGINT; a real SIGINT handler is installed
    // further down, once the worker threads have been started.
    signal(SIGPIPE, SIG_IGN);
    signal(SIGINT,  SIG_IGN);

    // Two stats objects, one for latency and one for request rate.
    // Original notes: a stats struct records count, limit, min, max and data.
    statistics.latency  = stats_alloc(cfg.timeout * 1000);
    statistics.requests = stats_alloc(MAX_THREAD_RATE_S);

    // One `thread` record per worker (event loop, connection count, Lua
    // state, counters).
    thread *threads     = zcalloc(cfg.threads * sizeof(thread));

    // Main Lua state for the script; a failed script_resolve() is reported
    // as being unable to connect to host:service.
    lua_State *L = script_create(cfg.script, url, headers);
    if (!script_resolve(L, host, service)) {
        char *msg = strerror(errno);
        fprintf(stderr, "unable to connect to %s:%s %s\n", host, service, msg);
        exit(1);
    }

    cfg.host = host;

    // Start as many worker threads as were requested on the command line.
    for (uint64_t i = 0; i < cfg.threads; i++) {
        thread *t      = &threads[i];

        // Original notes: the argument is the event loop's "setsize", i.e.
        // the number of events it can hold.
        t->loop        = aeCreateEventLoop(10 + cfg.connections * 3);

        // Connections handled by a single thread (integer division).
        t->connections = cfg.connections / cfg.threads;

        t->L = script_create(cfg.script, url, headers);
        script_init(L, t, argc - optind, &argv[optind]);

        // The first thread's Lua state decides the global request settings
        // and whether the response parser callbacks are needed.
        if (i == 0) {
            cfg.pipeline = script_verify_request(t->L);
            cfg.dynamic  = !script_is_static(t->L);
            cfg.delay    = script_has_delay(t->L);
            if (script_want_response(t->L)) {
                parser_settings.on_header_field = header_field;
                parser_settings.on_header_value = header_value;
                parser_settings.on_body         = response_body;
            }
        }

        // The event loop must exist before the worker can be started.
        if (!t->loop) {
            char *msg = strerror(errno);
            fprintf(stderr, "unable to create thread %"PRIu64": %s\n", i, msg);
            exit(2);
        }

        // Start the worker with thread_main(t). Original notes on thread_main
        // (defined elsewhere - NOTE(review): confirm against its source):
        // 1: connect_socket creates the socket and registers its fd through
        //    aeCreateFileEvent (select/epoll)
        // 2: aeCreateTimeEvent creates the timer events
        // 3: aeMain(loop) calls aeProcessEvents, which handles the readable
        //    and writable fds and returns how many were processed
        // 4: aeDeleteEventLoop() releases the loop's memory
        //
        // pthread_create() reports failure through its return value and is
        // not specified to set errno, so the message is built from that value.
        int rc = pthread_create(&t->thread, NULL, &thread_main, t);
        if (rc != 0) {
            char *msg = strerror(rc);
            fprintf(stderr, "unable to create thread %"PRIu64": %s\n", i, msg);
            exit(2);
        }
    }

    // Replace the earlier SIG_IGN for SIGINT with `handler`; all signals are
    // blocked while the handler runs.
    struct sigaction sa = {
        .sa_handler = handler,
        .sa_flags   = 0,
    };
    sigfillset(&sa.sa_mask);
    sigaction(SIGINT, &sa, NULL);

    // Standard output: announce the start of the run.
    char *time = format_time_s(cfg.duration);
    printf("Running %s test @ %s\n", time, url);
    printf("  %"PRIu64" threads and %"PRIu64" connections\n", cfg.threads, cfg.connections);

    uint64_t start    = time_us();
    uint64_t complete = 0;
    uint64_t bytes    = 0;
    errors errors     = { 0 };

    // Let the workers run for the configured duration, then raise the stop flag.
    sleep(cfg.duration);
    stop = 1;

    // Wait for every worker to finish and add up its counters.
    for (uint64_t i = 0; i < cfg.threads; i++) {
        thread *t = &threads[i];
        pthread_join(t->thread, NULL);

        complete += t->complete;
        bytes    += t->bytes;

        errors.connect += t->errors.connect;
        errors.read    += t->errors.read;
        errors.write   += t->errors.write;
        errors.timeout += t->errors.timeout;
        errors.status  += t->errors.status;
    }

    // Standard output: the results.
    uint64_t runtime_us = time_us() - start;
    long double runtime_s   = runtime_us / 1000000.0;
    long double req_per_s   = complete   / runtime_s;
    long double bytes_per_s = bytes      / runtime_s;

    // Only correct the latency stats when at least one request per connection
    // completed; this also keeps the divisor below non-zero.
    if (complete / cfg.connections > 0) {
        int64_t interval = runtime_us / (complete / cfg.connections);
        stats_correct(statistics.latency, interval);
    }

    print_stats_header();
    print_stats("Latency", statistics.latency, format_time_us);
    print_stats("Req/Sec", statistics.requests, format_metric);
    if (cfg.latency) print_stats_latency(statistics.latency);

    char *runtime_msg = format_time_us(runtime_us);

    printf("  %"PRIu64" requests in %s, %sB read\n", complete, runtime_msg, format_binary(bytes));
    if (errors.connect || errors.read || errors.write || errors.timeout) {
        printf("  Socket errors: connect %d, read %d, write %d, timeout %d\n",
               errors.connect, errors.read, errors.write, errors.timeout);
    }

    if (errors.status) {
        printf("  Non-2xx or 3xx responses: %d\n", errors.status);
    }

    printf("Requests/sec: %9.2Lf\n", req_per_s);
    printf("Transfer/sec: %10sB\n", format_binary(bytes_per_s));

    // Hand the totals to the script when script_has_done() reports true for
    // the main Lua state.
    if (script_has_done(L)) {
        script_summary(L, runtime_us, complete, bytes);
        script_errors(L, &errors);
        script_done(L, statistics.latency, statistics.requests);
    }

    return 0;
}
           

繼續閱讀