天天看點

簡單的低頻指數投資二:每天定時擷取資料

apscheduler是一個python定時任務架構,我們可以利用它來完成一個每天自動擷取資料并且發出交易指令的系統。

from apscheduler.schedulers.blocking import BlockingScheduler
import tushare

pro = ts.pro_api(token = token)

def update_data():
    '''
    擷取指數價格以及市盈率等資料的函數
    '''
    a = data.IndexDataCrawler(pro)
    a.update_index()


scheduler = BlockingScheduler()
#設定每周一至周五晚上7.30分爬取資料
scheduler.add_job(update_data, 'cron', day_of_week='1-5', hour=19, minute=30)
scheduler.start()
           

這裡我們設定了一個blockingscheduler,這個函數可以幫助我們在指定時間點自動運作設定好的update_data函數,并通過這個函數來進行爬取資料。這裡IndexDataCrawler是一個設定好的類,主要用來輔助爬取資料,這裡也分享一下類中的爬取資料的函數。

import logging

class IndexDataCrawler(object):
    """docstring for ClassName"""
    def __init__(self, pro):
        super(IndexDataCrawler, self).__init__()
        self.log_path = './log/'
        self.pro = pro
    
    def set_logger(self):
        '''
        根據自己喜好來設定logger
        '''
        pass
        
    def update_index(self):
        connect, cursor = connect_sql()
        eng_con = con_eng()

        #1.\擷取所有基金清單
        sql = 'SELECT * FROM `Index`'
        cursor.execute(sql)
        col = cursor.description
        fund_list = cursor.fetchall()


        #2.對每個指數擷取資料
        for fund_tuple in fund_list:
            print(fund_tuple)
            ts_code = fund_tuple[2]
            table_name = ts_code.split('.')[0] + 'i'
            try:
                lack_date_list = get_lack_date(table_name, cursor, start_date = fund_tuple[5].date())
            except Exception as e:
                self.logger.warning('no date', e)
                lack_date_list = get_today_date(cursor)

            self.logger.info(str(lack_date_list))
            result_df = pd.DataFrame()
            
            try:
                for date in lack_date_list:
                    time_str = datetime_to_str(date)
                    
                    #擷取指數的價格等資訊
                    df = get_index_daily(self.pro, ts_code, time_str)
                    #擷取pe,pb等資訊
                    pe, pb = self.get_index_basic(ts_code, time_str)

                    if pe == 0 and pb == 0:
                        pass
                    else:
                        df.loc[0,'pe'] = pe
                        df.loc[0,'pb'] = pb

                    result_df = result_df.append(df)

            except Exception as e:
                self.logger.error(e)
            
            pd.io.sql.to_sql(result_df, table_name, con=eng_con, schema='stock_db', if_exists='append', index = False)
            self.logger.info('update index %s success' % (ts_code)) 
        close_sql(connect, cursor)
        close_eng(eng_con)
    
    def get_index_basic(self, ts_code, trade_date):
        '''
        更新基金的pe與pb
        ts_code: 基金的tushare代碼
        trade_date: 基金的日期
        '''
        total_pe = 0
        total_pb = 0
        #擷取離當天最近的一次指數權重更新
        if ts_code in ['000001.SH', '000016.SH', '000300.SH', '000905.SH']:
            #可以直接獲得pe資訊
            df = get_main_index_basic(self.pro, ts_code, trade_date)
            total_pe = df.loc[0,'pe']
            total_pb = df.loc[0,'pb']
        else:
            index_weight = get_index_weight(self.pro, ts_code, trade_date)
            if index_weight is None or len(index_weight) == 0:
                #輸出,暫無指數權重資料
                pass
            else:
                #選取最近的一次
                index_weight = index_weight[index_weight.trade_date == index_weight.trade_date.max()]
                #擷取pe值
                pe_df = get_basic(self.pro, trade_date = trade_date)

                merge_df = pd.merge(index_weight, pe_df, left_on='con_code', right_on='ts_code', how = 'inner')

                total_pe = merge_df.weight.sum() / (merge_df.weight / merge_df.pe).sum()
                total_pb = merge_df.weight.sum() / (merge_df.weight / merge_df.pb).sum()

        return total_pe, total_pb        
           

更新資料的函數主要分為兩部分,一部分是用來擷取指數的價格,成交量等資訊,另外一部分則是用來擷取指數的市盈率,市淨率。這裡對于上證指數,中證500等成分股較多的指數,由于tushare恰巧提供了這些指數的市盈率資料,是以我們可以直接爬取,而不用再去算了。

繼續閱讀