Skip to content

Commit

Permalink
support timestamp on postgres
Browse files Browse the repository at this point in the history
  • Loading branch information
wonderflow committed Nov 9, 2018
1 parent cb54ebb commit 394b092
Show file tree
Hide file tree
Showing 7 changed files with 342 additions and 30 deletions.
17 changes: 9 additions & 8 deletions metric/system/disk.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,10 @@ import (
"strings"
"sync"

"time"

"github.com/qiniu/logkit/metric"
. "github.com/qiniu/logkit/utils/models"
"time"
)

const (
Expand Down Expand Up @@ -146,9 +147,9 @@ const (
KeyDiskioWriteBytesPerSec = "diskio_write_bytes_per_sec"
KeyDiskioReadTime = "diskio_read_time"
KeyDiskioWriteTime = "diskio_write_time"
KeyDiskioReadAWait = "diskio_read_await"
KeyDiskioWriteAWait = "diskio_write_await"
KeyDiskioAWait = "diskio_await"
KeyDiskioReadAWait = "diskio_read_await"
KeyDiskioWriteAWait = "diskio_write_await"
KeyDiskioAWait = "diskio_await"
KeyDiskioIoTime = "diskio_io_time"
KeyDiskioIoUtil = "diskio_io_util"
KeyDiskioIopsInProgress = "diskio_iops_in_progress"
Expand Down Expand Up @@ -291,9 +292,9 @@ func (s *DiskIOStats) Collect() (datas []map[string]interface{}, err error) {
KeyDiskioWriteBytesPerSec: 0,
KeyDiskioReadTime: io.ReadTime,
KeyDiskioWriteTime: io.WriteTime,
KeyDiskioReadAWait: 0,
KeyDiskioWriteAWait: 0,
KeyDiskioAWait: 0,
KeyDiskioReadAWait: 0,
KeyDiskioWriteAWait: 0,
KeyDiskioAWait: 0,
KeyDiskioIoTime: io.IoTime,
KeyDiskioIoUtil: 0,
KeyDiskioIopsInProgress: io.IopsInProgress,
Expand Down Expand Up @@ -426,7 +427,7 @@ func init() {
ps2 := newSystemPS()
metric.Add(TypeMetricDiskio, func() metric.Collector {
return &DiskIOStats{ps: ps2,
lastCollect: make(map[string]DiskioCollectInfo),
lastCollect: make(map[string]DiskioCollectInfo),
SkipSerialNumber: true}
})
}
2 changes: 1 addition & 1 deletion mgr/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -603,7 +603,7 @@ func (r *LogExportRunner) readLines(dataSourceTag string) []Data {
var selen int
if se != nil {
selen = len(se.DatasourceSkipIndex)
log.Debugf("Runner[%v] datasourcetag add error, datas %v datasourceSkipIndex %v froms %v", r.Name(), datas, se.DatasourceSkipIndex, froms)
log.Debugf("Runner[%v] datasourcetag add error, datas %v datasourceSkipIndex %v froms %v", r.Name(), datas, se.DatasourceSkipIndex, froms)
}
log.Errorf("Runner[%v] datasourcetag add error, datas(TOTAL %v), datasourceSkipIndex(TOTAL %v) not match with froms(TOTAL %v)", r.Name(), len(datas), selen, len(froms))
}
Expand Down
4 changes: 2 additions & 2 deletions parser/grok/patterns.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
//!!! Notice This is auto generated file, DO NOT EDIT IT!!!
//!!! Notice This is auto generated file, DO NOT EDIT IT!!!

package grok
package grok

const DEFAULT_PATTERNS = `S3_REQUEST_LINE (?:%{WORD:verb} %{NOTSPACE:request}(?: HTTP/%{NUMBER:httpversion})?|%{DATA:rawrequest})
Expand Down
27 changes: 27 additions & 0 deletions reader/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -620,6 +620,33 @@ var ModeKeyOptions = map[string][]Option{
Advance: true,
ToolTip: `指定一个 PostgreSQL 的列名,作为 offset 的记录,类型必须是整型,建议使用插入(或修改)数据的时间戳(unixnano)作为该字段`,
},
{
KeyName: KeyPGtimestampKey,
ChooseOnly: false,
Default: "",
DefaultNoUse: true,
Description: `时间戳列名称(` + KeyPGtimestampKey + `)`,
Advance: true,
ToolTip: `指定一个 PostgreSQL 的时间戳(Date/Time Types)`,
},
{
KeyName: KeyPGStartTime,
ChooseOnly: false,
Default: "",
DefaultNoUse: true,
Description: `起始时间(` + KeyPGStartTime + `)`,
Advance: true,
ToolTip: `指定一个 PostgreSQL 的时间戳开始时间(1999-01-08 04:05:06)`,
},
{
KeyName: KeyPGBatchDuration,
ChooseOnly: false,
Default: "1m",
DefaultNoUse: true,
Description: `起始时间(` + KeyPGBatchDuration + `)`,
Advance: true,
ToolTip: `指定一个 PostgreSQL 搜索的范围(10s、2m、1h)`,
},
OptionMetaPath,
OptionDataSourceTag,
{
Expand Down
3 changes: 3 additions & 0 deletions reader/config/models.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,9 @@ const (
KeyMssqlExecOnStart = "mssql_exec_onstart"

KeyPGsqlOffsetKey = "postgres_offset_key"
KeyPGtimestampKey = "postgres_timestamp_key"
KeyPGStartTime = "postgres_start_time_key"
KeyPGBatchDuration = "postgres_batch_intervel"
KeyPGsqlReadBatch = "postgres_limit_batch"
KeyPGsqlDataSource = "postgres_datasource"
KeyPGsqlDataBase = "postgres_database"
Expand Down
148 changes: 148 additions & 0 deletions reader/sql/datagen/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,148 @@
package main

import (
"database/sql"
"flag"
"fmt"
"os"

"math/rand"
"strconv"

_ "github.com/denisenkom/go-mssqldb" //mssql 驱动
_ "github.com/go-sql-driver/mysql" //mysql 驱动
_ "github.com/lib/pq" //postgres 驱动
utilsos "github.com/qiniu/logkit/utils/os"
"github.com/qiniu/x/log.v7"

"time"

"strings"

"github.com/Pallinder/go-randomdata"
)

const (
NextVersion = "v1.0.0"
)

const usage = `datagen generate sql data for test
Usage:
datagen [commands|flags]
The commands & flags are:
-v print the version to stdout.
-h <host> specify database host
-p <port> specify database port
-db <database> specify database name
-t <type> specify database type: mysql or postgres
-u <username> database username
-p <password> database password
-table <tablename> database tablename
Examples:
# start to gen data to database
datagen -h 127.0.0.1 -p 3306 -t postgres
`

var (
fversion = flag.Bool("v", false, "print the version to stdout")
host = flag.String("h", "127.0.0.1", "specify database host")
port = flag.String("p", "5432", "specify database port")
databaseType = flag.String("t", "postgres", "specify database type mysql or postgres")
database = flag.String("db", "testdb", "specify database name")
username = flag.String("u", "postgres", "database username")
password = flag.String("password", "", "database password")
table = flag.String("table", "test", "database table name")
)

func usageExit(rc int) {
fmt.Println(usage)
os.Exit(rc)
}

func main() {
flag.Usage = func() { usageExit(0) }
flag.Parse()
switch {
case *fversion:
fmt.Println("datagen version: ", NextVersion)
osInfo := utilsos.GetOSInfo()
fmt.Println("Hostname: ", osInfo.Hostname)
fmt.Println("Core: ", osInfo.Core)
fmt.Println("OS: ", osInfo.OS)
fmt.Println("Platform: ", osInfo.Platform)
return
}
switch *databaseType {
case "mysql":
fmt.Println("mysql is not supported now")
case "postgres":
fmt.Println("Start to generate data to ", *host, *port, *username, *database, *table)
generatePostgresData(*host, *port, *username, *password, *database, *table)
}
}

func openSql(dbtype, connectStr string) (db *sql.DB, err error) {
db, err = sql.Open(dbtype, connectStr)
if err != nil {
return nil, fmt.Errorf("open %v failed: %v", dbtype, err)
}
return db, nil
}

func getPostgresDb(dbsource string) (db *sql.DB, err error) {
db, err = openSql("postgres", dbsource)
if err != nil {
return nil, err
}
if err = db.Ping(); err != nil {
return nil, err
}
return db, nil
}

func preparePostgres(pgDbSource, databasename string) (db *sql.DB, err error) {
db, err = openSql("postgres", pgDbSource)
if err != nil {
log.Error(err)
return nil, err
}
defer db.Close()

return getPostgresDb(pgDbSource + " dbname=" + databasename)
}

func generatePostgresData(host, port, username, password, database, table string) {
datasource := "sslmode=disable host=" + host + " port=" + port + " dbname=" + database + " user=" + username + " password=" + password
db, err := preparePostgres(datasource, database)
if err != nil {
log.Error(err)
return
}
defer db.Close()

_, err = db.Exec(`CREATE TABLE ` + table + ` (id int4,email varchar, city varchar, useragent varchar,age int4,salary float4,delete bool,create_time timestamp(6))WITH (OIDS=FALSE);`)
if err != nil && !strings.Contains(err.Error(), "already exists") {
log.Error(err)
return
}
datanum := 1
for {
dt := `INSERT INTO ` + table + ` VALUES ('` + strconv.Itoa(datanum) + `', '` + randomdata.Email() + `', '` + randomdata.City() + `','` + randomdata.UserAgentString() + `', ` + strconv.Itoa(rand.Intn(100)) + `, '` + strconv.FormatFloat(rand.Float64(), 'f', -1, 64) + `', 't', '` + time.Unix(int64(149999999+datanum), 0).Format("2006-01-02 15:04:05") + `');`
_, err = db.Exec(dt)
if err != nil {
log.Error(err)
return
}
if datanum%100 == 0 {
fmt.Println(datanum, " of data inserted")
}
datanum++
}
}
Loading

0 comments on commit 394b092

Please sign in to comment.