以下の内容はhttps://touch-sp.hatenablog.com/entry/2025/03/31/130048より取得しました。


【SmolAgents】エージェントっぽいのを作ってみたけど理想と現実はだいぶ違った

はじめに

完全ローカルのAIエージェントをSmolAgentsを使って作ってみました。もちろん簡単なことしかできません。

使用した言語モデル:gemma-3-12b-it-Q4_K_M.gguf(llama.cppで実行)
./llama-server -m ~/models/gemma-3-12b-it-Q4_K_M.gguf -c 8192 -ngl 30 --host 0.0.0.0

「-c」は6000以上ないとSmolAgentsがうまく動きませんでした。
VRAM用量によって「-ngl」を調整する必要があります。
Dockerコンテナからアクセスするので「--host 0.0.0.0」が必要です。

理想

〇〇(フォルダ)内のJPEG画像をPNG画像に変換して。

現実

〇〇(フォルダ)内のファイルを検索してJPEG画像があればそれをPIL.Imageで開いてPNGフォーマットに変換後保存して下さい。

このように依頼しないと目的は達成できませんでした。

変換にPIL.Imageを使うように指示を出さないと「ツール」がありませんと返って来ました。
保存するように指示を出さないと保存してくれませんでした。

SmolAgentsの限界なのか言語モデルの限界なのかはわかりませんが、なんか残念な結果になりました。

システムプロンプトを書き換えるとうまくいくのかもしれませんが今回は試してません。

方法

関数を定義する

ファイルを検索する関数をMCPサーバーとして記述しました。
汎用性があるかどうかは確認できていませんがSmolAgentsからは使えました。

SmolAgentsはPythonコードを書いてそれを実行してくれるのでわざわざ定義しなくてもglobなどを使って勝手に検索してくれます。
Pythonで記述したMCPサーバーがちゃんと動作するかを試す目的で今回は関数を自作しました。

テンプレートはこちらです。

from mcp.server.fastmcp import FastMCP

mcp = FastMCP("file_search") # 名前は任意

@mcp.tool()
def my_function():

    (ここに記述)

if __name__ == "__main__":
    mcp.run(transport='stdio')

実際にはこのように書きました。「filesystem.py」という名前にしました。

from pathlib import Path
import json
import fnmatch
from mcp.server.fastmcp import FastMCP

# Initialize FastMCP server
mcp = FastMCP("file_search")

@mcp.tool()
def list_files(base_dir: str = "", recursive: bool = True) -> str:
    """List filenames in the specified directory and return as JSON.
    
    Args:
        base_dir: base directory
        recursive: Whether to search recursively in subdirectories
        
    Returns:
        JSON string with the structure:
        {
            "status": "success" or "error",
            "message": Optional message (especially for errors),
            "data": {
                "directory": base directory path,
                "recursive": boolean flag,
                "files": list of files
            }
        }
    """
    try:
        dir_path = Path(base_dir)
        if not dir_path.exists() or not dir_path.is_dir():
            return json.dumps({
                "status": "error",
                "message": f"Directory {dir_path} does not exist or is not a directory.",
                "data": None
            }, indent=2)
        
        files = []
        
        # Helper function to collect files recursively
        def collect_files(path, current_dir=""):
            for f in path.iterdir():
                if f.is_file() and not f.name.startswith('.'):
                    if current_dir:
                        files.append(f"{current_dir}/{f.name}")
                    else:
                        files.append(f.name)
                elif f.is_dir() and recursive:
                    new_dir = f"{current_dir}/{f.name}" if current_dir else f.name
                    collect_files(f, new_dir)
        
        collect_files(dir_path)
        
        # Sort files alphabetically
        files.sort()
        
        # Create result in JSON format
        result = {
            "status": "success",
            "message": "Files retrieved successfully" if files else "No files found",
            "data": {
                "directory": str(dir_path.absolute()),
                "recursive": recursive,
                "files": files
            }
        }
            
        return json.dumps(result, indent=2)
    except Exception as e:
        return json.dumps({
            "status": "error",
            "message": f"Error listing files: {str(e)}",
            "data": None
        }, indent=2)

@mcp.tool()
def search_files(pattern: str, base_dir: str = "", recursive: bool = True) -> str:
    """Search for filenames matching the given pattern with wildcard support and return as JSON.
    
    Args:
        pattern: Search pattern (supports wildcards: *, ?, [seq], [!seq])
        base_dir: Base directory path
        recursive: Whether to search recursively in subdirectories
        
    Returns:
        JSON string with the structure:
        {
            "status": "success" or "error",
            "message": Optional message (especially for errors),
            "data": {
                "pattern": search pattern,
                "directory": base directory path,
                "recursive": boolean flag,
                "files": list of matching files
            }
        }
    """
    try:
        # ディレクトリパスを変換
        dir_path = Path(base_dir)
            
        if not dir_path.exists() or not dir_path.is_dir():
            return json.dumps({
                "status": "error",
                "message": f"Directory {dir_path} does not exist or is not a directory.",
                "data": None
            }, indent=2)
        
        matching_files = []
        
        # Helper function to search files recursively
        def search_in_dir(path, current_dir=""):
            for f in path.iterdir():
                if f.is_file() and not f.name.startswith('.'):
                    # fnmatchを使用してワイルドカードパターンとマッチング
                    if fnmatch.fnmatch(f.name.lower(), pattern.lower()):
                        if current_dir:
                            matching_files.append(f"{current_dir}/{f.name}")
                        else:
                            matching_files.append(f.name)
                elif f.is_dir() and recursive:
                    new_dir = f"{current_dir}/{f.name}" if current_dir else f.name
                    search_in_dir(f, new_dir)
        
        search_in_dir(dir_path)
        
        # Sort files alphabetically
        matching_files.sort()
        
        # 結果のベースディレクトリを設定
        base_for_results = str(dir_path.absolute())
        result_files = matching_files
        
        # Create result in JSON format
        message = "Files found successfully" if matching_files else f"No files matching '{pattern}' found"
        result = {
            "status": "success",
            "message": message,
            "data": {
                "pattern": pattern,
                "directory": base_for_results,
                "recursive": recursive,
                "files": result_files
            }
        }
            
        return json.dumps(result, indent=2)
    except Exception as e:
        return json.dumps({
            "status": "error",
            "message": f"Error searching files: {str(e)}",
            "data": None
        }, indent=2)

if __name__ == "__main__":
    # Initialize and run the server
    mcp.run(transport='stdio')

jsonフォーマットで返すようにしていますがSmolAgentsで使うことに限定するならPythonリストを返すように書いた方がうまくいきます。そのスクリプトは記事の最後に載せておきます。

Dockerfile

FROM python:3.12-bullseye

# 必要な依存関係のインストールと環境設定
RUN apt-get update && \
    apt-get install -y --no-install-recommends \
        build-essential net-tools

RUN pip install --no-cache-dir --upgrade pip && \
    pip install --no-cache-dir 'smolagents[openai,gradio,mcp]' uv

RUN apt-get clean && \
    rm -rf /var/lib/apt/lists/*

# プロジェクト設定
WORKDIR /app
COPY filesystem.py /app/mcp-project/

# 仮想環境セットアップ
RUN uv init mcp-project && \
    cd mcp-project && \
    uv venv && \
    uv add 'mcp[cli]'

# 起動設定
CMD ["python", "-c", "print('Container ready')"]

ポイントは以下の4点です。

  • uvをインストールする
  • 仮想環境を作る
  • mcp[cli]をインストールする
  • 関数を定義したファイルをコピーする
docker build --force-rm=true -t agent-sandbox .

Dockerコンテナをサンドボックスとして使うためのファイル

「sandbox.py」という名前にしました。

import docker
import time

class DockerSandbox:
    def __init__(self, image_name="agent-sandbox"):
        self.client = docker.from_env()
        self.container = None
        self.image_name = image_name

    def create_container(self):
        try:
            # コンテナを作成(ポートマッピングを追加)
            self.container = self.client.containers.run(
                self.image_name,
                command="tail -f /dev/null",  # コンテナを実行状態に保つ
                detach=True,
                tty=True,
                extra_hosts={"host.docker.internal": "host-gateway"},
                network_mode="bridge",
                ports={'7860/tcp': 7860},  # Gradioのデフォルトポート
                volumes={
                    "/home/hoge/data": {"bind": "/app/data", "mode": "rw"}
                }
            )
            print(f"コンテナを作成しました (ID: {self.container.id[:8]}...)")
        except Exception as e:
            raise Exception(f"コンテナ作成エラー: {e}")

    def gradio_run(self, code: str) -> None:
        if not self.container:
            self.create_container()
        
        # バックグラウンドでPythonスクリプトを実行
        self.container.exec_run(
            cmd=["python", "-c", code],
            detach=True
        )
        
        # ポート待機確認
        print("Gradioサーバーを起動中...", end="", flush=True)
        max_attempts = 10
        for attempt in range(max_attempts):
            time.sleep(1)
            print(".", end="", flush=True)
            
            # netstatを使用してポートのリスニング状態を確認
            netstat_result = self.container.exec_run(
                cmd=["bash", "-c", "netstat -tulpn 2>/dev/null | grep 7860 || echo ''"]
            )
            
            if netstat_result.output:
                print(" 完了!")
                print("\n✅ Gradioアプリが起動しました")
                print("📊 http://localhost:7860 でアクセスできます")
                return None
        
        print("\n❌ サーバー起動に失敗しました")
        return None
    
    def _safe_decode(self, data, encoding='utf-8', errors='strict'):
        """バイト列か文字列かを判断して適切に処理する"""
        if isinstance(data, bytes):
            return data.decode(encoding, errors=errors)
        return data
            
    def cleanup(self):
        if self.container:
            try:
                self.container.stop()
                self.container.remove()
                print("Container stopped and removed successfully")
            except Exception as e:
                print(f"エラー: {e}")
            finally:
                self.container = None
    
    def get_logs(self):
        """コンテナ内のプロセス状態とログを取得"""
        if not self.container:
            return "コンテナが起動していません"
            
        # プロセス確認
        ps_cmd = "ps aux | grep python | grep -v grep"
        ps_result = self.container.exec_run(cmd=["bash", "-c", ps_cmd])
        ps_output = self._safe_decode(ps_result.output).strip()
        
        # ポート確認
        port_cmd = "netstat -tulpn 2>/dev/null | grep 7860 || echo 'ポートが開いていません'"
        port_result = self.container.exec_run(cmd=["bash", "-c", port_cmd])
        port_output = self._safe_decode(port_result.output).strip()
        
        return f"プロセス状態:\n{ps_output}\n\nポート状態:\n{port_output}"
        
    def exec_command(self, command):
        """コンテナ内でコマンドを実行"""
        if not self.container:
            return "コンテナが起動していません"
        
        result = self.container.exec_run(cmd=["bash", "-c", command])
        return self._safe_decode(result.output, errors='ignore')

SmolAgentを実行する

from sandbox import DockerSandbox

# DockerSandboxのインスタンスを作成
sandbox = DockerSandbox()

agent_code = """
try:
    from smolagents import CodeAgent, OpenAIServerModel, GradioUI, ToolCollection
    from mcp import StdioServerParameters

    model = OpenAIServerModel(
        model_id="gemma-3-12b-it-4bit",
        api_base="http://host.docker.internal:8080",
        api_key="EMPTY"
    )

    server_parameters = StdioServerParameters(
        command="/usr/local/bin/uv",
        args=[
            "--directory",
            "/app/mcp-project",
            "run",
            "filesystem.py"
        ]
    )

    with ToolCollection.from_mcp(server_parameters, trust_remote_code=True) as tool_collection:
        agent = CodeAgent(
            model=model,
            tools=[*tool_collection.tools],
            additional_authorized_imports=["PIL", "os"]
        )

        # エージェントの実行
        GradioUI(agent).launch(server_name='0.0.0.0', server_port=7860, share=False)
except Exception as e:
    print(f"エラーが発生しました: {str(e)}")
    with open('/tmp/error.log', 'w') as f:
        f.write(f"スタートアップエラー: {str(e)}\\n")
"""

try:
    # エージェントコンテナに関する情報を確認
    print("\n⚙️ コンテナ環境を確認しています...")
    
    # コンテナを作成して基本的な情報を確認
    sandbox.create_container()
    
    # Pythonとパッケージの確認
    print("\nPython環境:")
    print(sandbox.exec_command("which python || which python3 || echo 'Pythonが見つかりません'"))
    print("\nPythonバージョン:")
    print(sandbox.exec_command("python --version || python3 --version || echo 'バージョン情報を取得できません'"))
    
    # 必要なパッケージの確認
    print("\n必要なパッケージ確認:")
    print(sandbox.exec_command("pip list | grep -E 'smolagents|gradio' || echo 'パッケージが見つかりません'"))
    
    # Gradioアプリを起動
    print("\n🚀 Gradioアプリを起動します...")
    sandbox.gradio_run(agent_code)
    
    # ユーザーが終了するまで待機
    print("\nアプリ実行中... Ctrl+C で終了します")
    
    while True:
        try:
            cmd = input("\n> ")
            if cmd.lower() == "exit" or cmd.lower() == "quit":
                break
            elif cmd.lower() == "status":
                print("\n" + sandbox.get_logs())
            elif cmd.lower() == "exec":
                command = input("実行するコマンド: ")
                print("\n" + sandbox.exec_command(command))
            elif cmd.lower() == "help":
                print("\nコマンド一覧:")
                print("  status - サーバー状態を確認")
                print("  exec   - コンテナ内でコマンドを実行")
                print("  exit   - アプリを終了")
                print("  help   - このヘルプを表示")
            elif cmd.strip() == "":
                pass
            else:
                print(f"不明なコマンド: {cmd}. 'help'と入力してコマンド一覧を表示")
        except KeyboardInterrupt:
            print("\n終了します...")
            break
    
except Exception as e:
    print(f"エラーが発生しました: {e}")
finally:
    # 終了処理
    sandbox.cleanup()

ポイントはここの部分です。

server_parameters = StdioServerParameters(
    command="/usr/local/bin/uv",
    args=[
        "--directory",
        "/app/mcp-project",
        "run",
        "filesystem.py"
    ]
)

上記スクリプトを実行するだけです。名前は「agent_runner.py」としています。

python agent_runner.py

実行するPythonには「docker」ライブラリのインストールが必要です。

Pythonリストを返すスクリプト

from pathlib import Path
import fnmatch
from mcp.server.fastmcp import FastMCP

# Initialize FastMCP server
mcp = FastMCP("file_search")

@mcp.tool()
def list_files(base_dir: str = "", recursive: bool = True) -> list:
    """List filenames in the specified directory and return as Python list.
    
    Args:
        base_dir: base directory
        recursive: Whether to search recursively in subdirectories
        
    Returns:
        List of filenames found in the directory (empty list if directory doesn't exist)
    """
    try:
        dir_path = Path(base_dir)
        if not dir_path.exists() or not dir_path.is_dir():
            return []
        
        files = []
        
        # Helper function to collect files recursively
        def collect_files(path, current_dir=""):
            for f in path.iterdir():
                if f.is_file() and not f.name.startswith('.'):
                    if current_dir:
                        files.append(f"{current_dir}/{f.name}")
                    else:
                        files.append(f.name)
                elif f.is_dir() and recursive:
                    new_dir = f"{current_dir}/{f.name}" if current_dir else f.name
                    collect_files(f, new_dir)
        
        collect_files(dir_path)
        
        # Sort files alphabetically
        files.sort()
        
        return files
    except Exception:
        return []

@mcp.tool()
def search_files(pattern: str, base_dir: str = "", recursive: bool = True) -> list:
    """Search for filenames matching the given pattern with wildcard support and return as a list.
    
    Args:
        pattern: Search pattern (supports wildcards: *, ?, [seq], [!seq])
        base_dir: Base directory path
        recursive: Whether to search recursively in subdirectories
        
    Returns:
        List of matching filenames (empty list if directory doesn't exist or no matches found)
    """
    try:
        # ディレクトリパスを変換
        dir_path = Path(base_dir)
            
        if not dir_path.exists() or not dir_path.is_dir():
            return []
        
        matching_files = []
        
        # Helper function to search files recursively
        def search_in_dir(path, current_dir=""):
            for f in path.iterdir():
                if f.is_file() and not f.name.startswith('.'):
                    # fnmatchを使用してワイルドカードパターンとマッチング
                    if fnmatch.fnmatch(f.name.lower(), pattern.lower()):
                        if current_dir:
                            matching_files.append(f"{current_dir}/{f.name}")
                        else:
                            matching_files.append(f.name)
                elif f.is_dir() and recursive:
                    new_dir = f"{current_dir}/{f.name}" if current_dir else f.name
                    search_in_dir(f, new_dir)
        
        search_in_dir(dir_path)
        
        # Sort files alphabetically
        matching_files.sort()
        
        return matching_files
    except Exception:
        return []

if __name__ == "__main__":
    # Initialize and run the server
    mcp.run(transport='stdio')

追記

Pythonスクリプトはほぼ全部Claude 3.7 Sonnetが書いています。Dockerfileももちろんそうです。






以上の内容はhttps://touch-sp.hatenablog.com/entry/2025/03/31/130048より取得しました。
このページはhttp://font.textar.tv/のウェブフォントを使用してます

不具合報告/要望等はこちらへお願いします。
モバイルやる夫Viewer Ver0.14