Skip to content

Commit 6fe1401

Browse files
pansggCarlosLeeGit
authored andcommitted
data_source_pase add parm session config
1 parent 45a6ac0 commit 6fe1401

File tree

17 files changed

+42
-12
lines changed

17 files changed

+42
-12
lines changed

src/drivers/common/flowunit/source_context/source_context.cc

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -31,8 +31,8 @@ std::shared_ptr<std::string> SourceContext::GetSourceURL() {
3131
std::string uri_str;
3232
DestroyUriFunc destroy_uri_func;
3333

34-
auto ret = plugin_->Parse(session_context_, data_source_cfg_, uri_str,
35-
destroy_uri_func);
34+
auto ret = plugin_->Parse(session_context_, session_config_, data_source_cfg_,
35+
uri_str, destroy_uri_func);
3636
if (!ret) {
3737
MBLOG_ERROR << "Parse config failed, source uri is empty";
3838
return nullptr;
@@ -100,6 +100,11 @@ void SourceContext::SetSessionContext(
100100
session_context_ = session_context;
101101
}
102102

103+
void SourceContext::SetSessionConfig(
104+
const std::shared_ptr<modelbox::Configuration> &session_config) {
105+
session_config_ = session_config;
106+
}
107+
103108
int32_t SourceContext::GetRetryInterval() {
104109
return retry_context_.GetRetryInterval();
105110
}

src/drivers/common/flowunit/source_context/source_context.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,8 @@ class SourceContext {
7171
void SetDataSourceCfg(std::string data_source_cfg);
7272
void SetSessionContext(
7373
const std::shared_ptr<modelbox::SessionContext>& session_context);
74+
void SetSessionConfig(
75+
const std::shared_ptr<modelbox::Configuration>& session_config);
7476

7577
RetryStatus NeedRetry();
7678
int32_t GetRetryInterval();
@@ -79,6 +81,7 @@ class SourceContext {
7981

8082
private:
8183
std::shared_ptr<modelbox::SessionContext> session_context_;
84+
std::shared_ptr<modelbox::Configuration> session_config_;
8285
std::string data_source_cfg_;
8386
std::shared_ptr<DataSourceParserPlugin> plugin_;
8487
modelbox::Status last_status_{modelbox::STATUS_SUCCESS};

src/drivers/devices/cpu/flowunit/data_source_parser/data_source_parser_flowunit.cc

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -91,8 +91,9 @@ modelbox::Status DataSourceParserFlowUnit::Process(
9191
<< session_ctx->GetSessionId();
9292
std::string data_source_cfg(inbuff_data, buffer->GetBytes());
9393
std::shared_ptr<std::string> uri;
94+
auto session_config = data_ctx->GetSessionConfig();
9495
std::shared_ptr<modelbox::SourceContext> source_context =
95-
Parse(session_ctx, source_type, data_source_cfg, uri);
96+
Parse(session_ctx, session_config, source_type, data_source_cfg, uri);
9697
if (source_context) {
9798
source_context->SetDataSourceCfg(data_source_cfg);
9899
} else {
@@ -111,6 +112,7 @@ modelbox::Status DataSourceParserFlowUnit::Process(
111112

112113
std::shared_ptr<modelbox::SourceContext> DataSourceParserFlowUnit::Parse(
113114
const std::shared_ptr<modelbox::SessionContext> &session_context,
115+
const std::shared_ptr<modelbox::Configuration> &session_config,
114116
const std::string &source_type, const std::string &data_source_cfg,
115117
std::shared_ptr<std::string> &uri) {
116118
auto plugin = GetPlugin(source_type);
@@ -123,9 +125,8 @@ std::shared_ptr<modelbox::SourceContext> DataSourceParserFlowUnit::Parse(
123125
std::string uri_str;
124126
modelbox::DestroyUriFunc destroy_uri_func;
125127
std::string stream_type;
126-
127-
auto ret = plugin->Parse(session_context, data_source_cfg, uri_str,
128-
destroy_uri_func);
128+
auto ret = plugin->Parse(session_context, session_config, data_source_cfg,
129+
uri_str, destroy_uri_func);
129130
if (!ret) {
130131
MBLOG_ERROR << "Parse config failed, source uri is empty";
131132
}
@@ -138,6 +139,7 @@ std::shared_ptr<modelbox::SourceContext> DataSourceParserFlowUnit::Parse(
138139
plugin->GetStreamType(data_source_cfg, stream_type);
139140
source_context->SetStreamType(stream_type);
140141
source_context->SetSessionContext(session_context);
142+
source_context->SetSessionConfig(session_config);
141143

142144
uri = std::shared_ptr<std::string>(new std::string(uri_str),
143145
[destroy_uri_func](std::string *ptr) {

src/drivers/devices/cpu/flowunit/data_source_parser/data_source_parser_flowunit.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,7 @@ class DataSourceParserFlowUnit : public modelbox::FlowUnit {
7070
private:
7171
std::shared_ptr<modelbox::SourceContext> Parse(
7272
const std::shared_ptr<modelbox::SessionContext> &session_context,
73+
const std::shared_ptr<modelbox::Configuration> &session_config,
7374
const std::string &source_type, const std::string &data_source_cfg,
7475
std::shared_ptr<std::string> &uri);
7576

src/drivers/devices/cpu/flowunit/data_source_parser/parser_plugin/obs_source_parser/obs_source_parser.cc

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,11 @@ modelbox::Status ObsSourceParser::Init(
5555
retry_interval_ = opts->GetInt32("obs_retry_interval_ms", retry_interval_);
5656
retry_max_times_ = opts->GetInt32("obs_retry_count_limit", retry_max_times_);
5757
read_type_ = opts->GetString("obs_download_method", "file");
58+
59+
MBLOG_INFO << "obs source parser config retry_enabled:" << retry_enabled_
60+
<< " retry_interval:" << retry_interval_
61+
<< " retry_max_times:" << retry_max_times_;
62+
5863
if (read_type_ != "stream") {
5964
return modelbox::STATUS_OK;
6065
}
@@ -67,17 +72,14 @@ modelbox::Status ObsSourceParser::Init(
6772
max_read_size_ = OBS_STREAM_READ_SIZE_HIGH;
6873
}
6974

70-
MBLOG_INFO << "obs source parser config retry_enabled:" << retry_enabled_
71-
<< " retry_interval:" << retry_interval_
72-
<< " retry_max_times:" << retry_max_times_;
73-
7475
return modelbox::STATUS_OK;
7576
}
7677

7778
modelbox::Status ObsSourceParser::Deinit() { return modelbox::STATUS_OK; }
7879

7980
modelbox::Status ObsSourceParser::Parse(
8081
const std::shared_ptr<modelbox::SessionContext> &session_context,
82+
const std::shared_ptr<modelbox::Configuration> &session_config,
8183
const std::string &config, std::string &uri,
8284
modelbox::DestroyUriFunc &destroy_uri_func) {
8385
OBSDownloadInfo download_info;

src/drivers/devices/cpu/flowunit/data_source_parser/parser_plugin/obs_source_parser/obs_source_parser.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,7 @@ class ObsSourceParser : public modelbox::DataSourceParserPlugin {
5555

5656
modelbox::Status Parse(
5757
const std::shared_ptr<modelbox::SessionContext> &session_context,
58+
const std::shared_ptr<modelbox::Configuration> &session_config,
5859
const std::string &config, std::string &uri,
5960
modelbox::DestroyUriFunc &destroy_uri_func) override;
6061
modelbox::Status GetStreamType(const std::string &config,

src/drivers/devices/cpu/flowunit/data_source_parser/parser_plugin/restful_source_parser/restful_source_parser.cc

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,13 +35,19 @@ modelbox::Status RestfulSourceParser::Init(
3535
opts->GetInt32("restful_retry_interval_ms", retry_interval_);
3636
retry_max_times_ =
3737
opts->GetInt32("restful_retry_count_limit", retry_max_times_);
38+
39+
MBLOG_INFO << "restful source parser config retry_enabled:" << retry_enabled_
40+
<< " retry_interval:" << retry_interval_
41+
<< " retry_max_times:" << retry_max_times_;
42+
3843
return modelbox::STATUS_OK;
3944
}
4045

4146
modelbox::Status RestfulSourceParser::Deinit() { return modelbox::STATUS_OK; }
4247

4348
modelbox::Status RestfulSourceParser::Parse(
4449
const std::shared_ptr<modelbox::SessionContext> &session_context,
50+
const std::shared_ptr<modelbox::Configuration> &session_config,
4551
const std::string &config, std::string &uri,
4652
modelbox::DestroyUriFunc &destroy_uri_func) {
4753
RestfulInputInfo input_info;

src/drivers/devices/cpu/flowunit/data_source_parser/parser_plugin/restful_source_parser/restful_source_parser.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@ class RestfulSourceParser : public modelbox::DataSourceParserPlugin {
4848

4949
modelbox::Status Parse(
5050
const std::shared_ptr<modelbox::SessionContext> &session_context,
51+
const std::shared_ptr<modelbox::Configuration> &session_config,
5152
const std::string &config, std::string &uri,
5253
modelbox::DestroyUriFunc &destroy_uri_func) override;
5354

src/drivers/devices/cpu/flowunit/data_source_parser/parser_plugin/url_source_parser/url_source_parser.cc

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -88,6 +88,7 @@ modelbox::Status UrlSourceParser::GetStreamType(const std::string &config,
8888

8989
modelbox::Status UrlSourceParser::Parse(
9090
const std::shared_ptr<modelbox::SessionContext> &session_context,
91+
const std::shared_ptr<modelbox::Configuration> &session_config,
9192
const std::string &config, std::string &uri,
9293
modelbox::DestroyUriFunc &destroy_uri_func) {
9394
nlohmann::json json;

src/drivers/devices/cpu/flowunit/data_source_parser/parser_plugin/url_source_parser/url_source_parser.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@ class UrlSourceParser : public modelbox::DataSourceParserPlugin {
3737

3838
modelbox::Status Parse(
3939
const std::shared_ptr<modelbox::SessionContext> &session_context,
40+
const std::shared_ptr<modelbox::Configuration> &session_config,
4041
const std::string &config, std::string &uri,
4142
modelbox::DestroyUriFunc &destroy_uri_func) override;
4243
modelbox::Status GetStreamType(const std::string &config,

0 commit comments

Comments
 (0)