流写入文件时造成的文件内容被清空问题

· 3 min read

ssh2-sftp-client提供了put方法,可以传入可读流,将数据持续写入远程文件。最近遇到一个问题:如果写入流刚开始时程序就中断了,会发现远程文件的内容被清空。这里分析一下原因。

put方法中localSrc可以是可读流。remotePath即远程服务器中文件路径。

  async put(localSrc, remotePath, options) {
    try {
      if (typeof localSrc === 'string') {
        const localCheck = haveLocalAccess(localSrc);
        if (!localCheck.status) {
          throw this.fmtError(
            `Bad path: ${localSrc} ${localCheck.details}`,
            'put',
            localCheck.code,
          );
        }
      }
      return await this._put(localSrc, remotePath, options);
    } catch (e) {
      throw e.custom ? e : this.fmtError(`Re-thrown: ${e.message}`, 'put', e.code);
    }
  }

_put方法根据远程文件路径,通过sftp创建可写流,writeStreamOptions用于设置该可写流的一些配置。可读流读取到的数据通过pipe不断写入SFTP的可写流中。

  /**
   * Write a buffer, a local file or a readable stream to a remote file.
   *
   * NOTE: this excerpt is abridged (`...` lines); the code that settles the
   * promise is not visible here.
   *
   * @param {Buffer|string|Object} lPath - A Buffer is written in one call; a
   *   string is opened with `fs.createReadStream()`; anything else is used
   *   as the source stream directly and piped.
   * @param {string} rPath - Remote path handed to `sftp.createWriteStream()`.
   * @param {Object} [opts] - May carry `readStreamOptions`,
   *   `writeStreamOptions` and `pipeOptions`.
   * @param {boolean} [addListeners=true] - Whether temporary listeners are
   *   added for the duration of the call and removed afterwards.
   * @returns {Promise<*>}
   */
  _put(lPath, rPath, opts, addListeners = true) {
    let listeners, wtr, rdr;
    return new Promise((resolve, reject) => {
      if (addListeners) {
        listeners = addTempListeners(this, '_put', reject);
      }
      // Caller values are spread first, so `autoClose` and `end` are always
      // forced to true regardless of what was passed in.
      opts = {
        readStreamOptions: { ...opts?.readStreamOptions, autoClose: true },
        writeStreamOptions: { ...opts?.writeStreamOptions, autoClose: true },
        pipeOptions: { ...opts?.pipeOptions, end: true },
      };
      // NOTE(review): presumably haveConnection() rejects via `reject` and
      // returns false when there is no active connection - confirm.
      if (haveConnection(this, '_put', reject)) {
        // The remote write stream is created before any source data is read.
        // No `flags` value is set here, so unless the caller supplied one in
        // writeStreamOptions the stream's own default applies.
        wtr = this.sftp.createWriteStream(rPath, opts.writeStreamOptions);
        ...
        if (lPath instanceof Buffer) {
          this.debugMsg('put source is a buffer');
          // Write the whole buffer and end the stream in one call.
          wtr.end(lPath);
        } else {
          // A string is a local file path; any other value is assumed to
          // already be a readable stream.
          rdr =
            typeof lPath === 'string'
              ? fs.createReadStream(lPath, opts.readStreamOptions)
              : lPath;
          ...
          rdr.pipe(wtr, opts.pipeOptions);
        }
      }
    }).finally(() => {
      // Runs whether the promise resolved or rejected.
      if (addListeners) {
        removeTempListeners(this, listeners, '_put');
      }
    });
  }

sftp的可写流是在NodeJS可写流的基础上继承改造而来的。flags如果没有设置,默认为'w'。

查询NodeJS官网,'w': Open file for writing. The file is created (if it does not exist) or truncated (if it exists).

到此其实可以知道内容被清空是因为这个w。但SFTP又是怎么受这个flag影响的呢,继续。

/**
 * Writable stream that writes to a remote file through an SFTP session.
 *
 * NOTE: this excerpt is abridged (`...` line); any option normalisation done
 * before `WritableStream.call()` is not visible here.
 *
 * @param {Object} sftp - SFTP session used for `open()` and later requests.
 * @param {string} path - Remote file path.
 * @param {Object} options - Stream options. Fields read here: `flags`
 *   (default 'w'), `mode` (default 0o666), `start`, `autoClose`
 *   (default true), `handle` (default null) and `encoding`.
 */
function WriteStream(sftp, path, options) {
  ...
  WritableStream.call(this, options);

  this.path = path;
  // With no explicit flags the file is opened with 'w'; stringFlagMap maps
  // 'w' to TRUNC | CREAT | WRITE.
  this.flags = options.flags === undefined ? 'w' : options.flags;
  this.mode = options.mode === undefined ? 0o666 : options.mode;

  this.start = options.start;
  this.autoClose = options.autoClose === undefined ? true : options.autoClose;
  this.pos = 0;
  this.bytesWritten = 0;
  this.isClosed = false;

  this.handle = options.handle === undefined ? null : options.handle;
  this.sftp = sftp;
  this._opening = false;

  // An explicit start offset is validated and becomes the initial position.
  if (this.start !== undefined) {
    checkPosition(this.start, 'start');

    this.pos = this.start;
  }

  if (options.encoding)
    this.setDefaultEncoding(options.encoding);

  // Node v6.x only
  // Destroys the stream on 'finish' when autoClose is set, unless
  // `_writableState.finalCalled` is already truthy.
  this.on('finish', function() {
    if (this._writableState.finalCalled)
      return;
    if (this.autoClose)
      this.destroy();
  });

  // Open the remote file right away unless a Buffer handle was supplied.
  if (!Buffer.isBuffer(this.handle))
    this.open();
}

写入流的第一步是打开远程文件,打开时会传入flags参数。在sftp的open方法中,stringToFlags(flags_)会把该stream的flags字符串转换成SFTP协议中打开文件所用的flag值。

// Opens the remote file for this stream. On success the handle is stored,
// the file mode is applied (fchmod first, chmod by path as a best-effort
// fallback), and 'open' followed by 'ready' is emitted. For append flags the
// write position is first set to the current file size. On failure 'error'
// is emitted and the stream may be destroyed.
WriteStream.prototype.open = function() {
  // An open request is already in flight.
  if (this._opening)
    return;

  this._opening = true;

  const onOpened = (openErr, handle) => {
    this._opening = false;

    if (openErr) {
      this.emit('error', openErr);
      if (this.autoClose)
        this.destroy();
      return;
    }

    this.handle = handle;

    const announce = () => {
      this.emit('open', handle);
      this.emit('ready');
    };

    const startAtSize = (stats) => {
      this.pos = stats.size;
      announce();
    };

    const onFstat = (fstatErr, stats) => {
      if (!fstatErr) {
        startAtSize(stats);
        return;
      }

      // Try stat() for sftp servers that may not support fstat() for
      // whatever reason
      this.sftp.stat(this.path, (statErr, pathStats) => {
        if (statErr) {
          // The original fstat() error is the one reported.
          this.destroy();
          this.emit('error', fstatErr);
          return;
        }
        startAtSize(pathStats);
      });
    };

    const afterMode = () => {
      // SFTPv3 requires absolute offsets, no matter the open flag used
      if (this.flags[0] === 'a')
        this.sftp.fstat(handle, onFstat);
      else
        announce();
    };

    const onFchmod = (fchmodErr) => {
      if (!fchmodErr) {
        afterMode();
        return;
      }

      // Try chmod() for sftp servers that may not support fchmod() for
      // whatever reason; its outcome is deliberately ignored.
      this.sftp.chmod(this.path, this.mode, (chmodErr) => afterMode());
    };

    this.sftp.fchmod(handle, this.mode, onFchmod);
  };

  this.sftp.open(this.path, this.flags, this.mode, onOpened);
};
  /**
   * Build and send an SFTP OPEN request for `path`.
   *
   * @param {string} path - File name to open.
   * @param {number|string} flags_ - A number is used as the pflags value
   *   as-is; a string is converted with `stringToFlags()`.
   * @param {Object|string|number|Function} [attrs] - File attributes. A
   *   string or number is treated as `{ mode: attrs }`. May be omitted, in
   *   which case the callback is passed in this position.
   * @param {Function} cb - Stored in `this._requests` under the request id.
   *   NOTE(review): presumably invoked when the server's reply arrives -
   *   confirm against the response handler.
   * @throws {Error} When called in server mode, or when `flags_` is a
   *   string that `stringToFlags()` maps to null.
   */
  open(path, flags_, attrs, cb) {
    if (this.server)
      throw new Error('Client-only method called in server mode');

    // Support the (path, flags, cb) call form.
    if (typeof attrs === 'function') {
      cb = attrs;
      attrs = undefined;
    }

    const flags = (typeof flags_ === 'number' ? flags_ : stringToFlags(flags_));
    if (flags === null)
      throw new Error(`Unknown flags string: ${flags_}`);

    let attrsFlags = 0;
    let attrsLen = 0;
    if (typeof attrs === 'string' || typeof attrs === 'number')
      attrs = { mode: attrs };
    if (typeof attrs === 'object' && attrs !== null) {
      // NOTE(review): attrsToBytes() presumably serialises into the shared
      // ATTRS_BUF and reports the attribute flags and byte count - confirm.
      attrs = attrsToBytes(attrs);
      attrsFlags = attrs.flags;
      attrsLen = attrs.nb;
    }

    /*
      uint32        id
      string        filename
      uint32        pflags
      ATTRS         attrs
    */
    const pathLen = Buffer.byteLength(path);
    // Offset of the filename length field: 4 (packet length) + 1 (request
    // type) + 4 (request id).
    let p = 9;
    const buf = Buffer.allocUnsafe(4 + 1 + 4 + 4 + pathLen + 4 + 4 + attrsLen);

    // The leading length field does not count its own 4 bytes.
    writeUInt32BE(buf, buf.length - 4, 0);
    buf[4] = REQUEST.OPEN;
    // Advance the request id, masked with MAX_REQID.
    const reqid = this._writeReqid = (this._writeReqid + 1) & MAX_REQID;
    writeUInt32BE(buf, reqid, 5);

    // Each `p += ...` below advances the write offset inside the argument
    // list, so the order of these statements is significant.
    writeUInt32BE(buf, pathLen, p);
    buf.utf8Write(path, p += 4, pathLen);
    writeUInt32BE(buf, flags, p += pathLen);
    writeUInt32BE(buf, attrsFlags, p += 4);
    if (attrsLen) {
      p += 4;

      // Copy exactly attrsLen bytes of attribute data out of ATTRS_BUF.
      if (attrsLen === ATTRS_BUF.length)
        buf.set(ATTRS_BUF, p);
      else
        bufferCopy(ATTRS_BUF, buf, 0, attrsLen, p);

      p += attrsLen;
    }
    this._requests[reqid] = { cb };

    // A truthy result is reported as 'Buffered', otherwise as 'Sending'.
    const isBuffered = sendOrBuffer(this, buf);
    this._debug && this._debug(
      `SFTP: Outbound: ${isBuffered ? 'Buffered' : 'Sending'} OPEN`
    );
  }
// Maps Node-style file flag strings to the bitwise OR of OPEN_MODE bits.
// Every 'w' variant carries TRUNC and CREAT, every 'a' variant carries APPEND
// and CREAT, a trailing '+' adds READ, and an 'x' adds EXCL.
const stringFlagMap = (() => {
  const { READ, WRITE, APPEND, CREAT, TRUNC, EXCL } = OPEN_MODE;

  const truncWrite = TRUNC | CREAT | WRITE;
  const truncReadWrite = TRUNC | CREAT | READ | WRITE;
  const appendWrite = APPEND | CREAT | WRITE;
  const appendReadWrite = APPEND | CREAT | READ | WRITE;

  return {
    'r': READ,
    'r+': READ | WRITE,
    'w': truncWrite,
    'wx': truncWrite | EXCL,
    'xw': truncWrite | EXCL,
    'w+': truncReadWrite,
    'wx+': truncReadWrite | EXCL,
    'xw+': truncReadWrite | EXCL,
    'a': appendWrite,
    'ax': appendWrite | EXCL,
    'xa': appendWrite | EXCL,
    'a+': appendReadWrite,
    'ax+': appendReadWrite | EXCL,
    'xa+': appendReadWrite | EXCL,
  };
})();

到此也就明白了文件为什么会被清空:上传文件时如果原文件已存在,打开文件时会先truncate将其清空,然后才开始写入数据;如果此时程序中断,远程文件就只剩下空内容。

那如何避免该问题呢?

解决办法就是在sftp上传文件时,先上传到临时目录下,上传完成后再mv(重命名)为原文件,这样比较保险。

相关文档