blog.euxn.me

Nodejs で stream を使って gzip ファイル全体をメモリに乗せずに先頭一行だけを取得する

2019-12-23 Mon.

この記事は Node.js アドベントカレンダー 2019 の 23 日目です。

はじめに

gzip ファイルなどの圧縮されたファイルを読み込む際、たとえば csv など圧縮率の高いファイル形式かつ大きなファイルの場合、全てをメモリに乗せ切れないことが稀にあります。 そもそも csv のカラムだけ欲しいなどの場合にデータ全体を取得するのは時間もかかるし無駄です。 そこで、 Node.js は stream を扱いやすい言語なので、これを使って簡単に解決できるため紹介します。

なお、 S3 からのデータ取得であっても createReadStream() すれば stream.Readable 型になるため、同様の手法が可能です。そもそも、この話自体がローカルよりはクラウド絡みの方が多いケースになると思いますが……。

おさらい: Node.js での stream でのファイル読み書き

例えば、ファイルを読み込んで標準出力に表示します。

1import fs from "fs";
2
3const input = fs.createReadStream("tsconfig.json", "utf-8");
4
5input.pipe(process.stdout);

書き込みの例として、大きなサイズの csv を生成するスクリプトをファイルに書き込みます。

src/create-big-csv.ts
1import fs from 'fs';
2
3const out = fs.createWriteStream('bigdata.csv', 'utf-8');
4const arr = [...Array(100000)].map((_, idx) => idx);
5
6out.write("id,pow\n");
7arr.forEach(idx => {
8 out.write(`${idx}, ${idx * idx}\n`);
9})

大きなデータなので複数に別れて buffer で流れます。データが来るたびに区切り文字を表示して標準出力に表示する場合はこうです。

1import fs from "fs";
2
3const input = fs.createReadStream("bigdata.csv", "utf-8");
4
5input.on("data", (buf) => {
6 console.log(buf.toString());
7 console.log("---");
8});

gzip の展開を stream に適用する

標準ライブラリの zlib から pipe を作成し適用します。

1import zlib from "zlib";
2import fs from "fs";
3
4const gzip = zlib.createGunzip();
5
6async function main() {
7 const readStream = fs.createReadStream("bigdata.csv.gzip");
8 readStream.pipe(gzip).on("data", (buf) => {
9 console.log(buf.toString());
10 console.log("---");
11 });
12}
13
14main().catch((e) => {
15 console.error(e);
16 process.exit(1);
17});

先頭一行を取得する

普通に buf.toString() した値を "\n" で split できます。 なお、 stream.destroy() が間に合わず次のデータが流れてくることは普通にあるので、一度限りの処理に限定できるよう関数に切り出すのが良さそうです。

1import zlib from "zlib";
2import fs from "fs";
3
4const gzip = zlib.createGunzip();
5
6async function main() {
7 const readStream = fs.createReadStream("bigdata.csv.gzip");
8 const firstLine = await getFirstLineFromStream(readStream.pipe(gzip));
9 console.log(firstLine);
10}
11
12async function getFirstLineFromStream(stream: Readable) {
13 return new Promise((resolve, reject) => {
14 stream.on("data", (buf) => {
15 stream.destroy();
16 const string = buf.toString();
17 const [firstLine] = string.split("\n");
18
19 resolve(firstLine);
20 });
21 stream.on("error", reject);
22 });
23}
24
25main().catch((e) => {
26 console.error(e);
27 process.exit(1);
28});

おまけ: S3 から取得した gzip の先頭一行を取得する

s3.getObject().createReadStream() するだけです。 await は要りません。

1import zlib from "zlib";
2import fs from "fs";
3
4import { S3 } from "aws-sdk";
5
6const gzip = zlib.createGunzip();
7
8async function main() {
9 const s3 = new S3();
10 const readStream = s3
11 .getObject({ Bucket: "your-awesome-bucket", Key: "bigdata.csv.gzip" })
12 .createReadStream();
13
14 const firstLine = await getFirstLineFromStream(readStream.pipe(gzip));
15 console.log(firstLine);
16}
17
18async function getFirstLineFromStream(stream: Readable) {
19 return new Promise((resolve, reject) => {
20 stream.on("data", (buf) => {
21 stream.destroy();
22 const string = buf.toString();
23 const [firstLine] = string.split("\n");
24
25 resolve(firstLine);
26 });
27 stream.on("error", reject);
28 });
29}
30
31main().catch((e) => {
32 console.error(e);
33 process.exit(1);
34});

おわりに

大きなデータを扱うときは、メモリに乗り切らないこともあるので stream を使いましょう。