Skip to content
Merged
226 changes: 219 additions & 7 deletions internal/pkg/api/handleOpAMP.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,11 @@ import (
"errors"
"fmt"
"net/http"
"strings"
"time"

"gopkg.in/yaml.v3"

"github.com/gofrs/uuid/v5"
"github.com/open-telemetry/opamp-go/protobufs"
oaServer "github.com/open-telemetry/opamp-go/server"
Expand All @@ -27,6 +30,7 @@ import (
"github.com/elastic/fleet-server/v7/internal/pkg/checkin"
"github.com/elastic/fleet-server/v7/internal/pkg/config"
"github.com/elastic/fleet-server/v7/internal/pkg/dl"
"github.com/elastic/fleet-server/v7/internal/pkg/es"
"github.com/elastic/fleet-server/v7/internal/pkg/model"
)

Expand All @@ -38,13 +42,17 @@ type OpAMPT struct {
cfg *config.Server
bulk bulk.Bulk
cache cache.Cache
bc *checkin.Bulk
bc checkinBulk

srv oaServer.OpAMPServer
handler oaServer.HTTPHandlerFunc
connCtx oaServer.ConnContext
}

type checkinBulk interface {
CheckIn(id string, opts ...checkin.Option) error
}

func NewOpAMPT(
ctx context.Context,
cfg *config.Server,
Expand Down Expand Up @@ -200,13 +208,20 @@ func (oa *OpAMPT) handleMessage(zlog zerolog.Logger, apiKey *apikey.APIKey) func
}
}

func (oa *OpAMPT) findEnrolledAgent(ctx context.Context, _ zerolog.Logger, agentID string) (*model.Agent, error) {
func (oa *OpAMPT) findEnrolledAgent(ctx context.Context, zlog zerolog.Logger, agentID string) (*model.Agent, error) {
agent, err := dl.FindAgent(ctx, oa.bulk, dl.QueryAgentByID, dl.FieldID, agentID)
if errors.Is(err, dl.ErrNotFound) {
return nil, nil
}

// if agents index doesn't exist yet, it will be created when the first agent document is indexed
if errors.Is(err, es.ErrIndexNotFound) {
zlog.Info().Msg("index not found when searching for enrolled agent")
return nil, nil
}

if err != nil {
zlog.Error().Err(err).Msg("failed to find agent by ID")
return nil, fmt.Errorf("failed to find agent: %w", err)
}

Expand All @@ -233,12 +248,17 @@ func (oa *OpAMPT) enrollAgent(zlog zerolog.Logger, agentID string, aToS *protobu
// description is only sent if any of its fields change.
meta := localMetadata{}
meta.Elastic.Agent.ID = agentID
agentType := ""
var identifyingAttributes, nonIdentifyingAttributes json.RawMessage
if aToS.AgentDescription != nil {
// Extract agent version
for _, ia := range aToS.AgentDescription.IdentifyingAttributes {
switch attribute.Key(ia.Key) {
case semconv.ServiceVersionKey:
meta.Elastic.Agent.Version = ia.GetValue().GetStringValue()
case semconv.ServiceNameKey:
agentType = ia.GetValue().GetStringValue()
meta.Elastic.Agent.Name = agentType
}
}
zlog.Debug().Str("opamp.agent.version", meta.Elastic.Agent.Version).Msg("extracted agent version")
Expand All @@ -250,9 +270,22 @@ func (oa *OpAMPT) enrollAgent(zlog zerolog.Logger, agentID string, aToS *protobu
hostname := nia.GetValue().GetStringValue()
meta.Host.Name = hostname
meta.Host.Hostname = hostname
case semconv.OSTypeKey:
osType := nia.GetValue().GetStringValue()
meta.Os.Platform = osType
}
}
zlog.Debug().Str("hostname", meta.Host.Hostname).Msg("extracted hostname")

identifyingAttributes, err = ProtobufKVToRawMessage(zlog, aToS.AgentDescription.IdentifyingAttributes)
if err != nil {
return nil, fmt.Errorf("failed to marshal identifying attributes: %w", err)
}

nonIdentifyingAttributes, err = ProtobufKVToRawMessage(zlog, aToS.AgentDescription.NonIdentifyingAttributes)
if err != nil {
return nil, fmt.Errorf("failed to marshal non-identifying attributes: %w", err)
}
}

// Update local metadata if something has changed
Expand All @@ -267,9 +300,17 @@ func (oa *OpAMPT) enrollAgent(zlog zerolog.Logger, agentID string, aToS *protobu
EnrolledAt: now.UTC().Format(time.RFC3339),
PolicyID: rec.PolicyID,
Agent: &model.AgentMetadata{
ID: agentID,
ID: agentID,
Version: meta.Elastic.Agent.Version,
Type: agentType,
},
LocalMetadata: data,
// Setting revision to 1, the collector won't receive policy changes and 0 would keep the collector in updating state
PolicyRevisionIdx: 1,
IdentifyingAttributes: identifyingAttributes,
NonIdentifyingAttributes: nonIdentifyingAttributes,
Type: "OPAMP",
Tags: []string{agentType},
}

data, err = json.Marshal(agent)
Expand All @@ -291,14 +332,42 @@ func (oa *OpAMPT) updateAgent(zlog zerolog.Logger, agent *model.Agent, aToS *pro

initialOpts := make([]checkin.Option, 0)

status := "online"

// Extract the health status from the health message if it exists.
if aToS.Health != nil {
initialOpts = append(initialOpts, checkin.WithStatus(aToS.Health.Status))
if !aToS.Health.Healthy {
status = "error"
} else if aToS.Health.Status == "StatusRecoverableError" {
status = "degraded"
}

// Extract the unhealthy reason from the health message if it exists.
// Extract the last_checkin_message from the health message if it exists.
if aToS.Health.LastError != "" {
unhealthyReason := []string{aToS.Health.LastError}
initialOpts = append(initialOpts, checkin.WithUnhealthyReason(&unhealthyReason))
initialOpts = append(initialOpts, checkin.WithMessage(aToS.Health.LastError))
} else {
initialOpts = append(initialOpts, checkin.WithMessage(aToS.Health.Status))
}
healthBytes, err := json.Marshal(aToS.Health)
if err != nil {
return fmt.Errorf("failed to marshal health: %w", err)
}
initialOpts = append(initialOpts, checkin.WithHealth(healthBytes))
}

initialOpts = append(initialOpts, checkin.WithStatus(status))
initialOpts = append(initialOpts, checkin.WithSequenceNum(aToS.SequenceNum))

capabilities := decodeCapabilities(aToS.Capabilities)
initialOpts = append(initialOpts, checkin.WithCapabilities(capabilities))

if aToS.EffectiveConfig != nil {
effectiveConfigBytes, err := ParseEffectiveConfig(aToS.EffectiveConfig)
if err != nil {
return fmt.Errorf("failed to parse effective config: %w", err)
}
if effectiveConfigBytes != nil {
initialOpts = append(initialOpts, checkin.WithEffectiveConfig(effectiveConfigBytes))
}
}

Expand All @@ -310,10 +379,153 @@ type localMetadata struct {
Agent struct {
ID string `json:"id,omitempty"`
Version string `json:"version,omitempty"`
Name string `json:"name,omitempty"`
} `json:"agent,omitempty"`
} `json:"elastic,omitempty"`
Host struct {
Hostname string `json:"hostname,omitempty"`
Name string `json:"name,omitempty"`
} `json:"host,omitempty"`
Os struct {
Platform string `json:"platform,omitempty"`
} `json:"os,omitempty"`
}

func ParseEffectiveConfig(effectiveConfig *protobufs.EffectiveConfig) ([]byte, error) {
if effectiveConfig.ConfigMap != nil && effectiveConfig.ConfigMap.ConfigMap[""] != nil {
configMap := effectiveConfig.ConfigMap.ConfigMap[""]

if len(configMap.Body) != 0 {
bodyBytes := configMap.Body

obj := make(map[string]interface{})
if err := yaml.Unmarshal(bodyBytes, &obj); err != nil {
return nil, fmt.Errorf("unmarshal effective config failure: %w", err)
}
redactSensitive(obj)
effectiveConfigBytes, err := json.Marshal(obj)
if err != nil {
return nil, fmt.Errorf("failed to marshal effective config: %w", err)
}
return effectiveConfigBytes, nil
}
}
return nil, nil
}

func redactSensitive(v interface{}) {
const redacted = "[REDACTED]"
switch typed := v.(type) {
case map[string]interface{}:
for key, val := range typed {
if redactKey(key) {
typed[key] = redacted
continue
}
redactSensitive(val)
}
case map[interface{}]interface{}:
for rawKey, val := range typed {
key, ok := rawKey.(string)
if ok && redactKey(key) {
typed[rawKey] = redacted
continue
}
redactSensitive(val)
}
case []interface{}:
for i := range typed {
redactSensitive(typed[i])
}
}
}

// TODO move to a common place, same as https://github.com/elastic/elastic-agent/blob/1c3fb4b4c8989cd2cfb692780debd7619820ae72/internal/pkg/diagnostics/diagnostics.go#L454-L468
func redactKey(k string) bool {
// "routekey" shouldn't be redacted.
// Add any other exceptions here.
if k == "routekey" {
return false
}

k = strings.ToLower(k)
return strings.Contains(k, "auth") ||
strings.Contains(k, "certificate") ||
strings.Contains(k, "passphrase") ||
strings.Contains(k, "password") ||
strings.Contains(k, "token") ||
strings.Contains(k, "key") ||
strings.Contains(k, "secret")
}

// anyValueToInterface recursively converts protobufs.AnyValue to Go interface{} for JSON marshalling
func anyValueToInterface(zlog zerolog.Logger, av *protobufs.AnyValue) interface{} {
switch v := av.GetValue().(type) {
case *protobufs.AnyValue_StringValue:
return v.StringValue
case *protobufs.AnyValue_IntValue:
return v.IntValue
case *protobufs.AnyValue_DoubleValue:
return v.DoubleValue
case *protobufs.AnyValue_BoolValue:
return v.BoolValue
case *protobufs.AnyValue_BytesValue:
return v.BytesValue
case *protobufs.AnyValue_ArrayValue:
arr := make([]interface{}, 0, len(v.ArrayValue.Values))
for _, av2 := range v.ArrayValue.Values {
arr = append(arr, anyValueToInterface(zlog, av2))
}
return arr
case *protobufs.AnyValue_KvlistValue:
m := make(map[string]interface{}, len(v.KvlistValue.Values))
for _, kv := range v.KvlistValue.Values {
if kv.Value != nil {
m[kv.Key] = anyValueToInterface(zlog, kv.Value)
}
}
return m
default:
zlog.Warn().Msg("unknown AnyValue type encountered in anyValueToInterface")
return nil
}
}

func ProtobufKVToRawMessage(zlog zerolog.Logger, kv []*protobufs.KeyValue) (json.RawMessage, error) {
// 1. Build an intermediate map to represent the JSON object
data := make(map[string]interface{}, len(kv))
for _, item := range kv {
if item.Value == nil {
continue
}
data[item.Key] = anyValueToInterface(zlog, item.Value)
}

// 2. Marshal the map into bytes
b, err := json.Marshal(data)
if err != nil {
zlog.Error().Err(err).Msg("failed to marshal key-value pairs")
return nil, err
}

return json.RawMessage(b), nil
}

// decodeCapabilities converts capability bitmask to human-readable strings
func decodeCapabilities(caps uint64) []string {
var result []string
capMap := map[uint64]string{
uint64(protobufs.AgentCapabilities_AgentCapabilities_ReportsStatus): "ReportsStatus",
uint64(protobufs.AgentCapabilities_AgentCapabilities_AcceptsRemoteConfig): "AcceptsRemoteConfig",
uint64(protobufs.AgentCapabilities_AgentCapabilities_ReportsEffectiveConfig): "ReportsEffectiveConfig",
uint64(protobufs.AgentCapabilities_AgentCapabilities_ReportsHealth): "ReportsHealth",
uint64(protobufs.AgentCapabilities_AgentCapabilities_ReportsAvailableComponents): "ReportsAvailableComponents",
uint64(protobufs.AgentCapabilities_AgentCapabilities_AcceptsRestartCommand): "AcceptsRestartCommand",
}
for mask, name := range capMap {
if caps&mask != 0 {
result = append(result, name)
}
}
return result
}
Loading