我们将使用新的函数定义功能为 Amazon Bedrock 创建一个代理。我们将以 HR 代理为例。使用此代理,我们可以查看员工可用的假期天数并请求新的休假。我们将使用 AWS Lambda 函数来定义检查可用假期天数和确认新的休假时间的逻辑。我们将在 SQLite 数据库中管理员工数据,并生成模拟数据来演示agent。
现在让我们检查 boto3 版本以确保已安装正确的版本。我们的版本应该大于或等于 1.34.90。
import boto3
import json
import time
import zipfile
from io import BytesIO
import uuid
import pprint
import logging
# 设置日志记录器
logging.basicConfig(format='[%(asctime)s] p%(process)s {%(filename)s:%(lineno)d} %(levelname)s - %(message)s', level=logging.INFO)
logger = logging.getLogger(__name__)
# 获取所需 AWS 服务的 boto3 客户端
sts_client = boto3.client('sts')
iam_client = boto3.client('iam')
lambda_client = boto3.client('lambda')
bedrock_agent_client = boto3.client('bedrock-agent')
bedrock_agent_runtime_client = boto3.client('bedrock-agent-runtime')
# 为代理和要创建的 lambda 函数设置一些配置变量
session = boto3.session.Session()
region = session.region_name
account_id = sts_client.get_caller_identity()["Account"]
# 配置变量
suffix = f"{region}-{account_id}"
agent_name = "hr-assistant-function-def"
agent_bedrock_allow_policy_name = f"{agent_name}-ba-{suffix}"
agent_role_name = f'AmazonBedrockExecutionRoleForAgents_{agent_name}'
agent_foundation_model = "anthropic.claude-3-sonnet-20240229-v1:0"
agent_description = "为管理假期时间提供 HR 协助的代理"
agent_instruction = "你是一名 HR 代理,帮助员工了解 HR 政策并管理假期时间"
agent_action_group_name = "VacationsActionGroup"
agent_action_group_description = "获取员工可用假期天数和确认新时间的操作"
agent_alias_name = f"{agent_name}-alias"
lambda_function_role = f'{agent_name}-lambda-role-{suffix}'
lambda_function_name = f'{agent_name}-{suffix}'
我们现在将创建一个 lambda 函数,它与 SQLite 文件 employee_database.db
进行交互。为此,我们将:
employee_database.db
文件,并生成一些数据。lambda_function.py
文件。# 创建将由 lambda 函数使用的员工数据库
import sqlite3
import random
from datetime import date, timedelta
# 连接到 SQLite 数据库(如果不存在则创建一个新的)
conn = sqlite3.connect('employee_database.db')
c = conn.cursor()
# 创建 employees 表
c.execute('''CREATE TABLE IF NOT EXISTS employees
(employee_id INTEGER PRIMARY KEY AUTOINCREMENT, employee_name TEXT, employee_job_title TEXT, employee_start_date TEXT, employee_employment_status TEXT)''')
# 创建 vacations 表
c.execute('''CREATE TABLE IF NOT EXISTS vacations
(employee_id INTEGER, year INTEGER, employee_total_vacation_days INTEGER, employee_vacation_days_taken INTEGER, employee_vacation_days_available INTEGER, FOREIGN KEY(employee_id) REFERENCES employees(employee_id))''')
# 创建 planned_vacations 表
c.execute('''CREATE TABLE IF NOT EXISTS planned_vacations
(employee_id INTEGER, vacation_start_date TEXT, vacation_end_date TEXT, vacation_days_taken INTEGER, FOREIGN KEY(employee_id) REFERENCES employees(employee_id))''')
# 为 10 名员工生成一些随机数据
employee_names = ['John Doe', 'Jane Smith', 'Bob Johnson', 'Alice Williams', 'Tom Brown', 'Emily Davis', 'Michael Wilson', 'Sarah Taylor', 'David Anderson', 'Jessica Thompson']
job_titles = ['Manager', 'Developer', 'Designer', 'Analyst', 'Accountant', 'Sales Representative']
employment_statuses = ['Active', 'Inactive']
for i in range(10):
name = employee_names[i]
job_title = random.choice(job_titles)
start_date = date(2015 + random.randint(0, 7), random.randint(1, 12), random.randint(1, 28)).strftime('%Y-%m-%d')
employment_status = random.choice(employment_statuses)
c.execute("INSERT INTO employees (employee_name, employee_job_title, employee_start_date, employee_employment_status) VALUES (?, ?, ?, ?)", (name, job_title, start_date, employment_status))
employee_id = c.lastrowid
# 为当前员工生成假期数据
for year in range(date.today().year, date.today().year - 3, -1):
total_vacation_days = random.randint(10, 30)
days_taken = random.randint(0, total_vacation_days)
days_available = total_vacation_days - days_taken
c.execute("INSERT INTO vacations (employee_id, year, employee_total_vacation_days, employee_vacation_days_taken, employee_vacation_days_available) VALUES (?, ?, ?, ?, ?)", (employee_id, year, total_vacation_days, days_taken, days_available))
# 为当前员工和年份生成一些计划的假期
num_planned_vacations = random.randint(0, 3)
for _ in range(num_planned_vacations):
start_date = date(year, random.randint(1, 12), random.randint(1, 28)).strftime('%Y-%m-%d')
end_date = (date(int(start_date[:4]), int(start_date[5:7]), int(start_date[8:])) + timedelta(days=random.randint(1, 14))).strftime('%Y-%m-%d')
days_taken = (date(int(end_date[:4]), int(end_date[5:7]), int(end_date[8:])) - date(int(start_date[:4]), int(start_date[5:7]), int(start_date[8:])))
c.execute("INSERT INTO planned_vacations (employee_id, vacation_start_date, vacation_end_date, vacation_days_taken) VALUES (?, ?, ?, ?)", (employee_id, start_date, end_date, days_taken.days))
# 提交更改并关闭连接
conn.commit()
conn.close()
现在让我们创建我们的 lambda 函数。它实现了 get_available_vacations_days
功能,用于获取给定员工 ID 的可用假期天数,以及 book_vacations
功能,用于为员工预订假期时间。
%%writefile lambda_function.py
import os
import json
import shutil
import sqlite3
from datetime import datetime
def get_available_vacations_days(employee_id):
# 连接到 SQLite 数据库
conn = sqlite3.connect('/tmp/employee_database.db')
c = conn.cursor()
if employee_id:
# 获取员工的可用假期天数
c.execute("""
SELECT employee_vacation_days_available
FROM vacations
WHERE employee_id = ?
ORDER BY year DESC
LIMIT 1
""", (employee_id,))
available_vacation_days = c.fetchone()
if available_vacation_days:
available_vacation_days = available_vacation_days[0] # 解包元组
print(f"employee_id {employee_id} 的可用假期天数: {available_vacation_days}")
conn.close()
return available_vacation_days
else:
return_msg = f"没有找到 employed_id {employee_id} 的假期数据"
print(return_msg)
return return_msg
conn.close()
else:
raise Exception(f"未提供员工 id")
# 关闭数据库连接
conn.close()
def reserve_vacation_time(employee_id, start_date, end_date):
# 连接到 SQLite 数据库
conn = sqlite3.connect('/tmp/employee_database.db')
c = conn.cursor()
try:
# 计算假期天数
start_date = datetime.strptime(start_date, '%Y-%m-%d')
end_date = datetime.strptime(end_date, '%Y-%m-%d')
vacation_days = (end_date - start_date).days + 1
# 获取当前年份
current_year = start_date.year
# 检查员工是否存在
c.execute("SELECT * FROM employees WHERE employee_id = ?", (employee_id,))
employee = c.fetchone()
if employee is None:
return_msg = f"ID 为 {employee_id} 的员工不存在。"
print(return_msg)
conn.close()
return return_msg
# 检查员工在当前年份是否有足够的假期天数
c.execute("SELECT employee_vacation_days_available FROM vacations WHERE employee_id = ? AND year = ?", (employee_id, current_year))
available_days = c.fetchone()
if available_days is None or available_days[0] < vacation_days:
return_msg = f"ID 为 {employee_id} 的员工在请求的时间段内没有足够的假期天数。"
print(return_msg)
conn.close()
return return_msg
# 将新假期插入 planned_vacations 表
c.execute("INSERT INTO planned_vacations (employee_id, vacation_start_date, vacation_end_date, vacation_days_taken) VALUES (?, ?, ?, ?)", (employee_id, start_date, end_date, vacation_days))
# 在 vacations 表中更新新的假期天数
c.execute("UPDATE vacations SET employee_vacation_days_taken = employee_vacation_days_taken + ?, employee_vacation_days_available = employee_vacation_days_available - ? WHERE employee_id = ? AND year = ?", (vacation_days, vacation_days, employee_id, current_year))
conn.commit()
print(f"成功为 ID 为 {employee_id} 的员工保存了从 {start_date} 到 {end_date} 的假期。")
# 关闭数据库连接
conn.close()
return f"成功为 ID 为 {employee_id} 的员工保存了从 {start_date} 到 {end_date} 的假期。"
except Exception as e:
raise Exception(f"发生错误: {e}")
conn.rollback()
# 关闭数据库连接
conn.close()
return f"发生错误: {e}"
def lambda_handler(event, context):
original_db_file = 'employee_database.db'
target_db_file = '/tmp/employee_database.db'
if not os.path.exists(target_db_file):
shutil.copy2(original_db_file, target_db_file)
agent = event['agent']
actionGroup = event['actionGroup']
function = event['function']
parameters = event.get('parameters', [])
responseBody = {
"TEXT": {
"body": "Error, no function was called"
}
}
if function == 'get_available_vacations_days':
employee_id = None
for param in parameters:
if param["name"] == "employee_id":
employee_id = param["value"]
if not employee_id:
raise Exception("Missing mandatory parameter: employee_id")
vacation_days = get_available_vacations_days(employee_id)
responseBody = {
'TEXT': {
"body": f"employee_id {employee_id} 的可用假期天数: {vacation_days}"
}
}
elif function == 'reserve_vacation_time':
employee_id = None
start_date = None
end_date = None
for param in parameters:
if param["name"] == "employee_id":
employee_id = param["value"]
if param["name"] == "start_date":
start_date = param["value"]
if param["name"] == "end_date":
end_date = param["value"]
if not employee_id:
raise Exception("Missing mandatory parameter: employee_id")
if not start_date:
raise Exception("Missing mandatory parameter: start_date")
if not end_date:
raise Exception("Missing mandatory parameter: end_date")
completion_message = reserve_vacation_time(employee_id, start_date, end_date)
responseBody = {
'TEXT': {
"body": completion_message
}
}
action_response = {
'actionGroup': actionGroup,
'function': function,
'functionResponse': {
'responseBody': responseBody
}
}
function_response = {'response': action_response, 'messageVersion': event['messageVersion']}
print("Response: {}".format(function_response))
return function_response
接下来,让我们创建 lambda IAM 角色和策略,以便调用 Bedrock 模型。
# 创建 Lambda 函数的 IAM 角色
try:
assume_role_policy_document = {
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Principal": {
"Service": "lambda.amazonaws.com"
},
"Action": "sts:AssumeRole"
}
]
}
assume_role_policy_document_json = json.dumps(assume_role_policy_document)