天天看點

python gevent async_詳解python之協程gevent子產品

程序、線程、協程區分

我們通常所說的協程Coroutine其實是corporate routine的縮寫,直接翻譯為協同的例程,一般我們都簡稱為協程。

在linux系統中,線程就是輕量級的程序,而我們通常也把協程稱為輕量級的線程即微線程。

程序和協程

下面對比一下程序和協程的相同點和不同點:

相同點:

相同點存在于,當我們挂起一個執行流的時,我們要儲存的東西:

棧, 其實在你切換前你的局部變量,以及要函數的調用都需要儲存,否則都無法恢複

寄存器狀态,這個其實用于當你的執行流恢複後要做什麼

而寄存器和棧的結合就可以了解為上下文,上下文切換的了解:

CPU看上去像是在并發的執行多個程序,這是通過處理器在程序之間切換來實作的,作業系統實作這種交錯執行的機制稱為上下文切換

作業系統保持跟蹤程序運作所需的所有狀态資訊。這種狀态,就是上下文。

在任何一個時刻,作業系統都隻能執行一個程序代碼,當作業系統決定把控制權從目前程序轉移到某個新程序時,就會進行上下文切換,即儲存目前程序的上下文,恢複新程序的上下文,然後将控制權傳遞到新程序,新程序就會從它上次停止的地方開始。

不同點:

執行流的排程者不同,程序是核心排程,而協程是在使用者态排程,也就是說程序的上下文是在核心态儲存恢複的,而協程是在使用者态儲存恢複的,很顯然使用者态的代價更低

程序會被強占,而協程不會,也就是說協程如果不主動讓出CPU,那麼其他的協程,就沒有執行的機會。

對記憶體的占用不同,實際上協程可以隻需要4K的棧就足夠了,而程序占用的記憶體要大的多

從作業系統的角度講,多協程的程式是單程序,單協程

線程和協程

既然我們上面也說了,協程也被稱為微線程,下面對比一下協程和線程:

線程之間需要上下文切換成本相對協程來說是比較高的,尤其在開啟線程較多時,但協程的切換成本非常低。

同樣的線程的切換更多的是靠作業系統來控制,而協程的執行由我們自己控制。

協程隻是在單一的線程裡不同的協程之間切換,其實和線程很像,線程是在一個程序下,不同的線程之間做切換,這也可能是協程稱為微線程的原因吧。

Gevent子產品

Gevent是一種基于協程的Python網絡庫,它用到Greenlet提供的,封裝了libevent事件循環的高層同步API。它讓開發者在不改變程式設計習慣的同時,用同步的方式寫異步I/O的代碼。

簡單示例:

import gevent

def test1():

print 12

gevent.sleep(0)

print 34

def test2():

print 56

gevent.sleep(0)

print 78

gevent.joinall([

gevent.spawn(test1),

gevent.spawn(test2),

])

結果:

12

56

34

78

猴子更新檔 Monkey patching

這個更新檔是Gevent子產品最需要注意的問題,有了它,才會讓Gevent子產品發揮它的作用。我們往往使用Gevent是為了實作網絡通信的高并發,但是,Gevent直接修改标準庫裡面大部分的阻塞式系統調用,包括socket、ssl、threading和 select等子產品,而變為協作式運作。但是我們無法保證你在複雜的生産環境中有哪些地方使用這些标準庫會由于打了更新檔而出現奇怪的問題。

一種方法是使用gevent下的socket子產品,我們可以通過”from gevent import socket”來導入。不過更常用的方法是使用猴子布丁(Monkey patching)。使用猴子更新檔褒貶不一,但是官網上還是建議使用”patch_all()”,而且在程式的第一行就執行。

from gevent import monkey; monkey.patch_socket()

import gevent

import socket

urls = ['www.baidu.com', 'www.gevent.org', 'www.python.org']

jobs = [gevent.spawn(socket.gethostbyname, url) for url in urls]

gevent.joinall(jobs, timeout=5)

print [job.value for job in jobs]

上述代碼的第一行就是對socket标準庫打上猴子更新檔,此後socket标準庫中的類和方法都會被替換成非阻塞式的,所有其他的代碼都不用修改,這樣協程的效率就真正展現出來了。Python中其它标準庫也存在阻塞的情況,gevent提供了”monkey.patch_all()”方法将所有标準庫都替換。

擷取協程狀态

started屬性/ready()方法:判斷協程是否已啟動。

successful()方法:判斷協程是否成功運作且沒有抛出異常。

value屬性:擷取協程執行完之後的傳回值。

另外,greenlet協程運作過程中發生的異常是不會被抛出到協程外的,是以需要用協程對象的”exception”屬性來擷取協程中的異常。

下面的例子很好的示範了各種方法和屬性的使用。

#!/usr/bin/env python

# _*_ coding utf-8 _*_

#Author: aaron

import gevent

def win():

return 'You win!'

def fail():

raise Exception('You failed!')

winner = gevent.spawn(win)

loser = gevent.spawn(fail)

print(winner.started) # True

print(loser.started) # True

# 在Greenlet中發生的異常,不會被抛到Greenlet外面。

# 控制台會打出Stacktrace,但程式不會停止

try:

gevent.joinall([winner, loser])

except Exception as e:

# 這段永遠不會被執行

print('This will never be reached')

print(winner.ready()) # True

print(loser.started) # True

print(winner.value) # 'You win!'

print(loser.value) # None

print('successful ',winner.successful()) # True

print('successful ',loser.successful()) # False

# 這裡可以通過raise loser.exception 或 loser.get()

# 來将協程中的異常抛出

print(loser.exception)

協程運作逾時控制

之前我們講過在”gevent.joinall()”方法中可以傳入timeout參數來設定逾時,我們也可以在全局範圍内設定逾時時間:

import gevent

from gevent import Timeout

timeout = Timeout(2) # 2 seconds

timeout.start()

def wait():

gevent.sleep(10)

try:

gevent.spawn(wait).join()

except Timeout:

print('Could not complete')

上例中,我們将逾時設為2秒,此後所有協程的運作,如果超過兩秒就會抛出”Timeout”異常。我們也可以将逾時設定在with語句内,這樣該設定隻在with語句塊中有效:

with Timeout(1):

gevent.sleep(10)

此外,我們可以指定逾時所抛出的異常,來替換預設的”Timeout”異常。比如下例中逾時就會抛出我們自定義的”TooLong”異常。

class TooLong(Exception):

pass

with Timeout(1, TooLong):

gevent.sleep(10)

協程間通信

事件(Event)對象

greenlet協程間的異步通訊可以使用事件(Event)對象。該對象的”wait()”方法可以阻塞目前協程,而”set()”方法可以喚醒之前阻塞的協程。在下面的例子中,5個waiter協程都會等待事件evt,當setter協程在3秒後設定evt事件,所有的waiter協程即被喚醒。

#!/usr/bin/env python

# _*_ coding utf-8 _*_

#Author: aaron

import gevent

from gevent.event import Event

evt = Event()

def setter():

print 'Wait for me'

gevent.sleep(3) # 3秒後喚醒所有在evt上等待的協程

print "Ok, I'm done"

evt.set() # 喚醒

def waiter():

print "I'll wait for you"

evt.wait() # 等待

print 'Finish waiting'

gevent.joinall([

gevent.spawn(setter),

gevent.spawn(waiter),

gevent.spawn(waiter),

gevent.spawn(waiter),

gevent.spawn(waiter),

gevent.spawn(waiter)

])

AsyncResult事件

除了Event事件外,gevent還提供了AsyncResult事件,它可以在喚醒時傳遞消息。讓我們将上例中的setter和waiter作如下改動:

#!/usr/bin/env python

# _*_ coding utf-8 _*_

#Author: aaron

from gevent.event import AsyncResult

aevt = AsyncResult()

def setter():

print 'Wait for me'

gevent.sleep(3) # 3秒後喚醒所有在evt上等待的協程

print "Ok, I'm done"

aevt.set('Hello!') # 喚醒,并傳遞消息

def waiter():

print("I'll wait for you")

message = aevt.get() # 等待,并在喚醒時擷取消息

print 'Got wake up message: %s' % message

隊列 Queue

隊列Queue的概念相信大家都知道,我們可以用它的put和get方法來存取隊列中的元素。gevent的隊列對象可以讓greenlet協程之間安全的通路。運作下面的程式,你會看到3個消費者會分别消費隊列中的産品,且消費過的産品不會被另一個消費者再取到:

#!/usr/bin/env python

# _*_ coding utf-8 _*_

#Author: aaron

import gevent

from gevent.queue import Queue

products = Queue()

def consumer(name):

#while not products.empty():

while True:

try:

print('%s got product %s' % (name, products.get_nowait()))

gevent.sleep(0)

except gevent.queue.Empty:

break

print('Quit')

def producer():

for i in range(1, 10):

products.put(i)

gevent.joinall([

gevent.spawn(producer),

gevent.spawn(consumer, 'steve'),

gevent.spawn(consumer, 'john'),

gevent.spawn(consumer, 'nancy'),

])

注意:協程隊列跟線程隊列是一樣的,put和get方法都是阻塞式的,它們都有非阻塞的版本:put_nowait和get_nowait。如果調用get方法時隊列為空,則是不會抛出”gevent.queue.Empty”異常。我們隻能使用get_nowait()的方式讓氣抛出異常。

信号量

信号量可以用來限制協程并發的個數。它有兩個方法,acquire和release。顧名思義,acquire就是擷取信号量,而release就是釋放。當所有信号量都已被擷取,那剩餘的協程就隻能等待任一協程釋放信号量後才能得以運作:

#!/usr/bin/env python

# _*_ coding utf-8 _*_

#Author: aaron

import gevent

from gevent.coros import BoundedSemaphore

sem = BoundedSemaphore(2)

def worker(n):

sem.acquire()

print('Worker %i acquired semaphore' % n)

gevent.sleep(0)

sem.release()

print('Worker %i released semaphore' % n)

gevent.joinall([gevent.spawn(worker, i) for i in xrange(0, 6)])

上面的例子中,我們初始化了”BoundedSemaphore”信号量,并将其個數定為2。是以同一個時間,隻能有兩個worker協程被排程。程式運作後的結果如下:

Worker 0 acquired semaphore

Worker 1 acquired semaphore

Worker 0 released semaphore

Worker 1 released semaphore

Worker 2 acquired semaphore

Worker 3 acquired semaphore

Worker 2 released semaphore

Worker 3 released semaphore

Worker 4 acquired semaphore

Worker 4 released semaphore

Worker 5 acquired semaphore

Worker 5 released semaphore

如果信号量個數為1,那就等同于同步鎖。

協程本地變量

同線程類似,協程也有本地變量,也就是隻在目前協程内可被通路的變量:

#!/usr/bin/env python

# _*_ coding utf-8 _*_

#Author: aaron

import gevent

from gevent.local import local

data = local()

def f1():

data.x = 1

print data.x

def f2():

try:

print data.x

except AttributeError:

print 'x is not visible'

gevent.joinall([

gevent.spawn(f1),

gevent.spawn(f2)

])

通過将變量存放在local對象中,即可将其的作用域限制在目前協程内,當其他協程要通路該變量時,就會抛出異常。不同協程間可以有重名的本地變量,而且互相不影響。因為協程本地變量的實作,就是将其存放在以的”greenlet.getcurrent()”的傳回為鍵值的私有的命名空間内。

多并發socket模型

伺服器端:

#!/usr/bin/env python

# _*_ coding utf-8 _*_

#Author: aaron

import socket

import gevent

from gevent import socket, monkey

monkey.patch_all()

def server(port):

s = socket.socket()

s.bind(('0.0.0.0', port))

s.listen(500)

while True:

cli, addr = s.accept()

gevent.spawn(handle_request, cli)

def handle_request(conn):

try:

while True:

data = conn.recv(1024)

print("recv:", data)

conn.send(data)

if not data:

conn.shutdown(socket.SHUT_WR)

except Exception as ex:

print(ex)

finally:

conn.close()

if __name__ == '__main__':

server(8001)

當用戶端連接配接上伺服器端時,伺服器端通過開辟一個協程與該用戶端完成互動任務,同時由于使用了Gevent協程的方式,在每個用戶端與伺服器互動時,并不會影響到伺服器端的工作。

用戶端:

#!/usr/bin/env python

# _*_ coding utf-8 _*_

#Author: aaron

import socket

HOST = 'localhost' # The remote host

PORT = 8001 # The same port as used by the server

s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)

s.connect((HOST, PORT))

while True:

msg = bytes(input(">>:"), encoding="utf8")

s.sendall(msg)

data = s.recv(1024)

# print(data)

print('Received', repr(data)) # repr 格式化輸出

s.close()

以上就是本文的全部内容,希望對大家的學習有所幫助,也希望大家多多支援腳本之家。