中期大作业

TODO1:完善Test1-ORM.py文件

1. 数据模型定义

根据提供的MySQL表结构,补充了以下ORM模型类:

  • Employees
    完整实现了emp_no(主键)、birth_datefirst_namelast_namegenderhire_date字段,并添加了first_name字段的索引。
class Employees(db.Model):
	emp_no = db.Column(db.INTEGER, primary_key=True)
	birth_date = db.Column(db.DATE, nullable=False)
	first_name = db.Column(db.String(14), nullable=False)
	last_name = db.Column(db.String(16), nullable=False)
	gender = db.Column(db.Enum('M', 'F'), nullable=False)
	hire_date = db.Column(db.DATE, nullable=False)
	__table_args__ = (db.Index('idx_employees_name', 'first_name'),)
  • Dept_emp/Dept_manager/Titles
    完整实现复合主键、外键约束及字段定义,例如:
class Dept_emp(db.Model):
	emp_no = db.Column(db.Integer, db.ForeignKey('employees.emp_no', ondelete='CASCADE'), primary_key=True)
	dept_no = db.Column(db.CHAR(4), db.ForeignKey('departments.dept_no', ondelete='CASCADE'), primary_key=True)
	from_date = db.Column(db.DATE, nullable=False)
	to_date = db.Column(db.DATE, nullable=False)
  • Dept_manager_title
    单独定义用于触发器维护的表:
class Dept_manager_title(db.Model):
	emp_no = db.Column(db.Integer, primary_key=True)
	from_date = db.Column(db.Date, nullable=False)
	to_date = db.Column(db.Date)

2. 触发器实现

通过MySQL触发器实现dept_managerdept_manager_title的数据同步:

插入触发器

CREATE TRIGGER ts_insert AFTER INSERT ON dept_manager
FOR EACH ROW
BEGIN
    INSERT INTO dept_manager_title (emp_no, from_date, to_date)
    VALUES (NEW.emp_no, NEW.from_date, NEW.to_date)
    ON DUPLICATE KEY UPDATE to_date = NEW.to_date;
END;
  • 逻辑:当向dept_manager插入数据时,自动插入或更新dept_manager_title表。

删除触发器

CREATE TRIGGER ts_delete AFTER DELETE ON dept_manager
FOR EACH ROW
DELETE FROM dept_manager_title WHERE emp_no = OLD.emp_no;
  • 逻辑:当从dept_manager删除数据时,同步删除dept_manager_title中对应记录。

3. 数据导入逻辑

通过insert_db()函数实现数据导入,步骤如下:

数据读取与模型映射

def read_csv_file(file_path):
    with open(file_path, 'r') as csvfile:
        reader = csv.DictReader(csvfile)
        return [row for row in reader]

# 示例:employees表数据导入
reader = read_csv_file('./employees.csv')
rows = []
for row in reader:
    employee = Employees(
        emp_no=row['emp_no'],
        birth_date=row['birth_date'],
        first_name=row['first_name'],
        last_name=row['last_name'],
        gender=row['gender'],
        hire_date=row['hire_date']
    )
    rows.append(employee)
session.bulk_save_objects(rows)  # 批量插入
session.commit()
  • 批量操作:使用bulk_save_objects一次性提交多条记录,减少数据库连接开销。
  • 事务提交:每张表插入完成后调用session.commit()确保数据持久化。

TODO2:完善Test2-RESTful.py文件

**1. 数据更新接口(PUT)

**
接口地址/api/v1/<table_name>
方法PUT
实现逻辑

  1. 表名验证:检查请求的表名是否在预定义的TABLE_CONFIG中。
  2. 主键验证:确保请求体包含所有主键字段(如employees表的emp_no)。
  3. 参数化SQL:防止SQL注入攻击。
# 示例代码片段
set_clause = ", ".join([f"{field} = %s" for field in update_fields])
where_clause = " AND ".join([f"{pk} = %s" for pk in pk_fields])
sql = f"UPDATE {table_name} SET {set_clause} WHERE {where_clause}"
cursor.execute(sql, values)  # values包含更新字段和主键值

2. 数据删除接口(DELETE)

接口地址/api/v1/<table_name>/<path:args>
方法DELETE
实现逻辑

  1. 复合主键解析:将URL路径参数按/拆分(如/dept_emp/10001/d001)。
  2. 动态构建WHERE条件
# 示例代码片段
pk_values = args.split('/')
conditions = " AND ".join([f"{field} = %s" for field in pk_fields])
sql = f"DELETE FROM {table_name} WHERE {conditions}"
cursor.execute(sql, pk_values)
  1. 结果验证:通过cursor.rowcount检查是否实际删除了数据。

3. 数据查询接口(GET)

接口地址

  • 主键路径查询:/api/v1/<table_name>/<path:pk_path>
  • 条件参数查询:/api/v1/<table_name>?field=value
    实现逻辑
  1. 主键路径查询
# 示例:查询dept_emp表中emp_no=10001且dept_no=d001的记录
pk_values = pk_path.split('/')
conditions = " AND ".join([f"{field} = %s" for field in pk_fields])
sql = f"SELECT * FROM {table_name} WHERE {conditions}"
cursor.execute(sql, pk_values)
  1. 条件参数查询
# 示例:查询employees表中first_name=John的记录
field, value = next(iter(request.args.items()))
sql = f"SELECT * FROM {table_name} WHERE {field} = %s"
cursor.execute(sql, (value,))

3. 辅助工具与流程

3.1 启动脚本(start.sh)

#!/bin/bash
# 初始化数据库(仅创建表结构)
python ./src/Test1-ORM.py -o
echo "DB初始化完成"

# 启动RESTful服务
python ./src/Test2-RESTful.py &
FLASK_PID=$!
echo "Flask服务已启动,PID: $FLASK_PID"

# 等待服务就绪
while ! curl -s "http://127.0.0.1:5555" > /dev/null; do
  sleep 1
done

# 导入CSV数据
python ./src/csv_import.py
echo "数据导入完成"
  • 功能:自动化完成数据库初始化、服务启动和数据导入。

3.2 CSV数据导入脚本(csv_import.py)

def import_csv(file_path, table_name):
    with open(file_path, 'r') as f:
        reader = csv.DictReader(f)
        batches = [rows[i:i+3000] for i in range(0, len(rows), 3000)]  # 分批次导入
        for batch in batches:
            response = requests.post(f'{BASE_URL}/{table_name}', json={'rows': batch})
            if response.status_code != 201:
                raise Exception(f"导入失败: {response.text}")
  • 分批次处理:每次提交3000条数据,避免内存溢出和超时问题。

4. 其他技术点

  1. ORM级联操作
    • 使用cascade='all, delete'实现外键级联删除。
  2. 参数化查询
    • 所有SQL语句均使用占位符(%s),避免SQL注入。
  3. 启动脚本优化
    start.sh中通过curl检测服务启动状态,确保数据导入前服务已就绪。
  4. 错误处理与日志
    所有数据库操作均包含try-except块,返回明确的HTTP状态码(如400、500)。
Built with MDFriday ❤️