Skip to content

Commit

Permalink
由子进程方案改为原生多线程方案,并导入了回调、同步方法的简化 API
Browse files Browse the repository at this point in the history
  • Loading branch information
LongTengDao committed Dec 18, 2017
1 parent 7faddb4 commit 3e763e8
Show file tree
Hide file tree
Showing 4 changed files with 56 additions and 181 deletions.
47 changes: 28 additions & 19 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,37 +1,46 @@
# `require('@ltd/j-gzip')`

调用**常驻**子进程进行 `gzip` 相关的**异步** `CPU` 密集型运算,以利用多核硬件资源,避免阻塞主进程
基于原生多线程的 `promisify` 异步 `gzip` 相关的 `CPU` 密集运算

子进程一旦发生错误(除了数据源问题导致的解压失败),模块将自行锁闭并拒绝任何新的请求,以避免引发更为严重的后果。
原本是 `spawn` 子进程实现对多核 `CPU` 的充分利用,并避免阻塞主进程。
现在基于原生 `gzip` `unzip` 异步方法派生线程的方案,更好地达到这一目的。

压缩选项参数被简化为压缩等级参数;
压缩占用内存等级为最高,其它选项值缺省。

## API

### `.gzipBuffer(buffer, gzipLevel)` `async` 方法
### `.gzipAsync(data, level)` `async` 方法

- 用途:对 `Buffer` 进行指定等级的 `gzip` 压缩。
- 用途:对 `data` 进行指定等级的 `gzip` 压缩。
- 参数:
* `buffer`
- 用途:要压缩的 `Buffer`
- 类型:`Buffer`
+ `gzipLevel`
- 用途:压缩等级
- 用途:要压缩的数据
- 类型:`Buffer` `string`
* `level`
- 用途:指定压缩等级
- 类型:`1``9` 之间的整数

### `.gzipString(string, gzipLevel)` `async` 方法
### `.gzipSync` 方法

- 用途:对字符串进行指定等级的 `gzip` 压缩。
- 参数:
* `buffer`
- 用途:要压缩的字符串。
- 类型:`string`
+ `gzipLevel`
- 用途:压缩等级。
- 类型:`1``9` 之间的整数
原生的同步压缩方法;压缩选项参数被简化为压缩等级参数。

### `.gzip` 方法

### `.unzipBuffer(buffer)` `async` 方法
原生的异步压缩方法(基于回调);压缩选项参数被简化为压缩等级参数。

### `.unzipAsync(data)` `async` 方法

- 用途:对 `gzip` 类压缩数据进行解压。
- 参数:
* `buffer`
- 用途:要解压的 `Buffer`
- 用途:要解压的数据
- 类型:`Buffer`

### `.unzipSync` 方法

原生的同步压缩方法。

### `.unzip` 方法

原生的异步压缩方法(基于回调)。
135 changes: 26 additions & 109 deletions src/lib/@.js
Original file line number Diff line number Diff line change
@@ -1,127 +1,44 @@
'use strict';

const { from } = Buffer;
const { gzip, unzip, gzipSync, unzipSync } = require('zlib');

const gzipEmpty = function(){
const OPTIONS = Object.create(null);{
const { assign, create } = Object;
const { gzipSync, unzipSync } = require('zlib');
const empty = Buffer.alloc(0);
return new Array(10).fill(null).map((_,index)=> index===0 ?
new Promise((resolve)=>resolve(unzipSync(empty))) :
Promise.resolve(gzipSync(empty,assign(create(null),{ level:index, memLevel:9 })))
);
}();
const unzipEmpty = gzipEmpty[0];
const typeBuffer = Promise.reject(new TypeError('{@ltd/j-gzip} gzipBuffer 和 gzipAsync 仅接受 Buffer 作为第一个参数。'));
const typeString = Promise.reject(new TypeError('{@ltd/j-gzip} gzipString 仅接受 string 作为第一个参数。'));
const typeNumber = Promise.reject(new TypeError('{@ltd/j-gzip} gzipBuffer 和 gzipString 仅接受 1 至 9 作为第二个参数。'));
{
const catcher = $=>{};
unzipEmpty.catch(catcher);
typeBuffer.catch(catcher);
typeString.catch(catcher);
typeNumber.catch(catcher);
let level = 9;
while( level ){
OPTIONS[level] = assign(create(null),{ level, memLevel:9 });
--level;
}
}

const LEVEL = new Array(10).fill(null).map((_,index)=>from(index+''));
const UNZIP = LEVEL[0];

const queue = [];
let task = null;

let dead = false;
let deadError = null;
let deadPromise = null;

const { stdin } = function({ onError, onDataError, onData }){
const subProcess = require('child_process').spawn(
process.execPath,
[__dirname+require('path').sep+'worker.js'],
{ windowsHide:true }
);
subProcess.on('error',onError);
subProcess.stdin.on('error',onError);
subProcess.stderr.on('error',onError);
subProcess.stderr.on('data',onDataError);
subProcess.stdout.on('error',onError);
subProcess.stdout.on('data',onData);
return subProcess;
}({
module.exports = {

onError(error){
if( dead ){ return; }
dead = true;
deadError = new Error('{@ltd/j-gzip} 服务因为之前的错误,已经终止。历史错误信息:\n\n'+error.stack);
deadPromise = Promise.reject(deadError); deadPromise.catch($=>{});
if( task===null ){ return; }
task.reject(deadError);
task = null;
for( const { reject } of queue ){ reject(deadError); }
queue.length = 0;
gzipAsync(data,level){
return new Promise((resolve,reject)=>
gzip(data,OPTIONS[level],(error,data)=>
error===null ? resolve(data) : reject(error)
)
);
},

onDataError(errorData){
if( dead ){ return; }
task.reject(new Error(errorData.toString()));
if( queue.length===0 ){ task = null; return; }
task = queue.shift();
stdin.write(task.level);
stdin.write(task.data);
gzipSync(data,level){
return gzipSync(data,OPTIONS[level]);
},

onData(data){
if( dead ){ return; }
task.resolve(data);
if( queue.length===0 ){ task = null; return; }
task = queue.shift();
stdin.write(task.level);
stdin.write(task.data);
gzip(data,level,callback){
return gzip(data,OPTIONS[level],callback);
},

});

module.exports = {

gzipBuffer(buffer,gzipLevel){
if( dead ){ return deadPromise; }
if(!(buffer instanceof Buffer)){ return typeBuffer; }
return gzip(buffer,gzipLevel);
unzipAsync(data){
return new Promise((resolve,reject)=>
unzip(data,(error,data)=>
error===null ? resolve(data) : reject(error)
)
);
},

gzipString(string,gzipLevel){
if( dead ){ return deadPromise; }
if( typeof string!=='string' ){ return typeString; }
return gzip(from(string),gzipLevel);
},
unzipSync,

unzipBuffer(buffer){
if( dead ){ return deadPromise; }
if(!(buffer instanceof Buffer)){ return typeBuffer; }
if( buffer.length===0 ){ return unzipEmpty; }
return send(buffer,UNZIP);
},
unzip,

};

function gzip(data,level){
switch( level ){
case 1:case 2:case 3:case 4:case 5:case 6:case 7:case 8:case 9:
if( data.length===0 ){ return gzipEmpty[level]; }
return send(data,LEVEL[level]);
default:
return typeNumber;
}
}

function send(data,level){
return new Promise(function(resolve,reject){
if( task===null ){
task = { resolve, reject, data, level };
stdin.write(level);
stdin.write(data);
}
else{
queue.push({ resolve, reject, data, level });
}
});
}
51 changes: 0 additions & 51 deletions src/lib/worker.js

This file was deleted.

4 changes: 2 additions & 2 deletions src/package.json
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
{
"name":"@ltd/j-gzip",
"version":"1.0.1",
"description":"基于子进程的 gzip CPU 密集运算。从属于“简计划”。",
"version":"2.0.0",
"description":"基于原生多线程的 promisify 异步 gzip 相关的 CPU 密集运算。从属于“简计划”。",
"bugs":"https://GitHub.com/LongTengDao/j-gzip/issues",
"license":"LGPL",
"author":"龙腾道 <[email protected]> (www.LongTengDao.com)",
Expand Down

0 comments on commit 3e763e8

Please sign in to comment.