Skip to content

Commit

Permalink
Getting dummy transport layer in
Browse files Browse the repository at this point in the history
  • Loading branch information
0x19 committed Sep 15, 2024
1 parent fc30762 commit f4b4d42
Show file tree
Hide file tree
Showing 10 changed files with 203 additions and 22 deletions.
157 changes: 157 additions & 0 deletions benchmark/dummy.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,157 @@
package benchmark

import (
"context"
"fmt"
"github.com/quic-go/quic-go"
"github.com/unpackdev/fdb"
transport_dummy "github.com/unpackdev/fdb/transports/dummy"
"github.com/unpackdev/fdb/types"
)

type DummySuite struct {
fdbInstance *fdb.FDB
server *transport_dummy.Server
client quic.Connection
stream quic.Stream
}

func NewDummySuite(fdbInstance *fdb.FDB) *DummySuite {
return &DummySuite{
fdbInstance: fdbInstance,
}
}

// Start starts the QUIC server for benchmarking.
func (qs *DummySuite) Start() error {
dummyTransport, err := qs.fdbInstance.GetTransportByType(types.DummyTransportType)
if err != nil {
return fmt.Errorf("failed to retrieve QUIC transport: %w", err)
}

dummyServer, ok := dummyTransport.(*transport_dummy.Server)
if !ok {
return fmt.Errorf("failed to cast transport to DummyServer")
}

db, err := qs.fdbInstance.GetDbManager().GetDb("benchmark")
if err != nil {
return fmt.Errorf("failed to retrieve benchmark database: %w", err)
}

wHandler := transport_dummy.NewDummyWriteHandler(db)
dummyServer.RegisterHandler(types.WriteHandlerType, wHandler.HandleMessage)

rHandler := transport_dummy.NewDummyReadHandler(db)
dummyServer.RegisterHandler(types.ReadHandlerType, rHandler.HandleMessage)

if err := dummyServer.Start(); err != nil {
return fmt.Errorf("failed to start Dummy server: %w", err)
}

qs.server = dummyServer
fmt.Println("QUIC server started successfully")
return nil
}

// Stop stops the QUIC server and closes the client connection and stream.
func (qs *DummySuite) Stop() {
if qs.stream != nil {
qs.stream.Close()
}
if qs.client != nil {
qs.client.CloseWithError(0, "closing connection")
}
if qs.server != nil {
qs.server.Stop()
fmt.Println("QUIC server stopped successfully")
}
}

// SetupClient sets up a QUIC client and stream only once.
func (qs *DummySuite) SetupClient(ctx context.Context) error {
if qs.client != nil && qs.stream != nil {
return nil // Already setup, reuse client and stream
}

/* serverAddr := qs.server.Addr()
clientTLSConfig := &tls.Config{
InsecureSkipVerify: true,
NextProtos: []string{"quic-example"},
}
// Connect to the server
client, err := quic.DialAddr(ctx, serverAddr, clientTLSConfig, nil)
if err != nil {
return fmt.Errorf("failed to dial QUIC server: %w", err)
}
qs.client = client
// Open a stream to send messages
stream, err := client.OpenStreamSync(ctx)
if err != nil {
return fmt.Errorf("failed to open stream: %w", err)
}
qs.stream = stream*/

return nil
}

// Run sends a single message through a QUIC stream sequentially.
func (qs *DummySuite) Run(ctx context.Context) error {
// Check if stream is initialized
if qs.stream == nil {
return fmt.Errorf("stream is not initialized")
}

/* // Send the write message
message := createWriteMessage()
encodedMessage, err := message.Encode()
if err != nil {
return fmt.Errorf("failed to encode message: %w", err)
}
_, err = qs.stream.Write(encodedMessage)
if err != nil {
return fmt.Errorf("failed to write message to server: %w", err)
}
// Reuse buffer from pool
buffer := bufferPool.Get().([]byte)
defer bufferPool.Put(buffer) // Return buffer to pool after use
// Read the response
_, err = qs.stream.Read(buffer)
if err != nil {
return fmt.Errorf("failed to read response: %w", err)
}
// Perform read operation
readMessage := createReadMessage(message.Key)
encodedReadMessage, err := readMessage.Encode()
if err != nil {
return fmt.Errorf("failed to encode read message: %w", err)
}
_, err = qs.stream.Write(encodedReadMessage)
if err != nil {
return fmt.Errorf("failed to write read message: %w", err)
}
// Read the data length
_, err = io.ReadFull(qs.stream, buffer[:4])
if err != nil {
return fmt.Errorf("failed to read data length: %w", err)
}
valueLength := binary.BigEndian.Uint32(buffer[:4])
// Read the actual data
readBuffer := make([]byte, valueLength)
_, err = io.ReadFull(qs.stream, readBuffer)
if err != nil {
return fmt.Errorf("failed to read value: %w", err)
}
*/
return nil
}
3 changes: 2 additions & 1 deletion benchmark/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,9 @@ func NewSuiteManager(fdbInstance *fdb.FDB) *SuiteManager {
Suites: make(map[SuiteType]TransportSuite),
}

// Register available suites (currently just QUIC)
// Register available suites
manager.RegisterSuite(QUICSuite, NewQuicSuite(fdbInstance))
manager.RegisterSuite(DummySuiteType, NewDummySuite(fdbInstance))

// Future: Add other suites like UDS here

Expand Down
5 changes: 3 additions & 2 deletions benchmark/type.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,9 @@ import (
type SuiteType string

const (
QUICSuite SuiteType = "quic"
UDSSuite SuiteType = "uds" // Example for future transport suites
QUICSuite SuiteType = "quic"
UDSSuiteType SuiteType = "uds" // Example for future transport suites
DummySuiteType SuiteType = "dummy"
)

// ErrInvalidSuiteType is returned when an unsupported SuiteType is provided.
Expand Down
14 changes: 14 additions & 0 deletions cmd/benchmark.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,20 @@ func BenchmarkCommand() *cli.Command {
},
},
},
{
Type: types.DummyTransportType,
Enabled: true,
Config: config.DummyTransport{
Enabled: true,
IPv4: "127.0.0.1",
Port: 4433,
TLS: config.TLS{
Key: "./data/certs/key.pem",
Cert: "./data/certs/cert.pem",
RootCA: "",
},
},
},
},
}

Expand Down
9 changes: 9 additions & 0 deletions fdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"github.com/unpackdev/fdb/config"
"github.com/unpackdev/fdb/db"
"github.com/unpackdev/fdb/transports"
transport_dummy "github.com/unpackdev/fdb/transports/dummy"
transport_quic "github.com/unpackdev/fdb/transports/quic"
"github.com/unpackdev/fdb/types"
)
Expand Down Expand Up @@ -39,6 +40,14 @@ func New(ctx context.Context, cnf config.Config) (*FDB, error) {

for _, transport := range cnf.Transports {
switch t := transport.Config.(type) {
case config.DummyTransport:
udsServer, err := transport_dummy.NewDummyServer(ctx, t)
if err != nil {
return nil, errors.Wrap(err, "failed to create dummy server")
}
if err := transportManager.RegisterTransport(types.DummyTransportType, udsServer); err != nil {
return nil, errors.Wrap(err, "failed to register UDS transport")
}
case config.QuicTransport:
quicServer, err := transport_quic.NewServer(ctx, t)
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion transports/dummy/handler_read.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package fdb
package transport_dummy

import (
"github.com/panjf2000/gnet"
Expand Down
2 changes: 1 addition & 1 deletion transports/dummy/handler_write.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package fdb
package transport_dummy

import (
"github.com/panjf2000/gnet"
Expand Down
30 changes: 14 additions & 16 deletions transports/dummy/server.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package fdb
package transport_dummy

import (
"context"
Expand All @@ -12,8 +12,7 @@ import (

type DummyHandler func(c gnet.Conn, frame []byte)

// DummyServer struct represents the Unix Domain Socket (UDS) server using gnet
type DummyServer struct {
type Server struct {
*gnet.EventServer
ctx context.Context
cnf config.DummyTransport
Expand All @@ -23,9 +22,8 @@ type DummyServer struct {
started chan struct{}
}

// NewUDSServer creates a new UDSServer instance
func NewUDSServer(ctx context.Context, cnf config.DummyTransport) (*DummyServer, error) {
server := &DummyServer{
func NewDummyServer(ctx context.Context, cnf config.DummyTransport) (*Server, error) {
server := &Server{
ctx: ctx,
cnf: cnf,
handlerRegistry: make(map[types.HandlerType]DummyHandler),
Expand All @@ -37,12 +35,12 @@ func NewUDSServer(ctx context.Context, cnf config.DummyTransport) (*DummyServer,
}

// Addr returns the UDS socket path as a string
func (s *DummyServer) Addr() string {
func (s *Server) Addr() string {
return s.cnf.Addr()
}

// Start starts the UDS server
func (s *DummyServer) Start() error {
func (s *Server) Start() error {
s.stopChan = make(chan struct{})
s.started = make(chan struct{}) // Initialize the started channel
listenAddr := "unix://" + s.addr
Expand All @@ -59,7 +57,7 @@ func (s *DummyServer) Start() error {
}

// Tick is called periodically by gnet
func (s *DummyServer) Tick() (delay time.Duration, action gnet.Action) {
func (s *Server) Tick() (delay time.Duration, action gnet.Action) {
select {
case <-s.stopChan:
return 0, gnet.Shutdown
Expand All @@ -69,23 +67,23 @@ func (s *DummyServer) Tick() (delay time.Duration, action gnet.Action) {
}

// Stop stops the UDS server
func (s *DummyServer) Stop() {
func (s *Server) Stop() {
close(s.stopChan)
}

func (s *DummyServer) WaitStarted() <-chan struct{} {
func (s *Server) WaitStarted() <-chan struct{} {
return s.started
}

// OnInitComplete is called when the server starts
func (s *DummyServer) OnInitComplete(server gnet.Server) (action gnet.Action) {
func (s *Server) OnInitComplete(server gnet.Server) (action gnet.Action) {
log.Printf("Dummy Server is listening on %s", server.Addr.String())
close(s.started) // Signal that the server has started
return
}

// React handles incoming data
func (s *DummyServer) React(frame []byte, c gnet.Conn) (out []byte, action gnet.Action) {
func (s *Server) React(frame []byte, c gnet.Conn) (out []byte, action gnet.Action) {
if len(frame) < 1 {
return []byte("ERROR: Invalid action"), gnet.None
}
Expand All @@ -109,7 +107,7 @@ func (s *DummyServer) React(frame []byte, c gnet.Conn) (out []byte, action gnet.
}

// parseActionType parses the action type from the frame
func (s *DummyServer) parseActionType(frame []byte) (types.HandlerType, error) {
func (s *Server) parseActionType(frame []byte) (types.HandlerType, error) {
if len(frame) < 1 {
return 0, errors.New("invalid action: frame too short")
}
Expand All @@ -124,11 +122,11 @@ func (s *DummyServer) parseActionType(frame []byte) (types.HandlerType, error) {
}

// RegisterHandler registers a handler for a specific action
func (s *DummyServer) RegisterHandler(actionType types.HandlerType, handler DummyHandler) {
func (s *Server) RegisterHandler(actionType types.HandlerType, handler DummyHandler) {
s.handlerRegistry[actionType] = handler
}

// DeregisterHandler deregisters a handler for a specific action
func (s *DummyServer) DeregisterHandler(actionType types.HandlerType) {
func (s *Server) DeregisterHandler(actionType types.HandlerType) {
delete(s.handlerRegistry, actionType)
}
2 changes: 1 addition & 1 deletion transports/dummy/server_test.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package fdb
package transport_dummy

import (
"context"
Expand Down
1 change: 1 addition & 0 deletions types/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ type TransportType int

const (
UDPTransportType TransportType = iota
DummyTransportType
QUICTransportType
UDSTransportType
TCPTransportType
Expand Down

0 comments on commit f4b4d42

Please sign in to comment.