Python爬虫几个步骤教你写入mysql数据库
Python爬虫实现爬取网站中的数据并存入MySQL数据库中。在爬取的时候总要涉及到数据持久化存储，当然有很多种存储的方式，简单点的有excel、txt、json、csv等等。存入mysql我觉得有好多操作空间，如果是开发python后端也可以熟悉一下sql语句。存入数据库的方法我也是试了些许网上的方法，现在把完整功能供大家参考。
直接搜索 phpStudy 安装即可，按照下图配置数据库。用户名密码自行设置，然后返回首页启动即可。
pip install pymysql
打开刚安装的 phpStudy，安装一个 mysql 客户端进行连接；数据库是本地的，host 可以填 127.0.0.1 或 localhost，用户名密码是上面设置的。
MySQL创建对应的表
-- Table holding one scraped article per row (link, title, image, keywords,
-- description, body) plus weight and create/update/delete timestamps.
-- Keywords below are separated by plain ASCII spaces, and the COMMENT
-- strings are restored to readable UTF-8 on a single line each.
CREATE TABLE `text_archives`  (
  `id` int(10) UNSIGNED NOT NULL AUTO_INCREMENT COMMENT 'ID',
  `url` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci NOT NULL DEFAULT '' COMMENT '链接',
  `title` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci NOT NULL DEFAULT '' COMMENT '标题',
  `image` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci NOT NULL DEFAULT '' COMMENT '图片',
  `keywords` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci NOT NULL COMMENT '关键描述',
  `description` varchar(600) CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci NOT NULL DEFAULT '' COMMENT '内容描述',
  `content` text CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci NULL COMMENT '内容',
  `weigh` int(10) NOT NULL DEFAULT 0 COMMENT '权重',
  `createtime` bigint(16) NOT NULL DEFAULT 0 COMMENT '创建时间',
  `updatetime` bigint(16) NOT NULL DEFAULT 0 COMMENT '更新时间',
  `deletetime` bigint(16) NULL DEFAULT NULL COMMENT '删除时间',
  PRIMARY KEY (`id`) USING BTREE
) ENGINE = InnoDB AUTO_INCREMENT = 2692 CHARACTER SET = utf8mb4 COLLATE = utf8mb4_unicode_ci COMMENT = '内容表' ROW_FORMAT = Dynamic;
-- Re-enable foreign key checks (presumably switched off earlier in the
-- dump this statement was exported from).
SET FOREIGN_KEY_CHECKS = 1;
构造 SQL 语句的字符串 sql，然后通过 cursor.execute(sql) 执行。下面是简单的封装，直接复制即可用。
import pymysql
class Mysql(object):
    """Small CRUD helper around one pymysql connection and one cursor.

    Table names, column lists and SET/WHERE fragments are interpolated into
    the SQL text as-is, so they must come from trusted code. Row values
    should travel separately (the values of ``insert_data``, or ``args``)
    so that the driver escapes them; scraped text routinely contains quote
    characters that would otherwise break the statement.
    """

    def __init__(self, host='127.0.0.1', user='test', password='######',
                 database='test', charset='utf8mb4'):
        """Open the connection and create the shared cursor.

        The defaults are the values that used to be hard-coded here, so
        ``Mysql()`` behaves as before; pass real settings instead of
        editing the class.
        """
        self._connect = pymysql.connect(
            host=host,
            user=user,
            password=password,
            database=database,
            charset=charset,
            # DictCursor returns each row as a dict keyed by column name.
            cursorclass=pymysql.cursors.DictCursor
        )
        self._cursor = self._connect.cursor()

    def __enter__(self):
        """Allow ``with Mysql() as db:`` so the connection is always closed."""
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        """Close cursor and connection; exceptions are not suppressed."""
        self.db_close()
        return False

    def inset_db(self, table_name, insert_data):
        """Insert one row and return its auto-increment id.

        Args:
            table_name: Target table (trusted; placed into the SQL text).
            insert_data: Mapping of column name -> value. Values are sent
                as bound parameters, so quotes need no escaping and
                ``None`` is stored as SQL NULL.

        Returns:
            ``cursor.lastrowid`` on success, ``None`` if the insert failed
            (the failure is printed and the transaction rolled back).
        """
        try:
            fields = ','.join(insert_data.keys())
            placeholders = ','.join(['%s'] * len(insert_data))
            sql = "INSERT INTO {table_name}({fields}) VALUES ({values})".format(
                table_name=table_name, fields=fields, values=placeholders)
            self._cursor.execute(sql, tuple(insert_data.values()))
            self._connect.commit()
        except Exception as e:
            # Undo whatever part of the statement (e.g. a stored procedure
            # call) may already have been applied before the failure.
            self._connect.rollback()
            print("数据插入失败，失败原因:", e)
            print(insert_data)
            return None
        else:
            return self._cursor.lastrowid

    # Correctly spelled alias; ``inset_db`` is kept for existing callers.
    insert_db = inset_db

    def update_db(self, table_name, update_data, wheres=None, args=None):
        """Run ``UPDATE table SET update_data [WHERE wheres]``.

        Args:
            table_name: Target table (trusted).
            update_data: SET clause text, e.g. ``"title=%s"``.
            wheres: Optional WHERE clause text; when omitted every row of
                the table is updated.
            args: Optional parameters bound to ``%s`` placeholders in
                ``update_data`` / ``wheres``.

        Returns:
            True on success, False on failure (printed and rolled back).
        """
        try:
            if wheres is not None:
                sql = "UPDATE {table_name} SET {update_data} WHERE {wheres}".format(
                    table_name=table_name,
                    update_data=update_data,
                    wheres=wheres
                )
            else:
                sql = "UPDATE {table_name} SET {update_data}".format(
                    table_name=table_name,
                    update_data=update_data)
            self._cursor.execute(sql, args)
            self._connect.commit()
        except Exception as e:
            self._connect.rollback()
            print("更新失败:", e)
            return False
        else:
            return True

    def delete_db(self, table_name, wheres, args=None):
        """Run ``DELETE FROM table WHERE wheres``.

        Args:
            table_name: Target table (trusted).
            wheres: WHERE clause text, e.g. ``"id=%s"``.
            args: Optional parameters bound to ``%s`` placeholders.

        Returns:
            True on success, False on failure (printed and rolled back).
        """
        try:
            sql = "DELETE FROM {table_name} WHERE {wheres}".format(table_name=table_name, wheres=wheres)
            self._cursor.execute(sql, args)
            self._connect.commit()
        except Exception as e:
            self._connect.rollback()
            print('删除失败：', e)
            return False
        else:
            return True

    def select_db(self, table_name, fields, wheres=None, get_one=False, args=None):
        """Run ``SELECT fields FROM table [WHERE wheres]``.

        Args:
            table_name: Source table (trusted).
            fields: Column list text, e.g. ``"id,title"`` or ``"*"``.
            wheres: Optional WHERE clause text.
            get_one: Return only the first row instead of all rows.
            args: Optional parameters bound to ``%s`` placeholders.

        Returns:
            The result of ``fetchone()`` when ``get_one`` is true, else the
            result of ``fetchall()``; ``None`` if the query failed.
        """
        try:
            if wheres is not None:
                sql = "SELECT {fields} FROM {table_name} WHERE {wheres}".format(
                    fields=fields,
                    table_name=table_name,
                    wheres=wheres
                )
            else:
                sql = "SELECT {fields} FROM {table_name}".format(fields=fields, table_name=table_name)
            self._cursor.execute(sql, args)
            # Kept from the original; presumably ends the read transaction
            # so later SELECTs see fresh rows. TODO confirm it is needed.
            self._connect.commit()
            if get_one:
                result = self._cursor.fetchone()
            else:
                result = self._cursor.fetchall()
        except Exception as e:
            print("查询失败", e)
            return None
        else:
            return result

    def get_mysql_data(self, data):
        """Return ``[fields, values]`` as comma-joined SQL text fragments.

        Every value is rendered as ``'str(value)'`` with NO escaping, so the
        result is only safe for trusted values without quote characters.
        ``inset_db`` no longer uses this helper; it is kept for callers
        that build their own statements.
        """
        fields = ','.join(data.keys())
        insert_data = ','.join("'" + str(v) + "'" for v in data.values())
        return [fields, insert_data]

    def db_close(self):
        """Close the cursor, then the connection."""
        try:
            self._cursor.close()
        finally:
            # Release the connection even if closing the cursor raised.
            self._connect.close()
这次简单点，咱们用xpath就行。有一个小技巧：咱们在爬取的网页打开开发者模式（F12），如下图红框，复制第一个或者第二个就行。
下面代码是实现爬取数据然后存入数据库的类，大家可参考
from model.nav import Nav
import requests
from urllib import parse
from lxml import etree
from fake_useragent import UserAgent
from lib.reptile import Reptile
import json
class Common(object):
    """Crawl one listing page and store the fields of each linked page.

    ``params`` supplies the start ``url`` plus the XPath expressions used to
    find detail links and to extract each field from a detail page.

    NOTE(review): ``json_insert_data`` calls ``self.insert_category`` and
    ``self.insert_archives``, which are not defined in this class; they
    must be provided by a subclass or mixin before storage can work.
    """

    MAX_ATTEMPTS = 3  # total tries per URL before giving up
    TIMEOUT = 3       # seconds, passed to requests.get

    def __init__(self, params):
        """Remember the start URL and the XPath configuration."""
        self.url = params['url']
        self.params = params
        self.blog = 1     # attempt counter of the most recent get_html call
        self._ua = None   # UserAgent instance, created on first use

    def get_header(self):
        """Return request headers carrying a random User-Agent string."""
        if self._ua is None:
            # Build the generator once instead of once per request.
            self._ua = UserAgent()
        return {
            'User-Agent': self._ua.random
        }

    def _fetch(self, url):
        """Download ``url`` once and return its decoded text."""
        res = requests.get(url=url, headers=self.get_header(), timeout=self.TIMEOUT)
        # Decode with the encoding detected from the body itself.
        res.encoding = res.apparent_encoding
        return res.text

    def get_html(self, url):
        """Return the page text, trying up to ``MAX_ATTEMPTS`` times.

        Each failure is printed. Returns ``None`` when every attempt fails.
        The counter is reset on every call, so one bad page no longer uses
        up the retries of the pages that follow it.
        """
        self.blog = 1
        while self.blog <= self.MAX_ATTEMPTS:
            try:
                return self._fetch(url)
            except Exception as e:
                # Best effort: any failure counts as one used attempt.
                print(e)
                self.blog += 1
        return None

    def json_insert_data(self, params):
        """Store the category, then the article, from an extracted record.

        ``params`` must contain ``category_name``, ``pid``, ``url``,
        ``title``, ``image``, ``description``, ``keywords`` and ``content``.
        The article is written only when the category call returns a truthy
        id; the success messages are printed only after the matching call.
        """
        category_id = self.insert_category(cname=params['category_name'], pid=params['pid'], icon='')
        if not category_id:
            return
        print("分类插入成功：{}".format(params['category_name']))
        url = params['url']
        title = params['title']
        image = params['image']
        description = params['description']
        keywords = params['keywords']
        content = params['content']
        self.insert_archives(category_id, url, title, image, description, keywords, content)
        print("内容插入成功：{}".format(title))
        print("------------------------------------------------------------")

    def _iter_item_urls(self, xpath_html):
        """Yield candidate detail-page hrefs from the listing document."""
        url_xpath = self.params['url_xpath']
        item_xpath = self.params.get('item_xpath')
        if item_xpath is None:
            # No item selector configured: apply url_xpath to the whole
            # document and follow every match.
            for url in xpath_html.xpath(url_xpath):
                yield url
            return
        item_list = xpath_html.xpath(item_xpath)
        print(item_list)
        for row in item_list:
            url_list = row.xpath(url_xpath)
            if url_list:
                # Only the first link inside each item is followed.
                yield url_list[0]

    def get_item(self, xpath_html):
        """Visit the detail page of every item found in the listing.

        Items are selected with ``params['item_xpath']`` when present;
        otherwise ``params['url_xpath']`` is applied to the whole document.
        Each distinct href is visited once.
        """
        seen = set()
        for url in self._iter_item_urls(xpath_html):
            if url in seen:
                continue
            seen.add(url)
            self.get_content(url)

    def get_content(self, url):
        """Fetch one detail page, extract its fields and store them.

        Relative hrefs are resolved against the start URL first. Pages on a
        different host than the start URL are skipped. Storage happens only
        when title, category and link were all found.
        """
        url = parse.urljoin(self.url, url)
        print("正在抓取链接：{}".format(url))
        if parse.urlparse(url).netloc != parse.urlparse(self.url).netloc:
            return
        html = self.get_html(url)
        if not html:
            return
        p = etree.HTML(html)
        # 'link_xpaht' is the historical (misspelled) key; the correctly
        # spelled 'link_xpath' is preferred when supplied.
        link_key = 'link_xpath' if 'link_xpath' in self.params else 'link_xpaht'
        title = self.get_conmon_content(p, self.params['title_xpath'])
        print("标题为：{}".format(title))
        category_name = self.get_conmon_content(p, self.params['category_xpath'])
        print("分类为：{}".format(category_name))
        image = self.get_conmon_content(p, self.params['image_xpath'])
        print("图片为：{}".format(image))
        link = self.get_conmon_content(p, self.params[link_key])
        print("链接为：{}".format(link))
        description = self.get_conmon_content(p, self.params['description_xpath'])
        print("描述为：{}".format(description))
        keywords = self.get_conmon_content(p, self.params['keywords_xpath'])
        print("关键描述：{}".format(keywords))
        content = self.get_conmon_content(p, self.params['content_xpath'])
        print("内容为：{}".format(content))
        params = {
            # Parent category id; 158 is the previously hard-coded value.
            "pid": self.params.get('pid', 158),
            "title": title,
            "category_name": category_name,
            "image": image,
            'url': link,
            'description': description,
            'keywords': keywords,
            'content': content,
        }
        if title and category_name and link:
            self.json_insert_data(params)  # write to the database

    def get_conmon_content(self, xpath_html, xpath):
        """Return the first XPath match stripped of whitespace, or ``''``.

        The expression is expected to yield strings (``text()`` or
        ``@attribute`` selections).
        """
        content_list = xpath_html.xpath(xpath)
        if not content_list:
            return ''
        return content_list[0].strip()

    # Correctly spelled alias; ``get_conmon_content`` is kept for callers.
    get_common_content = get_conmon_content

    def run(self):
        """Download the start page and process every item on it."""
        print("url:{}".format(self.url))
        html = self.get_html(self.url)
        if html:
            p = etree.HTML(html)
            self.get_item(p)
# Scraper configuration: the start URL plus the XPath expression for each
# field. The key "link_xpaht" is spelled the way the Common class reads it.
params = dict(
    url="https://www.widiz.com/",  # page to crawl
    url_xpath='.//a[1]/@href',
    title_xpath='/html/body/div[1]/div[2]/div[3]/div/div[3]/div/h1/text()',
    category_xpath='/html/body/div[1]/div[2]/div[3]/div/div[3]/div/a[1]/text()',
    image_xpath='/html/body/div[1]/div[2]/div[3]/div/div[2]/div/img/@src',
    link_xpaht='/html/body/div[1]/div[2]/div[3]/div/div[3]/div/div/div[1]/span/a/@href',
    description_xpath='/html/head/meta[10]/@content',
    keywords_xpath='/html/head/meta[5]/@content',
    content_xpath='/html/body/div[1]/div[2]/div[3]/main/div[1]/div/div[1]/div/div[2]/text()',
)
# Build the scraper from the configuration and start crawling.
Common(params).run()
最终效果：