Skip to content

Add CyberArk client and mock server #664

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Jun 13, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions pkg/client/client_cyberark.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
package client

import (
"github.com/jetstack/preflight/pkg/internal/cyberark/dataupload"
)

type CyberArkClient = dataupload.CyberArkClient

var NewCyberArkClient = dataupload.NewCyberArkClient
154 changes: 154 additions & 0 deletions pkg/internal/cyberark/dataupload/dataupload.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,154 @@
package dataupload

import (
"bytes"
"context"
"crypto/sha256"
"crypto/x509"
"encoding/hex"
"encoding/json"
"fmt"
"io"
"net/http"
"net/url"

"k8s.io/client-go/transport"

"github.com/jetstack/preflight/api"
"github.com/jetstack/preflight/pkg/version"
)

const (
// maxRetrievePresignedUploadURLBodySize is the maximum allowed size for a response body from the
// Retrieve Presigned Upload URL service.
maxRetrievePresignedUploadURLBodySize = 10 * 1024
)

type CyberArkClient struct {
baseURL string
client *http.Client

authenticateRequest func(req *http.Request) error
}

type Options struct {
ClusterName string
ClusterDescription string
}

func NewCyberArkClient(trustedCAs *x509.CertPool, baseURL string, authenticateRequest func(req *http.Request) error) (*CyberArkClient, error) {
cyberClient := &http.Client{}
tr := http.DefaultTransport.(*http.Transport).Clone()
if trustedCAs != nil {
tr.TLSClientConfig.RootCAs = trustedCAs
}
cyberClient.Transport = transport.DebugWrappers(tr)

return &CyberArkClient{
baseURL: baseURL,
client: cyberClient,
authenticateRequest: authenticateRequest,
}, nil
}

func (c *CyberArkClient) PostDataReadingsWithOptions(ctx context.Context, payload api.DataReadingsPost, opts Options) error {
if opts.ClusterName == "" {
return fmt.Errorf("programmer mistake: the cluster name (aka `cluster_id` in the config file) cannot be left empty")
}

encodedBody := &bytes.Buffer{}
checksum := sha256.New()
if err := json.NewEncoder(io.MultiWriter(encodedBody, checksum)).Encode(payload); err != nil {
return err
}

presignedUploadURL, err := c.retrievePresignedUploadURL(ctx, hex.EncodeToString(checksum.Sum(nil)), opts)
if err != nil {
return err
}

req, err := http.NewRequestWithContext(ctx, http.MethodPost, presignedUploadURL, encodedBody)
if err != nil {
return err
}

req.Header.Set("Content-Type", "application/json")
version.SetUserAgent(req)

res, err := c.client.Do(req)
if err != nil {
return err
}
defer res.Body.Close()

if code := res.StatusCode; code < 200 || code >= 300 {
body, _ := io.ReadAll(io.LimitReader(res.Body, 500))
if len(body) == 0 {
body = []byte(`<empty body>`)
}
return fmt.Errorf("received response with status code %d: %s", code, bytes.TrimSpace(body))
}

return nil
}

func (c *CyberArkClient) retrievePresignedUploadURL(ctx context.Context, checksum string, opts Options) (string, error) {
uploadURL, err := url.JoinPath(c.baseURL, "/api/data/kubernetes/upload")
if err != nil {
return "", err
}

request := struct {
ClusterID string `json:"cluster_id"`
ClusterDescription string `json:"cluster_description"`
Checksum string `json:"checksum_sha256"`
}{
ClusterID: opts.ClusterName,
ClusterDescription: opts.ClusterDescription,
Checksum: checksum,
}

encodedBody := &bytes.Buffer{}
if err := json.NewEncoder(encodedBody).Encode(request); err != nil {
return "", err
}

req, err := http.NewRequestWithContext(ctx, http.MethodPost, uploadURL, encodedBody)
if err != nil {
return "", err
}

req.Header.Set("Content-Type", "application/json")
if err := c.authenticateRequest(req); err != nil {
return "", fmt.Errorf("failed to authenticate request")
}
version.SetUserAgent(req)

res, err := c.client.Do(req)
if err != nil {
return "", err
}
defer res.Body.Close()

if code := res.StatusCode; code < 200 || code >= 300 {
body, _ := io.ReadAll(io.LimitReader(res.Body, 500))
if len(body) == 0 {
body = []byte(`<empty body>`)
}
return "", fmt.Errorf("received response with status code %d: %s", code, bytes.TrimSpace(body))
}

response := struct {
URL string `json:"url"`
}{}

if err := json.NewDecoder(io.LimitReader(res.Body, maxRetrievePresignedUploadURLBodySize)).Decode(&response); err != nil {
if err == io.ErrUnexpectedEOF {
return "", fmt.Errorf("rejecting JSON response from server as it was too large or was truncated")
}

return "", fmt.Errorf("failed to parse JSON from otherwise successful request to start data upload: %s", err)
}

return response.URL, nil
}
120 changes: 120 additions & 0 deletions pkg/internal/cyberark/dataupload/dataupload_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
package dataupload_test

import (
"context"
"crypto/x509"
"encoding/pem"
"fmt"
"net/http"
"testing"
"time"

"github.com/stretchr/testify/require"

"github.com/jetstack/preflight/api"
"github.com/jetstack/preflight/pkg/internal/cyberark/dataupload"
)

func TestCyberArkClient_PostDataReadingsWithOptions(t *testing.T) {
fakeTime := time.Unix(123, 0)
defaultPayload := api.DataReadingsPost{
AgentMetadata: &api.AgentMetadata{
Version: "test-version",
ClusterID: "test",
},
DataGatherTime: fakeTime,
DataReadings: []*api.DataReading{
{
ClusterID: "success-cluster-id",
DataGatherer: "test-gatherer",
Timestamp: api.Time{Time: fakeTime},
Data: map[string]interface{}{"test": "data"},
SchemaVersion: "v1",
},
},
}
defaultOpts := dataupload.Options{
ClusterName: "success-cluster-id",
ClusterDescription: "success-cluster-description",
}

setToken := func(token string) func(*http.Request) error {
return func(req *http.Request) error {
req.Header.Set("Authorization", fmt.Sprintf("Bearer %s", token))
return nil
}
}

tests := []struct {
name string
payload api.DataReadingsPost
authenticate func(req *http.Request) error
opts dataupload.Options
requireFn func(t *testing.T, err error)
}{
{
name: "successful upload",
payload: defaultPayload,
opts: defaultOpts,
authenticate: setToken("success-token"),
requireFn: func(t *testing.T, err error) {
require.NoError(t, err)
},
},
{
name: "error when cluster name is empty",
payload: defaultPayload,
opts: dataupload.Options{ClusterName: ""},
authenticate: setToken("success-token"),
requireFn: func(t *testing.T, err error) {
require.ErrorContains(t, err, "programmer mistake: the cluster name")
},
},
{
name: "error when bearer token is incorrect",
payload: defaultPayload,
opts: defaultOpts,
authenticate: setToken("fail-token"),
requireFn: func(t *testing.T, err error) {
require.ErrorContains(t, err, "received response with status code 500: should authenticate using the correct bearer token")
},
},
{
name: "invalid JSON from server (RetrievePresignedUploadURL step)",
payload: defaultPayload,
opts: dataupload.Options{ClusterName: "invalid-json-retrieve-presigned", ClusterDescription: defaultOpts.ClusterDescription},
authenticate: setToken("success-token"),
requireFn: func(t *testing.T, err error) {
require.ErrorContains(t, err, "rejecting JSON response from server as it was too large or was truncated")
},
},
{
name: "500 from server (PostData step)",
payload: defaultPayload,
opts: dataupload.Options{ClusterName: "invalid-response-post-data", ClusterDescription: defaultOpts.ClusterDescription},
authenticate: setToken("success-token"),
requireFn: func(t *testing.T, err error) {
require.ErrorContains(t, err, "received response with status code 500: mock error")
},
},
}

for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
server := dataupload.MockDataUploadServer()
defer server.Close()

certPool := x509.NewCertPool()
require.True(t, certPool.AppendCertsFromPEM(pem.EncodeToMemory(&pem.Block{
Type: "CERTIFICATE",
Bytes: server.Server.TLS.Certificates[0].Certificate[0],
})))

cyberArkClient, err := dataupload.NewCyberArkClient(certPool, server.Server.URL, tc.authenticate)
require.NoError(t, err)

err = cyberArkClient.PostDataReadingsWithOptions(context.TODO(), tc.payload, tc.opts)
tc.requireFn(t, err)
})
}
}
Loading