Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 15 additions & 7 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,14 +71,22 @@ func main() {
ctx, stop, app := newApp()
defer stop()

go func() {
http.Handle("/metrics", promhttp.Handler())
err := http.ListenAndServe(":2112", nil)
if err != nil {
fmt.Println("Error starting prometheus server: ", err.Error())
go func(ctx context.Context) {
ctxx, cancel := context.WithCancel(ctx)
defer cancel()
select {
case <-ctxx.Done():
fmt.Println("Shutting down prometheus server")
return
default:
http.Handle("/metrics", promhttp.Handler())
err := http.ListenAndServe(":2112", nil)
if err != nil {
fmt.Println("Error starting prometheus server: ", err.Error())
}
os.Exit(1)
}
os.Exit(1)
}()
}(ctx)

err := app.RunContext(ctx, os.Args)
// If required flags aren't set, it will return with error before we could set up logging
Expand Down
74 changes: 58 additions & 16 deletions pkg/cloudscale/objectstorage.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"errors"
"fmt"
"strings"
"time"

"github.com/prometheus/client_golang/prometheus"
Expand Down Expand Up @@ -38,6 +39,12 @@ const (
namespaceLabel = "crossplane.io/claim-namespace"
)

type ObjectStorageData struct {
cloudscale.BucketMetricsData
BucketDetail
Organization string
}

func NewObjectStorage(client *cloudscale.Client, k8sClient k8s.Client, controlApiClient k8s.Client, salesOrder, clusterId string, cloudZone string, uomMapping map[string]string, providerMetrics map[string]prometheus.Counter) (*ObjectStorage, error) {
return &ObjectStorage{
client: client,
Expand All @@ -61,8 +68,31 @@ func (o *ObjectStorage) GetMetrics(ctx context.Context, billingDate time.Time) (
if err != nil {
o.providerMetrics["providerFailed"].Inc()
return nil, err
} else {
o.providerMetrics["providerSucceeded"].Inc()
}

bucketMap := make(map[string]*ObjectStorageData)

// create a map with bucket name as key, this way we match buckets created manually and not via Appcat service
for i, bucketMetric := range bucketMetrics.Data {
bucketMap[bucketMetric.Subject.BucketName] = &ObjectStorageData{
BucketMetricsData: bucketMetrics.Data[i],
}
}

// Since our buckets are always created in the convention $namespace.$bucketname, we can extract the namespace from the bucket name by splitting it.
// However, we need to fetch the user details to get the actual namespace.
for key, bucket := range bucketMap {
// fetch bucket user by id
logger.Info("fetching user details", "userID", bucket.Subject.ObjectsUserID)
userDetails, err := o.client.ObjectsUsers.Get(ctx, bucket.Subject.ObjectsUserID)
if err != nil {
o.providerMetrics["providerFailed"].Inc()
logger.Error(err, "unknown userID, something broke here fatally", "userID", bucket.Subject.ObjectsUserID, "bucket", bucket)
// deleting this bucket as it's unsuable
delete(bucketMap, key)
continue
}
bucket.BucketDetail.Namespace = strings.Split(userDetails.DisplayName, ".")[0]
}

// Fetch organisations in case salesOrder is missing
Expand All @@ -71,6 +101,7 @@ func (o *ObjectStorage) GetMetrics(ctx context.Context, billingDate time.Time) (
logger.V(1).Info("Sales order id is missing, fetching namespaces to get the associated org id")
nsTenants, err = kubernetes.FetchNamespaceWithOrganizationMap(ctx, o.k8sClient)
if err != nil {
o.providerMetrics["providerFailed"].Inc()
return nil, err
}
}
Expand All @@ -79,37 +110,48 @@ func (o *ObjectStorage) GetMetrics(ctx context.Context, billingDate time.Time) (

buckets, err := fetchBuckets(ctx, o.k8sClient)
if err != nil {
o.providerMetrics["providerFailed"].Inc()
return nil, err
}

allRecords := make([]odoo.OdooMeteredBillingRecord, 0)
for _, bucketMetricsData := range bucketMetrics.Data {
name := bucketMetricsData.Subject.BucketName
logger = logger.WithValues("bucket", name)
bd, ok := buckets[name]
if !ok {
logger.Info("unable to sync bucket, ObjectBucket not found")
continue
for name, bucket := range bucketMap {
if val, ok := buckets[name]; ok {
bucket.Zone = val.Zone
}

// assign organisation to bucketMap
if val, ok := nsTenants[bucket.Namespace]; ok {
bucket.Organization = val
}
}

allRecords := make([]odoo.OdooMeteredBillingRecord, 0)
for _, bucket := range bucketMap {

appuioManaged := true
salesOrder := o.salesOrder
if salesOrder == "" {
appuioManaged = false
salesOrder, err = controlAPI.GetSalesOrder(ctx, o.controlApiClient, nsTenants[bd.Namespace])
if bucket.Organization == "" {
// in cases that our VSHN services are using buckets, then Organization is not set, we must default it to "vshn"
// we can't set it in cluster as for customers as then we might run into scheduling issues
bucket.Organization = "vshn"
}
salesOrder, err = controlAPI.GetSalesOrder(ctx, o.controlApiClient, bucket.Organization)
if err != nil {
logger.Error(err, "unable to sync bucket", "namespace", bd.Namespace)
logger.Error(err, "unable to sync bucket", "namespace", bucket, "reason", err)
continue
}
}
records, err := o.createOdooRecord(bucketMetricsData, bd, appuioManaged, salesOrder, billingDate)
records, err := o.createOdooRecord(bucket.BucketMetricsData, bucket.BucketDetail, appuioManaged, salesOrder, billingDate)
if err != nil {
logger.Error(err, "unable to create Odoo Record", "namespace", bd.Namespace)
logger.Error(err, "unable to create Odoo Record", "namespace", bucket.Namespace)
continue
}
allRecords = append(allRecords, records...)
logger.V(1).Info("Created Odoo records", "namespace", bd.Namespace, "records", records)
logger.V(1).Info("Created Odoo records", "namespace", bucket, "records", records)
}

o.providerMetrics["providerSucceeded"].Inc()
return allRecords, nil
}

Expand Down
3 changes: 0 additions & 3 deletions pkg/kubernetes/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (
"fmt"

orgv1 "github.com/appuio/control-api/apis/organization/v1"
"github.com/vshn/billing-collector-cloudservices/pkg/log"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime/schema"
ctrl "sigs.k8s.io/controller-runtime"
Expand Down Expand Up @@ -77,7 +76,6 @@ func restConfig(kubeconfig string, url string, token string) (*rest.Config, erro
}

func FetchNamespaceWithOrganizationMap(ctx context.Context, k8sClient client.Client) (map[string]string, error) {
logger := log.Logger(ctx)

gvk := schema.GroupVersionKind{
Group: "",
Expand All @@ -96,7 +94,6 @@ func FetchNamespaceWithOrganizationMap(ctx context.Context, k8sClient client.Cli
for _, ns := range list.Items {
orgLabel, ok := ns.GetLabels()[OrganizationLabel]
if !ok {
logger.Info("Organization label not found in namespace", "namespace", ns.GetName())
continue
}
namespaces[ns.GetName()] = orgLabel
Expand Down
Loading