PandasのDataFrameに対して、Amazon Bedrockを利用した処理を並列で呼び出す方法

Page content

PandasのDataFrameに対して、Amazon Bedrockを利用した処理を並列で呼び出す方法

PandasのDataFrameに対して、Amazon Bedrockを利用した処理を並列で呼び出す方法を調べてみました。PandasのDataFrameに対して、OpenAI APIを利用した処理を並列で呼び出す方法のAmazon Bedrockバージョンです。こちらも端的にはpandarallelを使いましょう、です。

from pandarallel import pandarallel

pandarallel.initialize()

summary_df["summary"] = summary_df["url"].parallel_apply(get_summary)

display(summary_df)

前提条件

  • boto3==1.34.51
  • pandarallel==1.6.5

事前準備

from __future__ import annotations
import boto3
import json
import urllib
import pandas as pd

urls = [
    "https://www.inoue-kobo.com/llm/openai-reduce-embedding-dim/",
    "https://www.inoue-kobo.com/aws/selenium-serverless/",
    "https://www.inoue-kobo.com/aws/aws-service-summary/",
    "https://www.inoue-kobo.com/ai_ml/duckduckgo-langchain-langsmith/",
    "https://www.inoue-kobo.com/ai_ml/llamaindex-pdf-gradio/",
]

summary_df = pd.DataFrame(urls, columns=["url"])
pd.set_option("display.max_colwidth", None)

単純に apply するだけ

def get_summary(url: str) -> str | None:
    res_web = urllib.request.urlopen(url)  # type: ignore
    content = res_web.read().decode("utf-8")

    res_bedrock = bedrock.invoke_model(
        modelId="anthropic.claude-3-haiku-20240307-v1:0",
        accept="application/json",
        contentType="application/json",
        body=json.dumps(
            {
                "anthropic_version": "bedrock-2023-05-31",
                "max_tokens": 4096,
                "system": "以下はWebサイトの内容です。HTMLタグを削除した上で、150文字以内で要約してください。",
                "messages": [
                    {
                        "role": "user",
                        "content": [
                            {
                                "type": "text",
                                "text": content,
                            }
                        ],
                    },
                ],
            }
        ),
    )
    response_body = json.loads(res_bedrock.get("body").read())

    return response_body.get("content")[0].get("text")
summary_df["summary"] = summary_df["url"].apply(get_summary)

display(summary_df)

実行時間は17.0sでした。applyしただけでは並列処理は行われないため、この処理時間が基準になります。

asyncを使う

import asyncio


async def aget_summary(url: str) -> str | None:
    res_web = urllib.request.urlopen(url)  # type: ignore
    content = res_web.read().decode("utf-8")

    res_bedrock = bedrock.invoke_model(
        modelId="anthropic.claude-3-haiku-20240307-v1:0",
        accept="application/json",
        contentType="application/json",
        body=json.dumps(
            {
                "anthropic_version": "bedrock-2023-05-31",
                "max_tokens": 4096,
                "system": "以下はWebサイトの内容です。HTMLタグを削除した上で、150文字以内で要約してください。",
                "messages": [
                    {
                        "role": "user",
                        "content": [
                            {
                                "type": "text",
                                "text": content,
                            }
                        ],
                    },
                ],
            }
        ),
    )
    response_body = json.loads(res_bedrock.get("body").read())

    return response_body.get("content")[0].get("text")
summary_df["summary"] = await asyncio.gather(*[aget_summary(url) for url in urls])

display(summary_df)

実行時間は15.8sでした。asyncioでは期待したような並列処理(正確にはノンブロッキングIO)が行われていないことが確認できます。

ThreadPoolExecutorを使う

from concurrent.futures import ThreadPoolExecutor
with ThreadPoolExecutor() as executor:
    summary_df["summary"] = list(executor.map(get_summary, urls))

display(summary_df)

実行時間は5.8sでした。applyを適用しただけの順次処理よりも大幅に処理時間が短くなっています。並列処理が行われていることが確認できます。

pandarallelを使う

from pandarallel import pandarallel

pandarallel.initialize()
summary_df["summary"] = summary_df["url"].parallel_apply(get_summary)

display(summary_df)

実行時間は5.3sでした。こちらも並列処理が行われていることが確認できます。

参考文献