以下の内容はhttps://let.blog.jp/tag/streamより取得しました。


Node.js の stream とマルチバイト文字列
Node.js で stream を使うとき デフォルトだと Buffer として受け取ります

const stream = require("stream")
const timer = require("timers/promises")

const readable = new stream.Readable({
read() {},
})

readable.on("data", chunk => {
console.log(chunk)
})
readable.on("end", () => {
console.log("END")
})

const push = async () => {
await timer.setTimeout(1000)
readable.push(Buffer.from("abc"))
await timer.setTimeout(1000)
readable.push(Buffer.from("def"))
await timer.setTimeout(1000)
readable.push(null)
}
push()
<Buffer 61 62 63>
<Buffer 64 65 66>
END

文字列が欲しいときは Buffer の toString で変換できます
そういうコードを結構見かけます
データを受け取るたび toString した文字列を結合していって最終的な文字列を作ってたりです
でもこれが安全なのは英語圏だけです
日本語などのマルチバイト文字のときに問題が起きます

たとえば日本語の「あ」「い」「う」は UTF-8 ではこういう 3 バイトで表現されます

あ: 227, 129, 130
い: 227, 129, 132
う: 227, 129, 134

stream では基本的に複数のチャンクに分割されています
分割される場所が 1 文字の途中だった場合はこうなります

const stream = require("stream")
const timer = require("timers/promises")

const readable = new stream.Readable({
read() {},
})

readable.on("data", chunk => {
console.log(chunk, chunk.toString())
})
readable.on("end", () => {
console.log("END")
})

const push = async () => {
await timer.setTimeout(1000)
readable.push(Buffer.from([227, 129, 130, 227])) // い の 1 バイト目まで
await timer.setTimeout(1000)
readable.push(Buffer.from([129, 132, 227, 129, 134])) // い の 2 バイト目から
await timer.setTimeout(1000)
readable.push(null)
}
push()
<Buffer e3 81 82 e3> あ�
<Buffer 81 84 e3 81 86> ��う
END

不正なデータとなり 「い」 のそれぞれのバイトが 1 文字の�とされています
こういうことがあるので全部受け取ってから Buffer を結合後に文字列化したほうが安全です
でも streaming 的に処理したい場合は最後を待ってられないです
こういうときは encoding を utf-8 にしておくと 受け取るデータが最初から文字列になってるだけじゃなくてマルチバイト文字の切れ目もうまく扱ってくれます

const stream = require("stream")
const timer = require("timers/promises")

const readable = new stream.Readable({
encoding: "utf-8", // ←追加
read() {},
})

readable.on("data", chunk => {
console.log(chunk)
})
readable.on("end", () => {
console.log("END")
})

const push = async () => {
await timer.setTimeout(1000)
readable.push(Buffer.from([227, 129, 130, 227])) // い の 1 バイト目まで
await timer.setTimeout(1000)
readable.push(Buffer.from([129, 132, 227, 129, 134])) // い の 2 バイト目から
await timer.setTimeout(1000)
readable.push(null)
}
push()

いう
END
Node.js でファイルを 1 行ずつ読み取る
ビルトインの readline モジュールはユーザーからの入力受付用かと思っていましたが ファイルの読み取りにも使えるみたいです

const fs = require("fs")
const readline = require("readline")

const fn = async () => {
fs.writeFileSync("test.txt", "foo\nbar\nbaz")

const rl = readline.createInterface({ input: fs.createReadStream("test.txt") })
for await (const line of rl) {
console.log({ line })
}
}

fn()
{ line: 'foo' }
{ line: 'bar' }
{ line: 'baz' }

readline で作成した Interface は async iterator を持ってるので for-await-of で使えます

const readline = require("readline")

const rl = readline.createInterface({
input: { on(){}, resume() {} }
})

console.log(rl[Symbol.iterator])
// undefined
console.log(rl[Symbol.asyncIterator])
// [Function (anonymous)]

Node.js 17 から readline に Promise API が追加されています
これは question みたいな関数がコールバックか Promise かの違いで 行ごとに読み取るだけなら 16 まででも使えます
18 以降でも変わりないです

注意しないといけないのがこういうケースです

const fs = require("fs")
const readline = require("readline")

const fn = async () => {
fs.writeFileSync("test.txt", "foo\nbar\nbaz")

const rl = readline.createInterface({ input: fs.createReadStream("test.txt") })

await new Promise(r => setTimeout(r, 100))

console.log(1)
for await (const line of rl) {
console.log({ line })
}
console.log(2)
}

fn()

readline の Interface を作ってから for-await-of で読むまでに非同期処理を挟む場合です
これを実行すると 1 だけが出力されます
for-await-of では待機したままデータなしになって 解決しない Promise という扱いで それ以降実行できるものがなくプロセスが終了します
その結果 2 を出力する console.log にたどり着かないので出力は 1 だけです

内部で stream の resume が呼び出されるので自動で読み進めてしまうみたいです
非同期処理を挟まず for-await-of を実行すると asyncIterator の作成処理で stream のイベントが起きる前にリスナを設定できます
その結果 正常に line イベントなどを受け取れて期待どおりに動作します
しかし 非同期処理を挟むと先にイベントが起きてしまって stream が close されたあとにリスナをつけることになるのでなんのイベントも起きず解決されない Promise になるということみたいです
複数の stream を 1 つへ pipe するとき
2 秒ごとに数字を stdout へ出力する out.js を作って それを 2 つ同時に実行する
出力は 1 つのファイルへ書き込み

const fs = require("fs")
const cp = require("child_process")

fs.writeFileSync(
"out.js",
`
const key = Math.random().toString(36).slice(2, 4)

const setNext = n => {
setTimeout(() => {
console.log(key, n)
if (n < 3) setNext(n + 1)
}, 2000)
}
setNext(0)
`
)

const stream = fs.createWriteStream("out.log")

const s1 = cp.fork("out.js", { stdio: "pipe" })
const s2 = cp.fork("out.js", { stdio: "pipe" })

s1.stdout.pipe(stream)
s2.stdout.pipe(stream)

結果は

zo 0
os 0
zo 1
os 1
zo 2
os 2
zo 3

最後の 3 が 1 つだけ
pipe のデフォルトは readstream が close したときに writestream も閉じるので先に終わったほうが終わったタイミングでスクリプトが終了してしまう
pipe のオプションで end を false にすると自動で閉じられない

s1.stdout.pipe(stream, { end: false })
s2.stdout.pipe(stream, { end: false })

あとから pipe してもいいので 1 つの writestream を使いまわして何度か実行もできる

const fs = require("fs")
const cp = require("child_process")

fs.writeFileSync(
"out.js",
`
const key = Math.random().toString(36).slice(2, 4)

const setNext = n => {
setTimeout(() => {
console.log(key, n)
if (n < 2) setNext(n + 1)
}, 2000)
}
setNext(0)
`
)

const stream = fs.createWriteStream("out.log")

const write = n => {
const s = cp.fork("out.js", { stdio: "pipe" })
s.stdout.pipe(stream, { end: false })
if (n < 3) s.stdout.once("close", write.bind(null, n + 1))
}
write(0)
xi 0
xi 1
xi 2
if 0
if 1
if 2
p9 0
p9 1
p9 2
c0 0
c0 1
c0 2



以上の内容はhttps://let.blog.jp/tag/streamより取得しました。
このページはhttp://font.textar.tv/のウェブフォントを使用してます

不具合報告/要望等はこちらへお願いします。
モバイルやる夫Viewer Ver0.14