37  sqlite

37.1 查看数据工具

37.2 非ORM模式操作

37.2.1 连接数据库

import sqlite3
import pandas as pd
# Open the SQLite database file and obtain a cursor; every SQL statement in
# the non-ORM examples below is executed through this cursor.
conn = sqlite3.connect("data/auth.db")
cursor = conn.cursor()  

37.2.2 查找数据表的名称

本例的数据库中只有一个表(一个 SQLite 数据库可以包含多个表,下面的查询会列出全部表名)

# List every table registered in the database's sqlite_master catalog.
cursor.execute("SELECT name FROM sqlite_master WHERE type='table';")
result = cursor.fetchall()
# fetchall() returns a (possibly empty) list of 1-tuples, which maps directly
# to one DataFrame row per table. Indexing result[0] would keep only the
# first table and silently drop the rest.
df = pd.DataFrame(result, columns=['table_name'])

df
table_name
0 user_account

37.2.3 查看数据表的字段

# Describe each column of the table using SQLite's table_info pragma.
table_name = "user_account"
cursor.execute(f"PRAGMA table_info({table_name})")
result = cursor.fetchall()

# Each row is unpacked positionally in the order the original indexed it:
# column id, name, declared type, not-null flag, default value, primary-key flag.
for cid, name, data_type, not_null, default_value, is_pk in result:
    print(
        f"Field Name: {name}",
        f"Data Type: {data_type}",
        f"Not Null: {not_null}",
        f"Default Value: {default_value}",
        f"Is Primary Key: {is_pk}",
        "--------------",
        sep="\n",
    )
Field Name: username
Data Type: VARCHAR(255)
Not Null: 1
Default Value: None
Is Primary Key: 1
--------------
Field Name: role
Data Type: VARCHAR(255)
Not Null: 1
Default Value: None
Is Primary Key: 0
--------------
Field Name: password
Data Type: VARCHAR(255)
Not Null: 1
Default Value: None
Is Primary Key: 0
--------------
Field Name: password_md5
Data Type: VARCHAR(255)
Not Null: 1
Default Value: None
Is Primary Key: 0
--------------
Field Name: register_time
Data Type: DATETIME
Not Null: 1
Default Value: None
Is Primary Key: 0
--------------

37.2.4 更新数据

# Rename the user "admin" to "管理员".
cursor.execute("UPDATE `user_account` SET `username` = '管理员' WHERE `username` = 'admin';")

# Commit so the change is persisted to the database file.
conn.commit()

37.2.5 插入和删除数据

## 插入一行管理员账号
from datetime import datetime
import hashlib

def str2md5(string):
    """Return the hexadecimal MD5 digest of *string*, encoded as UTF-8."""
    encoded = string.encode('utf-8')
    return hashlib.md5(encoded).hexdigest()


# Values for the administrator row to (re)insert.
# NOTE(review): the table stores the plaintext password next to an unsalted
# MD5 digest; acceptable for a demo, not for real credentials.
username = '管理员'
role = '管理员'
password = 'admin'
password_md5 = str2md5(password)
register_time = datetime.now()

# Remove any existing row for this user so the INSERT below cannot violate the
# primary key. DELETE is a no-op when nothing matches, so no prior SELECT is
# needed; binding `username` keeps the delete and insert targeting the same key.
cursor.execute("DELETE FROM user_account WHERE username = ?", (username,))

# Insert the new record. The timestamp is bound as an ISO-8601 string
# ("YYYY-MM-DD HH:MM:SS.ffffff"), the same text the default sqlite3 datetime
# adapter produced, without relying on that adapter (deprecated in Python 3.12).
sql_insert = "INSERT INTO user_account (username, role, password, password_md5, register_time) VALUES (?, ?, ?, ?, ?)"
cursor.execute(
    sql_insert,
    (username, role, password, password_md5, register_time.isoformat(sep=' ')),
)

# Commit so the delete and insert are persisted together.
conn.commit()

37.2.6 查询数据

# Fetch the username column of every row in user_account.
cursor.execute("Select `username` From `user_account`;")

# fetchall() returns a list of 1-tuples, one per row.
result = cursor.fetchall()

# Show the first rows as a DataFrame; no column names are given, so the
# single column is labelled 0.
pd.DataFrame(result).head()
0
0 中医肛肠科
1 中医针灸
2 乳腺科
3 产科
4 儿科

37.2.7 关闭数据库连接

# Close the database connection once all statements have been committed.
conn.close()

37.3 ORM操作实例

37.3.1 将pdf文件文本内容存入数据库

import pdfplumber
import os
import hashlib
import uuid
import json
from peewee import *


# Collect the files in a folder that have a given extension.
def get_files_by_extension(folder_path, file_extension: str = ".pdf"):
    """Return paths of entries directly inside *folder_path* ending with *file_extension*.

    The suffix comparison is case-insensitive. Only the immediate contents of
    the folder are examined (no recursion), and each returned path is
    ``folder_path`` joined with the entry name.
    """
    wanted_suffix = file_extension.lower()
    return [
        os.path.join(folder_path, entry)
        for entry in os.listdir(folder_path)
        if entry.lower().endswith(wanted_suffix)
    ]


# Compute a content hash for a file.
def generate_file_hash(file_path):
    """Return the SHA-256 hex digest of the file at *file_path*.

    The file is read in fixed-size chunks so that large files are hashed in
    constant memory; the digest is identical to hashing the whole content at
    once.
    """
    chunk_size = 1024 * 1024  # 1 MiB per read
    hash_object = hashlib.sha256()
    with open(file_path, 'rb') as file:
        # iter() with a sentinel stops at the first empty read, i.e. at EOF.
        for chunk in iter(lambda: file.read(chunk_size), b''):
            hash_object.update(chunk)
    return hash_object.hexdigest()



# Create the table backing a peewee model.
def createSqliteDB(DatabaseModel, safe=True):
    '''
    Create the database table for a peewee model class.

    Usage example: createSqliteDB(UploadFile) or createSqliteDB(UploadFile, safe=True)

    DatabaseModel: a peewee Model subclass that has already been defined
        (e.g. UploadFile).
    safe: forwarded to Model.create_table. With safe=True (the default) the
        table is created only if it does not exist yet. With safe=False an
        existing table is NOT dropped; the CREATE TABLE statement fails if
        the table is already there.
    '''
    DatabaseModel.create_table(safe=safe)



# Define the database model.
class KNOWDATA(Model):
    """Key/value style table: a UUID key plus one JSON document per row."""

    key = CharField(primary_key=True)
    row_data = TextField()

    class Meta:
        database = SqliteDatabase('assets/sqlite/KnowData.db')

    @classmethod
    def insert_data(cls, file_type, file_name, file_hash, page_number, file_content):
        """Store one page of a file as a JSON document under a fresh UUID key.

        The generated key is used both as the row's primary key and as the
        'key' field inside the JSON document. Non-ASCII text is written
        unescaped (ensure_ascii=False).
        """
        record_key = str(uuid.uuid4())
        payload = json.dumps(
            {
                'key': record_key,
                'file_type': file_type,
                'file_name': file_name,
                'file_hash': file_hash,
                'page_number': page_number,
                'file_content': file_content,
            },
            ensure_ascii=False,
        )
        cls.create(key=record_key, row_data=payload)
    
    
# Rebuild the table from scratch so that re-running this script does not
# accumulate duplicate pages. create_table(safe=False) does not drop an
# existing table (it only omits IF NOT EXISTS and fails when the table is
# already there), so the table is dropped explicitly and then created.
KNOWDATA.drop_table(safe=True)
createSqliteDB(KNOWDATA, safe=True)



# Store the text of every page of every PDF in the folder as one row each.
pdf_folder = "static/know_data_file/pdf"

pdf_files = get_files_by_extension(pdf_folder, ".pdf")

for pdf_file in pdf_files:
    # The extension and content hash are the same for every page of a file,
    # so compute them once per file instead of re-hashing it for each page.
    file_type = os.path.splitext(pdf_file)[1]
    file_hash = generate_file_hash(pdf_file)

    with pdfplumber.open(pdf_file) as pdf:
        # Page numbers are 1-based.
        for page_number, page in enumerate(pdf.pages, start=1):
            file_content = page.extract_text()
            KNOWDATA.insert_data(file_type, pdf_file, file_hash, page_number, file_content)

print("数据库KNOWDATA已创建完毕!")

37.3.2 peewee Model

from peewee import *
# Define the database model.
class KNOWDATA(Model):
    """Key/value style table: a UUID key plus one JSON document per row."""

    key = CharField(primary_key=True)
    row_data = TextField()

    class Meta:
        database = SqliteDatabase('assets/sqlite/KnowData.db')

    @classmethod
    def insert_data(cls, file_type, file_name, file_hash, page_number, file_content):
        """Store one page of a file as a JSON document under a fresh UUID key.

        The generated key is used both as the row's primary key and as the
        'key' field inside the JSON document.
        """
        key = str(uuid.uuid4())
        data = {
            'key': key,
            'file_type': file_type,
            'file_name': file_name,
            'file_hash': file_hash,
            'page_number': page_number,
            'file_content': file_content
        }

        row_data = json.dumps(data, ensure_ascii=False)
        cls.create(key=key, row_data=row_data)

    @classmethod
    def query_data(cls, search_term):
        """Return the stored documents whose page text contains *search_term*.

        Matching is a substring search (LIKE '%term%') on the 'file_content'
        field of the JSON document, which is the field insert_data writes.
        '%' and '_' inside *search_term* act as LIKE wildcards.

        Returns a list of dicts (the decoded JSON documents); the list is
        empty when nothing matches.
        """
        query = cls.select(cls.row_data).where(
            # Fuzzy match on the page text. The JSON path must name the key
            # that insert_data stores ('file_content'), otherwise
            # JSON_EXTRACT yields NULL and no row ever matches.
            fn.JSON_EXTRACT(cls.row_data, '$.file_content') ** f'%{search_term}%'
        )

        result_dicts_list = [json.loads(row.row_data) for row in query]

        # Debug output of the first hit; guarded so that an empty result
        # returns [] instead of raising IndexError.
        if result_dicts_list:
            print(f"query_data.result_dicts_list[0]: {result_dicts_list[0]}")

        return result_dicts_list