これは、なにをしたくて書いたもの?
前に、Rustのasync-awaitについてメモしていました。
Rustのasync-awaitに関するドキュメントなどのメモ - CLOVER🍀
Rustではサーバーサイドのアプリケーションを書く時には非同期処理が出てくることが多いようで、慣れておいた方がよさそうです。
またRustで非同期処理を扱おうと思うとなにかしら導入しなくてはならないらしく、tokioというものを試していこうと思います。
tokio
tokioはRustの非同期ランタイムです。
Tokio is an asynchronous runtime for the Rust programming language.
Tokio - An asynchronous Rust runtime
tokioのトップページを見ると、特徴は以下のように書かれています。
- リライアブル(信頼性がある)
- 高速
- マルチスレッドのワークステアリングスケジューラーを提供
- 簡単
- async-awaitを使った非同期アプリケーション作成の複雑さを軽減
- フレキシブル
- すぐに使えるデフォルト設定が付属しているが、様々なユースケースに合わせて微調整することも可能
Tokio - An asynchronous Rust runtime
さらに下の方を見ると、以下のようなことも特徴として書かれています。
- 本番環境ですぐに使うために必要なものが揃っている
- IO、タイマー、ファイルシステム、同期、およびスケジュール機能を含む基盤を提供
- HTTP 1と2をサポート
- gRPCをサポート
- 信頼性の高いクライアントとサーバーを構築するためのモジュールコンポーネントを提供
- 再試行、負荷分散、フィルタリング、レートリミットなどが含まれる
- OSのイベントIO APIをベースにした最小限のポータブルAPI
- 構造化されたイベントベースのデータ収集とロギング機能を提供
- バイト配列を操作するための豊富なユーティリティ
ドキュメントはこちらです。
Tutorial | Tokio - An asynchronous Rust runtime
概要ページにはここまで見てきたことが書かれていますが、tokioを使わない方がよいケースについても書かれています。
- CPUバウンドな処理で並列性を求める場合
- tokioはIOバウンドなアプリケーション向けに設計されている
- このようなケースではRayonを使った方がよい(組み合わせることも可能)
- 大量のファイルの読み取り
- OSが一般に非同期ファイルAPIを提供していないため
- 単一のWebリクエストを送信する場合
Tutorial / When not to use Tokio
使用するライブラリーが同期APIを提供しない場合は、以下のページを参考にするとよいみたいです。
Bridging with sync code | Tokio - An asynchronous Rust runtime
APIドキュメントはこちら。
では、ドキュメントのページのいくつかを見ながら、TCP Echoサーバー/クライアントをお題にまずはtokioを使ってみましょう。
前に書いたTCP Echoサーバー/クライアントの内容を引き継ぎつつ、
RustでTCP Echoサーバー/クライアントを書いてみる - CLOVER🍀
このあたりを参考に書いていこうと思います。
Hello Tokio | Tokio - An asynchronous Rust runtime
Spawning | Tokio - An asynchronous Rust runtime
I/O | Tokio - An asynchronous Rust runtime
環境
今回の環境はこちら。
$ rustup --version rustup 1.27.1 (54dd3d00f 2024-04-24) info: This is the version for the rustup toolchain manager, not the rustc compiler. info: The currently active `rustc` version is `rustc 1.84.0 (9fc6b4312 2025-01-07)`
準備
Cargoパッケージの作成。
$ cargo new --vcs none tokio-tcp-echo $ cd tokio-tcp-echo
tokioのインストール。
$ cargo add tokio --features full
フィーチャーにfullを付けていますが、これはtokioの機能をすべて含むものだそうです。
実際に使う時には必要なフィーチャーに絞った方が良さそうですが、ひとまずtokioのチュートリアルではfullが前提になっています。
指定できるフィーチャーはこちら。
あとはログ出力用にlog、env_logger、chronoクレートを追加。
$ cargo add log env_logger chrono
Cargo.toml
[package]
name = "tokio-tcp-echo"
version = "0.1.0"
edition = "2021"
[dependencies]
chrono = "0.4.39"
env_logger = "0.11.6"
log = "0.4.25"
tokio = { version = "1.43.0", features = ["full"] }
サーバーとクライアントそれぞれをバイナリークレートとして扱うことにします。main.rsは要らないので削除。
$ rm src/main.rs
これで準備ができました。
バイナリークレートで使用するライブラリークレートを作成する
元ネタと同じように、サーバー/クライアントのバイナリークレートから使うコードは、ライブラリークレートとして
作成することにします。
ひとまず完成形はこちら。
src/lib.rs
use log::{error, info}; use tokio::{ io::{split, AsyncBufReadExt, AsyncWriteExt, BufReader, Result}, net::{TcpListener, TcpStream}, spawn, }; pub async fn start_server(address: &str, port: i32) -> Result<()> { let listener = TcpListener::bind(format!("{}:{}", address, port)).await?; info!("tcp echo server[{}:{}] startup.", address, port); loop { let (stream, address) = listener.accept().await?; info!("accept[{}]", address); spawn(async move { let result = reply(stream).await; match result { Ok(_) => {} Err(error) => { error!("{}", error) } } }); } } async fn reply(mut stream: TcpStream) -> Result<()> { let mut reader = BufReader::new(&mut stream); let mut line = String::new(); reader.read_line(&mut line).await?; let message = line.trim(); info!("received message = {}", message); let response_message = format!("★★★{}★★★", message); stream.write_all(response_message.as_bytes()).await?; stream.flush().await?; Ok(()) } pub struct Client { stream: TcpStream, } impl Client { pub async fn send(self, message: &str) -> Result<String> { let (mut rd, mut wr) = split(self.stream); let msg = message.to_string(); let _ = spawn(async move { wr.write_all(msg.as_bytes()).await?; wr.write_all("\r\n".as_bytes()).await?; wr.flush().await?; Ok::<_, tokio::io::Error>(()) }) .await?; let mut reader = BufReader::new(&mut rd); let mut line = String::new(); reader.read_line(&mut line).await?; let received_message = line.trim(); Ok(String::from(received_message)) } } pub async fn connect(address: &str, port: i32) -> Result<Client> { let stream = TcpStream::connect(format!("{}:{}", address, port)).await?; Ok(Client { stream }) }
パッと見、useで指定しているモジュール名こそ違うものの、Rustの標準ライブラリーを使って作ったものとかなり似た感じに
なっています。
それもそのはずで、tokioの型はRustの標準ライブラリー(同期型)と同じ名前が付けられているからですね。
違いは非同期(async fn)であることです。
Many of Tokio's types are named the same as their synchronous equivalent in the Rust standard library. When it makes sense, Tokio exposes the same APIs as std but using async fn.
サーバー側
まずはサーバー側から見ていきましょう。tokioの型はRustの標準ライブラリーと同じ名前が付けられているという話だったので、
TCPソケットをリッスンするのもTcpListenerです。
pub async fn start_server(address: &str, port: i32) -> Result<()> { let listener = TcpListener::bind(format!("{}:{}", address, port)).await?; info!("tcp echo server[{}:{}] startup.", address, port);
この関数はサーバー側のクレートから呼び出すエントリーポイントになりますが、async fnとして定義します。
Resultが初めて使う形なのですが、これはstd::result::Resultではなくstd::io::Resultです。そしてtokioがstd::io::Resultを
再エクスポートしてtokio::io::Resultとして使えるようにしています。
これはstd::result::Resultを少し簡単に使えるようにしたtypeですね。
pub type Result<T> = Result<T, Error>;
クライアントからメッセージを読み出し、メッセージを返すところも関数がasync fnとなり、各関数の呼び出しにawait?が
付いている以外は標準ライブラリーとほぼ同じです。
async fn reply(mut stream: TcpStream) -> Result<()> { let mut reader = BufReader::new(&mut stream); let mut line = String::new(); reader.read_line(&mut line).await?; let message = line.trim(); info!("received message = {}", message); let response_message = format!("★★★{}★★★", message); stream.write_all(response_message.as_bytes()).await?; stream.flush().await?; Ok(()) }
async fnは、std::future::Futureを返す関数です。そして実際に操作を実行して戻り値を得るには、awaitを使います。
Hello Tokio / Compile-time green-threading
Hello Tokio / Using async/await
関数の戻り値にはResultを使っています。
pub async fn start_server(address: &str, port: i32) -> Result<()> {
これはtokioのドキュメントの例に習っているものですが、useしているのはtokio::io::Resultです。
use tokio::{ io::{split, AsyncBufReadExt, AsyncWriteExt, BufReader, Result}, net::{TcpListener, TcpStream}, spawn, };
これはstd::io::Resultを再エクスポートしたものです。
そしてstd::io::Resultはstd::result::Resultの型エイリアス(type)です。
pub type Result<T> = Result<T, Error>;
つまりErrorを省略できるという感じですね。
クライアントからの接続を扱う部分を見てみます。
loop { let (stream, address) = listener.accept().await?; info!("accept[{}]", address); spawn(async move { let result = reply(stream).await; match result { Ok(_) => {} Err(error) => { error!("{}", error) } } }); }
loopでループを繰り返していますが、特徴的なのはspawnとasync modeですね。
tokio::spawnは非同期タスクを開始するtokioの関数です。
Spawning / Concurrency / Tasks
tokioのタスクは非同期グリーンスレッドで、asyncブロックをtokio::spawn関数に渡すことで作成します。
tokio::spawnタスクはJoinHandleを返し、また非同期タスクは戻り値を持つこともできます。
A Tokio task is an asynchronous green thread. They are created by passing an async block to tokio::spawn. The tokio::spawn function returns a JoinHandle, which the caller may use to interact with the spawned task. The async block may have a return value.
非同期タスクがどのスレッドで実行されるかはスケジューラーによって決まり、タスクを生成したスレッドと同じスレッドで
実行されることもあれば、別のスレッドで実行されることもあり、さらにスレッド間を移動することもあります。
Tasks are the unit of execution managed by the scheduler. Spawning the task submits it to the Tokio scheduler, which then ensures that the task executes when it has work to do. The spawned task may be executed on the same thread as where it was spawned, or it may execute on a different runtime thread. The task can also be moved between threads after being spawned.
Spawning / Concurrency / Tasks
tokioのタスクは非常に軽量で、内部的には1度のアロケーションと64バイトのメモリーを必要とします。アプリケーションは
数千、数百万のタスクを生成できます。
Tasks in Tokio are very lightweight. Under the hood, they require only a single allocation and 64 bytes of memory. Applications should feel free to spawn thousands, if not millions of tasks.
tokioで非同期タスクを生成する場合、その有効期間は'staticである必要があります。つまり、タスク外部で所有されている
データへの参照を含めることができません。
When you spawn a task on the Tokio runtime, its type's lifetime must be 'static. This means that the spawned task must not contain any references to data owned outside the task.
Spawning / Concurrency / 'static bound
ここでasync moveを使うと、タスクの外部で生成された変数の所有権を非同期タスク側に移動できるようになります。
spawn(async move { let result = reply(stream).await;
Changing line 7 to task::spawn(async move { will instruct the compiler to move v into the spawned task. Now, the task owns all of its data, making it 'static.
ちなみに、単一のデータに複数のタスクから同時にアクセスするような場合は、Arcというものを使うようです。
If a single piece of data must be accessible from more than one task concurrently, then it must be shared using synchronization primitives such as Arc.
非同期タスクは、Sendトレイトを実装している必要があり、これでtokioはawaitで中断されているタスクをスレッド間で
移動できます。
Tasks spawned by tokio::spawn must implement Send. This allows the Tokio runtime to move the tasks between threads while they are suspended at an .await.
Spawning / Concurrency / Send bound
クライアント側
続いてはクライアント側です。
今回も構造体を用意しました。TcpStreamはtokioのものです。
pub struct Client { stream: TcpStream, }
サーバーにメッセージを送信するメソッド定義。
impl Client { pub async fn send(self, message: &str) -> Result<String> { let (mut rd, mut wr) = split(self.stream); let msg = message.to_string(); let _ = spawn(async move { wr.write_all(msg.as_bytes()).await?; wr.write_all("\r\n".as_bytes()).await?; wr.flush().await?; Ok::<_, tokio::io::Error>(()) }) .await?; let mut reader = BufReader::new(&mut rd); let mut line = String::new(); reader.read_line(&mut line).await?; let received_message = line.trim(); Ok(String::from(received_message)) } }
渡されたメッセージを素直にTcpStreamを使って書き込み、その後で読み出せばいいのですが、今回はここをマネて
書き込みと読み込みを分割することにしました。
I/O / Echo server / Splitting a reader + writer
ドキュメントのマネになっていますが、tokio::io::splitでTcpStreamを`tokio::io::AsyncReadとtokio::io::AsyncWriteに
分割します。
let (mut rd, mut wr) = split(self.stream);
AsyncWrite in tokio::io - Rust
まずは書き込みを非同期タスクで行います。
let _ = spawn(async move { wr.write_all(msg.as_bytes()).await?; wr.write_all("\r\n".as_bytes()).await?; wr.flush().await?; Ok::<_, tokio::io::Error>(()) }) .await?;
ここでasync moveを使い、AsyncWriteの所有権を非同期タスクに移しています。
書き込みをawaitで待った後、AsyncReadを使ってサーバーから返ってくる結果を読み出します。
let mut reader = BufReader::new(&mut rd); let mut line = String::new(); reader.read_line(&mut line).await?; let received_message = line.trim(); Ok(String::from(received_message))
ここでtokio::io::splitを使わずにTcpStreamをそのまま非同期タスクに渡してしまうと、所有権が非同期タスクに移って
しまうのでその後のメッセージの読み出しで困ったことになります。
このような用途のためにTcpStreamを読み込みと書き込みで分割できるようにしているようです。
この説明は、tokio::io::copyを使う時に読み込み元も書き込み先もTcpStreamとなってしまう問題を回避する例として
書かれています。
As seen earlier, this utility function takes a reader and a writer and copies data from one to the other. However, we only have a single TcpStream. This single value implements both AsyncRead and AsyncWrite. Because io::copy requires &mut for both the reader and the writer, the socket cannot be used for both arguments.
I/O / Echo server / Using io::copy()
残りは、TcpStream::connectで接続を確立するコードです。
pub async fn connect(address: &str, port: i32) -> Result<Client> { let stream = TcpStream::connect(format!("{}:{}", address, port)).await?; Ok(Client { stream }) }
サーバー側のバイナリークレートを作成する
では、サーバー側のバイナリークレートを作成します。src/bin配下に作成します。
src/bin/server.rs
use std::env::args; use std::io::Write; use std::thread; use chrono::Local; use tokio::io::Result; use tokio_tcp_echo::start_server; #[tokio::main] async fn main() -> Result<()> { env_logger::builder() .format(|buf, record| { writeln!( buf, "[{} {} server] {} - {}", Local::now().format("%Y-%m-%d %H:%M:%S"), record.level(), thread::current().name().unwrap(), record.args() ) }) .init(); let address = args().nth(1).unwrap(); let port = args().nth(2).unwrap().parse::<i32>().unwrap(); start_server(&address, port).await?; Ok(()) }
コマンドライン引数は、バインドするアドレスとリッスンポートです。またenv_loggerの初期化の際に、スレッド名を
ログの内容に含めるようにしています。
1番のポイントは、main関数の書き方ですね。
#[tokio::main] async fn main() -> Result<()> {
#[tokio::main]属性を付けます。これはマクロで、#[tokio::main]を使うことで以下のmain関数を書くと
#[tokio::main] async fn main() { println!("hello"); }
以下のように変換されるようです。
fn main() { let mut rt = tokio::runtime::Runtime::new().unwrap(); rt.block_on(async { println!("hello"); }) }
Hello Tokio / Breaking it down / Async main function
起動してみましょう。
$ RUST_LOG=info cargo run --bin server localhost 5000
確認。
$ echo hello | nc localhost 5000 ★★★hello★★★ $ echo こんにちは、世界 | nc localhost 5000 ★★★こんにちは、世界★★★
OKですね。
この時のログを見ると、非同期タスクはtokioのスレッドで動作していることが確認できます。
[2025-01-25 14:39:13 INFO server] main - accept[127.0.0.1:46604] [2025-01-25 14:39:13 INFO server] tokio-runtime-worker - received message = hello [2025-01-25 14:39:14 INFO server] main - accept[127.0.0.1:46620] [2025-01-25 14:39:14 INFO server] tokio-runtime-worker - received message = こんにちは、世界
クライアント側のバイナリークレートを作成する
最後はクライアント側のバイナリークレートを作成します。
src/bin/client.rs
use std::env::args; use log::info; use tokio::io::Result; use tokio_tcp_echo::connect; #[tokio::main] async fn main() -> Result<()> { env_logger::init(); let address = args().nth(1).unwrap(); let port = args().nth(2).unwrap().parse::<i32>().unwrap(); let message = args().nth(3).unwrap(); let client = connect(&address, port).await?; info!("connected tcp server[{}:{}]", &address, port); info!("send message = {}", message); let received_message = client.send(&message).await?; info!("received message = {}", received_message); info!("disconnect"); Ok(()) }
引数は3つで、接続先のアドレス、ポート、そして送信するメッセージです。
特徴的なものはあまりないので、そのまま実行
$ RUST_LOG=info cargo run --bin client localhost 5000 hello
Finished `dev` profile [unoptimized + debuginfo] target(s) in 0.02s
Running `target/debug/client localhost 5000 hello`
[2025-01-25T05:41:55Z INFO client] connected tcp server[localhost:5000]
[2025-01-25T05:41:55Z INFO client] send message = hello
[2025-01-25T05:41:55Z INFO client] received message = ★★★hello★★★
[2025-01-25T05:41:55Z INFO client] disconnect
$ RUST_LOG=info cargo run --bin client localhost 5000 こんにちは、世界
Finished `dev` profile [unoptimized + debuginfo] target(s) in 0.02s
Running `target/debug/client localhost 5000 'こんにちは、世界'`
[2025-01-25T05:42:13Z INFO client] connected tcp server[localhost:5000]
[2025-01-25T05:42:13Z INFO client] send message = こんにちは、世界
[2025-01-25T05:42:13Z INFO client] received message = ★★★こんにちは、世界★★★
[2025-01-25T05:42:13Z INFO client] disconnect
OKですね。
おわりに
前のエントリーの焼き直しですが、tokioを使ってTCP Echoサーバー/クライアントを書いてみました。
async、await自体はそうハマらなかったのですが、tokioの型の書き方でハマったり、所有権の移動のところで
よくわからなくなったり、Echoクライアントの書き方をコロッと忘れていたりしてけっこう苦労しました。
まずは基本的なところから確認しておいてよかったですが、ちょっとずつ慣れていかないとですね。