使用函数定义、返回控制和用户确认创建代理

在这个笔记本中,我们将使用新的功能定义、获取用户确认功能在调用操作组函数之前的功能来为Amazon Bedrock创建一个代理。

我们将以HR代理为例。使用这个代理,我们可以申请新的休假。我们将使用一个本地函数来定义检查可用假期天数并预订新假期的逻辑。请注意,这个示例不需要API架构文件,也不需要Lambda函数。

对于这个示例,我们将使用SQLite 数据库生成一些员工数据

先决条件

在开始之前,让我们更新botocore和boto3包以确保我们有最新版本

!python3 -m pip install --upgrade -q botocore
!python3 -m pip install --upgrade -q boto3
!python3 -m pip install --upgrade -q awscli

现在让我们检查boto3版本,以确保已安装正确的版本。我们的版本应该大于或等于1.34.90。

import boto3
import json
import time
import uuid
import pprint
import logging

print(boto3.__version__)
# 设置logger
logging.basicConfig(format='[%(asctime)s] p%(process)s {%(filename)s:%(lineno)d} %(levelname)s - %(message)s', level=logging.INFO)
logger = logging.getLogger(__name__)

现在让我们创建所需AWS服务的boto3客户端

# 获取所需AWS服务的boto3客户端
sts_client = boto3.client('sts')
iam_client = boto3.client('iam')
bedrock_agent_client = boto3.client('bedrock-agent')
bedrock_agent_runtime_client = boto3.client('bedrock-agent-runtime')

接下来我们可以设置代理的一些配置变量

session = boto3.session.Session()
region = session.region_name
account_id = sts_client.get_caller_identity()["Account"]
region, account_id
# 配置变量
suffix = f"{region}-{account_id}"
agent_name = "hr-assist-func-roc-user-conf"
agent_bedrock_allow_policy_name = f"{agent_name}-ba-{suffix}"
agent_role_name = f'AmazonBedrockExecutionRoleForAgents_{agent_name}'
agent_foundation_model = "anthropic.claude-3-sonnet-20240229-v1:0"
agent_description = "为员工提供HR帮助的代理,管理假期"
agent_instruction = "你是一个HR代理,帮助员工了解HR政策并管理假期时间"
agent_action_group_name = "VacationsActionGroup"
agent_action_group_description = "预订新假期的操作,需要用户确认"
agent_alias_name = f"{agent_name}-alias"

创建代理

我们现在将创建代理。为此,我们首先需要创建允许Bedrock模型调用和与之关联的代理IAM角色的代理策略。我们将允许这个代理调用Claude Sonnet模型

# 创建代理的IAM策略
bedrock_agent_bedrock_allow_policy_statement = {
    "Version": "2012-10-17",
    "Statement": [
        {
            "Sid": "AmazonBedrockAgentBedrockFoundationModelPolicy",
            "Effect": "Allow",
            "Action": "bedrock:InvokeModel",
            "Resource": [
                f"arn:aws:bedrock:{region}::foundation-model/{agent_foundation_model}"
            ]
        }
    ]
}

bedrock_policy_json = json.dumps(bedrock_agent_bedrock_allow_policy_statement)

agent_bedrock_policy = iam_client.create_policy(
    PolicyName=agent_bedrock_allow_policy_name,
    PolicyDocument=bedrock_policy_json
)
# 创建代理的IAM角色并附加IAM策略
assume_role_policy_document = {
    "Version": "2012-10-17",
    "Statement": [{
          "Effect": "Allow",
          "Principal": {
            "Service": "bedrock.amazonaws.com"
          },
          "Action": "sts:AssumeRole"
    }]
}

assume_role_policy_document_json = json.dumps(assume_role_policy_document)
agent_role = iam_client.create_role(
    RoleName=agent_role_name,
    AssumeRolePolicyDocument=assume_role_policy_document_json
)

# 暂停10秒钟以确保角色已创建
time.sleep(10)
    
iam_client.attach_role_policy(
    RoleName=agent_role_name,
    PolicyArn=agent_bedrock_policy['Policy']['Arn']
)

创建代理

一旦创建了所需的IAM角色,我们就可以使用Bedrock代理客户端创建一个新的代理。为此,我们使用create_agent函数。它需要代理名称、基础模型和说明。我们还可以提供代理描述。请注意,创建的代理尚未准备就绪。我们将关注于准备代理,然后使用它来调用操作并使用其他API。

response = bedrock_agent_client.create_agent(
    agentName=agent_name,
    agentResourceRoleArn=agent_role['Role']['Arn'],
    description=agent_description,
    idleSessionTTLInSeconds=1800,
    foundationModel=agent_foundation_model,
    instruction=agent_instruction,
)
response

现在让我们将代理ID存储在一个局部变量中,以便在下一步中使用它

agent_id = response['agent']['agentId']
agent_id

创建一个启用了所需确认字段的代理操作组

我们现在将创建一个代理操作组。create_agent_action_group函数提供了这个功能。我们将使用DRAFT作为代理版本,因为我们还没有创建代理版本或别名。为了告知代理操作组的功能,我们将提供一个包含操作组功能的操作组描述。

在这个示例中,我们将使用functionSchema提供操作组功能。我们也可以提供APISchema。笔记本02-create-agent-with-api-schema.ipynb 提供了一个示例。

要使用函数模式定义函数,我们需要为每个函数提供名称、描述、参数和requireConfirmation。requireConfirmation属性指示是否需要用户确认才能执行该函数。

更多关于此配置的详细信息,请参考提供的链接 中的文档。

agent_functions = [

    {
        'name': 'reserve_vacation_time',
        'description': '为特定员工预订假期时间 - 我们需要所有参数来预订假期时间',
        'parameters': {
            "employee_id": {
                "description": "将预订假期时间的员工ID",
                "required": True,
                "type": "integer"
            },
            "start_date": {
                "description": "假期开始日期",
                "required": True,
                "type": "string"
            },
            "end_date": {
                "description": "假期结束日期",
                "required": True,
                "type": "string"
            }
        },
        'requireConfirmation':'ENABLED'
    },
]

这里我们创建了一个带有RETURN_CONTROL执行器的操作组。这让代理知道,它应该简单地返回适当的函数和参数,而不是执行该函数。然后由客户端应用程序负责执行该函数。

# 暂停30秒钟以确保代理已创建
time.sleep(30)
# 现在,我们可以在这里配置和创建一个操作组:
agent_action_group_response = bedrock_agent_client.create_agent_action_group(
    agentId=agent_id,
    agentVersion='DRAFT',
    actionGroupExecutor={
        'customControl': 'RETURN_CONTROL'
    },
    actionGroupName=agent_action_group_name,
    functionSchema={
        'functions': agent_functions
    },
    description=agent_action_group_description
)
agent_action_group_response

准备代理

让我们创建一个代理的DRAFT版本,可用于内部测试。

response = bedrock_agent_client.prepare_agent(
    agentId=agent_id
)
print(response)

调用代理

现在我们已经创建了代理,让我们使用bedrock-agent-runtime客户端来调用这个代理并执行一些任务。

# 暂停30秒钟以确保代理已准备就绪
time.sleep(30)

# 从响应中提取agentAliasId
agent_alias_id = "TSTALIASID"
## 创建一个随机的会话发起者ID
session_id:str = str(uuid.uuid1())
enable_trace:bool = False
end_session:bool = False
# 暂停30秒钟以确保代理别名已就绪
# time.sleep(30)

# 调用代理API
agentResponse = bedrock_agent_runtime_client.invoke_agent(
    inputText="为员工1预订2天假期,开始日期为2024-11-14,结束日期为2024-11-19",
    agentId=agent_id,
    agentAliasId=agent_alias_id, 
    sessionId=session_id,
    enableTrace=enable_trace, 
    endSession= end_session
)

logger.info(pprint.pprint(agentResponse))
%%time
event_stream = agentResponse['completion']

for event in event_stream:
    if 'returnControl' in event:
        pprint.pp(event)

定义函数实现

现在让我们实现我们的函数,获取员工_id的假期信息,并为员工_id在start_date和end_dates之间预订假期。

为此,我们将首先创建一个包含生成数据的SQLite数据库

# 创建将由lambda函数使用的员工数据库
import sqlite3
import random
from datetime import date, timedelta

# 连接到SQLite数据库(如果不存在则创建一个新的)
conn = sqlite3.connect('employee_database.db')
c = conn.cursor()

# 创建employees表
c.execute('''CREATE TABLE IF NOT EXISTS employees
                (employee_id INTEGER PRIMARY KEY AUTOINCREMENT, employee_name TEXT, employee_job_title TEXT, employee_start_date TEXT, employee_employment_status TEXT)''')

# 创建vacations表
c.execute('''CREATE TABLE IF NOT EXISTS vacations
                (employee_id INTEGER, year INTEGER, employee_total_vacation_days INTEGER, employee_vacation_days_taken INTEGER, employee_vacation_days_available INTEGER, FOREIGN KEY(employee_id) REFERENCES employees(employee_id))''')

# 创建planned_vacations表
c.execute('''CREATE TABLE IF NOT EXISTS planned_vacations
                (employee_id INTEGER, vacation_start_date TEXT, vacation_end_date TEXT, vacation_days_taken INTEGER, FOREIGN KEY(employee_id) REFERENCES employees(employee_id))''')

# 为10名员工生成一些随机数据
employee_names = ['John Doe', 'Jane Smith', 'Bob Johnson', 'Alice Williams', 'Tom Brown', 'Emily Davis', 'Michael Wilson', 'Sarah Taylor', 'David Anderson', 'Jessica Thompson']
job_titles = ['Manager', 'Developer', 'Designer', 'Analyst', 'Accountant', 'Sales Representative']
employment_statuses = ['Active', 'Inactive']

for i in range(10):
    name = employee_names[i]
    job_title = random.choice(job_titles)
    start_date = date(2015 + random.randint(0, 7), random.randint(1, 12), random.randint(1, 28)).strftime('%Y-%m-%d')
    employment_status = random.choice(employment_statuses)
    c.execute("INSERT INTO employees (employee_name, employee_job_title, employee_start_date, employee_employment_status) VALUES (?, ?, ?, ?)", (name, job_title, start_date, employment_status))
    employee_id = c.lastrowid

    # 为当前员工生成假期数据
    for year in range(date.today().year, date.today().year - 3, -1):
        total_vacation_days = random.randint(10, 30)
        days_taken = random.randint(0, total_vacation_days)
        days_available = total_vacation_days - days_taken
        c.execute("INSERT INTO vacations (employee_id, year, employee_total_vacation_days, employee_vacation_days_taken, employee_vacation_days_available) VALUES (?, ?, ?, ?, ?)", (employee_id, year, total_vacation_days, days_taken, days_available))

        # 为当前员工和年份生成一些计划的假期
        num_planned_vacations = random.randint(0, 3)
        for _ in range(num_planned_vacations):
            start_date = date(year, random.randint(1, 12), random.randint(1, 28)).strftime('%Y-%m-%d')
            end_date = (date(int(start_date[:4]), int(start_date[5:7]), int(start_date[8:])) + timedelta(days=random.randint(1, 14))).strftime('%Y-%m-%d')
            days_taken = (date(int(end_date[:4]), int(end_date[5:7]), int(end_date[8:])) - date(int(start_date[:4]), int(start_date[5:7]), int(start_date[8:])))
            c.execute("INSERT INTO planned_vacations (employee_id, vacation_start_date, vacation_end_date, vacation_days_taken) VALUES (?, ?, ?, ?)", (employee_id, start_date, end_date, days_taken.days))

# 提交更改并关闭连接
conn.commit()
conn.close()

接下来让我们使用生成的文件employee_database.db来提供我们查询的结果,通过实现reserve_vacation_time

import sqlite3
from datetime import datetime