M12i.

学術書・マンガ・アニメ・映画の消費活動とプログラミングについて

Node.js Streamのリファレンスを読んでみた(Node.js v7.1.0)

Gulpの使い方を調べる中でNode.jsが提供しているStream APIについて気になったのでリファレンスを読んでみました(Node.js v7.1.0、2016/11/17時点)。Node.jsの日本語サイトにもAPIリファレンスの翻訳があるのですが、英語の原典と比べると記述の違う部分があるのでちょっとバージョンが古いのかも、ということで後者を参照しました。

導入部にもある通りStreamについてのリファレンスはユーザ向けと開発者向けにわかれており、私の現在の関心は「ユーザ向け」だけでしたのでそちらだけ訳出してみました。

          * * *

ストリーム

安定性: 2 - Stable

ストリームはNode.jsにおいてストリーミング・データ〔訳者:定義がはっきりしませんが、JavaC#におけるストリームのようなデータ構造および処理形態を指しているのでしょう。例えばファイルを読み込みながらその完了を待たずに読み込まれたデータを処理していくような。〕とともにはたらく抽象インターフェースです。streamモジュールはストリーム・インターフェースを実装するオブジェクトを容易に構築することを可能にする基本的なAPIを提供しています。

Node.jsでは多くのストリーム・オブジェクトが提供されています。例えば、HTTPサーバに対するリクエストprocess.stdoutはいずれもストリームのインスタンスです。

ストリームは読み込み可能なもの、書き込み可能なもの、そしてそのいずれもが可能なものとがあります。すべてのストリームはEventEmitterインスタンスです。
streamモジュールには次のようにしてアクセス可能です:

const stream = require('stream');

すべてのNode.jsユーザにとってストリームがどのように機能するかを理解することは非常に重要なのですが、streamモジュールそれ自体はストリームを実装する新しい型を作成しようとしている開発者にとって非常に便利なものです。したがってストリーム・オブジェクトを主として消費する側の開発者がstreamモジュールを直接利用する必要があるケースは(仮にそれがあるとしても)稀なケースでしょう。

このドキュメントの構成

このドキュメントは大きく2つのセクションに分かれています。そして第3のセクションは補註です。第1のセクションでは、ストリームAPIの諸要素のうちアプリケーションのなかでストリームを使用するのに必要とされるものについて説明しています。対して、第2のセクションでは、ストリームAPIの諸要素のうちストリームの新しい型を実装するのに必要とされるものについて説明しています。

ストリームの型

Node.jsには4つの基本的なストリームの型が存在します:

  • Readableは読み込み可能なデータのストリームで、例えば fs.createReadStream()メソッドが返すオブジェクトが該当します。
  • Writableは書き込み可能なデータのストリームで、例えばfs.createWriteStream()メソッドが返すオブジェクトが該当します。
  • Duplexは読み込み可能かつ書き込み可能なストリームで、例えばnet.Socketが該当します。
  • Transformはそれを通じて読み書きされたデータに対して変更もしくは変換を加えるストリームで、例えばzlib.createDeflate()メソッドが返すオブジェクトが該当します。
オブジェクト・モード

Node.jsのAPIで作成されるすべてのストリームはもっぱら文字列もしくはBufferオブジェクトを操作するためのものです。しかしもちろんJavaScriptのそれ以外の型とともに働くストリームを実装することは可能です(ただしnullは例外です。この値はストリームでは特別な目的のために使用されています)。そうしたストリームは「オブジェクト・モード」で動作するものと見なされます。

ストリーム・インスタンスはストリーム作成時のobjectModeオプションを使用することでオブジェクト・モードに切り替えられます。一方、すでに出来上がっているストリームをオブジェクト・モードに切り替えるのは危険です。

バッファリング

WritableストリームとReadableストリームはいずれもその内部バッファにデータを格納します。このバッファはそれぞれwritable._writableState.getBuffer()readable._readableState.bufferにより取得することができます。

ストリーム内部にバッファリング可能なデータの総量はコンストラクタのhighWaterMarkオプションにより決まります。通常のストリームでは、highWaterMarkはバッファリング可能な総バイト数を規定します。オブジェクト・モードのストリームでは、highWaterMarkはバッファリング可能な総オブジェクト数を規定します。

Readableストリームではstream.push(chunk)メソッドが呼び出されるとデータがバッファリングされます。そのデータはstream.read()メソッドが呼び出されるまで、ストリームの内部のキューにとどまり続けます。

ひとたび内部読み込みバッファがhighWaterMarkにより定められた閾値に到達すると、それらのデータが消費されるまで、ストリームはそのベースとなるリソースからのデータ読み込みを一時停止します(つまり、読み込みバッファを充填するためのreadable._read()メソッドの内部的な呼び出しをストップします)。

Writableストリームではwritable.write(chunk)メソッドが繰り返し呼び出されることでデータがバッファリングされます。内部書き込みバッファの総量がhighWaterMarkにより定められた閾値を下回っているうちは、writable.write()メソッドはtrueを返します。そして、ひとたび内部バッファのサイズがhighWaterMarkに到達もしくは超過すると、同メソッドはfalseを返します。

streamAPI──とりわけstream.pipe()メソッド──の重要な目標は、スピードの異なる入力元と出力先とが利用可能なメモリを逼迫させることのないよう、バッファリングされるデータを許容可能なレベルに制限することです。

DuplexとTransformは読み込みも書き込みも可能なストリームなので、読み込み用途と書き込み用途のそれぞれのため2つの個別の内部バッファを管理しています。これにより読み込みと書き込みを独立させ、それぞれの側でデータ・フローを適切かつ効率的に処理することが可能になります。例えばnet.SocketインスタンスはDuplexストリームの一種で、読み込み側バッファはソケットから取得されたデータを消費することを可能にし、書き込み側バッファはソケットに対するデータの書き込みを可能にします。データの書き込みはデータの読み取りに比較して速くなったり遅くなったりするので、それぞれの処理(とバッファ)を独立させることが重要なのです。

ストリーム利用者のためのAPI

ほとんどのNode.jsアプリケーションは、例えどんなにシンプルなものでも、何かしらのストリームを利用しています。次のコードはHTTPサーバを実装するNode.jsアプリケーションにおけるストリーム利用の例です:

const http = require('http');

const server = http.createServer( (req, res) => {
  // 引数reqはhttp.IncomingMessageのインスタンスで、Readableストリーム
  // 引数resはhttp.ServerResponseのインスタンスで、Writableストリーム

  let body = '';
  // データをUTF-8の文字列として取得。
  // エンコードをしていしないとBufferオブジェクトは受け取られない。
  req.setEncoding('utf8');

  // リスナーを登録しておくとReadableストリームはdataイベントを発生させる。
  req.on('data', (chunk) => {
    body += chunk;
  });

  // endイベントはリクエスト本文全体の受信が終わったことを示す。
  req.on('end', () => {
    try {
      const data = JSON.parse(body);
      // ユーザに対して何かしらの応答を返す:
      res.write(typeof data);
      res.end();
    } catch (er) {
      // あー何たること!  おかしなJSONが送られてきた!
      res.statusCode = 400;
      return res.end(`error: ${er.message}`);
    }
  });
});

server.listen(1337);

// $ curl localhost:1337 -d '{}'
// object
// $ curl localhost:1337 -d '"foo"'
// string
// $ curl localhost:1337 -d 'not json'
// error: Unexpected token o

Writableストリーム(上記の例ではresが該当)はwrite()end()といったメソッド──ストリームに対してデータを書き出す──を提供しています。

Readableストリームは、データを読み取ることが可能になったとき、アプリケーション・コードに対して通知を行うためEventEmitterAPIを使用しています。ストリームからのデータの読み取りには複数の方法が用意されています。

いずれのストリームも、その目下の状態を伝えるために様々なEventEmitterAPIを利用しています。

DuplexとTransformは前述の2つのストリームが提供するいずれをも提供しています。

アプリケーションはそれがデータを書き出すものであれ読み込むものであれ、ストリーム・インターフェースを直接に実装したりする必要はありませんし、一般にrequire('stream')を呼び出したりする必要もありません。

ストリーム・インターフェースを実装する新しい型をつくろうとしている開発者の方は「ストリーム実装のためのAPI」の節を参照してください。

Writableストリーム

Writableストリームはデータの書き込み先を抽象化するものです。

Writableの例としては次のものがあります:

注意:Writableインターフェースの実装の中には実際にはDuplexの実装例でもあるものもいくつか含まれています。

すべてのWritableストリームはstream.Writableクラスにより定義されたインターフェースを実装します。

Writableストリームの個々の実装はさまざまな面で異なっていますが、すべてのWritableストリームは以下の例に示されるような基本的な利用パターンをサポートしています:

const myStream = getWritableStreamSomehow();
myStream.write('some data');
myStream.write('some more data');
myStream.end('done writing data');

●クラス: stream.Writable

追加:バージョン0.9.4

○イベント: 'close'

追加:バージョン0.9.4

'close'イベントはストリームとその背後のリソース(例えばファイル記述子)がクローズされたときに発行されます。このイベントはそのストリームに関して以後いかなるイベントも、いかなる計算処理も発生しないことを示します。
すべてのWritableストリームが'close'イベントを発行するわけではありません。

○イベント:'drain'

追加:バージョン0.9.4

stream.write(chunk)メソッドの呼び出しがfalseを返したあと、当該ストリームへのデータの書き込みを再開すべきタイミングとなったとき、'drain' イベントは発行されます。

// 関数は提供された書き込み可能ストリームに対して100万回のデータ書き込みを行う。
// "背圧"に注意されたし。
function writeOneMillionTimes(writer, data, encoding, callback) {
  let i = 1000000;
  write();
  function write() {
    var ok = true;
    do {
      i--;
      if (i === 0) {
        // 最後の回!
        writer.write(data, encoding, callback);
      } else {
        // 継続すべきか確認し、結果が否なら待機する
        //まだ処理は完了していないから、コールバックは渡さない。
        ok = writer.write(data, encoding);
      }
    } while (i > 0 && ok);
    if (i > 0) {
      // すべての書き込みが終わるまえに停止してしまった!
      // ドレイン(排出)ができたらもう一度書き込みをはじめる
      writer.once('drain', write);
    }
  }
}

○イベント: 'error'

追加:バージョン0.9.4

'error'イベントは書き込み中やパイプ中にエラーが発生した場合に発行されます。リスナー・コールバック関数には単一の引数としてErrorが渡されます。
注意: 'error'イベントが発行されたときストリームはクローズされません。

○イベント: 'finish'

追加:バージョン0.9.4

'finish'イベントはstream.end() メソッドが呼び出され、すべてのデータが背後のシステムでフラッシュされたときに発行れます。

const writer = getWritableStreamSomehow();
for (var i = 0; i < 100; i ++) {
  writer.write('やあ, #${i}!\n');
}
writer.end('これでおしまい\n');
writer.on('finish', () => {
  console.error('今すべての書き込みが完了。');
});

○イベント: 'pipe'

追加:バージョン0.9.4

  • src <stream.Readable> この書き込み可能ストリームにパイプされている入力元のストリーム

'pipe'イベントは読み込み可能ストリームのstream.pipe()メソッドが呼び出され、その出力先としてこの書き込み可能ストリームが追加されたときに発行されます。

const writer = getWritableStreamSomehow();
const reader = getReadableStreamSomehow();
writer.on('pipe', (src) => {
  console.error('何かがこのライターにパイプされている!');
  assert.equal(src, reader);
});
reader.pipe(writer);

○イベント: 'unpipe'

追加:バージョン0.9.4

  • src <Readable Stream> この書き込み可能ストリームからアンパイプされた〔unpiped〕入力元のストリーム

'unpipe'イベントはReadableストリームのstream.unpipe()メソッドが呼び出され、この書き込み可能ストリームの出力先から取り除かれたとき発行されます。

const writer = getWritableStreamSomehow();
const reader = getReadableStreamSomehow();
writer.on('unpipe', (src) => {
  console.error('このライターに対するパイプが中断された! ');
  assert.equal(src, reader);
});
reader.pipe(writer);
reader.unpipe(writer);

○メソッド: writable.cork()

追加:バージョン0.11.2

writable.cork()メソッドはすべての書き込みデータをメモリ上にバッファリングするよう強制します。バッファされたデータはstream.uncork()もしくはstream.end()メソッドが呼び出されたときにフラッシュされます。

writable.cork()の一番の目的は、内部的なバッファの補助機能を持たないストリームに対して多くの小さなデータのかたまりを書き込むことによって性能面での悪影響が発生するような状況を避けることです。このような状況では、writable._writev()メソッドを持つストリーム実装はより最適化された方法で書き込みのバッファリングを実現することができます。

○メソッド:writable.end([chunk][, encoding][, callback])

追加:バージョン0.9.4

  • chunk <String> | <Buffer> | 追加の書き込みデータ。オブジェクト・モードで動作していないストリームでは、chunkは文字列もしくはBufferである必要があります。オブジェクト・モードのストリームでは、chunknull以外のいかなるJavaScriptの値も指定することが可能です。
  • encoding <String>   chunkがStringである場合、エンコーディング名
  • callback <Function> ストリームが終了したときに呼び出されるコールバック関数

writable.end()メソッドの呼び出しは当該のWritableストリームにもはや書き込むべきデータはないことを知らせます。オプションの引数chunkencoding はストリームをクローズするまえにただちに書き込みすべき最後のデータを指定するものです。オプションの引数としてcallback関数が指定されている場合、それは'finish'イベントのリスナーとして使用されます。

stream.write()メソッドをstream.end()メソッドのあとに呼び出すとエラーが発生します。

// 'hello, 'と書き込んだ後、続いて'world!'とともにendを呼び出す
const file = fs.createWriteStream('example.txt');
file.write('hello, ');
file.end('world!');
// これ以降は何も書き込めない!

○writable.setDefaultEncoding(encoding)

追加:バージョン0.11.15

  • encoding <String> 新しいデフォルト・エンコーディング
  • 戻り値: this

writable.setDefaultEncoding()はWritableストリームにデフォルトのencodingを設定します。

○writable.uncork()

追加:バージョン0.11.2

writable.uncork()メソッドはstream.cork()が呼び出されて以降のバッファリングされているすべてのデータをフラッシュします。

writable.cork()writable.uncork()を使ってストリームへの書き込みのバッファリングを管理する場合、writable.uncork()の呼び出しはprocess.nextTick()による一時的な処理の繰り延べのあとで行うことをおすすめします。そうすることで、特定Node.jsイベント・ループ・フェーズのなかのwritable.write()呼び出しをまとめて実行することができます〔訳注:あきらかに説明が足りない〕。

stream.cork();
stream.write('some ');
stream.write('data ');
process.nextTick(() => stream.uncork());

writable.cork()メソッドが複数回呼び出される場合、バッファされたデータをフラッシュするために同じ回数だけwritable.uncork()メソッドも呼び出される必要があります。

stream.cork();
stream.write('some ');
stream.cork();
stream.write('data ');
process.nextTick(() => {
  stream.uncork();
  // データは2度目のuncork()までフラッシュされない
  stream.uncork();
});

○writable.write(chunk[, encoding][, callback])

追加:バージョン0.9.4

  • chunk <String> | <Buffer>  書き込むデータ
  • encoding <String> chunkが文字列の場合、エンコーディング
  • callback <Function> 指定したデータのチャンクがフラッシュされたときに実行されるコールバック関数
  • 戻り値: <Boolean> 'drain'イベントが発行されるまでさらなるデータの書き込みは待機すべき場合false。さもなくばtrue

writable.write()はストリームにデータを書き込み、そのデータが処理されきったあとで指定されたcallbackを呼び出します。エラーが発生した場合、callbackはエラー・オブジェクトを第1引数に呼び出されるかもしれませんが、呼び出されないかもしれません。エラーを確実に検知するには'error'イベントのリスナーを設定してください。

メソッドの戻り値はchunkが内部的なバッファに追加され、当該のバッファがストリームが生成された際に指定されたhighWaterMarkを超過したかどうかを示します。falseが返された場合、ストリームへのさらなるデータの書き込みは'drain'イベントが発行されるまで一旦停止するべきです。

Writableストリームがオブジェクト・モードの場合、encodingは常に無視されます。

Readableストリーム

Readableストリームはデータの読み込み元を抽象化するものです。

Readableストリームの例としては以下のものがあります:

すべてのReadableストリームはstream.Readableクラスで定義されたインターフェースを実装しています。

●2つのモード

Readableストリームは2つのモードのいずれかで動作します: flowing とpausedです。

flowingモードのとき、データは背後のシステム・リソースから自動的に読み込まれ、そのデータはEventEmitterインターフェースが発行するイベントを通じてできる限り迅速にアプリケーションに供給されます。

pausedモードのとき、ストリームからデータのチャンクを読み込むためには明示的にstream.read()メソッドを呼び出してやる必要があります。

すべてのReadableストリームははじめpausedモードですが、次にあげるいずれかの方法でflowingモードに切り替えることができます:

  • 'data'イベント・ハンドラを追加する
  • stream.resume()メソッドを呼び出す
  • stream.pipe()メソッドを呼び出しデータを送り込むべきWritableを指定する

following モードのReadableストリームは次にあげるいずれかの状況でpausedモードに戻ります:

  • パイプ先が存在しない状況で、stream.pause()メソッドが呼び出された
  • パイプ先が存在しない状況で、'data'イベント・ハンドラが削除された、あるいはstream.unpipe()メソッドによりすべてのパイプ先が削除された

覚えておいていただきたいのは、データを消費するか破棄するかするメカニズム〔訳注:明示的メソッド呼び出し、イベント・ハンドラ、そしてパイプ先ストリームを指す〕が存在しない限りReadableはデータを生成しないということです。メカニズムが無効化されたり取り払われたりすると、ストリームはデータの生成を停止させようと試みます。

注意:Note: 後方互換性のため、'data'イベント・ハンドラの削除はストリームを自動的に停止〔pause〕させません。加えてまた、パイプ先が存在する状況でstream.pause()メソッドを呼び出しても、パイプ先の排出〔drain〕や追加のデータの問合せがあった場合、ストリームはモードであり続ける保証はありません。

注意:データを処理する消費側〔訳注:イベント・ハンドラやパイプ先〕が存在しない状態でReadableストリームがflowingモードに切り替わると、データは喪失してしまいます。これは例えば、readable.resume()メソッドが'data'イベント・リスナーが登録されていないか、'data'イベント・ハンドラが当該ストリームから削除された状態で呼び出されたときに発生します。

●3つの状態

Readableストリームの「2つの動作モード」は、ストリームの実装コードの中で行われているより複雑な状態管理を簡略化するものです。

厳密に言えば、ある特定の時点において、Readableストリームはかならず3つの状態のいずれかをとります:

  • readable._readableState.flowing = null
  • readable._readableState.flowing = false
  • readable._readableState.flowing = true

readable._readableState.flowingnullのとき、当該ストリームには読み込まれたデータを消費するメカニズム〔訳注:イベント・ハンドラやパイプ先〕が存在せず、したがってストリームはデータを生成しません。

'data'イベントのためのリスナーを登録したり、readable.pipe()メソッドを呼び出したり、readable.resume()メソッドを呼び出したりすることで、readable._readableState.flowingtrueに切り替わります。そしてReadableストリームはデータを生成してイベントを発行します。

readable.pause()もしくはreadable.unpipe()が呼び出されるか、「背圧」〔訳注:back pressure。Writable.write(...)メソッドがfalseを返した場合を指しているものと思われる〕を受けるかすると、readable._readableState.flowingfalseに切り替わります。イベントの発行は一時的に停止されますが、データの生成自体は停止しません。

readable._readableState.flowingfalseである間、データはストリーム内部のバッファに充填されていきます。

●1つを選ぶ

ReadableストリームAPIはNode.jsのバージョン・アップの過程で進化してきました。そしてストリームが生成するデータを消費する方法も複数提供されてきました。一般に、開発者は1つのストリームについてそれらの方法のうち1つを選んで使用すべきであり、複数の方法を使用すべきではありません。

readable.pipe()メソッドはストリームのデータを消費するための平易な方法であり、ほとんどのユーザにはこのメソッドの使用をおすすめします。もちろんデータの転送と生成に関してより粒度の細かい制御をする必要がある開発者は、EventEmitter インターフェースとreadable.pause()/readable.resume()メソッドを使用することもできます。

●クラス:stream.Readable

追加:バージョン0.9.4

○イベント: 'close'

追加:バージョン0.9.4

'close'イベントは、ストリームとその背後のシステム・リソース(ファイル記述子など)がクローズされたときに発行されます。このイベントは、当該ストリームに関して以後いかなるイベントもいかなる計算処理も発生しないことを示すものです。

すべてのReadableストリームが'close'イベントを発行するわけではありません。

○イベント:'data'

追加:バージョン0.9.4

  • chunk <Buffer> | <String> | データのチャンク。

オブジェクト・モードで動作していないストリームの場合、チャンクには文字列もしくはBufferの値が設定されます。オブジェクト・モードで動作しているストリームの場合、チャンクにはnull以外のいかなるJavaScriptオブジェクトも設定される可能性があります。

'data'イベントは、ストリームがバッファに溜め込んだデータ・チャンクを手放そうとしたときに発行されます。readable.pipe()readable.resume()メソッドが呼び出されたり、'data'イベント・リスナーが設定されたりすることでストリームがflowingモードに切り替わった場合にも発行されます。 'data'イベントはまたreadable.read()メソッドが呼び出され、データ・チャンクが呼び出し元に返却可能になったタイミングでも発行されます。

明示的にpausedモードに設定されていないストリームに関して言えば、'data'イベント・リスナーを登録することでflowingモードに切り替わります。データはそれが利用可能になると可能な限り迅速にリスナーに渡されます。

readable.setEncoding()メソッドによりデフォルトのエンコーディングが設定されている場合、リスナー・コールバック関数に渡されるデータ・チャンクは文字列型になります。それ以外の場合Buffer型です。

const readable = getReadableStreamSomehow();
readable.on('data', (chunk) => {
  console.log(`${chunk.length}バイトのデータを受け取った`);
});

○イベント:'end'

追加:バージョン0.9.4

'end'イベントは、ストリームから取得可能なデータがもはや存在しないという状態になったとき発行されます。

注意:'end'イベントはデータが取得されきるまで発行されません。これはストリームをflowingモードに切り替えるか、データがなくなるまで繰り返しstream.read()を呼び出すかすることで実現できます。

const readable = getReadableStreamSomehow();
readable.on('data', (chunk) => {
  console.log(`${chunk.length}バイトのデータを受け取った`);
});
readable.on('end', () => {
  console.log('もうデータはない');
});

○イベント:'error'

追加:バージョン0.9.4

Readableストリームの実装はいかなる時点でも'error'イベントを発行する可能性があります。一般的には、背後で起きる内部的なエラーによりストリームがデータを生成できない状態になったときや、ストリームの実装が不正なデータ・チャンクをバッファに追加しようとしたりしたときにそれは起きます。

リスナー・コールバック関数には単一の引数としてErrorオブジェクトが渡されます。

○イベント:'readable'

追加:バージョン0.9.4

'readable'イベントはストリームからデータを読み取ることが可能になったときに発行されます。場合によっては、この'readable'イベントのためのリスナー登録そのものが、ある程度の量のデータが内部的なバッファに読み込まれるきっかけとなることもあります。

const readable = getReadableStreamSomehow();
readable.on('readable', () => {
  // 現時点で読み込み可能なデータがある
});

'readable'イベントは、ストリームのデータ終端に到達したときにも、'end'イベントが発行される前に一度発行されます。

実際のところ、'readable'イベントはストリームが新しい情報を持つことを示すものです:新しいデータが読み取り可能になったか、ストリームの終端に到達したか。前者の場合stream.read()メソッドはデータを返しますが、後者の場合stream.read()nullを返します〔訳注:かようにしてnullは特殊な意味合いで使われているので、データ・チャンクそのものとしてnullを含めることはできないわけです〕。例えば次に示す例で、foo.txt は空のファイルです:

const fs = require('fs');
const rr = fs.createReadStream('foo.txt');
rr.on('readable', () => {
  console.log('readable:', rr.read());
});
rr.on('end', () => {
  console.log('end');
});

このスクリプトを実行すると次のような出力がなされます:

$ node test.js
readable: null
end

注意:Note: 一般的にはreadable.pipe()メソッドと'data'イベントの提供する機能のほうが'readable'イベントより好ましいものでしょう。

○メソッド:readable.isPaused()

readable.isPaused()メソッドはReadableストリームの現在の動作状態を返します。このメソッドは何よりもまずreadable.pipe()メソッドの提供するメカニズムの中で使用されます。ほとんどの場合においてこのメソッドを直接使用することはないでしょう。

const readable = new stream.Readable

readable.isPaused() // === false
readable.pause()
readable.isPaused() // === true
readable.resume()
readable.isPaused() // === false

○メソッド:readable.pause()

追加:バージョン0.9.4

  • 戻り値: this

readable.pause()メソッドはモードのストリームに対して'data'イベント発行を停止させ、flowingモードでなくさせます。このとき内部的なバッファには引き続き利用可能なデータが残る可能性があります。

const readable = getReadableStreamSomehow();
readable.on('data', (chunk) => {
  console.log(`${chunk.length}バイトのデータを受け取った`);
  readable.pause();
  console.log('1秒間、追加のデータは受け取らない');
  setTimeout(() => {
    console.log('この時点でまたデータが流れ出す');
    readable.resume();
  }, 1000);
});

○メソッド:readable.pipe(destination[, options])

追加:バージョン0.9.4

  • destination <stream.Writable> データの書き込み先
  • options <Object> パイプのオプション
    • end <Boolean> 読み込み側が終了したら書き込み側も終了させる。デフォルトはtrue

readable.pipe()メソッドはWritableストリームをreadableに接続させます。ストリームのモードは自動的にflowingに切り替わり、接続されたWritableに対してすべてのデータが投入されます。Writableストリームがより高速なReadableストリームによって圧倒されてしまうようなことのないよう、データの流れは自動的に管理されます。

次の例では、readableから得られるデータのすべてがfile.txtというファイルに書き込まれています:

const readable = getReadableStreamSomehow();
const writable = fs.createWriteStream('file.txt');
// readableから得られたすべてのデータを'file.txt'の中に
readable.pipe(writable);

単一のReadable ストリームに対して複数のWritableを割り当てることができます。
連鎖的なパイプ処理を表現するためにreadable.pipe()メソッドは書き込み先のストリームへの参照を返します。:

const r = fs.createReadStream('file.txt');
const z = zlib.createGzip();
const w = fs.createWriteStream('file.txt.gz');
r.pipe(z).pipe(w);

デフォルトでは、書き込み先にあたるWritable ストリームのstream.end()メソッドは、読み取り元にあたるReadableストリームの'end'イベントが発行されたときに呼び出されます。これにより書き込み先ストリームは書き込み不可となります。この挙動を無効化する場合はendオプションにfalseを指定します。これで次の例で示すように、書き込み先ストリームは依然としてオープンされたままとなります:

reader.pipe(writer, { end: false });
reader.on('end', () => {
  writer.end('Goodbye\n');
});

1つ重要な警告をしておきますが、Readableストリームの処理中にエラーが発生した場合、Writableストリームは自動的にクローズされません。エラーが起きた場合、メモリー・リークを抑止するには手動でクローズさせてやる必要があります。

注意:オプションの内容に関わらず、process.stderrprocess.stdoutのWritableストリームはNode.jsプロセスが終了するまで決してクローズされません。

○メソッド:readable.read([size])

追加:バージョン0.9.4

  • size どれだけのデータを読み取るかを指定するためのオプションの引数
  • 戻り値: | |

readable.read()メソッドは内部的なバッファからいくらかのデータを取り出して呼び出し元に返します。読み取り可能なデータがない場合、nullを返します。デフォルトでは、データはBufferオブジェクトとして返されますがreadable.setEncoding()によりエンコーディングが指定されている場合や、ストリームがオブジェクト・モードで動いている場合はこの限りではありません。

オプションの引数sizeは読み取るバイト数を指定するものです。データ読み取りに際して引数sizeが使用できない場合、ストリームが終了していない限りnullが返されることはなく、バッファに残されたすべてのデータが返されます(もしsizeで指定されたバイト数を上回るものであっても)〔訳注:「sizeに満たない場合」ではなく「sizeが使用できない場合」ということなので、ようするに「ストリームの実装やストリームが抽象化しているシステム・リソースの性格上サイズ指定が無効である場合size指定は無視される」ということと思われます〕。

sizeが指定されていない場合は、内部的なバッファに格納されたすべてのデータが返されます。

readable.read()メソッドはReadableストリームがpausedモードで動作しているときのみ呼び出すようにすべきです。flowingモードでは、readable.read()メソッドは内部的なバッファが空になるとともに自動的に呼び出されます。

const readable = getReadableStreamSomehow();
readable.on('readable', () => {
  var chunk;
  while (null !== (chunk = readable.read())) {
    console.log(`${chunk.length}バイトのデータを受け取った`);
  }
});

一般に、開発者は'readable'イベントとreadable.read()メソッドの使用を避け、代わりにむしろreadable.pipe()メソッドもしくは 'data'イベントを利用することが推奨されています。

オブジェクト・モードのReadableストリームではreadable.read(size)はいつでも単一の値を返します。引数sizeの値は考慮されません。

注意: readable.read()メソッドがデータ・チャンクを返すとき、'data'イベントもまた発行されます。

注意: stream.read([size])'end'イベントのあと呼び出すとnullが返されます。実行時エラーは発生しません。

○メソッド:readable.resume()

追加:バージョン0.9.4

  • 戻り値: this

readable.resume()メソッドは一時停止していたReadableストリームを明示的に再開させ、 'data'イベントの発行を再開させ、動作モードをflowingモードに切り替えます。

readable.resume()メソッドは、次の例で解説しているように、ストリームの生成するデータを実際には何らの利用もせずに済ませるのにも使用することができます:

getReadableStreamSomehow()
  .resume()
  .on('end', () => {
    console.log('ストリームは終了、しかし結局なにも読み込んでいない');
  });

○メソッド:readable.setEncoding(encoding)

追加:バージョン0.9.4

  • encoding 読み込みに利用するエンコーディング
  • 戻り値: this

readable.setEncoding()メソッドはデータ読み込みに利用するデフォルトのエンコーディングを設定します。

エンコーディングの設定をすることで、データはBufferオブジェクトとしてでなく指定されたエンコーディングの文字列として返されるようになります。例えば、 readable.setEncoding('utf8')の呼び出しにより、データはUTF-8のデータとして読み取られ、〔dataイベントのコールバック関数には〕文字列型の値として渡されるようになります。readable.setEncoding('hex')の呼び出しにより、データは16進数表現の文字列形式でエンコードされたものとなります。

〔エンコーディング指定された〕Readableストリームはマルチバイト文字を適切に処理することができますが、他方でBuffer オブジェクトを返すストリームはこれを正しくデコードすることができません。

readable.setEncoding(null)を呼び出すことでエンコーディングを無効化することができます。これはバイナリ・データや非常に多くのチャンク分かれた巨大なマルチバイトの文字列を処理する場合に有効なアプローチです。

const readable = getReadableStreamSomehow();
readable.setEncoding('utf8');
readable.on('data', (chunk) => {
  assert.equal(typeof chunk, 'string');
  console.log('%d 文字からなる文字列データを受け取った', chunk.length);
});

○メソッド:readable.unpipe([destination])

追加:バージョン0.9.4

  • destination アンパイプするストリームを指定するオプション引数

readable.unpipe()メソッドはstream.pipe() メソッドにより接続されていたWritableストリームを接続解除します。

destinationが指定されていない場合、すべてのストリームを接続解除します。

destinationが指定されているものの、当該ストリームに接続されているものがない場合は、何も起こりません。

const readable = getReadableStreamSomehow();
const writable = fs.createWriteStream('file.txt');
// readableから読み込まれたデータはすべて'file.txt'に送られる
// ただし初っ端の1秒間だけ
readable.pipe(writable);
setTimeout(() => {
  console.log('file.txtへの書き込みをストップ');
  readable.unpipe(writable);
  console.log('ファイル・ストリームを手動でクローズ');
  writable.end();
}, 1000);

○メソッド:readable.unshift(chunk)

追加:バージョン0.9.11

  • chunk | 読み込みキューの先頭に追加するデータ・チャンク

readable.unshift()メソッドはデータ・チャンクを内部的なバッファの先頭に追加します。この機能はいくつかの状況──This is useful in certain situations where a 当該のストリームが楽観的に取り出された一定の量のデータを「消費しない」〔un-consume〕ままでおき、他のアプリケーション・コードに渡す必要があるような──では便利なものです。

注意:stream.unshift(chunk)メソッドは'end'イベントが発行されたあとに呼び出すことはできません。あえて実行すればエラーがスローされます。

stream.unshift()を使用する開発者は、それをTransformストリームの使用に切り替えられないかを検討してみるべきでしょう。詳細については後続の「ストリーム開発者のためのAPI」のセクションも参照してみてください。

// \n\nで区切られたヘッダーを取り除ける
// もし読み込み過ぎていたらunshift()を使う
// コールバック関数はエラー、ヘッダー、ストリームの3つを引数に呼び出される
const StringDecoder = require('string_decoder').StringDecoder;
function parseHeader(stream, callback) {
  stream.on('error', callback);
  stream.on('readable', onReadable);
  const decoder = new StringDecoder('utf8');
  var header = '';
  function onReadable() {
    var chunk;
    while (null !== (chunk = stream.read())) {
      var str = decoder.write(chunk);
      if (str.match(/\n\n/)) {
        // ヘッダーの境界が見つかった
        var split = str.split(/\n\n/);
        header += split.shift();
        const remaining = split.join('\n\n');
        const buf = Buffer.from(remaining, 'utf8');
        stream.removeListener('error', callback);
        // unshiftするまえにこのリスナー関数を登録解除する
        stream.removeListener('readable', onReadable);
        if (buf.length)
          stream.unshift(buf);
        // これでストリームからはメッセージ・ボディ(だけ)を読み込める
        callback(null, header, stream);
      } else {
        // まだヘッダーの読み込みが終わっていない
        header += str;
      }
    }
  }
}

注意: stream.push(chunk)と異なり、stream.unshift(chunk)はストリーム内部の処理状態をリセットすることで読み込みプロセスを停止させるようなことはしません。readable.unshift()は読み込み(カスタムメイドのストリーム実装の is called during a read (i.e. from within a stream._read()メソッドでの)の最中に呼び出されると意図しない結果を引き起こす可能性があります。readable.unshift()呼び出しの直後にstream.push('')を続けることで、読み込み状態を適切なもにリセットすることができます。しかしながら読み込みプロセスの真っ最中にreadable.unshift()を呼び出すことを避けるのが最良ではあります。

○メソッド:readable.wrap(stream)

追加:バージョン0.9.4

  • stream 「旧式の」読み込み可能ストリーム

バージョン0.10より前のNode.jsには現在定義されているようなstreamモジュールAPIを完全に実装していないストリームが存在していました(詳しくは互換性のセクションを参照)。

'data'イベントを発行しかつアドバイス・レベルに留まる〔訳注:つまり実効性が保証されない〕stream.pause()メソッドを持つ古いNode.jsライブラリを使用するときは、readable.wrap()メソッドによりそのような古いストリームをデータ・ソースとする新しいReadableストリームを作成することができます。

readable.wrap()を使用せねばならないケースは稀なはずですが、古いNode.jsアプリケーションとライブラリとやり取りする場合には便利なものとなります。
例えば:

const OldReader = require('./old-api-module.js').OldReader;
const Readable = require('stream').Readable;
const oreader = new OldReader;
const myReader = new Readable().wrap(oreader);

myReader.on('readable', () => {
  myReader.read(); // etc.
});
DuplexストリームとTransformストリーム

●クラス:stream.Duplex

追加:バージョン0.9.4

DuplexストリームはReadableとWritableのいずれストリームのインターフェースをも実装したものです。

Duplexストリームの例としては以下のものがあります:

●クラス: stream.Transform

追加:バージョン0.9.4

Transformストリームはその出力と入力が何かしらの方法で関連付けられたDuplexストリームです。Duplexと同じく、TransformストリームはReadableとWritableのいずれのインターフェースをも実装しています。

Transformストリームの例としては以下のものがあります:

          * * *

原典はNode.jsのリファレンスのSteamについてのページ。Node.js v7.1.0、2016/11/17時点のものです。

原典ではこのあとに「ストリーム実装のためのAPI」が続きます。」が続きます。