Skip to content

Commit 795ae38

Browse files
committed
tmp changes
1 parent a3a2bc8 commit 795ae38

File tree

5 files changed

+16
-8
lines changed

5 files changed

+16
-8
lines changed

ydb/core/fq/libs/row_dispatcher/json_filter.cpp

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -225,7 +225,9 @@ class TJsonFilter::TImpl {
225225
const TString& whereFilter,
226226
TCallback callback)
227227
: Sql(GenerateSql(columns, types, whereFilter)) {
228-
auto factory = NYql::NPureCalc::MakeProgramFactory(NYql::NPureCalc::TProgramFactoryOptions());
228+
229+
230+
auto factory = NYql::NPureCalc::MakeProgramFactory(NYql::NPureCalc::TProgramFactoryOptions().SetUDFsDir("/home/kardymon-d/ydb3/ydb/ydb/library/yql/udfs/common"));
229231

230232
LOG_ROW_DISPATCHER_DEBUG("Creating program...");
231233
Program = factory->MakePushStreamProgram(

ydb/core/fq/libs/row_dispatcher/json_parser.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -232,7 +232,7 @@ class TJsonParser::TImpl {
232232
const TVector<TString>& columns,
233233
TCallback callback)
234234
: Sql(GenerateSql(columns)) {
235-
auto options = NYql::NPureCalc::TProgramFactoryOptions();
235+
auto options = NYql::NPureCalc::TProgramFactoryOptions().SetUDFsDir("/home/kardymon-d/ydb3/ydb/ydb/library/yql/udfs/common");
236236
auto factory = NYql::NPureCalc::MakeProgramFactory(options);
237237

238238
LOG_ROW_DISPATCHER_DEBUG("Creating program...");

ydb/core/fq/libs/row_dispatcher/topic_session.cpp

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -549,6 +549,9 @@ void TTopicSession::SendToParsing(ui64 offset, const TString& message) {
549549
}
550550
}
551551

552+
if (ClientsWithoutPredicate.size() == Clients.size())
553+
return;
554+
552555
try {
553556
Parser->Push(offset, message);
554557
} catch (const std::exception& e) {
@@ -617,7 +620,7 @@ void TTopicSession::Handle(NFq::TEvRowDispatcher::TEvStartSession::TPtr& ev) {
617620
std::forward_as_tuple(ev->Sender),
618621
std::forward_as_tuple(ev)).first->second;
619622

620-
TString predicate = clientInfo.Settings.GetSource().GetPredicate();
623+
TString predicate;// = clientInfo.Settings.GetSource().GetPredicate();
621624
if (!predicate.empty()) {
622625
clientInfo.Filter = NewJsonFilter(
623626
columns,
@@ -630,6 +633,8 @@ void TTopicSession::Handle(NFq::TEvRowDispatcher::TEvStartSession::TPtr& ev) {
630633
ClientsWithoutPredicate.insert(ev->Sender);
631634
}
632635

636+
Cerr << "New client" << Endl;
637+
633638
LOG_ROW_DISPATCHER_INFO("New client: offset " << clientInfo.NextMessageOffset << ", predicate: " << clientInfo.Settings.GetSource().GetPredicate());
634639

635640
if (ReadSession) {

ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1386,6 +1386,7 @@ class TDqComputeActorBase : public NActors::TActorBootstrapped<TDerived>
13861386
.OutputIndex = outputIndex,
13871387
.StatsLevel = collectStatsLevel,
13881388
.TxId = TxId,
1389+
.TaskId = Task.GetId(),
13891390
.Callback = static_cast<TSinkCallbacks*>(this),
13901391
.SecureParams = secureParams,
13911392
.TaskParams = taskParams,

ydb/library/yql/providers/pq/async_io/dq_pq_write_actor.cpp

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -152,7 +152,7 @@ class TDqPqWriteActor : public NActors::TActor<TDqPqWriteActor>, public IDqCompu
152152
, LogPrefix(TStringBuilder() << "SelfId: " << this->SelfId() << ", TxId: " << TxId << ", TaskId: " << taskId << ", PQ sink. ")
153153
, FreeSpace(freeSpace)
154154
, TopicClient(Driver, GetTopicClientSettings())
155-
, File("data/result/" + std::get<TString>(TxId), EOpenModeFlag::CreateAlways | EOpenModeFlag::WrOnly)
155+
, File("data/result/" + std::get<TString>(TxId) + "_" + ToString(taskId), EOpenModeFlag::CreateAlways | EOpenModeFlag::WrOnly)
156156
{
157157
EgressStats.Level = statsLevel;
158158
}
@@ -317,10 +317,10 @@ class TDqPqWriteActor : public NActors::TActor<TDqPqWriteActor>, public IDqCompu
317317
}
318318

319319
void CreateSessionIfNotExists() {
320-
if (!WriteSession) {
321-
WriteSession = TopicClient.CreateWriteSession(GetWriteSessionSettings());
322-
SubscribeOnNextEvent();
323-
}
320+
// if (!WriteSession) {
321+
// WriteSession = TopicClient.CreateWriteSession(GetWriteSessionSettings());
322+
// SubscribeOnNextEvent();
323+
// }
324324
}
325325

326326
void SubscribeOnNextEvent() {

0 commit comments

Comments
 (0)