From 3e763e8eb8ff6bace1fa98d349be8fd3075a4d7d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E9=BE=99=E8=85=BE=E9=81=93?= Date: Mon, 18 Dec 2017 16:27:51 +0800 Subject: [PATCH] =?UTF-8?q?=E7=94=B1=E5=AD=90=E8=BF=9B=E7=A8=8B=E6=96=B9?= =?UTF-8?q?=E6=A1=88=E6=94=B9=E4=B8=BA=E5=8E=9F=E7=94=9F=E5=A4=9A=E7=BA=BF?= =?UTF-8?q?=E7=A8=8B=E6=96=B9=E6=A1=88=EF=BC=8C=E5=B9=B6=E5=AF=BC=E5=85=A5?= =?UTF-8?q?=E4=BA=86=E5=9B=9E=E8=B0=83=E3=80=81=E5=90=8C=E6=AD=A5=E6=96=B9?= =?UTF-8?q?=E6=B3=95=E7=9A=84=E7=AE=80=E5=8C=96=20API?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- README.md | 47 +++++++++------- src/lib/@.js | 135 +++++++++------------------------------------- src/lib/worker.js | 51 ------------------ src/package.json | 4 +- 4 files changed, 56 insertions(+), 181 deletions(-) delete mode 100644 src/lib/worker.js diff --git a/README.md b/README.md index d68ee9a..d9c9132 100644 --- a/README.md +++ b/README.md @@ -1,37 +1,46 @@ # `require('@ltd/j-gzip')` -调用**常驻**子进程进行 `gzip` 相关的**异步** `CPU` 密集型运算,以利用多核硬件资源,避免阻塞主进程。 +基于原生多线程的 `promisify` 异步 `gzip` 相关的 `CPU` 密集运算。 -子进程一旦发生错误(除了数据源问题导致的解压失败),模块将自行锁闭并拒绝任何新的请求,以避免引发更为严重的后果。 +原本是 `spawn` 子进程实现对多核 `CPU` 的充分利用,并避免阻塞主进程。 +现在基于原生 `gzip` `unzip` 异步方法派生线程的方案,更好地达到这一目的。 + +压缩选项参数被简化为压缩等级参数; +压缩占用内存等级为最高,其它选项值缺省。 ## API -### `.gzipBuffer(buffer, gzipLevel)` `async` 方法 +### `.gzipAsync(data, level)` `async` 方法 -- 用途:对 `Buffer` 进行指定等级的 `gzip` 压缩。 +- 用途:对 `data` 进行指定等级的 `gzip` 压缩。 - 参数: * `buffer` - - 用途:要压缩的 `Buffer`。 - - 类型:`Buffer` - + `gzipLevel` - - 用途:压缩等级。 + - 用途:要压缩的数据。 + - 类型:`Buffer` `string` + * `level` + - 用途:指定压缩等级。 - 类型:`1` 至 `9` 之间的整数 -### `.gzipString(string, gzipLevel)` `async` 方法 +### `.gzipSync` 方法 -- 用途:对字符串进行指定等级的 `gzip` 压缩。 -- 参数: - * `buffer` - - 用途:要压缩的字符串。 - - 类型:`string` - + `gzipLevel` - - 用途:压缩等级。 - - 类型:`1` 至 `9` 之间的整数 +原生的同步压缩方法;压缩选项参数被简化为压缩等级参数。 + +### `.gzip` 方法 -### `.unzipBuffer(buffer)` `async` 方法 +原生的异步压缩方法(基于回调);压缩选项参数被简化为压缩等级参数。 + +### `.unzipAsync(data)` `async` 方法 - 用途:对 `gzip` 类压缩数据进行解压。 - 参数: * `buffer` - - 用途:要解压的 `Buffer`。 + - 用途:要解压的数据。 - 类型:`Buffer` + +### `.unzipSync` 方法 + +原生的同步压缩方法。 + +### `.unzip` 方法 + +原生的异步压缩方法(基于回调)。 diff --git a/src/lib/@.js b/src/lib/@.js index 968e53b..133d008 100644 --- a/src/lib/@.js +++ b/src/lib/@.js @@ -1,127 +1,44 @@ 'use strict'; -const { from } = Buffer; +const { gzip, unzip, gzipSync, unzipSync } = require('zlib'); -const gzipEmpty = function(){ +const OPTIONS = Object.create(null);{ const { assign, create } = Object; - const { gzipSync, unzipSync } = require('zlib'); - const empty = Buffer.alloc(0); - return new Array(10).fill(null).map((_,index)=> index===0 ? - new Promise((resolve)=>resolve(unzipSync(empty))) : - Promise.resolve(gzipSync(empty,assign(create(null),{ level:index, memLevel:9 }))) - ); -}(); -const unzipEmpty = gzipEmpty[0]; -const typeBuffer = Promise.reject(new TypeError('{@ltd/j-gzip} gzipBuffer 和 gzipAsync 仅接受 Buffer 作为第一个参数。')); -const typeString = Promise.reject(new TypeError('{@ltd/j-gzip} gzipString 仅接受 string 作为第一个参数。')); -const typeNumber = Promise.reject(new TypeError('{@ltd/j-gzip} gzipBuffer 和 gzipString 仅接受 1 至 9 作为第二个参数。')); -{ - const catcher = $=>{}; - unzipEmpty.catch(catcher); - typeBuffer.catch(catcher); - typeString.catch(catcher); - typeNumber.catch(catcher); + let level = 9; + while( level ){ + OPTIONS[level] = assign(create(null),{ level, memLevel:9 }); + --level; + } } -const LEVEL = new Array(10).fill(null).map((_,index)=>from(index+'')); -const UNZIP = LEVEL[0]; - -const queue = []; -let task = null; - -let dead = false; -let deadError = null; -let deadPromise = null; - -const { stdin } = function({ onError, onDataError, onData }){ - const subProcess = require('child_process').spawn( - process.execPath, - [__dirname+require('path').sep+'worker.js'], - { windowsHide:true } - ); - subProcess.on('error',onError); - subProcess.stdin.on('error',onError); - subProcess.stderr.on('error',onError); - subProcess.stderr.on('data',onDataError); - subProcess.stdout.on('error',onError); - subProcess.stdout.on('data',onData); - return subProcess; -}({ +module.exports = { - onError(error){ - if( dead ){ return; } - dead = true; - deadError = new Error('{@ltd/j-gzip} 服务因为之前的错误,已经终止。历史错误信息:\n\n'+error.stack); - deadPromise = Promise.reject(deadError); deadPromise.catch($=>{}); - if( task===null ){ return; } - task.reject(deadError); - task = null; - for( const { reject } of queue ){ reject(deadError); } - queue.length = 0; + gzipAsync(data,level){ + return new Promise((resolve,reject)=> + gzip(data,OPTIONS[level],(error,data)=> + error===null ? resolve(data) : reject(error) + ) + ); }, - onDataError(errorData){ - if( dead ){ return; } - task.reject(new Error(errorData.toString())); - if( queue.length===0 ){ task = null; return; } - task = queue.shift(); - stdin.write(task.level); - stdin.write(task.data); + gzipSync(data,level){ + return gzipSync(data,OPTIONS[level]); }, - onData(data){ - if( dead ){ return; } - task.resolve(data); - if( queue.length===0 ){ task = null; return; } - task = queue.shift(); - stdin.write(task.level); - stdin.write(task.data); + gzip(data,level,callback){ + return gzip(data,OPTIONS[level],callback); }, -}); - -module.exports = { - - gzipBuffer(buffer,gzipLevel){ - if( dead ){ return deadPromise; } - if(!(buffer instanceof Buffer)){ return typeBuffer; } - return gzip(buffer,gzipLevel); + unzipAsync(data){ + return new Promise((resolve,reject)=> + unzip(data,(error,data)=> + error===null ? resolve(data) : reject(error) + ) + ); }, - gzipString(string,gzipLevel){ - if( dead ){ return deadPromise; } - if( typeof string!=='string' ){ return typeString; } - return gzip(from(string),gzipLevel); - }, + unzipSync, - unzipBuffer(buffer){ - if( dead ){ return deadPromise; } - if(!(buffer instanceof Buffer)){ return typeBuffer; } - if( buffer.length===0 ){ return unzipEmpty; } - return send(buffer,UNZIP); - }, + unzip, }; - -function gzip(data,level){ - switch( level ){ - case 1:case 2:case 3:case 4:case 5:case 6:case 7:case 8:case 9: - if( data.length===0 ){ return gzipEmpty[level]; } - return send(data,LEVEL[level]); - default: - return typeNumber; - } -} - -function send(data,level){ - return new Promise(function(resolve,reject){ - if( task===null ){ - task = { resolve, reject, data, level }; - stdin.write(level); - stdin.write(data); - } - else{ - queue.push({ resolve, reject, data, level }); - } - }); -} diff --git a/src/lib/worker.js b/src/lib/worker.js deleted file mode 100644 index 0640377..0000000 --- a/src/lib/worker.js +++ /dev/null @@ -1,51 +0,0 @@ -'use strict'; - -const { from } = Buffer; - -const { gzipSync, unzipSync } = require('zlib'); - -const { stdout } = process; - -const UNZIP = Object.create(null); -const OPTIONS = function(){ - const { assign, create } = Object; - const Options = (level)=> assign(create(null),{ level, memLevel:9 }); - return assign(create(null),{ - 0x30: UNZIP, - 0x31: Options(1), - 0x32: Options(2), - 0x33: Options(3), - 0x34: Options(4), - 0x35: Options(5), - 0x36: Options(6), - 0x37: Options(7), - 0x38: Options(8), - 0x39: Options(9), - }); -}(); - -let options = null; - -process.stdin.on('data',function(data){ - - if( options===null ){ - options = OPTIONS[data[0]]; - options===undefined && process.exit(); - if( data.length===1 ){ return; } - data = data.slice(1); - } - - try{ - data = options===UNZIP ? - unzipSync(data) : - gzipSync(data,options); - } - catch({ message }){ - options = null; - stderr.write(from(message)); - return; - } - options = null; - stdout.write(data); - -}); diff --git a/src/package.json b/src/package.json index f7b26df..ccc34e2 100644 --- a/src/package.json +++ b/src/package.json @@ -1,7 +1,7 @@ { "name":"@ltd/j-gzip", - "version":"1.0.1", - "description":"基于子进程的 gzip CPU 密集运算。从属于“简计划”。", + "version":"2.0.0", + "description":"基于原生多线程的 promisify 异步 gzip 相关的 CPU 密集运算。从属于“简计划”。", "bugs":"https://GitHub.com/LongTengDao/j-gzip/issues", "license":"LGPL", "author":"龙腾道 (www.LongTengDao.com)",