Skip to content

Commit 77f547a

Browse files
authored
[Feat][Format] Add internal id column to vertex payload file (#264)
--------- Signed-off-by: acezen <qiaozi.zwb@alibaba-inc.com>
1 parent bde169c commit 77f547a

20 files changed

+114
-123
lines changed

.github/workflows/ci.yml

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -36,10 +36,10 @@ jobs:
3636
-P /tmp/
3737
sudo apt-get install -y /tmp/apache-arrow-apt-source-latest-"$(lsb_release --codename --short)".deb
3838
sudo apt-get update -y
39-
sudo apt install -y libarrow-dev=14.0.0-1 \
40-
libarrow-dataset-dev=14.0.0-1 \
41-
libarrow-acero-dev=14.0.0-1 \
42-
libparquet-dev=14.0.0-1
39+
sudo apt install -y libarrow-dev=14.0.1-1 \
40+
libarrow-dataset-dev=14.0.1-1 \
41+
libarrow-acero-dev=14.0.1-1 \
42+
libparquet-dev=14.0.1-1
4343
sudo apt-get install -y libboost-graph-dev ccache libcurl4-openssl-dev
4444
4545
- name: CMake

README.rst

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -98,6 +98,8 @@ Take the "person" vertex table as an example, if the chunk size is set to be 500
9898
:align: center
9999
:alt: vertex physical table
100100

101+
**Note**: For efficiently utilize the filter push-down of the payload file format like Parquet, the internal vertex id is stored in the payload file as a column. And since the internal vertex id is continuous, the payload file format can use the delta encoding for the internal vertex id column, which would not bring too much overhead for the storage.
102+
101103
Edges in GraphAr
102104
^^^^^^^^^^^^^^^^
103105

cpp/examples/mid_level_reader_example.cc

Lines changed: 18 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ limitations under the License.
2020
#include "./config.h"
2121
#include "gar/reader/arrow_chunk_reader.h"
2222
#include "gar/util/expression.h"
23+
#include "gar/util/general_params.h"
2324

2425
void vertex_property_chunk_reader(const GAR_NAMESPACE::GraphInfo& graph_info) {
2526
// constuct reader
@@ -36,29 +37,37 @@ void vertex_property_chunk_reader(const GAR_NAMESPACE::GraphInfo& graph_info) {
3637
// use reader
3738
auto result = reader.GetChunk();
3839
ASSERT(!result.has_error());
39-
auto range = reader.GetRange().value();
4040
std::cout << "chunk number: " << reader.GetChunkNum() << std::endl;
41-
std::cout << "range of fisrt vertex property chunk: " << range.first << " "
42-
<< range.second << std::endl;
4341
auto table = result.value();
4442
std::cout << "rows number of first vertex property chunk: "
4543
<< table->num_rows() << std::endl;
4644
std::cout << "schema of first vertex property chunk: " << std::endl
4745
<< table->schema()->ToString() << std::endl;
46+
auto index_col =
47+
table->GetColumnByName(GAR_NAMESPACE::GeneralParams::kVertexIndexCol);
48+
ASSERT(index_col != nullptr);
49+
std::cout << "Internal id column: " << index_col->ToString() << " "
50+
<< std::endl;
4851
// seek vertex id
4952
ASSERT(reader.seek(100).ok());
5053
result = reader.GetChunk();
5154
ASSERT(!result.has_error());
52-
range = reader.GetRange().value();
53-
std::cout << "range of vertex property chunk for vertex id 100: "
54-
<< range.first << " " << range.second << std::endl;
55+
table = result.value();
56+
index_col =
57+
table->GetColumnByName(GAR_NAMESPACE::GeneralParams::kVertexIndexCol);
58+
ASSERT(index_col != nullptr);
59+
std::cout << "Internal id column of vertex property chunk for vertex id 100: "
60+
<< index_col->ToString() << " " << std::endl;
5561
// next chunk
5662
ASSERT(reader.next_chunk().ok());
5763
result = reader.GetChunk();
5864
ASSERT(!result.has_error());
59-
range = reader.GetRange().value();
60-
std::cout << "range of next vertex property chunk: " << range.first << " "
61-
<< range.second << std::endl;
65+
table = result.value();
66+
index_col =
67+
table->GetColumnByName(GAR_NAMESPACE::GeneralParams::kVertexIndexCol);
68+
ASSERT(index_col != nullptr);
69+
std::cout << "Internal id column of next chunk: " << index_col->ToString()
70+
<< " " << std::endl;
6271

6372
// reader with filter pushdown
6473
auto filter = GAR_NAMESPACE::_Equal(GAR_NAMESPACE::_Property("gender"),

cpp/include/gar/reader/arrow_chunk_reader.h

Lines changed: 0 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -100,13 +100,6 @@ class VertexPropertyArrowChunkReader {
100100
*/
101101
Result<std::shared_ptr<arrow::Table>> GetChunk() noexcept;
102102

103-
/**
104-
* @brief Get the vertex id range of current chunk.
105-
*
106-
* @return Result: std::pair<begin_id, end_id> or error.
107-
*/
108-
Result<std::pair<IdType, IdType>> GetRange() noexcept;
109-
110103
/**
111104
* @brief Sets chunk position indicator to next chunk.
112105
*

cpp/include/gar/writer/arrow_chunk_writer.h

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -227,6 +227,10 @@ class VertexPropertyWriter {
227227
ValidateLevel validate_level) const noexcept;
228228

229229
private:
230+
Result<std::shared_ptr<arrow::Table>> addIndexColumn(
231+
const std::shared_ptr<arrow::Table>& table, IdType chunk_index,
232+
IdType chunk_size) const noexcept;
233+
230234
VertexInfo vertex_info_;
231235
std::string prefix_;
232236
std::shared_ptr<FileSystem> fs_;

cpp/src/arrow_chunk_reader.cc

Lines changed: 0 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -36,22 +36,6 @@ VertexPropertyArrowChunkReader::GetChunk() noexcept {
3636
return chunk_table_->Slice(row_offset);
3737
}
3838

39-
Result<std::pair<IdType, IdType>>
40-
VertexPropertyArrowChunkReader::GetRange() noexcept {
41-
if (chunk_table_ == nullptr) {
42-
return Status::Invalid(
43-
"The chunk table is not initialized, please call "
44-
"GetChunk() first.");
45-
}
46-
const auto chunk_size = vertex_info_.GetChunkSize();
47-
IdType row_offset = seek_id_ - chunk_index_ * chunk_size;
48-
bool is_last_chunk = (chunk_index_ == chunk_num_ - 1);
49-
const auto curr_chunk_size =
50-
is_last_chunk ? (vertex_num_ - chunk_index_ * chunk_size) : chunk_size;
51-
52-
return std::make_pair(seek_id_, seek_id_ + curr_chunk_size - row_offset);
53-
}
54-
5539
void VertexPropertyArrowChunkReader::Filter(util::Filter filter) {
5640
filter_options_.filter = filter;
5741
}

cpp/src/arrow_chunk_writer.cc

Lines changed: 34 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -198,10 +198,15 @@ Status VertexPropertyWriter::WriteChunk(
198198
GAR_RETURN_NOT_OK(
199199
validate(input_table, property_group, chunk_index, validate_level));
200200
auto file_type = property_group.GetFileType();
201-
202-
std::vector<int> indices;
203-
indices.clear();
204201
auto schema = input_table->schema();
202+
int indice = schema->GetFieldIndex(GeneralParams::kVertexIndexCol);
203+
if (indice == -1) {
204+
return Status::Invalid("The internal id Column named ",
205+
GeneralParams::kVertexIndexCol,
206+
" does not exist in the input table.");
207+
}
208+
209+
std::vector<int> indices({indice});
205210
for (auto& property : property_group.GetProperties()) {
206211
int indice = schema->GetFieldIndex(property.name);
207212
if (indice == -1) {
@@ -251,13 +256,37 @@ Status VertexPropertyWriter::WriteTable(
251256
const std::shared_ptr<arrow::Table>& input_table, IdType start_chunk_index,
252257
ValidateLevel validate_level) const noexcept {
253258
auto property_groups = vertex_info_.GetPropertyGroups();
259+
GAR_ASSIGN_OR_RAISE(auto table_with_index,
260+
addIndexColumn(input_table, start_chunk_index,
261+
vertex_info_.GetChunkSize()));
254262
for (auto& property_group : property_groups) {
255-
GAR_RETURN_NOT_OK(WriteTable(input_table, property_group, start_chunk_index,
256-
validate_level));
263+
GAR_RETURN_NOT_OK(WriteTable(table_with_index, property_group,
264+
start_chunk_index, validate_level));
257265
}
258266
return Status::OK();
259267
}
260268

269+
Result<std::shared_ptr<arrow::Table>> VertexPropertyWriter::addIndexColumn(
270+
const std::shared_ptr<arrow::Table>& table, IdType chunk_index,
271+
IdType chunk_size) const noexcept {
272+
arrow::Int64Builder array_builder;
273+
RETURN_NOT_ARROW_OK(array_builder.Reserve(chunk_size));
274+
int64_t length = table->num_rows();
275+
for (IdType i = 0; i < length; i++) {
276+
RETURN_NOT_ARROW_OK(array_builder.Append(chunk_index * chunk_size + i));
277+
}
278+
std::shared_ptr<arrow::Array> array;
279+
RETURN_NOT_ARROW_OK(array_builder.Finish(&array));
280+
std::shared_ptr<arrow::ChunkedArray> chunked_array =
281+
std::make_shared<arrow::ChunkedArray>(array);
282+
GAR_RETURN_ON_ARROW_ERROR_AND_ASSIGN(
283+
auto ret, table->AddColumn(0,
284+
arrow::field(GeneralParams::kVertexIndexCol,
285+
arrow::int64(), false),
286+
chunked_array));
287+
return ret;
288+
}
289+
261290
// implementations for EdgeChunkWriter
262291

263292
// Check if the operation of writing number or copying a file is allowed.

cpp/test/test_arrow_chunk_reader.cc

Lines changed: 10 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ limitations under the License.
2020
#include "./util.h"
2121
#include "gar/reader/arrow_chunk_reader.h"
2222
#include "gar/util/expression.h"
23+
#include "gar/util/general_params.h"
2324

2425
#define CATCH_CONFIG_MAIN
2526
#include <catch2/catch.hpp>
@@ -53,37 +54,33 @@ TEST_CASE("test_vertex_property_arrow_chunk_reader") {
5354
auto reader = maybe_reader.value();
5455
auto result = reader.GetChunk();
5556
REQUIRE(!result.has_error());
56-
auto range = reader.GetRange().value();
5757
auto table = result.value();
5858
REQUIRE(table->num_rows() == 100);
59-
REQUIRE(range.first == 0);
60-
REQUIRE(range.second == 100);
59+
REQUIRE(table->GetColumnByName(
60+
GAR_NAMESPACE::GeneralParams::kVertexIndexCol) != nullptr);
6161

6262
// seek
6363
REQUIRE(reader.seek(100).ok());
6464
result = reader.GetChunk();
6565
REQUIRE(!result.has_error());
66-
range = reader.GetRange().value();
6766
table = result.value();
6867
REQUIRE(table->num_rows() == 100);
69-
REQUIRE(range.first == 100);
70-
REQUIRE(range.second == 200);
68+
REQUIRE(table->GetColumnByName(
69+
GAR_NAMESPACE::GeneralParams::kVertexIndexCol) != nullptr);
7170
REQUIRE(reader.next_chunk().ok());
7271
result = reader.GetChunk();
7372
REQUIRE(!result.has_error());
74-
range = reader.GetRange().value();
7573
table = result.value();
7674
REQUIRE(table->num_rows() == 100);
77-
REQUIRE(range.first == 200);
78-
REQUIRE(range.second == 300);
75+
REQUIRE(table->GetColumnByName(
76+
GAR_NAMESPACE::GeneralParams::kVertexIndexCol) != nullptr);
7977
REQUIRE(reader.seek(900).ok());
8078
result = reader.GetChunk();
8179
REQUIRE(!result.has_error());
82-
range = reader.GetRange().value();
8380
table = result.value();
8481
REQUIRE(table->num_rows() == 3);
85-
REQUIRE(range.first == 900);
86-
REQUIRE(range.second == 903);
82+
REQUIRE(table->GetColumnByName(
83+
GAR_NAMESPACE::GeneralParams::kVertexIndexCol) != nullptr);
8784
REQUIRE(reader.GetChunkNum() == 10);
8885
REQUIRE(reader.next_chunk().IsIndexError());
8986

@@ -119,10 +116,7 @@ TEST_CASE("test_vertex_property_pushdown") {
119116
auto result = reader.GetChunk();
120117
REQUIRE(!result.has_error());
121118
table = result.value();
122-
auto [start, end] = reader.GetRange().value();
123-
std::cout << "Chunk: " << idx << ",\tNums: " << table->num_rows() << "/"
124-
<< chunk_size << ",\tRange: (" << start << ", " << end << "]"
125-
<< '\n';
119+
std::cout << "Chunk: " << idx << ",\tNums: " << table->num_rows() << '\n';
126120
idx++;
127121
sum += table->num_rows();
128122
} while (!reader.next_chunk().IsIndexError());

docs/images/vertex_physical_table.png

324 KB
Loading

docs/user-guide/file-format.rst

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,8 @@ Take the "person" vertex table as an example, if the chunk size is set to be 500
5757
:alt: vertex physical table
5858

5959

60+
**Note**: For efficiently utilize the filter push-down of the payload file format like Parquet, the internal vertex id is stored in the payload file as a column. And since the internal vertex id is continuous, the payload file format can use the delta encoding for the internal vertex id column, which would not bring too much overhead for the storage.
61+
6062
Edges in GraphAr
6163
------------------------
6264

spark/src/main/scala/com/alibaba/graphar/example/GraphAr2Nebula.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,7 @@ object GraphAr2Nebula {
5656
// The edge data need to convert src and dst to the vertex id,
5757
// so we need to read the vertex data with index column.
5858
val graphData =
59-
GraphReader.read(graphInfoPath, spark, addVertexIndex = true)
59+
GraphReader.read(graphInfoPath, spark)
6060
val vertexData = graphData._1
6161
val edgeData = graphData._2
6262
putVertexDataIntoNebula(graphInfo, vertexData)

spark/src/main/scala/com/alibaba/graphar/example/GraphAr2Neo4j.scala

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -46,9 +46,7 @@ object GraphAr2Neo4j {
4646
val graphInfoPath: String = args(0)
4747
val graphInfo = GraphInfo.loadGraphInfo(graphInfoPath, spark)
4848

49-
// The edge data need to convert src and dst to the vertex id , so we need to read
50-
// the vertex data with index column.
51-
val graphData = GraphReader.read(graphInfoPath, spark, true)
49+
val graphData = GraphReader.read(graphInfoPath, spark)
5250
val vertexData = graphData._1
5351
val edgeData = graphData._2
5452

spark/src/main/scala/com/alibaba/graphar/graph/GraphReader.scala

Lines changed: 6 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -35,21 +35,18 @@ object GraphReader {
3535
* The map of (vertex label -> VertexInfo) for the graph.
3636
* @param spark
3737
* The Spark session for the reading.
38-
* @param addIndex
39-
* Whether to add index column for the DataFrame.
4038
* @return
4139
* The map of (vertex label -> DataFrame)
4240
*/
4341
private def readAllVertices(
4442
prefix: String,
4543
vertexInfos: Map[String, VertexInfo],
46-
spark: SparkSession,
47-
addIndex: Boolean = false
44+
spark: SparkSession
4845
): Map[String, DataFrame] = {
4946
val vertex_dataframes: Map[String, DataFrame] = vertexInfos.map {
5047
case (label, vertexInfo) => {
5148
val reader = new VertexReader(prefix, vertexInfo, spark)
52-
(label, reader.readAllVertexPropertyGroups(addIndex))
49+
(label, reader.readAllVertexPropertyGroups())
5350
}
5451
}
5552
return vertex_dataframes
@@ -109,8 +106,6 @@ object GraphReader {
109106
* The info object for the graph.
110107
* @param spark
111108
* The Spark session for the loading.
112-
* @param addVertexIndex
113-
* Whether to add index for the vertex DataFrames.
114109
* @return
115110
* Pair of vertex DataFrames and edge DataFrames, the vertex DataFrames are
116111
* stored as the map of (vertex_label -> DataFrame) the edge DataFrames are
@@ -119,8 +114,7 @@ object GraphReader {
119114
*/
120115
def readWithGraphInfo(
121116
graphInfo: GraphInfo,
122-
spark: SparkSession,
123-
addVertexIndex: Boolean = false
117+
spark: SparkSession
124118
): Pair[Map[String, DataFrame], Map[
125119
(String, String, String),
126120
Map[String, DataFrame]
@@ -129,7 +123,7 @@ object GraphReader {
129123
val vertex_infos = graphInfo.getVertexInfos()
130124
val edge_infos = graphInfo.getEdgeInfos()
131125
return (
132-
readAllVertices(prefix, vertex_infos, spark, addVertexIndex),
126+
readAllVertices(prefix, vertex_infos, spark),
133127
readAllEdges(prefix, edge_infos, spark)
134128
)
135129
}
@@ -142,8 +136,6 @@ object GraphReader {
142136
* The path of the graph info yaml.
143137
* @param spark
144138
* The Spark session for the loading.
145-
* @param addVertexIndex
146-
* Whether to add index for the vertex DataFrames.
147139
* @return
148140
* Pair of vertex DataFrames and edge DataFrames, the vertex DataFrames are
149141
* stored as the map of (vertex_label -> DataFrame) the edge DataFrames are
@@ -152,8 +144,7 @@ object GraphReader {
152144
*/
153145
def read(
154146
graphInfoPath: String,
155-
spark: SparkSession,
156-
addVertexIndex: Boolean = false
147+
spark: SparkSession
157148
): Pair[Map[String, DataFrame], Map[
158149
(String, String, String),
159150
Map[String, DataFrame]
@@ -162,6 +153,6 @@ object GraphReader {
162153
val graph_info = GraphInfo.loadGraphInfo(graphInfoPath, spark)
163154

164155
// conduct reading
165-
readWithGraphInfo(graph_info, spark, addVertexIndex)
156+
readWithGraphInfo(graph_info, spark)
166157
}
167158
}

spark/src/main/scala/com/alibaba/graphar/graph/GraphTransformer.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,7 @@ object GraphTransformer {
6666
val source_vertex_info = sourceVertexInfosMap(label)
6767
// read vertex chunks from the source graph
6868
val reader = new VertexReader(source_prefix, source_vertex_info, spark)
69-
val df = reader.readAllVertexPropertyGroups(true)
69+
val df = reader.readAllVertexPropertyGroups()
7070
// write vertex chunks for the dest graph
7171
val writer = new VertexWriter(dest_prefix, dest_vertex_info, df)
7272
writer.writeVertexProperties()

0 commit comments

Comments
 (0)