Skip to content

Commit c55fd56

Browse files
plandoltfabpot
authored andcommitted
fixed roundrobin dead transport which should recover
1 parent 83f6687 commit c55fd56

File tree

6 files changed

+192
-10
lines changed

6 files changed

+192
-10
lines changed

FailoverTransportTest.php

Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,69 @@ public function testSendOneDead()
6464
$t->send(new RawMessage(''));
6565
}
6666

67+
public function testSendOneDeadAndRecoveryNotWithinRetryPeriod()
68+
{
69+
$t1 = $this->createMock(TransportInterface::class);
70+
$t1->expects($this->at(0))->method('send')->will($this->throwException(new TransportException()));
71+
$t1->expects($this->once())->method('send');
72+
$t2 = $this->createMock(TransportInterface::class);
73+
$t2->expects($this->exactly(5))->method('send');
74+
$t = new FailoverTransport([$t1, $t2], 40);
75+
$t->send(new RawMessage(''));
76+
sleep(4);
77+
$t->send(new RawMessage(''));
78+
sleep(4);
79+
$t->send(new RawMessage(''));
80+
sleep(4);
81+
$t->send(new RawMessage(''));
82+
sleep(4);
83+
$t->send(new RawMessage(''));
84+
}
85+
86+
public function testSendOneDeadAndRecoveryWithinRetryPeriod()
87+
{
88+
$t1 = $this->createMock(TransportInterface::class);
89+
$t1->expects($this->at(0))->method('send')->will($this->throwException(new TransportException()));
90+
$t1->expects($this->at(1))->method('send');
91+
$t1->expects($this->exactly(3))->method('send');
92+
$t2 = $this->createMock(TransportInterface::class);
93+
$t2->expects($this->at(0))->method('send');
94+
$t2->expects($this->at(1))->method('send');
95+
$t2->expects($this->at(2))->method('send');
96+
$t2->expects($this->at(3))->method('send')->will($this->throwException(new TransportException()));
97+
$t2->expects($this->exactly(4))->method('send');
98+
$t = new FailoverTransport([$t1, $t2], 6);
99+
$t->send(new RawMessage('')); // t1>fail - t2>sent
100+
sleep(4);
101+
$t->send(new RawMessage('')); // t2>sent
102+
sleep(4);
103+
$t->send(new RawMessage('')); // t2>sent
104+
sleep(4);
105+
$t->send(new RawMessage('')); // t2>fail - t1>sent
106+
sleep(4);
107+
$t->send(new RawMessage('')); // t1>sent
108+
}
109+
110+
public function testSendAllDeadWithinRetryPeriod()
111+
{
112+
$t1 = $this->createMock(TransportInterface::class);
113+
$t1->expects($this->at(0))->method('send')->will($this->throwException(new TransportException()));
114+
$t1->expects($this->once())->method('send');
115+
$t2 = $this->createMock(TransportInterface::class);
116+
$t2->expects($this->at(0))->method('send');
117+
$t2->expects($this->at(1))->method('send');
118+
$t2->expects($this->at(2))->method('send')->will($this->throwException(new TransportException()));
119+
$t2->expects($this->exactly(3))->method('send');
120+
$t = new FailoverTransport([$t1, $t2], 40);
121+
$t->send(new RawMessage(''));
122+
sleep(4);
123+
$t->send(new RawMessage(''));
124+
sleep(4);
125+
$this->expectException(TransportException::class);
126+
$this->expectExceptionMessage('All transports failed.');
127+
$t->send(new RawMessage(''));
128+
}
129+
67130
public function testSendOneDeadButRecover()
68131
{
69132
$t1 = $this->createMock(TransportInterface::class);

RoundRobinTransport.php

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -66,11 +66,20 @@ protected function getNextTransport(): ?TransportInterface
6666
if (!$this->isTransportDead($transport)) {
6767
break;
6868
}
69+
6970
if ((microtime(true) - $this->deadTransports[$transport]) > $this->retryPeriod) {
7071
$this->deadTransports->detach($transport);
7172

7273
break;
7374
}
75+
76+
if ($transport) {
77+
$this->transports[] = $transport;
78+
}
79+
80+
if ($this->deadTransports->count() >= \count($this->transports)) {
81+
return null;
82+
}
7483
}
7584

7685
if ($transport) {

RoundRobinTransportTest.php

Lines changed: 24 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -64,16 +64,35 @@ public function testSendOneDead()
6464
$t->send(new RawMessage(''));
6565
}
6666

67-
public function testSendOneDeadButRecover()
67+
public function testSendOneDeadAndRecoveryNotWithinRetryPeriod()
6868
{
6969
$t1 = $this->createMock(TransportInterface::class);
70-
$t1->expects($this->at(0))->method('send')->will($this->throwException(new TransportException()));
71-
$t1->expects($this->at(1))->method('send');
70+
$t1->expects($this->exactly(4))->method('send');
7271
$t2 = $this->createMock(TransportInterface::class);
72+
$t2->expects($this->at(0))->method('send')->will($this->throwException(new TransportException()));
7373
$t2->expects($this->once())->method('send');
74-
$t = new RoundRobinTransport([$t1, $t2], 1);
74+
$t = new RoundRobinTransport([$t1, $t2], 60);
7575
$t->send(new RawMessage(''));
76-
sleep(2);
76+
$t->send(new RawMessage(''));
77+
$t->send(new RawMessage(''));
78+
$t->send(new RawMessage(''));
79+
}
80+
81+
public function testSendOneDeadAndRecoveryWithinRetryPeriod()
82+
{
83+
$t1 = $this->createMock(TransportInterface::class);
84+
$t1->expects($this->exactly(3))->method('send');
85+
$t2 = $this->createMock(TransportInterface::class);
86+
$t2->expects($this->at(0))->method('send')->will($this->throwException(new TransportException()));
87+
$t2->expects($this->at(1))->method('send');
88+
$t2->expects($this->exactly(2))->method('send');
89+
$t = new RoundRobinTransport([$t1, $t2], 3);
90+
$t->send(new RawMessage(''));
91+
sleep(5);
92+
$t->send(new RawMessage(''));
93+
sleep(5);
94+
$t->send(new RawMessage(''));
95+
sleep(5);
7796
$t->send(new RawMessage(''));
7897
}
7998
}

Tests/Transport/FailoverTransportTest.php

Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,69 @@ public function testSendOneDead()
6464
$t->send(new RawMessage(''));
6565
}
6666

67+
public function testSendOneDeadAndRecoveryNotWithinRetryPeriod()
68+
{
69+
$t1 = $this->createMock(TransportInterface::class);
70+
$t1->expects($this->at(0))->method('send')->will($this->throwException(new TransportException()));
71+
$t1->expects($this->once())->method('send');
72+
$t2 = $this->createMock(TransportInterface::class);
73+
$t2->expects($this->exactly(5))->method('send');
74+
$t = new FailoverTransport([$t1, $t2], 40);
75+
$t->send(new RawMessage(''));
76+
sleep(4);
77+
$t->send(new RawMessage(''));
78+
sleep(4);
79+
$t->send(new RawMessage(''));
80+
sleep(4);
81+
$t->send(new RawMessage(''));
82+
sleep(4);
83+
$t->send(new RawMessage(''));
84+
}
85+
86+
public function testSendOneDeadAndRecoveryWithinRetryPeriod()
87+
{
88+
$t1 = $this->createMock(TransportInterface::class);
89+
$t1->expects($this->at(0))->method('send')->will($this->throwException(new TransportException()));
90+
$t1->expects($this->at(1))->method('send');
91+
$t1->expects($this->exactly(3))->method('send');
92+
$t2 = $this->createMock(TransportInterface::class);
93+
$t2->expects($this->at(0))->method('send');
94+
$t2->expects($this->at(1))->method('send');
95+
$t2->expects($this->at(2))->method('send');
96+
$t2->expects($this->at(3))->method('send')->will($this->throwException(new TransportException()));
97+
$t2->expects($this->exactly(4))->method('send');
98+
$t = new FailoverTransport([$t1, $t2], 6);
99+
$t->send(new RawMessage('')); // t1>fail - t2>sent
100+
sleep(4);
101+
$t->send(new RawMessage('')); // t2>sent
102+
sleep(4);
103+
$t->send(new RawMessage('')); // t2>sent
104+
sleep(4);
105+
$t->send(new RawMessage('')); // t2>fail - t1>sent
106+
sleep(4);
107+
$t->send(new RawMessage('')); // t1>sent
108+
}
109+
110+
public function testSendAllDeadWithinRetryPeriod()
111+
{
112+
$t1 = $this->createMock(TransportInterface::class);
113+
$t1->expects($this->at(0))->method('send')->will($this->throwException(new TransportException()));
114+
$t1->expects($this->once())->method('send');
115+
$t2 = $this->createMock(TransportInterface::class);
116+
$t2->expects($this->at(0))->method('send');
117+
$t2->expects($this->at(1))->method('send');
118+
$t2->expects($this->at(2))->method('send')->will($this->throwException(new TransportException()));
119+
$t2->expects($this->exactly(3))->method('send');
120+
$t = new FailoverTransport([$t1, $t2], 40);
121+
$t->send(new RawMessage(''));
122+
sleep(4);
123+
$t->send(new RawMessage(''));
124+
sleep(4);
125+
$this->expectException(TransportException::class);
126+
$this->expectExceptionMessage('All transports failed.');
127+
$t->send(new RawMessage(''));
128+
}
129+
67130
public function testSendOneDeadButRecover()
68131
{
69132
$t1 = $this->createMock(TransportInterface::class);

Tests/Transport/RoundRobinTransportTest.php

Lines changed: 24 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -64,16 +64,35 @@ public function testSendOneDead()
6464
$t->send(new RawMessage(''));
6565
}
6666

67-
public function testSendOneDeadButRecover()
67+
public function testSendOneDeadAndRecoveryNotWithinRetryPeriod()
6868
{
6969
$t1 = $this->createMock(TransportInterface::class);
70-
$t1->expects($this->at(0))->method('send')->will($this->throwException(new TransportException()));
71-
$t1->expects($this->at(1))->method('send');
70+
$t1->expects($this->exactly(4))->method('send');
7271
$t2 = $this->createMock(TransportInterface::class);
72+
$t2->expects($this->at(0))->method('send')->will($this->throwException(new TransportException()));
7373
$t2->expects($this->once())->method('send');
74-
$t = new RoundRobinTransport([$t1, $t2], 1);
74+
$t = new RoundRobinTransport([$t1, $t2], 60);
7575
$t->send(new RawMessage(''));
76-
sleep(2);
76+
$t->send(new RawMessage(''));
77+
$t->send(new RawMessage(''));
78+
$t->send(new RawMessage(''));
79+
}
80+
81+
public function testSendOneDeadAndRecoveryWithinRetryPeriod()
82+
{
83+
$t1 = $this->createMock(TransportInterface::class);
84+
$t1->expects($this->exactly(3))->method('send');
85+
$t2 = $this->createMock(TransportInterface::class);
86+
$t2->expects($this->at(0))->method('send')->will($this->throwException(new TransportException()));
87+
$t2->expects($this->at(1))->method('send');
88+
$t2->expects($this->exactly(2))->method('send');
89+
$t = new RoundRobinTransport([$t1, $t2], 3);
90+
$t->send(new RawMessage(''));
91+
sleep(5);
92+
$t->send(new RawMessage(''));
93+
sleep(5);
94+
$t->send(new RawMessage(''));
95+
sleep(5);
7796
$t->send(new RawMessage(''));
7897
}
7998
}

Transport/RoundRobinTransport.php

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -66,11 +66,20 @@ protected function getNextTransport(): ?TransportInterface
6666
if (!$this->isTransportDead($transport)) {
6767
break;
6868
}
69+
6970
if ((microtime(true) - $this->deadTransports[$transport]) > $this->retryPeriod) {
7071
$this->deadTransports->detach($transport);
7172

7273
break;
7374
}
75+
76+
if ($transport) {
77+
$this->transports[] = $transport;
78+
}
79+
80+
if ($this->deadTransports->count() >= \count($this->transports)) {
81+
return null;
82+
}
7483
}
7584

7685
if ($transport) {

0 commit comments

Comments
 (0)