SQLAlchemy
演示插入数据代码
import sqlalchemy
from sqlalchemy import create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column,Integer,String
from sqlalchemy.orm import sessionmaker
#创建对象的基类:
Base = declarative_base()
#定义User对象:
class User(Base):
#表的名字:
__tablename__ = 'user'
#表的结构:
userid = Column(Integer,primary_key=True)
username = Column(String(20))
age = Column(Integer)
department = Column(String(20))
#初始化数据库连接
engine = create_engine("mysql+pymysql://kang:[email protected]/test",encoding="utf-8",echo=True)
#创建session类型
DBSession = sessionmaker(bind=engine)
#创建session对象
session = DBSession()
#创建新的user对象
new_user1 = User(username='CC',age=25,department='IT')
new_user2 = User(username="LILI",age=30,department='HR')
new_user3 = User(username='JOHN',age=22,department='IT')
#添加单条数据(添加数据,但还没有提交,出错还可以使用rollback撤回操作)
new_user = User(name='lily')
#session.add(new_user1)
#添加到session
session.add_all([new_user1,new_user2,new_user3])
#提交即保存到数据库
session.commit()
#提交到数据,这一步才是真正的将数据插入到数据库中了
session.commit()
#关闭session
session.close()
概念和数据类型
概念
| 概念 | 对应数据库 | 说明 |
|---|---|---|
| Engine | 连接 | 驱动引擎 |
| Session | 连接池,事务 | 由此开始查询 |
| Model | 表 | 类定义 |
| Column | 列 | |
| Query | 若干行 | 可以链式添加多个条件 |
常见数据类型
| 数据类型 | 数据库数据类型 | python数据类型 | 说明 |
|---|---|---|---|
| Integer | int | int | 整形,32位 |
| String | varchar | string | 字符串 |
| Text | text | string | 长字符串 |
| Float | float | float | 浮点型 |
| Boolean | tinyint | bool | True / False |
| Date | date | datetime.date | 存储时间年月日 |
| DateTime | datetime | datetime.datetime | 存储年月日时分秒毫秒等 |
| Time | time | datetime.datetime | 存储时分秒 |
使用步骤
数据库连接
SQLAlchemy是Python中常用的一个ORM,SQLAlchemy分成三部分:
- ORM,就是我们用类来表示数据库schema的那部分
- SQLAlchemy Core,就是一些基础的操作,例如
update,insert等等,也可以直接使用这部分来进行操作,但是它们写起来没有ORM那么自然 - DBAPI,这部分就是数据库驱动
它们的关系如下(图片来自官网):

我们先来看看一个简单的例子:
import contextlib
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker
from sqlalchemy import (
create_engine,
Column,
Integer,
DateTime,
String,
)
from config import config # config模块里有自己写的配置,我们可以换成别的,注意下面用到config的地方也要一起换
engine = create_engine(
config.SQLALCHEMY_DATABASE_URI, # SQLAlchemy 数据库连接串,格式见下面
echo=bool(config.SQLALCHEMY_ECHO), # 是不是要把所执行的SQL打印出来,一般用于调试
pool_size=int(config.SQLALCHEMY_POOL_SIZE), # 连接池大小
max_overflow=int(config.SQLALCHEMY_POOL_MAX_SIZE), # 连接池最大的大小
pool_recycle=int(config.SQLALCHEMY_POOL_RECYCLE), # 多久时间主动回收连接,见下注释
)
Session = sessionmaker(bind=engine)
Base = declarative_base(engine)
class BaseMixin:
"""model的基类,所有model都必须继承"""
id = Column(Integer, primary_key=True)
created_at = Column(DateTime, nullable=False, default=datetime.datetime.now)
updated_at = Column(DateTime, nullable=False, default=datetime.datetime.now, onupdate=datetime.datetime.now, index=True)
deleted_at = Column(DateTime) # 可以为空, 如果非空, 则为软删
@contextlib.contextmanager
def get_session():
s = Session()
try:
yield s
s.commit()
except Exception as e:
s.rollback()
raise e
finally:
s.close()
class User(Base, BaseMixin):
__tablename__ = "user"
Name = Column(String(36), nullable=False)
Phone = Column(String(36), nullable=False, unique=True)
我们注意上面的几点:
pool_recycle,设置主动回收连接的时长,如果不设置,那么可能会遇到数据库主动断开连接的问题,例如MySQL通常会为连接设置 最大生命周期为八小时,如果没有通信,那么就会断开连接。因此不设置此选项可能就会遇到
MySQL has gone away的报错。engine,engine是SQLAlchemy 中位于数据库驱动之上的一个抽象概念,它适配了各种数据库驱动,提供了连接池等功能。其用法就是 如上面例子中,
engine = create_engine(<数据库连接串>),数据库连接串的格式是
dialect+driver://username:password@host:port/database?参数这样的,dialect 可以是
mysql,postgresql,oracle,mssql,sqlite,后面的 driver 是驱动,比如MySQL的驱动pymysql, 如果不填写,就使用默认驱动。再往后就是用户名、密码、地址、端口、数据库、连接参数了,我们来看几个例子:- MySQL:
engine = create_engine('mysql+pymysql://scott:tiger@localhost/foo?charset=utf8mb4') - PostgreSQL:
engine = create_engine('postgresql+psycopg2://scott:tiger@localhost/mydatabase') - Oracle:
engine = create_engine('oracle+cx_oracle://scott:tiger@tnsname') - MS SQL:
engine = create_engine('mssql+pymssql://scott:tiger@hostname:port/dbname') - SQLite:
engine = create_engine('sqlite:////absolute/path/to/foo.db') - 详见:https://docs.sqlalchemy.org/en/13/core/engines.html
- MySQL:
Session
- Session的意思就是会话,也就是说,是一个逻辑组织的概念,因此,这需要靠你的业务逻辑来划分哪些操作使用同一个Session, 哪些操作又划分为不同的业务操作,详见 这里。 举个简单的例子,以web应用为例,一个请求里共用一个Session就是一个好的例子,一个异步任务执行过程中使用一个Session也是一个例子。 但是**注意,不能直接使用Session,而是使用Session的实例**,借助上面的代码,我们可以直接这样写:
with get_session() as s:
print(s.query(User).first())
session
sqlalchemy中使用session用于创建程序和数据库之间的会话,所有对象的载入和保存都需要通过session对象 。
通过sessionmaker调用创建一个工厂,并关联Engine以确保每个session都可以使用该Engine连接资源:
from sqlalchemy.orm import sessionmaker # 创建session DbSession = sessionmaker(bind=engine) session = DbSession()session的常见操作方法包括:
- flush:预提交,提交到数据库文件,还未写入数据库文件中
- commit:提交了一个事务
- rollback:回滚
- close:关闭
session对象包括三个属性:
- new:刚加入会话的对象
- dirty:刚被修改的对象
- deleted:在会话中被删除的对象
三个属性共同的特点就是内存的数据和数据库数据不一致,也就是对象处于pending状态,这也就表明了session保存了所有对象处于pending状态的强引用。
Base,Base是ORM中的一个基类,通过集成Base,我们才能方便的使用一些基本的查询,例如
s.query(User).filter_by(User.name="nick").first()。BaseMixin,BaseMixin是我自己定义的一些通用的表结构,通过Mixin的方式集成到类里,比如上面的定义,我们常见的表结构里,都会有 ID、创建时间,更新时间,软删除标志等等,我们把它作为一个独立的类,这样通过继承即可获得相关表属性,省得重复写多次。
表的设计
表的设计通常就如 User 表一样:
class User(Base, BaseMixin):
__tablename__ = "user"
Name = Column(String(36), nullable=False)
Phone = Column(String(36), nullable=False, unique=True)
首先使用 __tablename__ 自定义表名,接着写各个表中的属性,也就是对应在数据库表中的列(column),常见的类型有:
$ egrep '^class ' ~/.pyenv/versions/3.6.0/lib/python3.6/site-packages/sqlalchemy/sql/sqltypes.py
class _LookupExpressionAdapter(object):
class Concatenable(object):
class Indexable(object):
class String(Concatenable, TypeEngine):
class Text(String):
class Unicode(String):
class UnicodeText(Text):
class Integer(_LookupExpressionAdapter, TypeEngine):
class SmallInteger(Integer):
class BigInteger(Integer):
class Numeric(_LookupExpressionAdapter, TypeEngine):
class Float(Numeric):
class DateTime(_LookupExpressionAdapter, TypeEngine):
class Date(_LookupExpressionAdapter, TypeEngine):
class Time(_LookupExpressionAdapter, TypeEngine):
class _Binary(TypeEngine):
class LargeBinary(_Binary):
class Binary(LargeBinary):
class SchemaType(SchemaEventTarget):
class Enum(Emulated, String, SchemaType):
class PickleType(TypeDecorator):
class Boolean(Emulated, TypeEngine, SchemaType):
class _AbstractInterval(_LookupExpressionAdapter, TypeEngine):
class Interval(Emulated, _AbstractInterval, TypeDecorator):
class JSON(Indexable, TypeEngine):
class ARRAY(SchemaEventTarget, Indexable, Concatenable, TypeEngine):
class REAL(Float):
class FLOAT(Float):
class NUMERIC(Numeric):
class DECIMAL(Numeric):
class INTEGER(Integer):
class SMALLINT(SmallInteger):
class BIGINT(BigInteger):
class TIMESTAMP(DateTime):
class DATETIME(DateTime):
class DATE(Date):
class TIME(Time):
class TEXT(Text):
class CLOB(Text):
class VARCHAR(String):
class NVARCHAR(Unicode):
class CHAR(String):
class NCHAR(Unicode):
class BLOB(LargeBinary):
class BINARY(_Binary):
class VARBINARY(_Binary):
class BOOLEAN(Boolean):
class NullType(TypeEngine):
class MatchType(Boolean):
常见操作
我们来看看使用SQLAlchemy完成常见的操作,例如增删查改:
查询操作
SELECT * FROM user应该这样写:
with get_session() as s:
print(s.query(User).all())
SELECT * FROM user WHERE name='nick'应该这样写:
with get_session() as s:
print(s.query(User).filter_by(User.name='nick').all())
print(s.query(User).filter(User.name == 'nick').all()) # 这样写是等同效果的
SELECT * FROM user WHERE name='nick' LIMIT 1应该这样写:
with get_session() as s:
print(s.query(User).filter_by(User.name='nick').first())
如果需要加判定,例如确保只有一条数据,那就把 first() 替换为 one(),如果确保一行或者没有,那就写 one_or_none()。
SELECT * FROM user ORDER BY id DESC LIMIT 1应该这样写:
with get_session() as s:
print(s.query(User).order_by(User.id.desc()).first())
SELECT * FROM user ORDER BY id DESC LIMIT 1 OFFSET 20应该这样写:
with get_session() as s:
print(s.query(User).order_by(User.id.desc()).offset(20).first())
删除操作
DELETE FROM user应该这样写:
with get_session() as s:
s.query(User).delete()
DELETE FROM user WHERE name='nick':
with get_session() as s:
s.query(User).filter_by(User.name='nick').delete()
DELETE FROM user WHERE name='nick' LIMIT 1:
with get_session() as s:
s.query(User).filter_by(User.name='nick').limit(1).delete()
更新语句
UPDATE user SET name='nick':
with get_session() as s:
s.query(User).update({'name': 'nick'})
UPDATE user SET name='nick' WHERE id=1:
with get_session() as s:
s.query(User).filter_by(User.id=1).update({'name': 'nick'})
也可以通过更改实例的属性,然后提交:
with get_session() as s:
user = s.query(User).filter_by(User.id=1).one()
user.name = 'nick'
s.commit()
插入语句
这个就简单了,实例化对象,然后 session.add,最后提交:
with get_session() as s:
user = User()
s.add(user)
s.commit()
连表
SQLAlchemy 中可以直接使用join语句:
with get_session() as s:
s.query(Customer).join(Invoice).filter(Invoice.amount == 8500)
可以是这么几种写法:
query.join(Address, User.id==Address.user_id) # explicit condition
query.join(User.addresses) # specify relationship from left to right
query.join(Address, User.addresses) # same, with explicit target
query.join('addresses') # same, using a string
数据库migration
我们使用alembic来做数据库migration,首先安装:
$ pip install alembic
$ alembic init alembic # 此处 alembic init 后接的是保存migration的文件夹名称
然后我们要修改 alembic/env.py (假设你设置的保存migration的文件夹名称就是 alembic),将对应部分修改成如下:
config.set_main_option(
'sqlalchemy.url', config.SQLALCHEMY_DATABASE_URI
)
target_metadata = Base.metadata # 从任意一个我们的model可以拿到总的Base
engine = target_metadata.bind
因为SQLAlchemy会把表的信息存储在 metadata 里,而我们都继承了 Base,因此可以 通过 Base.metadata 来拿到所有表的信息,这样子alembic才能够拿到表的结构,然后和 数据库进行对比,生成migration脚本:
$ alembic revision --autogenerate -m '本次migration的信息,相当于git提交时的评论'
总结
这一篇中我们看了如何使用SQLAlchemy来进行常见的操作,我们首先从如何定义表开始,接着我们注意看了常见的SQL操作对应的 SQLAlchemy操作是怎样的,最后我们看了以下alembic应该怎么配置才能自动生成migration脚本。
参考资料: