Skip to content

Commit 1e8951b

Browse files
committed
prefer host near client
1 parent 94512f5 commit 1e8951b

File tree

8 files changed

+195
-11
lines changed

8 files changed

+195
-11
lines changed

client_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@ func TestClient(t *testing.T) {
4545
if err != nil {
4646
t.Fatal(err)
4747
}
48-
_, err = tr.db.Exec("insert into rack(rackid, zoneid, subnet) values(1, 1, '0.0.0.0/0')")
48+
_, err = tr.db.Exec("insert into rack(rackid, zoneid) values(1, 1)")
4949
if err != nil {
5050
t.Fatal(err)
5151
}

config_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ func init() {
1717

1818
func cleanDB(t *testing.T, db *sql.DB) {
1919
t.Helper()
20-
tables := []string{"file_on", "tempfile", "file", "device", "host", "rack", "zone"}
20+
tables := []string{"file_on", "tempfile", "file", "device", "host", "subnet", "rack", "zone"}
2121
for _, table := range tables {
2222
_, err := db.Exec("delete from " + table)
2323
if err != nil {

drain.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -112,7 +112,7 @@ func (d *Drainer) moveFile(fid int64) error {
112112
if err != nil {
113113
return err
114114
}
115-
ad, err := findAliveDevice(d.db, fi.Size(), d.Dest)
115+
ad, err := findAliveDevice(d.db, fi.Size(), d.Dest, "")
116116
if err != nil {
117117
return err
118118
}

drain_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ func TestDrain(t *testing.T) {
2525
if err != nil {
2626
t.Fatal(err)
2727
}
28-
_, err = tr.db.Exec("insert into rack(rackid, zoneid, subnet) values(1, 1, '0.0.0.0/0')")
28+
_, err = tr.db.Exec("insert into rack(rackid, zoneid) values(1, 1)")
2929
if err != nil {
3030
t.Fatal(err)
3131
}

schema.sql

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,11 +9,19 @@ CREATE TABLE `zone` (
99
CREATE TABLE `rack` (
1010
`rackid` mediumint(8) unsigned NOT NULL,
1111
`zoneid` mediumint(8) unsigned NOT NULL,
12-
`subnet` varchar(18) NOT NULL,
12+
`name` varchar(40),
1313
PRIMARY KEY (`rackid`),
1414
FOREIGN KEY (`zoneid`) REFERENCES `zone` (`zoneid`)
1515
);
1616

17+
CREATE TABLE `subnet` (
18+
`subnetid` mediumint(8) unsigned NOT NULL,
19+
`rackid` mediumint(8) unsigned NOT NULL,
20+
`subnet` varchar(18) NOT NULL,
21+
PRIMARY KEY (`subnetid`),
22+
FOREIGN KEY (`rackid`) REFERENCES `rack` (`rackid`)
23+
);
24+
1725
CREATE TABLE `host` (
1826
`hostid` mediumint(8) unsigned NOT NULL,
1927
`status` enum('alive','dead','down') NOT NULL DEFAULT 'alive',

server_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ func setupServer(t *testing.T, ttl time.Duration) (s *Server, closeFunc func())
3838
if err != nil {
3939
t.Fatal(err)
4040
}
41-
_, err = s.db.Exec("insert into rack(rackid, zoneid, subnet) values(1, 1, '0.0.0.0/0')")
41+
_, err = s.db.Exec("insert into rack(rackid, zoneid) values(1, 1)")
4242
if err != nil {
4343
t.Fatal(err)
4444
}

tracker.go

Lines changed: 107 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -232,7 +232,7 @@ func (t *Tracker) createOpen(w http.ResponseWriter, r *http.Request) {
232232
return
233233
}
234234
}
235-
d, err := findAliveDevice(t.db, int64(size), nil)
235+
d, err := findAliveDevice(t.db, int64(size), nil, getClientIP(r))
236236
if err == errNoDeviceAvailable {
237237
http.Error(w, "no device available", http.StatusServiceUnavailable)
238238
return
@@ -262,6 +262,10 @@ func (t *Tracker) createOpen(w http.ResponseWriter, r *http.Request) {
262262
var errNoDeviceAvailable = errors.New("no device available")
263263

264264
type aliveDevice struct {
265+
zoneid int64
266+
rackid int64
267+
hostid int64
268+
hostip string
265269
hostname string
266270
httpPort int64
267271
devid int64
@@ -271,7 +275,7 @@ func (d *aliveDevice) PatchURL(fid int64) string {
271275
return fmt.Sprintf("http://%s:%d/dev%d/%s", d.hostname, d.httpPort, d.devid, vivify(fid))
272276
}
273277

274-
func findAliveDevice(db *sql.DB, size int64, devids []int64) (*aliveDevice, error) {
278+
func findAliveDevice(db *sql.DB, size int64, devids []int64, clientIP string) (*aliveDevice, error) {
275279
var devidsSQL string
276280
if len(devids) > 0 {
277281
var devidsString []string
@@ -282,9 +286,11 @@ func findAliveDevice(db *sql.DB, size int64, devids []int64) (*aliveDevice, erro
282286
} else {
283287
devidsSQL = "and d.status='alive' "
284288
}
285-
rows, err := db.Query("select h.hostname, d.write_port, d.devid "+
289+
rows, err := db.Query("select z.zoneid, r.rackid, h.hostid, h.hostip, h.hostname, d.devid, d.write_port "+
286290
"from device d "+
287291
"join host h on d.hostid=h.hostid "+
292+
"join rack r on h.rackid=r.rackid "+
293+
"join zone z on r.zoneid=z.zoneid "+
288294
"where h.status='alive' "+
289295
"and bytes_free>= ? "+
290296
devidsSQL+
@@ -297,7 +303,7 @@ func findAliveDevice(db *sql.DB, size int64, devids []int64) (*aliveDevice, erro
297303
defer rows.Close() // nolint: errcheck
298304
for rows.Next() {
299305
var d aliveDevice
300-
err = rows.Scan(&d.hostname, &d.httpPort, &d.devid)
306+
err = rows.Scan(&d.zoneid, &d.rackid, &d.hostid, &d.hostip, &d.hostname, &d.devid, &d.httpPort)
301307
if err != nil {
302308
return nil, err
303309
}
@@ -307,6 +313,27 @@ func findAliveDevice(db *sql.DB, size int64, devids []int64) (*aliveDevice, erro
307313
if err != nil {
308314
return nil, err
309315
}
316+
sameHostDevices := filterSameHost(devices, clientIP)
317+
if len(sameHostDevices) > 0 {
318+
devices = sameHostDevices
319+
} else {
320+
subnets, err := getSubnets(db)
321+
if err != nil {
322+
return nil, err
323+
}
324+
rackID, zoneID, ok := getRackID(subnets, clientIP)
325+
if ok {
326+
sameRackDevices := filterSameRack(devices, rackID)
327+
if len(sameRackDevices) > 0 {
328+
devices = sameRackDevices
329+
} else {
330+
sameZoneDevices := filterSameZone(devices, zoneID)
331+
if len(sameZoneDevices) > 0 {
332+
devices = sameZoneDevices
333+
}
334+
}
335+
}
336+
}
310337
if len(devices) == 0 {
311338
return nil, errNoDeviceAvailable
312339
}
@@ -317,6 +344,82 @@ func findAliveDevice(db *sql.DB, size int64, devids []int64) (*aliveDevice, erro
317344
return &devices[rand.Intn(len(devices))], nil
318345
}
319346

347+
func getRackID(subnets []subnet, clientIP string) (rackid, zoneid int64, ok bool) {
348+
ip := net.ParseIP(clientIP)
349+
for _, s := range subnets {
350+
_, ipnet, err := net.ParseCIDR(s.subnet)
351+
if err != nil {
352+
continue
353+
}
354+
if ipnet.Contains(ip) {
355+
return s.rackid, s.zoneid, true
356+
}
357+
}
358+
return 0, 0, false
359+
}
360+
361+
func filterSameRack(devices []aliveDevice, rackID int64) []aliveDevice {
362+
var ret []aliveDevice
363+
for _, d := range devices {
364+
if d.rackid == rackID {
365+
ret = append(ret, d)
366+
}
367+
}
368+
return ret
369+
}
370+
371+
func filterSameZone(devices []aliveDevice, zoneID int64) []aliveDevice {
372+
var ret []aliveDevice
373+
for _, d := range devices {
374+
if d.zoneid == zoneID {
375+
ret = append(ret, d)
376+
}
377+
}
378+
return ret
379+
}
380+
381+
func getClientIP(req *http.Request) string {
382+
xff := req.Header.Get("x-forwarded-for")
383+
if xff != "" {
384+
return strings.TrimSpace(strings.Split(xff, ",")[0])
385+
}
386+
return req.RemoteAddr
387+
}
388+
389+
func filterSameHost(devices []aliveDevice, clientIP string) []aliveDevice {
390+
var ret []aliveDevice
391+
for _, d := range devices {
392+
if d.hostip == clientIP {
393+
ret = append(ret, d)
394+
}
395+
}
396+
return ret
397+
}
398+
399+
type subnet struct {
400+
subnetid int64
401+
rackid int64
402+
zoneid int64
403+
subnet string
404+
}
405+
406+
func getSubnets(db *sql.DB) ([]subnet, error) {
407+
var ret []subnet
408+
rows, err := db.Query("select subnetid, r.rackid, z.zoneid, subnet from subnet s join rack r on s.rackid=r.rackid join zone z on z.zoneid=r.zoneid")
409+
if err != nil {
410+
return nil, err
411+
}
412+
for rows.Next() {
413+
var s subnet
414+
err = rows.Scan(&s.subnetid, &s.rackid, &s.zoneid, &s.subnet)
415+
if err != nil {
416+
return nil, err
417+
}
418+
ret = append(ret, s)
419+
}
420+
return ret, rows.Err()
421+
}
422+
320423
func (t *Tracker) createClose(w http.ResponseWriter, r *http.Request) {
321424
if r.Method != http.MethodPost {
322425
http.Error(w, http.StatusText(http.StatusMethodNotAllowed), http.StatusMethodNotAllowed)

tracker_test.go

Lines changed: 74 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ func insertHost(t *testing.T, tr *Tracker) {
3737
if err != nil {
3838
t.Fatal(err)
3939
}
40-
_, err = tr.db.Exec("insert into rack(rackid, zoneid, subnet) values(1, 1, '0.0.0.0/0')")
40+
_, err = tr.db.Exec("insert into rack(rackid, zoneid) values(1, 1)")
4141
if err != nil {
4242
t.Fatal(err)
4343
}
@@ -122,6 +122,79 @@ func TestCreateOpen(t *testing.T) {
122122
}
123123
}
124124

125+
func TestCreateOpenSameZone(t *testing.T) {
126+
tr, err := NewTracker(testConfig)
127+
if err != nil {
128+
t.Fatal(err)
129+
}
130+
cleanDB(t, tr.db)
131+
_, err = tr.db.Exec("insert into zone(zoneid, name) values(1, 'zone1')")
132+
if err != nil {
133+
t.Fatal(err)
134+
}
135+
_, err = tr.db.Exec("insert into zone(zoneid, name) values(2, 'zone2')")
136+
if err != nil {
137+
t.Fatal(err)
138+
}
139+
_, err = tr.db.Exec("insert into rack(rackid, zoneid) values(1, 1)")
140+
if err != nil {
141+
t.Fatal(err)
142+
}
143+
_, err = tr.db.Exec("insert into rack(rackid, zoneid) values(2, 1)")
144+
if err != nil {
145+
t.Fatal(err)
146+
}
147+
_, err = tr.db.Exec("insert into rack(rackid, zoneid) values(3, 2)")
148+
if err != nil {
149+
t.Fatal(err)
150+
}
151+
_, err = tr.db.Exec("insert into rack(rackid, zoneid) values(4, 2)")
152+
if err != nil {
153+
t.Fatal(err)
154+
}
155+
_, err = tr.db.Exec("insert into subnet(subnetid, rackid, subnet) values(1, 3, '1.0.0.0/8')")
156+
if err != nil {
157+
t.Fatal(err)
158+
}
159+
_, err = tr.db.Exec("insert into host(hostid, hostname, status, hostip, rackid) values(1, 'foo', 'alive', '1.1.1.1', 3)")
160+
if err != nil {
161+
t.Fatal(err)
162+
}
163+
_, err = tr.db.Exec("insert into host(hostid, hostname, status, hostip, rackid) values(2, 'bar', 'alive', '2.2.2.2', 4)")
164+
if err != nil {
165+
t.Fatal(err)
166+
}
167+
_, err = tr.db.Exec("insert into device(devid, status, hostid, bytes_total, bytes_used, bytes_free, write_port) values(1, 'alive', 1, 1000, 1000, 0, 1234)")
168+
if err != nil {
169+
t.Fatal(err)
170+
}
171+
_, err = tr.db.Exec("insert into device(devid, status, hostid, bytes_total, bytes_used, bytes_free, write_port) values(2, 'alive', 2, 1000, 500, 500, 1234)")
172+
if err != nil {
173+
t.Fatal(err)
174+
}
175+
_, err = tr.db.Exec("alter table tempfile auto_increment = 5")
176+
if err != nil {
177+
t.Fatal(err)
178+
}
179+
req, err := http.NewRequest("POST", "/create-open?size=100", nil)
180+
if err != nil {
181+
t.Fatal(err)
182+
}
183+
req.Header.Set("x-forwarded-for", "1.1.1.1")
184+
rr := httptest.NewRecorder()
185+
186+
tr.server.Handler.ServeHTTP(rr, req)
187+
if status := rr.Code; status != http.StatusOK {
188+
t.Errorf("handler returned wrong status code: got %v want %v",
189+
status, http.StatusOK)
190+
}
191+
expected := "{\"path\":\"http://bar:1234/dev2/0/000/000/0000000005.fid\",\"fid\":5}\n"
192+
if rr.Body.String() != expected {
193+
t.Errorf("handler returned unexpected body: got %v want %v",
194+
rr.Body.String(), expected)
195+
}
196+
}
197+
125198
func TestCreateClose(t *testing.T) {
126199
tr, err := NewTracker(testConfig)
127200
if err != nil {

0 commit comments

Comments
 (0)