Skip to content

Commit

Permalink
Refactor Solace scaler config (#5856)
Browse files Browse the repository at this point in the history
Signed-off-by: SpiritZhou <[email protected]>
  • Loading branch information
SpiritZhou committed Jun 5, 2024
1 parent 535d7f7 commit fd4fcc3
Show file tree
Hide file tree
Showing 2 changed files with 54 additions and 164 deletions.
214 changes: 52 additions & 162 deletions pkg/scalers/solace_scaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,6 @@ const (
solaceAPIVersion = "v2"
solaceAPIObjectTypeQueue = "queue"

// Log Message Templates
solaceFoundMetaFalse = "required Field %s NOT FOUND in Solace Metadata"

// YAML Configuration Metadata Field Names
// Broker Identifiers
solaceMetaSempBaseURL = "solaceSempBaseURL"
Expand Down Expand Up @@ -81,30 +78,45 @@ type SolaceScaler struct {
}

type SolaceMetadata struct {
// Scaler index
triggerIndex int

SolaceMetaSempBaseURL string `keda:"name=solaceSempBaseURL, order=triggerMetadata"`

// Full SEMP URL to target queue (CONSTRUCTED IN CODE)
endpointURL string
solaceSempURL string
EndpointURL string

// Solace Message VPN
messageVpn string
queueName string
MessageVpn string `keda:"name=messageVpn, order=triggerMetadata"`
QueueName string `keda:"name=queueName, order=triggerMetadata"`

// Basic Auth Username
username string
Username string `keda:"name=username, order=authParams;triggerMetadata;resolvedEnv"`
// Basic Auth Password
password string
Password string `keda:"name=password, order=authParams;triggerMetadata;resolvedEnv"`

// Target Message Count
msgCountTarget int64
msgSpoolUsageTarget int64 // Spool Use Target in Megabytes
msgRxRateTarget int64 // Ingress Rate Target per consumer in msgs/second
MsgCountTarget int64 `keda:"name=messageCountTarget, order=triggerMetadata, optional"`
MsgSpoolUsageTarget int64 `keda:"name=messageSpoolUsageTarget, order=triggerMetadata, optional"` // Spool Use Target in Megabytes
MsgRxRateTarget int64 `keda:"name=messageReceiveRateTarget, order=triggerMetadata, optional"` // Ingress Rate Target per consumer in msgs/second

// Activation Target Message Count
activationMsgCountTarget int
activationMsgSpoolUsageTarget int // Spool Use Target in Megabytes
activationMsgRxRateTarget int // Ingress Rate Target per consumer in msgs/second
// Scaler index
triggerIndex int
ActivationMsgCountTarget int `keda:"name=activationMessageCountTarget, order=triggerMetadata, optional, default=0"`
ActivationMsgSpoolUsageTarget int `keda:"name=activationMessageSpoolUsageTarget, order=triggerMetadata, optional, default=0"` // Spool Use Target in Megabytes
ActivationMsgRxRateTarget int `keda:"name=activationMessageReceiveRateTarget, order=triggerMetadata, optional, default=0"` // Ingress Rate Target per consumer in msgs/second
}

func (s *SolaceMetadata) Validate() error {
// Check that we have at least one positive target value for the scaler
if s.MsgCountTarget < 1 && s.MsgSpoolUsageTarget < 1 && s.MsgRxRateTarget < 1 {
return fmt.Errorf("no target value found in the scaler configuration")
}

// Convert Megabyte values to Bytes
s.MsgSpoolUsageTarget = s.MsgSpoolUsageTarget * 1024 * 1024
s.ActivationMsgSpoolUsageTarget = s.ActivationMsgSpoolUsageTarget * 1024 * 1024

return nil
}

// SEMP API Response Root Struct
Expand Down Expand Up @@ -164,146 +176,24 @@ func NewSolaceScaler(config *scalersconfig.ScalerConfig) (Scaler, error) {

// Called by constructor
func parseSolaceMetadata(config *scalersconfig.ScalerConfig) (*SolaceMetadata, error) {
meta := SolaceMetadata{}
// GET THE SEMP API ENDPOINT
if val, ok := config.TriggerMetadata[solaceMetaSempBaseURL]; ok && val != "" {
meta.solaceSempURL = val
} else {
return nil, fmt.Errorf(solaceFoundMetaFalse, solaceMetaSempBaseURL)
}
// GET Message VPN
if val, ok := config.TriggerMetadata[solaceMetaMsgVpn]; ok && val != "" {
meta.messageVpn = val
} else {
return nil, fmt.Errorf(solaceFoundMetaFalse, solaceMetaMsgVpn)
}
// GET Queue Name
if val, ok := config.TriggerMetadata[solaceMetaQueueName]; ok && val != "" {
meta.queueName = val
} else {
return nil, fmt.Errorf(solaceFoundMetaFalse, solaceMetaQueueName)
}

// GET METRIC TARGET VALUES
// GET msgCountTarget
meta.msgCountTarget = 0
if val, ok := config.TriggerMetadata[solaceMetaMsgCountTarget]; ok && val != "" {
if msgCount, err := strconv.ParseInt(val, 10, 64); err == nil {
meta.msgCountTarget = msgCount
} else {
return nil, fmt.Errorf("can't parse [%s], not a valid integer: %w", solaceMetaMsgCountTarget, err)
}
}
// GET msgSpoolUsageTarget
meta.msgSpoolUsageTarget = 0
if val, ok := config.TriggerMetadata[solaceMetaMsgSpoolUsageTarget]; ok && val != "" {
if msgSpoolUsage, err := strconv.ParseInt(val, 10, 64); err == nil {
meta.msgSpoolUsageTarget = msgSpoolUsage * 1024 * 1024
} else {
return nil, fmt.Errorf("can't parse [%s], not a valid integer: %w", solaceMetaMsgSpoolUsageTarget, err)
}
}
// GET msgRcvRateTarget
meta.msgRxRateTarget = 0
if val, ok := config.TriggerMetadata[solaceMetaMsgRxRateTarget]; ok && val != "" {
if msgRcvRate, err := strconv.ParseInt(val, 10, 64); err == nil {
meta.msgRxRateTarget = msgRcvRate
} else {
return nil, fmt.Errorf("can't parse [%s], not a valid integer: %w", solaceMetaMsgRxRateTarget, err)
}
}

// Check that we have at least one positive target value for the scaler
if meta.msgCountTarget < 1 && meta.msgSpoolUsageTarget < 1 && meta.msgRxRateTarget < 1 {
return nil, fmt.Errorf("no target value found in the scaler configuration")
}

// GET ACTIVATION METRIC TARGET VALUES
// GET activationMsgCountTarget
meta.activationMsgCountTarget = 0
if val, ok := config.TriggerMetadata[solaceMetaActivationMsgCountTarget]; ok && val != "" {
if activationMsgCountTarget, err := strconv.Atoi(val); err == nil {
meta.activationMsgCountTarget = activationMsgCountTarget
} else {
return nil, fmt.Errorf("can't parse [%s], not a valid integer: %w", solaceMetaActivationMsgCountTarget, err)
}
}
// GET activationMsgSpoolUsageTarget
meta.activationMsgSpoolUsageTarget = 0
if val, ok := config.TriggerMetadata[solaceMetaActivationMsgSpoolUsageTarget]; ok && val != "" {
if activationMsgSpoolUsageTarget, err := strconv.Atoi(val); err == nil {
meta.activationMsgSpoolUsageTarget = activationMsgSpoolUsageTarget * 1024 * 1024
} else {
return nil, fmt.Errorf("can't parse [%s], not a valid integer: %w", solaceMetaActivationMsgSpoolUsageTarget, err)
}
}
meta.activationMsgRxRateTarget = 0
if val, ok := config.TriggerMetadata[solaceMetaActivationMsgRxRateTarget]; ok && val != "" {
if activationMsgRxRateTarget, err := strconv.Atoi(val); err == nil {
meta.activationMsgRxRateTarget = activationMsgRxRateTarget
} else {
return nil, fmt.Errorf("can't parse [%s], not a valid integer: %w", solaceMetaActivationMsgRxRateTarget, err)
}
meta := &SolaceMetadata{}
if err := config.TypedConfig(meta); err != nil {
return nil, fmt.Errorf("error parsing prometheus metadata: %w", err)
}
meta.triggerIndex = config.TriggerIndex

// Format Solace SEMP Queue Endpoint (REST URL)
meta.endpointURL = fmt.Sprintf(
meta.EndpointURL = fmt.Sprintf(
solaceSempEndpointURLTemplate,
meta.solaceSempURL,
meta.SolaceMetaSempBaseURL,
solaceAPIName,
solaceAPIVersion,
meta.messageVpn,
meta.MessageVpn,
solaceAPIObjectTypeQueue,
url.QueryEscape(meta.queueName),
url.QueryEscape(meta.QueueName),
)

// Get Credentials
var e error
if meta.username, meta.password, e = getSolaceSempCredentials(config); e != nil {
return nil, e
}

meta.triggerIndex = config.TriggerIndex

return &meta, nil
}

func getSolaceSempCredentials(config *scalersconfig.ScalerConfig) (u string, p string, err error) {
// GET CREDENTIALS
// The username must be a valid broker ADMIN user identifier with read access to SEMP for the broker, VPN, and relevant objects
// The scaler will attempt to acquire username and then password independently. For each:
// - Search K8S Secret (Encoded)
// - Search environment variable specified by config at 'usernameFromEnv' / 'passwordFromEnv'
// - Search 'username' / 'password' fields (Clear Text)
// Get username
if usernameSecret, ok := config.AuthParams[solaceMetaUsername]; ok && usernameSecret != "" {
u = usernameSecret
} else if usernameFromEnv, ok := config.TriggerMetadata[solaceMetaUsernameFromEnv]; ok && usernameFromEnv != "" {
if resolvedUser, ok := config.ResolvedEnv[config.TriggerMetadata[solaceMetaUsernameFromEnv]]; ok && resolvedUser != "" {
u = resolvedUser
} else {
return "", "", fmt.Errorf("username could not be resolved from the environment variable: %s", usernameFromEnv)
}
} else if usernameClear, ok := config.TriggerMetadata[solaceMetaUsername]; ok && usernameClear != "" {
u = usernameClear
} else {
return "", "", fmt.Errorf("username is required and not found in K8Secret, environment, or clear text")
}
// Get Password
if passwordSecret, ok := config.AuthParams[solaceMetaPassword]; ok && passwordSecret != "" {
p = passwordSecret
} else if passwordEnv, ok := config.TriggerMetadata[solaceMetaPasswordFromEnv]; ok && passwordEnv != "" {
if resolvedPassword, ok := config.ResolvedEnv[config.TriggerMetadata[solaceMetaPasswordFromEnv]]; ok && resolvedPassword != "" {
p = resolvedPassword
} else {
return "", "", fmt.Errorf("password could not be resolved from the environment variable: %s", passwordEnv)
}
} else if passwordClear, ok := config.TriggerMetadata[solaceMetaPassword]; ok && passwordClear != "" {
p = passwordClear
} else {
return "", "", fmt.Errorf("password is required and not found in K8Secret, environment, or clear text")
}
return u, p, nil
return meta, nil
}

// INTERFACE METHOD
Expand All @@ -317,37 +207,37 @@ func getSolaceSempCredentials(config *scalersconfig.ScalerConfig) (u string, p s
func (s *SolaceScaler) GetMetricSpecForScaling(context.Context) []v2.MetricSpec {
var metricSpecList []v2.MetricSpec
// Message Count Target Spec
if s.metadata.msgCountTarget > 0 {
metricName := kedautil.NormalizeString(fmt.Sprintf("solace-%s-%s", s.metadata.queueName, solaceTriggermsgcount))
if s.metadata.MsgCountTarget > 0 {
metricName := kedautil.NormalizeString(fmt.Sprintf("solace-%s-%s", s.metadata.QueueName, solaceTriggermsgcount))
externalMetric := &v2.ExternalMetricSource{
Metric: v2.MetricIdentifier{
Name: GenerateMetricNameWithIndex(s.metadata.triggerIndex, metricName),
},
Target: GetMetricTarget(s.metricType, s.metadata.msgCountTarget),
Target: GetMetricTarget(s.metricType, s.metadata.MsgCountTarget),
}
metricSpec := v2.MetricSpec{External: externalMetric, Type: solaceExtMetricType}
metricSpecList = append(metricSpecList, metricSpec)
}
// Message Spool Usage Target Spec
if s.metadata.msgSpoolUsageTarget > 0 {
metricName := kedautil.NormalizeString(fmt.Sprintf("solace-%s-%s", s.metadata.queueName, solaceTriggermsgspoolusage))
if s.metadata.MsgSpoolUsageTarget > 0 {
metricName := kedautil.NormalizeString(fmt.Sprintf("solace-%s-%s", s.metadata.QueueName, solaceTriggermsgspoolusage))
externalMetric := &v2.ExternalMetricSource{
Metric: v2.MetricIdentifier{
Name: GenerateMetricNameWithIndex(s.metadata.triggerIndex, metricName),
},
Target: GetMetricTarget(s.metricType, s.metadata.msgSpoolUsageTarget),
Target: GetMetricTarget(s.metricType, s.metadata.MsgSpoolUsageTarget),
}
metricSpec := v2.MetricSpec{External: externalMetric, Type: solaceExtMetricType}
metricSpecList = append(metricSpecList, metricSpec)
}
// Message Receive Rate Target Spec
if s.metadata.msgRxRateTarget > 0 {
metricName := kedautil.NormalizeString(fmt.Sprintf("solace-%s-%s", s.metadata.queueName, solaceTriggermsgrxrate))
if s.metadata.MsgRxRateTarget > 0 {
metricName := kedautil.NormalizeString(fmt.Sprintf("solace-%s-%s", s.metadata.QueueName, solaceTriggermsgrxrate))
externalMetric := &v2.ExternalMetricSource{
Metric: v2.MetricIdentifier{
Name: GenerateMetricNameWithIndex(s.metadata.triggerIndex, metricName),
},
Target: GetMetricTarget(s.metricType, s.metadata.msgRxRateTarget),
Target: GetMetricTarget(s.metricType, s.metadata.MsgRxRateTarget),
}
metricSpec := v2.MetricSpec{External: externalMetric, Type: solaceExtMetricType}
metricSpecList = append(metricSpecList, metricSpec)
Expand All @@ -357,7 +247,7 @@ func (s *SolaceScaler) GetMetricSpecForScaling(context.Context) []v2.MetricSpec

// returns SolaceMetricValues struct populated from broker SEMP endpoint
func (s *SolaceScaler) getSolaceQueueMetricsFromSEMP(ctx context.Context) (SolaceMetricValues, error) {
var scaledMetricEndpointURL = s.metadata.endpointURL
var scaledMetricEndpointURL = s.metadata.EndpointURL
var httpClient = s.httpClient
var sempResponse solaceSEMPResponse
var metricValues SolaceMetricValues
Expand All @@ -370,7 +260,7 @@ func (s *SolaceScaler) getSolaceQueueMetricsFromSEMP(ctx context.Context) (Solac
}

// Add HTTP Auth and Headers
request.SetBasicAuth(s.metadata.username, s.metadata.password)
request.SetBasicAuth(s.metadata.Username, s.metadata.Password)
request.Header.Set("Content-Type", "application/json")

// Call Solace SEMP API
Expand Down Expand Up @@ -429,9 +319,9 @@ func (s *SolaceScaler) GetMetricsAndActivity(ctx context.Context, metricName str
return []external_metrics.ExternalMetricValue{}, false, err
}
return []external_metrics.ExternalMetricValue{metric},
metricValues.msgCount > s.metadata.activationMsgCountTarget ||
metricValues.msgSpoolUsage > s.metadata.activationMsgSpoolUsageTarget ||
metricValues.msgRcvRate > s.metadata.activationMsgRxRateTarget,
metricValues.msgCount > s.metadata.ActivationMsgCountTarget ||
metricValues.msgSpoolUsage > s.metadata.ActivationMsgSpoolUsageTarget ||
metricValues.msgRcvRate > s.metadata.ActivationMsgRxRateTarget,
nil
}

Expand Down
4 changes: 2 additions & 2 deletions pkg/scalers/solace_scaler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -545,8 +545,8 @@ func TestSolaceParseSolaceMetadata(t *testing.T) {
default:
fmt.Println(" --> PASS")
}
if !testData.isError && strings.Contains(testData.metadata["queueName"], "/") && !strings.Contains(meta.endpointURL, url.QueryEscape(testData.metadata["queueName"])) {
t.Error("expected endpointURL to query escape special characters in the URL but got:", meta.endpointURL)
if !testData.isError && strings.Contains(testData.metadata["queueName"], "/") && !strings.Contains(meta.EndpointURL, url.QueryEscape(testData.metadata["queueName"])) {
t.Error("expected endpointURL to query escape special characters in the URL but got:", meta.EndpointURL)
fmt.Println(" --> FAIL")
}
}
Expand Down

0 comments on commit fd4fcc3

Please sign in to comment.