import sqlite3
import pandas as pd
conn = sqlite3.connect("data/auth.db")
cursor = conn.cursor()
37 sqlite
37.1 查看数据工具
- DB Browser for SQLite https://sqlitebrowser.org/dl/
37.2 非ORM模式操作
37.2.1 连接数据库
37.2.2 查找数据表的名称
其实这个sqlite数据库中有且只有一个表
# List the name of every table recorded in the database's schema catalogue.
cursor.execute("SELECT name FROM sqlite_master WHERE type='table';")
result = cursor.fetchall()
# fetchall() returns a list of 1-tuples, one per table, which pandas turns
# into one row each; an empty list simply gives an empty frame. Passing
# result[0] instead would silently drop every table after the first.
df = pd.DataFrame(result, columns=['table_name'])
df
| | table_name |
|---|---|
| 0 | user_account |
37.2.3 查看数据表的字段
table_name = "user_account"
cursor.execute(f"PRAGMA table_info({table_name})")
result = cursor.fetchall()
for row in result:
cid = row[0]
name = row[1]
data_type = row[2]
not_null = row[3]
default_value = row[4]
is_pk = row[5]
print(f"Field Name: {name}")
print(f"Data Type: {data_type}")
print(f"Not Null: {not_null}")
print(f"Default Value: {default_value}")
print(f"Is Primary Key: {is_pk}")
print("--------------")Field Name: username
Data Type: VARCHAR(255)
Not Null: 1
Default Value: None
Is Primary Key: 1
--------------
Field Name: role
Data Type: VARCHAR(255)
Not Null: 1
Default Value: None
Is Primary Key: 0
--------------
Field Name: password
Data Type: VARCHAR(255)
Not Null: 1
Default Value: None
Is Primary Key: 0
--------------
Field Name: password_md5
Data Type: VARCHAR(255)
Not Null: 1
Default Value: None
Is Primary Key: 0
--------------
Field Name: register_time
Data Type: DATETIME
Not Null: 1
Default Value: None
Is Primary Key: 0
--------------
37.2.4 更新数据
# Rename the account whose username is "admin" to "管理员".
cursor.execute("UPDATE `user_account` SET `username` = '管理员' WHERE `username` = 'admin';")
# Commit the change.
conn.commit()
37.2.5 插入和删除数据
## 插入一行管理员账号
from datetime import datetime
import hashlib
def str2md5(string):
    """Return the hexadecimal MD5 digest of ``string`` encoded as UTF-8."""
    return hashlib.md5(string.encode('utf-8')).hexdigest()
username = '管理员'
role = '管理员'
# NOTE(review): the password is stored in plain text next to its MD5 digest
# because the table has both columns; MD5 is not suitable for protecting
# passwords.
password = 'admin'
# Derive the digest from `password` so the two columns cannot drift apart.
password_md5 = str2md5(password)
# Store the timestamp as explicit text. This is the same space-separated ISO
# format that sqlite3's default datetime adapter produced; that adapter is
# deprecated as of Python 3.12, so do not rely on it.
register_time = datetime.now().isoformat(sep=' ')
# Remove any existing row with this username. DELETE is a no-op when nothing
# matches, so no prior SELECT is needed; the value is bound as a parameter
# instead of being duplicated inside the SQL text.
sql_delete = "DELETE FROM user_account WHERE username = ?"
cursor.execute(sql_delete, (username,))
# Insert the new record.
sql_insert = "INSERT INTO user_account (username, role, password, password_md5, register_time) VALUES (?, ?, ?, ?, ?)"
cursor.execute(sql_insert, (username, role, password, password_md5, register_time))
# Commit the changes.
conn.commit()
37.2.6 查询数据
# Read the username column of every row in user_account.
cursor.execute("Select `username` From `user_account`;")
# Each fetched row is a 1-tuple holding the selected column.
result = cursor.fetchall()
pd.DataFrame(result).head()
| | 0 |
|---|---|
| 0 | 中医肛肠科 |
| 1 | 中医针灸 |
| 2 | 乳腺科 |
| 3 | 产科 |
| 4 | 儿科 |
37.2.7 关闭数据库连接
# 关闭数据库连接
conn.close()
37.3 ORM操作实例
37.3.1 将pdf文件文本内容存入数据库
import pdfplumber
import os
import hashlib
import uuid
import json
from peewee import *
# 从文件夹获取相应文件类型的函数
def get_files_by_extension(folder_path, file_extension: str = ".pdf"):
    """Return paths of the entries in ``folder_path`` ending with ``file_extension``.

    The suffix comparison ignores case. Only the direct entries of the folder
    are examined, and each returned path is ``folder_path`` joined with the
    entry name.
    """
    wanted_suffix = file_extension.lower()
    return [
        os.path.join(folder_path, entry)
        for entry in os.listdir(folder_path)
        if entry.lower().endswith(wanted_suffix)
    ]
# 创建文件hash字符的函数
def generate_file_hash(file_path):
    """Return the SHA-256 hex digest of the file at ``file_path``.

    The file is read in fixed-size chunks so that large files are hashed
    without loading their whole content into memory.
    """
    hash_object = hashlib.sha256()
    with open(file_path, 'rb') as file:
        # iter() with a sentinel stops on the empty bytes object that read()
        # returns at end of file.
        for chunk in iter(lambda: file.read(65536), b''):
            hash_object.update(chunk)
    return hash_object.hexdigest()
# 创建数据库的函数
def createSqliteDB(DatabaseModel, safe=True):
    """Create the database table backing ``DatabaseModel``.

    Usage example: ``createSqliteDB(UploadFile)``

    Args:
        DatabaseModel: A model class exposing ``create_table`` (for example
            ``UploadFile``); the caller must define it beforehand.
        safe: Forwarded unchanged to ``DatabaseModel.create_table``. ``True``
            (the default) means the table is created only when it does not
            exist yet.
    """
    DatabaseModel.create_table(safe=safe)
# 建立数据库Model
class KNOWDATA(Model):
    """Model that keeps one JSON document per row, addressed by a string key."""

    key = CharField(primary_key=True)
    row_data = TextField()

    class Meta:
        database = SqliteDatabase('assets/sqlite/KnowData.db')

    @classmethod
    def insert_data(cls, file_type, file_name, file_hash, page_number, file_content):
        """Insert one record built from the given fields.

        A fresh UUID4 string becomes the primary key. The same key and all
        arguments are serialised as a JSON object (non-ASCII characters kept
        as-is) into ``row_data``.
        """
        record_key = str(uuid.uuid4())
        payload = {
            'key': record_key,
            'file_type': file_type,
            'file_name': file_name,
            'file_hash': file_hash,
            'page_number': page_number,
            'file_content': file_content
        }
        cls.create(
            key=record_key,
            row_data=json.dumps(payload, ensure_ascii=False),
        )
# Create the table. With safe=True peewee adds IF NOT EXISTS, so an existing
# table and its rows are kept. With safe=False a plain CREATE TABLE is issued,
# which fails if the table already exists; it does not drop or recreate it.
createSqliteDB(KNOWDATA, safe = False)
# Store the text of every page of every PDF in the SQLite database.
pdf_folder = "static/know_data_file/pdf"
pdf_files = get_files_by_extension(pdf_folder, ".pdf")
for pdf_file in pdf_files:
    # These values are the same for every page of a file, so compute them once
    # per file instead of re-hashing the whole file for each page.
    file_name = pdf_file
    file_type = os.path.splitext(file_name)[1]
    file_hash = generate_file_hash(file_name)
    with pdfplumber.open(pdf_file) as pdf:
        # Page numbers stored in the database are 1-based.
        for page_number, page in enumerate(pdf.pages, start=1):
            file_content = page.extract_text()
            KNOWDATA.insert_data(file_type, file_name, file_hash, page_number, file_content)
print("数据库KNOWDATA已创建完毕!")
37.3.2 peewee Model
from peewee import *
# 建立数据库Model
class KNOWDATA(Model):
    """Model that keeps one JSON document per row, addressed by a string key.

    ``row_data`` holds the JSON text written by ``insert_data``;
    ``query_data`` searches inside that JSON.
    """

    key = CharField(primary_key=True)
    row_data = TextField()

    class Meta:
        database = SqliteDatabase('assets/sqlite/KnowData.db')

    @classmethod
    def insert_data(cls, file_type, file_name, file_hash, page_number, file_content):
        """Insert one record built from the given fields.

        A fresh UUID4 string becomes the primary key. The same key and all
        arguments are serialised as a JSON object (non-ASCII characters kept
        as-is) into ``row_data``.
        """
        key = str(uuid.uuid4())
        data = {
            'key': key,
            'file_type': file_type,
            'file_name': file_name,
            'file_hash': file_hash,
            'page_number': page_number,
            'file_content': file_content
        }
        row_data = json.dumps(data, ensure_ascii = False)
        cls.create(key=key, row_data=row_data)

    @classmethod
    def query_data(cls, search_term):
        """Return the stored records whose page text contains ``search_term``.

        Args:
            search_term: Substring to look for. It is wrapped in ``%`` wildcards
                and matched with peewee's ``**`` (LIKE-style) operator, so any
                ``%`` or ``_`` inside it also acts as a wildcard.

        Returns:
            A list of dicts decoded from ``row_data``; empty when nothing
            matches.
        """
        query = cls.select(cls.row_data).where(
            # Fuzzy match on the page text. insert_data stores that text under
            # 'file_content'; the payload has no 'page_content' key, so
            # querying that path could never match.
            fn.JSON_EXTRACT(cls.row_data, '$.file_content') ** f'%{search_term}%'
        )
        result_dicts_list = [json.loads(row.row_data) for row in query]
        # Preview the first hit only when there is one; indexing an empty
        # result list would raise IndexError.
        if result_dicts_list:
            print(f"KNOWDATA.query_data.result_dicts_list[0]: {result_dicts_list[0]}")
        return result_dicts_list