Skip to content

Commit dbcf56d

Browse files
authored
Merge pull request #7 from olekjs/dev-release-v1.6.0
Add bulk actions
2 parents 5c1c6b7 + d593209 commit dbcf56d

14 files changed

+593
-1
lines changed

src/Builder/Builder.php

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,9 @@
66
use LogicException;
77
use Olekjs\Elasticsearch\Client;
88
use Olekjs\Elasticsearch\Contracts\BuilderInterface;
9+
use Olekjs\Elasticsearch\Contracts\BulkOperationInterface;
910
use Olekjs\Elasticsearch\Contracts\ClientInterface;
11+
use Olekjs\Elasticsearch\Dto\BulkResponseDto;
1012
use Olekjs\Elasticsearch\Dto\FindResponseDto;
1113
use Olekjs\Elasticsearch\Dto\PaginateResponseDto;
1214
use Olekjs\Elasticsearch\Dto\SearchResponseDto;
@@ -314,6 +316,15 @@ public function paginate(int $page = 1, int $perPage = 25): PaginateResponseDto
314316
return $this->client->paginate($this->index, $this->body, $page, $perPage);
315317
}
316318

319+
/**
320+
* @throws SearchResponseException
321+
* @throws CoreException
322+
*/
323+
public function bulk(BulkOperationInterface $bulk): BulkResponseDto
324+
{
325+
return $this->client->bulk($bulk);
326+
}
327+
317328
public function getIndex(): string
318329
{
319330
return $this->index;

src/Bulk/Bulk.php

Lines changed: 90 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,90 @@
1+
<?php
2+
3+
namespace Olekjs\Elasticsearch\Bulk;
4+
5+
use JsonException;
6+
use LogicException;
7+
use Olekjs\Elasticsearch\Contracts\BulkOperationInterface;
8+
9+
class Bulk implements BulkOperationInterface
10+
{
11+
private const UPDATE_ACTION = 'update';
12+
13+
private array $documents;
14+
15+
/*
16+
* @throws LogicException
17+
*/
18+
public function add(string $action, string $index, string $id, ?array $data = []): self
19+
{
20+
$this->addDocument($action, $index, $id, $data);
21+
22+
return $this;
23+
}
24+
25+
/*
26+
* @throws LogicException
27+
*/
28+
public function addMany(array $documents): self
29+
{
30+
foreach ($documents as $document) {
31+
$this->validateDocument($document);
32+
$this->addDocument($document['action'], $document['index'], $document['id'], $document['data']);
33+
}
34+
35+
return $this;
36+
}
37+
38+
public function getDocuments(): array
39+
{
40+
return $this->documents;
41+
}
42+
43+
/**
44+
* @throws JsonException
45+
*/
46+
public function toRequestJson(): string
47+
{
48+
$body = [];
49+
50+
foreach ($this->getDocuments() as $document) {
51+
if (empty($document)) {
52+
continue;
53+
}
54+
55+
$body[] = json_encode($document, JSON_THROW_ON_ERROR);
56+
}
57+
58+
return join(PHP_EOL, $body) . PHP_EOL;
59+
}
60+
61+
/*
62+
* @throws LogicException
63+
*/
64+
public function validateDocument(array $document): void
65+
{
66+
if (
67+
!isset($document['action'], $document['index'], $document['id'], $document['data'])
68+
|| !is_string($document['action'])
69+
|| !(is_string($document['index']) || is_int($document['index']))
70+
|| !(is_string($document['id']) || is_int($document['id']))
71+
|| !is_array($document['data'])
72+
) {
73+
throw new LogicException('Document has incorrect structure.');
74+
}
75+
}
76+
77+
protected function addDocument(string $action, string $index, string $id, array $data): void
78+
{
79+
$this->documents[][$action] = ['_index' => $index, '_id' => $id];
80+
81+
if (empty($data)) {
82+
return;
83+
}
84+
85+
match ($action) {
86+
self::UPDATE_ACTION => $this->documents[]['doc'] = $data,
87+
default => $this->documents[] = $data,
88+
};
89+
}
90+
}

src/Client.php

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,12 @@
22

33
namespace Olekjs\Elasticsearch;
44

5+
use Illuminate\Contracts\Support\Jsonable;
6+
use Illuminate\Support\Facades\Http;
57
use Olekjs\Elasticsearch\Contracts\AbstractClient;
8+
use Olekjs\Elasticsearch\Contracts\BulkOperationInterface;
69
use Olekjs\Elasticsearch\Contracts\ClientInterface;
10+
use Olekjs\Elasticsearch\Dto\BulkResponseDto;
711
use Olekjs\Elasticsearch\Dto\IndexResponseDto;
812
use Olekjs\Elasticsearch\Dto\PaginateResponseDto;
913
use Olekjs\Elasticsearch\Dto\SearchResponseDto;
@@ -16,6 +20,7 @@
1620
use Olekjs\Elasticsearch\Exceptions\NotFoundResponseException;
1721
use Olekjs\Elasticsearch\Exceptions\SearchResponseException;
1822
use Olekjs\Elasticsearch\Exceptions\UpdateResponseException;
23+
use Olekjs\Elasticsearch\Utils\BulkResponse;
1924
use Olekjs\Elasticsearch\Utils\FindResponse;
2025
use Olekjs\Elasticsearch\Utils\IndexResponse;
2126
use Olekjs\Elasticsearch\Utils\PaginateResponse;
@@ -337,4 +342,22 @@ public function paginate(string $index, array $data = [], int $page = 1, int $pe
337342
'total_documents' => $totalDocuments
338343
]);
339344
}
345+
346+
/**
347+
* @throws SearchResponseException
348+
* @throws CoreException
349+
*/
350+
public function bulk(BulkOperationInterface $bulk): BulkResponseDto
351+
{
352+
$response = $this->getBaseClient()->withBody($bulk->toRequestJson())->post('_bulk');
353+
354+
if ($response->clientError()) {
355+
$this->throwSearchResponseException(
356+
data_get($response, 'error.reason'),
357+
$response->status(),
358+
);
359+
}
360+
361+
return BulkResponse::from($response);
362+
}
340363
}

src/Contracts/BuilderInterface.php

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44

55
use LogicException;
66
use Olekjs\Elasticsearch\Builder\Builder;
7+
use Olekjs\Elasticsearch\Dto\BulkResponseDto;
78
use Olekjs\Elasticsearch\Dto\FindResponseDto;
89
use Olekjs\Elasticsearch\Dto\PaginateResponseDto;
910
use Olekjs\Elasticsearch\Dto\SearchResponseDto;
@@ -92,6 +93,12 @@ public function get(): SearchResponseDto;
9293
*/
9394
public function paginate(int $page = 1, int $perPage = 25): PaginateResponseDto;
9495

96+
/**
97+
* @throws SearchResponseException
98+
* @throws CoreException
99+
*/
100+
public function bulk(BulkOperationInterface $bulk): BulkResponseDto;
101+
95102
public function getIndex(): string;
96103

97104
public function getBody(): array;
Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
<?php
2+
3+
namespace Olekjs\Elasticsearch\Contracts;
4+
5+
use JsonException;
6+
7+
interface BulkOperationInterface
8+
{
9+
/*
10+
* @throws LogicException
11+
*/
12+
public function add(string $action, string $index, string $id, ?array $data = []): self;
13+
14+
/*
15+
* @throws LogicException
16+
*/
17+
public function addMany(array $documents): self;
18+
19+
public function getDocuments(): array;
20+
21+
/**
22+
* @throws JsonException
23+
*/
24+
public function toRequestJson(): string;
25+
26+
/*
27+
* @throws LogicException
28+
*/
29+
public function validateDocument(array $document): void;
30+
}

src/Contracts/ClientInterface.php

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
namespace Olekjs\Elasticsearch\Contracts;
44

5+
use Olekjs\Elasticsearch\Dto\BulkResponseDto;
56
use Olekjs\Elasticsearch\Dto\FindResponseDto;
67
use Olekjs\Elasticsearch\Dto\IndexResponseDto;
78
use Olekjs\Elasticsearch\Dto\PaginateResponseDto;
@@ -101,4 +102,10 @@ public function count(string $index, array $data = []): int;
101102
* @throws CoreException
102103
*/
103104
public function paginate(string $index, array $data = [], int $page = 1, int $perPage = 25): PaginateResponseDto;
105+
106+
/**
107+
* @throws SearchResponseException
108+
* @throws CoreException
109+
*/
110+
public function bulk(BulkOperationInterface $bulk): BulkResponseDto;
104111
}

src/Dto/BulkItemResponseDto.php

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
<?php
2+
3+
namespace Olekjs\Elasticsearch\Dto;
4+
5+
use Illuminate\Contracts\Support\Arrayable;
6+
use Olekjs\Elasticsearch\Contracts\ResponseDtoInterface;
7+
8+
class BulkItemResponseDto implements ResponseDtoInterface, Arrayable
9+
{
10+
public function __construct(
11+
private readonly string $action,
12+
private readonly IndexResponseDto $data,
13+
) {
14+
}
15+
16+
public function getAction(): string
17+
{
18+
return $this->action;
19+
}
20+
21+
public function getData(): IndexResponseDto
22+
{
23+
return $this->data;
24+
}
25+
26+
public function toArray(): array
27+
{
28+
return [
29+
'took' => $this->getAction(),
30+
'items' => $this->getData()->toArray(),
31+
];
32+
}
33+
}

src/Dto/BulkResponseDto.php

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
<?php
2+
3+
namespace Olekjs\Elasticsearch\Dto;
4+
5+
use Illuminate\Contracts\Support\Arrayable;
6+
use Olekjs\Elasticsearch\Contracts\ResponseDtoInterface;
7+
8+
class BulkResponseDto implements ResponseDtoInterface, Arrayable
9+
{
10+
public function __construct(
11+
private readonly int $took,
12+
private readonly bool $errors,
13+
private readonly array $items,
14+
) {
15+
}
16+
17+
public function getTook(): int
18+
{
19+
return $this->took;
20+
}
21+
22+
public function isErrors(): bool
23+
{
24+
return $this->errors;
25+
}
26+
27+
/**
28+
* @return BulkItemResponseDto[]
29+
*/
30+
public function getItems(): array
31+
{
32+
return $this->items;
33+
}
34+
35+
public function toArray(): array
36+
{
37+
return [
38+
'took' => $this->getTook(),
39+
'errors' => $this->isErrors(),
40+
'items' => array_map(
41+
fn(BulkItemResponseDto $bulkItemResponseDto): array => $bulkItemResponseDto->toArray(),
42+
$this->getItems()
43+
),
44+
];
45+
}
46+
}

src/Utils/BulkResponse.php

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
<?php
2+
3+
namespace Olekjs\Elasticsearch\Utils;
4+
5+
use Illuminate\Http\Client\Response;
6+
use Olekjs\Elasticsearch\Contracts\ResponseInterface;
7+
use Olekjs\Elasticsearch\Dto\BulkItemResponseDto;
8+
use Olekjs\Elasticsearch\Dto\BulkResponseDto;
9+
use Olekjs\Elasticsearch\Dto\IndexResponseDto;
10+
use Olekjs\Elasticsearch\Dto\ShardsResponseDto;
11+
12+
class BulkResponse implements ResponseInterface
13+
{
14+
public static function from(Response $response, array $data = []): BulkResponseDto
15+
{
16+
$items = array_map(
17+
fn(array $items): array => array_map(
18+
fn(array $data, string $action): BulkItemResponseDto => new BulkItemResponseDto(
19+
action: $action,
20+
data: new IndexResponseDto(
21+
index: data_get($data, '_index'),
22+
id: data_get($data, '_id'),
23+
version: data_get($data, '_version'),
24+
result: data_get($data, 'result'),
25+
shards: new ShardsResponseDto(
26+
total: data_get($data, '_shards.total'),
27+
successful: data_get($data, '_shards.successful'),
28+
failed: data_get($data, '_shards.failed'),
29+
),
30+
sequenceNumber: data_get($data, '_seq_no'),
31+
primaryTerm: data_get($data, '_primary_term')
32+
)
33+
),
34+
$items,
35+
array_keys($items),
36+
), data_get($response, 'items')
37+
);
38+
39+
return new BulkResponseDto(
40+
took: data_get($response, 'took'),
41+
errors: data_get($response, 'errors'),
42+
items: collect($items)->flatten()->all(),
43+
);
44+
}
45+
}

0 commit comments

Comments
 (0)