Skip to content

Commit

Permalink
feat(agent): fix configuration pass-through.
Browse files Browse the repository at this point in the history
  • Loading branch information
Luiz Pegoraro committed Aug 14, 2023
1 parent ec49822 commit aaad3b4
Show file tree
Hide file tree
Showing 6 changed files with 70 additions and 19 deletions.
7 changes: 7 additions & 0 deletions agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,11 @@ func New(logger *zap.Logger, c config.Config) (Agent, error) {

pm, err := manager.New(logger, c, db)
if err != nil {
logger.Error("error during create policy manager, exiting", zap.Error(err))
return nil, err
}
if pm.GetRepo() == nil {
logger.Error("policy manager failed to get repository", zap.Error(err))
return nil, err
}
return &orbAgent{logger: logger, config: c, policyManager: pm, db: db, groupsInfos: make(map[string]GroupInfo)}, nil
Expand All @@ -112,6 +117,7 @@ func (a *orbAgent) startBackends(agentCtx context.Context) error {
configuration := structs.Map(a.config.OrbAgent.Otel)
configuration["agent_tags"] = a.config.OrbAgent.Tags
if err := be.Configure(a.logger, a.policyManager.GetRepo(), configurationEntry, configuration); err != nil {
a.logger.Info("failed to configure backend", zap.String("backend", name), zap.Error(err))
return err
}
backendCtx := context.WithValue(agentCtx, "routine", name)
Expand All @@ -121,6 +127,7 @@ func (a *orbAgent) startBackends(agentCtx context.Context) error {
backendCtx = context.WithValue(backendCtx, "agent_id", "auto-provisioning-without-id")
}
if err := be.Start(context.WithCancel(backendCtx)); err != nil {
a.logger.Info("failed to start backend", zap.String("backend", name), zap.Error(err))
return err
}
a.backends[name] = be
Expand Down
8 changes: 4 additions & 4 deletions agent/backend/otel/otel.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ import (
var _ backend.Backend = (*openTelemetryBackend)(nil)

//go:embed otelcol-contrib
var openTelemtryContribBinary []byte
var openTelemetryContribBinary []byte

type openTelemetryBackend struct {
logger *zap.Logger
Expand Down Expand Up @@ -58,7 +58,7 @@ type openTelemetryBackend struct {

// Configure initializes the backend with the given configuration
func (o openTelemetryBackend) Configure(logger *zap.Logger, repo policies.PolicyRepo,
_ map[string]string, otelConfig map[string]interface{}) error {
configuration map[string]string, otelConfig map[string]interface{}) error {
o.logger = logger
o.policyRepo = repo
o.otelReceiverTaps = []string{}
Expand All @@ -79,7 +79,7 @@ func (o openTelemetryBackend) Configure(logger *zap.Logger, repo policies.Policy
}
}
o.mqttConfig = config.MQTTConfig{
Address: "",
Address: configuration["cloud"],
Id: "",
Key: "",
ChannelID: "",
Expand All @@ -89,7 +89,7 @@ func (o openTelemetryBackend) Configure(logger *zap.Logger, repo policies.Policy
}

func (o openTelemetryBackend) Version() (string, error) {
executable, err := memexec.New(openTelemtryContribBinary)
executable, err := memexec.New(openTelemetryContribBinary)
if err != nil {
return "", err
}
Expand Down
2 changes: 1 addition & 1 deletion agent/backend/otel/otel_sample.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ receivers:
exporters:
otlphttp:
endpoint: http://localhost:
endpoint: http://localhost:0
processors: # this collection value may be supported (tbd)
Expand Down
8 changes: 7 additions & 1 deletion agent/backend/otel/policy.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ func (o openTelemetryBackend) ApplyPolicy(newPolicyData policies.PolicyData, upd

func (o openTelemetryBackend) addRunner(policyData policies.PolicyData, policyFilePath string) error {
policyContext := context.WithValue(o.mainContext, "policy_id", policyData.ID)
executable, err := memexec.New(openTelemtryContribBinary)
executable, err := memexec.New(openTelemetryContribBinary)
if err != nil {
return err
}
Expand All @@ -67,6 +67,12 @@ func (o openTelemetryBackend) addRunner(policyData policies.PolicyData, policyFi
return err
}
go func(ctx context.Context) {
err := command.Start()
if err != nil {
o.logger.Error("error starting command", zap.Error(err))
ctx.Done()
return
}
scanner := bufio.NewScanner(stderr)
for scanner.Scan() {
o.logger.Info("stderr output",
Expand Down
26 changes: 13 additions & 13 deletions cmd/agent/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,21 +49,21 @@ func Run(cmd *cobra.Command, args []string) {
initConfig()

// configuration
var config config.Config
err := viper.Unmarshal(&config)
var configData config.Config
err := viper.Unmarshal(&configData)
if err != nil {
cobra.CheckErr(fmt.Errorf("agent start up error (config): %w", err))
cobra.CheckErr(fmt.Errorf("agent start up error (configData): %w", err))
os.Exit(1)
}

// include pktvisor backend by default if binary is at default location
_, err = os.Stat(pktvisor.DefaultBinary)
if err == nil && config.OrbAgent.Backends == nil {
config.OrbAgent.Backends = make(map[string]map[string]string)
config.OrbAgent.Backends["pktvisor"] = make(map[string]string)
config.OrbAgent.Backends["pktvisor"]["binary"] = pktvisor.DefaultBinary
if err == nil && configData.OrbAgent.Backends == nil {
configData.OrbAgent.Backends = make(map[string]map[string]string)
configData.OrbAgent.Backends["pktvisor"] = make(map[string]string)
configData.OrbAgent.Backends["pktvisor"]["binary"] = pktvisor.DefaultBinary
if len(cfgFiles) > 0 {
config.OrbAgent.Backends["pktvisor"]["config_file"] = cfgFiles[0]
configData.OrbAgent.Backends["pktvisor"]["config_file"] = cfgFiles[0]
}
}

Expand All @@ -88,7 +88,7 @@ func Run(cmd *cobra.Command, args []string) {
}(logger)

// new agent
a, err := agent.New(logger, config)
a, err := agent.New(logger, configData)
if err != nil {
logger.Error("agent start up error", zap.Error(err))
os.Exit(1)
Expand Down Expand Up @@ -151,10 +151,10 @@ func mergeOrError(path string) {
v.SetDefault("orb.otel.port", 0)
v.SetDefault("orb.debug.enable", Debug)

v.SetDefault("orb.backends.pktvisor.binary", "/usr/local/sbin/pktvisord")
v.SetDefault("orb.backends.pktvisor.config_file", "/opt/orb/agent.yaml")
v.SetDefault("orb.backends.pktvisor.api_host", "localhost")
v.SetDefault("orb.backends.pktvisor.api_port", "10853")
//v.SetDefault("orb.backends.pktvisor.binary", "/usr/local/sbin/pktvisord")
//v.SetDefault("orb.backends.pktvisor.config_file", "/opt/orb/agent.yaml")
//v.SetDefault("orb.backends.pktvisor.api_host", "localhost")
//v.SetDefault("orb.backends.pktvisor.api_port", "10853")

if len(path) > 0 {
cobra.CheckErr(v.ReadInConfig())
Expand Down
38 changes: 38 additions & 0 deletions cmd/agent/otel.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
version: "1.0"

# this section is used by pktvisor
# see https://github.com/orb-community/pktvisor/blob/develop/RFCs/2021-04-16-75-taps.md
visor:
taps:
default_pcap:
input_type: pcap
config:
iface: "auto"

# this section is used orb-agent
# most sections and keys are optional
orb:
# these are arbitrary key value pairs used for dynamically define a group of agents by matching against agent group tags
tags:
region: LA
cloud:
config:
# optionally specify an agent name to use during auto provisioning
# hostname will be used if it's not specified here
agent_name: dev-otel-1
auto_provision: false
api:
address: https://kubernetes.docker.internal
mqtt:
address: tls://kubernetes.docker.internal:8883
id: "7cdfe20e-9a58-42ab-9fc0-e6b004a96aed"
key: "f29ebb11-7784-43cf-82d2-bc6192551d77"
channel_id: "dd0a1803-4109-4496-807c-bf624d5350c0"
tls:
verify: false
backends:
otel:
config_file: "/opt/orb/agent.yaml"
otel:
enable: true

0 comments on commit aaad3b4

Please sign in to comment.