# 連接 MySQL (connect to MySQL)
# 方法一
import pymysql.cursors
import pandas as pd
# Connect to the database
def get_mysql(config=None):
    """Open a PyMySQL connection whose cursors return rows as dicts.

    :param config: optional dict with ``host``, ``port``, ``user``,
        ``password`` and ``database`` keys. When omitted, the built-in
        settings below are used. ``port`` defaults to 3306 if missing.
    :return: an open ``pymysql`` connection, or ``None`` when
        ``pymysql.connect`` raises ``OperationalError``. A message including
        the underlying error is printed in that case.
    """
    if config is None:
        config = {
            'host': '***.***.*.**',  # IP address
            'port': 3306,  # default MySQL port
            'user': '***',
            'password': '***',
            'database': '***',
        }
    try:
        connection = pymysql.connect(
            host=config['host'],
            # The original built a port setting but never passed it.
            port=config.get('port', 3306),
            user=config['user'],
            password=config['password'],
            # `database` is the canonical keyword; `db` is a deprecated alias.
            database=config['database'],
            charset='utf8',
            connect_timeout=700,
            cursorclass=pymysql.cursors.DictCursor,
        )
    except pymysql.err.OperationalError as err:
        # Best-effort: report why the connection failed instead of hiding it.
        print('No database connection', err)
        return None
    else:
        return connection
class DB_operation(object):
    """Small helper around the PyMySQL connection returned by ``get_mysql()``."""

    def __init__(self, name='default'):
        # get_mysql() may return None when it could not connect.
        self.mysql = get_mysql()
        self.name = name

    def _connection(self):
        """Return the live connection, or raise a clear error if there is none."""
        if self.mysql is None:
            raise RuntimeError('No database connection')
        return self.mysql

    def get_df_by_platform(self):
        """Run ``SELECT * from table`` and return the rows as a DataFrame.

        Non-empty results are built directly from the dict rows. An empty
        result still carries the column names reported by the cursor.
        """
        sql = """SELECT * from table"""
        with self._connection().cursor() as cursor:
            cursor.execute(sql)
            rows = cursor.fetchall()
            columns = [col[0] for col in cursor.description or ()]
        if not rows:
            return pd.DataFrame(columns=columns)
        return pd.DataFrame(rows)

    # ON DUPLICATE KEY UPDATE: when a row whose primary key already exists is
    # inserted, the insert is turned into an update.
    def insert_info(self, info_list):
        """Insert rows into `db`.`table` in one batch and commit.

        :param info_list: sequence of ``(movie_info_id, insert_time)`` pairs,
            matching the two ``%s`` placeholders of the statement.

        On a duplicate key, ``insert_time`` keeps its stored value and
        ``movie_info_id`` is set to the inserted value.
        NOTE(review): if ``movie_info_id`` is itself the key, the update
        changes nothing (effectively INSERT IGNORE) - confirm that is intended.

        On any failure, the transaction is rolled back and the error re-raised.
        """
        insert = """
INSERT INTO
`db`.`table`
(`movie_info_id`, `insert_time`)
VALUES (%s, %s)
ON DUPLICATE KEY UPDATE
`insert_time` = `insert_time`,
`movie_info_id` = values(`movie_info_id`);
"""
        conn = self._connection()
        try:
            with conn.cursor() as cursor:
                cursor.executemany(insert, info_list)
            conn.commit()
        except Exception:
            # Do not leave a half-applied batch in an open transaction.
            conn.rollback()
            raise

    def close(self):
        """Close the underlying connection, if one was established."""
        if self.mysql is not None:
            self.mysql.close()


# Demo usage.
mysql_db = DB_operation()
data = mysql_db.get_df_by_platform()
# NOTE(review): the original passed an undefined `info_list`; fill this with
# (movie_info_id, insert_time) tuples before running. An empty list is a no-op.
info_list = []
mysql_db.insert_info(info_list)
mysql_db.close()
# Method 2: SQLAlchemy engine (also what pandas.DataFrame.to_sql expects)
from sqlalchemy import create_engine, text

# Credentials go in the URL as user:password@host. The character set also
# goes in the URL: create_engine's `encoding=` argument was deprecated in
# SQLAlchemy 1.4 and removed in 2.0.
#
# Alternative: the default MySQL driver (mysqlclient / MySQLdb). Use this only
# if that package is installed; creating the engine imports the driver.
# engine = create_engine(
#     'mysql://user:password@***.***.*.**:3306/db?charset=utf8mb4')
#
# PyMySQL driver (the same driver used in method 1):
engine = create_engine(
    'mysql+pymysql://user:password@***.***.*.**:3306/db?charset=utf8mb4')

# Query data. Engine.execute() was removed in SQLAlchemy 2.0, so run raw SQL
# through a Connection and wrap it in text().
with engine.connect() as conn:
    rows = conn.execute(text("SELECT * FROM table")).fetchall()

# Write a DataFrame into the database.
# NOTE(review): the original referenced an undefined `df`; assign the
# DataFrame to upload here. Nothing is written while it is empty.
df = pd.DataFrame()
if not df.empty:
    df.to_sql(
        'temp_ds_item_info',
        engine,
        if_exists='append',
        index=False,
        chunksize=500)
# 連接 Hive (connect to Hive)
# 第一種方法
from pyhive import hive
import pandas as pd
class pyhive_db(object):
    """Run HiveQL queries through PyHive and return the results as DataFrames."""

    def __init__(self, host='***.***.*.**', port=10000, auth='CUSTOM',
                 database='***', username='***', password='****'):
        """Open a Hive connection; arguments default to the original settings.

        PyHive accepts a password only when ``auth`` is 'LDAP' or 'CUSTOM'.
        The original ``auth=None`` combined with a password is rejected with
        ValueError.
        NOTE(review): choose 'LDAP' or 'CUSTOM' to match the server's
        hive.server2.authentication setting.
        """
        self.conn = hive.Connection(host=host, port=port, auth=auth,
                                    database=database, username=username,
                                    password=password)

    def query_data(self, sql):
        """Query data from Hive.

        The connection stays open so the same object can run several queries;
        call ``close()`` when done. The original closed it after the first
        query, so any later query failed.

        :param sql: a single HiveQL statement.
        :return: DataFrame with positional (integer) column labels.
        """
        # HiveServer2 may reject a trailing ';' (interactive clients strip it).
        statement = sql.strip().rstrip(';')
        cursor = self.conn.cursor()
        try:
            cursor.execute(statement)
            result = cursor.fetchall()
        finally:
            cursor.close()
        return pd.DataFrame(result)

    def close(self):
        """Close the underlying Hive connection."""
        self.conn.close()


# Demo: fetch the first 100 rows through PyHive.
sql = "select * from table limit 100;"
db = pyhive_db()
try:
    hive_data = db.query_data(sql)
finally:
    db.close()
# 第二種方法
from pydrill.exceptions import ImproperlyConfigured
from pydrill.client import PyDrill
class GNPD_Drill(object):
    """Apache Drill helper for the GNPD project, built on PyDrill."""

    def __init__(self, host='192.168.0.21', port=8047,
                 auth='mapr:%F>X*<>a3v1DY_S?^_3Wz', storage='hive.gnpd',
                 timeout=20000):
        """Store connection settings; nothing is contacted until a query runs.

        Every argument defaults to the value previously hard-coded here.

        :param auth: credentials string handed to PyDrill (``user:password``
            form, as the default shows).
        :param storage: storage plugin enabled before each query.
        :param timeout: value passed as ``timeout`` to ``PyDrill.query``.
        """
        # SECURITY(review): a real-looking credential is hard-coded as the
        # default. Prefer passing `auth` in (e.g. from an environment
        # variable), and rotate this password if the file was shared.
        self.host = host
        self.port = port
        self.auth = auth
        self.storage = storage
        self.timeout = timeout

    def open_drill(self):
        """
        Open Apache Drill database.
        :return: drill: Apache Drill client with the storage plugin enabled
        :raises ImproperlyConfigured: if the Drill server is not active
        """
        drill = PyDrill(host=self.host, port=self.port, auth=self.auth)
        if not drill.is_active():
            raise ImproperlyConfigured('Please run Drill first.')
        drill.storage_enable(self.storage)
        return drill

    def query_data(self, sql):
        """
        Query the data from Apache Drill.
        :param sql: a single SQL statement
        :return: DataFrame
        """
        drill = self.open_drill()
        # Drill's SQL parser may reject a trailing ';' (sqlline strips it).
        statement = sql.strip().rstrip(';')
        db_query = drill.query(statement, timeout=self.timeout)
        return db_query.to_dataframe()
# Demo: pull the first 100 rows through Apache Drill.
sql = """select * from table limit 100;"""
drill = GNPD_Drill()
data = drill.query_data(sql)
# 連接 Neo4j 圖資料庫 (demo) — connect to the Neo4j graph database
from neo4j import GraphDatabase as G
import pandas as pd
# docs at https://neo4j.com/docs/api/python-driver/1.7/driver.html
driver = G.driver(
    "bolt://***.**.***.***:7687",
    auth=("***", "******"),
    encrypted=False,
)


# Read demo: list every MasterBrand name.
def print_message(tx):
    """Fetch ``n.mstrBrandName`` for all MasterBrand nodes.

    Despite the name, nothing is printed here. The names are returned as a
    one-column DataFrame (column ``mstrBrandName``), and the caller prints it.
    """
    result = tx.run("""MATCH (n:MasterBrand)
RETURN n.mstrBrandName""")
    return pd.DataFrame(result, columns=['mstrBrandName'])


with driver.session() as session:
    df = session.read_transaction(print_message)
print(df)
# Write demo: create one MasterItem node per CSV row.
def create_node(
        tx,
        mstrBrandName,
        masterItemName,
        masterItemKey,
        masterCreateDt):
    """Create a ``MasterItem`` node with the four given properties.

    :param tx: transaction supplied by ``session.write_transaction``.
    :return: the result object of ``tx.run`` for the CREATE statement.
    """
    return tx.run("""CREATE (a:MasterItem {mstrBrandName:$mstrBrandName,
masterItemName:$masterItemName,masterItemKey:$masterItemKey,
masterCreateDt:$masterCreateDt})""", mstrBrandName=mstrBrandName,
                  masterItemName=masterItemName,
                  masterItemKey=masterItemKey,
                  masterCreateDt=masterCreateDt)


df = pd.read_csv(r'C:\Users\EDDC\Desktop\create_node_test.csv')
with driver.session() as session:
    # itertuples() yields native Python scalars. df.iloc[i, j] returns NumPy
    # scalars (e.g. numpy.int64), which some neo4j driver versions refuse to
    # send as query parameters. Only the first four CSV columns are used,
    # in order.
    for row in df.itertuples(index=False, name=None):
        session.write_transaction(create_node, *row[:4])
# Update demo: set a property on an existing Item node.
def set_itemname(tx, ecomItemKey, updatemasterItemKeye):
    """Set ``n.updatemasterItemKeye`` on Item nodes whose ecomItemKey matches.

    NOTE(review): ``updatemasterItemKeye`` looks like a typo of
    ``updatemasterItemKey``. It is kept because stored data and callers may
    already use this property/keyword name.

    :return: the result object of ``tx.run`` (the query ends in RETURN n).
    """
    return tx.run("""MATCH (n:Item) where n.ecomItemKey=$ecomItemKey
set n.updatemasterItemKeye=$updatemasterItemKeye RETURN n""", ecomItemKey=ecomItemKey,
                  updatemasterItemKeye=updatemasterItemKeye)


# SET modifies the graph, so it must run in a write transaction. Read
# transactions use READ access mode: Neo4j 4.0+ rejects writes there, and in
# a cluster they may be routed to a read-only member.
with driver.session() as session:
    session.write_transaction(set_itemname, '1010500000045647', '16705')