提示和会话参数

Bedrock Agent提供两种会话属性来维护对话上下文:

  • sessionAttributes
  • promptSessionAttributes

在这个笔记本中,我们将演示如何使用这些属性来个性化对话并简化对话流程。

我们假设用户通过SSO身份验证进入系统,这为代理提供了他们的名字、姓氏和员工ID。我们可以将这些信息存储在sessionAttributes中,以个性化整个对话过程中的用户体验。

用户将能够按照之前会话中设计的方式与HR代理进行交互。他们可以查看可用的假期天数或申请新的假期休假。

代理能够使用PromptSessionAttributes获取时间上下文,具体来说,它在PromptSessionAttributes中存储了CurrentDate信息。如果用户询问相对信息,如"明天”,代理可以确定"明天"指的是确切的日期。

先决条件

在开始之前,让我们更新botocore和boto3软件包,以确保我们有最新版本。

!python3 -m pip install --upgrade -q botocore
!python3 -m pip install --upgrade -q boto3
!python3 -m pip install --upgrade -q awscli

现在让我们检查boto3版本,以确保安装了正确的版本。我们的版本应该大于或等于1.34.90。

import boto3
import json
import time
import zipfile
from io import BytesIO
import uuid
import pprint
import logging
from datetime import datetime
print(boto3.__version__)
# 设置日志记录器
logging.basicConfig(format='[%(asctime)s] p%(process)s {%(filename)s:%(lineno)d} %(levelname)s - %(message)s', level=logging.INFO)
logger = logging.getLogger(__name__)

现在让我们为所需的AWS服务创建boto3客户端。

# 获取boto3客户端以访问所需的AWS服务
sts_client = boto3.client('sts')
iam_client = boto3.client('iam')
lambda_client = boto3.client('lambda')
bedrock_agent_client = boto3.client('bedrock-agent')
bedrock_agent_runtime_client = boto3.client('bedrock-agent-runtime')

接下来,我们可以为代理和要创建的lambda函数设置一些配置变量。

session = boto3.session.Session()
region = session.region_name
account_id = sts_client.get_caller_identity()["Account"]
region, account_id
# 配置变量
suffix = f"{region}-{account_id}"
agent_name = "hr-assistant-function-def"
agent_bedrock_allow_policy_name = f"{agent_name}-ba-{suffix}"
agent_role_name = f'AmazonBedrockExecutionRoleForAgents_{agent_name}'
agent_foundation_model = "anthropic.claude-3-sonnet-20240229-v1:0"
agent_description = "提供HR协助以预订假期时间的代理"
agent_instruction = "你是一名HR代理,帮助员工了解HR政策并管理假期时间"
agent_action_group_name = "VacationsActionGroup"
agent_action_group_description = "获取员工可用假期天数和预订新假期时间的操作"
agent_alias_name = f"{agent_name}-alias"
lambda_function_role = f'{agent_name}-lambda-role-{suffix}'
lambda_function_name = f'{agent_name}-{suffix}'

创建Lambda函数

我们现在将创建一个与SQLite文件employee_database.db交互的lambda函数。为此,我们将:

  1. 创建包含员工数据库的employee_database.db文件,并生成一些数据。
  2. 创建lambda_function.py文件,其中包含lambda函数的逻辑。
  3. 创建Lambda函数的IAM角色。
  4. 创建具有所需权限的lambda函数基础设施。
# 创建将由lambda函数使用的员工数据库
import sqlite3
import random
from datetime import date, timedelta

# 连接到SQLite数据库(如果不存在则创建一个新的)
conn = sqlite3.connect('employee_database.db')
c = conn.cursor()

# 创建employees表
c.execute('''CREATE TABLE IF NOT EXISTS employees
                (employee_id INTEGER PRIMARY KEY AUTOINCREMENT, employee_name TEXT, employee_job_title TEXT, employee_start_date TEXT, employee_employment_status TEXT)''')

# 创建vacations表
c.execute('''CREATE TABLE IF NOT EXISTS vacations
                (employee_id INTEGER, year INTEGER, employee_total_vacation_days INTEGER, employee_vacation_days_taken INTEGER, employee_vacation_days_available INTEGER, FOREIGN KEY(employee_id) REFERENCES employees(employee_id))''')

# 创建planned_vacations表
c.execute('''CREATE TABLE IF NOT EXISTS planned_vacations
                (employee_id INTEGER, vacation_start_date TEXT, vacation_end_date TEXT, vacation_days_taken INTEGER, FOREIGN KEY(employee_id) REFERENCES employees(employee_id))''')

# 为10名员工生成一些随机数据
employee_names = ['John Doe', 'Jane Smith', 'Bob Johnson', 'Alice Williams', 'Tom Brown', 'Emily Davis', 'Michael Wilson', 'Sarah Taylor', 'David Anderson', 'Jessica Thompson']
job_titles = ['Manager', 'Developer', 'Designer', 'Analyst', 'Accountant', 'Sales Representative']
employment_statuses = ['Active', 'Inactive']

for i in range(10):
    name = employee_names[i]
    job_title = random.choice(job_titles)
    start_date = date(2015 + random.randint(0, 7), random.randint(1, 12), random.randint(1, 28)).strftime('%Y-%m-%d')
    employment_status = random.choice(employment_statuses)
    c.execute("INSERT INTO employees (employee_name, employee_job_title, employee_start_date, employee_employment_status) VALUES (?, ?, ?, ?)", (name, job_title, start_date, employment_status))
    employee_id = c.lastrowid

    # 为当前员工生成假期数据
    for year in range(date.today().year, date.today().year - 3, -1):
        total_vacation_days = random.randint(10, 30)
        days_taken = random.randint(0, total_vacation_days)
        days_available = total_vacation_days - days_taken
        c.execute("INSERT INTO vacations (employee_id, year, employee_total_vacation_days, employee_vacation_days_taken, employee_vacation_days_available) VALUES (?, ?, ?, ?, ?)", (employee_id, year, total_vacation_days, days_taken, days_available))

        # 为当前员工和年份生成一些计划的假期
        num_planned_vacations = random.randint(0, 3)
        for _ in range(num_planned_vacations):
            start_date = date(year, random.randint(1, 12), random.randint(1, 28)).strftime('%Y-%m-%d')
            end_date = (date(int(start_date[:4]), int(start_date[5:7]), int(start_date[8:])) + timedelta(days=random.randint(1, 14))).strftime('%Y-%m-%d')
            days_taken = (date(int(end_date[:4]), int(end_date[5:7]), int(end_date[8:])) - date(int(start_date[:4]), int(start_date[5:7]), int(start_date[8:])))
            c.execute("INSERT INTO planned_vacations (employee_id, vacation_start_date, vacation_end_date, vacation_days_taken) VALUES (?, ?, ?, ?)", (employee_id, start_date, end_date, days_taken.days))

# 提交更改并关闭连接
conn.commit()
conn.close()

现在让我们创建我们的lambda函数。它实现了get_available_vacations_days功能,用于获取给定employee_id的可用假期天数,以及reserve_vacation_time功能,用于给定员工的开始和结束日期预订假期时间。

%%writefile lambda_function.py
import os
import json
import shutil
import sqlite3
from datetime import datetime

def get_available_vacations_days(employee_id):
    # 连接到SQLite数据库
    conn = sqlite3.connect('/tmp/employee_database.db')
    c = conn.cursor()

    if employee_id:

        # 获取员工的可用假期天数
        c.execute("""
            SELECT employee_vacation_days_available
            FROM vacations
            WHERE employee_id = ?
            ORDER BY year DESC
            LIMIT 1
        """, (employee_id,))

        available_vacation_days = c.fetchone()

        if available_vacation_days:
            available_vacation_days = available_vacation_days[0]  # 解包元组
            print(f"employee_id {employee_id}的可用假期天数: {available_vacation_days}")
            conn.close()
            return available_vacation_days
        else:
            return_msg = f"没有找到employee_id {employee_id}的假期数据"
            print(return_msg)
            conn.close()
            return return_msg
    else:
        raise Exception(f"未提供employee id")

    # 关闭数据库连接
    conn.close()
    
    
def reserve_vacation_time(employee_id, start_date, end_date):
    # 连接到SQLite数据库

    conn = sqlite3.connect('/tmp/employee_database.db')
    c = conn.cursor()
    try:
        # 计算假期天数
        start_date = datetime.strptime(start_date, '%Y-%m-%d')
        end_date = datetime.strptime(end_date, '%Y-%m-%d')
        vacation_days = (end_date - start_date).days + 1

        # 获取当前年份
        current_year = start_date.year

        # 检查员工是否存在
        c.execute("SELECT * FROM employees WHERE employee_id = ?", (employee_id,))
        employee = c.fetchone()
        if employee is None:
            return_msg = f"employee_id {employee_id}的员工不存在。"
            print(return_msg)
            conn.close()
            return

        # 检查员工在当前年份是否有足够的可用假期天数
        c.execute("SELECT employee_vacation_days_available FROM vacations WHERE employee_id = ? AND year = ?", (employee_id, current_year))
        available_days = c.fetchone()
        if available_days is None or available_days[0] < vacation_days:
            return_msg = f"employee_id {employee_id}的员工在请求的时间段内没有足够的可用假期天数。"
            print(return_msg)
            conn.close()
            return

        # 将新假期插入planned_vacations表
        c.execute("INSERT INTO planned_vacations (employee_id, vacation_start_date, vacation_end_date, vacation_days_taken) VALUES (?, ?, ?, ?)", (employee_id, start_date, end_date, vacation_days))

        # 在vacations表中更新已用假期天数
        c.execute("UPDATE vacations SET employee_vacation_days_taken = employee_vacation_days_taken + ?, employee_vacation_days_available = employee_vacation_days_available - ? WHERE employee_id = ? AND year = ?", (vacation_days, vacation_days, employee_id, current_year))

        conn.commit()
        return_msg = f"成功为employee_id {employee_id}的员工从{start_date}到{end_date}预订了假期。"
        print(return_msg)
        # 关闭数据库连接
        conn.close()
        return return_msg
    except Exception as e:
        raise Exception(f"发生错误: {e}")
        conn.rollback()
        # 关闭数据库连接
        conn.close()
        return f"发生错误: {e}"
        

def lambda_handler(event, context):
    original_db_file = 'employee_database.db'
    target_db_file = '/tmp/employee_database.db'
    if not os.path.exists(target_db_file):
        shutil.copy2(original_db_file, target_db_file)
        
    # 检索代理会话属性以获取上下文 
    session_attributes = event.get('sessionAttributes', {})
    first_name = session_attributes.get('firstName', '')
    last_name = session_attributes.get('lastName', '')
    employee_id = session_attributes.get('employeeId', '')
    current_date = event.get('promptSessionAttributes', {}).get('currentDate', '')
    
    agent = event['agent']
    actionGroup = event['actionGroup']
    function = event['function']
    parameters = event.get('parameters', [])
    responseBody =  {
        "TEXT": {
            "body": "错误,没有调用任何函数"
        }
    }
    
    if function == 'get_available_vacations_days':
        ## 如果我们不使用会话属性,我们需要将employee ID作为参数传递

        # employee_id = None
        # for param in parameters:
        #     if param["name"] == "employee_id":
        #         employee_id = param["value"]

        if not employee_id:
            raise Exception("缺少必需参数: employee_id")
        vacation_days = get_available_vacations_days(employee_id)
        responseBody =  {
            'TEXT': {
                "body": f"employee_id {employee_id}的可用假期天数: {vacation_days}"
            }
        }
    elif function == 'reserve_vacation_time':
        # employee_id = None
        start_date = None
        end_date = None
        for param in parameters:
            # if param["name"] == "employee_id":
            #     employee_id = param["value"]
            if param["name"] == "start_date":
                start_date = param["value"]
            if param["name"] == "end_date":
                end_date = param["value"]
            
        # if not employee_id:
        #     raise Exception("缺少必需参数: employee_id")
        if not start_date:
            raise Exception("缺少必需参数: start_date")
        if not end_date:
            raise Exception("缺少必需参数: end_date")
        
        completion_message = reserve_vacation_time(employee_id, start_date, end_date)
        responseBody =  {
            'TEXT': {
                "body": completion_message
            }
        }  
    action_response = {
        'actionGroup': actionGroup,
        'function': function,
        'functionResponse': {
            'responseBody': responseBody
        }

    }

    function_response = {
                    'response': action_response,
                    'messageVersion': event['messageVersion'],
                    'session