创建与 Bedrock 知识库集成的代理并附加操作组

在本笔记本中,我们将学习如何创建一个亚马逊 Bedrock 代理,该代理利用亚马逊 Bedrock 的知识库来检索有关餐厅菜单的数据。该用例是创建一个餐厅代理,它的任务是向客户提供有关成人或儿童菜单的信息,并负责餐桌预订系统。客户将能够创建、删除或获取预订信息。架构如下所示:


完成本笔记本的步骤如下:

  1. 导入所需的库
  2. 为亚马逊 Bedrock 创建知识库
  3. 将数据集上传到亚马逊 S3
  4. 为亚马逊 Bedrock 创建代理
  5. 测试代理
  6. 清理创建的资源

1. 导入所需的库

第一步是安装先决条件包

!pip install --upgrade -q -r requirements.txt
import os
import time
import boto3
import logging
import pprint
import json

from knowledge_base import BedrockKnowledgeBase
from agent import create_agent_role_and_policies, create_lambda_role, delete_agent_roles_and_policies
from agent import create_dynamodb, create_lambda, clean_up_resources
#Clients
s3_client = boto3.client('s3')
sts_client = boto3.client('sts')
session = boto3.session.Session()
region = session.region_name
account_id = sts_client.get_caller_identity()["Account"]
bedrock_agent_client = boto3.client('bedrock-agent')
bedrock_agent_runtime_client = boto3.client('bedrock-agent-runtime')
logging.basicConfig(format='[%(asctime)s] p%(process)s {%(filename)s:%(lineno)d} %(levelname)s - %(message)s', level=logging.INFO)
logger = logging.getLogger(__name__)
region, account_id
suffix = f"{region}-{account_id}"
agent_name = 'booking-agent'
knowledge_base_name = f'{agent_name}-kb'
knowledge_base_description = "知识库包含餐厅菜单集合"
agent_alias_name = "booking-agent-alias"
bucket_name = f'{agent_name}-{suffix}'
agent_bedrock_allow_policy_name = f"{agent_name}-ba"
agent_role_name = f'AmazonBedrockExecutionRoleForAgents_{agent_name}'
agent_foundation_model = "anthropic.claude-3-sonnet-20240229-v1:0"

agent_description = "负责餐厅预订的代理"
agent_instruction = """
我们是一名餐厅代理,帮助客户检索预订信息、创建新预订或删除现有预订
"""

agent_action_group_description = """
获取餐桌预订信息、创建新预订或删除现有预订的操作"""

agent_action_group_name = "TableBookingsActionGroup"

2. 为亚马逊 Bedrock 创建知识库

让我们从创建一个亚马逊 Bedrock 知识库 开始,以存储餐厅菜单。知识库允许我们与不同的向量数据库集成,包括亚马逊 OpenSearch Serverless亚马逊 AuroraPinecone 。对于这个例子,我们将把知识库与亚马逊 OpenSearch Serverless 集成。为此,我们将使用助手类 BedrockKnowledgeBase,它将创建知识库及其所有先决条件:

  1. IAM 角色和策略
  2. S3 存储桶
  3. Amazon OpenSearch Serverless 加密、网络和数据访问策略
  4. Amazon OpenSearch Serverless 集合
  5. Amazon OpenSearch Serverless 向量索引
  6. 知识库
  7. 知识库数据源
knowledge_base = BedrockKnowledgeBase(
    kb_name=knowledge_base_name,
    kb_description=knowledge_base_description,
    data_bucket_name=bucket_name
)

3. 将数据集上传到亚马逊 S3

现在我们已经创建了知识库,让我们用菜单数据集来填充它。知识库数据源希望数据可用于与之连接的 S3 存储桶,并且可以使用 StartIngestionJob API 调用将对数据的更改同步到知识库。在这个例子中,我们将使用 boto3 抽象 来调用 API,通过我们的助手类。

首先让我们将 dataset 文件夹中可用的菜单数据上传到 s3

def upload_directory(path, bucket_name):
        for root,dirs,files in os.walk(path):
            for file in files:
                file_to_upload = os.path.join(root,file)
                print(f"uploading file {file_to_upload} to {bucket_name}")
                s3_client.upload_file(file_to_upload,bucket_name,file)

upload_directory("dataset", bucket_name)

现在我们启动摄取作业

# ensure that the kb is available
time.sleep(30)
# sync knowledge base
knowledge_base.start_ingestion_job()

最后,我们收集知识库 ID 以便稍后与我们的代理集成

kb_id = knowledge_base.get_knowledge_base_id()

3.1 测试知识库

现在知识库可用,我们可以使用 <strong>retrieve</strong><strong>retrieve_and_generate</strong> 函数对其进行测试。

使用检索和生成 API 测试知识库

让我们首先使用检索和生成 API 测试知识库。使用此 API,Bedrock 负责从知识库中检索必要的引用,并使用 Bedrock 的 LLM 模型生成最终答案

response = bedrock_agent_runtime_client.retrieve_and_generate(
    input={
        "text": "儿童菜单中有哪 5 种主菜?"
    },
    retrieveAndGenerateConfiguration={
        "type": "KNOWLEDGE_BASE",
        "knowledgeBaseConfiguration": {
            'knowledgeBaseId': kb_id,
            "modelArn": "arn:aws:bedrock:{}::foundation-model/{}".format(region, agent_foundation_model),
            "retrievalConfiguration": {
                "vectorSearchConfiguration": {
                    "numberOfResults":5
                } 
            }
        }
    }
)

print(response['output']['text'],end='\n'*2)

如我们所见,使用检索和生成 API,我们直接获得最终响应,我们看不到用于生成此响应的不同来源。现在让我们使用检索 API 从知识库中检索源信息。

使用检索 API 测试知识库

如果我们需要额外的控制层,我们可以使用检索 API 检索最能匹配我们查询的块。在此设置中,我们可以配置所需的结果数量,并使用自己的应用程序逻辑控制最终答案。该 API 然后提供我们与匹配内容、其 S3 位置、相似度分数和块元数据

response_ret = bedrock_agent_runtime_client.retrieve(
    knowledgeBaseId=kb_id, 
    nextToken='string',
    retrievalConfiguration={
        "vectorSearchConfiguration": {
            "numberOfResults":5,
        } 
    },
    retrievalQuery={
        'text': '儿童菜单中有哪 5 种主菜?'
    }
)

def response_print(retrieve_resp):
#structure 'retrievalResults': list of contents. Each list has content, location, score, metadata
    for num,chunk in enumerate(response_ret['retrievalResults'],1):
        print(f'Chunk {num}: ',chunk['content']['text'],end='\n'*2)
        print(f'Chunk {num} Location: ',chunk['location'],end='\n'*2)
        print(f'Chunk {num} Score: ',chunk['score'],end='\n'*2)
        print(f'Chunk {num} Metadata: ',chunk['metadata'],end='\n'*2)

response_print(response_ret)

4. 为亚马逊 Bedrock 创建代理

在本节中,我们将经历创建亚马逊 Bedrock 代理的所有步骤。

这些是要完成的步骤:

  1. 创建亚马逊 DynamoDB 表
  2. 创建 AWS Lambda 函数
  3. 创建代理所需的 IAM 策略
  4. 创建代理
  5. 创建代理操作组
  6. 允许代理调用操作组 Lambda
  7. 将知识库与代理关联
  8. 准备代理并创建别名

4.1 创建 DynamoDB 表

我们将创建一个 DynamoDB 表,其中包含餐厅预订信息。

table_name = 'restaurant_bookings'
create_dynamodb(table_name)

4.2 创建 Lambda 函数

我们现在将创建一个 lambda 函数,与 DynamoDB 表进行交互。为此,我们将:

  1. 创建 lambda_function.py 文件,其中包含 lambda 函数的逻辑
  2. 创建 Lambda 函数的 IAM 角色
  3. 使用所需的权限创建 lambda 函数

创建函数代码

在创建亚马逊 Bedrock 代理时,我们可以将 Lambda 函数连接到操作组,以执行代理所需的功能。在此选项中,我们的代理负责执行我们的功能。让我们创建 lambda 函数,实现 get_booking_detailscreate_bookingdelete_booking 功能

%%writefile lambda_function.py
import json
import uuid
import boto3

dynamodb = boto3.resource('dynamodb')
table = dynamodb.Table('restaurant_bookings')

def get_named_parameter(event, name):
    """
    从 lambda 事件中获取参数
    """
    return next(item for item in event['parameters'] if item['name'] == name)['value']


def get_booking_details(booking_id):
    """
    检索餐厅预订的详细信息
    
    Args:
        booking_id (string): 要检索的预订 ID
    """
    try:
        response = table.get_item(Key={'booking_id': booking_id})
        if 'Item' in response:
            return response['Item']
        else:
            return {'message': f'没有找到 ID 为 {booking_id} 的预订'}
    except Exception as e:
        return {'error': str(e)}


def create_booking(date, name, hour, num_guests):
    """
    创建新的餐厅预订
    
    Args:
        date (string): 预订日期
        name (string): 预订标识名称
        hour (string): 预订时间
        num_guests (integer): 预订人数
    """
    try:
        booking_id = str(uuid.uuid4())[:8]
        table.put_item(
            Item={
                'booking_id': booking_id,
                'date': date,
                'name': name,
                'hour': hour,
                'num_guests': num_guests
            }
        )
        return {'booking_id': booking_id}
    except Exception as e:
        return {'error': str(e)}


def delete_booking(booking_id):
    """
    删除现有的餐厅预订
    
    Args:
        booking_id (str): 要删除的预订 ID
    """
    try:
        response = table.delete_item(Key={'booking_id': booking_id})
        if response['ResponseMetadata']['HTTPStatusCode'] == 200:
            return {'message': f'ID 为 {booking_id} 的预订已成功删除'}
        else:
            return {'message': f'无法删除 ID 为 {booking_id} 的预订'}
    except Exception as e:
        return {'error': str(e)}
    

def lambda_handler(event, context):
    # 获取在调用 lambda 函数期间使用的操作组
    actionGroup = event.get('actionGroup', '')
    
    # 应该调用的函数名称
    function = event.get('function', '')
    
    # 调用函数时的参数
    parameters = event.get('parameters', [])

    if function == 'get_booking_details':
        booking_id = get_named_parameter(event, "booking_id")
        if booking_id:
            response = str(get_booking_details(booking_id))
            responseBody = {'TEXT': {'body': json.dumps(response)}}
        else:
            responseBody = {'TEXT': {'body': '缺少 booking_id 参数'}}

    elif function == 'create_booking':
        date = get_named_parameter(event, "date")
        name = get_named_parameter(event, "name")
        hour = get_named_parameter(event, "hour")
        num_guests = get_named_parameter(event, "num_guests")

        if date and hour and num_guests:
            response = str(create_booking(date, name, hour, num_guests))
            responseBody = {'TEXT': {'body': json.dumps(response)}}
        else:
            responseBody = {'TEXT': {'body': '缺少必需的参数'}}

    elif function == 'delete_booking':
        booking_id = get_named_parameter(event, "booking_id")
        if booking_id:
            response = str(delete_booking(booking_id))
            responseBody = {'TEXT': {'body': json.dumps(response)}}
        else:
            responseBody = {'TEXT': {'body': '