天天看點

Greenplum Python工具庫gpload學習——gpload類

gpload.py代碼寫得挺簡潔的,主要邏輯集中在gpload類中,其中run函數是最為重要的,而run函數主要邏輯就是調用run2函數。

if __name__ == '__main__':
    loader = gpload(sys.argv[1:])
    loader.run()
    # Flush both streams explicitly: os._exit() below terminates the
    # process immediately, skipping interpreter cleanup (atexit hooks,
    # buffered-output flushing).
    sys.stdout.flush()
    sys.stderr.flush()
    os._exit(loader.exitValue)

gpload構造函數

def __init__(self,argv):
        self.threads = [] # remember threads so that we can join() against them
        self.exitValue = 0
        self.options = options()
        self.options.h = None
        self.options.gpfdist_timeout = None
        self.options.p = None
        self.options.U = None
        self.options.W = False
        self.options.D = False
        self.options.no_auto_trans = False
        self.options.password = None
        self.options.d = None
        self.DEBUG = 5
        self.LOG = 4
        self.INFO = 3
        self.WARN = 2
        self.ERROR = 1
        self.options.qv = self.INFO
        self.options.l = None
        self.formatOpts = ""
        self.startTimestamp = time.time()
        self.error_table = False
        self.gpdb_version = ""
        seenv = False
        seenq = False

        # Create Temp and External table names. However external table name could
        # get overwritten with another name later on (see create_external_table_name).
        # MPP-20927: gpload external table name problem. We use uuid to avoid
        # external table name confliction.
        self.unique_suffix = str(uuid.uuid1()).replace('-', '_')
        self.staging_table_name = 'temp_staging_gpload_' + self.unique_suffix
        self.extTableName  = 'ext_gpload_' + self.unique_suffix

        # SQL to run in order to undo our temporary work
        self.cleanupSql = []
        self.distkey = None
        configFilename = None
        while argv:
            try:
                try:
                    if argv[0]=='-h':
                        self.options.h = argv[1]
                        argv = argv[2:]
                    if argv[0]=='--gpfdist_timeout':
                        self.options.gpfdist_timeout = argv[1]
                        argv = argv[2:]
                    elif argv[0]=='-p':
                        self.options.p = int(argv[1])
                        argv = argv[2:]
                    elif argv[0]=='-l':
                        self.options.l = argv[1]
                        argv = argv[2:]
                    elif argv[0]=='-q':
                        self.options.qv -= 1
                        argv = argv[1:]
                        seenq = True
                    elif argv[0]=='--version':
                        sys.stderr.write("gpload version $Revision$\n")
                        sys.exit(0)
                    elif argv[0]=='-v':
                        self.options.qv = self.LOG
                        argv = argv[1:]
                        seenv = True
                    elif argv[0]=='-V':
                        self.options.qv = self.DEBUG
                        argv = argv[1:]
                        seenv = True
                    elif argv[0]=='-W':
                        self.options.W = True
                        argv = argv[1:]
                    elif argv[0]=='-D':
                        self.options.D = True
                        argv = argv[1:]
                    elif argv[0]=='-U':
                        self.options.U = argv[1]
                        argv = argv[2:]
                    elif argv[0]=='-d':
                        self.options.d = argv[1]
                        argv = argv[2:]
                    elif argv[0]=='-f':
                        configFilename = argv[1]
                        argv = argv[2:]
                    elif argv[0]=='--no_auto_trans':
                        self.options.no_auto_trans = True
                        argv = argv[1:]
                    elif argv[0]=='-?':
                        usage()
                    else:
                        break
                except IndexError:
                    sys.stderr.write("Option %s needs a parameter.\n"%argv[0])
                    sys.exit(2)
            except ValueError:
                sys.stderr.write("Parameter for option %s must be an integer.\n"%argv[0])
                sys.exit(2)

        if configFilename==None:
            usage('configuration file required')
        elif argv:
            a = ""
            if len(argv) > 1:
                a = "s"
            usage('unrecognized argument%s: %s' % (a, ' '.join(argv)))

        # default to gpAdminLogs for a log file, may be overwritten
        if self.options.l is None:
            self.options.l = os.path.join(os.environ.get('HOME', '.'),'gpAdminLogs')
            if not os.path.isdir(self.options.l):
                os.mkdir(self.options.l)

            self.options.l = os.path.join(self.options.l, 'gpload_' + \
                                          datetime.date.today().strftime('%Y%m%d') + '.log')

        try:
            self.logfile = open(self.options.l,'a')
        except Exception, e:
            self.log(self.ERROR, "could not open logfile %s: %s" % \
                      (self.options.l, e))

        if seenv and seenq:
            self.log(self.ERROR, "-q conflicts with -v and -V")

        if self.options.D:
            self.log(self.INFO, 'gpload has the -D option, so it does not actually load any data')

        try:
            f = open(configFilename,'r')
        except IOError,e:
            self.log(self.ERROR, "could not open configuration file: %s" % e)

        # pull in the config file, which should be in valid YAML
        try:
            # do an initial parse, validating the config file
            doc = f.read()
            self.config = yaml.load(doc)

            self.configOriginal = changeToUnicode(self.config)
            self.config = dictKeyToLower(self.config)
            ver = self.getconfig('version', unicode, extraStuff = ' tag')
            if ver != '1.0.0.1':
                self.control_file_error("gpload configuration schema version must be 1.0.0.1")
            # second parse, to check that the keywords are sensible
            y = yaml.compose(doc)
            # first should be MappingNode
            if not isinstance(y, yaml.MappingNode):
                self.control_file_error("configuration file must begin with a mapping")

            yaml_walk(self, y.value, [])
        except yaml.scanner.ScannerError,e:
            self.log(self.ERROR, "configuration file error: %s, line %s" % \
                (e.problem, e.problem_mark.line))
        except yaml.reader.ReaderError, e:
            es = ""
            if isinstance(e.character, str):
                es = "'%s' codec can't decode byte #x%02x: %s position %d" % \
                        (e.encoding, ord(e.character), e.reason,
                         e.position)
            else:
                es = "unacceptable character #x%04x at byte %d: %s"    \
                    % (ord(e.character), e.position, e.reason)
            self.log(self.ERROR, es)
        except yaml.error.MarkedYAMLError, e:
            self.log(self.ERROR, "configuration file error: %s, line %s" % \
                (e.problem, e.problem_mark.line))
        f.close()
        self.subprocesses = []
        self.log(self.INFO,'gpload session started ' + \
                 datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'))      

其中最重要的函數就是加載yaml配置檔案,利用compose函數進行解析,然後使用yaml_walk函數對yaml檔案中的配置進行正确性的校驗。如果需要在該yaml配置檔案中加入新的關鍵字,需要在下面的字典中加入新增關鍵字條目,并設定是否需要解析該關鍵字的孩子關鍵字,以及設定該關鍵字的父關鍵字。

Greenplum Python工具庫gpload學習——gpload類

gpload run成員函數

def run2(self):
        """Execute one load end to end: read config, connect, gather table
        metadata and column mappings, start gpfdist servers, then run the
        configured load method.  Logs the total elapsed wall-clock time."""
        self.log(self.DEBUG, 'config ' + str(self.config))
        t0 = time.time()
        # The load is a fixed pipeline of zero-argument steps, in order.
        steps = (self.read_config,
                 self.setup_connection,
                 self.read_table_metadata,
                 self.read_columns,
                 self.read_mapping,
                 self.start_gpfdists,
                 self.do_method)
        for step in steps:
            step()
        self.log(self.INFO, 'running time: %.2f seconds'%(time.time()-t0))

read_config函數主要是讀取表名,如果表名中帶有schema,則将其指派給self.schema,否則self.schema為None。以command line > config file > env variable優先級處理host、Port、User、database等參數。

setup_connection連接配接資料庫

read_table_metadata

read_table_metadata讀取表的中繼資料,​​Greenplum常用SQL——通過表名查找shema名​​​,​​Greenplum常用SQL——通過表名查詢列名、類型、是否具有序列​​

def read_table_metadata(self):
        """Resolve the target table's schema (if not configured) and read
        its column metadata into self.into_columns / self.into_columns_dict.

        Each entry is [name, type, mapping (None for now), has_sequence].
        Logs at ERROR level (which exits) when the table cannot be found
        or is not readable.
        """
        # KAS Note to self. If schema is specified, then probably should use PostgreSQL rules for defining it.

        # find the schema name for this table (according to search_path)
        # if it was not explicitly specified in the configuration file.
        if self.schema is None:
            queryString = """SELECT n.nspname
                             FROM pg_catalog.pg_class c
                             LEFT JOIN pg_catalog.pg_namespace n
                             ON n.oid = c.relnamespace
                             WHERE c.relname = '%s'
                             AND pg_catalog.pg_table_is_visible(c.oid);""" % quote_unident(self.table)

            resultList = self.db.query(queryString.encode('utf-8')).getresult()

            if len(resultList) > 0:
                self.schema = (resultList[0])[0]
                self.log(self.INFO, "setting schema '%s' for table '%s'" % (self.schema, quote_unident(self.table)))
            else:
                self.log(self.ERROR, "table %s not found in any database schema" % self.table)


        queryString = """select nt.nspname as table_schema,
         c.relname as table_name,
         a.attname as column_name,
         a.attnum as ordinal_position,
         format_type(a.atttypid, a.atttypmod) as data_type,
         c.relkind = 'r' AS is_updatable,
         a.atttypid in (23, 20) and a.atthasdef and
             (select position ( 'nextval(' in pg_catalog.pg_get_expr(adbin,adrelid) ) > 0 and
                          position ( '::regclass)' in pg_catalog.pg_get_expr(adbin,adrelid) ) > 0
              FROM pg_catalog.pg_attrdef d
              WHERE d.adrelid = a.attrelid AND d.adnum = a.attnum AND a.atthasdef) as has_sequence
          from pg_catalog.pg_class c join pg_catalog.pg_namespace nt on (c.relnamespace = nt.oid)
             join pg_attribute a on (a.attrelid = c.oid)
         where c.relname = '%s' and nt.nspname = '%s'
         and a.attnum > 0 and a.attisdropped = 'f'
         order by a.attnum """ % (quote_unident(self.table), quote_unident(self.schema))

        self.into_columns = []
        self.into_columns_dict = dict()
        resultList = self.db.query(queryString.encode('utf-8')).dictresult()
        # Idiom fix: the original walked resultList with a manual
        # while/counter loop; the counter was only ever used as an
        # emptiness flag, so iterate directly instead.
        for row in resultList:
            ct = unicode(row['data_type'])
            # perform the same kind of magic type change that postgres does
            if ct == 'bigserial':
               ct = 'bigint'
            elif ct == 'serial':
               ct = 'int4'
            name = unicode(row['column_name'], 'utf-8')
            name = quote_ident(name)
            has_seq = unicode(row['has_sequence']) != unicode('f')
            i = [name,ct,None, has_seq]
            self.into_columns.append(i)
            self.into_columns_dict[name] = i
            self.log(self.DEBUG, "found input column: " + str(i))
        if not resultList:
            # see if it's a permissions issue or it actually doesn't exist
            tableName = quote_unident(self.table)
            tableSchema = quote_unident(self.schema)
            sql = """select 1 from pg_class c, pg_namespace n
                        where c.relname = '%s' and
                        n.nspname = '%s' and
                        n.oid = c.relnamespace""" % (tableName, tableSchema)
            resultList = self.db.query(sql.encode('utf-8')).getresult()
            if len(resultList) > 0:
                self.log(self.ERROR, "permission denied for table %s.%s" % \
                            (tableSchema, tableName))
            else:
               self.log(self.ERROR, 'table %s.%s does not exist in database %s'% (tableSchema, tableName, self.options.d))

經過該函數我們得到了self.into_columns清單存放的是[name,ct,None, has_seq](列名,列類型,None,是否具有序列),into_columns_dict是鍵為列名,值為[name,ct,None, has_seq]的字典。

read_columns

read_columns用于使用者指定導入資料對應哪些列,構成元素為(列名,列類型,None,False)的from_columns。如果沒有指定該功能,則from_columns等于into_columns。

def read_columns(self):
        """Build self.from_columns, the list of source columns as
        [name, type, None, False] entries.

        Uses the optional gpload:input:columns control-file section when
        present (self.from_cols_from_user = True); otherwise the source
        columns default to the target table's columns (self.into_columns).
        Logs at ERROR level when a source column ends up without a type.
        """
        columns = self.getconfig('gpload:input:columns',list,None, returnOriginal=True)
        if columns is not None:
            self.from_cols_from_user = True # user specified from columns
            self.from_columns = []
            for d in columns:
                if type(d)!=dict:
                    self.control_file_error("gpload:input:columns must be a sequence of YAML mappings")
                # each mapping holds exactly one {column-name: type} pair;
                # remove leading or trailing spaces from the column name
                tempkey = d.keys()[0]
                key = tempkey.strip()
                d = { key : d[tempkey] }
                if d[key] is None:
                    # no type given: inherit it from the matching target column
                    self.log(self.DEBUG,
                             'getting source column data type from target')
                    for name, typ, mapto, hasseq in self.into_columns:
                        if sqlIdentifierCompare(name, key):
                            d[key] = typ
                            break

                # perform the same kind of magic type change that postgres does
                if d[key] == 'bigserial':
                    d[key] = 'bigint'
                elif d[key] == 'serial':
                    d[key] = 'int4'

                # Mark this column as having no mapping, which is important
                # for do_insert()
                self.from_columns.append([key.lower(),d[key].lower(),None, False])
        else:
            self.from_columns = self.into_columns
            self.from_cols_from_user = False

        # make sure that all columns have a type
        # (loop variable renamed from 'map', which shadowed the builtin)
        for name, typ, mapto, hasseq in self.from_columns:
            if typ is None:
                self.log(self.ERROR, 'column "%s" has no type ' % name +
                       'and does not appear in target table "%s"' % self.schemaTable)
        self.log(self.DEBUG, 'from columns are:')
        for c in self.from_columns:
            name = c[0]
            typ = c[1]
            self.log(self.DEBUG, '%s: %s'%(name,typ))

read_mapping

如果配置了映射,則通過self.into_columns中的列名檢視映射的字典,如果有則将其映射的value,添加到self.into_columns清單相應元素的第3位。如果沒有配置映射,則将列名map anything yet to be mapped to itself,一一映射。

def read_mapping(self):
        """Fill in the mapping slot (index 2) of each self.into_columns
        entry, either from the gpload:output:mapping section or — when no
        mapping is configured — by mapping each source column to the
        same-named target column.  Logs at ERROR level on unknown or
        unmappable columns.
        """
        mapping = self.getconfig('gpload:output:mapping',dict,None, returnOriginal=True)

        if mapping:
            for key,value in mapping.iteritems():
                if type(key) != unicode or type(value) != unicode:
                    self.control_file_error("gpload:output:mapping must be a YAML type mapping from strings to strings")
                found = False
                for a in self.into_columns:
                    if sqlIdentifierCompare(a[0], key):
                       a[2] = value
                       found = True
                       break
                if not found:
                    self.log(self.ERROR,'%s in mapping is not in table %s'% \
                                    (key, self.schemaTable))
        else:
            # Now, map anything yet to be mapped to itself, picking up on those
            # columns which are not found in the table.
            for x in self.from_columns:
                # Check to see if it already has a mapping value.
                # BUGFIX: was filter(lambda a: ..., self.into_columns); a
                # py3 filter object is always truthy, so 'if not i' would
                # never fire there. A list comprehension is correct under
                # both Python 2 and 3.
                i = [a for a in self.into_columns if a[2] == x[0]]
                if not i:
                    # Check to see if the target column names match the input column names.
                    for a in self.into_columns:
                        if sqlIdentifierCompare(a[0], x[0]):
                           i = a
                           break
                    if i:
                        if i[2] is None: i[2] = i[0]
                    else:
                        self.log(self.ERROR, 'no mapping for input column ' +
                                 '"%s" to output table' % x[0])
        for name,typ,mapto,seq in self.into_columns:
            self.log(self.DEBUG,'%s: %s = %s'%(name,typ,mapto))

start_gpfdists

後續解析

do_method

如果開啟preload,則需要看看在insert模式下是否需要先truncate表,如果需要則truncate。從配置檔案中擷取reuse_tables、fast_match和staging_table。由于GP5不支援error_table,如果配置有選中該特性,則將reuse_tables和log_errors設定為true。

執行pre sql,針對不同模式執行相應函數

如果處于merge或update模式,需要truncate staging_table_name

執行after sql,如果no_auto_trans未打開且不是insert,則需要執行commit

def do_method(self):
        # Is the table to be truncated before the load?
        preload = self.getconfig('gpload:preload', list, default=None)
        method = self.getconfig('gpload:output:mode', unicode, 'insert').lower()
        self.log_errors = self.getconfig('gpload:input:log_errors', bool, False)
        truncate = False
        self.reuse_tables = False

        if not self.options.no_auto_trans and not method=='insert':
            self.db.query("BEGIN")

        if preload:
            truncate = self.getconfig('gpload:preload:truncate',bool,False)
            self.reuse_tables = self.getconfig('gpload:preload:reuse_tables',bool,False)
            self.fast_match = self.getconfig('gpload:preload:fast_match',bool,False)
            if self.reuse_tables == False and self.fast_match == True:
                self.log(self.WARN, 'fast_match is ignored when reuse_tables is false!')
            self.staging_table = self.getconfig('gpload:preload:staging_table', unicode, default=None)
        if self.error_table:
            self.log_errors = True
            self.reuse_tables = True
        if truncate == True:
            if method=='insert':
                self.do_truncate(self.schemaTable)
            else:
                self.log(self.ERROR, 'preload truncate operation should be used with insert ' +
                                     'operation only. used with %s' % method)

        # sql pre or post processing?
        sql = self.getconfig('gpload:sql', list, default=None)
        before   = None
        after    = None
        if sql:
            before   = self.getconfig('gpload:sql:before', unicode, default=None)
            after    = self.getconfig('gpload:sql:after', unicode, default=None)
        if before:
            self.log(self.LOG, "Pre-SQL from user: %s" % before)
            if not self.options.D:
                try:
                    self.db.query(before.encode('utf-8'))
                except Exception, e:
                    self.log(self.ERROR, 'could not execute SQL in sql:before "%s": %s' %
                             (before, str(e)))


        if method=='insert':
            self.do_method_insert()
        elif method=='update':
            self.do_method_update()
        elif method=='merge':
            self.do_method_merge()
        else:
            self.control_file_error('unsupported method %s' % method)

        # truncate the staging table to avoid dumping it's content - see MPP-15474
        if method=='merge' or method=='update':
            self.do_truncate(self.staging_table_name)

        if after:
            self.log(self.LOG, "Post-SQL from user: %s" % after)
            if not self.options.D:
                try:
                    self.db.query(after.encode('utf-8'))
                except Exception, e:
                    self.log(self.ERROR, 'could not execute SQL in sql:after "%s": %s' %
                             (after, str(e)))

        if not self.options.no_auto_trans and not method=='insert':
            self.db.query("COMMIT")