天天看點

5_03_GLib庫入門與實踐_線程

簡介

GLib提供了對線程、互斥鎖、遞歸鎖、讀寫鎖、條件量、線程私有資料、線程一次性初始化、位鎖及指針位鎖的操作。

資料結構

struct GThread // 線程

union GMutex // 互斥鎖

struct GRecMutex // 遞歸鎖

struct GRWLock // 讀寫鎖

struct GCond // 條件變量

struct GPrivate // 線程私有資料

struct GOnce // 線程一次性初始化

函數清單

gpointer 	(*GThreadFunc) ()
GThread * 	g_thread_new ()
GThread * 	g_thread_try_new ()
GThread * 	g_thread_ref ()
void 	g_thread_unref ()
gpointer 	g_thread_join ()
void 	g_thread_yield ()
void 	g_thread_exit ()
GThread * 	g_thread_self ()
void 	g_mutex_init ()
void 	g_mutex_clear ()
void 	g_mutex_lock ()
gboolean 	g_mutex_trylock ()
void 	g_mutex_unlock ()
GMutexLocker * 	g_mutex_locker_new ()
void 	g_mutex_locker_free ()
#define 	G_LOCK_DEFINE()
#define 	G_LOCK_DEFINE_STATIC()
#define 	G_LOCK_EXTERN()
#define 	G_LOCK()
#define 	G_TRYLOCK()
#define 	G_UNLOCK()
void 	g_rec_mutex_init ()
void 	g_rec_mutex_clear ()
void 	g_rec_mutex_lock ()
gboolean 	g_rec_mutex_trylock ()
void 	g_rec_mutex_unlock ()
void 	g_rw_lock_init ()
void 	g_rw_lock_clear ()
void 	g_rw_lock_writer_lock ()
gboolean 	g_rw_lock_writer_trylock ()
void 	g_rw_lock_writer_unlock ()
void 	g_rw_lock_reader_lock ()
gboolean 	g_rw_lock_reader_trylock ()
void 	g_rw_lock_reader_unlock ()
void 	g_cond_init ()
void 	g_cond_clear ()
void 	g_cond_wait ()
gboolean 	g_cond_timed_wait ()
gboolean 	g_cond_wait_until ()
void 	g_cond_signal ()
void 	g_cond_broadcast ()
#define 	G_PRIVATE_INIT()
gpointer 	g_private_get ()
void 	g_private_set ()
void 	g_private_replace ()
#define 	g_once()
gboolean 	g_once_init_enter ()
void 	g_once_init_leave ()
void 	g_bit_lock ()
gboolean 	g_bit_trylock ()
void 	g_bit_unlock ()
void 	g_pointer_bit_lock ()
gboolean 	g_pointer_bit_trylock ()
void 	g_pointer_bit_unlock ()
guint 	g_get_num_processors ()
           

函數功能分類

線程

GThread * 	g_thread_new ()
GThread * 	g_thread_try_new ()
GThread * 	g_thread_ref ()
void 	g_thread_unref ()
gpointer 	g_thread_join ()
void 	g_thread_yield ()
void 	g_thread_exit ()
GThread * 	g_thread_self ()
           

互斥鎖

void 	g_mutex_init ()
void 	g_mutex_clear ()
void 	g_mutex_lock ()
gboolean 	g_mutex_trylock ()
void 	g_mutex_unlock ()
GMutexLocker * 	g_mutex_locker_new ()
void 	g_mutex_locker_free ()
#define 	G_LOCK_DEFINE()
#define 	G_LOCK_DEFINE_STATIC()
#define 	G_LOCK_EXTERN()
#define 	G_LOCK()
#define 	G_TRYLOCK()
#define 	G_UNLOCK()
           

遞歸鎖

void 	g_rec_mutex_init ()
void 	g_rec_mutex_clear ()
void 	g_rec_mutex_lock ()
gboolean 	g_rec_mutex_trylock ()
void 	g_rec_mutex_unlock ()
           

讀寫鎖

void 	g_rw_lock_init ()
void 	g_rw_lock_clear ()
void 	g_rw_lock_writer_lock ()
gboolean 	g_rw_lock_writer_trylock ()
void 	g_rw_lock_writer_unlock ()
void 	g_rw_lock_reader_lock ()
gboolean 	g_rw_lock_reader_trylock ()
void 	g_rw_lock_reader_unlock ()
           

條件變量

void 	g_cond_init ()
void 	g_cond_clear ()
void 	g_cond_wait ()
gboolean 	g_cond_timed_wait ()
gboolean 	g_cond_wait_until ()
void 	g_cond_signal ()
void 	g_cond_broadcast ()
           

線程私有資料

#define 	G_PRIVATE_INIT()
gpointer 	g_private_get ()
void 	g_private_set ()
void 	g_private_replace ()
           

一次性初始化

#define 	g_once()
gboolean 	g_once_init_enter ()
void 	g_once_init_leave ()
           

位鎖

void 	g_bit_lock ()
gboolean 	g_bit_trylock ()
void 	g_bit_unlock ()
           

指針位鎖

void 	g_pointer_bit_lock ()
gboolean 	g_pointer_bit_trylock ()
void 	g_pointer_bit_unlock ()
guint 	g_get_num_processors ()
           

函數功能說明及綜合示範

多線程程式設計

與标準的線程相關函數,Glib提供的函數少了pthread_detach和pthread_attr_xxx函數,前者作用是将線程标記為分離狀态,後者作用是設定線程屬性。

GThread * 	g_thread_new ()
GThread * 	g_thread_try_new ()
GThread * 	g_thread_ref ()
void 	g_thread_unref ()
gpointer 	g_thread_join ()
void 	g_thread_yield ()
void 	g_thread_exit ()
GThread * 	g_thread_self ()
           

g_thread_try_new:嘗試建立一個線程,當建立失敗,會産生錯誤原因。

g_thread_ref/g_thread_unref:每一個Glib對象内部都有引用計數這個概念,當引用計數減到0時,對象會自動釋放。

g_thread_join:線程連接配接函數。此函數一般有兩個作用:1.等待線程結束;2.,線程結束時,對線程資源進行回收,否則線程會變成僵屍線程。

示例代碼如下:

源碼見

glib_examples\glib_threads\glib_threads_thread

#include <glib.h>

gint global_count = 0;
static gpointer _thread_func(gpointer data)
{
    gint i = 0;
    gpointer retval = NULL;

    for(i=0;i<10000;i++) {
        global_count += i;
    }
    g_print("count:%d \n", global_count);
}

void test_thread_join(void)
{
    GThread *th1,*th2,*th3;
    th1 = g_thread_new("th1", _thread_func, NULL);
    th2 = g_thread_new("th2", _thread_func, NULL);
    th3 = g_thread_new("th3", _thread_func, NULL);
    g_thread_join(th1);
    g_thread_join(th2);
    g_thread_join(th3);
}

gint main(gint argc, gchar **argv)
{
    g_test_init(&argc, &argv, NULL);

    g_test_add_func("/thread/join", test_thread_join);

    return g_test_run();
}
           

運作結果:

[[email protected]_6 build]# ./glib_threads_thread 
/thread/join: count:49995000 
count:99990000 
count:149985000 
OK
[[email protected]_6 build]# ./glib_threads_thread 
/thread/join: count:49995000 
count:99990000 
count:111230911 
OK
[[email protected]_6 build]# ./glib_threads_thread 
/thread/join: count:49995000 
count:99990000 
count:149985000 
OK
           

運作多次,大部分次數都符合預期,隻有少部分次數不符合預期。很明顯,這裡需要線程同步。

互斥鎖

互斥鎖相關的操作有以下幾個函數:

void 	g_mutex_init ()
void 	g_mutex_clear ()
void 	g_mutex_lock ()
gboolean 	g_mutex_trylock ()
void 	g_mutex_unlock ()
GMutexLocker * 	g_mutex_locker_new ()
void 	g_mutex_locker_free ()
#define 	G_LOCK_DEFINE()
#define 	G_LOCK_DEFINE_STATIC()
#define 	G_LOCK_EXTERN()
#define 	G_LOCK()
#define 	G_TRYLOCK()
#define 	G_UNLOCK()
           

針對上一節的程式,我們使用互斥鎖進行線程同步。

示例代碼如下:

源碼見

glib_examples\glib_threads\glib_threads_mutex

#include <glib.h>

gint global_count = 0;
GMutex *global_mutex = NULL;
static gpointer _thread_func(gpointer data)
{
    gint i = 0;
    gpointer retval = NULL;

    g_mutex_lock(global_mutex);
    for(i=0;i<10000;i++) {
        global_count += i;
    }
    g_print("count:%d \n", global_count);
    g_mutex_unlock(global_mutex);

}

void test_thread_mutex(void)
{
    GThread *th1,*th2,*th3;
    global_mutex = g_new0(GMutex, 1);

    g_mutex_init(global_mutex);
    th1 = g_thread_new("th1", _thread_func, NULL);
    th2 = g_thread_new("th2", _thread_func, NULL);
    th3 = g_thread_new("th3", _thread_func, NULL);
    g_thread_join(th1);
    g_thread_join(th2);
    g_thread_join(th3);
    g_mutex_clear(global_mutex);
    g_free(global_mutex);
}

gint main(gint argc, gchar **argv)
{
    g_test_init(&argc, &argv, NULL);

    g_test_add_func("/thread/mutex", test_thread_mutex);

    return g_test_run();
}
           

運作結果:

[[email protected]_6 build]# ./glib_threads_mutex 
/thread/mutex: count:49995000 
count:99990000 
count:149985000 
OK
[[email protected]_6 build]# ./glib_threads_mutex 
/thread/mutex: count:49995000 
count: 149985000 
count:99990000
OK
           

可以看到,盡管無法控制單個線程的最終執行結果,但還是可以保證每個線程執行的結果是49995000的整數倍。

互斥鎖建立:G_LOCK_DEFINE和G_LOCK_DEFINE_STATIC

G_LOCK_DEFINE可以對GMutex變量或其他任意類型的變量初始化。

G_LOCK_DEFINE_STATIC可以建立一個靜态對象的互斥鎖。

示例一:建立GMutex變量互斥鎖

示例代碼如下:

源碼見

glib_examples\glib_threads\glib_threads_mutex_init_gmutex

#include <glib.h>

gint global_count = 0;
GMutex global_mutex;

G_LOCK_DEFINE(global_mutex);

static gpointer _thread_func(gpointer data)
{
    gint i = 0;
    gpointer retval = NULL;

    G_LOCK(global_mutex);
    for(i=0;i<10000;i++) {
        global_count += i;
    }
    g_print("count:%d \n", global_count);
    G_UNLOCK(global_mutex);
}

void test_thread_mutex(void)
{
    GThread *th1,*th2,*th3;

    th1 = g_thread_new("th1", _thread_func, NULL);
    th2 = g_thread_new("th2", _thread_func, NULL);
    th3 = g_thread_new("th3", _thread_func, NULL);
    g_thread_join(th1);
    g_thread_join(th2);
    g_thread_join(th3);
}

gint main(gint argc, gchar **argv)
{
    g_test_init(&argc, &argv, NULL);

    g_test_add_func("/thread/mutex", test_thread_mutex);

    return g_test_run();
}
           

運作結果:

[[email protected]_6 build]# ./glib_threads_mutex_init_gmutex
/thread/mutex: count:49995000
count:99990000
count:149985000
OK
           

示例二:建立char類型變量互斥鎖

示例代碼如下:

源碼見

glib_examples\glib_threads\glib_threads_mutex_init_char

#include <glib.h>

gint global_count = 0;
gchar global_mutex = 0;

G_LOCK_DEFINE(global_mutex);

static gpointer _thread_func(gpointer data)
{
    gint i = 0;
    gpointer retval = NULL;

    G_LOCK(global_mutex);
    for(i=0;i<10000;i++) {
        global_count += i;
    }
    g_print("count:%d \n", global_count);
    G_UNLOCK(global_mutex);

}

void test_thread_mutex(void)
{
    GThread *th1,*th2,*th3;

    th1 = g_thread_new("th1", _thread_func, NULL);
    th2 = g_thread_new("th2", _thread_func, NULL);
    th3 = g_thread_new("th3", _thread_func, NULL);
    g_thread_join(th1);
    g_thread_join(th2);
    g_thread_join(th3);
}

gint main(gint argc, gchar **argv)
{
    g_test_init(&argc, &argv, NULL);

    g_test_add_func("/thread/mutex", test_thread_mutex);

    return g_test_run();
}
           

運作結果:

[[email protected]_6 build]# ./glib_threads_mutex_init_char
/thread/mutex: count:49995000
count:99990000
count:149985000
OK
           

示例三:建立靜态變量互斥鎖

示例代碼如下:

源碼見

glib_examples\glib_threads\glib_threads_mutex_init_static

#include <glib.h>

gint global_count = 0;

G_LOCK_DEFINE_STATIC(global_mutex);

static gpointer _thread_func(gpointer data)
{
    gint i = 0;
    gpointer retval = NULL;

    G_LOCK(global_mutex);
    for(i=0;i<10000;i++) {
        global_count += i;
    }
    g_print("count:%d \n", global_count);
    G_UNLOCK(global_mutex);
}

void test_thread_mutex(void)
{
    GThread *th1,*th2,*th3;

    th1 = g_thread_new("th1", _thread_func, NULL);
    th2 = g_thread_new("th2", _thread_func, NULL);
    th3 = g_thread_new("th3", _thread_func, NULL);
    g_thread_join(th1);
    g_thread_join(th2);
    g_thread_join(th3);
}

gint main(gint argc, gchar **argv)
{
    g_test_init(&argc, &argv, NULL);

    g_test_add_func("/thread/mutex", test_thread_mutex);

    return g_test_run();
}
           

運作結果:

[[email protected]_6 build]# ./glib_threads_mutex_init_static
/thread/mutex: count:49995000
count:99990000
count:149985000
OK
           

遞歸鎖

遞歸鎖的操作函數如下:

void 	g_rec_mutex_init ()
void 	g_rec_mutex_clear ()
void 	g_rec_mutex_lock ()
gboolean 	g_rec_mutex_trylock ()
void 	g_rec_mutex_unlock ()
           

遞歸鎖和非遞歸鎖的差別:

同一個線程可以多次擷取同一個遞歸鎖,不會産生死鎖。而如果一個線程多次擷取同一個非遞歸鎖,則會産生死鎖。

Linux下pthread_mutex_t預設是非遞歸鎖,可以顯示地設定鎖的PTHREAD_MUTEX_RECURSIVE屬性,将pthread_mutex_t變為遞歸鎖。

遞歸鎖比較複雜,更容易出問題,一般推薦使用非遞歸鎖。

下面示範非遞歸鎖和遞歸鎖的用法對比,在同一線程,調用非遞歸鎖和遞歸鎖。

同一線程調用非遞歸鎖

示例代碼如下:

源碼見

glib_examples\glib_threads\glib_threads_mutex_non_rec_err

#include <glib.h>

#define GLIB_THREADS_FORCE_QUIT 1  // force quit

gint global_count = 0;
GMutex *global_mutex = NULL;
GTimer *timer = NULL;
static gpointer _thread_func(gpointer data)
{
    gint i = 0;
    gpointer retval = NULL;

    g_mutex_lock(global_mutex);
    for(i=0;i<10000;i++) {
        global_count += i;
    }
    g_usleep(GPOINTER_TO_INT(data)*1000*1000);
    g_print("count:%d, elapsed:%f \n", global_count, g_timer_elapsed(timer, NULL));
    
    g_print("before lock a non-recursion mutex again \n");
    g_mutex_lock(global_mutex);
    g_print("do something... \n");
    g_mutex_unlock(global_mutex);
    g_print("unlock a non-recursion mutex \n");

    g_mutex_unlock(global_mutex);
}

void glib_threads_mutex_non_rec_err(void)
{
    GThread *th1,*th2,*th3;

    timer = g_timer_new();
    g_timer_start(timer);

    global_mutex = g_new0(GMutex, 1);

    g_mutex_init(global_mutex);
    th1 = g_thread_new("th1", _thread_func, GINT_TO_POINTER(3));
    th2 = g_thread_new("th2", _thread_func, GINT_TO_POINTER(2));
    th3 = g_thread_new("th3", _thread_func, GINT_TO_POINTER(1));

#if GLIB_THREADS_FORCE_QUIT
    g_usleep(5*1000*1000);  // sleep 5s
    g_print("ERROR! quit with exception! \n");
    exit(-1);
#endif
    g_thread_join(th1);
    g_thread_join(th2);
    g_thread_join(th3);
    g_mutex_clear(global_mutex);
    g_free(global_mutex);

    g_timer_destroy(timer);
}

gint main(gint argc, gchar **argv)
{
    g_test_init(&argc, &argv, NULL);

    g_test_add_func("/thread/mutex", glib_threads_mutex_non_rec_err);

    return g_test_run();
}
           

運作結果:

[[email protected]_6 build]# ./glib_threads_mutex_non_rec_err
/thread/mutex: count:49995000, elapsed:2.000509
before lock a non-recursion mutex again
ERROR! quit with exception!
           

由于非遞歸鎖在重複加鎖之後程式無法退出,會一直卡死,影響自動化運作,是以上述代碼加了一個強制退出機制,休眠5秒将會強制退出整個程式。

同一線程調用遞歸鎖

示例代碼如下:

源碼見

glib_examples\glib_threads\glib_threads_rec_mutex

#include <glib.h>

gint global_count = 0;
GRecMutex *global_rec_mutex = NULL;
GTimer *timer = NULL;
static gpointer _thread_func(gpointer data)
{
    gint i = 0;
    gpointer retval = NULL;

    g_rec_mutex_lock(global_rec_mutex);
    g_print("---sleep %d lock \n", GPOINTER_TO_INT(data));

    for(i=0;i<10000;i++) {
        global_count += i;
    }
    g_usleep(GPOINTER_TO_INT(data)*1000*1000);
    g_print("count:%d, elapsed:%f \n", global_count, g_timer_elapsed(timer, NULL));

    g_print("----sleep %d unlock \n", GPOINTER_TO_INT(data));

    g_print("before lock a recursion mutex \n");
    g_rec_mutex_lock(global_rec_mutex);
    g_rec_mutex_unlock(global_rec_mutex);
    g_print("after unlock a recursion mutex \n");

    g_rec_mutex_unlock(global_rec_mutex);

}

void test_thread_rec_mutex(void)
{
    GThread *th1,*th2,*th3;

    timer = g_timer_new();
    g_timer_start(timer);

    global_rec_mutex = g_new0(GRecMutex, 1);

    g_rec_mutex_init(global_rec_mutex);
    th1 = g_thread_new("th1", _thread_func, GINT_TO_POINTER(3));
    th2 = g_thread_new("th2", _thread_func, GINT_TO_POINTER(2));
    th3 = g_thread_new("th3", _thread_func, GINT_TO_POINTER(1));
    g_thread_join(th1);
    g_thread_join(th2);
    g_thread_join(th3);
    g_rec_mutex_clear(global_rec_mutex);
    g_free(global_rec_mutex);

    g_timer_destroy(timer);
}

gint main(gint argc, gchar **argv)
{
    g_test_init(&argc, &argv, NULL);

    g_test_add_func("/thread/rec_mutex", test_thread_rec_mutex);

    return g_test_run();
}
           

運作結果:

[[email protected]_6 build]# ./glib_threads_rec_mutex
/thread/rec_mutex: ---sleep 3 lock
count:49995000, elapsed:3.002168
----sleep 3 unlock
before lock a recursion mutex
after unlock a recursion mutex
---sleep 1 lock
count:99990000, elapsed:4.004852
----sleep 1 unlock
before lock a recursion mutex
after unlock a recursion mutex
---sleep 2 lock
count:149985000, elapsed:6.005817
----sleep 2 unlock
before lock a recursion mutex
after unlock a recursion mutex
OK
           

讀寫鎖

讀寫鎖适用場景:多個線程都會對資源進行讀取,隻有少量線程對資源進行寫入,且讀取頻繁,寫入不頻繁。

讀寫鎖的操作函數見下:

void 	g_rw_lock_init ()
void 	g_rw_lock_clear ()
void 	g_rw_lock_writer_lock ()
gboolean 	g_rw_lock_writer_trylock ()
void 	g_rw_lock_writer_unlock ()
void 	g_rw_lock_reader_lock ()
gboolean 	g_rw_lock_reader_trylock ()
void 	g_rw_lock_reader_unlock ()
           

下面舉例示範讀寫鎖。該程式建立了4個線程,兩個寫線程,兩個讀線程,每個線程占用給定時間的鎖,然後休眠指定時間,把CPU讓給其他線程。

示例代碼如下:

源碼見

glib_examples\glib_threads\glib_threads_rwlock

#include <glib.h>

gint global_count = 0;
GRWLock *global_rwlock = NULL;
GTimer *timer = NULL;

static gpointer _thread_writer_small_func(gpointer data)
{
    gint i = 0;

    for(i=0;i<5;i++) {
        g_rw_lock_writer_lock(global_rwlock);
        g_print("writer small lock \n");
        global_count += i;
        g_print("writer small global_count: %d elapsed:%f \n", global_count, g_timer_elapsed(timer, NULL));
        g_usleep((GPOINTER_TO_INT(data)*100*1000));
        g_print("writer small unlock \n");
        g_rw_lock_writer_unlock(global_rwlock);
        g_usleep((GPOINTER_TO_INT(data)*100*1000));
    }
}

static gpointer _thread_writer_large_func(gpointer data)
{
    gint i = 0;

    for(i=0;i<5;i++) {
        g_rw_lock_writer_lock(global_rwlock);
        g_print("writer large lock \n");
        global_count += i*100;
        g_print("writer large global_count: %d elapsed:%f \n", global_count, g_timer_elapsed(timer, NULL));
        g_usleep((GPOINTER_TO_INT(data)*100*1000));
        g_print("writer large unlock \n");
        g_rw_lock_writer_unlock(global_rwlock);
        g_usleep((GPOINTER_TO_INT(data)*100*1000));
    }
}

static gpointer _thread_reader1_func(gpointer data)
{
    gint i = 0;

    for(i=0;i<5;i++) {
        g_rw_lock_reader_lock(global_rwlock);
        g_print("reader1 lock \n");
        g_print("reader1 global_count: %d elapsed:%f \n", global_count, g_timer_elapsed(timer, NULL));
        g_usleep((GPOINTER_TO_INT(data)*100*1000));
        g_print("reader1 unlock \n");
        g_rw_lock_reader_unlock(global_rwlock);
        g_usleep((GPOINTER_TO_INT(data)*100*1000));
    }
}

static gpointer _thread_reader2_func(gpointer data)
{
    gint i = 0;

    for(i=0;i<5;i++) {
        g_rw_lock_reader_lock(global_rwlock);
        g_print("reader2 lock \n");
        g_print("reader2 global_count: %d elapsed:%f \n", global_count, g_timer_elapsed(timer, NULL));
        g_usleep((GPOINTER_TO_INT(data)*100*1000));
        g_print("reader2 unlock \n");
        g_rw_lock_reader_unlock(global_rwlock);
        g_usleep((GPOINTER_TO_INT(data)*100*1000));
    }
}

void test_thread_rwlock(void)
{
    GThread *th1,*th2,*th3,*th4;

    timer = g_timer_new();
    g_timer_start(timer);

    global_rwlock = g_new0(GRWLock, 1);

    g_print("\n");

    g_rw_lock_init(global_rwlock);
    th1 = g_thread_new("th1", _thread_writer_small_func, GINT_TO_POINTER(1));
    th2 = g_thread_new("th2", _thread_writer_large_func, GINT_TO_POINTER(2));
    th3 = g_thread_new("th3", _thread_reader1_func, GINT_TO_POINTER(1));
    th4 = g_thread_new("th4", _thread_reader2_func, GINT_TO_POINTER(2));
    g_thread_join(th1);
    g_thread_join(th2);
    g_thread_join(th3);
    g_thread_join(th4);
    g_rw_lock_clear(global_rwlock);
    g_free(global_rwlock);

    g_timer_destroy(timer);
}

gint main(gint argc, gchar **argv)
{
    g_test_init(&argc, &argv, NULL);

    g_test_add_func("/thread/rwlock", test_thread_rwlock);

    return g_test_run();
}
           

運作結果:

[[email protected]_6 build]# ./glib_threads_rwlock 
/thread/rwlock: 
writer small lock 
writer small global_count: 0 elapsed:0.000306 
writer small unlock 
writer large lock 
writer large global_count: 0 elapsed:0.101299 
writer large unlock 
writer small lock 
writer small global_count: 1 elapsed:0.303936 
writer small unlock 
reader1 lock 
reader1 global_count: 1 elapsed:0.406466 
reader2 lock 
reader2 global_count: 1 elapsed:0.406485 
reader1 unlock 
reader1 lock 
reader1 global_count: 1 elapsed:0.609238 
reader2 unlock 
reader1 unlock 
writer small lock 
writer small global_count: 3 elapsed:0.711543 
writer small unlock 
writer large lock 
writer large global_count: 103 elapsed:0.813941 
writer large unlock 
writer small lock 
writer small global_count: 106 elapsed:1.016799 
writer small unlock 
reader2 lock 
reader2 global_count: 106 elapsed:1.119176 
reader1 lock 
reader1 global_count: 106 elapsed:1.119212 
reader1 unlock 
reader1 lock 
reader1 global_count: 106 elapsed:1.321583 
reader2 unlock 
reader1 unlock 
writer large lock 
writer large global_count: 306 elapsed:1.423905 
writer large unlock 
writer small lock 
writer small global_count: 310 elapsed:1.626801 
writer small unlock 
reader1 lock 
reader1 global_count: 310 elapsed:1.727450 
reader2 lock 
reader2 global_count: 310 elapsed:1.727507 
reader1 unlock 
reader2 unlock 
writer large lock 
writer large global_count: 610 elapsed:1.929952 
writer large unlock 
reader2 lock 
reader2 global_count: 610 elapsed:2.130435 
reader2 unlock 
writer large lock 
writer large global_count: 1010 elapsed:2.332422 
writer large unlock 
reader2 lock 
reader2 global_count: 1010 elapsed:2.534962 
reader2 unlock 
OK
           

通過結果可以看出:

  • 讀鎖與讀鎖之間不互斥
  • 讀鎖與寫鎖之間互斥
  • 寫鎖與寫鎖之間互斥

條件量

條件量的操作函數如下:

// 初始化
void 	g_cond_init ()

// 銷毀
void 	g_cond_clear ()

// 等待條件量
void 	g_cond_wait ()

// 等待條件量,逾時則傳回
gboolean 	g_cond_timed_wait ()

// 等待條件量,等到該信号或逾時均退出
gboolean 	g_cond_wait_until ()

// 喚醒一個等待的線程,如果有多個線程則按入隊順序喚醒第一個
void 	g_cond_signal ()

// 喚醒所有等待的線程
void 	g_cond_broadcast ()
           

互斥鎖隻有兩種狀态:鎖定和非鎖定。條件量允許A線程等待B線程發送信号。A線程因等待條件變量信号而挂起。B線程可以發送條件變量信号給A線程或其他多個線程。為了避免出現喚醒丢失問題,條件變量和互斥鎖常一起使用。

使用規則

pthread_cond_init LinuxThreads沒有實作條件變量的cond_attr屬性

pthread_cond_destroy 當有線程在該條件變量上等待,則銷毀失敗,傳回EBUSY

pthread_cond_signal如果沒有線程阻塞在條件變量上,調用本函數不會起任何作用

pthread_cond_broadcast會喚醒所有阻塞到本條件變量上的線程

pthread_cond_wait前需要由本線程加鎖,因為其本身包含解鎖和加鎖兩個動作。

/*************pthread_cond_wait()的使用方法**********/
pthread_mutex_lock(&mutex);    /*lock*/
pthread_cond_wait(&cond, &mutex);/*lock-->unlock-->wait() return-->lock*/
pthread_mutex_unlock(&mutex); /*unlock*/
/*****************************************************/
           

在調用pthread_cond_wait前先調用mutex_lock,線程獲得鎖,進入條件等待,當條件不成立的時候:

  • cond_wait函數先解鎖mutex,然後進入休眠等待信号激活;
  • 當有信号激活時,線程獲得互斥鎖,然後執行相關操作,最後釋放鎖;
  • 如果有其他線程阻塞在該條件變量上,則其他線程開始競争剛才釋放的鎖,得到鎖的線程開始執行。

下面示範兩個消費者一個生産者情況下,條件變量的用法。

示例代碼如下:

源碼見

glib_examples\glib_threads\glib_threads_cond

#include <glib.h>

GMutex *mutex;
GCond *cond;

gpointer consumer1(gpointer data)
{
    g_mutex_lock(mutex);
    g_print("consumer1 lock\n");

    g_cond_wait(cond, mutex);
    g_print("consumer1 get cond \n");

    g_print("consumer1 unlock\n");
    g_mutex_unlock(mutex);
}

gpointer consumer2(gpointer data)
{
    g_mutex_lock(mutex);
    g_print("consumer2 lock\n");

    g_cond_wait(cond, mutex);
    g_print("consumer2 get cond \n");

    g_print("consumer2 unlock\n");
    g_mutex_unlock(mutex);
}

gpointer producer(gpointer data)
{
    g_mutex_lock(mutex);
    g_print("producer lock\n");
    g_print("producer unlock\n");
    g_mutex_unlock(mutex);

    g_cond_broadcast(cond);  //通知所有等待條件變量的線程
    // g_cond_signal(cond);     //隻通知一個等待條件變量的線程
}

void test_thread_cond(void)
{
    GThread *thid1,*thid2,*thid3;

    g_print("\n");

    mutex = g_new0(GMutex, 1);
    g_mutex_init(mutex);

    cond = g_new0(GCond, 1);
    g_cond_init(cond);

    thid1 = g_thread_new("thid1", consumer1, NULL);
    thid2 = g_thread_new("thid2", consumer2, NULL);
    sleep(1);
    thid3 = g_thread_new("thid3", producer, NULL);

    g_thread_join(thid1);
    g_thread_join(thid2);
    g_thread_join(thid3);

    g_cond_clear(cond);
    g_mutex_clear(mutex);

    g_free(mutex);
    g_free(cond);
}


gint main(gint argc, gchar **argv)
{
    g_test_init(&argc, &argv, NULL);

    g_test_add_func("/thread/cond", test_thread_cond);

    return g_test_run();
}
           

運作結果:

[[email protected]_6 build]# ./glib_threads_cond
/thread/cond:
consumer1 lock
consumer2 lock
producer lock
producer unlock
consumer1 get cond
consumer1 unlock
consumer2 get cond
consumer2 unlock
OK
[[email protected]_6 build]# ./glib_threads_cond
/thread/cond:
consumer2 lock
consumer1 lock
producer lock
producer unlock
consumer2 get cond
consumer2 unlock
consumer1 get cond
consumer1 unlock
OK
           

多次運作程式,會有以上兩種結果,消費者1或消費者2先執行,得到鎖,然後進入條件量等待狀态,另一個消費者也得到了鎖,進入條件量等待,先等待條件量的線程先得到條件量,然後解鎖并退出線程。

以消費者1先得到鎖為例,消費者1得到鎖後,并沒有解鎖,但消費者2卻得到了鎖,消費者2不解鎖的情況下,生産者得到了鎖,并可以解鎖,究其原因,是因為pthread_cond_wait函數本身有對鎖先解鎖後加鎖的操作,消費者1得到鎖後,發現條件量不滿足,就會先解鎖,等待其他線程執行。

g_cond_broadcast通知所有等待條件量的線程,g_cond_signal隻通知一個等待條件量的線程。上述程式,如果将g_cond_broadcast改為g_cond_signal,則隻會有一個線程執行結束,另一個線程由于得不到鎖,會一直處于等待條件量狀态。

線程私有資料

全局變量可以在所有線程被通路,但一個線程改變全局變量的值,其他線程通路該全局變量時,該值也會發生改變。

線程私有資料表面上看起來是一個全局變量,所有線程使用同樣的變量名通路它,但它在每個線程又是單獨存儲的,一個線程改變線程私有資料的值,不會影響到其他線程。

線程私有資料可以存儲任何類型的資料,但在所有線程存儲的資料類型需要一緻,不可以一個線程存儲的是整型,另一個線程存儲的是字元串類型。

線程私有資料的操作函數如下:

#define 	G_PRIVATE_INIT()
gpointer 	g_private_get ()
void 	g_private_set ()
void 	g_private_replace ()
           

下面舉例示範。

示例代碼如下:

源碼見

glib_examples\glib_threads\glib_threads_private

#include <glib.h>

GPrivate private = G_PRIVATE_INIT (g_free);

static gpointer _thread_1_func(gpointer data)
{
    gpointer *pri_val = NULL;
    pri_val = g_private_get(&private);
    g_print("pri_val(ori): %s \n", (gchar *)pri_val);

    g_private_set(&private, g_strdup("abcd"));
    pri_val = g_private_get(&private);
    g_print("pri_val(set): %s \n", (gchar *)pri_val);
}

static gpointer _thread_2_func(gpointer data)
{
    gpointer *pri_val = NULL;
    pri_val = g_private_get(&private);
    g_print("pri_val(ori): %s \n", (gchar *)pri_val);

    g_private_set(&private, g_strdup("123456"));
    pri_val = g_private_get(&private);
    g_print("pri_val(set): %s \n", (gchar *)pri_val);

    g_private_replace(&private, g_strdup("AABBC"));
    pri_val = g_private_get(&private);
    g_print("pri_val(replace): %s \n", (gchar *)pri_val);
}

void test_thread_private(void)
{
    GThread *th1,*th2;

    g_print("\n");

    th1 = g_thread_new("th1", _thread_1_func, NULL);
    th2 = g_thread_new("th2", _thread_2_func, NULL);
    g_thread_join(th1);
    g_thread_join(th2);
}

gint main(gint argc, gchar **argv)
{
    g_test_init(&argc, &argv, NULL);

    g_test_add_func("/thread/private", test_thread_private);

    return g_test_run();
}
           

運作結果:

[[email protected]_6 build]# ./glib_threads_private 
/thread/private: 
pri_val(ori): (null) 
pri_val(set): abcd 
pri_val(ori): (null) 
pri_val(set): 123456 
pri_val(replace): AABBC 
OK
           

一次性初始化

多線程程式有時有這樣的需求:不管建立了多少線程,有些初始化動作隻能發生一次。舉例來說,所有線程共享的互斥鎖,必須隻能初始化一次。如果由主線程來建立新線程,那隻需要在

主線程初始化即可,但對于庫函數,由于調用者在初次調用庫函數前可能已經建立了這些線程,就無法這樣處理,是以需要這樣的庫函數:無論首次為任何線程所調用,都會執行初始化動作。

可以通過g_once函數實作一次性初始化,其操作函數如下:

#define 	g_once()
gboolean 	g_once_init_enter ()
void 	g_once_init_leave ()
           

示例代碼如下:

源碼見

glib_examples\glib_threads\glib_threads_once

#include <glib.h>

gint global_count = 0;
GRWLock *global_rwlock = NULL;

static gpointer _thread_once_func(gpointer data)
{
    g_print("Will be called only once! \n");

    return GINT_TO_POINTER(9999);
}

static gint _thread_inner_init_func(void)
{
    static GOnce my_once = G_ONCE_INIT;

    g_once(&my_once, _thread_once_func, NULL);

    g_print("g_once retval: %d \n", GPOINTER_TO_INT(my_once.retval));

    return my_once.status;
}

static gpointer _thread_inc_func(gpointer data)
{
    gint i = 0;
    gint status = 0;

    status = _thread_inner_init_func();
    g_print("%s: status %d \n", __FUNCTION__, status);

    g_rw_lock_reader_lock(global_rwlock);
    global_count ++;
    g_rw_lock_reader_unlock(global_rwlock);
}

static gpointer _thread_dec_func(gpointer data)
{
    gint i = 0;
    gint status = 0;

    status = _thread_inner_init_func();
    g_print("%s: status %d \n", __FUNCTION__, status);

    g_rw_lock_reader_lock(global_rwlock);
    global_count --;
    g_rw_lock_reader_unlock(global_rwlock);
}

void test_thread_once(void)
{
    GThread *th1,*th2;

    global_rwlock = g_new0(GRWLock, 1);

    g_print("\n");

    g_rw_lock_init(global_rwlock);
    th1 = g_thread_new("th1", _thread_inc_func, NULL);
    th2 = g_thread_new("th2", _thread_dec_func, NULL);
    g_thread_join(th1);
    g_thread_join(th2);
    g_rw_lock_clear(global_rwlock);
    g_free(global_rwlock);
}

gint main(gint argc, gchar **argv)
{
    g_test_init(&argc, &argv, NULL);

    g_test_add_func("/thread/once", test_thread_once);

    return g_test_run();
}
           

運作結果:

[[email protected]_6 build]./glib_threads_once 
/thread/once: 
Will be called only once! 
g_once retval: 9999 
_thread_dec_func: status 2 
g_once retval: 9999 
_thread_inc_func: status 2 
OK

           

一次性初始化時執行多個函數。

g_once隻能執行一個函數,如果需要執行多個函數,可以使用下面的函數。

如果把要執行的代碼封裝到一個函數,仍可以使用g_once。

g_once_init_enter
g_once_init_leave
           

示例代碼如下:

源碼見

glib_examples\glib_threads\glib_threads_once_section

#include <glib.h>

gint global_count = 0;
GRWLock *global_rwlock = NULL;

static gpointer _thread_once_section1_init(gpointer data)
{
    g_print("section1 init \n");
}

static gpointer _thread_once_section2_init(gpointer data)
{
    g_print("section2 init \n");
}

static gsize _thread_inner_init_func(void)
{
    static gsize initialization_value = 0;
    gsize setup_value = 0;
    
    if (g_once_init_enter (&initialization_value)) {

        // initialization code here
        _thread_once_section1_init(NULL);
        _thread_once_section2_init(NULL);

        g_once_init_leave (&initialization_value, 9999);
    }

    return initialization_value;
}

static gpointer _thread_inc_func(gpointer data)
{
    gint i = 0;
    gint status = 0;

    status = _thread_inner_init_func();
    g_print("%s: status %d \n", __FUNCTION__, status);

    g_rw_lock_reader_lock(global_rwlock);
    global_count ++;
    g_rw_lock_reader_unlock(global_rwlock);
}

static gpointer _thread_dec_func(gpointer data)
{
    gint i = 0;
    gint status = 0;

    status = _thread_inner_init_func();
    g_print("%s: status %d \n", __FUNCTION__, status);

    g_rw_lock_reader_lock(global_rwlock);
    global_count --;
    g_rw_lock_reader_unlock(global_rwlock);
}

void test_thread_once(void)
{
    GThread *th1,*th2;

    global_rwlock = g_new0(GRWLock, 1);

    g_print("\n");

    g_rw_lock_init(global_rwlock);
    th1 = g_thread_new("th1", _thread_inc_func, NULL);
    th2 = g_thread_new("th2", _thread_dec_func, NULL);
    g_thread_join(th1);
    g_thread_join(th2);
    g_rw_lock_clear(global_rwlock);
    g_free(global_rwlock);
}

gint main(gint argc, gchar **argv)
{
    g_test_init(&argc, &argv, NULL);

    g_test_add_func("/thread/once", test_thread_once);

    return g_test_run();
}
           

運作結果:

[[email protected]_6 build]
[[email protected]_6 build]# ./glib_threads_once_section 
/thread/once: 
section1 init 
section2 init 
_thread_dec_func: status 9999 
_thread_inc_func: status 9999 
OK
[[email protected]_6 build]# ./glib_threads_once_section 
/thread/once: 
section1 init 
section2 init 
_thread_inc_func: status 9999 
_thread_dec_func: status 9999 
OK
           

可以看到,無論inc還是dec線程哪個先運作,g_once_init_enter/g_once_init_leave中間的代碼段都隻會被調用一次。

繼續閱讀