Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion cmd/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -253,7 +253,7 @@ func Initialize(resumeJobID common.JobID, isBench bool) error {
}

if buildmode.IsMover {
StartSystemStatsMonitorForJob(jobID)
//StartSystemStatsMonitorForJob(jobID)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why are we commenting this out?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We already commented this out in the c2c-stage branch itself.
We will re-enable it once we move to mover/stage2.
I think the rolling stats that we log from main.go in the worker already give us enough memory stats.

}

return nil
Expand Down
31 changes: 31 additions & 0 deletions common/credentialFactory.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"context"
"fmt"
"sync"
"time"

gcpUtils "cloud.google.com/go/storage"
"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/blob"
Expand Down Expand Up @@ -62,6 +63,31 @@ func (o CredentialOpOptions) panicError(err error) {
}
}

// Tuning constants for the private-network transport.
const (
	// HttpsRetryAttempts is the retry budget handed to the round-robin
	// transport as its maximum number of attempts per request.
	HttpsRetryAttempts = 5

	// PeCheckCooldownTimeInSecs is the cooldown, in seconds, applied to a
	// private endpoint IP after it has been marked unhealthy.
	PeCheckCooldownTimeInSecs = 3600
)

func createS3ClientForPrivateNetwork(credInfo CredentialInfo) (*minio.Client, error) {
peIP := privateNetworkArgs.PrivateEndpointIPs
baseS3Host := credInfo.S3CredentialInfo.Endpoint
// Doublecheck endpoint should contain bucketname : "<bucketname>.s3.<region>.amazonaws.com"
s3Host := privateNetworkArgs.BucketName + "." + credInfo.S3CredentialInfo.Endpoint
transport := NewRoundRobinTransport(peIP, s3Host, HttpsRetryAttempts, PeCheckCooldownTimeInSecs*time.Second)
// Create MinIO client
client, err := minio.New(baseS3Host, &minio.Options{
Creds: credentials.New(credInfo.S3CredentialInfo.Provider),
Secure: true,
Transport: transport,
Region: credInfo.S3CredentialInfo.Region,
BucketLookup: minio.BucketLookupDNS, // force virtual-hosted style
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How does this handle buckets that have naming conventions only compatible with path-style url?

})
if err != nil {
return nil, fmt.Errorf("failed to create MinIO client: %v", err)
}
client.SetS3EnableDualstack(false)
return client, nil
}

// CreateS3Credential creates AWS S3 credential according to credential info.
func CreateS3Credential(ctx context.Context, credInfo CredentialInfo, options CredentialOpOptions) (*credentials.Credentials, error) {
switch credInfo.CredentialType {
Expand All @@ -81,6 +107,11 @@ func CreateS3Credential(ctx context.Context, credInfo CredentialInfo, options Cr
}

func CreateS3ClientFromProvider(credInfo CredentialInfo) (*minio.Client, error) {
if IsPrivateNetworkEnabled() {
fmt.Println("Creating S3 Client for Private Network")
s3Client, err := createS3ClientForPrivateNetwork(credInfo)
return s3Client, err
}
fmt.Println("Creating S3 Client for public access")
cred := credentials.New(credInfo.S3CredentialInfo.Provider)
//s3Client, err := minio.NewWithCredentials(credInfo.S3CredentialInfo.Endpoint, creds, true, credInfo.S3CredentialInfo.Region)
Expand Down
14 changes: 14 additions & 0 deletions common/fdatasync_linux.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
//go:build linux

package common

import (
"os"

"golang.org/x/sys/unix"
)

// Fdatasync issues fdatasync on the descriptor backing f (Linux only) and
// hands back whatever error that call reports, or nil when it succeeds.
func Fdatasync(f *os.File) error {
	fd := int(f.Fd())
	return unix.Fdatasync(fd)
}
8 changes: 8 additions & 0 deletions common/fdatasync_other.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
//go:build !linux

package common

import "os"

// Fdatasync does nothing on platforms other than Linux. The file argument is
// ignored and the result is always nil.
func Fdatasync(f *os.File) error {
	return nil
}
23 changes: 19 additions & 4 deletions common/mmf_linux.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,12 @@
package common

import (
"fmt"
"os"
"sync"
"syscall"

"golang.org/x/sys/unix"
)

const lineEnding = "\n"
Expand Down Expand Up @@ -66,10 +69,22 @@ func NewMMF(file *os.File, writable bool, offset int64, length int64) (*MMF, err
// the MMF is unusable.
func (m *MMF) Unmap() {
	m.lock.Lock()
	// Deferred so the lock is released even when PanicIfErr panics below.
	defer m.lock.Unlock()

	if m.slice != nil {
		// 1) Best-effort: advise the kernel that the pages of this mapping are
		// no longer needed. The advice is given on the mapped memory region
		// (not on the fd) and must happen while the mapping still exists.
		if err := unix.Madvise(m.slice, unix.MADV_DONTNEED); err != nil {
			// Log but don't panic - the advice may be unsupported or may fail.
			GetLifecycleMgr().Info(fmt.Sprintf("[mmf] madvise(MADV_DONTNEED) failed: %v", err))
		}

		// 2) Unmap the memory region. The slice is cleared before the error
		// check so the object never keeps a reference to the old mapping, and
		// so a repeated Unmap call is a no-op instead of a munmap of nil.
		err := syscall.Munmap(m.slice)
		m.slice = nil
		m.isMapped = false
		PanicIfErr(err)
	}

	// Nothing (or nothing any more) is mapped at this point.
	m.isMapped = false
}

Expand Down
194 changes: 194 additions & 0 deletions common/privateNetwork.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,194 @@
// Copyright © 2017 Microsoft <wastore@microsoft.com>
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

package common

import (
"crypto/tls"
"fmt"
"log"
"net/http"
"regexp"
"strings"
"sync/atomic"
"time"
)

// ==============================================================================================
// For C2C Private Networking configurations
// ==============================================================================================

// PrivateNetworkConfig groups the settings used for C2C private networking.
type PrivateNetworkConfig struct {
	// Enabled switches private networking on. It is off by default and only
	// turned on when the user explicitly enables it.
	Enabled bool

	// PrivateEndpointIPs is the list of private endpoint IPs.
	PrivateEndpointIPs []string

	// BucketName is the bucket name required to form the endpoint URL.
	BucketName string
}

// privateNetworkArgs is the package-level private networking configuration.
// It starts out as the zero value, i.e. with private networking disabled.
var privateNetworkArgs PrivateNetworkConfig

// IPEntry describes a single private endpoint IP together with the health
// information tracked for it.
type IPEntry struct {
	// IP is the private endpoint address.
	IP string

	// unhealthy is the health flag: 0 means healthy, 1 means unhealthy.
	unhealthy int32

	// lastChecked is the time at which the health state was last recorded.
	lastChecked time.Time
}

// RoundRobinTransport is an http.RoundTripper that spreads requests over a
// set of IPs, retrying on failure and putting failing IPs into a cooldown.
type RoundRobinTransport struct {
	// ips holds every known IP entry, healthy or not.
	ips []*IPEntry

	// host is the host name associated with the requests sent through
	// this transport.
	host string

	// healthyIPs caches the current healthy list; the stored value is a
	// []*IPEntry.
	healthyIPs atomic.Value

	// counter is the round-robin counter used to pick the next IP.
	counter uint32

	// transport is the underlying HTTP transport that performs the requests.
	transport *http.Transport

	// maxRetries bounds the number of attempts made for one request.
	maxRetries int

	// cooldown is how long to wait before retrying an unhealthy IP.
	cooldown time.Duration
}

// privateEndpointIPJunk matches every character that is neither a digit nor
// a dot. It is compiled once at package scope instead of on every call.
var privateEndpointIPJunk = regexp.MustCompile(`[^0-9.]`)

// SetPrivateNetworkArgs stores the private networking configuration in the
// package-level privateNetworkArgs.
//
// privateNetworkEnabled switches private networking on or off.
// privateEndpointIPs is the list of private endpoint IPs; each entry is
// trimmed and stripped of every character other than digits and dots, and
// entries that end up empty are dropped. bucketName is stored as given.
//
// The sanitized IPs are written into a freshly allocated slice, so the
// stored list never aliases the caller's slice and a previously stored list
// is fully replaced.
func SetPrivateNetworkArgs(privateNetworkEnabled bool, privateEndpointIPs []string, bucketName string) {
	// Build a new slice: indexing into the (initially nil) stored slice would
	// panic with "index out of range".
	sanitized := make([]string, 0, len(privateEndpointIPs))
	for _, ip := range privateEndpointIPs {
		ipAddress := strings.TrimSpace(ip) // removes spaces, tabs, newlines
		ipAddress = privateEndpointIPJunk.ReplaceAllString(ipAddress, "")
		if ipAddress == "" {
			// Nothing usable left; an empty host can never be dialed.
			continue
		}
		sanitized = append(sanitized, ipAddress)
	}

	privateNetworkArgs.Enabled = privateNetworkEnabled
	privateNetworkArgs.PrivateEndpointIPs = sanitized
	privateNetworkArgs.BucketName = bucketName
}

// NewRoundRobinTransport creates a RoundRobinTransport over the given IPs.
//
// ips are the addresses requests are sent to; every one starts out healthy.
// host is used as the TLS server name of the underlying transport and is
// stored for use on outgoing requests. maxRetries is the maximum number of
// attempts per request and cooldown is how long an IP stays excluded after
// being marked unhealthy.
//
// The underlying transport is a clone of http.DefaultTransport with its TLS
// configuration replaced; certificate verification stays enabled.
func NewRoundRobinTransport(ips []string, host string, maxRetries int, cooldown time.Duration) *RoundRobinTransport {
	now := time.Now()
	entries := make([]*IPEntry, len(ips))
	for i, ip := range ips {
		entries[i] = &IPEntry{IP: ip, unhealthy: 0, lastChecked: now}
		// Printf-style call: Println would print the verbs literally.
		log.Printf("PrivateEndpoint%d IPAddress:%s Unhealthy Status:%d LastChecked:%v",
			i, entries[i].IP, entries[i].unhealthy, entries[i].lastChecked)
	}

	// http.DefaultTransport is a replaceable package variable; fall back to a
	// plain transport instead of panicking if it is not an *http.Transport.
	base, ok := http.DefaultTransport.(*http.Transport)
	if !ok {
		base = &http.Transport{}
	}
	tr := base.Clone()
	// Requests are dialed by IP, so the certificate has to be verified
	// against the real host name rather than the dialed address.
	tr.TLSClientConfig = &tls.Config{InsecureSkipVerify: false, ServerName: host}

	rr := &RoundRobinTransport{
		ips:        entries,
		host:       host,
		transport:  tr,
		maxRetries: maxRetries,
		cooldown:   cooldown,
	}
	rr.refreshHealthyPool()
	return rr
}

// RoundTrip implements http.RoundTripper. It sends req to one of the
// currently healthy IPs, picked round-robin, and on a transport error marks
// that IP unhealthy and moves on to the next one, for at most rr.maxRetries
// attempts.
//
// Every attempt works on a clone of req whose URL host is replaced by the
// selected IP while the Host header is set to rr.host; req itself is never
// modified.
//
// It returns the first successful response. Otherwise it returns an error
// that wraps the last transport error, or a plain error when no healthy IP
// was available or no attempt could be sent at all. If the request's context
// is cancelled or expired, the transport error is returned immediately and
// the IP is not penalised.
func (rr *RoundRobinTransport) RoundTrip(req *http.Request) (*http.Response, error) {
	var (
		lastErr error
		lastIP  string // last IP selected; declared once so it is not shadowed in the loop
		sent    bool   // true once the underlying transport has been handed the request (and its body)
		made    int    // number of attempts actually started
	)
	hasBody := req.Body != nil && req.Body != http.NoBody

	// A RoundTripper must close the request body even when it fails. The
	// underlying transport does that for every attempt it receives, so this
	// only has to cover the paths where no attempt was ever sent.
	closeUnsentBody := func() {
		if !sent && req.Body != nil {
			_ = req.Body.Close() // best effort; the request has failed anyway
		}
	}

	for attempt := 1; attempt <= rr.maxRetries; attempt++ {
		made = attempt

		// Comma-ok keeps a transport with an unset pool from panicking.
		healthy, _ := rr.healthyIPs.Load().([]*IPEntry)
		if len(healthy) == 0 {
			log.Printf("[Attempt %d] No healthy IPs available", attempt)
			closeUnsentBody()
			if lastErr != nil {
				return nil, fmt.Errorf("no healthy IPs available (last failure on ip %s: %w)", lastIP, lastErr)
			}
			return nil, fmt.Errorf("no healthy IPs available")
		}

		idx := atomic.AddUint32(&rr.counter, 1)
		entry := healthy[idx%uint32(len(healthy))]
		lastIP = entry.IP
		log.Printf("[Attempt %d Counter=%d] -> Selected IP: %s Unhealth Status: %d",
			attempt, idx, lastIP, atomic.LoadInt32(&entry.unhealthy))

		// Skip if still in cooldown (the entry may have been marked unhealthy
		// by a concurrent request after the pool snapshot was taken).
		// NOTE(review): entry.lastChecked is a plain time.Time that is read
		// here and written on failure without synchronization; making that
		// race-free needs a change to IPEntry itself.
		if atomic.LoadInt32(&entry.unhealthy) == 1 &&
			time.Since(entry.lastChecked) < rr.cooldown {
			log.Printf("[Attempt %d Counter=%d] Skipping IP %s (still in cooldown)",
				attempt, idx, lastIP)
			continue
		}

		clonedReq := req.Clone(req.Context())

		// A previous attempt has consumed and closed the body. It can only
		// be replayed through GetBody; re-sending the spent reader would
		// transmit a truncated or empty payload.
		if sent && hasBody {
			if req.GetBody == nil {
				log.Printf("[Attempt %d Counter=%d] Request body cannot be replayed, giving up retries",
					attempt, idx)
				break
			}
			body, bodyErr := req.GetBody()
			if bodyErr != nil {
				return nil, fmt.Errorf("rewinding request body for retry on ip %s: %w", lastIP, bodyErr)
			}
			clonedReq.Body = body
		}

		log.Printf("[RoundTrip] Original request: %s://%s%s (Host header %s)",
			clonedReq.URL.Scheme, clonedReq.URL.Host, clonedReq.URL.Path, clonedReq.Host)

		// Override destination to the private endpoint IP.
		clonedReq.URL.Host = lastIP
		// Keep the real host name in the Host header so S3 understands the request.
		clonedReq.Host = rr.host

		log.Printf("[RoundTrip] Updated request: %s://%s%s (Host header %s)",
			clonedReq.URL.Scheme, clonedReq.URL.Host, clonedReq.URL.Path, clonedReq.Host)

		sent = true
		resp, err := rr.transport.RoundTrip(clonedReq)
		if err == nil {
			// Success: mark IP healthy. The pool is refreshed on every
			// success because that is also what re-admits cooled-down IPs.
			atomic.StoreInt32(&entry.unhealthy, 0)
			rr.refreshHealthyPool()
			log.Printf("[Attempt %d Counter %d] SUCCESS using IP %s", attempt, idx, lastIP)
			return resp, nil
		}

		// http.Transport returns a nil response together with a non-nil
		// error, so there is no response to inspect or close here.
		log.Printf("[Attempt %d Counter %d] FAILED using IP %s -> %v", attempt, idx, lastIP, err)
		lastErr = err

		// A cancelled or expired request context is the caller's doing, not
		// the endpoint's: do not penalise the IP and do not retry.
		if req.Context().Err() != nil {
			return nil, err
		}

		// Failure: mark unhealthy
		atomic.StoreInt32(&entry.unhealthy, 1)
		entry.lastChecked = time.Now()
		log.Printf("[Attempt %d Counter %d] Marked IP %s as unhealthy", attempt, idx, lastIP)
		rr.refreshHealthyPool()
	}

	closeUnsentBody()
	if lastErr == nil {
		// Either maxRetries is not positive or every candidate was skipped;
		// there is no transport error to wrap.
		return nil, fmt.Errorf("no request was sent in %d attempts (last candidate ip %q)", made, lastIP)
	}
	return nil, fmt.Errorf("all retries for ip %v failed after %d attempts: %w", lastIP, made, lastErr)
}

// refreshHealthyPool rebuilds the cached list of usable IPs and publishes it
// atomically through rr.healthyIPs.
//
// An entry is included when its unhealthy flag is 0, or when at least
// rr.cooldown has passed since its lastChecked time.
func (rr *RoundRobinTransport) refreshHealthyPool() {
	// The counter and the health flags are updated with atomic operations by
	// concurrent requests, so they are read atomically here as well.
	counter := atomic.LoadUint32(&rr.counter)

	healthy := make([]*IPEntry, 0, len(rr.ips))
	for _, e := range rr.ips {
		unhealthy := atomic.LoadInt32(&e.unhealthy)
		log.Printf("refreshHealthyPool Counter: %d IP Address: %s Unhealthy: %d\n", counter, e.IP, unhealthy)
		// NOTE(review): e.lastChecked is read without synchronization; making
		// that race-free needs a change to IPEntry itself.
		if unhealthy == 0 || time.Since(e.lastChecked) >= rr.cooldown {
			healthy = append(healthy, e)
		}
	}
	log.Printf("refreshHealthyPool Counter: %d Healthy Pool Count: %d\n", counter, len(healthy))
	rr.healthyIPs.Store(healthy)
}

// Close releases the idle connections held by the underlying HTTP transport.
func (rr *RoundRobinTransport) Close() {
	underlying := rr.transport
	underlying.CloseIdleConnections()
}

// IsPrivateNetworkEnabled reports whether private networking has been
// enabled; it is disabled by default. Each call also prints the current
// state, including the configured private endpoints and bucket name when
// enabled.
func IsPrivateNetworkEnabled() bool {
	if !privateNetworkArgs.Enabled {
		fmt.Println("Private Networking is not enabled")
		return false
	}

	fmt.Printf("Private Networking is enabled with Private Endpoints: %v and BucketName: %s\n", privateNetworkArgs.PrivateEndpointIPs, privateNetworkArgs.BucketName)
	return true
}
Loading