Skip to content
Open
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
267 changes: 122 additions & 145 deletions dali/sasha/saxref.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -609,26 +609,103 @@ struct cDirDesc
return numParts!=grp.ordinality() || partNum>=grp.ordinality() || !grp.queryNode(partNum).endpoint().equals(ep);
}

cFileDesc *addFile(unsigned drv,const char *name,const char *filePath,unsigned filePathOffset,unsigned node,const SocketEndpoint &ep,IGroup &grp,unsigned numnodes,unsigned stripeNum,unsigned numStripedDevices,XRefAllocator *allocator)
// Locates (or creates) the cFileDesc that a scanned file part belongs to, and reports which
// critical section guards the map that holds it.
//
//   file        - [out] set to the descriptor found or created; the owning map keeps ownership
//   parent      - directory above this one, or nullptr when there is none (top-level scan calls)
//   filename    - key used in the files maps (the decoded name passed by addFile)
//   filenameLen - forwarded to cFileDesc::create
//   partNum     - zero-based part number of the part being added
//   numParts    - number of parts, or NotFound
//
// Returns the filesCrit of whichever directory (this one or parent) holds 'file'.
// NB: the section is returned *unlocked*; the caller enters it before updating 'file'.
//
// In containerized systems it is common for a file to have a directory for each part (dir-per-part).
// A part is treated as dir-per-part when [1] this directory's name is numeric and [2] that number
// matches the (1-based) part number of the file found. Such parts are collected in a single
// descriptor held by the parent directory instead of this directory.
//
// NOTE(review): 'allocator' is not a parameter of this method; presumably it resolves to something
// in the enclosing scope - confirm.
CriticalSection &getFile(cFileDesc *&file, cDirDesc *parent, const char *filename, unsigned filenameLen, unsigned partNum, unsigned numParts)
{
    // Dir-per-part handling needs a parent directory to hold the merged descriptor. Without one
    // (parent==nullptr) fall through to the plain find-or-create path rather than dereferencing it.
    const bool checkDirPerPart = isContainerized()&&(numParts!=NotFound)&&(parent!=nullptr);
    if (checkDirPerPart)
    {
        // 'name' is read as a length-prefixed string: name[0] is the length, text starts at name+1
        auto namePtr = (char const *)(name+1);
        unsigned dirPerPartNum = readDigits(namePtr, (size32_t)(name[0]), false);
        if ((dirPerPartNum>0)&&(partNum==(dirPerPartNum-1)))
        {
            {
                CriticalBlock block(filesCrit);
                auto it = files.find(filename);
                if ((it != files.end()) && !it->second->isDirPerPart)
                {
                    // Already known here as a regular (not dir-per-part) file, add part to it
                    file = it->second.get();
                    return filesCrit;
                }
                // Otherwise this part is likely dir-per-part, so don't add it to a file in this directory
            }

            CriticalBlock parentBlock(parent->filesCrit);
            auto it = parent->files.find(filename);
            if (it != parent->files.end())
            {
                // Assume already added as dir-per-part file, add part to found file
                file = it->second.get();
            }
            else
            {
                // First dir-per-part part seen for this file: create the descriptor in the parent
                file = cFileDesc::create(filename, numParts, true, filenameLen, allocator);
                parent->files.emplace(filename, file);
            }
            return parent->filesCrit;
        }
    }

    // Regular file part: it belongs in this directory.
    // Always look before creating - emplace() does not replace an existing entry, so creating a
    // second descriptor for a name that is already present would orphan the new descriptor and
    // the parts would never be accumulated in one place.
    CriticalBlock block(filesCrit);
    auto it = files.find(filename);
    if (it != files.end())
    {
        file = it->second.get();
        return filesCrit;
    }

    cFileDesc *reclaimed = nullptr;
    if (checkDirPerPart)
    {
        // An earlier part may have been mistaken for dir-per-part and placed in the parent.
        // Now that a non dir-per-part part has turned up, move that descriptor back here.
        // Lock order is always this directory, then its parent, so the nesting cannot cycle
        // within this method. NOTE(review): confirm no other code takes parent then child.
        CriticalBlock parentBlock(parent->filesCrit);
        auto parentIt = parent->files.find(filename);
        if ((parentIt != parent->files.end()) && parentIt->second->isDirPerPart)
        {
            parentIt->second->isDirPerPart = false;
            reclaimed = parentIt->second.release();
            parent->files.erase(parentIt);
        }
    }

    // Nothing to reclaim: create a fresh descriptor in this directory
    file = reclaimed ? reclaimed : cFileDesc::create(filename, numParts, false, filenameLen, allocator);
    files.emplace(filename, file);
    return filesCrit;
}

cFileDesc *addFile(unsigned drv,const char *fullname,const char *filePath,unsigned filePathOffset,unsigned node,const SocketEndpoint &ep,IGroup &grp,unsigned numnodes,unsigned stripeNum,unsigned numStripedDevices, cDirDesc *parent,XRefAllocator *allocator)
{
unsigned nf; // num parts
unsigned pf; // part num
unsigned filenameLen; // length of file name excluding extension i.e. ._$P$_of_$N$
StringAttr mask;
const char *fn = decodeName(drv,name,node,numnodes,mask,pf,nf,filenameLen);
const char *fn = decodeName(drv,fullname,node,numnodes,mask,pf,nf,filenameLen);
bool misplaced = isMisplaced(pf,nf,ep,grp,filePath,filePathOffset,stripeNum,numStripedDevices);

CriticalBlock block(filesCrit);
auto it = files.find(fn);
cFileDesc *file = nullptr;

if (it != files.end()) {
file = it->second.get();
} else {
// dirPerPart is set to false during scanDirectories, and later updated in listOrphans by mergeDirPerPartDirs
file = cFileDesc::create(fn,nf,false,filenameLen,allocator);
files.emplace(fn, file);
}
CriticalBlock block(getFile(file,parent,fn,filenameLen,pf,nf));

if (misplaced) {
cMisplacedRec *mp = file->misplaced;
Expand Down Expand Up @@ -732,95 +809,6 @@ struct cMessage: public CInterface
};


// Found files carry no metadata, so a dir-per-part file initially shows up as a separate cFileDesc
// in each of its numbered part directories. This pass folds those per-directory descriptors into a
// single descriptor held by the parent directory.
//
// A file in 'dir' is only merged when 'dir' has a numeric name, has no subdirectories, and the file
// has exactly one part marked present. Matching descriptors are then looked up by file name in the
// sibling directories named "1".."N" under 'parent'.
//
//   parent      - directory that contains 'dir'; receives the merged descriptor
//   dir         - candidate part directory to examine
//   currentPath - not used by this function
static void mergeDirPerPartDirs(cDirDesc *parent, cDirDesc *dir, const char *currentPath)
{
    // Only containerized systems, and only leaf directories that actually hold files
    if (!isContainerized() || dir->files.empty() || !dir->dirs.empty())
        return;

    // The directory name must be a (non-zero) number
    StringBuffer dirLabel;
    dir->getName(dirLabel);
    const unsigned dirOrdinal = readDigits(dirLabel.str());
    if (!dirOrdinal)
        return;

    auto cursor = dir->files.begin();
    while (cursor != dir->files.end())
    {
        cFileDesc *candidate = cursor->second.get();
        // Captured up front: 'candidate' may be destroyed while merging below
        const unsigned partCount = candidate->N;

        // The directory number cannot exceed the number of parts, and a dir-per-part file has
        // exactly one part present in any one directory. Count only when the first test holds;
        // a count left at zero rejects the file just the same.
        unsigned presentCount = 0;
        if (dirOrdinal <= partCount)
        {
            for (unsigned part = 0; part < partCount; part++)
            {
                if (candidate->testpresent(0, part))
                    presentCount++;
            }
        }
        if (presentCount != 1)
        {
            ++cursor;
            continue;
        }

        const std::string fileKey = cursor->first;
        cFileDesc *merged = nullptr;   // descriptor that ended up in parent, once one has been moved
        bool erasedCurrent = false;    // true once 'cursor' has been advanced by an erase on dir->files

        for (unsigned part = 0; part < partCount; part++)
        {
            // Sibling directory named part+1, if it exists
            auto siblingItr = parent->dirs.find(std::to_string(part+1));
            if (siblingItr == parent->dirs.end())
                continue;

            cDirDesc *sibling = siblingItr->second.get();
            auto entryItr = sibling->files.find(fileKey);
            if (entryItr == sibling->files.end())
                continue;

            auto &entry = entryItr->second;
            if (merged)
            {
                // A descriptor already lives in parent: fold this part's state into it
                merged->setpresent(0, part);
                if (entry->testmarked(0, part))
                    merged->setmarked(0, part);
            }
            else
            {
                // First match: this descriptor becomes the merged one and moves up to parent
                merged = entry.get();
                parent->files[fileKey] = std::move(entry);
                merged->isDirPerPart = true;
            }

            // Drop the entry from the part directory now that parent holds the file.
            // When that directory is the one being iterated, continue from the iterator erase returns.
            if (sibling != dir)
                sibling->files.erase(entryItr);
            else
            {
                cursor = sibling->files.erase(entryItr);
                erasedCurrent = true;
            }
        }

        if (!erasedCurrent)
            ++cursor;
    }
}

constexpr int64_t oneSecondNS = 1000 * 1000 * 1000; // 1 second in nanoseconds
constexpr int64_t oneHourNS = 60 * 60 * oneSecondNS; // 1 hour in nanoseconds
class XRefPeriodicTimer : public PeriodicTimer
Expand All @@ -843,6 +831,11 @@ class XRefPeriodicTimer : public PeriodicTimer
return PeriodicTimer::hasElapsed();
}

// Read-only accessor for startCycles.
// NOTE(review): startCycles is presumably the cycle count recorded when the timer was last
// (re)started - confirm against PeriodicTimer.
cycle_t queryStartCycles() const
{
return startCycles;
}

void reset(unsigned seconds, bool suppressFirst, const char *_clustname)
{
clustname = _clustname;
Expand Down Expand Up @@ -971,6 +964,22 @@ class CNewXRefManagerBase
log(true, "%s heartbeat started (interval: 1 minute)", op);
}

// Logs a completion message for operation 'op', reporting the time elapsed since
// heartbeatTimer's start cycles together with the processedDirs and processedFiles counters.
// The elapsed time is truncated to whole minutes and shown as "Nd Nh Nm", "Nh Nm" or "Nm",
// using the shortest form that fits.
void finishHeartbeat(const char * op)
{
    const int64_t totalNS = cycle_to_nanosec(get_cycles_now() - heartbeatTimer.queryStartCycles());
    const unsigned totalMinutes = totalNS / oneSecondNS / 60;
    const unsigned totalHours = totalMinutes / 60;
    const unsigned days = totalHours / 24;
    const unsigned hoursOfDay = totalHours % 24;
    const unsigned minutesOfHour = totalMinutes % 60;

    if (days > 0)
    {
        log(true, "%s complete. Total time: %ud %uh %um, Total dirs: %lu, Total files: %lu", op, days, hoursOfDay, minutesOfHour, processedDirs.load(), processedFiles.load());
        return;
    }
    if (totalHours > 0)
    {
        // Less than a day, so totalHours needs no wrapping
        log(true, "%s complete. Total time: %uh %um, Total dirs: %lu, Total files: %lu", op, totalHours, minutesOfHour, processedDirs.load(), processedFiles.load());
        return;
    }
    // Less than an hour, so totalMinutes needs no wrapping
    log(true, "%s complete. Total time: %um, Total dirs: %lu, Total files: %lu", op, totalMinutes, processedDirs.load(), processedFiles.load());
}

void checkHeartbeat(const char * op)
{
time_t now = time(NULL);
Expand Down Expand Up @@ -1351,7 +1360,7 @@ class CNewXRefManager: public CNewXRefManagerBase
}


bool scanDirectory(unsigned node,const SocketEndpoint &ep,StringBuffer &path, unsigned drv, cDirDesc *pdir, IFile *cachefile, unsigned level, unsigned filePathOffset, unsigned stripeNum)
bool scanDirectory(unsigned node, const SocketEndpoint &ep, StringBuffer &path, unsigned drv, cDirDesc *pdir, IFile *cachefile, unsigned filePathOffset, unsigned stripeNum, cDirDesc *parent)
{
checkHeartbeat("Directory scan");
size32_t dsz = path.length();
Expand Down Expand Up @@ -1386,46 +1395,15 @@ class CNewXRefManager: public CNewXRefManagerBase
if (iswin)
fname.toLowerCase();
addPathSepChar(path).append(fname);
if (iter->isDir()) {
// NB: Check if a subdirectory is a stripe directory under certain conditions
// Stripe directories must be under root. The level is 0 if root
// Only look for stripe directories if the plane details say it is striped
if ((level == 0) && isPlaneStriped) {
const char *dir = fname.str();
bool isDirStriped = dir[0] == 'd' && dir[1] != '\0'; // Directory may be striped if it starts with 'd' and longer than one character
if (isDirStriped) {
dir++;
while (*dir) {
if (!isdigit(*(dir++))) {
isDirStriped = false;
break;
}
}
if (isDirStriped) {
// To properly match all file parts, we need to remove the stripe directory from the path
// so that the cDirDesc hierarchy matches the logical scope hierarchy
// /var/lib/HPCCSystems/hpcc-data/d1/somescope/otherscope/afile.1_of_2
// /var/lib/HPCCSystems/hpcc-data/d2/somescope/otherscope/afile.2_of_2
// These files would never be matched if we didn't build up the cDirDesc structure without the stripe directory
if (!scanDirectory(node,ep,path,drv,pdir,NULL,level+1,filePathOffset,stripeNum))
return false;

path.setLength(dsz);
continue;
}
}
// Top-level directory is not striped, but isPlaneStriped is true. Throw an error, but continue processing directory as normal
OERRLOG(LOGPFX "Top-level directory striping mismatch for %s: isPlaneStriped=%d", path.str(), isPlaneStriped);
}
if (iter->isDir())
dirs.append(fname.str());
}
else {
CDateTime dt;
nsz += iter->getFileSize();
iter->getModifiedTime(dt);
if (!fileFiltered(path.str(),dt)) {
try {
pdir->addFile(drv,fname.str(),path.str(),filePathOffset,node,ep,*grp,numnodes,stripeNum,numStripedDevices,&allocator);
pdir->addFile(drv,fname.str(),path.str(),filePathOffset,node,ep,*grp,numnodes,stripeNum,numStripedDevices,parent,&allocator);
processedFiles++;
}
catch (IException *e) {
Expand All @@ -1443,7 +1421,7 @@ class CNewXRefManager: public CNewXRefManagerBase
addPathSepChar(path).append(dirs.item(i));
if (file.get()&&!resetRemoteFilename(file,path.str())) // sneaky way of avoiding cache
file.clear();
if (!scanDirectory(node,ep,path,drv,pdir->lookupDir(dirs.item(i),&allocator),file,level+1,filePathOffset,stripeNum))
if (!scanDirectory(node,ep,path,drv,pdir->lookupDir(dirs.item(i),&allocator),file,filePathOffset,stripeNum,pdir))
return false;
path.setLength(dsz);
}
Expand Down Expand Up @@ -1502,7 +1480,7 @@ class CNewXRefManager: public CNewXRefManagerBase
addPathSepChar(path).append('d').append(i+1);

parent.log(false,"Scanning %s directory %s",parent.storagePlane->queryProp("@name"),path.str());
if (!parent.scanDirectory(0,localEP,path,0,parent.root.get(),NULL,1,path.length(),i+1))
if (!parent.scanDirectory(0,localEP,path,0,parent.root.get(),NULL,path.length(),i+1,nullptr))
{
ok = false;
return;
Expand All @@ -1513,7 +1491,7 @@ class CNewXRefManager: public CNewXRefManagerBase
StringBuffer hostStr;
SocketEndpoint ep = parent.rawgrp->queryNode(i).endpoint();
parent.log(false,"Scanning %s directory %s",ep.getEndpointHostText(hostStr).str(),path.str());
if (!parent.scanDirectory(i,ep,path,0,NULL,NULL,0,path.length(),0)) {
if (!parent.scanDirectory(i,ep,path,0,NULL,NULL,path.length(),0,nullptr)) {
ok = false;
return;
}
Expand All @@ -1523,7 +1501,7 @@ class CNewXRefManager: public CNewXRefManagerBase
setReplicateFilename(path,1);
ep = parent.rawgrp->queryNode(i).endpoint();
parent.log(false,"Scanning %s directory %s",ep.getEndpointHostText(hostStr.clear()).str(),path.str());
if (!parent.scanDirectory(i,ep,path,1,NULL,NULL,0,path.length(),0)) {
if (!parent.scanDirectory(i,ep,path,1,NULL,NULL,path.length(),0,nullptr)) {
ok = false;
}
}
Expand All @@ -1541,7 +1519,7 @@ class CNewXRefManager: public CNewXRefManagerBase
startHeartbeat("Directory scan"); // Initialize heartbeat mechanism
afor.For(numMaxThreads,numThreads,true,numThreads>1);
if (afor.ok)
log(true,"Directory scan complete");
finishHeartbeat("Directory scan");
else
log(true,"Errors occurred during scan");
return afor.ok;
Expand Down Expand Up @@ -1702,7 +1680,7 @@ class CNewXRefManager: public CNewXRefManagerBase
bool ret = false;
if (file->getInfo(isdir,sz,dt)&&!isdir)
ret = true;
#ifdef _DEBUG
#if 0
StringBuffer dbgname;
rfn.getPath(dbgname);
PROGLOG("checkOrphanPhysicalFile(%s) = %s",dbgname.str(),ret?"true":"false");
Expand Down Expand Up @@ -1788,7 +1766,7 @@ class CNewXRefManager: public CNewXRefManagerBase
if (!f)
return;
// first check if any orhans at all (maybe could do this faster)
#ifdef _DEBUG
#if 0
StringBuffer dbgname;
f->getNameMask(dbgname);
PROGLOG("listOrphans TEST FILE(%s)",dbgname.str());
Expand Down Expand Up @@ -2033,7 +2011,7 @@ class CNewXRefManager: public CNewXRefManagerBase
return;
basedir.append(rootdir);
}
#ifdef _DEBUG
#if 0
StringBuffer dbgname;
d->getName(dbgname);
PROGLOG("listOrphans TEST DIR(%s)",dbgname.str());
Expand All @@ -2050,7 +2028,6 @@ class CNewXRefManager: public CNewXRefManagerBase

for (auto& dirPair : d->dirs) {
cDirDesc *dir = dirPair.second.get();
mergeDirPerPartDirs(d,dir,basedir);
listOrphans(dir,basedir,scope,abort,recentCutoffDays);
if (abort)
return;
Expand Down Expand Up @@ -2099,7 +2076,7 @@ class CNewXRefManager: public CNewXRefManagerBase
listOrphans(NULL,basedir,scope,abort,recentCutoffDays);
if (abort)
return;
log(true,"Orphan scan complete");
finishHeartbeat("Orphan scan");
sorteddirs.sort(compareDirs); // NB sort reverse
while (!abort&&sorteddirs.ordinality())
dirbranch->addPropTree("Directory",&sorteddirs.popGet());
Expand Down
Loading