天天看點

python連接各種資料庫(建議收藏):連接mysql、連接hive、連接neo4j圖資料庫(demo)

連接mysql

# 方法一
import pymysql.cursors
import pandas as pd

# Connect to the database

def get_mysql(config=None):
    """Open a PyMySQL connection whose cursors return rows as dicts.

    :param config: optional dict with ``host``, ``user``, ``password`` and
        ``database`` keys and an optional ``port`` key (default 3306).
        When omitted, the placeholder settings below are used.
    :return: the open connection, or ``None`` if PyMySQL raises
        ``OperationalError`` (a message is printed in that case).
    """
    if config is None:
        config = {
            'host': '***.***.*.**',  # IP address
            'port': 3306,  # default MySQL port
            'user': '***',
            'password': '***',
            'database': '***'
        }
    try:
        connection = pymysql.connect(
            host=config['host'],
            # Pass the configured port explicitly; otherwise PyMySQL always
            # uses its default (3306) regardless of the config.
            port=config.get('port', 3306),
            user=config['user'],
            password=config['password'],
            db=config['database'],
            charset='utf8',
            connect_timeout=700,
            # Rows are returned as dicts keyed by column name.
            cursorclass=pymysql.cursors.DictCursor
        )
    except pymysql.err.OperationalError as err:
        # Best-effort contract kept (print and return None), but include the
        # driver's reason so the failure can be diagnosed.
        print('No database connection:', err)
        return None
    return connection


class DB_operation(object):
    """Bundle a MySQL connection (from ``get_mysql``) with a few queries."""

    def __init__(self, name='default'):
        # get_mysql() returns None if connecting fails; the query methods
        # below would then fail with AttributeError on ``None.cursor``.
        self.mysql = get_mysql()
        # Stored only; not used by the methods in this class.
        self.name = name

    def get_df_by_platform(self):
        """Run the (placeholder) SELECT and return all rows as a DataFrame.

        :return: DataFrame built from ``cursor.fetchall()``.
        """
        # NOTE(review): `table` is a placeholder and a reserved word in MySQL;
        # replace it with a real (or back-quoted) table name.
        sql = """SELECT  * from table"""
        with self.mysql.cursor() as cursor:
            cursor.execute(sql)
            fetch_all = cursor.fetchall()
        return pd.DataFrame(fetch_all)

    # ON DUPLICATE KEY UPDATE: when a row with an existing primary/unique key
    # is inserted, the insert is turned into an update instead.
    def insert_info(self, info_list):
        """Upsert ``(movie_info_id, insert_time)`` rows and commit.

        :param info_list: sequence of 2-item sequences, one per row, matching
            the two ``%s`` placeholders.
        :raises: whatever the driver raises; the transaction is rolled back
            first so a partially applied batch is never committed later.
        """
        # NOTE(review): `insert_time` = `insert_time` keeps the stored value and
        # movie_info_id is set to the inserted one, so on a duplicate key the
        # row looks effectively unchanged -- confirm this is the intent.
        insert = """
        INSERT INTO
        `db`.`table`
        (`movie_info_id`, `insert_time`)
        VALUES (%s, %s)
        ON DUPLICATE KEY UPDATE
        `insert_time` = `insert_time`,
        `movie_info_id` = values(`movie_info_id`);
        """
        try:
            with self.mysql.cursor() as cursor:
                cursor.executemany(insert, info_list)
            self.mysql.commit()
        except Exception:
            # Discard any rows already sent in this transaction, then re-raise.
            self.mysql.rollback()
            raise


# Example usage. info_list must exist before insert_info() is called: one
# (movie_info_id, insert_time) tuple per row, matching the two %s placeholders
# of the INSERT statement.
info_list = [(1, '2020-01-01 00:00:00')]  # placeholder rows -- replace with real data

drill = DB_operation()
data = drill.get_df_by_platform()
drill.insert_info(info_list)

# Method 2: SQLAlchemy engine
from sqlalchemy import create_engine, text

# A bare "mysql://" URL uses the MySQLdb (mysqlclient) driver; the
# "mysql+pymysql://" form reuses PyMySQL from method 1. The character set goes
# in the URL: create_engine's `encoding=` argument was deprecated in
# SQLAlchemy 1.4 and removed in 2.0.
engine = create_engine(
    'mysql://user:password@***.***.*.**:3306/db?charset=utf8mb4')
# or
engine = create_engine(
    'mysql+pymysql://user:password@***.***.*.**:3306/db?charset=utf8mb4')

# Query data. Engine.execute() was removed in SQLAlchemy 2.0: run raw SQL
# through a Connection and wrap it in text().
with engine.connect() as conn:
    result = conn.execute(text("SELECT * FROM table"))
    # df is the frame uploaded below (placeholder: replace with your own data).
    df = pd.DataFrame(result.fetchall(), columns=list(result.keys()))

# Write into the database, appending in batches of 500 rows.
df.to_sql(
    'temp_ds_item_info',
    engine,
    if_exists='append',
    index=False,
    chunksize=500)
           

連接hive

# 第一種方法
from pyhive import hive
import pandas as pd

class pyhive_db(object):
    """Run queries against Apache Hive (HiveServer2) through PyHive."""

    def __init__(self):
        # PyHive accepts a password only with auth='LDAP' or 'CUSTOM' and
        # raises ValueError otherwise (the previous auth=None always failed).
        # Use 'CUSTOM' instead if HiveServer2 is configured that way.
        self._conn_kwargs = dict(host='***.***.*.**', port=10000, auth='LDAP',
                                 database='***', username='***',
                                 password='****')
        self.conn = hive.Connection(**self._conn_kwargs)

    def query_data(self, sql):
        """
        Query the data from Apache Hive.

        The connection is closed after each query; the next call opens a
        fresh one, so the helper can be reused.
        :param sql: HiveQL statement (HiveServer2 typically rejects a trailing ';').
        :return: DataFrame of the fetched rows (integer column labels).
        """
        con = self.conn if self.conn is not None else hive.Connection(**self._conn_kwargs)
        # con is closed below; drop the reference so a later call reconnects
        # instead of reusing a closed connection.
        self.conn = None
        try:
            cursor = con.cursor()
            try:
                cursor.execute(sql)
                rows = cursor.fetchall()
            finally:
                cursor.close()
        finally:
            con.close()
        return pd.DataFrame(rows)


# HiveServer2 parses a single statement; a trailing ';' causes a ParseException.
sql = "select * from table limit 100"
db = pyhive_db()
data = db.query_data(sql)

# 第二種方法
from pydrill.exceptions import ImproperlyConfigured
from pydrill.client import PyDrill
class GNPD_Drill(object):
    """The Apache Drill helper of the GNPD project (PyDrill REST client)."""

    def __init__(self, host='192.168.0.21', port=8047,
                 auth='mapr:%F>X*<>a3v1DY_S?^_3Wz', storage='hive.gnpd',
                 timeout=20000):
        """
        Store connection settings; nothing is contacted until open_drill().

        :param host: Drill host.
        :param port: Drill REST port.
        :param auth: "user:password" string passed to PyDrill.
        :param storage: storage plugin enabled before each query.
        :param timeout: passed as ``timeout`` to ``PyDrill.query``.
        """
        self.host = host
        self.port = port
        # NOTE(review): a real-looking credential is hard-coded here (and in
        # a published article); move it to config/env and rotate it.
        self.auth = auth
        self.storage = storage
        # NOTE(review): if PyDrill treats this timeout as seconds, 20000 is
        # about 5.5 hours -- confirm the intended unit.
        self.timeout = timeout

    def open_drill(self):
        """
        Open Apache Drill database.

        :return: drill: active PyDrill client with ``self.storage`` enabled
        :raises ImproperlyConfigured: if Drill is not reachable/active
        """
        drill = PyDrill(host=self.host, port=self.port, auth=self.auth)
        if not drill.is_active():
            raise ImproperlyConfigured('Please run Drill first.')
        drill.storage_enable(self.storage)
        return drill

    def query_data(self, sql):
        """
        Query the data from Apache Drill.

        :param sql: SQL statement to run.
        :return: DataFrame
        """
        drill = self.open_drill()
        db_query = drill.query(sql, timeout=self.timeout)
        return db_query.to_dataframe()


# Example: fetch the first 100 rows through Apache Drill into a DataFrame.
sql = 'select * from table limit 100;'
drill = GNPD_Drill()
data = drill.query_data(sql)
           

連接neo4j圖資料庫(demo)

from neo4j import GraphDatabase as G
import pandas as pd

# Bolt driver for the Neo4j server, with encryption disabled.
# API docs: https://neo4j.com/docs/api/python-driver/1.7/driver.html
driver = G.driver(
    "bolt://***.**.***.***:7687",
    auth=("***", "******"),
    encrypted=False,
)

# Read demo: list every MasterBrand name.

def print_message(tx):
    """Return each MasterBrand node's ``mstrBrandName`` as a one-column DataFrame.

    Nothing is printed here; the function only runs the query.
    :param tx: transaction handed in by ``session.read_transaction``.
    :return: DataFrame with a single ``mstrBrandName`` column.
    """
    cypher = """MATCH (n:MasterBrand)
     RETURN n.mstrBrandName"""
    rows = tx.run(cypher)
    return pd.DataFrame(rows, columns=['mstrBrandName'])


# Run the MasterBrand query in a managed read transaction, then show it.
with driver.session() as session:
    df = session.read_transaction(
        print_message,
    )
print(df)

# Insert demo: create one MasterItem node per call.

def create_node(
        tx,
        mstrBrandName,
        masterItemName,
        masterItemKey,
        masterCreateDt):
    """Create a ``MasterItem`` node carrying the four given properties.

    :param tx: transaction handed in by ``session.write_transaction``.
    :return: whatever ``tx.run`` returns for the CREATE statement.
    """
    cypher = """CREATE (a:MasterItem {mstrBrandName:$mstrBrandName,
                  masterItemName:$masterItemName,masterItemKey:$masterItemKey,
                  masterCreateDt:$masterCreateDt})"""
    params = {
        'mstrBrandName': mstrBrandName,
        'masterItemName': masterItemName,
        'masterItemKey': masterItemKey,
        'masterCreateDt': masterCreateDt,
    }
    return tx.run(cypher, **params)


# Rows to insert: the first four CSV columns are used, in order, as
# mstrBrandName, masterItemName, masterItemKey and masterCreateDt.
df = pd.read_csv(r'C:\Users\EDDC\Desktop\create_node_test.csv')

with driver.session() as session:
    # itertuples() yields native Python scalars, whereas df.iloc[i, j] yields
    # numpy scalars (e.g. numpy.int64) that older neo4j drivers refuse to
    # serialize; it also avoids index-based row lookups.
    for row in df.iloc[:, :4].itertuples(index=False, name=None):
        session.write_transaction(create_node, *row)

# Update demo: set a property on matching Item nodes.


def set_itemname(tx, ecomItemKey, updatemasterItemKeye):
    """Set ``updatemasterItemKeye`` on every ``Item`` whose ``ecomItemKey`` matches.

    :param tx: transaction object supplied by the session.
    :return: whatever ``tx.run`` returns (the query RETURNs the matched nodes).
    """
    # NOTE(review): the property name "updatemasterItemKeye" looks like a typo
    # but is stored in the graph as-is -- confirm before renaming.
    params = dict(ecomItemKey=ecomItemKey,
                  updatemasterItemKeye=updatemasterItemKeye)
    return tx.run("""MATCH (n:Item) where n.ecomItemKey=$ecomItemKey
    set n.updatemasterItemKeye=$updatemasterItemKeye RETURN n""", **params)


# set_itemname runs a SET (a write), so it must use a write transaction, as
# create_node does above. A read transaction may be routed to a read replica,
# or be rejected by the server for attempting a write.
with driver.session() as session:
    session.write_transaction(set_itemname, '1010500000045647', '16705')
           

繼續閱讀