以下の内容はhttps://developersblog.dmm.com/entry/2025/01/22/110000より取得しました。


DMM.博士 通信 Vol.2 - 分散型RAGサテライトシステムの構想

thumbnail

はじめに

DMM.博士通信の2回目の投稿になります。今回は前回のおわりに記したとおりRetrieval Augmented Generation(aka. RAG)について記載します。DMM.博士については前回までの投稿をお読みいただければと思います。一応簡潔に紹介しますと、DMMグループ内向けに提供中のSlack上で働くChatGPT様式のサービスになります。

汎用的なAIチャットアプリから始まったDMM.博士ですが、DMM.博士通信Vol.1で紹介したように外部関数呼び出しを利用してワークフローを担うなど発展しています。次の課題の取り組みは、社内情報に基づき一定の信憑性が確保された対話を可能にすることです。結論から述べると2024年5月に社内規定を対象としてこれを実現しました。DMM.博士 分散型RAGサテライトシステムと呼称する最初のRAG事例について説明します。

着想に至るまで

もともとは生成AIに着目する以前から社内横断検索はトライするべき課題として着目していました。OpenAIの流行後間もなく、企業内情報を扱う手法についていくつかのサービス提供が始まりました。例えばAzureではOn Your Dataと呼ばれる機能が発表されました。実際にそれを使用し検証しました。しかしおそらくは複合的な理由から信憑性の高い対話の実現には至りませんでした。またその時期にはRAGアーキテクチャも一般に知られるようになり、取り込むべき対象データを浮動小数点数にしベクトルストアに格納したうえでコサイン類似度を測る仕組みと、生成AIからそれを呼び出し利活用する手法についても検討しました。しかしRAGの問題はベクトルストアのデータ更新管理コストの高さになることは容易に想像できました。一方社内データを保有する他システム側でセマンティック検索機能を持ち始めるならば、生成AI側は外部API呼び出しからその結果を使わせてもらう方が効率よくコストを抑えられるのではないかということも考えていました。整理すると以下のとおりです。

  1. RAGマネジメントサービスの利活用検討 → 対象データの所在とデータの最適化が課題
  2. 一般的なRAGアーキテクチャの採用検討 → ベクトルストアのデータ更新が課題
  3. 生成AIからの外部関数呼び出しの利活用検討 → 現実的ではあるが提供される保証はない

これらのことを考えるにあたり問題として共通するのは次のことでした。
「扱いたい企業内データは様式も格納場所も様々」。

1と2は対象データを集約させる必要があり、その時点で現実的ではないと判断しました。3についてはデータ保持するシステムは様々あり、生成AIから利用できる機能APIがいつ提供されるかわからないという問題だけでした。セマンティック検索機能搭載の期待ができるシステム対象は後回しにして、期待の薄いトラディショナルなデータから対象にすればよいと考えました。

ここまでで3を基本として次の着想に至り、「分散型RAGサテライトシステム」構想となります。

  • 対象データを移動したり、扱いやすいようにデータ加工するなど元の状態に影響を与えることはしない。
  • セマンティック検索が提供されない対象データについてのみ個別にRAG APIシステムを構築すれば良い。
  • DMM.博士は外部関数呼び出しをエージェントに任せ、必要に応じて各RAG APIをコールすれば良い。

対象データはどうするか

当社の場合、次の条件を満たすのは社内規定であり、最初のモデルケースとして対象データの選定には悩みませんでした。

  • 利用者から需要がある
  • データ格納場所が見つけにくくシステムは最新化されていない
  • テキストベースであること(様々なデータ様式に対応する必要がない)

なにで開発をするか

DMM.博士自体はAzure OpenAIで動くシステムとして開発してきましたが、この時期にはOpenAI社のモデルに対抗する生成AIモデルがいくつか出てきており、柔軟に取り入れられる状態が理想的だと考えていました。具体的にはAnthropic社のClaude3の性能が話題になっていましたし、Amazon Bedrockはそれを提供するクラウドプロパイダになっていました。構想するRAG APIはDMM.博士から外部関数コールできる場所にあれば良いため、Azureに拘る必要はありません。OpenAI SDKに依存する必要が希薄になることに加えて、データ加工やベクトルストアのライブラリが揃っていることからLangChainを開発の中心軸に据えることを検討しました。

LangChainで扱えるデータストアは様々あるのですが、初期段階ではFAISSを使用していました。ローカル開発中には特に使いやすい実装だと思います。しかしFAISSが扱うデータはローカルファイルシステムにのみ対応しているため、そのままではAWS上のシステムで扱うには制約が強すぎて難しいと思います。ElasticSearchなど他の実装に目を向けることもできますが、対象データ別に小さくサブAPIシステムを構築するというコンセプトからは外れてしまいます。

FAISSを動かす計算機からAWS S3に配置した対象データを扱うことができれば問題ないわけですが、その部分は独自実装で補うことができたためベクトルストアにはFAISSを採用することにしました。

整理すると次のとおり。

  • 対象データはAWS S3上に配置
  • AWS Fargate 上に LangChainで実装するRAG APIを設ける
  • 生成AIはAmazon Bedrockを使用

システム全体像

Fig1

システム構成要素は次のとおりです。

  • Data Collector
  • Indexer
  • Monolithicalized Indexer
  • Vector Store
  • Retriever
  • Agent

Data Collector

Data Collectorは規約サイトからHTMLやPDFデータを取得してS3バケットにデータ保存します。元規約文書は複数ページに分かれているため、ファイル数は100を超えます。実装は PlaywriteでFargateタスクとして定義し、EventBridge Schedulerから定期実行する想定で設計しています。

Indexer

前述のとおりLangChainとFAISSで実装しています。Data Collectorのバッチ処理終了を起点にイベント駆動する設計でFargateタスクとして定義しています。バケットに置かれた対象データファイルを順次読み込んで処理しますが、一つのファイルデータをさらに所定のチャンクサイズに分割してインデックス生成します。1stモデルケースでは慎重になっていたこともあり、一つのファイルデータに対して一つのインデックスデータを生成出力してS3に保存する仕様としていました(後続のモデルケースでは複数ファイルデータをメモリ内で処理して単一のインデックスを出力する仕様にしています)。

またインデックス生成には組み込み生成AIモデルが必要となりますが、amazon.titan-embed-text-v1 を使用しました。

一部抜粋になりますが当該コードを掲載します(StreamPDFLoader(), StreamTextLoader(), _save_s3()は非公開とさせていだきます)。

from langchain_community.vectorstores.faiss import FAISS
from langchain_text_splitters import RecursiveCharacterTextSplitter

class VectorStoreIndexer:

    def __init__(self, embeddings, aws_client) -> None:
        self.embeddings = embeddings
        self.aws_client = aws_client
        self.vector_store = None

    def init_vector_store(self):
        self.vector_store = FAISS.from_texts(["dummy"], self.embeddings, ids=[0])
        self.vector_store.delete([0])

    def create_vector_store_from_s3_objects(self, s3_objects):
        for object in s3_objects:
            self.init_vector_store()
            file_key = object["Key"]
            if file_key.endswith(".pdf"):
                logger.info(f"ファイル形式: {file_key}")
                s3_object = self.aws_client.get_s3_object(file_key)
                loader = StreamPDFLoader(BytesIO(s3_object["Body"].read()))
                self._making_docs(loader, file_key)
            elif file_key.endswith(".txt") or file_key.endswith(".md"):
                logger.info(f"ファイル形式: {file_key}")
                s3_object = self.aws_client.get_s3_object(file_key)
                loader = StreamTextLoader(BytesIO(s3_object["Body"].read()))
                self._making_docs(loader, file_key)
            else:
                continue

    def _making_docs(self, loader, file):
        pages = loader.load_and_split()

        if pages:
            page_text = pages[0].page_content if pages else None
            header_dict = self._get_header_summary(file, page_text)
            if not header_dict:
                return

            for page in pages:
                page.metadata[
                    "source"
                ] = f"{self.aws_client.data_corrector_bucket_name}/{file}"
                page.metadata["filename"] = file.split("/")[-1]
                for key, value in header_dict.items():
                    page.metadata[key] = value

            logger.info(f"metadata: {pages[0].metadata}")

            text_splitter = RecursiveCharacterTextSplitter(
                chunk_size=1000,
                chunk_overlap=0,
                separators=[
                    "\n\n",
                    "\n",
                    " ",
                    ".",
                    ",",
                    "\u200b",  # Zero-width space
                    "\uff0c",  # Fullwidth comma
                    "\u3001",  # Ideographic comma
                    "\uff0e",  # Fullwidth full stop
                    "\u3002",  # Ideographic full stop
                    "",
                ],
            )

            docs = text_splitter.split_documents(pages)
            self.vector_store.merge_from(FAISS.from_documents(docs, self.embeddings))
            self._save_index_s3(file)

Monolithicalized Indexer

Indexerの説明でも触れましたが、対象ファイルごとにインデックスを生成する仕様としていました。後述するAPIサーバは起動時に一つのインデックスを読み込む仕様となっているため、Indexerの処理終了を起点にMonolizerが起動して複数出力されたインデックスを単一インデックスに結合して出力する想定で設計されました。

ソース掲載は割愛しますが、langchain_community.vectorstores.faiss.FAISS.merge_from を使用して実装可能です。

参照: https://api.python.langchain.com/en/latest/vectorstores/langchain_community.vectorstores.faiss.FAISS.html#langchain_community.vectorstores.faiss.FAISS.merge_from

Vector Store & Retriever

Vector Storeについてはインデックス生成で説明しました。Retrieverはデータストアのインスタンスからコールされる検索処理を担います。これにRAGとして不足する次の機能の連鎖(チェーン)を追加し、API化にはLangServeを使用してRAG APIサーバとして成立させています。

  1. 検索のための入力指示を受け付ける
  2. retrieve(類似度検索)して結果を出力する
  3. 出力結果からプロンプトに従って文字列生成する
  4. 最終出力を整形する

retrieve結果から回答生成する処理には、anthropic.claude-3-haiku-20240307-v1:0 または anthropic.claude-3-sonnet-20240229-v1:0 を使用しました。

Agent

AgentはDMM.博士自体に実装されます。これ自体はDMM.博士通信 Vol.1でも記載した内容と同様、利用者がDMM.博士に入力する内容を判断して外部APIコールを選択する挙動の制御実装となります。例えば、「DMMでは男性も育児休暇を取得できますか?」といった入力であれば、DMMの社内規定を確認して回答しなければいけないと判断し、RAG APIをコールするのがAgentの役割です。

抜粋例示すると次のようになります。この場合、Agentの自動判定でrag_dmm_group_regulationsがセレクトされるので、そのメソッドに必要な制御処理を実装すれば良いことになります。

class HakaseFunctionsManifest(object):
    tools = [
        {
            "type": "function",
            "function": {
                "name": "rag_dmm_group_regulations",
                "description": "DMMグループの社内規程の問い合わせに関する回答を生成する。",
                "parameters": {
                    "type": "object",
                    "properties": {
                        "message": {
                            "type": "string",
                            "description": "DMMの社内規程に関する情報を調べて回答します。DMMグループ外の一般的な内容については扱わない。次の会社はDMMグループに含まれます [合同会社DMM.com, 合同会社EXNOA, 株式会社Studio KUMASAN, 株式会社DGホールディングス]。",
                        },
                    },
                    "required": ["message"],
                }
            }
        },
    ]

どうなったか

実際のSlackからの使用例は次のようになっています。

Fig2

回答文だけではなく、RAGにより類似度検索にヒットした参照ドキュメントを箇条書きで掲載し...はそのリンクとしています。

回答文の最期には次の注釈を含めています。

※この文章は、RAG(Retrieval-Augmented Generation)技術を活用して生成されています。
※回答内容および参照情報が不適当な場合はより具体的な入力文の変更をお試しください。

これは利用者の入力内容によってはAgentが誤作動を起こし、RAGを使わず汎用的な回答を生成する可能性があるための処置です。
この注釈のない回答はRAGによるものではなくハルシネーションが起きているので注意する必要があります。

ちなみにこの社内規定RAGは比較的利用機会が多く、博士には他にも多くの便利機能がある中で、社内規定RAGの利用率は52.2%(提供開始から8ヶ月間の機能別利用内訳より)となっています。

おわりに

今回は2024年6月頃迄のDMM.博士の機能追加のうち、分散型RAGサテライトシステムの最初のモデルケースの成り立ちについて記載しました。その後2ndモデルケースではよりデータ様式の複雑なCMSを対象にRAGを構築しました。そこでは今回のようにテキスト、PDFだけではなく、CMSのコンテンツに埋め込まれた画像やスプレッドシートが存在し、更にはコンテンツ組み立てのレギュレーションがなく自由に記載されたページを扱うことになります。ベクトルストアに入力するデータが規則性を持たなければノイズデータとなり類似度検索の品質は保てなくなるという課題と向き合うことになりました。またFAISSを扱ううえで考慮するべき計算機CPUのベクトル演算処理制御についても注意が必要となります。次回のDMM博士 通信 Vol.3ではそのあたりを記載したいと思います。

最期になりますが私たちは共に働ける仲間を募っています。ご興味のある方は以下よりご応募ください。




以上の内容はhttps://developersblog.dmm.com/entry/2025/01/22/110000より取得しました。
このページはhttp://font.textar.tv/のウェブフォントを使用してます

不具合報告/要望等はこちらへお願いします。
モバイルやる夫Viewer Ver0.14