筆者在上周參加阿裡雲開發者大會時,特别注意到一個現象就是Serverless這個概念被反複提及,其受關注程度提升明顯,筆者仔細看了一下,Serverless的核心理念就是函數式計算,開發者不需要再關注具體的子產品,雲上部署的粒度變成了程式函數,自動伸縮、擴容等工作完全由雲服務負責,能夠想象Serverless必将在未來引領時代潮流。
Serverless Computing,即”無伺服器計算”,其實這一概念在剛剛提出的時候并沒有獲得太多的關注,直到2014年AWS Lambda這一裡程碑式的産品出現。Serverless算是正式走進了雲計算的舞台。2018年5月,Google在KubeCon+CloudNative 2018期間開源了gVisor容器沙箱運作時并分享了它的設計理念和原則。随後2018年的Google Next大會上Google推出了自己的 Google Serverless平台 —— gVisor。同年AWS又放了顆大炮仗-Firecracker,這是一款基于Rust語言編寫的安全沙箱基礎元件,用于函數計算服務Lambda和托管的容器服務。而值得注意的是Google也并沒有死守自己一手締造的Go語言平台,而是選擇了Go與Rust的模式,據說Google在Rust方面也開始招兵買馬,也要用Rust重寫之前基于Go編寫的Serverless平台。
筆者寫本文的初衷,其實就是要回答為什麼在這個高并發大行其道的時代,以性能著稱的C語言和以安全高效聞名的Java都不香了呢?
高并發模式初探
在這個高并發時代最重要的設計模式無疑是生産者、消費者模式,比如著名的消息隊列kafka其實就是一個生産者消費者模式的典型實作。其實生産者消費者問題,也就是有限緩沖問題,可以用以下場景進行簡要描述,生産者生成一定量的産品放到庫房,并不斷重複此過程;與此同時,消費者也在緩沖區消耗這些資料,但由于庫房大小有限,是以生産者和消費者之間步調協調,生産者不會在庫房滿的情況放入端口,消費者也不會在庫房空時消耗資料。詳見下圖:

而如果在生産者與消費者之間完美協調并保持高效,這就是高并發要解決的本質問題。
C語言的高并發案例
筆者在前文
《這位創造了Github冠軍項目的老男人,堪稱10倍程式員本尊》曾經介紹過TDEngine的相關代碼,其中Sheduler子產品的相關排程算法就使用了生産、消費者模式進行消息傳遞功能的實作,也就是有多個生産者(producer)生成并不斷向隊列中傳遞消息,也有多個消費者(consumer)不斷從隊列中取消息。
後面我們也會說明類型功能在Go、Java等進階語言中類似的功能已經被封裝好了,但是在C語言中你就必須要用好互斥體( mutex)和信号量(semaphore)并協調他們之間的關系。由于C語言的實作是最複雜的,先來看結構體設計和他的注釋:
再來看Shceduler初始化函數,這裡需要特别說明的是,兩個信号量的建立,其中emptySem是隊列的可寫狀态,初始化時其值為queueSize,即初始時隊列可寫,可接受消息長度為隊列長度,fullSem是隊列的可讀狀态,初始化時其值為0,即初始時隊列不可讀。具體代碼及我的注釋如下:
void *taosInitScheduler(int queueSize, int numOfThreads, char *label)
{
pthread_attr_t
attr;
SSchedQueue * pSched = (SSchedQueue *)
malloc
(
sizeof
(SSchedQueue));
memset
(pSched,
,
sizeof
(SSchedQueue));
pSched->queueSize = queueSize;
pSched->numOfThreads = numOfThreads;
strcpy
(pSched->label, label);
if
(pthread_mutex_init(&pSched->queueMutex,
NULL
) <
) {
pError(
"init %s:queueMutex failed, reason:%s"
, pSched->label, strerror(errno));
goto
_error;
}
//emptySem是隊列的可寫狀态,初始化時其值為queueSize,即初始時隊列可寫,可接受消息長度為隊列長度。
if
(sem_init(&pSched->emptySem,
, (
unsigned
int
)pSched->queueSize) !=
) {
pError(
"init %s:empty semaphore failed, reason:%s"
, pSched->label, strerror(errno));
goto
_error;
}
//fullSem是隊列的可讀狀态,初始化時其值為0,即初始時隊列不可讀
if
(sem_init(&pSched->fullSem,
,
) !=
) {
pError(
"init %s:full semaphore failed, reason:%s"
, pSched->label, strerror(errno));
goto
_error;
}
if
((pSched->
queue
= (SSchedMsg *)
malloc
((
size_t
)pSched->queueSize *
sizeof
(SSchedMsg))) ==
NULL
) {
pError(
"%s: no enough memory for queue, reason:%s"
, pSched->label, strerror(errno));
goto
_error;
}
memset
(pSched->
queue
,
, (
size_t
)pSched->queueSize *
sizeof
(SSchedMsg));
pSched->fullSlot =
;
//實始化時隊列為空,故隊頭和隊尾的位置都是0
pSched->emptySlot =
;
//實始化時隊列為空,故隊頭和隊尾的位置都是0
pSched->qthread =
malloc
(
sizeof
(
pthread_t
) * (
size_t
)pSched->numOfThreads);
pthread_attr_init(&attr);
pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE);
for
(
int
i =
; i < pSched->numOfThreads; ++i) {
if
(pthread_create(pSched->qthread + i, &attr, taosProcessSchedQueue, (
void
*)pSched) !=
) {
pError(
"%s: failed to create rpc thread, reason:%s"
, pSched->label, strerror(errno));
goto
_error;
}
}
pTrace(
"%s scheduler is initialized, numOfThreads:%d"
, pSched->label, pSched->numOfThreads);
return
(
void
*)pSched;
_error:
taosCleanUpScheduler(pSched);
return
NULL
;
}
再來看讀消息的taosProcessSchedQueue函數這其實是消費者一方的實作,這個函數的主要邏輯是
1.使用無限循環,隻要隊列可讀即sem_wait(&pSched->fullSem)不再阻塞就繼續向下處理
2.在操作msg前,加入互斥體防止msg被誤用。
3.讀操作完畢後修改fullSlot的值,注意這為避免fullSlot溢出,需要對于queueSize取餘。同時退出互斥體。
4.對emptySem進行post操作,即把emptySem的值加1,如emptySem原值為5,讀取一個消息後,emptySem的值為6,即可寫狀态,且能接受的消息數量為6
具體代碼及注釋如下:
void *taosProcessSchedQueue(void *param)
{
SSchedMsg msg;
SSchedQueue *pSched = (SSchedQueue *)param;
//注意這裡是個無限循環,隻要隊列可讀即sem_wait(&pSched->fullSem)不再阻塞就繼續處理
while
(
1
) {
if
(sem_wait(&pSched->fullSem) !=
) {
pError(
"wait %s fullSem failed, errno:%d, reason:%s"
, pSched->label, errno, strerror(errno));
if
(errno == EINTR) {
/* sem_wait is interrupted by interrupt, ignore and continue */
continue
;
}
}
//加入互斥體防止msg被誤用。
if
(pthread_mutex_lock(&pSched->queueMutex) !=
)
pError(
"lock %s queueMutex failed, reason:%s"
, pSched->label, strerror(errno));
msg = pSched->
queue
[pSched->fullSlot];
memset
(pSched->
queue
+ pSched->fullSlot,
,
sizeof
(SSchedMsg));
//讀取完畢修改fullSlot的值,注意這為避免fullSlot溢出,需要對于queueSize取餘。
pSched->fullSlot = (pSched->fullSlot +
1
) % pSched->queueSize;
//讀取完畢修改退出互斥體
if
(pthread_mutex_unlock(&pSched->queueMutex) !=
)
pError(
"unlock %s queueMutex failed, reason:%s\n"
, pSched->label, strerror(errno));
//讀取完畢對emptySem進行post操作,即把emptySem的值加1,如emptySem原值為5,讀取一個消息後,emptySem的值為6,即可寫狀态,且能接受的消息數量為6
if
(sem_post(&pSched->emptySem) !=
)
pError(
"post %s emptySem failed, reason:%s\n"
, pSched->label, strerror(errno));
if
(msg.fp)
(*(msg.fp))(&msg);
else
if
(msg.tfp)
(*(msg.tfp))(msg.ahandle, msg.thandle);
}
}
最後寫消息的taosScheduleTask函數也就是生産的實作,其基本邏輯是
1.寫隊列前先對emptySem進行減1操作,如emptySem原值為1,那麼減1後為0,也就是隊列已滿,必須在讀取消息後,即emptySem進行post操作後,隊列才能進行可寫狀态。
2.加入互斥體防止msg被誤操作,寫入完成後退出互斥體
3.寫隊列完成後對fullSem進行加1操作,如fullSem原值為0,那麼加1後為1,也就是隊列可讀,咱們上面介紹的讀取taosProcessSchedQueue中sem_wait(&pSched->fullSem)不再阻塞就繼續向下。
int taosScheduleTask(void *qhandle, SSchedMsg *pMsg)
{
SSchedQueue *pSched = (SSchedQueue *)qhandle;
if
(pSched ==
NULL
) {
pError(
"sched is not ready, msg:%p is dropped"
, pMsg);
return
;
}
//在寫隊列前先對emptySem進行減1操作,如emptySem原值為1,那麼減1後為0,也就是隊列已滿,必須在讀取消息後,即emptySem進行post操作後,隊列才能進行可寫狀态。
if
(sem_wait(&pSched->emptySem) !=
) pError(
"wait %s emptySem failed, reason:%s"
, pSched->label, strerror(errno));
//加入互斥體防止msg被誤操作
if
(pthread_mutex_lock(&pSched->queueMutex) !=
)
pError(
"lock %s queueMutex failed, reason:%s"
, pSched->label, strerror(errno));
pSched->
queue
[pSched->emptySlot] = *pMsg;
pSched->emptySlot = (pSched->emptySlot +
1
) % pSched->queueSize;
if
(pthread_mutex_unlock(&pSched->queueMutex) !=
)
pError(
"unlock %s queueMutex failed, reason:%s"
, pSched->label, strerror(errno));
//在寫隊列前先對fullSem進行加1操作,如fullSem原值為0,那麼加1後為1,也就是隊列可讀,咱們上面介紹的讀取函數可以進行處理。
if
(sem_post(&pSched->fullSem) !=
) pError(
"post %s fullSem failed, reason:%s"
, pSched->label, strerror(errno));
return
;
}
Java的高并發實作
從并發模型來看,Go和Rust都有channel這個概念,也都是通過Channel來實作線(協)程間的同步,由于channel帶有讀寫狀态且保證資料順序,而且channel的封裝程度和效率明顯可以做的更高,是以Go和Rust官方都會建議使用channel(通信)來共享記憶體,而不是使用共享記憶體來通信。
為了讓幫助大家找到差別,我們先以Java為例來,看一下沒有channel的進階語言Java,生産者消費者該如何實作,代碼及注釋如下:
public
class Storage
{
// 倉庫最大存儲量
private
final
int
MAX_SIZE =
10
;
// 倉庫存儲的載體
private
LinkedList<Object> list =
new
LinkedList<Object>();
// 鎖
private
final
Lock lock =
new
ReentrantLock();
// 倉庫滿的信号量
private
final
Condition full = lock.newCondition();
// 倉庫空的信号量
private
final
Condition empty = lock.newCondition();
public void produce()
{
// 獲得鎖
lock.lock();
while
(list.size() +
1
> MAX_SIZE) {
System.out.println(
"【生産者"
+ Thread.currentThread().getName()
+
"】倉庫已滿"
);
try
{
full.await();
}
catch
(InterruptedException e) {
e.printStackTrace();
}
}
list.add(
new
Object());
System.out.println(
"【生産者"
+ Thread.currentThread().getName()
+
"】生産一個産品,現庫存"
+ list.size());
empty.signalAll();
lock.unlock();
}
public void consume()
{
// 獲得鎖
lock.lock();
while
(list.size() ==
) {
System.out.println(
"【消費者"
+ Thread.currentThread().getName()
+
"】倉庫為空"
);
try
{
empty.await();
}
catch
(InterruptedException e) {
e.printStackTrace();
}
}
list.remove();
System.out.println(
"【消費者"
+ Thread.currentThread().getName()
+
"】消費一個産品,現庫存"
+ list.size());
full.signalAll();
lock.unlock();
}
}
在Java、C#這種面向對象,但是沒有channel語言中,生産者、消費者模式至少要借助一個lock和兩個信号量共同完成。其中鎖的作用是保證同是時間,倉庫中隻有一個使用者進行資料的修改,而還需要表示倉庫滿的信号量,一旦達到倉庫滿的情況則将此信号量置為阻塞狀态,進而阻止其它生産者再向倉庫運商品了,反之倉庫空的信号量也是一樣,一旦倉庫空了,也要阻其它消費者再前來消費了。
Go的高并發實作
我們剛剛也介紹過了Go語言中官方推薦使用channel來實作協程間通信,是以不需要再添加lock和信号量就能實作模式了,以下代碼中我們通過子goroutine完成了生産者的功能,在在另一個子goroutine中實作了消費者的功能,注意要阻塞主goroutine以確定子goroutine能夠執行,進而輕而易舉的就這完成了生産者消費者模式。下面我們就通過具體實踐中來看一下生産者消費者模型的實作。
package
main
import
(
"fmt"
"time"
)
func Product(ch chan<- int)
{
//生産者
for
i :=
; i <
3
; i++ {
fmt.Println(
"Product produceed"
, i)
ch <- i
//由于channel是goroutine安全的,是以此處沒有必要必須加鎖或者加lock操作.
}
}
func Consumer(ch <-chan int)
{
for
i :=
; i <
3
; i++ {
j := <-ch
//由于channel是goroutine安全的,是以此處沒有必要必須加鎖或者加lock操作.
fmt.Println(
"Consmuer consumed "
, j)
}
}
func main()
{
ch :=
make
(
chan
int
)
go
Product(ch)
//注意生産者與消費者放在不同goroutine中
go
Consumer(ch)
//注意生産者與消費者放在不同goroutine中
time.Sleep(time.Second *
1
)
//防止主goroutine退出
/*運作結果并不确定,可能為
Product produceed 0
Product produceed 1
Consmuer consumed 0
Consmuer consumed 1
Product produceed 2
Consmuer consumed 2
*/
}
可以看到和Java比起來使用GO來實作并發式的生産者消費者模式的确是更為清爽了。
Rust的高并發實作
不得不說Rust的難度實在太高了,雖然筆者之前在彙編、C、Java等方面的經驗可以幫助我快速掌握Go語言。但是假期看了兩天Rust真想大呼告辭,這尼瑪也太勸退了。在Rust官方提供的功能中,其實并不包括多生産者、多消費者的channel,std:sync空間下隻有一個多生産者單消費者(mpsc)的channel。其樣例實作如下:
use std::sync::mpsc;
use std::thread;
use std::time::Duration;
fn main() {
let (tx, rx) = mpsc::channel();
let tx1 = mpsc::Sender::clone(&tx);
let tx2 = mpsc::Sender::clone(&tx);
thread::spawn(move || {
let vals = vec![
String::from("1"),
String::from("3"),
String::from("5"),
String::from("7"),
];
for val in vals {
tx1.send(val).unwrap();
thread::sleep(Duration::from_secs(1));
}
});
thread::spawn(move || {
let vals = vec![
String::from("11"),
String::from("13"),
String::from("15"),
String::from("17"),
];
for val in vals {
tx.send(val).unwrap();
thread::sleep(Duration::from_secs(1));
}
});
thread::spawn(move || {
let vals = vec![
String::from("21"),
String::from("23"),
String::from("25"),
String::from("27"),
];
for val in vals {
tx2.send(val).unwrap();
thread::sleep(Duration::from_secs(1));
}
});
for rec in rx {
println!("Got: {}", rec);
}
}
可以看到在Rust下實作生産者消費者是不難的,但是生産者可以clone多個,不過消費者卻隻能有一個,究其原因是因為Rust下沒有GC也就是垃圾回收功能,而想保證安全Rust就必須要對于變更使用權限進行嚴格管理。在Rust下使用move關鍵字進行變更的所有權轉移,但是按照Rust對于變更生産周期的管理規定,線程間權限轉移的所有權接收者在同一時間隻能有一個,這也是Rust官方隻提供MPSC的原因,
use std::thread;
fn main() {
let s = "hello";
let handle = thread::spawn(move || {
println!("{}", s);
});
handle.join().unwrap();
}
當然Rust下有一個API比較貼心就是join,他可以所有子線程都執行結束再退出主線程,這比Go中要手工阻塞還是要有一定的提高。而如果你想用多生産者、多消費者的功能,就要入手crossbeam子產品了,這個子產品掌握起來難度也真的不低。
總結
通過上面的比較我們可以用一張表格來說明幾種主流語言的情況對比:
語言 | 安全性 | 運作速度 | 程序啟動速度 | 學習難度 |
C | 低 | 極快 | 困難 | |
Java | 高 | 一般 | ||
Go | 較快 | |||
Rust | 極快(基本比肩C) | 極困難 |
可以看到Rust以其高安全性、基本比肩C的運作及啟動速度必将在Serverless的時代獨占鳌頭,Go基本也能緊随其後,而C語言程式中難以避免的野指針,Java相對較低的運作及啟動速度,可能都不太适用于函數式運算的場景,Java在企業級開發的時代打敗各種C#之類的對手,但是在雲時代好像還真沒有之前統治力那麼強了,真可謂是打敗你的往往不是你的對手,而是其它空間的降維打擊。