From 3b0facd3ec343b954eafc449b7e0f93c316a68e9 Mon Sep 17 00:00:00 2001 From: wraymo Date: Fri, 10 Oct 2025 13:09:16 -0400 Subject: [PATCH 1/3] add clp support for clp_get_json_string --- components/core/src/clp_s/SchemaReader.cpp | 35 ++++++++++------------ components/core/src/clp_s/SchemaReader.hpp | 22 +++++++------- 2 files changed, 28 insertions(+), 29 deletions(-) diff --git a/components/core/src/clp_s/SchemaReader.cpp b/components/core/src/clp_s/SchemaReader.cpp index 4118919caf..8ceb4db3ac 100644 --- a/components/core/src/clp_s/SchemaReader.cpp +++ b/components/core/src/clp_s/SchemaReader.cpp @@ -63,7 +63,7 @@ SchemaReader::load(std::shared_ptr stream_buffer, size_t offset, size_t } } -void SchemaReader::generate_json_string() { +std::string SchemaReader::generate_json_string(uint64_t message_index) { m_json_serializer.reset(); m_json_serializer.begin_document(); size_t column_id_index = 0; @@ -100,14 +100,14 @@ void SchemaReader::generate_json_string() { auto name = m_global_schema_tree->get_node(column->get_id()).get_key_name(); m_json_serializer.append_key(name); m_json_serializer.append_value( - std::to_string(std::get(column->extract_value(m_cur_message))) + std::to_string(std::get(column->extract_value(message_index))) ); break; } case JsonSerializer::Op::AddIntValue: { column = m_reordered_columns[column_id_index++]; m_json_serializer.append_value( - std::to_string(std::get(column->extract_value(m_cur_message))) + std::to_string(std::get(column->extract_value(message_index))) ); break; } @@ -116,14 +116,14 @@ void SchemaReader::generate_json_string() { auto name = m_global_schema_tree->get_node(column->get_id()).get_key_name(); m_json_serializer.append_key(name); m_json_serializer.append_value( - std::to_string(std::get(column->extract_value(m_cur_message))) + std::to_string(std::get(column->extract_value(message_index))) ); break; } case JsonSerializer::Op::AddFloatValue: { column = m_reordered_columns[column_id_index++]; m_json_serializer.append_value( - std::to_string(std::get(column->extract_value(m_cur_message))) + std::to_string(std::get(column->extract_value(message_index))) ); break; } @@ -131,12 +131,12 @@ void SchemaReader::generate_json_string() { column = m_reordered_columns[column_id_index++]; auto name = m_global_schema_tree->get_node(column->get_id()).get_key_name(); m_json_serializer.append_key(name); - m_json_serializer.append_value_from_column(column, m_cur_message); + m_json_serializer.append_value_from_column(column, message_index); break; } case JsonSerializer::Op::AddFormattedFloatValue: { column = m_reordered_columns[column_id_index++]; - m_json_serializer.append_value_from_column(column, m_cur_message); + m_json_serializer.append_value_from_column(column, message_index); break; } case JsonSerializer::Op::AddBoolField: { @@ -144,7 +144,7 @@ void SchemaReader::generate_json_string() { auto name = m_global_schema_tree->get_node(column->get_id()).get_key_name(); m_json_serializer.append_key(name); m_json_serializer.append_value( - std::get(column->extract_value(m_cur_message)) != 0 ? "true" + std::get(column->extract_value(message_index)) != 0 ? "true" : "false" ); break; @@ -152,7 +152,7 @@ void SchemaReader::generate_json_string() { case JsonSerializer::Op::AddBoolValue: { column = m_reordered_columns[column_id_index++]; m_json_serializer.append_value( - std::get(column->extract_value(m_cur_message)) != 0 ? "true" + std::get(column->extract_value(message_index)) != 0 ? "true" : "false" ); break; @@ -161,12 +161,12 @@ void SchemaReader::generate_json_string() { column = m_reordered_columns[column_id_index++]; auto name = m_global_schema_tree->get_node(column->get_id()).get_key_name(); m_json_serializer.append_key(name); - m_json_serializer.append_value_from_column_with_quotes(column, m_cur_message); + m_json_serializer.append_value_from_column_with_quotes(column, message_index); break; } case JsonSerializer::Op::AddStringValue: { column = m_reordered_columns[column_id_index++]; - m_json_serializer.append_value_from_column_with_quotes(column, m_cur_message); + m_json_serializer.append_value_from_column_with_quotes(column, message_index); break; } case JsonSerializer::Op::AddArrayField: { @@ -174,7 +174,7 @@ void SchemaReader::generate_json_string() { m_json_serializer.append_key( m_global_schema_tree->get_node(column->get_id()).get_key_name() ); - m_json_serializer.append_value_from_column(column, m_cur_message); + m_json_serializer.append_value_from_column(column, message_index); break; } case JsonSerializer::Op::AddNullField: { @@ -190,6 +190,7 @@ void SchemaReader::generate_json_string() { } m_json_serializer.end_document(); + return m_json_serializer.get_serialized_string(); } bool SchemaReader::get_next_message(std::string& message) { @@ -200,9 +201,7 @@ bool SchemaReader::get_next_message(std::string& message) { if (false == m_serializer_initialized) { initialize_serializer(); } - generate_json_string(); - - message = m_json_serializer.get_serialized_string(); + message = generate_json_string(m_cur_message); if (message.back() != '\n') { message += '\n'; @@ -223,8 +222,7 @@ bool SchemaReader::get_next_message(std::string& message, FilterClass* filter) { if (false == m_serializer_initialized) { initialize_serializer(); } - generate_json_string(); - message = m_json_serializer.get_serialized_string(); + message = generate_json_string(m_cur_message); if (message.back() != '\n') { message += '\n'; @@ -256,8 +254,7 @@ bool SchemaReader::get_next_message_with_metadata( if (false == m_serializer_initialized) { initialize_serializer(); } - generate_json_string(); - message = m_json_serializer.get_serialized_string(); + message = generate_json_string(m_cur_message); if (message.back() != '\n') { message += '\n'; diff --git a/components/core/src/clp_s/SchemaReader.hpp b/components/core/src/clp_s/SchemaReader.hpp index 64c397b593..562e5d6239 100644 --- a/components/core/src/clp_s/SchemaReader.hpp +++ b/components/core/src/clp_s/SchemaReader.hpp @@ -154,6 +154,13 @@ class SchemaReader { */ uint64_t get_num_messages() const { return m_num_messages; } + /** + * Generates a json string from the extracted values + * @param message_index the index of the message to generate the json string for + * @return the generated json string + */ + std::string generate_json_string(uint64_t message_index); + /** * Gets next message * @param message @@ -197,6 +204,11 @@ class SchemaReader { */ void initialize_filter_with_column_map(FilterClass* filter); + /** + * Initializes all internal data structured required to serialize records. + */ + void initialize_serializer(); + /** * Marks a column as timestamp * @param column_reader @@ -299,16 +311,6 @@ class SchemaReader { std::vector& path_to_intersection ); - /** - * Generates a json string from the extracted values - */ - void generate_json_string(); - - /** - * Initializes all internal data structured required to serialize records. - */ - void initialize_serializer(); - int32_t m_schema_id; uint64_t m_num_messages; uint64_t m_cur_message; From 8096c933f4fff227253aae95b341684e8c7c7006 Mon Sep 17 00:00:00 2001 From: wraymo Date: Wed, 15 Oct 2025 08:45:46 -0400 Subject: [PATCH 2/3] address review comments --- components/core/src/clp_s/SchemaReader.cpp | 2 +- components/core/src/clp_s/SchemaReader.hpp | 8 ++++---- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/components/core/src/clp_s/SchemaReader.cpp b/components/core/src/clp_s/SchemaReader.cpp index 8ceb4db3ac..a2a4ab6dcb 100644 --- a/components/core/src/clp_s/SchemaReader.cpp +++ b/components/core/src/clp_s/SchemaReader.cpp @@ -63,7 +63,7 @@ SchemaReader::load(std::shared_ptr stream_buffer, size_t offset, size_t } } -std::string SchemaReader::generate_json_string(uint64_t message_index) { +auto SchemaReader::generate_json_string(uint64_t message_index) -> std::string { m_json_serializer.reset(); m_json_serializer.begin_document(); size_t column_id_index = 0; diff --git a/components/core/src/clp_s/SchemaReader.hpp b/components/core/src/clp_s/SchemaReader.hpp index 562e5d6239..6112b8b5b4 100644 --- a/components/core/src/clp_s/SchemaReader.hpp +++ b/components/core/src/clp_s/SchemaReader.hpp @@ -155,11 +155,11 @@ class SchemaReader { uint64_t get_num_messages() const { return m_num_messages; } /** - * Generates a json string from the extracted values - * @param message_index the index of the message to generate the json string for - * @return the generated json string + * Generates a JSON string from the encoded columns + * @param message_index The index of the message to generate the JSON string for. + * @return The generated JSON string */ - std::string generate_json_string(uint64_t message_index); + [[nodiscard]] auto generate_json_string(uint64_t message_index) -> std::string; /** * Gets next message From 293a5732a6335a6a86d1cca3be35547b60140a37 Mon Sep 17 00:00:00 2001 From: wraymo Date: Thu, 16 Oct 2025 09:45:45 -0400 Subject: [PATCH 3/3] fix a typo --- components/core/src/clp_s/SchemaReader.hpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/components/core/src/clp_s/SchemaReader.hpp b/components/core/src/clp_s/SchemaReader.hpp index 6112b8b5b4..33cb75b285 100644 --- a/components/core/src/clp_s/SchemaReader.hpp +++ b/components/core/src/clp_s/SchemaReader.hpp @@ -205,7 +205,7 @@ class SchemaReader { void initialize_filter_with_column_map(FilterClass* filter); /** - * Initializes all internal data structured required to serialize records. + * Initializes all internal data structures required to serialize records. */ void initialize_serializer();