天天看点

impala查询语句_获取impala下所有的数据库建表语句

#!/usr/bin/env python#-*- coding:utf8 -*-#从mysql中提取hive建表语句

importos,sysimportfileinputimportdatetimeimportmysql.connector

reload(sys)

sys.setdefaultencoding("utf8")defhive_create_table():

conn= mysql.connector.connect(host="192.168.xxx.xxx",user='hive',passwd='123456',database='hive',charset='utf8')

mycursor=conn.cursor()#获取DB_ID

select_DB_ID = "select DB_ID from DBS;"mycursor.execute(select_DB_ID)

result_DB_ID=mycursor.fetchall()

fo= open("create_tab.sql", "w")for dir_DB_ID inresult_DB_ID :#获取数据库名

DB_ID = str(dir_DB_ID)[1:].split(',')[0]print(DB_ID)

select_DB_NAME= "select NAME from DBS where DB_ID="+DB_ID+";"

print(select_DB_NAME )

mycursor.execute(select_DB_NAME)

result_DB_NAME=mycursor.fetchone()

fo.write("\n===========数据库:"+str(result_DB_NAME).split('\'')[1]+"===========\n")

DBname=str(result_DB_NAME).split('\'')[1]print '数据库名字:' +DBnameprint(result_DB_NAME)#获取表名

select_table_name_sql = "select TBL_NAME from TBLS where DB_ID="+DB_ID+";"mycursor.execute(select_table_name_sql)

result_table_names=mycursor.fetchall()for table_name inresult_table_names :

fo.write("\nCREATE TABLE"+DBname +'.`'+str(table_name).split('\'')[1]+"`(\n")#根据表名获取SD_ID

select_table_SD_ID = "select SD_ID from TBLS where tbl_name='"+str(table_name).split('\'')[1]+"' and DB_ID="+DB_ID+";"

print(select_table_SD_ID)

mycursor.execute(select_table_SD_ID)

result_SD_ID=mycursor.fetchone()print(result_SD_ID )#根据SD_ID获取CD_ID

SD_ID=str(result_SD_ID)[1:].split(',')[0]

select_table_CD_ID= "select CD_ID from SDS where SD_ID="+str(result_SD_ID)[1:].split(',')[0]+";"

print(select_table_CD_ID)

mycursor.execute(select_table_CD_ID)

result_CD_ID=mycursor.fetchone()print(result_CD_ID)#根据CD_ID获取表的列

CD_ID=str(result_CD_ID)[1:].split(',')[0]

select_table_COLUMN_NAME= "select COLUMN_NAME,TYPE_NAME,COMMENT from COLUMNS_V2 where CD_ID="+str(result_CD_ID)[1:].split(',')[0]+"order by INTEGER_IDX;"

print(select_table_COLUMN_NAME)

mycursor.execute(select_table_COLUMN_NAME)

result_COLUMN_NAME=mycursor.fetchall()print(result_COLUMN_NAME)

index=0for col,col_type,col_name inresult_COLUMN_NAME:print(col)print(col_type)print(col_name)print(len(result_COLUMN_NAME) )#写入表的列和列的类型到文件

if col_name isNone:

fo.write("`"+str(col)+"`"+str(col_type))else:

fo.write("`"+str(col)+"`"+str(col_type) + "COMMENT '" + str(col_name) + "'")if index < len(result_COLUMN_NAME)-1:

index= index + 1fo.write(",\n")elif index == len(result_COLUMN_NAME)-1:

fo.write("\n)")#根据表名获取TBL_ID

select_table_SD_ID = "select TBL_ID from TBLS where tbl_name='"+str(table_name).split('\'')[1]+"' and DB_ID="+DB_ID+";"

print(select_table_SD_ID)

mycursor.execute(select_table_SD_ID)

result_TBL_ID=mycursor.fetchone()print(result_TBL_ID)#根据TBL_ID获取分区信息

select_table_PKEY_NAME_TYPE = "select PKEY_NAME,PKEY_TYPE,PKEY_COMMENT from PARTITION_KEYS where TBL_ID="+str(result_TBL_ID)[1:].split(',')[0]+"order by INTEGER_IDX;"

print(select_table_PKEY_NAME_TYPE)

mycursor.execute(select_table_PKEY_NAME_TYPE)

result_PKEY_NAME_TYPE=mycursor.fetchall()print(result_PKEY_NAME_TYPE)if len(result_PKEY_NAME_TYPE) >0:

fo.write("\nPARTITIONED BY (\n")

else:

fo.write("\n")

i=0for pkey_name,pkey_type,PKEY_COMMENT inresult_PKEY_NAME_TYPE:if str(PKEY_COMMENT) isNone:

fo.write("`"+str(pkey_name)+"`"+str(pkey_type))else:

fo.write("`"+str(pkey_name)+"`"+str(pkey_type) + "COMMENT '" + str(PKEY_COMMENT) + "'\n")if i < len(result_PKEY_NAME_TYPE)- 1:

i= i + 1fo.write(",")elif i == len(result_PKEY_NAME_TYPE) - 1:

fo.write(")\n")#根据表TBL_ID 获得中文名称

select_PARAM_VALUE01 = "select PARAM_VALUE from TABLE_PARAMS WHERE TBL_ID=( select TBL_ID from TBLS where tbl_name='"+str(table_name).split('\'')[1]+"' and DB_ID="+DB_ID+") and PARAM_KEY='comment';"

print(select_PARAM_VALUE01)

mycursor.execute(select_PARAM_VALUE01)

result_PARAM_VALUE01=mycursor.fetchone()printresult_PARAM_VALUE01if result_PARAM_VALUE01 isNone:print '未设置表名'

elif notresult_PARAM_VALUE01[0]:print '表名为空'

else:

fo.write("COMMENT '" + str(result_PARAM_VALUE01[0]) +"' \n")#根据SD_ID和CD_ID获取SERDE_ID

select_SERDE_ID = "select SERDE_ID from SDS where SD_ID="+SD_ID+"and CD_ID="+CD_ID+";"

print(select_SERDE_ID)

mycursor.execute(select_SERDE_ID)

result_SERDE_ID=mycursor.fetchone()print(result_SERDE_ID)#根据SERDE_ID获取PARAM_VALUE(列分隔符)

select_PARAM_VALUE = "select PARAM_VALUE from SERDE_PARAMS where SERDE_ID="+str(result_SERDE_ID)[1:].split(",")[0]+"and PARAM_KEY='field.delim';"

print(select_PARAM_VALUE)

mycursor.execute(select_PARAM_VALUE)

result_PARAM_VALUE=mycursor.fetchone()print(result_PARAM_VALUE)if result_PARAM_VALUE is notNone:

fo.write("ROW FORMAT DELIMITED\n")

fo.write("FIELDS TERMINATED BY '"+str(result_PARAM_VALUE).split('\'')[1]+"'\n")#根据SERDE_ID获取PARAM_VALUE(行分隔符)

select_PARAM_HNAG = "select PARAM_VALUE from SERDE_PARAMS where SERDE_ID="+str(result_SERDE_ID)[1:].split(",")[0]+"and PARAM_KEY='line.delim';"

print(select_PARAM_HNAG)

mycursor.execute(select_PARAM_HNAG)

RESULT_PARAM_HNAG=mycursor.fetchone()print(RESULT_PARAM_HNAG)if RESULT_PARAM_HNAG is notNone:

fo.write("LINES TERMINATED BY '"+str(RESULT_PARAM_HNAG).split('\'')[1]+"'\n")#根据SD_ID和CD_ID获取输入输出格式

select_table_STORE_FORMAT = "select INPUT_FORMAT from SDS where SD_ID="+SD_ID+"and CD_ID="+CD_ID+";"

print(select_table_STORE_FORMAT)

mycursor.execute(select_table_STORE_FORMAT)

result_table_STORE_FORMAT=mycursor.fetchall()print(result_table_STORE_FORMAT)for store_format inresult_table_STORE_FORMAT:if "org.apache.hadoop.hive.ql.io.orc.OrcInputFormat" instr(store_format):

fo.write("STORED AS ORC;\n")elif "org.apache.hadoop.mapred.TextInputFormat" instr(store_format):

fo.write("STORED AS TEXTFILE;\n")elif "org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat" instr(store_format):

fo.write("STORED AS PARQUET;\n")elif "org.apache.kudu.mapreduce.KuduTableInputFormat" instr(store_format):

fo.write("STORED AS KuduTable;\n")else:

fo.write("STORED AS null;\n")

fo.close()

hive_create_table()