天天看點

python blast線上比對 每天最多數萬條python blast線上比對、每天可達數萬條

python blast線上比對、每天可達數萬條

背景:在堿基序列,如

TRINITY_DN931509_c0_g1_i1

GCACGTTCTTCTCGATGCTGACGGAGCGAGGCGCGCTGGCGGAGCTGATGGC

GGCGCAGCTGGAGCGAGGGCGCACGCCGCGGGTGCGCCGGCGCCGGCCGC

GCCCCAGGCCCAGGGCCAGGCCGCGCCCGAGGCCGAGGAGTAGGAAATAAT

GATCTGTGACGAAACTAACCCACCCCTCGAGCTTGCGGAAATGCTGGAATCTG

CGACGTAGGCACTACTCGAACTTGAG

需要比對(即Basic Local Alignment Search Tool,BLAST)時,采取的方式主要有如下 :

  1. 美國國立生物技術資訊中心(National Center for Biotechnology Information ,NCBI)官網提供免費線上blast服務
  2. 搭建本地的blast環境。
  3. 使用雲服務商提供的付費BLAST服務,如blast2go

當需比對堿基序列量少時,即不考慮時間的情況下,上述三種方式均可;但如果需要大量比對時,正常操作下三種方式各有利弊,第一種和第三種的共同點——慢;第二種的比對極大消耗本地伺服器叢集的算力。本文試圖改良第一種方式提高效率

一.啰嗦一下線上BLAST的流程

1.首先進入比對網址:https://blast.ncbi.nlm.nih.gov/Blast.cgi?PROGRAM=blastx&PAGE_TYPE=BlastSearch&BLAST_SPEC=&LINK_LOC=blasttab&LAST_PAGE=blastn

線上blast序列可采用兩種上傳方式,直接粘貼到輸入框,或上傳本地序列檔案,如下圖。

python blast線上比對 每天最多數萬條python blast線上比對、每天可達數萬條

2.然後根據需要配置其他參數,點選下方的”BLAST” 按鈕進入漫長的等待(主要時間為等待NCBI伺服器處理送出的序列)如下圖

python blast線上比對 每天最多數萬條python blast線上比對、每天可達數萬條

這個頁面會自動重新整理,當比對完成後重新整理出的截面就是結果頁面;請注意圖中紅框标出的”Request ID” 的值,後面要用到。

3.結果頁面如下圖

python blast線上比對 每天最多數萬條python blast線上比對、每天可達數萬條

在多選框標明某特定堿基序列的選項,頁面會自動跳轉到該結果的頁面(通過該頁面定義為GetResults()的JavaScript函數),然後再根據需要再此頁面索取資訊。有時BLAST會比對不到結果(這是序列自身與資料庫中的堿基對沒比對上,請跟後面會提到的比對失敗差別開),上圖中,選項較淺且以 * 開頭的為沒有比對到結果的序列,即這些序列的比對結果為NULL或空。

二、分析

1.F12堿基序列送出後的等待頁面發現頁面在不斷的發送一個POST包到指定位址,以詢問是否出結果,如果出結果則跳轉到結果頁面(恕小弟才疏學淺沒有發現POST包的構成規律)。

2.發現結果頁面有GET方法可以通路https://blast.ncbi.nlm.nih.gov/Blast.cgi?CMD=Get&RID=***********;其中***********為上文提到的Request ID;也即是說,送出堿基序列過程和資訊抓取過程可以分開,浏覽器不必一直等待結果。可以送出序列,然後得到Request ID,另一個程式或裝置用Request ID來抓取資料。

3.通路到結果頁面後,通過不斷選擇下拉框的方式來獲得同一批送出的不同的堿基序列的結果,而這些操作,使用python+selenium是友善的。

4.不斷向NCBI伺服器發起比對請求,并使用多線程的方式處理不同比對RID的結果,提高程式效率。

三、實際操作

  1. 環境:

    windows 10+python 3.6.5(推薦搭配PyCharm)+phantomjs +Mysql

  2. 用到的python第三方擴充包:
    1. selenium
    2. pymysql
    3. openpyxl
  3. 預處理:
    1. Mysql中建立table2表存放堿基序列,結構為
n id dna

其中n為自增的主鍵,id為堿基序列的名稱,dna為堿基序列。

2.Mysql中建立submit表存放Request ID,結構為

RID TTime len

其中TTime為堿基序列送出的時間,len為本次送出的總長度。

将待處理堿基序列導入表table2中。可以參考下列python代碼

import pymysql
def sp(st):#分離序列的ID和序列
    b=st.split('\n')
    ID=b[]
    DNAs=''
    for i in b:
        if b.index(i)==:
            continue
        DNAs=DNAs+i
    return ID, DNAs
def DOSQLin(n,idL,DNAs):
#生成插入SQL語句
    id=idL.split(' ')
    ID=id[]
    sql = "insert into table2 (n,id,dna) values (%s,'%s','%s');"%(n,ID,DNAs)
    return sql
f = open('C:\\*.fasta','r') #
All=f.read()
a=All.split('>')#每一條序列都是>開頭的
del a[]#删除第一個空元素
#連結資料庫
conn = pymysql.connect(host='127.0.0.1', port=, user='root', passwd='luanchen',db='blast')
cur = conn.cursor()
n=
for i in a:
    ID,DNA=sp(i)
    sql=DOSQLin(n, ID, DNA)
    try:
        cur.execute(sql)
    except:
        prio
cur.close()
conn.cmmit()#送出改變結果
conn.close()
           

四、送出堿基序列得到Request ID

過程分為下列步驟:

  1. 從mysql表中得到若幹條待比對的堿基序列,再将這若幹條從mysql表中删除;
  2. 将1中得到的序列生成一個本地檔案(*.fasta) ;
  3. 通過selenium的方式打開線上BLAST網頁,上傳2得到的檔案;
  4. 得到上傳檔案後的Request ID,并儲存;
  5. 重複1~4直至mysql table2表中沒有序列。

python代碼(盡量加了很多注釋,實際寫得很亂…)

import string
from time import sleep
from selenium import webdriver
from selenium.webdriver.common.by import By
from threading import Thread
import random
import pymysql
def Fasta(lid,n=):#讀取table2表中若幹條資料生成檔案
    file = 'D:\\pills\\%s.fasta' % lid#應該事先建立該檔案夾,存放随機生成名字的序列檔案,下一步再将其上傳到NCBI。
    conn = pymysql.connect(host='127.0.0.1', port=, user='root', passwd='luanchen',db='hp')
    cur = conn.cursor()
    cur.execute('LOCK tables hp.table2 write')#給table2上表鎖,防止多個線程同時讀取時,删除序列時發生錯誤。
    sql='select n,id,dna from table2 order by n limit %s'% n
    cur.execute(sql)
    lf=
    di=
    f = open(file, 'w') #
    for i in cur:
        lf=lf+len(i[])
        if lf>:#控制送出的總長度,最好不超過3萬,否則容易被伺服器拒絕
            print('..')
            break
        di=di+
        f.write('>'+str(i[])+i[]+'\n')
        f.write(i[]+'\n')
    cur.execute('delete from table2 order by n limit %s'% di)#取table2的若幹條序列後将其在table2中删除
    conn.commit()
    cur.execute('Unlock tables;')#解開表鎖,使得其他線程可通路該資料表
    if di==:
        return 
    cur.close()
    conn.close()
    return file
def isExist(driver,id):#判斷頁面某元素是否存在
    try:
        driver.find_element_by_id(id)
        return True
    except:
        return False
def submit(RIDs,Len=):
    f = open('C:\\ursb.txt', 'a+')#備份RID到本地,有可能會有用
    f.write(RIDs+'\n')
    f.close()
    RID = RIDs.split(' ')
    sql = "insert into blast.submit (RID,len,TTime) values ('%s',%s,now());" % (RID[-], Len)
    conn = pymysql.connect(host='127.0.0.1', port=, user='root', passwd='luanchen', db='hp')
    cur = conn.cursor()
    cur.execute(sql)
    conn.commit()
    cur.close()
    conn.close()
def do(path):
    driver = webdriver.PhantomJS(executable_path="D:/phantomjs-2.1.1-windows/bin/phantomjs")#無頭浏覽器phantomjs的位置應該指定,或設定環境變量
    #driver = webdriver.Firefox()#測試時,應使用可見的浏覽器,友善發現問題并解決。
    try:
        driver.set_page_load_timeout()
        driver.get('https://blast.ncbi.nlm.nih.gov/Blast.cgi?PROGRAM=blastx&PAGE_TYPE=BlastSearch&LINK_LOC=blasthome')
    except:
        driver.quit()
        return do(path)
    try:
        upload = driver.find_element_by_id('upl')
        #sleep(3)
        upload.send_keys(path)  # send_keys 上傳本地檔案
    #print(upload.get_attribute('value'))  # check value
        driver.find_element_by_id('b1').click() #模拟點選BLAST 按鈕 送出檔案
    except:
        driver.quit()
        return do(path)
    r = ''
    for i in range():
        if isExist(driver,'statInfo'):#statInfo 頁面元素内部有RID
            try:
                table_loc = (By.ID, 'statInfo')
                table_tr_list = driver.find_element(*table_loc).find_elements(By.TAG_NAME, "tr")#通過元素定位方式找到RID
                for tr in table_tr_list:
                    LEN = len(open(path, 'r').read())
                    print(tr.text,LEN)
                    submit(tr.text, Len=LEN)
                    driver.quit()
                    break
                break
            except:
                sleep()
        else:
            continue
    return
def go(lid):#随機生産檔案名,go()執行一次送出。
    #lid并未用到,懶得改了= =
    suqs=[]
    Str=''.join(random.sample(string.ascii_letters + string.digits, ))
    file=Fasta(Str)
    do(file)
    return
def main(n):
    #注意線程數量和sleep的關系,按照實際情況調節。
    for i in range(n):
        try:
            go()
            sleep(random.randint(, ))#随機等待6-15秒
        except:
            continue
Thread(target=main,args=(,)).start()#這一行代碼建立了1個線程,線程不要建立太多,否則會被NCBI拒絕。
sleep()
main()
           

提一下:應該注意自己的Mysql連接配接方式,并且在代碼運作前,在MYSQL上試一下看相關的SQL語句能否運作,比如,第一次執行SQL的批量删除語句時會報錯

You are using safe update e and you tried to update a table…

這時應該在MYSQL上執行

SET SQL_SAFE_UPDATES = 0;

而這種類型的錯誤在python的錯誤資訊是不展現的。

運作結果:

python blast線上比對 每天最多數萬條python blast線上比對、每天可達數萬條

五、通過RID得到比對結果

步驟如下:

1. 通過submit表中的RID建構GET位址(上文二、分析中已提到),并在submit表中删除該條RID。

2. 通過firefox進入1中建構的網址

3. 使用

s = Select(driver.find_element_by_id('queryList')) s.select_by_index(n)

方式改變下拉框的標明,以期跳轉到不同堿基序列的頁面。

4. 通過元素定位的方式,得到結果頁面需要的資訊。

5. 重複3~4直至處理完當次RID的所有不同堿基序列的結果。

6. 重複1~5直至submit為空。

在實際操作的情況下,主要有兩種比對失敗的情況:

  1. 一直不出結果(伺服器原因):通過GET通路結果頁面卻出現等待結果界面,因為五、通過RID擷取結果的操作是具有人工(python程式)設定的查詢長度限制和20分鐘延後性,是以都應該得出結果了。這應該認為是比對失敗,跳過該條RID。事實上,當20分鐘沒有出現結果,數小時後也不會出結果。
  2. 發起的查詢請求堿基序列過長,被伺服器拒絕,如圖:
    python blast線上比對 每天最多數萬條python blast線上比對、每天可達數萬條
    這種情況視作(通過檢查相關元素是否存在來判斷)比對失敗,跳過該條RID。

除了比對失敗的情況,還有一種頁面不正常的情況,即標明下拉框選項後頁面不跳轉到選項對應的結果,這個時候如果抓取資訊的話會得到錯誤的資訊(誤以為跳轉前的資訊對應跳轉後的資訊);通過下圖中兩個紅框内的值是否等同,來判斷是否跳轉。

python blast線上比對 每天最多數萬條python blast線上比對、每天可達數萬條

如果沒跳轉用

driver.execute_script('GetReasult()')

執行頁面自帶的跳轉js代碼。

python代碼

from selenium import webdriver
from time import sleep
from selenium.webdriver.support.ui import Select
from selenium import webdriver
import threading
from selenium.webdriver.common.by import By
from threading import Thread
from collections import Counter
import random
import pymysql
import os
#停詞表
mu = threading.Lock()#線程鎖,避免同時讀取同意檔案發生錯誤
stop_words=['hypothetical','protein','predictd:']#停詞表
def isExist(driver,id):#判斷元素是否存在
    try:
        driver.find_element_by_id(id)
        return True
    except:
        return False
def getSc(DESs,Accs):#得分函數。。這應該根據自己需要來定義
    all=[]
    for i in DESs:
        word=i.replace('[', ' ').replace(']', ' ')
        words=word.split(' ')
        for j in words:
            if j.lower() in stop_words:
                continue
            all.append(j)
    count=Counter(all).most_common()
    if len(count)==:
        return DESs[],Accs[]
    Wmax=count[][]#出現頻率最大
    Wmax0=count[][]#第二大
    #高頻得分
    W0=count[][]*
    W=(count[][]-count[][])*+W0
    n=
    score=[]
    for i in range(len(Accs)):
        sc=
        if 'PREDICTED'in DESs[i]:
            sc=sc-
        if Wmax in DESs[i]:
            sc=W
        elif Wmax0 in DESs[i]:
            sc=W0
        else:
            sc=
        Tsc=n+sc
        score.append(Tsc)
        n=n-
    ind=score.index(max(score))
    return DESs[ind],Accs[ind]
def dele(upid): #删除取得的RID
    conn = pymysql.connect(host='47.106.121.42', port=, user='root', passwd='luanchen', db='blast')
    cur = conn.cursor()
    sql = "delete from submit where RID='%s'"% upid
    try:
        cur.execute(sql)
    except:
            print(sql)
    conn.commit()
    cur.close()
    conn.close()
def get_table_content(driver,tableId):#得到表内内容
    arr=[]
    arr1=[]
    DESs=[]
    Accs=[]
    table_loc=(By.ID,tableId)
# 按行查詢表格的資料,取出的資料是一整行,按空格分隔每一列的資料  
    try:
        table_tr_list=driver.find_element(*table_loc).find_elements(By.TAG_NAME,"tr")
    except:
        driver.refresh()
        driver.switch_to_alert().accept()
    n=
    for tr in table_tr_list:
        n = n + 
        if n==:
            continue
        if n>:
            break
        arr1=(tr.text).split(" ")#以空格拆分成若幹個(個數與列的個數相同)一維清單  
        DES=' '.join(arr1[:-])
        Acc=arr1[-]
        DESs.append(DES)
        Accs.append(Acc)
    des,acc=getSc(DESs,Accs)
    return des,acc
def output(suc):#輸出到檔案
    mu.acquire()#線程鎖
    file = 'D:\\fanal.txt'
    f = open(file, 'a+')
    for i in suc:
        if len(i)<:
            continue
        index=i[]
        n=index[:index.index('T')]
        id=index[index.index('T'):]
        f.write('%s\t%s\t%s\t%s\n'%(n,id,i[],i[]))
    f.close()
    mu.release()#釋放鎖
def do(driver,ht):#進入結果頁面後的操作
    while(True):
        if isExist(driver, 'msgFrm') or isExist(driver, 'statInfo'):
        #這是兩種比對錯誤的情況,一種是一直不出結果(伺服器原因),另一種是比對序列太長被伺服器拒絕
            print('wrong!!!')
            driver.quit()
            return -
        elif isExist(driver, 'descriptions')==False:
            driver.quit()
            driver = webdriver.Firefox()
            driver.get('https://blast.ncbi.nlm.nih.gov/Blast.cgi?CMD=Get&RID=%s' % ht)
            sleep()
            continue
        elif isExist(driver, 'queryList'):
            break
        else:
            continue
    s = Select(driver.find_element_by_id('queryList'))
    Nop=s.options
    suc=[]
    shitbug=[]
    for i in Nop:
        shitbug.append(i.text)
    n=
    b=
    l=len(shitbug)
    while(True):#存放選項name
        if isExist(driver, 'msgFrm') or isExist(driver, 'statInfo'):#比對失敗
            print('wrong!!!')
            driver.quit()
            return -
        if(n>=l):
           break
        if '*' in shitbug[n]:#沒有比對上
            item=[]#為了對齊
            item.append(shitbug[n].split(' ')[])
            item.append(' ')
            item.append(' ')
            #outline(item)
            suc.append(item)
            n=n+
            continue
        sleep()
        try:
            s = Select(driver.find_element_by_id('queryList'))
            s.select_by_index(n)
            sleep()
            #s = Select(driver.find_element_by_id('queryList'))
            item = []
            id = driver.find_element_by_xpath("//div[@id='querysummary']/dl/dd[3]").text
            if id not in shitbug[n]:
                try:
                    driver.execute_script('GetReasult()')
                    continue
                except:
                    continue
            des, acc = get_table_content(driver, 'dscTable')
            ##這是處理得到結果表格的操作,本文主要是通過結果表格每一行的情況得到最合适的describtion和access
            item.append(id)
            item.append(des)
            item.append(acc)
            #outline(item)
            suc.append(item)
            n=n+
            b=b+
        except:
            driver.refresh()
            try:
                driver.switch_to_alert().accept()
            except:
                continue
            sleep()
    driver.quit()
    if b<:
        return 
    output(suc)
    print('done')
    return suc
def read(n):#從submit表中取一條RID并在表中删除,再建構GET方法,通過打開的浏覽器通路GET方法,傳回浏覽器驅動,再執行do()
    #參數n當時用來标記的,這段代碼沒有用到
    if os.path.exists('C:\\stop.txt'):#作為程式停止的标記,當C:\\stop.txt存在時,程式停止運作
        return 
    conn = pymysql.connect(host='127.0.0.1', port=, user='root', passwd='luanchen', db='hp')
    cur = conn.cursor()
    cur.execute('LOCK tables hp.submit write')#上鎖
    sql = 'select RID,len from hp.submit where now()-TTime>1200 order by TTime limit %s' % #SQL語句查詢上傳時間超過1200秒的RID
    cur.execute(sql)
    n = 
    ht=''
    for i in cur:
        ht=(i[])
    if ht=='':
        return 
    sql = "delete from submit where RID='%s'" % ht
    try:
        cur.execute(sql)
    except:
        print(sql)
    conn.commit()
    cur.execute('Unlock tables;')
    cur.close()
    conn.close()
    driver = webdriver.Firefox()
    #driver = webdriver.PhantomJS(executable_path="E:/phantomjs-2.1.1-windows/bin/phantomjs")
    while(True):
        try:
            driver.get('https://blast.ncbi.nlm.nih.gov/Blast.cgi?CMD=Get&RID=%s' % ht)#get方法得到結果頁面
            sleep()
            break
        except:
            driver.quit()
            #river = webdriver.PhantomJS(executable_path="E:/phantomjs-2.1.1-windows/bin/phantomjs")
            driver = webdriver.Firefox()
            driver.get('https://blast.ncbi.nlm.nih.gov/Blast.cgi?CMD=Get&RID=%s' % ht)
            sleep()
            break
    suc=do(driver,ht)
    read(n)
def test():
    t=[]
    for i in range():#這裡設定線程數,根據CPU需要設定,建議使用線程池
        T=Thread(target=read, args=(i,))
        T.start()
        sleep()
test()
           

六、總結

  1. 第四過程的最好使用線程池管理線程。
  2. 在一次操作後,會有部分比對失敗的序列,應該分析待比對序列和結果,看哪些不在結果中,再将這些漏掉的重新導入mysql table2表中,這個操作不建議在mysql中操作,而用python操作,因為sql語句執行太慢。把每次的待比對序列放入一個備份表是會更友善的
  3. 根據裝置和網絡情況設定線程數。