35
35
#include " ../jrd/tra_proto.h"
36
36
37
37
#include < atomic>
38
+ #include < mutex>
39
+ #include < vector>
40
+ #include " boost/interprocess/sync/interprocess_mutex.hpp"
38
41
39
42
#ifdef WIN_NT
40
43
#include < process.h>
@@ -100,11 +103,13 @@ namespace
100
103
event_t clientEvent;
101
104
USHORT bufferSize;
102
105
std::atomic<Tag> tag;
106
+ unsigned seq;
107
+ boost::interprocess::interprocess_mutex bufferMutex;
103
108
char userName[USERNAME_LENGTH + 1 ]; // \0 if has PROFILE_ANY_ATTACHMENT
104
109
alignas (FB_ALIGNMENT) UCHAR buffer[4096 ];
105
110
};
106
111
107
- static const USHORT VERSION = 2 ;
112
+ static const USHORT VERSION = 3 ;
108
113
109
114
public:
110
115
ProfilerIpc (thread_db* tdbb, MemoryPool& pool, AttNumber aAttachmentId, bool server = false );
@@ -179,7 +184,7 @@ class Jrd::ProfilerListener final
179
184
listener->watcherThread ();
180
185
}
181
186
182
- void processCommand (thread_db* tdbb);
187
+ void processCommand (thread_db* tdbb, ProfilerIpc::Tag tag, std::vector<UCHAR>& buffer );
183
188
184
189
private:
185
190
Attachment* const attachment;
@@ -736,6 +741,8 @@ ProfilerIpc::ProfilerIpc(thread_db* tdbb, MemoryPool& pool, AttNumber aAttachmen
736
741
{
737
742
Guard guard (this );
738
743
744
+ header->seq = 0 ;
745
+
739
746
if (sharedMemory->eventInit (&header->serverEvent ) != FB_SUCCESS)
740
747
(Arg::Gds (isc_random) << " ProfilerIpc eventInit(serverEvent) failed" ).raise ();
741
748
}
@@ -817,18 +824,17 @@ void ProfilerIpc::internalSendAndReceive(thread_db* tdbb, Tag tag,
817
824
}
818
825
});
819
826
820
- const SLONG value = sharedMemory->eventClear (&header->clientEvent );
827
+ const SLONG clientEventCounter = sharedMemory->eventClear (&header->clientEvent );
828
+
829
+ std::unique_lock bufferMutexLock (header->bufferMutex );
821
830
822
- const Tag oldTag = header->tag .exchange (tag);
823
- switch (oldTag)
831
+ switch (header->tag )
824
832
{
825
833
case Tag::NOP:
826
- header->tag = oldTag;
827
834
(Arg::Gds (isc_random) << " Remote attachment failed to start listener thread" ).raise ();
828
835
break ;
829
836
830
837
case Tag::SERVER_EXITED:
831
- header->tag = oldTag;
832
838
(Arg::Gds (isc_random) << " Cannot start remote profile session - attachment exited" ).raise ();
833
839
break ;
834
840
@@ -846,41 +852,49 @@ void ProfilerIpc::internalSendAndReceive(thread_db* tdbb, Tag tag,
846
852
fb_assert (inSize <= sizeof (header->buffer ));
847
853
memcpy (header->buffer , in, inSize);
848
854
855
+ header->tag = tag;
856
+ const auto seq = ++header->seq ;
857
+
858
+ bufferMutexLock.unlock ();
859
+
849
860
if (sharedMemory->eventPost (&header->serverEvent ) != FB_SUCCESS)
850
861
(Arg::Gds (isc_random) << " Cannot start remote profile session - attachment exited" ).raise ();
851
862
863
+ const SLONG TIMEOUT = 500 * 1000 ; // 0.5 sec
864
+ const int serverPID = header->serverEvent .event_pid ;
865
+
866
+ while (true )
852
867
{
853
- const SLONG TIMEOUT = 500 * 1000 ; // 0.5 sec
868
+ { // scope
869
+ EngineCheckout cout (tdbb, FB_FUNCTION);
854
870
855
- const int serverPID = header->serverEvent .event_pid ;
856
- while (true )
857
- {
871
+ if (sharedMemory->eventWait (&header->clientEvent , clientEventCounter, TIMEOUT) == FB_SUCCESS)
872
+ break ;
873
+
874
+ if (serverPID != getpid () && !ISC_check_process_existence (serverPID))
858
875
{
859
- EngineCheckout cout (tdbb, FB_FUNCTION);
860
- if (sharedMemory->eventWait (&header->clientEvent , value, TIMEOUT) == FB_SUCCESS)
861
- break ;
876
+ // Server process was died or exited
877
+ fb_assert ((header->tag == tag) || header->tag == Tag::SERVER_EXITED);
862
878
863
- if (serverPID != getpid () && ! ISC_check_process_existence (serverPID) )
879
+ if (header-> tag == tag )
864
880
{
865
- // Server process was died or exited
866
- fb_assert ((header->tag == tag) || header->tag == Tag::SERVER_EXITED);
867
-
868
- if (header->tag == tag)
881
+ header->tag = Tag::SERVER_EXITED;
882
+ if (header->serverEvent .event_pid )
869
883
{
870
- header->tag = Tag::SERVER_EXITED;
871
- if (header->serverEvent .event_pid )
872
- {
873
- sharedMemory->eventFini (&header->serverEvent );
874
- header->serverEvent .event_pid = 0 ;
875
- }
884
+ sharedMemory->eventFini (&header->serverEvent );
885
+ header->serverEvent .event_pid = 0 ;
876
886
}
877
- break ;
878
887
}
888
+
889
+ break ;
879
890
}
880
- JRD_reschedule (tdbb, true );
881
891
}
892
+
893
+ JRD_reschedule (tdbb, true );
882
894
}
883
895
896
+ bufferMutexLock.lock ();
897
+
884
898
switch (header->tag )
885
899
{
886
900
case Tag::SERVER_EXITED:
@@ -977,7 +991,7 @@ void ProfilerListener::watcherThread()
977
991
{
978
992
while (!exiting)
979
993
{
980
- const SLONG value = sharedMemory->eventClear (&header->serverEvent );
994
+ const SLONG serverEventCounter = sharedMemory->eventClear (&header->serverEvent );
981
995
982
996
if (startup)
983
997
{
@@ -986,18 +1000,39 @@ void ProfilerListener::watcherThread()
986
1000
}
987
1001
else
988
1002
{
1003
+ ProfilerIpc::Tag tag;
1004
+ unsigned seq;
1005
+ std::vector<UCHAR> buffer;
1006
+
989
1007
fb_assert (header->tag >= ProfilerIpc::Tag::FIRST_CLIENT_OP);
990
1008
991
1009
try
992
1010
{
993
1011
FbLocalStatus statusVector;
994
1012
EngineContextHolder tdbb (&statusVector, attachment->getInterface (), FB_FUNCTION);
995
1013
996
- processCommand (tdbb);
997
- header->tag = ProfilerIpc::Tag::RESPONSE;
1014
+ { // scope
1015
+ std::unique_lock bufferMutexLock (header->bufferMutex );
1016
+
1017
+ if (header->userName [0 ] && attachment->getUserName () != header->userName )
1018
+ status_exception::raise (Arg::Gds (isc_miss_prvlg) << " PROFILE_ANY_ATTACHMENT" );
1019
+
1020
+ fb_assert (header->tag >= ProfilerIpc::Tag::FIRST_CLIENT_OP);
1021
+
1022
+ tag = header->tag ;
1023
+ seq = header->seq ;
1024
+ buffer.resize (header->bufferSize );
1025
+ memcpy (buffer.data (), header->buffer , header->bufferSize );
1026
+ }
1027
+
1028
+ processCommand (tdbb, tag, buffer);
1029
+
1030
+ tag = ProfilerIpc::Tag::RESPONSE;
998
1031
}
999
1032
catch (const status_exception& e)
1000
1033
{
1034
+ tag = ProfilerIpc::Tag::EXCEPTION;
1035
+
1001
1036
// // TODO: Serialize status vector instead of formated message.
1002
1037
1003
1038
const ISC_STATUS* status = e.value ();
@@ -1012,32 +1047,51 @@ void ProfilerListener::watcherThread()
1012
1047
errorMsg += temp;
1013
1048
}
1014
1049
1015
- header->bufferSize = MIN (errorMsg.length (), sizeof (header->buffer ) - 1 );
1016
- strncpy ((char *) header->buffer , errorMsg.c_str (), sizeof (header->buffer ));
1017
- header->buffer [header->bufferSize ] = ' \0 ' ;
1018
-
1019
- header->tag = ProfilerIpc::Tag::EXCEPTION;
1050
+ header->bufferSize = MIN (errorMsg.length (), sizeof (header->buffer ));
1051
+ memcpy (header->buffer , errorMsg.c_str (), header->bufferSize );
1020
1052
}
1021
1053
1022
- sharedMemory->eventPost (&header->clientEvent );
1054
+ fb_assert (buffer.size () <= sizeof (header->buffer ));
1055
+
1056
+ { // scope
1057
+ std::unique_lock bufferMutexLock (header->bufferMutex , std::try_to_lock);
1058
+
1059
+ // Otherwise a client lost interest in the response.
1060
+ if (bufferMutexLock.owns_lock () && header->seq == seq)
1061
+ {
1062
+ if (header->seq == seq)
1063
+ {
1064
+ header->tag = tag;
1065
+ header->bufferSize = buffer.size ();
1066
+ memcpy (header->buffer , buffer.data (), buffer.size ());
1067
+
1068
+ sharedMemory->eventPost (&header->clientEvent );
1069
+ }
1070
+ }
1071
+ }
1023
1072
}
1024
1073
1025
1074
if (exiting)
1026
1075
break ;
1027
1076
1028
- sharedMemory->eventWait (&header->serverEvent , value , 0 );
1077
+ sharedMemory->eventWait (&header->serverEvent , serverEventCounter , 0 );
1029
1078
}
1030
1079
}
1031
1080
catch (const Exception& ex)
1032
1081
{
1033
1082
iscLogException (" Error in profiler watcher thread\n " , ex);
1034
1083
}
1035
1084
1036
- const ProfilerIpc::Tag oldTag = header->tag .exchange (ProfilerIpc::Tag::SERVER_EXITED);
1037
- if (oldTag >= ProfilerIpc::Tag::FIRST_CLIENT_OP)
1038
- {
1039
- fb_assert (header->clientEvent .event_pid );
1040
- sharedMemory->eventPost (&header->clientEvent );
1085
+ { // scope
1086
+ std::unique_lock bufferMutexLock (header->bufferMutex );
1087
+
1088
+ if (header->tag >= ProfilerIpc::Tag::FIRST_CLIENT_OP)
1089
+ {
1090
+ fb_assert (header->clientEvent .event_pid );
1091
+ sharedMemory->eventPost (&header->clientEvent );
1092
+ }
1093
+
1094
+ header->tag = ProfilerIpc::Tag::SERVER_EXITED;
1041
1095
}
1042
1096
1043
1097
try
@@ -1051,70 +1105,75 @@ void ProfilerListener::watcherThread()
1051
1105
}
1052
1106
}
1053
1107
1054
- void ProfilerListener::processCommand (thread_db* tdbb)
1108
+ void ProfilerListener::processCommand (thread_db* tdbb, ProfilerIpc::Tag tag, std::vector<UCHAR>& buffer )
1055
1109
{
1056
- const auto header = ipc->sharedMemory ->getHeader ();
1057
1110
const auto profilerManager = attachment->getProfilerManager (tdbb);
1058
1111
1059
- if (header->userName [0 ] && attachment->getUserName () != header->userName )
1060
- status_exception::raise (Arg::Gds (isc_miss_prvlg) << " PROFILE_ANY_ATTACHMENT" );
1061
-
1062
1112
using Tag = ProfilerIpc::Tag;
1063
1113
1064
- switch (header-> tag )
1114
+ switch (tag)
1065
1115
{
1066
1116
case Tag::CANCEL_SESSION:
1117
+ fb_assert (buffer.empty ());
1067
1118
profilerManager->cancelSession ();
1068
- header-> bufferSize = 0 ;
1119
+ buffer. resize ( 0 ) ;
1069
1120
break ;
1070
1121
1071
1122
case Tag::DISCARD:
1123
+ fb_assert (buffer.empty ());
1072
1124
profilerManager->discard ();
1073
- header-> bufferSize = 0 ;
1125
+ buffer. resize ( 0 ) ;
1074
1126
break ;
1075
1127
1076
1128
case Tag::FINISH_SESSION:
1077
1129
{
1078
- const auto in = reinterpret_cast <const ProfilerPackage::FinishSessionInput::Type*>(header->buffer );
1079
- fb_assert (sizeof (*in) == header->bufferSize );
1130
+ const auto in = reinterpret_cast <const ProfilerPackage::FinishSessionInput::Type*>(buffer.data ());
1131
+ fb_assert (sizeof (*in) == buffer.size ());
1132
+
1080
1133
profilerManager->finishSession (tdbb, in->flush );
1081
- header->bufferSize = 0 ;
1134
+
1135
+ buffer.resize (0 );
1082
1136
break ;
1083
1137
}
1084
1138
1085
1139
case Tag::FLUSH:
1140
+ fb_assert (buffer.empty ());
1086
1141
profilerManager->flush ();
1087
- header-> bufferSize = 0 ;
1142
+ buffer. resize ( 0 ) ;
1088
1143
break ;
1089
1144
1090
1145
case Tag::PAUSE_SESSION:
1091
1146
{
1092
- const auto in = reinterpret_cast <const ProfilerPackage::PauseSessionInput::Type*>(header->buffer );
1093
- fb_assert (sizeof (*in) == header->bufferSize );
1147
+ const auto in = reinterpret_cast <const ProfilerPackage::PauseSessionInput::Type*>(buffer.data ());
1148
+ fb_assert (sizeof (*in) == buffer.size ());
1149
+
1094
1150
profilerManager->pauseSession (in->flush );
1095
- header->bufferSize = 0 ;
1151
+
1152
+ buffer.resize (0 );
1096
1153
break ;
1097
1154
}
1098
1155
1099
1156
case Tag::RESUME_SESSION:
1157
+ fb_assert (buffer.empty ());
1100
1158
profilerManager->resumeSession ();
1101
- header-> bufferSize = 0 ;
1159
+ buffer. resize ( 0 ) ;
1102
1160
break ;
1103
1161
1104
1162
case Tag::SET_FLUSH_INTERVAL:
1105
1163
{
1106
- const auto in = reinterpret_cast <const ProfilerPackage::SetFlushIntervalInput::Type*>(header-> buffer );
1107
- fb_assert (sizeof (*in) == header-> bufferSize );
1164
+ const auto in = reinterpret_cast <const ProfilerPackage::SetFlushIntervalInput::Type*>(buffer. data () );
1165
+ fb_assert (sizeof (*in) == buffer. size () );
1108
1166
1109
1167
profilerManager->setFlushInterval (in->flushInterval );
1110
- header->bufferSize = 0 ;
1168
+
1169
+ buffer.resize (0 );
1111
1170
break ;
1112
1171
}
1113
1172
1114
1173
case Tag::START_SESSION:
1115
1174
{
1116
- const auto in = reinterpret_cast <const ProfilerPackage::StartSessionInput::Type*>(header-> buffer );
1117
- fb_assert (sizeof (*in) == header-> bufferSize );
1175
+ const auto in = reinterpret_cast <const ProfilerPackage::StartSessionInput::Type*>(buffer. data () );
1176
+ fb_assert (sizeof (*in) == buffer. size () );
1118
1177
1119
1178
const string description (in->description .str ,
1120
1179
in->descriptionNull ? 0 : in->description .length );
@@ -1125,14 +1184,12 @@ void ProfilerListener::processCommand(thread_db* tdbb)
1125
1184
const string pluginOptions (in->pluginOptions .str ,
1126
1185
in->pluginOptionsNull ? 0 : in->pluginOptions .length );
1127
1186
1128
- const auto out = reinterpret_cast <ProfilerPackage::StartSessionOutput::Type*>(header->buffer );
1129
- static_assert (sizeof (*out) <= sizeof (header->buffer ), " Buffer size too small" );
1130
- header->bufferSize = sizeof (*out);
1187
+ const auto out = reinterpret_cast <ProfilerPackage::StartSessionOutput::Type*>(buffer.data ());
1188
+ buffer.resize (sizeof (*out));
1131
1189
1132
1190
out->sessionIdNull = FB_FALSE;
1133
1191
out->sessionId = profilerManager->startSession (tdbb, flushInterval,
1134
1192
pluginName, description, pluginOptions);
1135
-
1136
1193
break ;
1137
1194
}
1138
1195
0 commit comments