LOGO OA教程 ERP教程 模切知识交流 PMS教程 CRM教程 开发文档 其他文档  
 
网站管理员

30天学会Python编程:24.Python数据库编程基础

admin
2025年7月17日 21:55 本文热度 27

1. 数据库基础概念

1.1 数据库类型对比

在Python数据库编程中,主要使用两种类型的数据库:

关系型数据库 (RDBMS)

  • 使用表格结构存储数据
  • 支持SQL查询语言
  • 遵循ACID原则(原子性、一致性、隔离性、持久性)
  • 典型代表:MySQL、PostgreSQL、SQLite

NoSQL数据库

  • 非表格结构存储数据
  • 灵活的数据模型
  • 高扩展性和性能
  • 典型代表:MongoDB(文档型)、Redis(键值对)


1.2 Python DB-API规范

Python的DB-API是数据库操作的标准化接口,核心组件包括:

  1. connect()
     - 建立数据库连接
  2. cursor()
     - 创建游标对象
  3. execute()
     - 执行SQL语句
  4. fetchone()/fetchall()
     - 获取查询结果
  5. commit()/rollback()
     - 事务控制

注意事项

  • 始终使用参数化查询防止SQL注入
  • 操作完成后关闭数据库连接
  • 使用事务保证数据一致性

2. SQLite操作

2.1 基本CRUD操作

import sqlite3

# 创建连接(自动创建数据库文件)
conn = sqlite3.connect('example.db', check_same_thread=False)
cursor = conn.cursor()

# 创建表(IF NOT EXISTS避免重复创建)
cursor.execute('''CREATE TABLE IF NOT EXISTS users
               (id INTEGER PRIMARY KEY, name TEXT, age INTEGER)'''
)

# 插入数据(使用参数化查询)
cursor.execute("INSERT INTO users (name, age) VALUES (?, ?)", ('Alice'25))

# 查询数据
cursor.execute("SELECT * FROM users WHERE age > ?", (20,))
print(cursor.fetchall())  # 获取所有匹配记录

# 更新数据
cursor.execute("UPDATE users SET age = ? WHERE name = ?", (26'Alice'))

# 删除数据
cursor.execute("DELETE FROM users WHERE id = ?", (1,))

# 提交事务并关闭连接
conn.commit()
conn.close()

2.2 高级特性

# 使用上下文管理器自动处理连接
with sqlite3.connect('example.db'as conn:
    conn.row_factory = sqlite3.Row  # 字典式访问结果
    cursor = conn.cursor()
    cursor.execute("SELECT * FROM users")
    for row in cursor:
        print(f"{row['name']}{row['age']}")  # 列名访问

# 内存数据库(临时数据库)
mem_db = sqlite3.connect(':memory:')

实践建议

  • 使用WITH语句确保连接正确关闭
  • 设置row_factory使结果更易读
  • 内存数据库适合临时数据处理和测试

3. MySQL/PostgreSQL操作

3.1 PyMySQL操作MySQL

import pymysql

# 连接MySQL数据库
conn = pymysql.connect(
    host='localhost',
    user='root',
    password='password',
    database='test',
    cursorclass=pymysql.cursors.DictCursor  # 返回字典格式结果
)

try:
    with conn.cursor() as cursor:
        # 创建表
        cursor.execute("""CREATE TABLE IF NOT EXISTS products (
                       id INT AUTO_INCREMENT PRIMARY KEY,
                       name VARCHAR(100) NOT NULL,
                       price DECIMAL(10,2) NOT NULL)"""
)
        
        # 插入数据
        sql = "INSERT INTO products (name, price) VALUES (%s, %s)"
        cursor.execute(sql, ('Laptop'999.99))
        
        # 查询数据
        cursor.execute("SELECT * FROM products WHERE price > %s", (500,))
        result = cursor.fetchone()
        print(result)
    
    # 提交事务
    conn.commit()
finally:
    conn.close()  # 确保连接关闭

3.2 psycopg2操作PostgreSQL

import psycopg2

# 连接PostgreSQL
conn = psycopg2.connect(
    host="localhost",
    database="test",
    user="postgres",
    password="password"
)

# 使用上下文管理自动提交事务
with conn:
    with conn.cursor() as cursor:
        # 插入数据并返回生成的ID
        cursor.execute("""
            INSERT INTO orders (customer, total)
            VALUES (%s, %s)
            RETURNING id
        """
, ("John Doe"150.75))
        order_id = cursor.fetchone()[0]
        print(f"新订单ID: {order_id}")

关键点

  • MySQL使用%s作为占位符,PostgreSQL也使用%s
  • PostgreSQL支持RETURNING子句获取插入后的数据
  • 始终使用上下文管理器(with)确保资源释放

4. ORM框架:SQLAlchemy与Django ORM

4.1 SQLAlchemy核心操作

from sqlalchemy import create_engine, Column, Integer, String, ForeignKey
from sqlalchemy.orm import declarative_base, relationship, sessionmaker

Base = declarative_base()

# 定义数据模型
classUser(Base):
    __tablename__ = 'users'
    id = Column(Integer, primary_key=True)
    name = Column(String(50))
    email = Column(String(100), unique=True)
    orders = relationship("Order", back_populates="user")

classOrder(Base):
    __tablename__ = 'orders'
    id = Column(Integer, primary_key=True)
    user_id = Column(Integer, ForeignKey('users.id'))
    total = Column(Integer)
    user = relationship("User", back_populates="orders")

# 初始化数据库连接
engine = create_engine('sqlite:///mydatabase.db')
Base.metadata.create_all(engine)

# 创建会话
Session = sessionmaker(bind=engine)
session = Session()

# 添加新用户和订单
new_user = User(name="Alice", email="alice@example.com")
new_order = Order(total=100, user=new_user)
session.add(new_user)
session.add(new_order)
session.commit()

# 查询操作
user = session.query(User).filter_by(email="alice@example.com").first()
print(f"用户: {user.name}, 订单数: {len(user.orders)}")

session.close()

4.2 Django ORM查询

# models.py
from django.db import models

classProduct(models.Model):
    name = models.CharField(max_length=100)
    price = models.DecimalField(max_digits=10, decimal_places=2)
    category = models.ForeignKey('Category', on_delete=models.CASCADE)
    created_at = models.DateTimeField(auto_now_add=True)

classCategory(models.Model):
    name = models.CharField(max_length=50)

# 查询操作
from django.db.models import Q, F, Count

# 复杂查询
products = Product.objects.filter(
    Q(price__lt=100) | Q(name__startswith="Premium"),
    created_at__year=2023
).annotate(
    discounted_price=F('price') * 0.9
).select_related('category')

# 聚合查询
from django.db.models import Avg, Sum
category_stats = Category.objects.annotate(
    product_count=Count('product'),
    avg_price=Avg('product__price')
).filter(product_count__gt=5)

ORM优势

  • 避免直接编写SQL语句
  • 提供面向对象的数据库操作接口
  • 自动处理数据库连接和事务
  • 支持高级查询和聚合操作

5. NoSQL数据库操作

5.1 MongoDB文档操作

from pymongo import MongoClient
from datetime import datetime

# 连接MongoDB
client = MongoClient('mongodb://localhost:27017/')
db = client['ecommerce']
products = db['products']

# 插入文档
product_data = {
    "name""Wireless Headphones",
    "price"129.99,
    "categories": ["Electronics""Audio"],
    "stock"50,
    "last_updated": datetime.utcnow()
}
result = products.insert_one(product_data)
print(f"插入文档ID: {result.inserted_id}")

# 查询文档
query = {"price": {"$lt"150}, "categories""Electronics"}
for product in products.find(query).sort("price", -1).limit(5):
    print(product["name"], product["price"])

# 聚合管道
pipeline = [
    {"$match": {"price": {"$gt"100}}},
    {"$group": {
        "_id""$category",
        "avg_price": {"$avg""$price"},
        "count": {"$sum"1}
    }}
]
results = products.aggregate(pipeline)

5.2 Redis缓存与数据结构

import redis

# 连接Redis
r = redis.Redis(host='localhost', port=6379, db=0)

# 字符串操作
r.set('site:visits'100, ex=3600)  # 设置1小时过期
r.incr('site:visits')  # 增加访问计数

# 哈希操作(存储对象)
r.hset('user:1000', mapping={
    'name''Alice',
    'email''alice@example.com',
    'last_login''2023-06-15'
})

# 集合操作(标签系统)
r.sadd('product:123:tags''electronics''wireless''audio')

# 有序集合(排行榜)
r.zadd('leaderboard', {'player1'100'player2'85'player3'120})
top_players = r.zrevrange('leaderboard'02, withscores=True)

NoSQL适用场景举例

  • MongoDB:灵活的数据结构、快速原型开发
  • Redis:高速缓存、会话存储、实时排行榜

6. 数据库连接池管理

6.1 SQLAlchemy连接池实现

from sqlalchemy import create_engine
from sqlalchemy.pool import QueuePool

# 创建连接池
engine = create_engine(
    'mysql+pymysql://user:password@localhost/dbname',
    poolclass=QueuePool,
    pool_size=10,         # 常驻连接数
    max_overflow=5,       # 最大临时连接数
    pool_timeout=30,      # 获取连接超时时间(秒)
    pool_recycle=3600     # 连接回收时间(秒)
)

# 使用连接
with engine.connect() as conn:
    result = conn.execute("SELECT * FROM users")
    for row in result:
        print(row)

6.2 连接池实践

  1. 连接池大小设置

    • 初始连接数 = 预期平均并发请求数
    • 最大连接数 = 预期峰值并发请求数 × 1.5
  2. 连接回收策略

    • 定期回收连接防止数据库超时断开
    • 设置合理的pool_recycle值(通常小于数据库连接超时时间)
  3. 资源管理

    • 使用with语句确保连接正确释放
    • 监控连接池使用情况


7. 应用举例

7.1 电商订单系统示例

# 使用SQLAlchemy实现电商核心模型
from sqlalchemy import Column, Integer, String, Float, ForeignKey, DateTime
from sqlalchemy.orm import relationship
from sqlalchemy.ext.declarative import declarative_base
import datetime

Base = declarative_base()

classUser(Base):
    __tablename__ = 'users'
    id = Column(Integer, primary_key=True)
    name = Column(String(50), nullable=False)
    email = Column(String(100), unique=True)
    orders = relationship("Order", back_populates="user")

classProduct(Base):
    __tablename__ = 'products'
    id = Column(Integer, primary_key=True)
    name = Column(String(100), nullable=False)
    price = Column(Float, nullable=False)
    stock = Column(Integer, default=0)

classOrder(Base):
    __tablename__ = 'orders'
    id = Column(Integer, primary_key=True)
    user_id = Column(Integer, ForeignKey('users.id'))
    created_at = Column(DateTime, default=datetime.datetime.utcnow)
    status = Column(String(20), default='pending')
    
    user = relationship("User", back_populates="orders")
    items = relationship("OrderItem", back_populates="order")

classOrderItem(Base):
    __tablename__ = 'order_items'
    id = Column(Integer, primary_key=True)
    order_id = Column(Integer, ForeignKey('orders.id'))
    product_id = Column(Integer, ForeignKey('products.id'))
    quantity = Column(Integer, default=1)
    price = Column(Float)  # 下单时的价格快照
    
    order = relationship("Order", back_populates="items")
    product = relationship("Product")

# 使用示例
defcreate_order(user, products):
    """创建新订单"""
    order = Order(user=user)
    for product, quantity in products:
        # 检查库存
        if product.stock < quantity:
            raise ValueError(f"{product.name}库存不足")
        # 创建订单项
        order.items.append(OrderItem(
            product=product,
            quantity=quantity,
            price=product.price
        ))
        # 减少库存
        product.stock -= quantity
    return order

7.2 数据库缓存策略示例

import sqlite3
import redis
import json
import time

classCachedDB:
    """带Redis缓存的数据库访问层"""
    
    def__init__(self, db_path=':memory:', cache_expire=300):
        self.redis = redis.Redis(host='localhost', port=6379, db=0)
        self.db_conn = sqlite3.connect(db_path)
        self.cache_expire = cache_expire  # 缓存过期时间(秒)
        self._init_db()
    
    def_init_db(self):
        """初始化数据库表结构"""
        cur = self.db_conn.cursor()
        cur.execute('''CREATE TABLE IF NOT EXISTS articles (
                   id INTEGER PRIMARY KEY,
                   title TEXT NOT NULL,
                   content TEXT,
                   views INTEGER DEFAULT 0,
                   created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP)'''
)
        self.db_conn.commit()
    
    defget_article(self, article_id):
        """获取文章(带缓存)"""
        cache_key = f"article:{article_id}"
        cached = self.redis.get(cache_key)
        
        if cached:
            # 缓存命中
            article = json.loads(cached)
            article['from_cache'] = True
            return article
        
        # 缓存未命中,查询数据库
        cur = self.db_conn.cursor()
        cur.execute("SELECT * FROM articles WHERE id=?", (article_id,))
        row = cur.fetchone()
        
        ifnot row:
            returnNone
        
        # 更新访问计数
        cur.execute("UPDATE articles SET views = views + 1 WHERE id=?", (article_id,))
        self.db_conn.commit()
        
        # 构建文章对象
        article = {
            'id': row[0],
            'title': row[1],
            'content': row[2],
            'views': row[3],
            'created_at': row[4],
            'from_cache'False
        }
        
        # 写入缓存
        self.redis.setex(cache_key, self.cache_expire, json.dumps(article))
        return article
    
    defcreate_article(self, title, content):
        """创建新文章"""
        cur = self.db_conn.cursor()
        cur.execute("INSERT INTO articles (title, content) VALUES (?, ?)",
                   (title, content))
        self.db_conn.commit()
        article_id = cur.lastrowid
        # 清除可能的缓存
        self.redis.delete(f"article:{article_id}")
        return article_id
    
    defsearch_articles(self, keyword):
        """文章搜索(不使用缓存)"""
        cur = self.db_conn.cursor()
        cur.execute("SELECT id, title FROM articles WHERE title LIKE ? OR content LIKE ?",
                   (f'%{keyword}%'f'%{keyword}%'))
        return [dict(id=row[0], title=row[1]) for row in cur.fetchall()]

8. 知识图谱


总结

  1. 安全第一

    • 始终使用参数化查询防止SQL注入
    • 保护数据库凭据,使用环境变量存储密码
  2. 性能优化

    • 使用连接池管理数据库连接
    • 对频繁查询添加适当索引
    • 合理使用缓存减少数据库压力
  3. 代码可维护性

    • 使用ORM框架管理复杂数据库操作
    • 将数据库访问逻辑与业务逻辑分离
    • 对复杂查询添加注释说明
  4. 错误处理

    • 实现健壮的错误处理和重试机制
    • 使用事务保证数据一致性
    • 记录数据库操作日志
  5. 测试与监控

    • 编写数据库操作单元测试
    • 监控慢查询和连接池使用情况
    • 定期备份重要数据

通过掌握这些知识和技巧,我们将能够构建高效、安全且可维护的Python数据库应用程序。


阅读原文:原文链接


该文章在 2025/7/18 10:53:58 编辑过
关键字查询
相关文章
正在查询...
点晴ERP是一款针对中小制造业的专业生产管理软件系统,系统成熟度和易用性得到了国内大量中小企业的青睐。
点晴PMS码头管理系统主要针对港口码头集装箱与散货日常运作、调度、堆场、车队、财务费用、相关报表等业务管理,结合码头的业务特点,围绕调度、堆场作业而开发的。集技术的先进性、管理的有效性于一体,是物流码头及其他港口类企业的高效ERP管理信息系统。
点晴WMS仓储管理系统提供了货物产品管理,销售管理,采购管理,仓储管理,仓库管理,保质期管理,货位管理,库位管理,生产管理,WMS管理系统,标签打印,条形码,二维码管理,批号管理软件。
点晴免费OA是一款软件和通用服务都免费,不限功能、不限时间、不限用户的免费OA协同办公管理系统。
Copyright 2010-2025 ClickSun All Rights Reserved