Skip to content

Index prefetch packs in parallel #1843

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
114 changes: 72 additions & 42 deletions GVFS/GVFS.Common/Git/GitObjects.cs
Original file line number Diff line number Diff line change
Expand Up @@ -676,11 +676,16 @@ private LooseObjectToWrite GetLooseObjectDestination(string sha)
string tempPackFolderPath = Path.Combine(this.Enlistment.GitPackRoot, TempPackFolder);
this.fileSystem.CreateDirectory(tempPackFolderPath);

List<TempPrefetchPackAndIdx> tempPacks = new List<TempPrefetchPackAndIdx>();
foreach (PrefetchPacksDeserializer.PackAndIndex pack in deserializer.EnumeratePacks())
var tempPacksTasks = new List<Task<TempPrefetchPackAndIdx>>();
// Future: We could manage cancellation of index building tasks if one fails (to stop indexing of later
// files if an early one fails), but in practice the first pack file takes the majority of the time and
// all the others will finish long before it so there would be no benefit to doing so.
bool allSucceeded = true;

// Read each pack from the stream to a temp file, and start a task to index it.
foreach (PrefetchPacksDeserializer.PackAndIndex packHandle in deserializer.EnumeratePacks())
{
// The advertised size may not match the actual, on-disk size.
long indexLength = 0;
var pack = packHandle; // Capture packHandle in a new variable to avoid closure issues with async index task
long packLength;

// Write the temp and index to a temp folder to avoid putting corrupt files in the pack folder
Expand All @@ -701,73 +706,98 @@ private LooseObjectToWrite GetLooseObjectDestination(string sha)
if (!this.TryWriteTempFile(activity, pack.PackStream, packTempPath, out packLength, out packFlushTask))
{
bytesDownloaded += packLength;
return new RetryWrapper<GitObjectsHttpRequestor.GitObjectTaskResult>.CallbackResult(null, true);
allSucceeded = false;
break;
}

bytesDownloaded += packLength;

// We can't trust the index file from the server, so we will always build our own.
// We still need to consume and handle any exceptions from the index stream though.
var canContinue = true;
GitProcess.Result result;
if (this.TryBuildIndex(activity, packTempPath, out result, gitProcess))
// For performance, we run the index build in the background while we continue downloading the next pack.
var indexTask = Task.Run(async () =>
{
tempPacks.Add(new TempPrefetchPackAndIdx(pack.Timestamp, packName, packTempPath, packFlushTask, idxName, idxTempPath, idxFlushTask: null));
if (pack.IndexStream != null)
// GitProcess only permits one process per instance at a time, so we need to duplicate it to run the index build in parallel.
// This is safe because each process is only accessing the pack file we direct it to which is not yet part
// of the enlistment.
GitProcess gitProcessForIndex = new GitProcess(this.Enlistment);
if (this.TryBuildIndex(activity, packTempPath, out var _, gitProcessForIndex))
{
try
return new TempPrefetchPackAndIdx(pack.Timestamp, packName, packTempPath, packFlushTask, idxName, idxTempPath, idxFlushTask: null);
}
else
{
await packFlushTask;
return null;
}
});
tempPacksTasks.Add(indexTask);

// If the server provided an index stream, we still need to consume it and handle any exceptions even
// though we are otherwise ignoring it.
if (pack.IndexStream != null)
{
try
{
bytesDownloaded += pack.IndexStream.Length;
if (pack.IndexStream.CanSeek)
{
bytesDownloaded += pack.IndexStream.Length;
if (pack.IndexStream.CanSeek)
{
pack.IndexStream.Seek(0, SeekOrigin.End);
}
else
{
pack.IndexStream.CopyTo(Stream.Null);
}
pack.IndexStream.Seek(0, SeekOrigin.End);
}
catch (Exception e)
else
{
canContinue = false;
EventMetadata metadata = CreateEventMetadata(e);
activity.RelatedWarning(metadata, "Failed to read to end of index stream");
pack.IndexStream.CopyTo(Stream.Null);
}
}
catch (Exception e)
{
EventMetadata metadata = CreateEventMetadata(e);
activity.RelatedWarning(metadata, "Failed to read to end of index stream");
allSucceeded = false;
break;
}
}
else
}

// Wait for the index tasks to complete. If any fail, we still copy the prior successful ones
// to the pack folder so that the retry will be incremental from where the failure occurred.
var tempPacks = new List<TempPrefetchPackAndIdx>();
bool indexTasksSucceededSoFar = true;
foreach (var task in tempPacksTasks)
{
TempPrefetchPackAndIdx tempPack = task.Result;
if (tempPack != null && indexTasksSucceededSoFar)
{
canContinue = false;
tempPacks.Add(tempPack);
}

if (!canContinue)
else
{
if (packFlushTask != null)
{
packFlushTask.Wait();
}

// Move whatever has been successfully downloaded so far
Exception moveException;
this.TryFlushAndMoveTempPacks(tempPacks, ref latestTimestamp, out moveException);

return new RetryWrapper<GitObjectsHttpRequestor.GitObjectTaskResult>.CallbackResult(null, true);
indexTasksSucceededSoFar = false;
tempPack?.PackFlushTask.Wait();
break;
}
}
allSucceeded = allSucceeded && indexTasksSucceededSoFar;

Exception exception = null;
if (!this.TryFlushAndMoveTempPacks(tempPacks, ref latestTimestamp, out exception))
{
return new RetryWrapper<GitObjectsHttpRequestor.GitObjectTaskResult>.CallbackResult(exception, true);
allSucceeded = false;
}

foreach (TempPrefetchPackAndIdx tempPack in tempPacks)
{
packIndexes.Add(tempPack.IdxName);
}

return new RetryWrapper<GitObjectsHttpRequestor.GitObjectTaskResult>.CallbackResult(
new GitObjectsHttpRequestor.GitObjectTaskResult(success: true));
if (allSucceeded)
{
return new RetryWrapper<GitObjectsHttpRequestor.GitObjectTaskResult>.CallbackResult(
new GitObjectsHttpRequestor.GitObjectTaskResult(success: true));
}
else
{
return new RetryWrapper<GitObjectsHttpRequestor.GitObjectTaskResult>.CallbackResult(exception, shouldRetry: true);
}
}
}

Expand Down
6 changes: 5 additions & 1 deletion GVFS/GVFS.Common/Git/GitProcess.cs
Original file line number Diff line number Diff line change
Expand Up @@ -580,7 +580,11 @@ public Result VerifyCommitGraph(string objectDir)

public Result IndexPack(string packfilePath, string idxOutputPath)
{
    /* Build a .idx file for the given pack file, writing it to idxOutputPath.
     *
     * Git's default thread count for index-pack is Environment.ProcessorCount / 2,
     * with a maximum of 20. Testing shows a ~5% decrease in gvfs clone time for large
     * repositories when using more threads, so we raise the count to ProcessorCount,
     * still capped at 20. */
    int threadCount = Math.Min(Environment.ProcessorCount, 20);
    return this.InvokeGitAgainstDotGitFolder($"index-pack --threads={threadCount} -o \"{idxOutputPath}\" \"{packfilePath}\"");
}

/// <summary>
Expand Down
Loading