Skip to content

Commit f94d14a

Browse files
committed
Add log support to OTelExportSinkIR
Signed-off-by: Dom Del Nano <ddelnano@gmail.com> (cherry picked from commit 9d5e616)
1 parent 301198f commit f94d14a

File tree

3 files changed

+214
-0
lines changed

3 files changed

+214
-0
lines changed

src/carnot/planner/ir/otel_export_sink_ir.cc

Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,8 @@
1818

1919
#include <utility>
2020

21+
#include <google/protobuf/descriptor.h>
22+
2123
#include "src/carnot/planner/ir/ir.h"
2224
#include "src/carnot/planner/ir/otel_export_sink_ir.h"
2325
#include "src/carnot/planpb/plan.pb.h"
@@ -160,6 +162,29 @@ Status OTelExportSinkIR::ProcessConfig(const OTelData& data) {
160162
new_span.span_kind = span.span_kind;
161163
data_.spans.push_back(std::move(new_span));
162164
}
165+
for (const auto& log : data.logs) {
166+
OTelLog new_log;
167+
168+
PX_ASSIGN_OR_RETURN(new_log.time_column, AddColumn(log.time_column));
169+
PX_ASSIGN_OR_RETURN(new_log.body_column, AddColumn(log.body_column));
170+
if (log.observed_time_column != nullptr) {
171+
PX_ASSIGN_OR_RETURN(new_log.observed_time_column, AddColumn(log.observed_time_column));
172+
}
173+
174+
new_log.severity_text = log.severity_text;
175+
new_log.severity_number = log.severity_number;
176+
177+
for (const auto& attr : log.attributes) {
178+
if (attr.column_reference == nullptr) {
179+
new_log.attributes.push_back({attr.name, nullptr, attr.string_value});
180+
continue;
181+
}
182+
PX_ASSIGN_OR_RETURN(auto column, AddColumn(attr.column_reference));
183+
new_log.attributes.push_back({attr.name, column, ""});
184+
}
185+
186+
data_.logs.push_back(std::move(new_log));
187+
}
163188
return Status::OK();
164189
}
165190

@@ -330,6 +355,50 @@ Status OTelExportSinkIR::ToProto(planpb::Operator* op) const {
330355
}
331356
span_pb->set_kind_value(span.span_kind);
332357
}
358+
for (const auto& log : data_.logs) {
359+
auto log_pb = otel_op->add_logs();
360+
361+
if (log.time_column->EvaluatedDataType() != types::TIME64NS) {
362+
return log.time_column->CreateIRNodeError(
363+
"Expected time column '$0' to be TIME64NS, received $1", log.time_column->col_name(),
364+
types::ToString(log.time_column->EvaluatedDataType()));
365+
}
366+
PX_ASSIGN_OR_RETURN(auto time_column_index,
367+
log.time_column->GetColumnIndex());
368+
log_pb->set_time_column_index(time_column_index);
369+
370+
if (log.observed_time_column != nullptr) {
371+
if (log.observed_time_column->EvaluatedDataType() != types::TIME64NS) {
372+
return log.observed_time_column->CreateIRNodeError(
373+
"Expected observed_time column '$0' to be TIME64NS, received $1", log.observed_time_column->col_name(),
374+
types::ToString(log.observed_time_column->EvaluatedDataType()));
375+
}
376+
PX_ASSIGN_OR_RETURN(auto observed_time_column_index,
377+
log.observed_time_column->GetColumnIndex());
378+
log_pb->set_observed_time_column_index(observed_time_column_index);
379+
} else {
380+
log_pb->set_observed_time_column_index(-1);
381+
}
382+
383+
log_pb->set_severity_text(log.severity_text);
384+
385+
// TODO(ddelnano): Add validation for severity_number if the planner isn't the right
386+
// place to implement the validation.
387+
log_pb->set_severity_number(log.severity_number);
388+
389+
if (log.body_column->EvaluatedDataType() != types::STRING) {
390+
return log.body_column->CreateIRNodeError(
391+
"Expected body column '$0' to be STRING, received $1", log.body_column->col_name(),
392+
types::ToString(log.body_column->EvaluatedDataType()));
393+
}
394+
PX_ASSIGN_OR_RETURN(auto body_column_index,
395+
log.body_column->GetColumnIndex());
396+
log_pb->set_body_column_index(body_column_index);
397+
398+
for (const auto& attribute : log.attributes) {
399+
PX_RETURN_IF_ERROR(attribute.ToProto(log_pb->add_attributes()));
400+
}
401+
}
333402
return Status::OK();
334403
}
335404

src/carnot/planner/ir/otel_export_sink_ir.h

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -127,11 +127,23 @@ struct OTelSpan {
127127
int64_t span_kind;
128128
};
129129

130+
struct OTelLog {
131+
std::vector<OTelAttribute> attributes;
132+
133+
ColumnIR* time_column;
134+
ColumnIR* observed_time_column = nullptr;
135+
ColumnIR* body_column;
136+
137+
int64_t severity_number;
138+
std::string severity_text;
139+
};
140+
130141
struct OTelData {
131142
planpb::OTelEndpointConfig endpoint_config;
132143
std::vector<OTelAttribute> resource_attributes;
133144
std::vector<OTelMetric> metrics;
134145
std::vector<OTelSpan> spans;
146+
std::vector<OTelLog> logs;
135147
};
136148

137149
/**

src/carnot/planner/ir/otel_export_sink_ir_test.cc

Lines changed: 133 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -443,6 +443,85 @@ INSTANTIATE_TEST_SUITE_P(
443443
.ConsumeValueOrDie();
444444
},
445445
},
446+
{
447+
"logs_basic",
448+
table_store::schema::Relation{
449+
{types::TIME64NS, types::STRING, types::STRING},
450+
{"start_time", "attribute_str", "log_message"},
451+
{types::ST_NONE, types::ST_NONE, types::ST_NONE}},
452+
R"pb(
453+
endpoint_config {}
454+
resource {}
455+
logs {
456+
attributes {
457+
name: "service.name"
458+
column {
459+
column_type: STRING
460+
column_index: 1
461+
}
462+
}
463+
time_column_index: 0
464+
observed_time_column_index: -1
465+
severity_number: 4
466+
severity_text: "INFO"
467+
body_column_index: 2
468+
}
469+
)pb",
470+
[](IR* graph, OperatorIR* parent, table_store::schema::Relation* relation) {
471+
OTelData data;
472+
473+
auto& log = data.logs.emplace_back();
474+
log.time_column = CreateTypedColumn(graph, "start_time", relation);
475+
log.attributes.push_back(
476+
{"service.name", CreateTypedColumn(graph, "attribute_str", relation), ""});
477+
log.severity_number = 4;
478+
log.severity_text = "INFO";
479+
log.body_column = CreateTypedColumn(graph, "log_message", relation);
480+
481+
return graph->CreateNode<OTelExportSinkIR>(parent->ast(), parent, data)
482+
.ConsumeValueOrDie();
483+
},
484+
},
485+
{
486+
"logs_with_observed_time_col",
487+
table_store::schema::Relation{
488+
{types::TIME64NS, types::TIME64NS, types::STRING, types::STRING},
489+
{"start_time", "observed_time", "attribute_str", "log_message"},
490+
{types::ST_NONE, types::ST_NONE, types::ST_NONE, types::ST_NONE}},
491+
R"pb(
492+
endpoint_config {}
493+
resource {}
494+
logs {
495+
attributes {
496+
name: "service.name"
497+
column {
498+
column_type: STRING
499+
column_index: 2
500+
}
501+
}
502+
time_column_index: 0
503+
observed_time_column_index: 1
504+
severity_number: 4
505+
severity_text: "INFO"
506+
body_column_index: 3
507+
}
508+
)pb",
509+
[](IR* graph, OperatorIR* parent, table_store::schema::Relation* relation) {
510+
OTelData data;
511+
512+
auto& log = data.logs.emplace_back();
513+
log.time_column = CreateTypedColumn(graph, "start_time", relation);
514+
log.observed_time_column = CreateTypedColumn(graph, "observed_time", relation);
515+
log.attributes.push_back(
516+
{"service.name", CreateTypedColumn(graph, "attribute_str", relation), ""});
517+
log.severity_number = 4;
518+
log.severity_text = "INFO";
519+
log.body_column = CreateTypedColumn(graph, "log_message", relation);
520+
521+
return graph->CreateNode<OTelExportSinkIR>(parent->ast(), parent, data)
522+
.ConsumeValueOrDie();
523+
},
524+
},
446525
{
447526
"string_value_attributes",
448527
table_store::schema::Relation{{types::TIME64NS, types::INT64},
@@ -557,6 +636,33 @@ OTelExportSinkIR* CreateSpanWithNameString(IR* graph, OperatorIR* parent,
557636
return graph->CreateNode<OTelExportSinkIR>(parent->ast(), parent, data).ConsumeValueOrDie();
558637
}
559638

639+
OTelExportSinkIR* CreateLog(IR* graph, OperatorIR* parent,
640+
table_store::schema::Relation* relation) {
641+
OTelData data;
642+
643+
auto& log = data.logs.emplace_back();
644+
log.time_column = CreateTypedColumn(graph, "start_time", relation);
645+
log.body_column = CreateTypedColumn(graph, "log_message", relation);
646+
log.severity_number = 4;
647+
log.severity_text = "INFO";
648+
649+
return graph->CreateNode<OTelExportSinkIR>(parent->ast(), parent, data).ConsumeValueOrDie();
650+
}
651+
652+
OTelExportSinkIR* CreateLogWithObservedTime(IR* graph, OperatorIR* parent,
653+
table_store::schema::Relation* relation) {
654+
OTelData data;
655+
656+
auto& log = data.logs.emplace_back();
657+
log.time_column = CreateTypedColumn(graph, "start_time", relation);
658+
log.observed_time_column = CreateTypedColumn(graph, "observed_time", relation);
659+
log.body_column = CreateTypedColumn(graph, "log_message", relation);
660+
log.severity_number = 4;
661+
log.severity_text = "INFO";
662+
663+
return graph->CreateNode<OTelExportSinkIR>(parent->ast(), parent, data).ConsumeValueOrDie();
664+
}
665+
560666
INSTANTIATE_TEST_SUITE_P(
561667
ErrorTests, WrongColumnTypesTest,
562668
::testing::ValuesIn(std::vector<WrongColumnTypesTestCase>{
@@ -723,6 +829,33 @@ INSTANTIATE_TEST_SUITE_P(
723829
.ConsumeValueOrDie();
724830
},
725831
},
832+
{
833+
"log_time_column_wrong",
834+
table_store::schema::Relation{
835+
{types::INT64, types::STRING, types::STRING},
836+
{"start_time", "attribute_str", "log_message"},
837+
{types::ST_NONE, types::ST_NONE, types::ST_NONE}},
838+
"Expected time column 'start_time' to be TIME64NS, received INT64",
839+
&CreateLog,
840+
},
841+
{
842+
"log_body_column_wrong",
843+
table_store::schema::Relation{
844+
{types::TIME64NS, types::STRING, types::TIME64NS},
845+
{"start_time", "attribute_str", "log_message"},
846+
{types::ST_NONE, types::ST_NONE, types::ST_NONE}},
847+
"Expected body column 'log_message' to be STRING, received TIME64NS",
848+
&CreateLog,
849+
},
850+
{
851+
"log_observed_time_column_wrong",
852+
table_store::schema::Relation{
853+
{types::TIME64NS, types::INT64, types::STRING, types::STRING},
854+
{"start_time", "observed_time", "attribute_str", "log_message"},
855+
{types::ST_NONE, types::ST_NONE, types::ST_NONE, types::ST_NONE}},
856+
"Expected observed_time column 'observed_time' to be TIME64NS, received INT64",
857+
&CreateLogWithObservedTime,
858+
},
726859
}),
727860
[](const ::testing::TestParamInfo<WrongColumnTypesTestCase>& info) { return info.param.name; });
728861
} // namespace planner

0 commit comments

Comments
 (0)