编程技术文章分享与教程

网站首页 > 技术文章 正文

5000字带你全面深入理解JS中的Stream API

hmc789 2024-11-12 11:36:42 技术文章 2 ℃

前两篇文章介绍File Access APIWeb Serial API时,都涉及到了字节流的处理,因此这篇文章我们就来深入了解 JavaScript中的流操作。

对于流的概念我们并不陌生,Http所依赖的TCP协议正是基于字节流的。虽然以往我们能通过XMLHttpRequest收发请求,但是在ES6-fetch发布之前,我们并不能直接处理流。

Streams API能够让我们直接处理通过网络接收的数据流或通过本地任何方式创建的数据流。以往我们请求媒体资源的时候,得到的都是二进制流,浏览器会自动转换成相应的编码格式然后再呈现出来,如果要处理它的话一般使用Blob对象包装后再进行操作。现在使用Streams API我们可以直接处理它,下面列举了一些可以实现的功能:

  • 视频特效:读取视频流,然后通过管道与转换流连接起来,逐帧进行转换处理,实现诸如水印、剪辑、添加音轨等功能。
  • 数据解压缩:压缩包、视频、图片解压缩等。
  • 图像转码:流处理可以是基于字节的,因此可以逐个字节地处理请求到图片资源,来对它进行转码。例如JPGPNG等格式转换。

上述功能以往需要服务端配合才能实现,现在网页也能轻松搞定。

Streamnodejs中应用非常广泛,服务器的请求和响应分别就是可读流和可写流,熟悉node开发的朋友更容易理解本篇文章所讲的概念,即使未曾用过node或使用过其中的流操作,阅读本篇文章后,相信上手也会非常容易,毕竟理念是相通的。

核心概念

为了更好地了解Steam API中的各种流处理方法,必须了解流的一些概念。

流一般指的是数据流(data stream),它是一组有序,有起点和终点的字节的数据序列。最初是通信领域使用的概念,代表传输中所使用的信息的数字编码信号序列。这个概念最初在1998年由Henzinger提出,他将数据流定义为“只能以事先规定好的顺序被读取一次的数据的一个序列”。

流按照性质分类分为:输入流输出流缓冲流等,按照类型分为字节流字节流。流的传输是单向的、不可逆的,常见的输入流有键盘输入,输出流有打印机等等。

流处理是一个由多个处理单元参与的过程,类似于工厂里的流水线,每个部分负责不同的工作。

数据块 Chunks

数据块是向流写入或从流中读取的单个数据块。在数据传输前,数据会被分成一块一块的以方便传输和处理。在JS中,它可以是任何类型的,一种流可以包含不同类型的数据块,例如一个字节流可能包含由16KBUint8Array对象组成的块,而不是单个字节。

可读流 Readable streams

可读流表示可以读取的数据源。例如我们要处理一张请求到的图片,我们可以创建一个WritableStream实例,然后使用它来加载图片,作为图片的读取流。

可写流 Writable streams

可写流表示数据写入的目的地,我们创建了图片的读取流对象后,就能够创建一个WritableStream实例,通过管道连接起来,可以直接输出也可以进行转换。

转换流 Transform streams

转换流是流操作的重点,相应地使用它也是创建一个TransformStream实例,它的通常用法是,首先连接可读流来获取数据,再连接可写流来输出转换后的数据。可能会有点抽象

管道 Pipe

流必须通过管道来进行传输。可读流可以通过pipeTo()方法直接连接可写流,也可以通过pipeThrough()连接一个或多个转换流,这些方法在各种流之间建立管道。不同的管道组成一起就叫做管链Pipe chains

背压 Backpressure

背压(Back Pressure),通常是指运动流体在密闭容器中沿其路径(譬如管路或风通路)流动时,由于受到障碍物或急转弯道的阻碍而被施加的与运动方向相反的压力。

一个流通常由多种类型的流组成,不同的流通过不同的管道汇入下一个流,管道中流传输的快慢可以用流速来表示,而流处理数据的能力也有差异,因此就会造成拥塞,就像高速塞车四车道变一车道,一眼望不到头。在流处理中,当管链中一个流已经到了数据处理的极限,它就会产生一个信号,这个信号回反向传递每一个数据源,直到它到达起始源,告诉它不要再发了,要受不了了,然后起始源就会停止发送,这是一种负反馈调节,就像人体的激素调节一样。

三通 Tee

虽然Tee的翻译是球座,但是明白了它的功能后,我觉得它更像生活中常用的三通管。

就是下面这个

tee()是可读流的一个方法,用于创建2个可读流的副本,由于一个流只能被使用一次,使用后原先的流就不存在了,如果要对流进行多种处理,可以使用该方法。

流程图

上面的概念有点抽象,下面结合流程图来理解下:

通过fetch请求一个文本文件,我们需要将里面的字母变成全大写的,因此我们可以通过流来对它进行转换。首先创建一个可读流来加载它,然后通过管道将它与转换流进行连接,这里转换流负责对文本进行大写转换,转换完成后通过管道连接可写流进行输出,但是有2种输出形式,但可写流只有一个,并且流传输完成后,它之前的那种状态将不再存在,因此总不能每种输出都创建一个流吧,因此使用tee()来为可写流开2个分支,分别进行不同的输出。

深入理解

知悉了流的上述概念后,我们就可以深入了解Stream API的一些方法了,其最核心的就是ReadableStreamWritableStream以及TransformStream对象。

ReadableStream

ReadableStream通常是流处理过程的起点,通过创建一个ReadableStream实例来作为后续各类操作的数据源。

它通常有两个来源:

  • 推流:读取推送端推送的数据,例如实时视频流中服务器读取用户上传的视频、服务器推送消息,websocket等等。
  • 拉流:通过特定地址拉取服务器资源的过程。例如发送网络请求。

下面是基于RTMP协议的流媒体服务示意图:

创建

ReadableStream()是一个构造函数,有一个可选参数underlyingSource,表示一个带有方法和属性的对象,这些方法和属性定义了可读流实例的一些行为。

const readableStream = new ReadableStream({
  start(controller) {
    /* … */
  },

  pull(controller) {
    /* … */
  },

  cancel(reason) {
    /* … */
  },
});

underlyingSource对象的三个方法是开发人员定义的接口,我们只需对它们进行重写。

  • start(controller):调用构造函数时调用,可以访问数据源,以此来执行一些初始化操作,如果调用的过程是异步的,则返回一个Promise。接收一个ReadableStreamDefaultController对象参数。
  • pull(controller):每次获取数据块时调用,直到内部的缓冲队列被填满。
  • cancel(reason):被要求停止发送流时触发,通常是收到一个背压信号。

缓冲队列

流的读取是先来先服务的,这是因为可读流内部有一个缓冲队列,用于存储从数据源那获取的数据块。创建ReadableStream实例时,我们通过传入第二个参数queuingStrategy对象来指定缓存队列的入队策略。

const readableStream = new ReadableStream({
    /* 第一个参数,underlyingSource对象 */
  },
  // 第二个参数queuingStrategy对象
  {
    highWaterMark: 10,
    size(chunk) {
      return chunk.length;
    },
  },
);
  • highWaterMark:非负数,表示存储数据块的最大容量,结合流的概念来说就是最高水位线。
  • size(chunk):计算并返回数据块的大小,用于背压信号的触发。

读取数据

有了ReadableStream实例后,通过getReader()方法创建一个读取器对象ReadableStreamDefaultReader,再通过该对象的read()来获得数据。

流的读取是一个独占的、持续的、异步的过程。

  • 独占是指读取流时给流上锁,当流被锁定时,在释放该读取器之前,不能获取其他读取器。通过访问可读流对象的locked判断流是否被锁定:
const locked = readableStream.locked;
console.log(`可读取流当前的锁定状态: ${locked ? '上锁' : '解锁'}`);
  • 持续是指每次只读一部分,直到读完。
  • 异步是指同步调用返回的结果是个promise实例。

promise处于完成状态时是一个对象,包含valuedone两个属性:

  • value:数据值,是一个Uint8Array对象
  • done:流读取状态,true表示读取完毕

下面是一个异步循环读取数据的栗子:

const readableStream = new ReadableStream({...},{...});
// 创建一个读取器
const reader = readableStream.getReader();
while (true) {
  const { done, value } = await reader.read();
  if (done) {
    console.log('数据读取完毕');
    break;
  }
  console.log('数据块的原始值是:', value);
}

{value: val,done: res}这种形式的对象是不是很熟悉?没错,就是迭代器遍历对象。

遍历器(Iterator)是一种接口,为各种不同的数据结构提供统一的访问机制。

对迭代器不熟悉的,可以阅读阮一峰老师的Iterator 和 for...of 循环 - ECMAScript 6入门 (ruanyifeng.com)

知道了结果对象与迭代器结果对象类似,我们就能用for...of迭代获取数据了,由于是异步的,这里使用异步迭代。

异步迭代器[Symbol.asyncIterator] 是ES-2018的新特性,未被全部js引擎支持,因此必要时使用相应的polyfills。

Babel从6.16.0开始,异步迭代被包含在 Babel 的 "babel-plugin-transform-async-generator-functions" 下以及 babel-preset-stage-3

异步迭代的一种解决方案是使用一个辅助函数,返回一个包含异步迭代特性的对象

// 创建一个可读流
const readableStream = new ReadableStream({...},{...});
// 异步迭代辅助函数
function streamAsyncIterator(stream) {
  // 创建一个读取器,同时给流上锁
  const reader = stream.getReader();

  return {
    // 这里类似同步迭代器的next
    next() {
  // 读取流数据
      return reader.read();
    },
    // 异步迭代器发生异常时,给流解锁
    return() {
      reader.releaseLock();
      return {};
    },
 // 关键,声明一个异步迭代器接口
    [Symbol.asyncIterator]() {
      return this;
    },
  };
}
for await (const chunk of streamAsyncIterator(readableStream)) {
    const { done, value } = chunk
   if (done) {
     console.log('数据读取完毕');
     break;
   }
   console.log('数据块的原始值是:', value);
}

Teeing 操作

tee()是可读流对象的一个方法,创建2个读写流的副本,执行返回包含了这2个副本的数组。由于读取器读取一个流时会独占它,并且流只能被使用一次,因此tee()可以使得一个流能被2个读取器使用。

// 创建可读流的2个副本A、B
const [streamA, streamB] = readableStream.tee();
// 读取器A
const readerA = streamA.getReader();
// 读取器B
const readerB = streamB.getReader();

WritableStream

可写入流一般作为流操作过程中的最后一站,可以把它看作接收最终已处理数据的容器,也是底层接收(原始数据被写入底层I/O)之上的抽象。

创建

readableStream一样,创建一个writableStream实例也是通过调用其构造函数,同时传入一个underlyingSink对象参数来约束其行为。

const writableStream = new WritableStream({
  start(controller) {
    /* … */
  },

  write(chunk, controller) {
    /* … */
  },

  close(controller) {
    /* … */
  },

  abort(reason) {
    /* … */
  },
});

underlyingSink中的接口如下:

  • start(controller):构造函数被调用时调用
  • write(chunk, controller):每当读取一个数据块时触发。
  • close(controller):当完成全部数据的接收时触发。
  • abort(reason):强制终止流的写入。

写入数据

调用writableStreamgetWriter()返回一个写入器对象WritableStreamDefaultWriter实例,与读取一样,写入也会给流上锁,流处于锁定状态下不能写入到其他的流。

const writer = writableStream.getWriter();
const result = await writer.write('这是一段数据');

写入的过程同样是异步的,因此返回一个Promise实例,成功状态则意味着数据已经被接收,但是并不意味着它顺利到达目的地,例如使用Web Serial API 向外围设备发送数据,虽然返回结果显示成功,但这只意味着OS底层已经将数据通过串口发送出去,在线路传输过程中可能会出现数据丢失,最终没有到达设备处理器,因此这次传输是失败的。

通过writableStreamlocked属性获取流是否处于锁定状态:

const locked = writableStream.locked;// true or false

一个例子,通过流来逐秒打印字母,可以复制到控制台输出看看。

// 创建一个可写流
const writableStream = new WritableStream({
  start(controller) {
    console.log('[start]');
  },
  async write(chunk, controller) {
    await new Promise((resolve) => setTimeout(() => {
      console.log('[write]', chunk);
      resolve();
    }, 1000));
  },
  close(controller) {
    console.log('[close]');
  },
  abort(reason) {
    console.log('[abort]', reason);
  },
});


const writer = writableStream.getWriter();
const start = Date.now();
for (const char of 'abcdefghijklmnopqrstuvwxyz') {
  // 等待写入器就绪
  await writer.ready;
  console.log('[ready]', Date.now() - start, 'ms');
  writer.write(char);
}
await writer.close();

打印结果:

从可读流获取数据

可读流可以通过pipeTo()建立一条管道将数据传输到可写流。

// 创建一个可读流实例
const readableStream = new ReadableStream({
  start(controller) {
    console.log('[start readableStream]');
    // 初始化缓冲队列,先填入一点数据
    controller.enqueue('a');
    controller.enqueue('b');
    controller.enqueue('c');
  },
  pull(controller) {
    // 当controller缓冲队列队列为空时调用。
    console.log('[pull]');
    controller.enqueue('data');
    controller.close();
  },
  cancel(reason) {
    // 流传输被取消时调用
    console.log('[cancel]', reason);
  },
});

// 可写流实例
const writableStream = new WritableStream({
  start(controller) {
    console.log('[start writableStream]');
  },
  // 当读取器调用write()时触发
  async write(chunk, controller) {
    await new Promise((resolve) => setTimeout(() => {
      console.log('[write]', chunk);
      resolve();
    }, 1000));
  },
  close(controller) {
    console.log('[close]');
  },
  abort(reason) {
    console.log('[abort]', reason);
  },
});

await readableStream.pipeTo(writableStream);
console.log('[finished]');

TransformStream

转换流对象是流处理的关键,用于对数据进行加工。

创建

其构造函数接受一个对象参数,通过重写里面的接口来定义加工规则。

const transformStream = new TransformStream({
  start(controller) {
    /* … */
  },

  transform(chunk, controller) {
    /* … */
  },

  flush(controller) {
    /* … */
  },
});
  • start(controller):构造函数被调用时触发,可以用它来初始化缓存队列。
  • transform(chunk, controller): 数据转换时调用,通常在里面编写转换规则。
  • flush(controller):当最后一块数据完成,并且写入端关闭前触发,通常是在要传输的数据全部完成,并且希望在最后加上一些自定义的数据。

处理可读流

通过ReadableStream 的pipeThrough( )方法将数据通过管道传输到转换流。

下面的例子通过转换流来将可读流里的小写字母转换成大写。

// 转换流实例
const transformStream = new TransformStream({
  transform(chunk, controller) {
    console.log('[transform]', chunk);
    // 将小写字母变成大写
    controller.enqueue(chunk.toUpperCase());
  },
  flush(controller) {
    console.log('[flush]');
    // 终止转换,转换结束
    controller.terminate();
  },
});

// 可读流实例,这里与上面的栗子一样
const readableStream = new ReadableStream({
  start(controller) {
    // 预先填入一些数据
    console.log('[start]');
    controller.enqueue('a');
    controller.enqueue('b');
    controller.enqueue('c');
  },
  pull(controller) {
    console.log('[pull]');
    controller.enqueue('data');
    controller.close(); 
  },
  cancel(reason) {
    console.log('[cancel]', reason);
  },
});
// 在控制台环境下允许可以使用IIFE
// 这里使用IIFE的原因是在await目前不能在顶层被调用
(async () => {
  const reader = readableStream.pipeThrough(transformStream).getReader();
  for (let result = await reader.read(); !result.done; result = await reader.read()) {
    console.log('[value]', result.value);
  }
})();

处理网络请求

可读流也可以是一个网络请求,例如fetch:

// 返回一个转换流对象,用于将每个字母进行大写转换
function upperCaseStream() {
  return new TransformStream({
    transform(chunk, controller) {
      controller.enqueue(chunk.toUpperCase());
    },
  });
}

// 返回一个可写流,用于将数据渲染到dom上
function appendToDOMStream(el) {
  return new WritableStream({
    write(chunk) {
      el.append(chunk);
    }
  });
}
// 请求一段文本数据
fetch('./text.txt').then((response) =>
  response.body
     // 默认是utf8编码,中文要转成'gb2312'
    .pipeThrough(new TextDecoderStream('gb2312'))
    .pipeThrough(upperCaseStream())
    .pipeTo(appendToDOMStream(document.body))
);

上面的代码通过fetch请求一段文本数据,由于响应结果Response对象的body就是一个ReadableStream,因此可以使用流的所有管道方法。先是将数据传输到一个TextDecoderStream()对象进行译码,在上一篇文章输出串口数据时就用来转换中文字符。经过译码后的数据又传输到了转换流对象进行转换,最后数据传输到了可写流进行输出渲染到dom上。上面的流程就像一条流水线以上,数据从读入到输出经一系列的加工。

其他内置流接口的对象

  • BlobBlob 对象表示一个不可变、原始数据的类文件对象。它的数据可以按文本或二进制的格式进行读取,也可以通过stream()转换成 ReadableStream 来用于数据操作。
  • FileFile接口基于Blob,继承了 blob 的功能并将其扩展使其支持用户系统上的文件。
  • FetchFetch的Response.body就是一个 ReadableStream就是一个 ReadableStream就是一个 ReadableStream就是一个 ReadableStream
  • 第一篇文章介绍的File System Access API中的 FileSystemWritableFileStream
  • 第二篇文章介绍Web Serial API中串口的port.readable

浏览器支持

通过Can I use网站可以看到谷歌浏览器以及换装chromiumEdge支持度最高,其他浏览器只有部分支持。火狐即使最新版本也依旧不支持可读流和管道相关方法。

总结

Stream API 目前在浏览器上得到了不同程度的支持,并且主要功能都在支持范围内,在主流浏览器上的支持度还非常高,因此完全可以用于生产环境。像xlxsjs-zip等库都需要将数据序列化成二进制来输出,其内部通常使用Blob来包装数据,有了Stream API之后,这样的操作会更加地简洁。由于篇幅有限,这里就不演示了,待有时间会做一个数据压缩转换的demo供各位看官使用。

写作该篇文章时,由于中文搜索引擎上资料较少,大部分是介绍node下的Stream,因此查阅许多外文文档,下面参考只列出部分,如有错误,望各位看官评论区斧正。

参考

  • Streams API - Web API 接口参考 | MDN (mozilla.org)
  • Streams Standard (whatwg.org)
  • 2016 - the year of web streams - JakeArchibald.com
  • MattiasBuelens/web-streams-polyfill: Web Streams, based on the WHATWG spec reference implementation (github.com)

Tags:

标签列表
最新留言