Skip to content

Commit bafe30e

Browse files
committed
Improvements/tests for SEM_LIVE locking
1 parent 3aeebd1 commit bafe30e

File tree

5 files changed

+53
-28
lines changed

5 files changed

+53
-28
lines changed

lib/shared_memory.cpp

Lines changed: 24 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@ namespace IPC {
4646
#else
4747
mySem = SEM_FAILED;
4848
#endif
49-
isLocked = false;
49+
isLocked = 0;
5050
}
5151

5252
///\brief Constructs a named semaphore
@@ -60,7 +60,7 @@ namespace IPC {
6060
#else
6161
mySem = SEM_FAILED;
6262
#endif
63-
isLocked = false;
63+
isLocked = 0;
6464
open(name, oflag, mode, value, noWait);
6565
}
6666

@@ -167,13 +167,18 @@ namespace IPC {
167167
#endif
168168
return;
169169
}
170-
if (*this) {
171170
#if defined(__CYGWIN__) || defined(_WIN32)
172-
ReleaseMutex(mySem);
171+
ReleaseMutex(mySem);
173172
#else
174-
sem_post(mySem);
173+
sem_post(mySem);
175174
#endif
176-
isLocked = false;
175+
--isLocked;
176+
if (!isLocked){
177+
uint64_t micros = Util::getMicros(lockTime);
178+
if (micros > 500){
179+
INFO_MSG("Semaphore %s was locked for %.3f ms", myName.c_str(), (double)micros/1000.0);
180+
BACKTRACE;
181+
}
177182
}
178183
}
179184

@@ -188,7 +193,8 @@ namespace IPC {
188193
tmp = sem_wait(mySem);
189194
} while (tmp == -1 && errno == EINTR);
190195
#endif
191-
isLocked = true;
196+
lockTime = Util::getMicros();
197+
++isLocked;
192198
}
193199
}
194200

@@ -207,7 +213,9 @@ namespace IPC {
207213
result = sem_trywait(mySem);
208214
} while (result == -1 && errno == EINTR);
209215
#endif
210-
return isLocked = (result == 0);
216+
isLocked += (result == 0?1:0);
217+
if (isLocked == 1){lockTime = Util::getMicros();}
218+
return isLocked;
211219
}
212220

213221
///\brief Tries to wait for the semaphore for a single second, returns true if successful, false otherwise
@@ -224,28 +232,27 @@ namespace IPC {
224232
/// \todo (roxlu) test tryWaitOneSecond, shared_memory.cpp
225233
uint64_t now = Util::getMicros();
226234
uint64_t timeout = now + 1e6;
227-
while (now < timeout) {
228-
if (0 == sem_trywait(mySem)) {
229-
isLocked = true;
230-
return true;
231-
}
235+
result = 1;
236+
while (result && now < timeout) {
237+
result = sem_trywait(mySem);
232238
usleep(100e3);
233239
now = Util::getMicros();
234240
}
235-
return false;
236241
#else
237242
struct timespec wt;
238243
wt.tv_sec = 1;
239244
wt.tv_nsec = 0;
240245
result = sem_timedwait(mySem, &wt);
241246
#endif
242-
return isLocked = (result == 0);
247+
isLocked += (result == 0?1:0);
248+
if (isLocked == 1){lockTime = Util::getMicros();}
249+
return isLocked;
243250
}
244251

245252
///\brief Closes the currently opened semaphore
246253
void semaphore::close() {
247254
if (*this) {
248-
if (isLocked){post();}
255+
while (isLocked){post();}
249256
#if defined(__CYGWIN__) || defined(_WIN32)
250257
CloseHandle(mySem);
251258
mySem = 0;
@@ -275,7 +282,7 @@ namespace IPC {
275282
/// Unlinks the previously opened semaphore, closing it (if open) in the process.
276283
void semaphore::unlink() {
277284
#if defined(__CYGWIN__) || defined(_WIN32)
278-
if (isLocked){post();}
285+
while (isLocked){post();}
279286
#endif
280287
#if !defined(__CYGWIN__) && !defined(_WIN32)
281288
if (myName.size()){sem_unlink(myName.c_str());}

lib/shared_memory.h

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -85,7 +85,8 @@ namespace IPC {
8585
#else
8686
sem_t * mySem;
8787
#endif
88-
bool isLocked;
88+
unsigned int isLocked;
89+
uint64_t lockTime;
8990
std::string myName;
9091
};
9192

src/controller/controller_statistics.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -931,7 +931,7 @@ void Controller::fillActive(JSON::Value & req, JSON::Value & rep, bool onlyNow){
931931
if (streamIndex.mapped){
932932
static char liveSemName[NAME_BUFFER_SIZE];
933933
snprintf(liveSemName, NAME_BUFFER_SIZE, SEM_LIVE, it->c_str());
934-
IPC::semaphore metaLocker(liveSemName, O_CREAT | O_RDWR, (S_IRWXU|S_IRWXG|S_IRWXO), 1);
934+
IPC::semaphore metaLocker(liveSemName, O_CREAT | O_RDWR, (S_IRWXU|S_IRWXG|S_IRWXO), 8);
935935
metaLocker.wait();
936936
DTSC::Scan strm = DTSC::Packet(streamIndex.mapped, streamIndex.len, true).getScan();
937937
uint64_t lms = 0;

src/input/input_buffer.cpp

Lines changed: 24 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -195,9 +195,16 @@ namespace Mist {
195195
if (!liveMeta){
196196
static char liveSemName[NAME_BUFFER_SIZE];
197197
snprintf(liveSemName, NAME_BUFFER_SIZE, SEM_LIVE, streamName.c_str());
198-
liveMeta = new IPC::semaphore(liveSemName, O_CREAT | O_RDWR, ACCESSPERMS, 1);
198+
liveMeta = new IPC::semaphore(liveSemName, O_CREAT | O_RDWR, ACCESSPERMS, 8);
199199
}
200200
liveMeta->wait();
201+
liveMeta->wait();
202+
liveMeta->wait();
203+
liveMeta->wait();
204+
liveMeta->wait();
205+
liveMeta->wait();
206+
liveMeta->wait();
207+
liveMeta->wait();
201208

202209
if (!nProxy.metaPages.count(0) || !nProxy.metaPages[0].mapped) {
203210
char pageName[NAME_BUFFER_SIZE];
@@ -208,6 +215,13 @@ namespace Mist {
208215
myMeta.writeTo(nProxy.metaPages[0].mapped);
209216
memset(nProxy.metaPages[0].mapped + myMeta.getSendLen(), 0, (nProxy.metaPages[0].len > myMeta.getSendLen() ? std::min((size_t)(nProxy.metaPages[0].len - myMeta.getSendLen()), (size_t)4) : 0));
210217
liveMeta->post();
218+
liveMeta->post();
219+
liveMeta->post();
220+
liveMeta->post();
221+
liveMeta->post();
222+
liveMeta->post();
223+
liveMeta->post();
224+
liveMeta->post();
211225
}
212226

213227
///Checks if removing a key from this track is allowed/safe, and if so, removes it.
@@ -515,12 +529,12 @@ namespace Mist {
515529
updateTrackMeta(finalMap);
516530
hasPush = true;
517531
}
532+
//Update the metadata to reflect all changes
533+
updateMeta();
518534
//Write the final mapped track number and keyframe number to the user page element
519535
//This is used to resume pushing as well as pushing new tracks
520536
userConn.setTrackId(index, finalMap);
521537
userConn.setKeynum(index, myMeta.tracks[finalMap].keys.size());
522-
//Update the metadata to reflect all changes
523-
updateMeta();
524538
continue;
525539
}
526540
//Set the temporary track id for this item, and increase the temporary value for use with the next track
@@ -643,12 +657,16 @@ namespace Mist {
643657
myMeta.tracks[finalMap].lastms = 0;
644658
myMeta.tracks[finalMap].trackID = finalMap;
645659
}
660+
//Update the metadata to reflect all changes
661+
updateMeta();
646662
//Write the final mapped track number and keyframe number to the user page element
647663
//This is used to resume pushing as well as pushing new tracks
648664
userConn.setTrackId(index, finalMap);
649-
userConn.setKeynum(index, myMeta.tracks[finalMap].keys.size());
650-
//Update the metadata to reflect all changes
651-
updateMeta();
665+
if (myMeta.tracks[finalMap].keys.size()){
666+
userConn.setKeynum(index, myMeta.tracks[finalMap].keys.rbegin()->getNumber());
667+
}else{
668+
userConn.setKeynum(index, 0);
669+
}
652670
}
653671
//If the track is active, and this is the element responsible for pushing it
654672
if (activeTracks.count(value) && pushLocation[value] == data) {
@@ -694,7 +712,6 @@ namespace Mist {
694712
for (std::map<unsigned long, DTSCPageData>::iterator pageIt = locations.begin(); pageIt != locations.end(); pageIt++) {
695713
updateMetaFromPage(tNum, pageIt->first);
696714
}
697-
updateMeta();
698715
}
699716

700717
void inputBuffer::updateMetaFromPage(unsigned long tNum, unsigned long pageNum) {

src/output/output.cpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -109,7 +109,7 @@ namespace Mist{
109109
if (!myMeta.vod){
110110
static char liveSemName[NAME_BUFFER_SIZE];
111111
snprintf(liveSemName, NAME_BUFFER_SIZE, SEM_LIVE, streamName.c_str());
112-
liveSem = new IPC::semaphore(liveSemName, O_RDWR, ACCESSPERMS, 1, !myMeta.live);
112+
liveSem = new IPC::semaphore(liveSemName, O_RDWR, ACCESSPERMS, 8, !myMeta.live);
113113
if (*liveSem){
114114
liveSem->wait();
115115
}else{
@@ -1411,7 +1411,7 @@ namespace Mist{
14111411
IPC::semaphore * liveSem = 0;
14121412
static char liveSemName[NAME_BUFFER_SIZE];
14131413
snprintf(liveSemName, NAME_BUFFER_SIZE, SEM_LIVE, streamName.c_str());
1414-
liveSem = new IPC::semaphore(liveSemName, O_RDWR, ACCESSPERMS, 1, !myMeta.live);
1414+
liveSem = new IPC::semaphore(liveSemName, O_RDWR, ACCESSPERMS, 8, !myMeta.live);
14151415
if (*liveSem){
14161416
liveSem->wait();
14171417
}else{

0 commit comments

Comments
 (0)