流写入文件时造成的文件内容被清空问题

ssh2-sftp-client提供了put方法,可以传入可读流,将数据持续写入远程文件。最近遇到一个问题:如果写入流刚开始传输时程序就中断了,会发现远程文件的内容被清空。这里分析一下原因。

put方法的localSrc参数可以是可读流(也可以是本地文件路径字符串或Buffer),remotePath是远程服务器上的文件路径。

async put(localSrc, remotePath, options) {
try {
if (typeof localSrc === 'string') {
const localCheck = haveLocalAccess(localSrc);
if (!localCheck.status) {
throw this.fmtError(
`Bad path: ${localSrc} ${localCheck.details}`,
'put',
localCheck.code,
);
}
}
return await this._put(localSrc, remotePath, options);
} catch (e) {
throw e.custom ? e : this.fmtError(`Re-thrown: ${e.message}`, 'put', e.code);
}
}

_put方法根据远程文件路径调用sftp.createWriteStream创建可写流,writeStreamOptions是该可写流的配置;随后通过pipe把可读流中的数据持续写入这个SFTP可写流。

/**
 * Write a Buffer, a local file, or a caller-supplied stream to a remote file
 * through an SFTP write stream.
 *
 * Note: parts of the original implementation are elided (`...`) in this
 * excerpt, so how the promise is resolved/rejected is not visible here.
 *
 * @param {Buffer|string|Object} lPath - Buffer with the data, local file path,
 *   or an object with a `pipe` method (presumably a readable stream)
 * @param {string} rPath - remote path passed to `sftp.createWriteStream`
 * @param {Object} [opts] - may carry `readStreamOptions`, `writeStreamOptions`
 *   and `pipeOptions`; any other keys are dropped
 * @param {boolean} [addListeners=true] - whether temporary listeners are
 *   installed for the duration of the call
 * @returns {Promise<*>} settles once the transfer logic (elided) settles it
 */
_put(lPath, rPath, opts, addListeners = true) {
let listeners, wtr, rdr;
return new Promise((resolve, reject) => {
if (addListeners) {
listeners = addTempListeners(this, '_put', reject);
}
// Rebuild opts from the three recognised keys. autoClose/end are listed
// after the spread, so they always override whatever the caller supplied.
// No `flags` value is injected here: unless the caller sets
// writeStreamOptions.flags, the write stream's own default is used.
opts = {
readStreamOptions: { ...opts?.readStreamOptions, autoClose: true },
writeStreamOptions: { ...opts?.writeStreamOptions, autoClose: true },
pipeOptions: { ...opts?.pipeOptions, end: true },
};
if (haveConnection(this, '_put', reject)) {
// The remote write stream is created before any data is read or sent.
wtr = this.sftp.createWriteStream(rPath, opts.writeStreamOptions);
...
if (lPath instanceof Buffer) {
this.debugMsg('put source is a buffer');
// A Buffer is written in one go: end() writes it and finishes the stream.
wtr.end(lPath);
} else {
// A string is opened as a local file; anything else is used as the
// source stream directly.
rdr =
typeof lPath === 'string'
? fs.createReadStream(lPath, opts.readStreamOptions)
: lPath;
...
rdr.pipe(wtr, opts.pipeOptions);
}
}
}).finally(() => {
// Always detach the temporary listeners, whether the promise resolved or
// rejected.
if (addListeners) {
removeTempListeners(this, listeners, '_put');
}
});
}

sftp的可写流是在Node.js可写流的基础上继承改造而来的。如果没有设置flags,默认值为'w'。

查询Node.js官方文档:'w': Open file for writing. The file is created (if it does not exist) or truncated (if it exists).

到这里其实已经可以知道,内容被清空正是因为这个'w'。但SFTP又是怎样受这个flag影响的呢?继续往下看。

/**
 * Writable stream that writes to a file on an SFTP server.
 *
 * Note: some of the original constructor preamble is elided (`...`) in this
 * excerpt.
 *
 * @param {Object} sftp - SFTP session object; its `open` method is used by
 *   `this.open()` (other methods it must provide are not visible here)
 * @param {string} path - remote file path
 * @param {Object} options - recognised keys read below: `flags` (default
 *   'w'), `mode` (default 0o666), `start`, `autoClose` (default true),
 *   `handle` (default null) and `encoding`; the whole object is also passed
 *   to the WritableStream constructor
 */
function WriteStream(sftp, path, options) {
...
WritableStream.call(this, options);

this.path = path;
// Default open flag is 'w' when the caller does not specify one.
this.flags = options.flags === undefined ? 'w' : options.flags;
this.mode = options.mode === undefined ? 0o666 : options.mode;

this.start = options.start;
this.autoClose = options.autoClose === undefined ? true : options.autoClose;
this.pos = 0;
this.bytesWritten = 0;
this.isClosed = false;

this.handle = options.handle === undefined ? null : options.handle;
this.sftp = sftp;
this._opening = false;

// An explicit start offset is validated and becomes the initial write
// position.
if (this.start !== undefined) {
checkPosition(this.start, 'start');

this.pos = this.start;
}

if (options.encoding)
this.setDefaultEncoding(options.encoding);

// Node v6.x only
this.on('finish', function() {
// Skip the manual destroy when the stream's final hook has already run.
if (this._writableState.finalCalled)
return;
if (this.autoClose)
this.destroy();
});

// Unless a Buffer handle was supplied, the remote file is opened right away
// from the constructor, before any data has been written.
if (!Buffer.isBuffer(this.handle))
this.open();
}

写入流的第一步是打开远程文件(open),打开时会传入flags参数。在sftp的open方法中,stringToFlags(flags_)会把该流的字符串flag转换成SFTP协议中打开文件所用的flag值。

/**
 * Open the remote file for this stream and emit 'open' (with the handle)
 * followed by 'ready' once it is usable.
 *
 * Sequence: sftp.open(path, flags, mode) -> fchmod(handle, mode), falling
 * back to chmod(path, mode) -> for flags starting with 'a' only, fstat
 * (falling back to stat) to set the write position to the current file size.
 *
 * The file is opened with `this.flags` before any data is written, so
 * whatever the open flags imply on the server happens at this point.
 */
WriteStream.prototype.open = function() {
// Ignore the call while a previous open request is still in flight.
if (this._opening)
return;

this._opening = true;

this.sftp.open(this.path, this.flags, this.mode, (er, handle) => {
this._opening = false;

if (er) {
this.emit('error', er);
if (this.autoClose)
this.destroy();
return;
}

this.handle = handle;

// Continuation run after fchmod. It is also called with no argument after
// the chmod fallback, so a chmod failure is ignored and opening proceeds.
const tryAgain = (err) => {
if (err) {
// Try chmod() for sftp servers that may not support fchmod() for
// whatever reason
this.sftp.chmod(this.path, this.mode, (err_) => tryAgain());
return;
}

// SFTPv3 requires absolute offsets, no matter the open flag used
if (this.flags[0] === 'a') {
const tryStat = (err, st) => {
if (err) {
// Try stat() for sftp servers that may not support fstat() for
// whatever reason
this.sftp.stat(this.path, (err_, st_) => {
if (err_) {
// NOTE(review): the error emitted is the original fstat error
// (`err`), not the stat fallback error (`err_`) - confirm this is
// intended.
this.destroy();
this.emit('error', err);
return;
}
tryStat(null, st_);
});
return;
}

// Start writing at the current end of the file.
this.pos = st.size;
this.emit('open', handle);
this.emit('ready');
};

this.sftp.fstat(handle, tryStat);
return;
}

this.emit('open', handle);
this.emit('ready');
};

this.sftp.fchmod(handle, this.mode, tryAgain);
});
}

sftp的open方法实现如下:

/**
 * Build and send (or buffer) an SFTP OPEN request for `path`.
 *
 * @param {string} path - remote file path
 * @param {number|string} flags_ - numeric open flags, or a flag string that
 *   `stringToFlags` converts to a number
 * @param {Object|string|number|Function} [attrs] - file attributes; a string
 *   or number is wrapped as `{ mode: attrs }`; if a function is passed here
 *   it is taken as `cb`
 * @param {Function} cb - stored under the request id in `this._requests`
 *   (presumably invoked when the matching response arrives - confirm)
 * @throws {Error} when called in server mode, or when `stringToFlags`
 *   returns null for `flags_`
 */
open(path, flags_, attrs, cb) {
if (this.server)
throw new Error('Client-only method called in server mode');

// Support the (path, flags, cb) call form.
if (typeof attrs === 'function') {
cb = attrs;
attrs = undefined;
}

// String flags are translated to the numeric form sent in the request.
const flags = (typeof flags_ === 'number' ? flags_ : stringToFlags(flags_));
if (flags === null)
throw new Error(`Unknown flags string: ${flags_}`);

let attrsFlags = 0;
let attrsLen = 0;
if (typeof attrs === 'string' || typeof attrs === 'number')
attrs = { mode: attrs };
if (typeof attrs === 'object' && attrs !== null) {
// attrsToBytes yields the attribute flag bits and the encoded byte count;
// the encoded bytes themselves are read from ATTRS_BUF further down
// (presumably filled by attrsToBytes - confirm).
attrs = attrsToBytes(attrs);
attrsFlags = attrs.flags;
attrsLen = attrs.nb;
}

/*
uint32 id
string filename
uint32 pflags
ATTRS attrs
*/
const pathLen = Buffer.byteLength(path);
// Offset of the filename length field:
// 4 (packet length) + 1 (request type) + 4 (request id).
let p = 9;
const buf = Buffer.allocUnsafe(4 + 1 + 4 + 4 + pathLen + 4 + 4 + attrsLen);

// Packet length field at offset 0 excludes its own 4 bytes.
writeUInt32BE(buf, buf.length - 4, 0);
buf[4] = REQUEST.OPEN;
// Next request id, masked with MAX_REQID.
const reqid = this._writeReqid = (this._writeReqid + 1) & MAX_REQID;
writeUInt32BE(buf, reqid, 5);

// The `p +=` expressions below advance the offset as each field is written,
// so the order of these statements defines the packet layout.
writeUInt32BE(buf, pathLen, p);
buf.utf8Write(path, p += 4, pathLen);
writeUInt32BE(buf, flags, p += pathLen);
writeUInt32BE(buf, attrsFlags, p += 4);
if (attrsLen) {
p += 4;

if (attrsLen === ATTRS_BUF.length)
buf.set(ATTRS_BUF, p);
else
bufferCopy(ATTRS_BUF, buf, 0, attrsLen, p);

p += attrsLen;
}
// Register the callback under this request id.
this._requests[reqid] = { cb };

const isBuffered = sendOrBuffer(this, buf);
this._debug && this._debug(
`SFTP: Outbound: ${isBuffered ? 'Buffered' : 'Sending'} OPEN`
);
}

stringToFlags依据下面的stringFlagMap,把字符串flag映射为SFTP的打开模式:

// Lookup table from flag strings ('r', 'w', 'a' and their '+' / 'x'
// variants) to the OR-ed OPEN_MODE bits used when opening a remote file.
// Every 'w' variant includes TRUNC; every 'a' variant includes APPEND.
const stringFlagMap = (() => {
  const { READ, WRITE, APPEND, CREAT, TRUNC, EXCL } = OPEN_MODE;

  return {
    'r': READ,
    'r+': READ | WRITE,

    // Write variants: create the file if missing, truncate it if present.
    'w': TRUNC | CREAT | WRITE,
    'wx': TRUNC | CREAT | WRITE | EXCL,
    'xw': TRUNC | CREAT | WRITE | EXCL,
    'w+': TRUNC | CREAT | READ | WRITE,
    'wx+': TRUNC | CREAT | READ | WRITE | EXCL,
    'xw+': TRUNC | CREAT | READ | WRITE | EXCL,

    // Append variants: create the file if missing, never truncate.
    'a': APPEND | CREAT | WRITE,
    'ax': APPEND | CREAT | WRITE | EXCL,
    'xa': APPEND | CREAT | WRITE | EXCL,
    'a+': APPEND | CREAT | READ | WRITE,
    'ax+': APPEND | CREAT | READ | WRITE | EXCL,
    'xa+': APPEND | CREAT | READ | WRITE | EXCL,
  };
})();

到这里也就明白了文件为什么会被清空:上传文件时如果原文件已存在,默认的'w'对应的打开模式包含TRUNC,服务器在打开文件时就会先将其截断(truncate)清空,之后才开始写入数据。如果程序在刚打开文件、数据还没写入时中断,远程文件就只剩下空内容。

那如何避免该问题呢?

比较保险的解决办法是:sftp上传文件时先上传为临时文件(例如放到临时目录下),上传完成后再通过rename(mv)重命名为目标文件。这样即使上传过程中程序中断,原文件的内容也不会受到影响。

相关文档