1. 当设计模式遇见流处理

在Node.js开发中,数据流就像城市下水道系统——如果不精心设计就会导致信息洪灾。本文咱们通过真实场景案例,解读如何用管道模式构建数据处理流水线,用装饰器模式给数据流"加特效",让字节流动既有秩序又充满可能性。

2. 管道模式:搭建数据处理流水线

2.1 文件处理流水线实战

const fs = require('fs');
const { Transform } = require('stream');

// 创建可读流(消防栓开水)
const readStream = fs.createReadStream('input.txt', {
  highWaterMark: 1024 // 水龙头流量控制
});

// 创建转换流(过滤车间)
const filterStream = new Transform({
  transform(chunk, encoding, callback) {
    // 擦除敏感信息
    const filtered = chunk.toString().replace(/password/g, '***');
    this.push(filtered);
    callback();
  }
});

// 创建可写流(下水道出口)
const writeStream = fs.createWriteStream('output.txt');

// 组装管道(连接城市管网)
readStream
  .pipe(filterStream)
  .pipe(writeStream)
  .on('finish', () => console.log('数据净化完成!'));

2.2 HTTP请求处理管道

const http = require('http');
const { PassThrough } = require('stream');

// 创建转发中间件流
const requestLogger = new PassThrough();
requestLogger.on('data', chunk => {
  console.log(`收到请求数据块:${chunk.length}字节`);
});

http.createServer((req, res) => {
  req
    .pipe(requestLogger)
    .pipe(new Transform({
      transform(chunk, _, cb) {
        this.push(chunk.toString().toUpperCase());
        cb();
      }
    }))
    .pipe(res);
}).listen(3000);

3. 装饰器模式:给数据流戴面具

3.1 请求流量监控装饰器

function withTrafficMonitor(stream) {
  let bytesProcessed = 0;
  
  const monitor = new Proxy(stream, {
    get(target, prop) {
      if (prop === 'write') {
        return function(chunk) {
          bytesProcessed += chunk.length;
          console.log(`当前流量:${bytesProcessed}字节`);
          return target.write.apply(target, arguments);
        };
      }
      return target[prop];
    }
  });

  return monitor;
}

// 使用装饰器增强的写入流
const originalStream = fs.createWriteStream('data.log');
const monitoredStream = withTrafficMonitor(originalStream);

setInterval(() => {
  monitoredStream.write(Buffer.alloc(1024)); // 每秒写入1KB
}, 1000);

3.2 缓存装饰器实现

function createCachedStream(originStream) {
  const buffer = [];
  let isFlowing = false;

  return new Proxy(originStream, {
    get(target, prop) {
      if (prop === 'write') {
        return function(chunk) {
          if (isFlowing) {
            target.write(chunk);
          } else {
            buffer.push(chunk);
          }
        };
      }
      if (prop === 'resume') {
        return function() {
          isFlowing = true;
          buffer.forEach(chunk => target.write(chunk));
          buffer.length = 0;
          return target.resume();
        };
      }
      return target[prop];
    }
  });
}

// 应用缓存装饰器
const rawStream = fs.createWriteStream('cache.log');
const cachedStream = createCachedStream(rawStream);

cachedStream.write('第一块数据'); // 缓存中
setTimeout(() => {
  cachedStream.resume(); // 5秒后释放缓存
}, 5000);

4. 组合技:当管道遇上装饰

4.1 实时数据清洗系统

class DataPipeline {
  constructor() {
    this.steps = [];
  }

  addStep(transformFunc) {
    // 动态添加装饰器层
    const step = new Transform({
      transform: transformFunc
    });
    this.steps.push(step);
    return this;
  }

  build() {
    // 管道连接所有处理节点
    return this.steps.reduce((prev, current) => prev.pipe(current));
  }
}

// 构建数据处理管道
const pipeline = new DataPipeline()
  .addStep((chunk, _, cb) => { // 首层清洗
    const data = chunk.toString().trim();
    cb(null, data);
  })
  .addStep((chunk, _, cb) => { // 加密处理
    const encrypted = Buffer.from(chunk).toString('base64');
    cb(null, encrypted);
  })
  .build();

fs.createReadStream('source.data')
  .pipe(pipeline)
  .pipe(fs.createWriteStream('encrypted.data'));

5. 应用场景分析

5.1 金融交易系统

在证券交易场景中,每秒处理上万笔订单需要:1)管道模式分阶段验证订单合法性 2)装饰器实时计算成交统计指标 3)内存控制避免OOM

5.2 视频直播平台

视频流经过:解码管道 -> 水印装饰器 -> 加密管道 -> 自适应码率装饰器。每个环节保持流式处理,避免内存爆炸。

6. 技术选型利弊谈

6.1 管道模式优势

  • 内存效率高(数据分块流动)
  • 天然支持背压控制
  • 清晰的阶段划分

6.2 装饰器模式陷阱

  • 过度装饰导致调试困难
  • 包装层次过多性能下降
  • 需要防范循环引用

7. 避坑指南

  1. 背压平衡:当慢消费者遇上快生产者,不处理背压会导致内存泄漏。建议使用pipeline代替pipe自动处理
  2. 错误冒泡:流水线中间环节错误可能静默失败,务必监听每个流的error事件
  3. 内存管理:Node.js的流默认Buffer是常驻内存的,超大文件处理建议设置objectMode

8. 最佳实践锦囊

  1. 善用pipeline(stream1, stream2, ..., callback)替代链式pipe
  2. 为装饰器实现Symbol.hasInstance保证instanceof可用
  3. 采用stream.finished()API进行可靠性检测
  4. 推荐使用readable-stream替代核心模块获得更好兼容性

9. 未来趋势展望

下一代Node.js流的ECMA提案已出现三股趋势:1)Web Streams API的深度整合 2)TypeScript装饰器语法的官方支持 3)基于Async Iteration的流操作