使用返回控制 (ROC) 创建代理

在本 notebook 中,我们将使用新的功能定义和返回控制功能为 Amazon Bedrock 创建一个代理。

我们将以 HR 代理为例。使用此代理,我们可以查看可用的假期天数并请求新的假期休假。我们将使用一个本地函数来定义检查可用假期天数和预订新假期的逻辑。请注意,此示例不需要 API 架构文件,也不需要 Lambda 函数。

对于此示例,我们将使用 SQLite 数据库生成一些员工数据

先决条件

在开始之前,让我们更新 botocore 和 boto3 软件包以确保我们有最新版本

!python3 -m pip install --upgrade -q botocore
!python3 -m pip install --upgrade -q boto3
!python3 -m pip install --upgrade -q awscli

现在让我们检查 boto3 版本以确保已安装正确的版本。我们的版本应该大于或等于 1.34.90。

import boto3
import json
import time
import uuid
import pprint
import logging
print(boto3.__version__)
# 设置日志记录器
logging.basicConfig(format='[%(asctime)s] p%(process)s {%(filename)s:%(lineno)d} %(levelname)s - %(message)s', level=logging.INFO)
logger = logging.getLogger(__name__)

现在让我们为所需的 AWS 服务创建 boto3 客户端

# 获取所需 AWS 服务的 boto3 客户端
sts_client = boto3.client('sts')
iam_client = boto3.client('iam')
bedrock_agent_client = boto3.client('bedrock-agent')
bedrock_agent_runtime_client = boto3.client('bedrock-agent-runtime')

接下来我们可以设置代理的一些配置变量

session = boto3.session.Session()
region = session.region_name
account_id = sts_client.get_caller_identity()["Account"]
region, account_id
# 配置变量
suffix = f"{region}-{account_id}"
agent_name = "hr-assistant-function-roc"
agent_bedrock_allow_policy_name = f"{agent_name}-ba-{suffix}"
agent_role_name = f'AmazonBedrockExecutionRoleForAgents_{agent_name}'
agent_foundation_model = "anthropic.claude-3-sonnet-20240229-v1:0"
agent_description = "提供 HR 协助以预订假期的代理"
agent_instruction = "我们是一名 HR 代理,帮助员工了解 HR 政策并管理假期时间"
agent_action_group_name = "VacationsActionGroup"
agent_action_group_description = "获取员工可用假期天数和在系统中预订新假期的操作"
agent_alias_name = f"{agent_name}-alias"

创建代理

我们现在将创建代理。为此,我们首先需要创建允许 bedrock 模型调用和与之关联的代理 IAM 角色的代理策略。我们将允许此代理调用 Claude Sonnet 模型

# 创建代理的 IAM 策略
bedrock_agent_bedrock_allow_policy_statement = {
    "Version": "2012-10-17",
    "Statement": [
        {
            "Sid": "AmazonBedrockAgentBedrockFoundationModelPolicy",
            "Effect": "Allow",
            "Action": "bedrock:InvokeModel",
            "Resource": [
                f"arn:aws:bedrock:{region}::foundation-model/{agent_foundation_model}"
            ]
        }
    ]
}

bedrock_policy_json = json.dumps(bedrock_agent_bedrock_allow_policy_statement)

agent_bedrock_policy = iam_client.create_policy(
    PolicyName=agent_bedrock_allow_policy_name,
    PolicyDocument=bedrock_policy_json
)


# 创建代理的 IAM 角色并附加 IAM 策略
assume_role_policy_document = {
    "Version": "2012-10-17",
    "Statement": [{
          "Effect": "Allow",
          "Principal": {
            "Service": "bedrock.amazonaws.com"
          },
          "Action": "sts:AssumeRole"
    }]
}

assume_role_policy_document_json = json.dumps(assume_role_policy_document)
agent_role = iam_client.create_role(
    RoleName=agent_role_name,
    AssumeRolePolicyDocument=assume_role_policy_document_json
)

# 暂停以确保角色已创建
time.sleep(10)
    
iam_client.attach_role_policy(
    RoleName=agent_role_name,
    PolicyArn=agent_bedrock_policy['Policy']['Arn']
)

创建代理

一旦创建了所需的 IAM 角色,我们就可以使用 Bedrock 代理客户端创建一个新的代理。为此,我们使用 create_agent 函数。它需要代理名称、基础模型和说明。我们还可以提供代理描述。请注意,创建的代理尚未准备就绪。我们将专注于准备代理,然后使用它来调用操作和使用其他 API。

response = bedrock_agent_client.create_agent(
    agentName=agent_name,
    agentResourceRoleArn=agent_role['Role']['Arn'],
    description=agent_description,
    idleSessionTTLInSeconds=1800,
    foundationModel=agent_foundation_model,
    instruction=agent_instruction,
)
response

现在让我们将代理 id 存储在一个局部变量中,以在下一步中使用它

agent_id = response['agent']['agentId']
agent_id

创建代理操作组

我们现在将创建一个代理操作组。<code>create_agent_action_group</code> 函数提供了这个功能。我们将使用 DRAFT 作为代理版本,因为我们还没有创建代理版本或别名。为了告知代理操作组的功能,我们将提供包含操作组功能的操作组描述。

在这个例子中,我们将使用 functionSchema 提供操作组功能。我们也可以提供 APISchema。notebook 02-create-agent-with-api-schema.ipynb 提供了一个示例。

要使用函数架构定义函数,我们需要提供每个函数的 namedescriptionparameters

agent_functions = [
    {
        'name': 'get_available_vacations_days',
        'description': '获取某个员工可用的假期天数',
        'parameters': {
            "employee_id": {
                "description": "获取可用假期的员工 ID",
                "required": True,
                "type": "integer"
            }
        }
    },
    {
        'name': 'reserve_vacation_time',
        'description': '为特定员工预订假期时间 - 我们需要所有参数来预订假期时间',
        'parameters': {
            "employee_id": {
                "description": "将为其预订假期时间的员工 ID",
                "required": True,
                "type": "integer"
            },
            "start_date": {
                "description": "假期开始日期",
                "required": True,
                "type": "string"
            },
            "end_date": {
                "description": "假期结束日期",
                "required": True,
                "type": "string"
            }
        }
    },
]

这里我们创建了一个操作组,其 customContrl 执行器为 RETURN_CONTROL。这让代理知道,它应该简单地返回适当的函数和参数,而不是执行该函数。然后客户端应用程序负责执行该函数。

# 暂停以确保代理已创建
time.sleep(30)
# 现在,我们可以在这里配置和创建一个操作组:
agent_action_group_response = bedrock_agent_client.create_agent_action_group(
    agentId=agent_id,
    agentVersion='DRAFT',
    actionGroupExecutor={
        'customControl': 'RETURN_CONTROL'
    },
    actionGroupName=agent_action_group_name,
    functionSchema={
        'functions': agent_functions
    },
    description=agent_action_group_description
)
agent_action_group_response

准备代理

让我们创建代理的 DRAFT 版本,以供内部测试使用。

response = bedrock_agent_client.prepare_agent(
    agentId=agent_id
)
print(response)

调用代理

现在我们已经创建了代理,让我们使用 bedrock-agent-runtime 客户端来调用这个代理并执行一些任务。

# 暂停以确保代理已准备就绪
time.sleep(30)

# 从响应中提取 agentAliasId
agent_alias_id = "TSTALIASID"
## 创建一个随机 id 作为会话发起者 id
session_id:str = str(uuid.uuid1())
enable_trace:bool = False
end_session:bool = False
# 暂停以确保代理别名已就绪
# time.sleep(30)

# 调用代理 API
agentResponse = bedrock_agent_runtime_client.invoke_agent(
    inputText="employee 1 有多少假期天数可用?",
    agentId=agent_id,
    agentAliasId=agent_alias_id, 
    sessionId=session_id,
    enableTrace=enable_trace, 
    endSession= end_session
)

logger.info(pprint.pprint(agentResponse))
%%time
event_stream = agentResponse['completion']
for event in event_stream:
    if 'returnControl' in event:
        pprint.pp(event)

定义函数实现

现在让我们实现我们的函数,以获取员工 ID 的假期信息,并为员工 ID 在开始日期和结束日期之间预订假期。

为此,我们将首先创建一个包含生成数据的 SQLite 数据库

# 创建将由 lambda 函数使用的员工数据库
import sqlite3
import random
from datetime import date, timedelta

# 连接到 SQLite 数据库(如果不存在则创建一个新的)
conn = sqlite3.connect('employee_database.db')
c = conn.cursor()

# 创建员工表
c.execute('''CREATE TABLE IF NOT EXISTS employees
                (employee_id INTEGER PRIMARY KEY AUTOINCREMENT, employee_name TEXT, employee_job_title TEXT, employee_start_date TEXT, employee_employment_status TEXT)''')

# 创建假期表
c.execute('''CREATE TABLE IF NOT EXISTS vacations
                (employee_id INTEGER, year INTEGER, employee_total_vacation_days INTEGER, employee_vacation_days_taken INTEGER, employee_vacation_days_available INTEGER, FOREIGN KEY(employee_id) REFERENCES employees(employee_id))''')

# 创建计划假期表
c.execute('''CREATE TABLE IF NOT EXISTS planned_vacations
                (employee_id INTEGER, vacation_start_date TEXT, vacation_end_date TEXT, vacation_days_taken INTEGER, FOREIGN KEY(employee_id) REFERENCES employees(employee_id))''')

# 为 10 名员工生成一些随机数据
employee_names = ['John Doe', 'Jane Smith', 'Bob Johnson', 'Alice Williams', 'Tom Brown', 'Emily Davis', 'Michael Wilson', 'Sarah Taylor', 'David Anderson', 'Jessica Thompson']
job_titles = ['Manager', 'Developer', 'Designer', 'Analyst', 'Accountant', 'Sales Representative']
employment_statuses = ['Active', 'Inactive']

for i in range(10):
    name = employee_names[i]
    job_title = random.choice(job_titles)
    start_date = date(2015 + random.randint(0, 7), random.randint(1, 12), random.randint(1, 28)).strftime('%Y-%m-%d')
    employment_status = random.choice(employment_statuses)
    c.execute("INSERT INTO employees (employee_name, employee_job_title, employee_start_date, employee_employment_status) VALUES (?, ?, ?, ?)", (name, job_title, start_date, employment_status))
    employee_id = c.lastrowid

    # 为当前员工生成假期数据
    for year in range(date.today().year, date.today().year - 3, -1):
        total_vacation_days = random.randint(10, 30)
        days_taken = random.randint(0, total_vacation_days)
        days_available = total_vacation_days - days_taken
        c.execute("INSERT INTO vacations (employee_id, year, employee_total_vacation_days, employee_vacation_days_taken, employee_vacation_days_available) VALUES (?, ?, ?, ?, ?)", (employee_id, year, total_vacation_days, days_taken, days_available))

        # 为当前员工和年份生成一些计划假期
        num_planned_vacations = random.randint(0, 3)
        for _ in range(num_planned_vacations):
            start_date = date(year, random.randint(1, 12), random.randint(1, 28)).strftime('%Y-%m-%d')
            end_date = (date(int(start_date[:4]), int(start_date[5:7]), int(start_date[8:])) + timedelta(days=random.randint(1, 14))).strftime('%Y-%m-%d')
            days_taken = (date(int(end_date[:4]), int(end_date[5:7]), int(end_date[8:])) - date(int(start_date[:4]), int(start_date[5:7]), int(start_date[8:])))
            c.execute("INSERT INTO planned_vacations (employee_id, vacation_start_date, vacation_end_date, vacation_days_taken) VALUES (?, ?, ?, ?)", (employee_id, start_date, end_date, days_taken.days))

# 提交更改并关闭连接
conn.commit