KEELチーム の相原です。
前回のエントリは「小さい経路最適化ミドルウェアを実装してあらゆるAZ間通信を削減する」でした。
今回は、MCPサーバを比較的安全に動かすために色々やってた話を書きたいと思います。
MCPについて
MCPはModel Context Protocolの略で、Anthropicが標準化を主導するLLMとその外部を繋ぐプロトコルです。
これによりGitHubやPlaywrightといった外部のツールをLLMが自律的に利用できるようになり、OpenAIもサポートを表明したことから大きな注目を集めています。
要は標準化されたFunction Callingということになるでしょう。
我々KEELチームでは2年ほど前から無限にスケールする汎用AI(仮)を開発してきており、これは内部でFunction Callingするエージェントを複数協調させて動かすことであらゆるタスクを解決することを狙っています。
Function Calling(MCPサーバ)を横に大量に並べて使おうとするとトークン消費が無視できなくなるはずで、当時我々はそれを解決するためにマルチエージェントとして複数のエージェントを協調させることを選びました。 (最近の A2A の動きを見ていても、まだこの設計でやっていけそうだなと感じています)
また、エージェントを簡単に開発できるフレームワークも提供することで、社内のContributorが自由に社内システムとの連携を実装できるようにもなっています。
ということもあり、我々目線ではMCPの登場は「コミュニティが成熟すればFunction Calling実装の手間が省けるな~」くらいの受け止め方だったわけですが、今となっては職種問わず多くの人がローカルでMCPサーバを動かそうとし、社内システムのMCPサーバも開発されるようになってきました。
そろそろMCPが無視できなくなってきており、(本業はマルチテナントなKubernetesクラスタの開発ではありつつ)プラットフォーマーとして動くことにしました。
MCPサーバのリスク
2025年5月現在のよくあるMCPサーバの実行方法は npx や docker run 経由のものだと理解しています。
Docker for Desktop有償化とかCPU Virtualizationが必要であることを考えると、職種問わず利用者が多いのは npx でしょうか。
多くの場合どちらも素朴に最新のMCPサーバの実装を取得してきて、実行ユーザの権限でstdio Transport経由でMCPホストが利用します。
そしてその場合、悪意のあるMCPサーバを誤って実行してしまうと、踏み台として利用されたり npx 経由での実行では環境変数やファイルを奪われてしまうというリスクがあります。
最近ではMCPホストのUIからは見えない形でLLMに対して悪意のある指示をする攻撃手法が発見されるなど、MCPサーバを対象にした攻撃は巧妙化しつつあり注意が必要です。(それでも私たちはAuto-approveを使ってしまうわけですが)
とりあえずバージョンは固定したいですし、バージョンがプログラムの中身を一意に識別するものでないことを考えると、最低限 docker run ではDigestでの参照や、他もChecksumまでやれると理想でしょう。
ただそれだけではまだ不十分だと感じていて、欲を言えばWebAssemblyやDenoのようなサンドボックス上で明示的にCapabilityを与えた上で実行したいところです。
一方でこれらはMCPサーバの実装に依存する部分が多くあり、その中で比較的安全にMCPサーバを運用できないかと色々やってきました。
なるべくローカルで動かさない
そもそも各々がローカルで大量のMCPサーバを実行することが会社的には少し厳しさがあると感じています。
もちろんPlaywrightのMCPサーバのように手元のデスクトップ環境を操作したい場合はローカルで動かす必要がありますが、社内システム用のMCPサーバやOrganizationレベルで権限を持ったGitHubなどのMCPサーバは、社内共通で一つ動かしておいてそれを各々が参照するだけでよい可能性があります。
MCPにはいくつかのTransport実装があり、よく使われる(?)stdio TransportはMCPホストが直接コマンドとしてMCPサーバを実行し、その名の通りstdioを介してMCPサーバとやり取りをします。
一方で、Server Sent Eventsを利用したTransport実装やそれを改良したStreamable HTTP Transport と呼ばれるHTTPに載ったものもあり、これらはリモートにあるMCPサーバを利用可能です。
そのため、LIFULLではこれらのTransportを実装したMCPサーバはまとめて前述のマルチテナントなKubernetesクラスタで運用して提供するようにしています。
apiVersion: apps/v1 kind: Deployment spec: template: spec: automountServiceAccountToken: false securityContext: fsGroup: 65532 seccompProfile: type: RuntimeDefault containers: - name: mcp securityContext: privileged: false allowPrivilegeEscalation: false capabilities: drop: - ALL readOnlyRootFilesystem: true runAsUser: 65532 runAsNonRoot: true seccompProfile: type: RuntimeDefault <snip>
Kubernetesのベストプラクティス(Pod Security Standards)に沿って厳格な権限で動かしています。
明示的に許可しない限り、MCPサーバはrootfsへの書き込み権限すら与えられません。
また、LIFULLではすべてのコンテナにIstioのプロキシを入れることを義務付けており、MCPサーバにも同様にIstioのプロキシを入れて、許可した接続先以外にはリクエストできないようにしています。
apiVersion: networking.istio.io/v1beta1 kind: Sidecar spec: egress: - captureMode: DEFAULT hosts: - istio-system/istiod.istio-system.svc.cluster.local - istio-system/otel-agent.istio-system.svc.cluster.local outboundTrafficPolicy: mode: REGISTRY_ONLY <snip>
これにより、ローカルで動かす必要がないMCPサーバをプラットフォームから提供することで、リソースの効率化とともに比較的安全に運用することができました。
ローカルではせめてDockerで動かす
とはいえ全てのMCPサーバをリモートで動かせるわけではありません。
前述のようにデスクトップ環境に依存するようなMCPサーバはローカルで動かす必要がありますし、Personal Access Tokenを利用せざるを得ないものも同様です。
その際は、Kubernetesで実現していたものと似たような隔離空間をDocker Composeを使って実現することを推奨しています。
# docker-compose.yaml services: tcp-proxy: image: ghcr.io/lifull/keel/proxy:latest command: - https://example.com - --mode - tcp - --address - 0.0.0.0:443 networks: default: {} internal: ipv4_address: 172.16.255.1 my-mcp-server: image: ghcr.io/lifull/keel/my-mcp-server:latest cap_drop: - ALL environment: - PERSONAL_ACCESS_TOKEN=${PERSONAL_ACCESS_TOKEN} extra_hosts: - "example.com:172.16.255.1" networks: internal: {} networks: internal: internal: true ipam: driver: default config: - subnet: 172.16.0.0/16 default: driver: bridge
Dockerはデフォルトでbridgeネットワーク上でコンテナを動かし、bridgeを介してホストのネットワークからインターネットに出ていきます。 そのため、悪意のあるMCPサーバをDockerコンテナとして動かしてしまうとそのまま外にリクエストされてしまうわけです。
そこで、この例ではDocker Compose内に閉じたネットワーク internal を作成し、MCPサーバにはその internal のみを割り当てることでインターネットに疎通できないようにしています。
しかし、ものによってはGitHubのAPIを叩くなどインターネットに疎通する必要があるものもあります。
その際はTCP Proxyを同じく internal ネットワーク内に用意したうえで、そのTCP Proxyのみをbridgeネットワークにも割り当てることで、許可した接続先にのみリクエストできるようにしています。
extra_hosts を利用することでMCPサーバのプログラムを一切変更することなく、安全にTCP Proxyを経由させることが可能です。
いちいち解説するまでもないと思いますが、TCP Proxyの実装はこんな感じです。
func runTCPServer(listener net.Listener, target *url.URL, a *Args) error { shutdown := make(chan struct{}, 1) semaphore := make(chan struct{}, a.MaxConnections) wg := sync.WaitGroup{} go func() { for { local, err := listener.(*net.TCPListener).AcceptTCP() if err != nil { select { case <-shutdown: return default: continue } } semaphore <- struct{}{} wg.Add(1) go func() { defer func() { <-semaphore wg.Done() }() defer local.Close() remoteAddress := target.Host if target.Port() == "" { switch target.Scheme { case "http": remoteAddress = net.JoinHostPort(target.Hostname(), "80") case "https": remoteAddress = net.JoinHostPort(target.Hostname(), "443") } } remote, err := net.DialTimeout("tcp", remoteAddress, 10*time.Second) if err != nil { return } defer remote.Close() c := make(chan struct{}, 2) f := func(c chan struct{}, dst io.Writer, src io.Reader) { _, _ = io.Copy(dst, src) c <- struct{}{} } go f(c, remote, local) go f(c, local, remote) select { case <-c: case <-shutdown: local.CloseWrite() } }() } }() quit := make(chan os.Signal, 1) signal.Notify(quit, syscall.SIGTERM) <-quit time.Sleep(time.Duration(a.Lameduck) * time.Second) close(shutdown) listener.Close() wg.Wait() return nil }
docker-compose.yaml を配布するだけで便利なMCPサーバを社員に行き届かせることができるので一石二鳥ですね。
一つトレードオフとして、この場合MCPサーバは docker compose up でまとめて起動するため、必然的にMCPクライアントからはstdio TransportではなくHTTP Transportを利用することになります。(JetBrainsのJunieはHTTP Transportに未対応だったりします)
無理やりHTTP Transportに対応する
一方で、一つ目のリモートで動かすにせよ二つ目のDocker Composeで動かすにせよ、MCPサーバがHTTP Transportに対応していることが前提となります。
しかし野良のMCPサーバの中にはstdio Transportしか実装していないものがそれなりにあります。
そういったMCPサーバでは上記のアプローチが取れないので工夫が必要です。
そこで、我々KEELチームはstdio Transportにのみ対応しているMCPサーバを無理やりSSE Transportに対応する小さいプロキシを開発しました。
バイナリとして簡単に配布できるようにGoで書いています。
package main import ( "bufio" "context" "encoding/json" "errors" "flag" "fmt" "log" "net" "net/http" "os" "os/exec" "os/signal" "runtime/debug" "sync" "syscall" "time" "github.com/google/uuid" ) type MCPMethod string // https://github.com/modelcontextprotocol/modelcontextprotocol/blob/main/schema/2025-03-26/schema.ts const ( CancelledNotification MCPMethod = "notifications/cancelled" InitializeRequest = "initialize" InitializedNotification = "notifications/initialized" PingRequest = "ping" ProgressNotification = "notifications/progress" ListResourcesRequest = "resources/list" ListResourceTemplatesRequest = "resources/templates/list" ReadResourceRequest = "resources/read" ResourceListChangedNotification = "notifications/resource/list/changed" SubscribeRequest = "resources/subscribe" UnsubscribeRequest = "resources/unsubscribe" ResourceUpdatedNotification = "notifications/resource/updated" ListPromptsRequest = "prompts/list" GetPromptRequest = "prompts/get" PromptListChangedNotification = "notifications/prompt/list/changed" ListToolsRequest = "tools/list" CallToolRequest = "tools/call" ToolListChangedNotification = "notifications/tool/list/changed" ) type MCPMessage struct { JSONRPC string `json:"jsonrpc"` Method MCPMethod `json:"method"` ID any `json:"id,omitempty"` } type session struct { id string responseQueue chan []byte requestQueue chan []byte } func main() { var address string var terminationGracePeriodSeconds int var lameduck int var keepAlive bool var verbose bool flag.StringVar(&address, "address", "0.0.0.0:8080", "") flag.IntVar(&terminationGracePeriodSeconds, "termination-grace-period-seconds", 10, "The duration in seconds the application needs to terminate gracefully") flag.IntVar(&lameduck, "lameduck", 1, "A period that explicitly asks clients to stop sending requests, although the backend task is listening on that port and can provide the service") flag.BoolVar(&keepAlive, "http-keepalive", true, "") flag.BoolVar(&verbose, "verbose", false, "") flag.Parse() args := flag.Args() if len(args) == 0 { log.Fatalf("command not specified") } name := args[0] var arg []string if len(args) > 1 { arg = args[1:] } sessions := &sync.Map{} mux := http.NewServeMux() mux.HandleFunc("GET /sse", func(w http.ResponseWriter, r *http.Request) { w.Header().Set("Content-Type", "text/event-stream") w.Header().Set("Cache-Control", "no-cache") w.Header().Set("Connection", "keep-alive") flusher, ok := w.(http.Flusher) if !ok { http.Error(w, http.StatusText(http.StatusBadRequest), http.StatusBadRequest) return } cmd := exec.Command(name, arg...) stdin, err := cmd.StdinPipe() if err != nil { http.Error(w, http.StatusText(http.StatusInternalServerError), http.StatusInternalServerError) return } stdout, err := cmd.StdoutPipe() if err != nil { http.Error(w, http.StatusText(http.StatusInternalServerError), http.StatusInternalServerError) return } if err := cmd.Start(); err != nil { http.Error(w, http.StatusText(http.StatusInternalServerError), http.StatusInternalServerError) return } s := &session{ id: uuid.New().String(), requestQueue: make(chan []byte, 65536), responseQueue: make(chan []byte, 65536), } sessions.Store(s.id, s) go func() { scanner := bufio.NewScanner(stdout) for scanner.Scan() { s.responseQueue <- scanner.Bytes() } }() defer func() { sessions.Delete(s.id) _ = cmd.Process.Kill() _ = cmd.Wait() close(s.requestQueue) close(s.responseQueue) }() _, _ = fmt.Fprintf(w, "event: endpoint\ndata: %s\n\n", fmt.Sprintf("http://%s/messages?sessionId=%s", r.Host, s.id)) flusher.Flush() for { select { case data := <-s.requestQueue: _, _ = stdin.Write(data) case data := <-s.responseQueue: _, _ = fmt.Fprint(w, fmt.Sprintf("event: message\ndata: %s\n\n", data)) flusher.Flush() case <-r.Context().Done(): return } } }) mux.HandleFunc("POST /messages", func(w http.ResponseWriter, r *http.Request) { id := r.URL.Query().Get("sessionId") if id == "" { http.Error(w, http.StatusText(http.StatusBadRequest), http.StatusBadRequest) return } sany, ok := sessions.Load(id) if !ok { http.Error(w, http.StatusText(http.StatusBadRequest), http.StatusBadRequest) return } s := sany.(*session) var rawMessage json.RawMessage if err := json.NewDecoder(r.Body).Decode(&rawMessage); err != nil { http.Error(w, http.StatusText(http.StatusBadRequest), http.StatusBadRequest) return } var mcpMessage MCPMessage if err := json.Unmarshal(rawMessage, &mcpMessage); err != nil { http.Error(w, http.StatusText(http.StatusBadRequest), http.StatusBadRequest) return } if verbose { log.Printf("%s: %s", id, mcpMessage.Method) } s.requestQueue <- rawMessage s.requestQueue <- []byte("\n") w.Header().Set("Content-Type", "application/json") w.WriteHeader(http.StatusAccepted) _, _ = w.Write([]byte(http.StatusText(http.StatusAccepted))) }) mux.HandleFunc("GET /healthz", func(w http.ResponseWriter, r *http.Request) { w.Header().Set("Content-Type", "text/plain; charset=utf-8") w.WriteHeader(http.StatusOK) _, _ = w.Write([]byte(http.StatusText(http.StatusOK))) }) listener, err := net.Listen("tcp", address) if err != nil { log.Fatalf("failed to listen: %+v", err) } server := &http.Server{ Handler: mux, } server.SetKeepAlivesEnabled(keepAlive) go func() { defer func() { if err := recover(); err != nil { log.Printf("panic: %+v\n%s", err, debug.Stack()) } }() if err := server.Serve(listener); err != nil && !errors.Is(err, http.ErrServerClosed) { log.Fatalf("failed to listen: %+v", err) } }() quit := make(chan os.Signal, 1) signal.Notify(quit, syscall.SIGTERM) <-quit time.Sleep(time.Duration(lameduck) * time.Second) ctx, cancel := context.WithTimeout(context.Background(), time.Duration(terminationGracePeriodSeconds)*time.Second) defer cancel() if err := server.Shutdown(ctx); err != nil { log.Fatalf("failed to shutdown: %+v", err) } }
開発当時はStreamable HTTP TransportがなかったのでSSE Transportにのみ対応しています。
それでは実装を軽く解説していきます。
セッションの開始
SSE Transportはこちらの仕様の通り、/sse のようなSSEのエンドポイントで初回のリクエストを受けると、ペイロードを受け取るための別のエンドポイントを event: endpoint として返す必要があります。
mux.HandleFunc("GET /sse", func(w http.ResponseWriter, r *http.Request) { w.Header().Set("Content-Type", "text/event-stream") w.Header().Set("Cache-Control", "no-cache") w.Header().Set("Connection", "keep-alive") flusher, ok := w.(http.Flusher) if !ok { http.Error(w, http.StatusText(http.StatusBadRequest), http.StatusBadRequest) return } <snip> _, _ = fmt.Fprintf(w, "event: endpoint\ndata: %s\n\n", fmt.Sprintf("http://%s/messages?sessionId=%s", r.Host, s.id)) flusher.Flush() <snip> }
これはSSEがサーバからクライアントにメッセージを送信するための一方向の仕組みであるためです。
そのためSSE Transportでは2つのHTTPエンドポイントを実装することで、MCPクライアントとの双方向通信を実現しています。
もう一つのエンドポイントの実装は後述するため、ここではSSEのエンドポイントのみ解説します。
sessions := &sync.Map{}
mux.HandleFunc("GET /sse", func(w http.ResponseWriter, r *http.Request) {
<snip>
s := &session{
id: uuid.New().String(),
requestQueue: make(chan []byte, 65536),
responseQueue: make(chan []byte, 65536),
}
sessions.Store(s.id, s)
<snip>
defer func() {
sessions.Delete(s.id)
<snip>
close(s.requestQueue)
close(s.responseQueue)
}()
<snip>
}
UUIDを発行してセッションを開始しています。
ここで開始されたセッションはMCPクライアントに通知したペイロードを受け取るためのエンドポイントからも利用するため、一つ上のスコープで sessions として保持します。
コマンドの起動
セッションの開始と同時にstdio Transportを実装したMCPサーバの起動もしましょう。
注意点として、stdio Transportで実装されたMCPサーバは内部で状態を持つため、多くの場合セッションごとにMCPサーバを起動する必要があります。
sessions := &sync.Map{}
mux.HandleFunc("GET /sse", func(w http.ResponseWriter, r *http.Request) {
<snip>
cmd := exec.Command(name, arg...)
stdin, err := cmd.StdinPipe()
if err != nil {
http.Error(w, http.StatusText(http.StatusInternalServerError), http.StatusInternalServerError)
return
}
stdout, err := cmd.StdoutPipe()
if err != nil {
http.Error(w, http.StatusText(http.StatusInternalServerError), http.StatusInternalServerError)
return
}
if err := cmd.Start(); err != nil {
http.Error(w, http.StatusText(http.StatusInternalServerError), http.StatusInternalServerError)
return
}
go func() {
scanner := bufio.NewScanner(stdout)
for scanner.Scan() {
s.responseQueue <- scanner.Bytes()
}
}()
defer func() {
_ = cmd.Process.Kill()
_ = cmd.Wait()
<snip>
}()
<snip>
for {
select {
case data := <-s.requestQueue:
_, _ = stdin.Write(data)
case data := <-s.responseQueue:
_, _ = fmt.Fprint(w, fmt.Sprintf("event: message\ndata: %s\n\n", data))
flusher.Flush()
case <-r.Context().Done():
return
}
}
}
stdio TransportのMCPサーバの標準出力を s.responseQueue を介してMCPクライアントに通知し、セッション開始時にMCPクライアントに通知したペイロードを受け取るためのエンドポイントから送信される s.requestQueue から受け取ったリクエストは stdin.Write(data) で標準入力に書き込んでいます。
セッション終了時にコマンドを終了することも忘れないようにしましょう。
ペイロードを受け取るためのエンドポイントの実装
mux.HandleFunc("POST /messages", func(w http.ResponseWriter, r *http.Request) { id := r.URL.Query().Get("sessionId") if id == "" { http.Error(w, http.StatusText(http.StatusBadRequest), http.StatusBadRequest) return } sany, ok := sessions.Load(id) if !ok { http.Error(w, http.StatusText(http.StatusBadRequest), http.StatusBadRequest) return } s := sany.(*session) var rawMessage json.RawMessage if err := json.NewDecoder(r.Body).Decode(&rawMessage); err != nil { http.Error(w, http.StatusText(http.StatusBadRequest), http.StatusBadRequest) return } var mcpMessage MCPMessage if err := json.Unmarshal(rawMessage, &mcpMessage); err != nil { http.Error(w, http.StatusText(http.StatusBadRequest), http.StatusBadRequest) return } if verbose { log.Printf("%s: %s", id, mcpMessage.Method) } s.requestQueue <- rawMessage s.requestQueue <- []byte("\n") w.Header().Set("Content-Type", "application/json") w.WriteHeader(http.StatusAccepted) _, _ = w.Write([]byte(http.StatusText(http.StatusAccepted))) })
こちらはシンプルで、クライアントに通知したメッセージをもとに sessionId が送られてくるため、そこからセッションを取得し、送られてきたペイロードをそのまま s.requestQueue に流しています。
前述の通り s.requestQueue に送信されたペイロードはセッションごとに起動しているコマンドの標準入力に書き込まれます。
ここまでのシンプルな実装で、stdio Transportのみに対応したMCPサーバを無理やりSSE Transportに対応させることができました。
後はこんな感じで使えばよさそうです。
FROM ghcr.io/lifull/keel/mcp-stdio-proxy:main AS mcp-stdio-proxy FROM ghcr.io/github/github-mcp-server@sha256:fdf04e33b437c523d2f091b7bf8dc3724c88dbf9913d6568f12f0fcf48aaff95 COPY --link --from=mcp-stdio-proxy /usr/local/bin/mcp-stdio-proxy /usr/local/bin/mcp-stdio-proxy ENTRYPOINT ["/usr/local/bin/mcp-stdio-proxy", "/server/github-mcp-server", "stdio"]
まとめ
このようにLIFULLでは比較的安全にMCPサーバを動かしています。
過渡期ゆえの混沌のような気もしますが、ほどほどのハックで秩序を手に入れられた気がします。
SSE Transportには負荷分散がしづらいという問題があり、それを受けてStreamable HTTP Transportが開発されるなど、MCPはまだ進化の途上です。
セキュリティに気を配りながらこれからもMCPを使っていきたいです。
コミュニティの実装も大分増えてきていて、我々が開発する無限にスケールする汎用AI(仮)にもMCPクライアントを実装して、Function CallingとMCPのエコシステムを統合したマルチエージェントで真の汎用AIを目指しています。
プラットフォーマーとしてのLLMにまつわる活動に興味がある方がいれば、是非カジュアル面談させてください!