第七部分:子產品
"""
作者:川川
時間:2021/7/27
"""
'''用文本編輯器在目前目錄下建立 fibo.py 檔案,輸入以下内容'''
#這部分單獨建立檔案!在這展示fibo内容隻是便于好看
# def fib(n): # write Fibonacci series up to n
# a, b = 0, 1
# while a < n:
# print(a, end=' ')
# a, b = b, a+b
# print()
#
# def fib2(n): # return Fibonacci series up to n
# result = []
# a, b = 0, 1
# while a < n:
# result.append(a)
# a, b = b, a+b
# return result
'''這項操作不直接把 fibo 函數定義的名稱導入到目前符号表,隻導入子產品名 fibo 。要使用子產品名通路函數'''
# import fibo
# print(fibo.fib(1000))
'''如果經常使用某個函數,可以把它指派給局部變量'''
# a = fibo.fib
# print(a(400))
'''6.1. 子產品詳解'''
'''import 語句有一個變體,可以直接把子產品裡的名稱導入到另一個子產品的符号表'''
# from fibo import fib, fib2
# print(fib(500))
'''還有一種變體可以導入子產品内定義的所有名稱 不建議從子產品或包内導入 *, 因為,這項操作經常讓代碼變得難以了解。'''
# from fibo import *
# print(fib(500))
'''子產品名後使用 as 時,直接把 as 後的名稱與導入子產品綁定。意思就是改變名稱使用,'''
# import fibo as fib
# print(fib.fib(500))
'''from 中也可以使用這種方式'''
# from fibo import fib as fibonacci
# print(fibonacci(500))
第八部分:深層輸入輸出
"""
作者:川川
時間:2021/7/27
"""
'''在字元串開頭的引号/三引号前添加 f 或 F 。在這種字元串中,可以在 { 和 } 字元之間輸入引用的變量'''
# year = 2021
# event = 'Referendum'
# a=f'Results of the {year} {event}'
# print(a)
'''str.format() 該方法也用 { 和 } 标記替換變量的位置a 這種方法支援詳細的格式化指令'''
# yes_votes = 42_572_654
# no_votes = 43_132_495
# percentage = yes_votes / (yes_votes + no_votes)
# a='{:-5} YES votes {:1.1%}'.format(yes_votes, percentage)#調整{}内部感受下
# print(a)
'''隻想快速顯示變量進行調試,可以用 repr() 或 str() 函數把值轉化為字元串。'''
# s = 'Hello, world.'
# print(str(s))#str() 函數傳回供人閱讀的值
# print(repr(s))#repr() 則生成适于解釋器讀取的值
# print(str(1/7))
# hellos = repr('hello')
# print(hellos)
'''7.1.1. 格式化字元串字面值'''
'''格式化字元串字面值 (簡稱為 f-字元串)在字元串前加字首 f 或 F,通過 {expression} 表達式,把 Python 表達式的值添加到字元串内'''
'''下例将 pi 舍入到小數點後三位'''
# import math
# print(f'The value of pi is approximately {math.pi:.3f}.')
'''在 ':' 後傳遞整數,為該字段設定最小字元寬度,常用于列對齊'''
# table = {'Sjoerd': 4127, 'Jack': 4098, 'Dcab': 7678}
# for name, phone in table.items():
# print(f'{name:10} ==> {phone:10d}')
'''7.1.2. 字元串 format() 方法'''
# print('We are the {} who say "{}!"'.format('knights', 'Ni'))
'''花括号及之内的字元(稱為格式字段)被替換為傳遞給 str.format() 方法的對象。花括号中的數字表示傳遞給 str.format() 方法的對象所在的位置。'''
# print('{0} and {1}'.format('spam', 'eggs'))
# print('{1} and {0}'.format('spam', 'eggs'))
'''使用關鍵字參數名引用值。'''
# print('This {food} is {adjective}.'.format(food='spam', adjective='absolutely horrible'))
'''位置參數和關鍵字參數可以任意組合'''
# print('The story of {0}, {1}, and {other}.'.format('Bill', 'Manfred',
# other='Georg'))
'''用方括号 '[]' 通路鍵來完成'''
# table = {'Sjoerd': 4127, 'Jack': 4098, 'Dcab': 8637678}
# print('Jack: {0[Jack]:d}; Sjoerd: {0[Sjoerd]:d}; ''Dcab: {0[Dcab]:d}'.format(table))
'''也可以用 '**' 符号,把 table 當作傳遞的關鍵字參數。'''
# print('Jack: {Jack:d}; Sjoerd: {Sjoerd:d}; Dcab: {Dcab:d}'.format(**table))
'''生成一組整齊的列,包含給定整數及其平方與立方'''
# for x in range(1, 11):
# print('{0:2d} {1:3d} {2:4d}'.format(x, x * x, x * x * x))
'''7.1.3. 手動格式化字元串'''
# for x in range(1, 11):
# print(repr(x).rjust(2), repr(x * x).rjust(3), end=' ')
# print(repr(x * x * x).rjust(4))
'''7.1.4. 舊式字元串格式化方法'''
# import math
# print('The value of pi is approximately %5.3f.' % math.pi)
'''7.2. 讀寫檔案¶'''
'''最常用的參數有兩個: open(filename, mode)'''
# f = open('workfile', 'w')
'''
第一個實參是檔案名字元串第二個實參是包含描述檔案使用方式字元的字元串。
mode 的值包括 'r' ,表示檔案隻能讀取;'w' 表示隻能寫入(現有同名檔案會被覆寫);
'a' 表示打開檔案并追加内容,任何寫入的資料會自動添加到檔案末尾。'r+' 表示打開檔案進行讀寫。
mode 實參是可選的,省略時的預設值為 'r'。
'''
# with open('workfile') as f:
# read_data = f.read()
# print(read_data)
# f.close()#如果沒有使用 with 關鍵字,則應調用 f.close() 關閉檔案,即可釋放檔案占用的系統資源。
# with open('workfile') as f:
# a=f.read()
# print(a)
# f.close()
'''f.readline() 從檔案中讀取單行資料'''
# with open('workfile') as f:
# a=f.readline()
# b=f.readline()
# c=f.readline()
# print(a,b,c)
# for i in f:
# print(i)
# f.close()
'''從檔案中讀取多行時,可以用循環周遊整個檔案對象'''
# with open('workfile') as f:
# for line in f:
# print(line, end='')
# f.close()
'''f.write(string) 把 string 的内容寫入檔案,并傳回寫入的字元數。'''
# with open('workfile','w') as f:
# f.write('This is a test\n')
# f.close()
'''寫入其他類型的對象前,要先把它們轉化為字元串(文本模式)或位元組對象(二進制模式)'''
# with open('workfile','a') as f:
# value = ('the answer', 42)
# s = str(value)
# f.write(s)
# f.close()
# f = open('workfile', 'rb+')
# f.write(b'0123456789abcdef')
# print(f.read())
# print(f.seek(5))
# print(f.read(1))
'''7.2.2. 使用 json 儲存結構化資料'''
# import json
# a=json.dumps([1, 'simple', 'list'])
# print(a)
'''dumps() 函數還有一個變體, dump() ,它隻将對象序列化為 text file '''
#如果 f 是 text file 對象
# json.dump(x, f)
#要再次解碼對象,如果 f 是已打開、供讀取的 text file 對象
# x = json.load(f)
第九部分:異常和錯誤
"""
作者:川川
時間:2021/7/27
"""
'''處理異常搭配:try except'''
'''如果沒有異常發生,則跳過 except 子句 并完成 try 語句的執行。'''
# while True:
# try:
# x = int(input("Please enter a number: "))
# break
# except ValueError:
# print("Oops! That was no valid number. Try again...")
'''一個 try 語句可能有多個 except 子句,以指定不同異常的處理程式。'''
# class B(Exception):
# pass
#
# class C(B):
# pass
#
# class D(C):
# pass
#
# for cls in [B, C, D]:
# try:
# raise cls()
# except D:
# print("D")
# except C:
# print("C")
# except B:
# print("B")
'''最後的 except 子句可以省略異常名,以用作通配符'''
# try:
# f = open('workfile')
# s = f.readline()
# i = int(s.strip())
# except OSError as err:
# print("OS error: {0}".format(err))
# except ValueError:
# print("Could not convert data to an integer.")
# except:
# print("Unexpected error:", sys.exc_info()[0])
# raise
'''try ... except 語句有一個可選的 else 子句,在使用時必須放在所有的 except 子句後面'''
# import sys
# for arg in sys.argv[1:]:
# try:
# f = open(arg, 'r')
# except OSError:
# print('cannot open', arg)
# else:
# print(arg, 'has', len(f.readlines()), 'lines')
# f.close()
'''處理 try 子句中調用(即使是間接地)的函數内部發生的異常。'''
# def this_fails():
# x = 1 / 0
# try:
# this_fails()
# except ZeroDivisionError as err:
# print('Handling run-time error:', err)
'''raise 語句支援強制觸發指定的異常'''
# try:
# raise NameError('HiThere')
# except NameError:
# print('An exception flew by!')
# raise
'''異常鍊'''
# def func():
# raise IOError
# try:
# func()
# except IOError as exc:
# raise RuntimeError('Failed to open database') from exc
'''異常鍊在 except 或 finally 子句觸發異常時自動生成'''
# try:
# open('database.sqlite')
# except IOError:
# raise RuntimeError from None
'''定義清理操作'''
# try:
# raise KeyboardInterrupt
# finally:
# print('Goodbye, world!')
'''一個更為複雜的例子'''
def divide(x, y):
try:
result = x / y
except ZeroDivisionError:
print("division by zero!")
else:
print("result is", result)
finally:
print("executing finally clause")
print(divide(2,1))
第十部分:類
"""
作者:川川
時間:2021/7/27
"""
# class Complex:
# def __init__(self, realpart, imagpart):
# self.r = realpart
# self.i = imagpart
# x = Complex(3.0, -4.5)
# print(x.r,x.i)
'''執行個體對象¶'''
# x.counter = 1
# while x.counter < 10:
# x.counter = x.counter * 2
# print(x.counter)
# del x.counter
'''9.3.4. 方法對象'''
''''通常,方法在綁定後立即被調用'''
# class MyClass:
# """A simple example class"""
# i = 12345
#
# def f(self):
# return 'hello world'
# x=MyClass()
# xf = x.f
# while True:
# print(xf())
'''9.3.5. 類和執行個體變量'''
# class Dog:
# kind = 'canine'
# def __init__(self, name):
# self.name = name
# d = Dog('Fido')
# e = Dog('Buddy')
# print(d.name)
# print(e.name)
# print(d.kind)
''''正确的類設計應該使用執行個體變量:'''
# class Dog:
# def __init__(self, name):
# self.name = name
# self.tricks = [] # creates a new empty list for each dog
#
# def add_trick(self, trick):
# self.tricks.append(trick)
# d = Dog('Fido')
# e = Dog('Buddy')
# d.add_trick('roll over')
# e.add_trick('play dead')
# e.add_trick('play dead')
# # print(d.tricks)
# print(e.tricks)
''''疊代器'''
# for element in [1, 2, 3]:
# print(element)
# for element in (1, 2, 3):
# print(element)
# for key in {'one':1, 'two':2}:
# print(key)
# for char in "123":
# print(char)
# for line in open("workfile"):
# print(line, end='')
'''生成器'''
# def reverse(data):
# for index in range(len(data)-1, -1, -1):
# yield data[index]
#
# for char in reverse('golf'):
# print(char)
'''生成器表達式'''
# a=sum(i*i for i in range(10))
# print(a)
# xvec = [10, 20, 30]
# yvec = [7, 5, 3]
# c=sum(x*y for x,y in zip(xvec, yvec))
# print(c)
第十一部分:标準庫簡介
"""
作者:川川
時間:2021/7/27
"""
'''作業系統接口(個人感覺沒啥用)'''
# import os
# print(os.getcwd())#列印出目前檔案位置
# os.chdir('/server/accesslogs')#改變運作位置
# os.system('mkdir today') #運作這個再系統shell
'''檔案通配符'''
'''glob 子產品提供了一個在目錄中使用通配符搜尋建立檔案清單的函數:'''
# import glob
# print(glob.glob('*.py'))
'''指令行參數'''
# import sys
# print(sys.argv)#列印本檔案位置
''' 字元串模式比對'''
# import re
# b=re.findall(r'\bf[a-z]*', 'which foot or hand fell fastest')
# print(b)
# a='tea for too'.replace('too', 'two')
# print(a)
'''數學'''
# import math
# a=math.cos(math.pi / 4)
# print(a)
# import random
# b=random.choice(['apple', 'pear', 'banana'])
# print(b)
'''網際網路通路'''
# from urllib.request import urlopen
# with urlopen('http://tycho.usno.navy.mil/cgi-bin/timer.pl') as response:
# for line in response:
# line = line.decode('utf-8') # Decoding the binary data to text.
# if 'EST' in line or 'EDT' in line:
# print(line)
# smtplib 用于發送郵件
# import smtplib
# server = smtplib.SMTP('localhost')
# server.sendmail('[email protected]', '[email protected]')
# server.quit()
''' 日期和時間'''
# from datetime import date
# now = date.today()
# print(now)
# a=now.strftime("%m-%d-%y. %d %b %Y is a %A on the %d day of %B.")
# print(a)
# birthday = date(2000, 9, 20)
# age = now - birthday
# c=age.days
# print(c)
'''資料壓縮'''
# import zlib
# s = b'witch which has which witches wrist watch'
# print(len(s))
# t = zlib.compress(s)
# print(len(t))
# print( zlib.decompress(t))
# print(zlib.crc32(s))
'''性能測量'''
# from timeit import Timer
# a= Timer('t=a; a=b; b=t', 'a=1; b=2').timeit()
# print(a)
第十二部分:協程
"""
作者:川川
時間:2021/7/27
"""
# import asyncio
# async def main():
# print('Hello ...')
# await asyncio.sleep(1)
# print('... World!')
# asyncio.run(main())
'''等待 1 秒後列印 "hello",然後 再次 等待 2 秒後列印 "world"'''
# import asyncio
# import time
#
# async def say_after(delay, what):
# await asyncio.sleep(delay)
# print(what)
# async def main():
# print(f"started at {time.strftime('%X')}")
# await say_after(1, 'hello')
# await say_after(2, 'world')
# print(f"finished at {time.strftime('%X')}")
# asyncio.run(main())
'''asyncio.create_task() 函數用來并發運作作為 asyncio 任務 的多個協程。'''
# async def main():
# task1 = asyncio.create_task(
# say_after(1, 'hello'))
# task2 = asyncio.create_task(
# say_after(2, 'world'))
# print(f"started at {time.strftime('%X')}")
# # Wait until both tasks are completed (should take
# # around 2 seconds.)
# await task1
# await task2
#
# print(f"finished at {time.strftime('%X')}")
# asyncio.run(main())
'''Python 協程屬于 可等待 對象,是以可以在其他協程中被等待'''
# import asyncio
#
# async def nested():
# return 42
#
# async def main():
# # Nothing happens if we just call "nested()".
# # A coroutine object is created but not awaited,
# # so it *won't run at all*.
# # Let's do it differently now and await it:
# print(await nested()) # will print "42".
#
# asyncio.run(main())
'''
協程函數: 定義形式為 async def 的函數;
協程對象: 調用 協程函數 所傳回的對象。
'''
'''當一個協程通過 asyncio.create_task() 等函數被封裝為一個 任務,該協程會被自動排程執行'''
# import asyncio
#
# async def nested():
# return 42
#
# async def main():
# # Schedule nested() to run soon concurrently
# # with "main()".
# task = asyncio.create_task(nested())
#
# # "task" can now be used to cancel "nested()", or
# # can simply be awaited to wait until it is complete:
# await task
# asyncio.run(main())
'''運作 asyncio 程式'''
'''asyncio.run(coro, *, debug=False)¶'''
# import asyncio
# async def main():
# await asyncio.sleep(1)
# print('hello')
#
# asyncio.run(main())
'''建立任務'''
'''asyncio.create_task(coro, *, name=None)¶'''
# import asyncio
# async def coro():
# return 2021
# task = asyncio.create_task(coro())#python3.7+
# This works in all Python versions but is less readable
# task = asyncio.ensure_future(coro())#before python3.7
'''休眠'''
''' asyncio.sleep(delay, result=None, *, loop=None)¶'''
'''以下協程示例運作 5 秒,每秒顯示一次目前日期'''
# import asyncio
# import datetime
#
# async def display_date():
# loop = asyncio.get_running_loop()
# end_time = loop.time() + 5.0
# while True:
# print(datetime.datetime.now())
# if (loop.time() + 1.0) >= end_time:
# break
# await asyncio.sleep(1)
#
# asyncio.run(display_date())
'''并發運作任務'''
''' asyncio.gather(*aws, loop=None, return_exceptions=False)¶'''
# import asyncio
#
# async def factorial(name, number):
# f = 1
# for i in range(2, number + 1):
# print(f"Task {name}: Compute factorial({i})...")
# await asyncio.sleep(1)
# f *= i
# print(f"Task {name}: factorial({number}) = {f}")
#
# async def main():
# # Schedule three calls *concurrently*:
# await asyncio.gather(
# factorial("A", 2),
# factorial("B", 3),
# factorial("C", 4),
# )
#
# asyncio.run(main())
'''屏蔽取消操作'''
'''asyncio.shield(aw, *, loop=None)保護一個 可等待對象 防止其被 取消'''
# res = await shield(something())#demo
'''如果希望完全忽略取消操作 (不推薦) 則 shield() 函數需要配合一個 try/except 代碼段'''
# try:
# res = await shield(something())
# except CancelledError:
# res = None
'''逾時'''
'''asyncio.wait_for(aw, timeout, *, loop=None)¶'''
# import asyncio
# async def eternity():
# # Sleep for one hour
# await asyncio.sleep(3600)
# print('yay!')
#
# async def main():
# # Wait for at most 1 second
# try:
# await asyncio.wait_for(eternity(), timeout=1.0)
# except asyncio.TimeoutError:
# print('timeout!')
#
# asyncio.run(main())
'''簡單等待'''
'''syncio.wait(aws, *, loop=None, timeout=None, return_when=ALL_COMPLETED)'''
# 用法:
# import asyncio
# done, pending = await asyncio.wait(aws)
# async def foo():
# return 42
#
# task = asyncio.create_task(foo())
# done, pending = await asyncio.wait({task})
#
# if task in done:
# asyncio.run(task)
# # Everything will work as expected now.
'''線上程中運作'''
'''asyncio.to_thread(func, /, *args, **kwargs)在不同的線程中異步地運作函數 func。'''
'''這個協程函數主要是用于執行在其他情況下會阻塞事件循環的 IO 密集型函數/方法'''
# import asyncio,time
# def blocking_io():
# print(f"start blocking_io at {time.strftime('%X')}")
# # Note that time.sleep() can be replaced with any blocking
# # IO-bound operation, such as file operations.
# time.sleep(1)
# print(f"blocking_io complete at {time.strftime('%X')}")
#
# async def main():
# print(f"started main at {time.strftime('%X')}")
#
# await asyncio.gather(
# asyncio.to_thread(blocking_io),
# asyncio.sleep(1))
# print(f"finished main at {time.strftime('%X')}")
# asyncio.run(main())
''':要取消一個正在運作的 Task 對象可使用 cancel() 方法。調用此方法将使該 Task 對象抛出一個 CancelledError 異常給打包的協程'''
'''以下示例示範了協程是如何偵聽取消請求的'''
# import asyncio
# async def cancel_me():
# print('cancel_me(): before sleep')
#
# try:
# # Wait for 1 hour
# await asyncio.sleep(3600)
# except asyncio.CancelledError:
# print('cancel_me(): cancel sleep')
# raise
# finally:
# print('cancel_me(): after sleep')
#
# async def main():
# # Create a "cancel_me" Task
# task = asyncio.create_task(cancel_me())
#
# # Wait for 1 second
# await asyncio.sleep(1)
#
# task.cancel()
# try:
# await task
# except asyncio.CancelledError:
# print("main(): cancel_me is cancelled now")
#
# asyncio.run(main())
'''基于生成器的協程'''
'''@asyncio.coroutine
用來标記基于生成器的協程的裝飾器。
此裝飾器使得舊式的基于生成器的協程能與 async/await 代碼相相容
'''
# import asyncio
# @asyncio.coroutine
# def old_style_coroutine():
# yield from asyncio.sleep(1)
#
# async def main():
# await old_style_coroutine()
'''隊列能被用于多個的并發任務的工作量配置設定:'''
import asyncio
import random
import time
async def worker(name, queue):
while True:
# Get a "work item" out of the queue.
sleep_for = await queue.get()
# Sleep for the "sleep_for" seconds.
await asyncio.sleep(sleep_for)
# Notify the queue that the "work item" has been processed.
queue.task_done()
print(f'{name} has slept for {sleep_for:.2f} seconds')
async def main():
# Create a queue that we will use to store our "workload".
queue = asyncio.Queue()
# Generate random timings and put them into the queue.
total_sleep_time = 0
for _ in range(20):
sleep_for = random.uniform(0.05, 1.0)
total_sleep_time += sleep_for
queue.put_nowait(sleep_for)
# Create three worker tasks to process the queue concurrently.
tasks = []
for i in range(3):
task = asyncio.create_task(worker(f'worker-{i}', queue))
tasks.append(task)
# Wait until the queue is fully processed.
started_at = time.monotonic()
await queue.join()
total_slept_for = time.monotonic() - started_at
# Cancel our worker tasks.
for task in tasks:
task.cancel()
# Wait until all worker tasks are cancelled.
await asyncio.gather(*tasks, return_exceptions=True)
print('====')
print(f'3 workers slept in parallel for {total_slept_for:.2f} seconds')
print(f'total expected sleep time: {total_sleep_time:.2f} seconds')
asyncio.run(main())
如果你有一定的程式設計基礎,那麼你一定會通過本文章查漏補缺,同時領略python基礎。由于内容實在過多,本篇文章偏文法介紹,并沒有舉很多例子,如有不好之處多對見諒。