TODO1:完善Test1-ORM.py文件
1. 数据模型定义
根据提供的MySQL表结构,补充了以下ORM模型类:
- Employees:
完整实现了emp_no(主键)、birth_date、first_name、last_name、gender、hire_date字段,并添加了first_name字段的索引。
class Employees(db.Model):
emp_no = db.Column(db.INTEGER, primary_key=True)
birth_date = db.Column(db.DATE, nullable=False)
first_name = db.Column(db.String(14), nullable=False)
last_name = db.Column(db.String(16), nullable=False)
gender = db.Column(db.Enum('M', 'F'), nullable=False)
hire_date = db.Column(db.DATE, nullable=False)
__table_args__ = (db.Index('idx_employees_name', 'first_name'),)
- Dept_emp/Dept_manager/Titles:
完整实现复合主键、外键约束及字段定义,例如:
class Dept_emp(db.Model):
emp_no = db.Column(db.Integer, db.ForeignKey('employees.emp_no', ondelete='CASCADE'), primary_key=True)
dept_no = db.Column(db.CHAR(4), db.ForeignKey('departments.dept_no', ondelete='CASCADE'), primary_key=True)
from_date = db.Column(db.DATE, nullable=False)
to_date = db.Column(db.DATE, nullable=False)
- Dept_manager_title:
单独定义用于触发器维护的表:
class Dept_manager_title(db.Model):
emp_no = db.Column(db.Integer, primary_key=True)
from_date = db.Column(db.Date, nullable=False)
to_date = db.Column(db.Date)
2. 触发器实现
通过MySQL触发器实现dept_manager与dept_manager_title的数据同步:
插入触发器
CREATE TRIGGER ts_insert AFTER INSERT ON dept_manager
FOR EACH ROW
BEGIN
INSERT INTO dept_manager_title (emp_no, from_date, to_date)
VALUES (NEW.emp_no, NEW.from_date, NEW.to_date)
ON DUPLICATE KEY UPDATE to_date = NEW.to_date;
END;
- 逻辑:当向
dept_manager插入数据时,自动插入或更新dept_manager_title表。
删除触发器
CREATE TRIGGER ts_delete AFTER DELETE ON dept_manager
FOR EACH ROW
DELETE FROM dept_manager_title WHERE emp_no = OLD.emp_no;
- 逻辑:当从
dept_manager删除数据时,同步删除dept_manager_title中对应记录。
3. 数据导入逻辑
通过insert_db()函数实现数据导入,步骤如下:
数据读取与模型映射
def read_csv_file(file_path):
with open(file_path, 'r') as csvfile:
reader = csv.DictReader(csvfile)
return [row for row in reader]
# 示例:employees表数据导入
reader = read_csv_file('./employees.csv')
rows = []
for row in reader:
employee = Employees(
emp_no=row['emp_no'],
birth_date=row['birth_date'],
first_name=row['first_name'],
last_name=row['last_name'],
gender=row['gender'],
hire_date=row['hire_date']
)
rows.append(employee)
session.bulk_save_objects(rows) # 批量插入
session.commit()
- 批量操作:使用
bulk_save_objects一次性提交多条记录,减少数据库连接开销。 - 事务提交:每张表插入完成后调用
session.commit()确保数据持久化。
TODO2:完善Test2-RESTful.py文件
**1. 数据更新接口(PUT)
**
接口地址:/api/v1/<table_name>
方法:PUT
实现逻辑:
- 表名验证:检查请求的表名是否在预定义的
TABLE_CONFIG中。 - 主键验证:确保请求体包含所有主键字段(如
employees表的emp_no)。 - 参数化SQL:防止SQL注入攻击。
# 示例代码片段
set_clause = ", ".join([f"{field} = %s" for field in update_fields])
where_clause = " AND ".join([f"{pk} = %s" for pk in pk_fields])
sql = f"UPDATE {table_name} SET {set_clause} WHERE {where_clause}"
cursor.execute(sql, values) # values包含更新字段和主键值
2. 数据删除接口(DELETE)
接口地址:/api/v1/<table_name>/<path:args>
方法:DELETE
实现逻辑:
- 复合主键解析:将URL路径参数按
/拆分(如/dept_emp/10001/d001)。 - 动态构建WHERE条件:
# 示例代码片段
pk_values = args.split('/')
conditions = " AND ".join([f"{field} = %s" for field in pk_fields])
sql = f"DELETE FROM {table_name} WHERE {conditions}"
cursor.execute(sql, pk_values)
- 结果验证:通过
cursor.rowcount检查是否实际删除了数据。
3. 数据查询接口(GET)
接口地址:
- 主键路径查询:
/api/v1/<table_name>/<path:pk_path> - 条件参数查询:
/api/v1/<table_name>?field=value
实现逻辑:
- 主键路径查询:
# 示例:查询dept_emp表中emp_no=10001且dept_no=d001的记录
pk_values = pk_path.split('/')
conditions = " AND ".join([f"{field} = %s" for field in pk_fields])
sql = f"SELECT * FROM {table_name} WHERE {conditions}"
cursor.execute(sql, pk_values)
- 条件参数查询:
# 示例:查询employees表中first_name=John的记录
field, value = next(iter(request.args.items()))
sql = f"SELECT * FROM {table_name} WHERE {field} = %s"
cursor.execute(sql, (value,))
3. 辅助工具与流程
3.1 启动脚本(start.sh)
#!/bin/bash
# 初始化数据库(仅创建表结构)
python ./src/Test1-ORM.py -o
echo "DB初始化完成"
# 启动RESTful服务
python ./src/Test2-RESTful.py &
FLASK_PID=$!
echo "Flask服务已启动,PID: $FLASK_PID"
# 等待服务就绪
while ! curl -s "http://127.0.0.1:5555" > /dev/null; do
sleep 1
done
# 导入CSV数据
python ./src/csv_import.py
echo "数据导入完成"
- 功能:自动化完成数据库初始化、服务启动和数据导入。
3.2 CSV数据导入脚本(csv_import.py)
def import_csv(file_path, table_name):
with open(file_path, 'r') as f:
reader = csv.DictReader(f)
batches = [rows[i:i+3000] for i in range(0, len(rows), 3000)] # 分批次导入
for batch in batches:
response = requests.post(f'{BASE_URL}/{table_name}', json={'rows': batch})
if response.status_code != 201:
raise Exception(f"导入失败: {response.text}")
- 分批次处理:每次提交3000条数据,避免内存溢出和超时问题。
4. 其他技术点
- ORM级联操作
- 使用
cascade='all, delete'实现外键级联删除。
- 使用
- 参数化查询
- 所有SQL语句均使用占位符(
%s),避免SQL注入。
- 所有SQL语句均使用占位符(
- 启动脚本优化
start.sh中通过curl检测服务启动状态,确保数据导入前服务已就绪。 - 错误处理与日志
所有数据库操作均包含try-except块,返回明确的HTTP状态码(如400、500)。