Bedrock Agent提供两种会话属性来维护对话上下文:
在这个笔记本中,我们将演示如何使用这些属性来个性化对话并简化对话流程。
我们假设用户通过SSO身份验证进入系统,这为代理提供了他们的名字、姓氏和员工ID。我们可以将这些信息存储在sessionAttributes
中,以个性化整个对话过程中的用户体验。
用户将能够按照之前会话中设计的方式与HR代理进行交互。他们可以查看可用的假期天数或申请新的假期休假。
代理能够使用PromptSessionAttributes
获取时间上下文,具体来说,它在PromptSessionAttributes
中存储了CurrentDate
信息。如果用户询问相对信息,如"明天”,代理可以确定"明天"指的是确切的日期。
在开始之前,让我们更新botocore和boto3软件包,以确保我们有最新版本。
!python3 -m pip install --upgrade -q botocore
!python3 -m pip install --upgrade -q boto3
!python3 -m pip install --upgrade -q awscli
现在让我们检查boto3版本,以确保安装了正确的版本。我们的版本应该大于或等于1.34.90。
import boto3
import json
import time
import zipfile
from io import BytesIO
import uuid
import pprint
import logging
from datetime import datetime
print(boto3.__version__)
# 设置日志记录器
logging.basicConfig(format='[%(asctime)s] p%(process)s {%(filename)s:%(lineno)d} %(levelname)s - %(message)s', level=logging.INFO)
logger = logging.getLogger(__name__)
现在让我们为所需的AWS服务创建boto3客户端。
# 获取boto3客户端以访问所需的AWS服务
sts_client = boto3.client('sts')
iam_client = boto3.client('iam')
lambda_client = boto3.client('lambda')
bedrock_agent_client = boto3.client('bedrock-agent')
bedrock_agent_runtime_client = boto3.client('bedrock-agent-runtime')
接下来,我们可以为代理和要创建的lambda函数设置一些配置变量。
session = boto3.session.Session()
region = session.region_name
account_id = sts_client.get_caller_identity()["Account"]
region, account_id
# 配置变量
suffix = f"{region}-{account_id}"
agent_name = "hr-assistant-function-def"
agent_bedrock_allow_policy_name = f"{agent_name}-ba-{suffix}"
agent_role_name = f'AmazonBedrockExecutionRoleForAgents_{agent_name}'
agent_foundation_model = "anthropic.claude-3-sonnet-20240229-v1:0"
agent_description = "提供HR协助以预订假期时间的代理"
agent_instruction = "你是一名HR代理,帮助员工了解HR政策并管理假期时间"
agent_action_group_name = "VacationsActionGroup"
agent_action_group_description = "获取员工可用假期天数和预订新假期时间的操作"
agent_alias_name = f"{agent_name}-alias"
lambda_function_role = f'{agent_name}-lambda-role-{suffix}'
lambda_function_name = f'{agent_name}-{suffix}'
我们现在将创建一个与SQLite文件employee_database.db
交互的lambda函数。为此,我们将:
employee_database.db
文件,并生成一些数据。lambda_function.py
文件,其中包含lambda函数的逻辑。# 创建将由lambda函数使用的员工数据库
import sqlite3
import random
from datetime import date, timedelta
# 连接到SQLite数据库(如果不存在则创建一个新的)
conn = sqlite3.connect('employee_database.db')
c = conn.cursor()
# 创建employees表
c.execute('''CREATE TABLE IF NOT EXISTS employees
(employee_id INTEGER PRIMARY KEY AUTOINCREMENT, employee_name TEXT, employee_job_title TEXT, employee_start_date TEXT, employee_employment_status TEXT)''')
# 创建vacations表
c.execute('''CREATE TABLE IF NOT EXISTS vacations
(employee_id INTEGER, year INTEGER, employee_total_vacation_days INTEGER, employee_vacation_days_taken INTEGER, employee_vacation_days_available INTEGER, FOREIGN KEY(employee_id) REFERENCES employees(employee_id))''')
# 创建planned_vacations表
c.execute('''CREATE TABLE IF NOT EXISTS planned_vacations
(employee_id INTEGER, vacation_start_date TEXT, vacation_end_date TEXT, vacation_days_taken INTEGER, FOREIGN KEY(employee_id) REFERENCES employees(employee_id))''')
# 为10名员工生成一些随机数据
employee_names = ['John Doe', 'Jane Smith', 'Bob Johnson', 'Alice Williams', 'Tom Brown', 'Emily Davis', 'Michael Wilson', 'Sarah Taylor', 'David Anderson', 'Jessica Thompson']
job_titles = ['Manager', 'Developer', 'Designer', 'Analyst', 'Accountant', 'Sales Representative']
employment_statuses = ['Active', 'Inactive']
for i in range(10):
name = employee_names[i]
job_title = random.choice(job_titles)
start_date = date(2015 + random.randint(0, 7), random.randint(1, 12), random.randint(1, 28)).strftime('%Y-%m-%d')
employment_status = random.choice(employment_statuses)
c.execute("INSERT INTO employees (employee_name, employee_job_title, employee_start_date, employee_employment_status) VALUES (?, ?, ?, ?)", (name, job_title, start_date, employment_status))
employee_id = c.lastrowid
# 为当前员工生成假期数据
for year in range(date.today().year, date.today().year - 3, -1):
total_vacation_days = random.randint(10, 30)
days_taken = random.randint(0, total_vacation_days)
days_available = total_vacation_days - days_taken
c.execute("INSERT INTO vacations (employee_id, year, employee_total_vacation_days, employee_vacation_days_taken, employee_vacation_days_available) VALUES (?, ?, ?, ?, ?)", (employee_id, year, total_vacation_days, days_taken, days_available))
# 为当前员工和年份生成一些计划的假期
num_planned_vacations = random.randint(0, 3)
for _ in range(num_planned_vacations):
start_date = date(year, random.randint(1, 12), random.randint(1, 28)).strftime('%Y-%m-%d')
end_date = (date(int(start_date[:4]), int(start_date[5:7]), int(start_date[8:])) + timedelta(days=random.randint(1, 14))).strftime('%Y-%m-%d')
days_taken = (date(int(end_date[:4]), int(end_date[5:7]), int(end_date[8:])) - date(int(start_date[:4]), int(start_date[5:7]), int(start_date[8:])))
c.execute("INSERT INTO planned_vacations (employee_id, vacation_start_date, vacation_end_date, vacation_days_taken) VALUES (?, ?, ?, ?)", (employee_id, start_date, end_date, days_taken.days))
# 提交更改并关闭连接
conn.commit()
conn.close()
现在让我们创建我们的lambda函数。它实现了get_available_vacations_days
功能,用于获取给定employee_id的可用假期天数,以及reserve_vacation_time
功能,用于给定员工的开始和结束日期预订假期时间。
%%writefile lambda_function.py
import os
import json
import shutil
import sqlite3
from datetime import datetime
def get_available_vacations_days(employee_id):
# 连接到SQLite数据库
conn = sqlite3.connect('/tmp/employee_database.db')
c = conn.cursor()
if employee_id:
# 获取员工的可用假期天数
c.execute("""
SELECT employee_vacation_days_available
FROM vacations
WHERE employee_id = ?
ORDER BY year DESC
LIMIT 1
""", (employee_id,))
available_vacation_days = c.fetchone()
if available_vacation_days:
available_vacation_days = available_vacation_days[0] # 解包元组
print(f"employee_id {employee_id}的可用假期天数: {available_vacation_days}")
conn.close()
return available_vacation_days
else:
return_msg = f"没有找到employee_id {employee_id}的假期数据"
print(return_msg)
conn.close()
return return_msg
else:
raise Exception(f"未提供employee id")
# 关闭数据库连接
conn.close()
def reserve_vacation_time(employee_id, start_date, end_date):
# 连接到SQLite数据库
conn = sqlite3.connect('/tmp/employee_database.db')
c = conn.cursor()
try:
# 计算假期天数
start_date = datetime.strptime(start_date, '%Y-%m-%d')
end_date = datetime.strptime(end_date, '%Y-%m-%d')
vacation_days = (end_date - start_date).days + 1
# 获取当前年份
current_year = start_date.year
# 检查员工是否存在
c.execute("SELECT * FROM employees WHERE employee_id = ?", (employee_id,))
employee = c.fetchone()
if employee is None:
return_msg = f"employee_id {employee_id}的员工不存在。"
print(return_msg)
conn.close()
return
# 检查员工在当前年份是否有足够的可用假期天数
c.execute("SELECT employee_vacation_days_available FROM vacations WHERE employee_id = ? AND year = ?", (employee_id, current_year))
available_days = c.fetchone()
if available_days is None or available_days[0] < vacation_days:
return_msg = f"employee_id {employee_id}的员工在请求的时间段内没有足够的可用假期天数。"
print(return_msg)
conn.close()
return
# 将新假期插入planned_vacations表
c.execute("INSERT INTO planned_vacations (employee_id, vacation_start_date, vacation_end_date, vacation_days_taken) VALUES (?, ?, ?, ?)", (employee_id, start_date, end_date, vacation_days))
# 在vacations表中更新已用假期天数
c.execute("UPDATE vacations SET employee_vacation_days_taken = employee_vacation_days_taken + ?, employee_vacation_days_available = employee_vacation_days_available - ? WHERE employee_id = ? AND year = ?", (vacation_days, vacation_days, employee_id, current_year))
conn.commit()
return_msg = f"成功为employee_id {employee_id}的员工从{start_date}到{end_date}预订了假期。"
print(return_msg)
# 关闭数据库连接
conn.close()
return return_msg
except Exception as e:
raise Exception(f"发生错误: {e}")
conn.rollback()
# 关闭数据库连接
conn.close()
return f"发生错误: {e}"
def lambda_handler(event, context):
original_db_file = 'employee_database.db'
target_db_file = '/tmp/employee_database.db'
if not os.path.exists(target_db_file):
shutil.copy2(original_db_file, target_db_file)
# 检索代理会话属性以获取上下文
session_attributes = event.get('sessionAttributes', {})
first_name = session_attributes.get('firstName', '')
last_name = session_attributes.get('lastName', '')
employee_id = session_attributes.get('employeeId', '')
current_date = event.get('promptSessionAttributes', {}).get('currentDate', '')
agent = event['agent']
actionGroup = event['actionGroup']
function = event['function']
parameters = event.get('parameters', [])
responseBody = {
"TEXT": {
"body": "错误,没有调用任何函数"
}
}
if function == 'get_available_vacations_days':
## 如果我们不使用会话属性,我们需要将employee ID作为参数传递
# employee_id = None
# for param in parameters:
# if param["name"] == "employee_id":
# employee_id = param["value"]
if not employee_id:
raise Exception("缺少必需参数: employee_id")
vacation_days = get_available_vacations_days(employee_id)
responseBody = {
'TEXT': {
"body": f"employee_id {employee_id}的可用假期天数: {vacation_days}"
}
}
elif function == 'reserve_vacation_time':
# employee_id = None
start_date = None
end_date = None
for param in parameters:
# if param["name"] == "employee_id":
# employee_id = param["value"]
if param["name"] == "start_date":
start_date = param["value"]
if param["name"] == "end_date":
end_date = param["value"]
# if not employee_id:
# raise Exception("缺少必需参数: employee_id")
if not start_date:
raise Exception("缺少必需参数: start_date")
if not end_date:
raise Exception("缺少必需参数: end_date")
completion_message = reserve_vacation_time(employee_id, start_date, end_date)
responseBody = {
'TEXT': {
"body": completion_message
}
}
action_response = {
'actionGroup': actionGroup,
'function': function,
'functionResponse': {
'responseBody': responseBody
}
}
function_response = {
'response': action_response,
'messageVersion': event['messageVersion'],
'session