Skip to content

Commit

Permalink
Resolve Pub/Sub resources from scale target's environment (#5701)
Browse files Browse the repository at this point in the history

Signed-off-by: Jeremy Tymes <[email protected]>
  • Loading branch information
jtymes committed Apr 19, 2024
1 parent 38d261a commit 31d848c
Show file tree
Hide file tree
Showing 3 changed files with 93 additions and 23 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ Here is an overview of all new **experimental** features:
- **General**: Improve Prometheus metrics to align with best practices ([#4854](https://github.com/kedacore/keda/issues/4854))
- **General**: Support csv-format for WATCH_NAMESPACE env var ([#5670](https://github.com/kedacore/keda/issues/5670))
- **Azure Event Hub Scaler**: Remove usage of checkpoint offsets to account for SDK checkpointing implementation changes ([#5574](https://github.com/kedacore/keda/issues/5574))
- **GCP Pub/Sub Scaler**: Add support for resolving resource names from the scale target's environment ([#5693](https://github.com/kedacore/keda/issues/5693))
- **GCP Stackdriver Scaler**: Add missing parameters 'rate' and 'count' for GCP Stackdriver Scaler alignment ([#5633](https://github.com/kedacore/keda/issues/5633))
- **Metrics API Scaler**: Add support for various formats: json, xml, yaml, prometheus ([#2633](https://github.com/kedacore/keda/issues/2633))
- **MongoDB Scaler**: Add scheme field support srv record ([#5544](https://github.com/kedacore/keda/issues/5544))
Expand Down
97 changes: 77 additions & 20 deletions pkg/scalers/gcp_pubsub_scaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,80 @@ func NewPubSubScaler(config *scalersconfig.ScalerConfig) (Scaler, error) {
}, nil
}

func parsePubSubResourceConfig(config *scalersconfig.ScalerConfig, meta *pubsubMetadata) error {
sub, subPresent := config.TriggerMetadata["subscriptionName"]
subFromEnv, subFromEnvPresent := config.TriggerMetadata["subscriptionNameFromEnv"]
if subPresent && subFromEnvPresent {
return fmt.Errorf("exactly one of subscriptionName or subscriptionNameFromEnv is allowed")
}
hasSub := subPresent || subFromEnvPresent

topic, topicPresent := config.TriggerMetadata["topicName"]
topicFromEnv, topicFromEnvPresent := config.TriggerMetadata["topicNameFromEnv"]
if topicPresent && topicFromEnvPresent {
return fmt.Errorf("exactly one of topicName or topicNameFromEnv is allowed")
}
hasTopic := topicPresent || topicFromEnvPresent

if (!hasSub && !hasTopic) || (hasSub && hasTopic) {
return fmt.Errorf("exactly one of subscription or topic name must be given")
}

if hasSub {
if subPresent {
if sub == "" {
return fmt.Errorf("no subscription name given")
}

meta.resourceName = sub
} else {
if subFromEnv == "" {
return fmt.Errorf("no environment variable name given for resolving subscription name")
}

resolvedSub, ok := config.ResolvedEnv[subFromEnv]
if !ok {
return fmt.Errorf("resolved environment doesn't contain name '%s'", subFromEnv)
}

if resolvedSub == "" {
return fmt.Errorf("resolved environment subscription name is empty")
}

meta.resourceName = config.ResolvedEnv[subFromEnv]
}

meta.resourceType = resourceTypePubSubSubscription
} else {
if topicPresent {
if topic == "" {
return fmt.Errorf("no topic name given")
}

meta.resourceName = topic
} else {
if topicFromEnv == "" {
return fmt.Errorf("no environment variable name given for resolving topic name")
}

resolvedTopic, ok := config.ResolvedEnv[topicFromEnv]
if !ok {
return fmt.Errorf("resolved environment doesn't contain name '%s'", topicFromEnv)
}

if resolvedTopic == "" {
return fmt.Errorf("resolved environment topic name is empty")
}

meta.resourceName = config.ResolvedEnv[topicFromEnv]
}

meta.resourceType = resourceTypePubSubTopic
}

return nil
}

func parsePubSubMetadata(config *scalersconfig.ScalerConfig, logger logr.Logger) (*pubsubMetadata, error) {
meta := pubsubMetadata{mode: pubSubModeSubscriptionSize, value: pubSubDefaultValue}

Expand Down Expand Up @@ -106,26 +180,9 @@ func parsePubSubMetadata(config *scalersconfig.ScalerConfig, logger logr.Logger)

meta.aggregation = config.TriggerMetadata["aggregation"]

sub, subPresent := config.TriggerMetadata["subscriptionName"]
topic, topicPresent := config.TriggerMetadata["topicName"]
if (!subPresent && !topicPresent) || (subPresent && topicPresent) {
return nil, fmt.Errorf("exactly one of subscription or topic name must be given")
}

if subPresent {
if sub == "" {
return nil, fmt.Errorf("no subscription name given")
}

meta.resourceName = sub
meta.resourceType = resourceTypePubSubSubscription
} else {
if topic == "" {
return nil, fmt.Errorf("no topic name given")
}

meta.resourceName = topic
meta.resourceType = resourceTypePubSubTopic
err := parsePubSubResourceConfig(config, &meta)
if err != nil {
return nil, err
}

meta.activationValue = 0
Expand Down
18 changes: 15 additions & 3 deletions pkg/scalers/gcp_pubsub_scaler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,9 @@ import (
)

var testPubSubResolvedEnv = map[string]string{
"SAMPLE_CREDS": "{}",
"SAMPLE_CREDS": "{}",
"MY_ENV_SUBSCRIPTION": "myEnvSubscription",
"MY_ENV_TOPIC": "myEnvTopic",
}

type parsePubSubMetadataTestData struct {
Expand Down Expand Up @@ -76,6 +78,14 @@ var testPubSubMetadata = []parsePubSubMetadataTestData{
{nil, map[string]string{"value": "7", "credentialsFromEnv": "SAMPLE_CREDS"}, true},
// both subscriptionSize and topicName present
{nil, map[string]string{"subscriptionSize": "7", "topicName": "mytopic", "value": "7", "credentialsFromEnv": "SAMPLE_CREDS"}, true},
// both subscriptionName and subscriptionNameFromEnv present
{nil, map[string]string{"subscriptionName": "mysubscription", "subscriptionNameFromEnv": "MY_ENV_SUBSCRIPTION", "value": "7", "credentialsFromEnv": "SAMPLE_CREDS"}, true},
// both topicName and topicNameFromEnv present
{nil, map[string]string{"topicName": "mytopic", "topicNameFromEnv": "MY_ENV_TOPIC", "value": "7", "credentialsFromEnv": "SAMPLE_CREDS"}, true},
// subscriptionNameFromEnv present
{nil, map[string]string{"subscriptionNameFromEnv": "MY_ENV_SUBSCRIPTION", "value": "7", "credentialsFromEnv": "SAMPLE_CREDS"}, false},
// topicNameFromEnv present
{nil, map[string]string{"topicNameFromEnv": "MY_ENV_TOPIC", "value": "7", "credentialsFromEnv": "SAMPLE_CREDS"}, false},
}

var gcpPubSubMetricIdentifiers = []gcpPubSubMetricIdentifier{
Expand All @@ -90,6 +100,8 @@ var gcpResourceNameTests = []gcpPubSubSubscription{
{&testPubSubMetadata[12], 1, "projects/myproject/mysubscription", ""},
{&testPubSubMetadata[17], 1, "mytopic", "myproject"},
{&testPubSubMetadata[18], 1, "projects/myproject/mytopic", ""},
{&testPubSubMetadata[24], 1, "myEnvSubscription", ""},
{&testPubSubMetadata[25], 1, "myEnvTopic", ""},
}

var gcpSubscriptionDefaults = []gcpPubSubSubscription{
Expand Down Expand Up @@ -140,7 +152,7 @@ func TestGcpPubSubGetMetricSpecForScaling(t *testing.T) {
}
}

func TestGcpPubSubSubscriptionName(t *testing.T) {
func TestGcpPubSubResourceName(t *testing.T) {
for _, testData := range gcpResourceNameTests {
meta, err := parsePubSubMetadata(&scalersconfig.ScalerConfig{TriggerMetadata: testData.metadataTestData.metadata, ResolvedEnv: testPubSubResolvedEnv, TriggerIndex: testData.triggerIndex}, logr.Discard())
if err != nil {
Expand All @@ -150,7 +162,7 @@ func TestGcpPubSubSubscriptionName(t *testing.T) {
resourceID, projectID := getResourceData(&mockGcpPubSubScaler)

if resourceID != testData.name || projectID != testData.projectID {
t.Error("Wrong Subscription parsing:", resourceID, projectID)
t.Error("Wrong Resource parsing:", resourceID, projectID)
}
}
}
Expand Down

0 comments on commit 31d848c

Please sign in to comment.