Skip to content

Commit 8c70b0e

Browse files
authored
Merge pull request #407 from onflow/fxamacker/add-batch-preload-without-inlining
Add BatchPreload to decode slabs in parallel and cache (for branch without atree inlining)
2 parents aea96d5 + 49ce3d2 commit 8c70b0e

File tree

3 files changed

+441
-0
lines changed

3 files changed

+441
-0
lines changed

storage.go

Lines changed: 152 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1528,3 +1528,155 @@ func (s *PersistentSlabStorage) getAllChildReferences(slab Slab) (
15281528

15291529
return references, brokenReferences, nil
15301530
}
1531+
1532+
// BatchPreload decodeds and caches slabs of given ids in parallel.
1533+
// This is useful for storage health or data validation in migration programs.
1534+
func (s *PersistentSlabStorage) BatchPreload(ids []StorageID, numWorkers int) error {
1535+
if len(ids) == 0 {
1536+
return nil
1537+
}
1538+
1539+
// Use 11 for min slab count for parallel decoding because micro benchmarks showed
1540+
// performance regression for <= 10 slabs when decoding slabs in parallel.
1541+
const minCountForBatchPreload = 11
1542+
if len(ids) < minCountForBatchPreload {
1543+
1544+
for _, id := range ids {
1545+
// fetch from base storage last
1546+
data, ok, err := s.baseStorage.Retrieve(id)
1547+
if err != nil {
1548+
// Wrap err as external error (if needed) because err is returned by BaseStorage interface.
1549+
return wrapErrorfAsExternalErrorIfNeeded(err, fmt.Sprintf("failed to retrieve slab %s", id))
1550+
}
1551+
if !ok {
1552+
continue
1553+
}
1554+
1555+
slab, err := DecodeSlab(id, data, s.cborDecMode, s.DecodeStorable, s.DecodeTypeInfo)
1556+
if err != nil {
1557+
// err is already categorized by DecodeSlab().
1558+
return err
1559+
}
1560+
1561+
// save decoded slab to cache
1562+
s.cache[id] = slab
1563+
}
1564+
1565+
return nil
1566+
}
1567+
1568+
type slabToBeDecoded struct {
1569+
slabID StorageID
1570+
data []byte
1571+
}
1572+
1573+
type decodedSlab struct {
1574+
slabID StorageID
1575+
slab Slab
1576+
err error
1577+
}
1578+
1579+
// Define decoder (worker) to decode slabs in parallel
1580+
decoder := func(wg *sync.WaitGroup, done <-chan struct{}, jobs <-chan slabToBeDecoded, results chan<- decodedSlab) {
1581+
defer wg.Done()
1582+
1583+
for slabData := range jobs {
1584+
// Check if goroutine is signaled to stop before proceeding.
1585+
select {
1586+
case <-done:
1587+
return
1588+
default:
1589+
}
1590+
1591+
id := slabData.slabID
1592+
data := slabData.data
1593+
1594+
slab, err := DecodeSlab(id, data, s.cborDecMode, s.DecodeStorable, s.DecodeTypeInfo)
1595+
// err is already categorized by DecodeSlab().
1596+
results <- decodedSlab{
1597+
slabID: id,
1598+
slab: slab,
1599+
err: err,
1600+
}
1601+
}
1602+
}
1603+
1604+
if numWorkers > len(ids) {
1605+
numWorkers = len(ids)
1606+
}
1607+
1608+
var wg sync.WaitGroup
1609+
1610+
// Construct done signal channel
1611+
done := make(chan struct{})
1612+
1613+
// Construct job queue
1614+
jobs := make(chan slabToBeDecoded, len(ids))
1615+
1616+
// Construct result queue
1617+
results := make(chan decodedSlab, len(ids))
1618+
1619+
defer func() {
1620+
// This ensures that all goroutines are stopped before output channel is closed.
1621+
1622+
// Wait for all goroutines to finish
1623+
wg.Wait()
1624+
1625+
// Close output channel
1626+
close(results)
1627+
}()
1628+
1629+
// Preallocate cache map if empty
1630+
if len(s.cache) == 0 {
1631+
s.cache = make(map[StorageID]Slab, len(ids))
1632+
}
1633+
1634+
// Launch workers
1635+
wg.Add(numWorkers)
1636+
for i := 0; i < numWorkers; i++ {
1637+
go decoder(&wg, done, jobs, results)
1638+
}
1639+
1640+
// Send jobs
1641+
jobCount := 0
1642+
{
1643+
// Need to close input channel (jobs) here because
1644+
// if there isn't any job in jobs channel,
1645+
// done is never processed inside loop "for slabData := range jobs".
1646+
defer close(jobs)
1647+
1648+
for _, id := range ids {
1649+
// fetch from base storage last
1650+
data, ok, err := s.baseStorage.Retrieve(id)
1651+
if err != nil {
1652+
// Closing done channel signals goroutines to stop.
1653+
close(done)
1654+
// Wrap err as external error (if needed) because err is returned by BaseStorage interface.
1655+
return wrapErrorfAsExternalErrorIfNeeded(err, fmt.Sprintf("failed to retrieve slab %s", id))
1656+
}
1657+
if !ok {
1658+
continue
1659+
}
1660+
1661+
jobs <- slabToBeDecoded{id, data}
1662+
jobCount++
1663+
}
1664+
}
1665+
1666+
// Process results
1667+
for i := 0; i < jobCount; i++ {
1668+
result := <-results
1669+
1670+
if result.err != nil {
1671+
// Closing done channel signals goroutines to stop.
1672+
close(done)
1673+
// result.err is already categorized by DecodeSlab().
1674+
return result.err
1675+
}
1676+
1677+
// save decoded slab to cache
1678+
s.cache[result.slabID] = result.slab
1679+
}
1680+
1681+
return nil
1682+
}

storage_bench_test.go

Lines changed: 119 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -132,3 +132,122 @@ func BenchmarkStorageNondeterministicFastCommit(b *testing.B) {
132132
benchmarkNondeterministicFastCommit(b, fixedSeed, 100_000)
133133
benchmarkNondeterministicFastCommit(b, fixedSeed, 1_000_000)
134134
}
135+
136+
func benchmarkRetrieve(b *testing.B, seed int64, numberOfSlabs int) {
137+
138+
r := rand.New(rand.NewSource(seed))
139+
140+
encMode, err := cbor.EncOptions{}.EncMode()
141+
require.NoError(b, err)
142+
143+
decMode, err := cbor.DecOptions{}.DecMode()
144+
require.NoError(b, err)
145+
146+
encodedSlabs := make(map[StorageID][]byte)
147+
ids := make([]StorageID, 0, numberOfSlabs)
148+
for i := 0; i < numberOfSlabs; i++ {
149+
addr := generateRandomAddress(r)
150+
151+
var index StorageIndex
152+
binary.BigEndian.PutUint64(index[:], uint64(i))
153+
154+
id := StorageID{addr, index}
155+
156+
slab := generateLargeSlab(id)
157+
158+
data, err := Encode(slab, encMode)
159+
require.NoError(b, err)
160+
161+
encodedSlabs[id] = data
162+
ids = append(ids, id)
163+
}
164+
165+
b.Run(strconv.Itoa(numberOfSlabs), func(b *testing.B) {
166+
for i := 0; i < b.N; i++ {
167+
b.StopTimer()
168+
169+
baseStorage := NewInMemBaseStorageFromMap(encodedSlabs)
170+
storage := NewPersistentSlabStorage(baseStorage, encMode, decMode, decodeStorable, decodeTypeInfo)
171+
172+
b.StartTimer()
173+
174+
for _, id := range ids {
175+
_, found, err := storage.Retrieve(id)
176+
require.True(b, found)
177+
require.NoError(b, err)
178+
}
179+
}
180+
})
181+
}
182+
183+
func benchmarkBatchPreload(b *testing.B, seed int64, numberOfSlabs int) {
184+
185+
r := rand.New(rand.NewSource(seed))
186+
187+
encMode, err := cbor.EncOptions{}.EncMode()
188+
require.NoError(b, err)
189+
190+
decMode, err := cbor.DecOptions{}.DecMode()
191+
require.NoError(b, err)
192+
193+
encodedSlabs := make(map[StorageID][]byte)
194+
ids := make([]StorageID, 0, numberOfSlabs)
195+
for i := 0; i < numberOfSlabs; i++ {
196+
addr := generateRandomAddress(r)
197+
198+
var index StorageIndex
199+
binary.BigEndian.PutUint64(index[:], uint64(i))
200+
201+
id := StorageID{addr, index}
202+
203+
slab := generateLargeSlab(id)
204+
205+
data, err := Encode(slab, encMode)
206+
require.NoError(b, err)
207+
208+
encodedSlabs[id] = data
209+
ids = append(ids, id)
210+
}
211+
212+
b.Run(strconv.Itoa(numberOfSlabs), func(b *testing.B) {
213+
for i := 0; i < b.N; i++ {
214+
b.StopTimer()
215+
216+
baseStorage := NewInMemBaseStorageFromMap(encodedSlabs)
217+
storage := NewPersistentSlabStorage(baseStorage, encMode, decMode, decodeStorable, decodeTypeInfo)
218+
219+
b.StartTimer()
220+
221+
err = storage.BatchPreload(ids, runtime.NumCPU())
222+
require.NoError(b, err)
223+
224+
for _, id := range ids {
225+
_, found, err := storage.Retrieve(id)
226+
require.True(b, found)
227+
require.NoError(b, err)
228+
}
229+
}
230+
})
231+
}
232+
233+
func BenchmarkStorageRetrieve(b *testing.B) {
234+
fixedSeed := int64(1234567) // intentionally use fixed constant rather than time, etc.
235+
236+
benchmarkRetrieve(b, fixedSeed, 10)
237+
benchmarkRetrieve(b, fixedSeed, 100)
238+
benchmarkRetrieve(b, fixedSeed, 1_000)
239+
benchmarkRetrieve(b, fixedSeed, 10_000)
240+
benchmarkRetrieve(b, fixedSeed, 100_000)
241+
benchmarkRetrieve(b, fixedSeed, 1_000_000)
242+
}
243+
244+
func BenchmarkStorageBatchPreload(b *testing.B) {
245+
fixedSeed := int64(1234567) // intentionally use fixed constant rather than time, etc.
246+
247+
benchmarkBatchPreload(b, fixedSeed, 10)
248+
benchmarkBatchPreload(b, fixedSeed, 100)
249+
benchmarkBatchPreload(b, fixedSeed, 1_000)
250+
benchmarkBatchPreload(b, fixedSeed, 10_000)
251+
benchmarkBatchPreload(b, fixedSeed, 100_000)
252+
benchmarkBatchPreload(b, fixedSeed, 1_000_000)
253+
}

0 commit comments

Comments
 (0)