Node.jsを理解していくにあたり、Streamの理解は必須になってくるので、Streamのドキュメントを(まずは、さらっと理解程度に)読んでみた。
Streamとは?
Node.jsでストリーミングデータを操作するためのインターフェースのこと。インターフェースなので、ストリームという実装があるわけではなく、概念が決まっているだけ。
その概念は色々なもの( fs, process.stdin/stdout/stderrなど, HTTP request on the server/response on the client)で使用される。
ストリームには、以下4種類のインターフェースが定義されている。
- Writable: データを書き込むことができるストリーム
- Readable: データを読み取ることができるストリーム
- Duplex: ReadableとWritableの両方ができるストリーム
- Transform: Duplexデータ読み込みおよび書き込み時に、データの変更、変換ができるストリーム
オブジェクトモード
Node.js APIによって作成されたすべてのストリームは、文字列または、Bufferオブジェクトでのみ動作する。
バッファリング
WritableとReadable共にデータを内部バッファに保存する仕組みを持つ。
理由としては、一度に大量のデータを読み込み、または書き込みを行わないために、予め閾値(highWaterMark)を決めており、その分のデータを内部バッファに保存するためだそう。
注意点として、highWaterMarkは閾値であり、制限ではないとのこと。「一般に、厳密なメモリ制限は適用されません」とのこと。
EventEmitter
ストリームは、EventEmitterを継承している。EventEmitterとはイベント駆動でプログラムを動かすライブラリのこと。
たとえば以下の場合、 .onで、イベント名と実行内容を定義して、 .emit(イベント名)でイベントを発火させて実行する。
import EventEmitter from 'events'
const event = new EventEmitter();
console.log('1')
event.on('event', () => { // `.on`でイベントと実行内容を定義
console.log('2')
})
console.log('3')
event.emit('event') // `.emit`でイベントを発火。結果は、132となる
ストリームもこのEventEmitterを継承しているため、動かし方も上記のような方法になる。
import fs from "fs"
const readStream = fs.createReadStream("assets/sample1.txt", { encoding: "utf-8" })
readStream.on("data", chunk => console.log(chunk)); // `.on`でイベント名と実行内容を定義
ストリームの種類とイベント・API
Writable: 書き込み
用意されているイベント: close, drain, error, finish, pipe, unpipe
Readable: 読み込み
用意されているイベント: close, data, end, error, pause, readable, resume
Duplex & Transform: 書き込み、読み込み、変換
用意されているイベント: WritableとReadableの両方
stream API
ストリームを実装できるように、同期版APIが用意されている。
stream/promises API
ストリームを実装できるように、非同期版APIが用意されている。(ぇ、そうなの?)
感覚を掴んでみる
Node.jsの Fyle System (fs) モジュールは、Streamの仕組みが使われている。
createReadStreamやcreateWriteStreamを使えば、Streamを生成できるので、 createReadStream を使ってStreamの感覚を掴んでみることにする。
import * as fs from "fs"
// こうすることで、28byteずつ読み込む
const readStream = fs.createReadStream("assets/sample1.txt", {
encoding: "utf-8",
highWaterMark: 28
});
// 28byteずつ"data"イベントが発火する
readStream.on("data", chunk => console.log(chunk))
// すべての読み込みが完了したときにイベントが発火する
readStream.on("end", () => console.log('すべての読み込みが完了しました。'))
Streamについて、調べていると以下のような事を仰っている記事に辿り着いた。
Promise におけるフロー制御に Stream を導入することが難しい
なるほど。これを読むと、たしかにStreamとPromiseの相性の悪さがわかります。
"data"イベントを使用するのではなくfor awaitを使って制御するのが良さそうですね。以後、参考にさせていただきますm(= =)m
import * as fs from "fs"
// こうすることで、28byteずつ読み込む
const readStream = fs.createReadStream("assets/sample1.txt", {
encoding: "utf-8",
highWaterMark: 28
});
// すべての読み込みが完了したときにイベントが発火する
readStream.on("end", () => console.log('すべての読み込みが完了しました。'))
// async/awaitを使用する
async function main() {
for await ( const chunk of readStream ) {
console.log(chunk)
}
}
main()
まとめ
というわけで、今回は以下を学んだ。
そもそもStreamを学びたいと思った理由は、AWS S3のファイルを取得した際、型がStreamだったので「やべぇ・・・わからん!」となったのがキッカケ。 とりあえず、それを解決する程度には理解できたと思う。。。
- Streamはインターフェースであり、Node.jsの様々なモジュールで使用されている。
- StreamはEventEmitterであり、イベント駆動型でPromiseでのフロー制御には気をつけること。