Skip to content

Commit 2796a5c

Browse files
committed
Add log support to OTelExportSinkIR
Signed-off-by: Dom Del Nano <ddelnano@gmail.com> (cherry picked from commit 9d5e616)
1 parent 301198f commit 2796a5c

File tree

3 files changed

+210
-0
lines changed

3 files changed

+210
-0
lines changed

src/carnot/planner/ir/otel_export_sink_ir.cc

Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,8 @@
1818

1919
#include <utility>
2020

21+
#include <google/protobuf/descriptor.h>
22+
2123
#include "src/carnot/planner/ir/ir.h"
2224
#include "src/carnot/planner/ir/otel_export_sink_ir.h"
2325
#include "src/carnot/planpb/plan.pb.h"
@@ -160,6 +162,29 @@ Status OTelExportSinkIR::ProcessConfig(const OTelData& data) {
160162
new_span.span_kind = span.span_kind;
161163
data_.spans.push_back(std::move(new_span));
162164
}
165+
for (const auto& log : data.logs) {
166+
OTelLog new_log;
167+
168+
PX_ASSIGN_OR_RETURN(new_log.time_column, AddColumn(log.time_column));
169+
PX_ASSIGN_OR_RETURN(new_log.body_column, AddColumn(log.body_column));
170+
if (log.observed_time_column != nullptr) {
171+
PX_ASSIGN_OR_RETURN(new_log.observed_time_column, AddColumn(log.observed_time_column));
172+
}
173+
174+
new_log.severity_text = log.severity_text;
175+
new_log.severity_number = log.severity_number;
176+
177+
for (const auto& attr : log.attributes) {
178+
if (attr.column_reference == nullptr) {
179+
new_log.attributes.push_back({attr.name, nullptr, attr.string_value});
180+
continue;
181+
}
182+
PX_ASSIGN_OR_RETURN(auto column, AddColumn(attr.column_reference));
183+
new_log.attributes.push_back({attr.name, column, ""});
184+
}
185+
186+
data_.logs.push_back(std::move(new_log));
187+
}
163188
return Status::OK();
164189
}
165190

@@ -330,6 +355,49 @@ Status OTelExportSinkIR::ToProto(planpb::Operator* op) const {
330355
}
331356
span_pb->set_kind_value(span.span_kind);
332357
}
358+
for (const auto& log : data_.logs) {
359+
auto log_pb = otel_op->add_logs();
360+
361+
if (log.time_column->EvaluatedDataType() != types::TIME64NS) {
362+
return log.time_column->CreateIRNodeError(
363+
"Expected time column '$0' to be TIME64NS, received $1", log.time_column->col_name(),
364+
types::ToString(log.time_column->EvaluatedDataType()));
365+
}
366+
PX_ASSIGN_OR_RETURN(auto time_column_index, log.time_column->GetColumnIndex());
367+
log_pb->set_time_column_index(time_column_index);
368+
369+
if (log.observed_time_column != nullptr) {
370+
if (log.observed_time_column->EvaluatedDataType() != types::TIME64NS) {
371+
return log.observed_time_column->CreateIRNodeError(
372+
"Expected observed_time column '$0' to be TIME64NS, received $1",
373+
log.observed_time_column->col_name(),
374+
types::ToString(log.observed_time_column->EvaluatedDataType()));
375+
}
376+
PX_ASSIGN_OR_RETURN(auto observed_time_column_index,
377+
log.observed_time_column->GetColumnIndex());
378+
log_pb->set_observed_time_column_index(observed_time_column_index);
379+
} else {
380+
log_pb->set_observed_time_column_index(-1);
381+
}
382+
383+
log_pb->set_severity_text(log.severity_text);
384+
385+
// TODO(ddelnano): Add validation for severity_number if the planner isn't the right
386+
// place to implement the validation.
387+
log_pb->set_severity_number(log.severity_number);
388+
389+
if (log.body_column->EvaluatedDataType() != types::STRING) {
390+
return log.body_column->CreateIRNodeError(
391+
"Expected body column '$0' to be STRING, received $1", log.body_column->col_name(),
392+
types::ToString(log.body_column->EvaluatedDataType()));
393+
}
394+
PX_ASSIGN_OR_RETURN(auto body_column_index, log.body_column->GetColumnIndex());
395+
log_pb->set_body_column_index(body_column_index);
396+
397+
for (const auto& attribute : log.attributes) {
398+
PX_RETURN_IF_ERROR(attribute.ToProto(log_pb->add_attributes()));
399+
}
400+
}
333401
return Status::OK();
334402
}
335403

src/carnot/planner/ir/otel_export_sink_ir.h

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -127,11 +127,23 @@ struct OTelSpan {
127127
int64_t span_kind;
128128
};
129129

130+
struct OTelLog {
131+
std::vector<OTelAttribute> attributes;
132+
133+
ColumnIR* time_column;
134+
ColumnIR* observed_time_column = nullptr;
135+
ColumnIR* body_column;
136+
137+
int64_t severity_number;
138+
std::string severity_text;
139+
};
140+
130141
struct OTelData {
131142
planpb::OTelEndpointConfig endpoint_config;
132143
std::vector<OTelAttribute> resource_attributes;
133144
std::vector<OTelMetric> metrics;
134145
std::vector<OTelSpan> spans;
146+
std::vector<OTelLog> logs;
135147
};
136148

137149
/**

src/carnot/planner/ir/otel_export_sink_ir_test.cc

Lines changed: 130 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -443,6 +443,84 @@ INSTANTIATE_TEST_SUITE_P(
443443
.ConsumeValueOrDie();
444444
},
445445
},
446+
{
447+
"logs_basic",
448+
table_store::schema::Relation{{types::TIME64NS, types::STRING, types::STRING},
449+
{"start_time", "attribute_str", "log_message"},
450+
{types::ST_NONE, types::ST_NONE, types::ST_NONE}},
451+
R"pb(
452+
endpoint_config {}
453+
resource {}
454+
logs {
455+
attributes {
456+
name: "service.name"
457+
column {
458+
column_type: STRING
459+
column_index: 1
460+
}
461+
}
462+
time_column_index: 0
463+
observed_time_column_index: -1
464+
severity_number: 4
465+
severity_text: "INFO"
466+
body_column_index: 2
467+
}
468+
)pb",
469+
[](IR* graph, OperatorIR* parent, table_store::schema::Relation* relation) {
470+
OTelData data;
471+
472+
auto& log = data.logs.emplace_back();
473+
log.time_column = CreateTypedColumn(graph, "start_time", relation);
474+
log.attributes.push_back(
475+
{"service.name", CreateTypedColumn(graph, "attribute_str", relation), ""});
476+
log.severity_number = 4;
477+
log.severity_text = "INFO";
478+
log.body_column = CreateTypedColumn(graph, "log_message", relation);
479+
480+
return graph->CreateNode<OTelExportSinkIR>(parent->ast(), parent, data)
481+
.ConsumeValueOrDie();
482+
},
483+
},
484+
{
485+
"logs_with_observed_time_col",
486+
table_store::schema::Relation{
487+
{types::TIME64NS, types::TIME64NS, types::STRING, types::STRING},
488+
{"start_time", "observed_time", "attribute_str", "log_message"},
489+
{types::ST_NONE, types::ST_NONE, types::ST_NONE, types::ST_NONE}},
490+
R"pb(
491+
endpoint_config {}
492+
resource {}
493+
logs {
494+
attributes {
495+
name: "service.name"
496+
column {
497+
column_type: STRING
498+
column_index: 2
499+
}
500+
}
501+
time_column_index: 0
502+
observed_time_column_index: 1
503+
severity_number: 4
504+
severity_text: "INFO"
505+
body_column_index: 3
506+
}
507+
)pb",
508+
[](IR* graph, OperatorIR* parent, table_store::schema::Relation* relation) {
509+
OTelData data;
510+
511+
auto& log = data.logs.emplace_back();
512+
log.time_column = CreateTypedColumn(graph, "start_time", relation);
513+
log.observed_time_column = CreateTypedColumn(graph, "observed_time", relation);
514+
log.attributes.push_back(
515+
{"service.name", CreateTypedColumn(graph, "attribute_str", relation), ""});
516+
log.severity_number = 4;
517+
log.severity_text = "INFO";
518+
log.body_column = CreateTypedColumn(graph, "log_message", relation);
519+
520+
return graph->CreateNode<OTelExportSinkIR>(parent->ast(), parent, data)
521+
.ConsumeValueOrDie();
522+
},
523+
},
446524
{
447525
"string_value_attributes",
448526
table_store::schema::Relation{{types::TIME64NS, types::INT64},
@@ -557,6 +635,33 @@ OTelExportSinkIR* CreateSpanWithNameString(IR* graph, OperatorIR* parent,
557635
return graph->CreateNode<OTelExportSinkIR>(parent->ast(), parent, data).ConsumeValueOrDie();
558636
}
559637

638+
OTelExportSinkIR* CreateLog(IR* graph, OperatorIR* parent,
639+
table_store::schema::Relation* relation) {
640+
OTelData data;
641+
642+
auto& log = data.logs.emplace_back();
643+
log.time_column = CreateTypedColumn(graph, "start_time", relation);
644+
log.body_column = CreateTypedColumn(graph, "log_message", relation);
645+
log.severity_number = 4;
646+
log.severity_text = "INFO";
647+
648+
return graph->CreateNode<OTelExportSinkIR>(parent->ast(), parent, data).ConsumeValueOrDie();
649+
}
650+
651+
OTelExportSinkIR* CreateLogWithObservedTime(IR* graph, OperatorIR* parent,
652+
table_store::schema::Relation* relation) {
653+
OTelData data;
654+
655+
auto& log = data.logs.emplace_back();
656+
log.time_column = CreateTypedColumn(graph, "start_time", relation);
657+
log.observed_time_column = CreateTypedColumn(graph, "observed_time", relation);
658+
log.body_column = CreateTypedColumn(graph, "log_message", relation);
659+
log.severity_number = 4;
660+
log.severity_text = "INFO";
661+
662+
return graph->CreateNode<OTelExportSinkIR>(parent->ast(), parent, data).ConsumeValueOrDie();
663+
}
664+
560665
INSTANTIATE_TEST_SUITE_P(
561666
ErrorTests, WrongColumnTypesTest,
562667
::testing::ValuesIn(std::vector<WrongColumnTypesTestCase>{
@@ -723,6 +828,31 @@ INSTANTIATE_TEST_SUITE_P(
723828
.ConsumeValueOrDie();
724829
},
725830
},
831+
{
832+
"log_time_column_wrong",
833+
table_store::schema::Relation{{types::INT64, types::STRING, types::STRING},
834+
{"start_time", "attribute_str", "log_message"},
835+
{types::ST_NONE, types::ST_NONE, types::ST_NONE}},
836+
"Expected time column 'start_time' to be TIME64NS, received INT64",
837+
&CreateLog,
838+
},
839+
{
840+
"log_body_column_wrong",
841+
table_store::schema::Relation{{types::TIME64NS, types::STRING, types::TIME64NS},
842+
{"start_time", "attribute_str", "log_message"},
843+
{types::ST_NONE, types::ST_NONE, types::ST_NONE}},
844+
"Expected body column 'log_message' to be STRING, received TIME64NS",
845+
&CreateLog,
846+
},
847+
{
848+
"log_observed_time_column_wrong",
849+
table_store::schema::Relation{
850+
{types::TIME64NS, types::INT64, types::STRING, types::STRING},
851+
{"start_time", "observed_time", "attribute_str", "log_message"},
852+
{types::ST_NONE, types::ST_NONE, types::ST_NONE, types::ST_NONE}},
853+
"Expected observed_time column 'observed_time' to be TIME64NS, received INT64",
854+
&CreateLogWithObservedTime,
855+
},
726856
}),
727857
[](const ::testing::TestParamInfo<WrongColumnTypesTestCase>& info) { return info.param.name; });
728858
} // namespace planner

0 commit comments

Comments
 (0)