Skip to content

Commit

Permalink
update storage func code
Browse files Browse the repository at this point in the history
  • Loading branch information
vcaesar committed Feb 10, 2018
1 parent a476969 commit 44da40c
Show file tree
Hide file tree
Showing 3 changed files with 61 additions and 56 deletions.
110 changes: 56 additions & 54 deletions engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,17 +137,15 @@ func (engine *Engine) Ranker(options types.EngineOpts) {

// InitStorage initialize the persistent storage channel
func (engine *Engine) InitStorage() {
if engine.initOptions.UseStorage {
engine.storageIndexDocChans =
make([]chan storageIndexDocReq,
engine.initOptions.StorageShards)
for shard := 0; shard < engine.initOptions.StorageShards; shard++ {
engine.storageIndexDocChans[shard] = make(
chan storageIndexDocReq)
}
engine.storageInitChan = make(
chan bool, engine.initOptions.StorageShards)
}
engine.storageIndexDocChans =
make([]chan storageIndexDocReq,
engine.initOptions.StorageShards)
for shard := 0; shard < engine.initOptions.StorageShards; shard++ {
engine.storageIndexDocChans[shard] = make(
chan storageIndexDocReq)
}
engine.storageInitChan = make(
chan bool, engine.initOptions.StorageShards)
}

// CheckMem check the memory when the memory is larger
Expand All @@ -171,58 +169,58 @@ func (engine *Engine) CheckMem() {

// Storage start the persistent storage work connection
func (engine *Engine) Storage() {
if engine.initOptions.UseStorage {
err := os.MkdirAll(engine.initOptions.StorageFolder, 0700)
if err != nil {
log.Fatal("Can not create directory", engine.initOptions.StorageFolder)
}
// if engine.initOptions.UseStorage {
err := os.MkdirAll(engine.initOptions.StorageFolder, 0700)
if err != nil {
log.Fatal("Can not create directory", engine.initOptions.StorageFolder)
}

// 打开或者创建数据库
engine.dbs = make([]storage.Storage, engine.initOptions.StorageShards)
for shard := 0; shard < engine.initOptions.StorageShards; shard++ {
dbPath := engine.initOptions.StorageFolder + "/" +
StorageFilePrefix + "." + strconv.Itoa(shard)
// 打开或者创建数据库
engine.dbs = make([]storage.Storage, engine.initOptions.StorageShards)
for shard := 0; shard < engine.initOptions.StorageShards; shard++ {
dbPath := engine.initOptions.StorageFolder + "/" +
StorageFilePrefix + "." + strconv.Itoa(shard)

db, err := storage.OpenStorage(dbPath, engine.initOptions.StorageEngine)
if db == nil || err != nil {
log.Fatal("Unable to open database", dbPath, ": ", err)
}
engine.dbs[shard] = db
db, err := storage.OpenStorage(dbPath, engine.initOptions.StorageEngine)
if db == nil || err != nil {
log.Fatal("Unable to open database", dbPath, ": ", err)
}
engine.dbs[shard] = db
}

// 从数据库中恢复
for shard := 0; shard < engine.initOptions.StorageShards; shard++ {
go engine.storageInitWorker(shard)
}
// 从数据库中恢复
for shard := 0; shard < engine.initOptions.StorageShards; shard++ {
go engine.storageInitWorker(shard)
}

// 等待恢复完成
for shard := 0; shard < engine.initOptions.StorageShards; shard++ {
<-engine.storageInitChan
}
for {
runtime.Gosched()
if engine.numIndexingReqs == engine.numDocsIndexed {
break
}
// 等待恢复完成
for shard := 0; shard < engine.initOptions.StorageShards; shard++ {
<-engine.storageInitChan
}
for {
runtime.Gosched()
if engine.numIndexingReqs == engine.numDocsIndexed {
break
}
}

// 关闭并重新打开数据库
for shard := 0; shard < engine.initOptions.StorageShards; shard++ {
engine.dbs[shard].Close()
dbPath := engine.initOptions.StorageFolder + "/" +
StorageFilePrefix + "." + strconv.Itoa(shard)
// 关闭并重新打开数据库
for shard := 0; shard < engine.initOptions.StorageShards; shard++ {
engine.dbs[shard].Close()
dbPath := engine.initOptions.StorageFolder + "/" +
StorageFilePrefix + "." + strconv.Itoa(shard)

db, err := storage.OpenStorage(dbPath, engine.initOptions.StorageEngine)
if db == nil || err != nil {
log.Fatal("Unable to open database", dbPath, ": ", err)
}
engine.dbs[shard] = db
db, err := storage.OpenStorage(dbPath, engine.initOptions.StorageEngine)
if db == nil || err != nil {
log.Fatal("Unable to open database", dbPath, ": ", err)
}
engine.dbs[shard] = db
}

for shard := 0; shard < engine.initOptions.StorageShards; shard++ {
go engine.storageIndexDocWorker(shard)
}
for shard := 0; shard < engine.initOptions.StorageShards; shard++ {
go engine.storageIndexDocWorker(shard)
}
// }
}

// Init initialize the engine
Expand Down Expand Up @@ -270,7 +268,9 @@ func (engine *Engine) Init(options types.EngineOpts) {
engine.CheckMem()

// 初始化持久化存储通道
engine.InitStorage()
if engine.initOptions.UseStorage {
engine.InitStorage()
}

// 启动分词器
for iThread := 0; iThread < options.NumSegmenterThreads; iThread++ {
Expand All @@ -293,7 +293,9 @@ func (engine *Engine) Init(options types.EngineOpts) {
}

// 启动持久化存储工作协程
engine.Storage()
if engine.initOptions.UseStorage {
engine.Storage()
}

atomic.AddUint64(&engine.numDocsStored, engine.numIndexingReqs)
}
Expand Down
1 change: 1 addition & 0 deletions indexer_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ func (engine *Engine) indexerAddDocWorker(shard int) {
if request.doc != nil {
atomic.AddUint64(&engine.numTokenIndexAdded,
uint64(len(request.doc.Keywords)))

atomic.AddUint64(&engine.numDocsIndexed, 1)
}
if request.forceUpdate {
Expand Down
6 changes: 4 additions & 2 deletions net/grpc/search.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,8 +108,10 @@ func rpcSearch(sea com.SearchArgs) *pb.SearchReply {
return rep
}

var rpcdata []*pb.SearchReply
var rpcwg sync.WaitGroup
var (
rpcdata []*pb.SearchReply
rpcwg sync.WaitGroup
)

// WgRpc rpc
func WgRpc(address string, sea com.SearchArgs) {
Expand Down

0 comments on commit 44da40c

Please sign in to comment.