Commit e149618

Differential Privacy Team authored and miracvbasaran committed
Fix a rare privacy bug in DistinctPerKey in Privacy on Beam.
The bug occurred when the input contains outlier users that contribute to many partitions and/or many values, AND the values they contribute are also contributed by other users (the second part is critical: if the contributed values come from a single user only, the bug does not occur). In that case, the output might not be DP due to incorrect contribution bounding. See the comments in the newly added tests for concrete examples of when and how the bug used to occur.

GitOrigin-RevId: 27bd7a0f5ccb16fe5bd0acda5d141713262638ab
Change-Id: I68c3765e95a6b97ed537d1c188736b28b776417e
1 parent 30525af commit e149618
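
To make the failure mode concrete, here is a minimal sketch of the kind of input that used to trigger the bug, mirroring the per-partition test added in this commit. The testutils helpers and pbeam calls are the ones used in that test diff further down; the surrounding pipeline setup (s, col, epsilon, delta) is illustrative, not part of the commit:

// Sketch only. 100 users each contribute one distinct value to partition 0,
// and one outlier user (ID=100) contributes all 100 of those same values.
// Before this fix, deduplication picked a random contributor for each
// (partition, value) pair *before* contribution bounding, so the outlier
// could end up "owning" around 50 of the values; adding or removing that one
// user could then shift the output by far more than the contribution bounds
// assumed by the DP noise and thresholding.
var triples []testutils.TripleWithIntValue
for i := 0; i < 100; i++ {
	triples = append(triples, testutils.TripleWithIntValue{ID: i, Partition: 0, Value: i})
	triples = append(triples, testutils.TripleWithIntValue{ID: 100, Partition: 0, Value: i})
}
epsilon, delta := 50.0, 1e-100
pcol := MakePrivate(s, col, NewPrivacySpec(epsilon, delta))
pcol = ParDo(s, testutils.TripleWithIntValueToKV, pcol)
got := DistinctPerKey(s, pcol, DistinctPerKeyParams{
	MaxPartitionsContributed:     1,
	NoiseKind:                    LaplaceNoise{},
	MaxContributionsPerPartition: 1,
})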

File tree

6 files changed: +261 −22 lines

privacy-on-beam/pbeam/aggregations.go

Lines changed: 102 additions & 7 deletions
@@ -41,8 +41,11 @@ func init() {
 	beam.RegisterType(reflect.TypeOf((*decodePairInt64Fn)(nil)))
 	beam.RegisterType(reflect.TypeOf((*decodePairFloat64Fn)(nil)))
 	beam.RegisterType(reflect.TypeOf((*dropValuesFn)(nil)))
+	beam.RegisterType(reflect.TypeOf((*encodeKVFn)(nil)))
 	beam.RegisterType(reflect.TypeOf((*encodeIDKFn)(nil)))
+	beam.RegisterType(reflect.TypeOf((*decodeIDKFn)(nil)))
 	beam.RegisterType(reflect.TypeOf((*expandValuesCombineFn)(nil)))
+	beam.RegisterType(reflect.TypeOf((*expandFloat64ValuesCombineFn)(nil)))
 	beam.RegisterType(reflect.TypeOf((*decodePairArrayFloat64Fn)(nil)))
 	beam.RegisterType(reflect.TypeOf((*partitionsMapFn)(nil)).Elem())
 	beam.RegisterType(reflect.TypeOf((*prunePartitionsVFn)(nil)).Elem())
@@ -732,6 +735,26 @@ func (fn *dropValuesFn) ProcessElement(id beam.Z, kv kv.Pair) (beam.Z, beam.W, e
 	return id, k, err
 }
 
+// encodeKVFn takes a PCollection<kv.Pair{ID,K}, codedV> as input, and returns a
+// PCollection<ID, kv.Pair{K,V}>; where K and V have been coded, and ID has been
+// decoded.
+type encodeKVFn struct {
+	InputPairCodec *kv.Codec // Codec for the input kv.Pair{ID,K}
+}
+
+func newEncodeKVFn(idkCodec *kv.Codec) *encodeKVFn {
+	return &encodeKVFn{InputPairCodec: idkCodec}
+}
+
+func (fn *encodeKVFn) Setup() error {
+	return fn.InputPairCodec.Setup()
+}
+
+func (fn *encodeKVFn) ProcessElement(pair kv.Pair, codedV []byte) (beam.W, kv.Pair, error) {
+	id, _, err := fn.InputPairCodec.Decode(pair)
+	return id, kv.Pair{pair.V, codedV}, err // pair.V is the K in PCollection<kv.Pair{ID,K}, codedV>
+}
+
 // encodeIDKFn takes a PCollection<ID,kv.Pair{K,V}> as input, and returns a
 // PCollection<kv.Pair{ID,K},V>; where ID and K have been coded, and V has been
 // decoded.
@@ -762,6 +785,36 @@ func (fn *encodeIDKFn) ProcessElement(id beam.W, pair kv.Pair) (kv.Pair, beam.V,
 	return kv.Pair{idBuf.Bytes(), pair.K}, v, err
 }
 
+// decodeIDKFn is the reverse operation of encodeIDKFn. It takes a PCollection<kv.Pair{ID,K},V>
+// as input, and returns a PCollection<ID, kv.Pair{K,V}>; where K and V have been coded, and ID
+// has been decoded.
+type decodeIDKFn struct {
+	VType          beam.EncodedType    // Type information of the value V
+	vEnc           beam.ElementEncoder // Encoder for the value V, set during Setup() according to VType
+	InputPairCodec *kv.Codec           // Codec for the input kv.Pair{ID,K}
+}
+
+func newDecodeIDKFn(vType typex.FullType, idkCodec *kv.Codec) *decodeIDKFn {
+	return &decodeIDKFn{
+		VType:          beam.EncodedType{vType.Type()},
+		InputPairCodec: idkCodec,
+	}
+}
+
+func (fn *decodeIDKFn) Setup() error {
+	fn.vEnc = beam.NewElementEncoder(fn.VType.T)
+	return fn.InputPairCodec.Setup()
+}
+
+func (fn *decodeIDKFn) ProcessElement(pair kv.Pair, v beam.V) (beam.W, kv.Pair, error) {
+	var vBuf bytes.Buffer
+	if err := fn.vEnc.Encode(v, &vBuf); err != nil {
+		return nil, kv.Pair{}, fmt.Errorf("pbeam.decodeIDKFn.ProcessElement: couldn't encode V %v: %w", v, err)
+	}
+	id, _, err := fn.InputPairCodec.Decode(pair)
+	return id, kv.Pair{pair.V, vBuf.Bytes()}, err // pair.V is the K in PCollection<kv.Pair{ID,K},V>
+}
+
 // decodePairArrayFloat64Fn transforms a PCollection<pairArrayFloat64<codedX,[]float64>> into a
 // PCollection<X,[]float64>.
 type decodePairArrayFloat64Fn struct {
@@ -858,29 +911,71 @@ func convertUint64ToFloat64Fn(z beam.Z, i uint64) (beam.Z, float64) {
 }
 
 type expandValuesAccum struct {
+	Values [][]byte
+}
+
+// expandValuesCombineFn converts a PCollection<K,V> to PCollection<K,[]V> where all values
+// corresponding to the same key are collected in a slice. The resulting PCollection has a
+// single slice for each key.
+type expandValuesCombineFn struct {
+	VType beam.EncodedType
+	vEnc  beam.ElementEncoder
+}
+
+func newExpandValuesCombineFn(vType beam.EncodedType) *expandValuesCombineFn {
+	return &expandValuesCombineFn{VType: vType}
+}
+
+func (fn *expandValuesCombineFn) Setup() {
+	fn.vEnc = beam.NewElementEncoder(fn.VType.T)
+}
+
+func (fn *expandValuesCombineFn) CreateAccumulator() expandValuesAccum {
+	return expandValuesAccum{Values: make([][]byte, 0)}
+}
+
+func (fn *expandValuesCombineFn) AddInput(a expandValuesAccum, value beam.V) (expandValuesAccum, error) {
+	var vBuf bytes.Buffer
+	if err := fn.vEnc.Encode(value, &vBuf); err != nil {
+		return a, fmt.Errorf("pbeam.expandValuesCombineFn.AddInput: couldn't encode V %v: %w", value, err)
+	}
+	a.Values = append(a.Values, vBuf.Bytes())
+	return a, nil
+}
+
+func (fn *expandValuesCombineFn) MergeAccumulators(a, b expandValuesAccum) expandValuesAccum {
+	a.Values = append(a.Values, b.Values...)
+	return a
+}
+
+func (fn *expandValuesCombineFn) ExtractOutput(a expandValuesAccum) [][]byte {
+	return a.Values
+}
+
+type expandFloat64ValuesAccum struct {
 	Values []float64
 }
 
-// expandValuesCombineFn converts a PCollection<K,float64> to PCollection<K,[]float64>
+// expandFloat64ValuesCombineFn converts a PCollection<K,float64> to PCollection<K,[]float64>
 // where each value corresponding to the same key are collected in a slice. Resulting
 // PCollection has a single slice for each key.
-type expandValuesCombineFn struct{}
+type expandFloat64ValuesCombineFn struct{}
 
-func (fn *expandValuesCombineFn) CreateAccumulator() expandValuesAccum {
-	return expandValuesAccum{Values: make([]float64, 0)}
+func (fn *expandFloat64ValuesCombineFn) CreateAccumulator() expandFloat64ValuesAccum {
+	return expandFloat64ValuesAccum{Values: make([]float64, 0)}
 }
 
-func (fn *expandValuesCombineFn) AddInput(a expandValuesAccum, value float64) expandValuesAccum {
+func (fn *expandFloat64ValuesCombineFn) AddInput(a expandFloat64ValuesAccum, value float64) expandFloat64ValuesAccum {
 	a.Values = append(a.Values, value)
 	return a
 }
 
-func (fn *expandValuesCombineFn) MergeAccumulators(a, b expandValuesAccum) expandValuesAccum {
+func (fn *expandFloat64ValuesCombineFn) MergeAccumulators(a, b expandFloat64ValuesAccum) expandFloat64ValuesAccum {
 	a.Values = append(a.Values, b.Values...)
 	return a
 }
 
-func (fn *expandValuesCombineFn) ExtractOutput(a expandValuesAccum) []float64 {
+func (fn *expandFloat64ValuesCombineFn) ExtractOutput(a expandFloat64ValuesAccum) []float64 {
 	return a.Values
 }

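The new expandValuesCombineFn generalizes the old float64-only combiner by byte-encoding each value through beam.ElementEncoder, so a single combiner works for any value type V; the old behavior survives as expandFloat64ValuesCombineFn for the aggregations that need raw []float64 slices. For context, this is how distinct_per_key.go (below) drives the generic combiner; here `sampled` stands for the already per-partition-bounded PCollection<kv.Pair{ID,K}, V>:

// Collect every coded value sharing a kv.Pair{ID,K} key into a single slice.
combined := beam.CombinePerKey(s,
	newExpandValuesCombineFn(pcol.codec.VType),
	sampled) // PCollection<kv.Pair{ID,K}, []codedV>, where codedV=[]byte
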
privacy-on-beam/pbeam/coders.go

Lines changed: 11 additions & 0 deletions
@@ -33,6 +33,7 @@ func init() {
 	beam.RegisterCoder(reflect.TypeOf(boundedMeanAccumFloat64{}), encodeBoundedMeanAccumFloat64, decodeBoundedMeanAccumFloat64)
 	beam.RegisterCoder(reflect.TypeOf(boundedQuantilesAccum{}), encodeBoundedQuantilesAccum, decodeBoundedQuantilesAccum)
 	beam.RegisterCoder(reflect.TypeOf(expandValuesAccum{}), encodeExpandValuesAccum, decodeExpandValuesAccum)
+	beam.RegisterCoder(reflect.TypeOf(expandFloat64ValuesAccum{}), encodeExpandFloat64ValuesAccum, decodeExpandFloat64ValuesAccum)
 	beam.RegisterCoder(reflect.TypeOf(partitionSelectionAccum{}), encodePartitionSelectionAccum, decodePartitionSelectionAccum)
 }
@@ -96,6 +97,16 @@ func decodeExpandValuesAccum(data []byte) (expandValuesAccum, error) {
 	return ret, err
 }
 
+func encodeExpandFloat64ValuesAccum(v expandFloat64ValuesAccum) ([]byte, error) {
+	return encode(v)
+}
+
+func decodeExpandFloat64ValuesAccum(data []byte) (expandFloat64ValuesAccum, error) {
+	var ret expandFloat64ValuesAccum
+	err := decode(&ret, data)
+	return ret, err
+}
+
 func encodePartitionSelectionAccum(v partitionSelectionAccum) ([]byte, error) {
 	return encode(v)
 }

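The encode and decode helpers called above live elsewhere in coders.go and are not part of this diff. A self-contained sketch of what the new coder pair amounts to, assuming gob-style encoding (an assumption for illustration; the real helpers may differ):

package main

import (
	"bytes"
	"encoding/gob"
	"fmt"
)

type expandFloat64ValuesAccum struct {
	Values []float64
}

// Hypothetical gob-based stand-ins for the encode/decode helpers in the diff.
func encodeExpandFloat64ValuesAccum(v expandFloat64ValuesAccum) ([]byte, error) {
	var buf bytes.Buffer
	err := gob.NewEncoder(&buf).Encode(v)
	return buf.Bytes(), err
}

func decodeExpandFloat64ValuesAccum(data []byte) (expandFloat64ValuesAccum, error) {
	var ret expandFloat64ValuesAccum
	err := gob.NewDecoder(bytes.NewReader(data)).Decode(&ret)
	return ret, err
}

func main() {
	data, _ := encodeExpandFloat64ValuesAccum(expandFloat64ValuesAccum{Values: []float64{1, 2, 3}})
	roundTripped, _ := decodeExpandFloat64ValuesAccum(data)
	fmt.Println(roundTripped.Values) // Prints [1 2 3].
}
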
privacy-on-beam/pbeam/distinct_per_key.go

Lines changed: 54 additions & 6 deletions
@@ -74,7 +74,7 @@ type DistinctPerKeyParams struct {
 func DistinctPerKey(s beam.Scope, pcol PrivatePCollection, params DistinctPerKeyParams) beam.PCollection {
 	s = s.Scope("pbeam.DistinctPerKey")
 	// Obtain type information from the underlying PCollection<K,V>.
-	_, kvT := beam.ValidateKVType(pcol.col)
+	idT, kvT := beam.ValidateKVType(pcol.col)
 	if kvT.Type() != reflect.TypeOf(kv.Pair{}) {
 		log.Fatalf("DistinctPerKey must be used on a PrivatePCollection of type <K,V>, got type %v instead", kvT)
 	}
@@ -106,14 +106,62 @@ func DistinctPerKey(s beam.Scope, pcol PrivatePCollection, params DistinctPerKey
 		log.Fatal(err)
 	}
 
-	// Perform partition selection
+	// Do initial per- and cross-partition contribution bounding and swap kv.Pair<K,V> and ID.
+	// This is not great in terms of utility, since dropping contributions randomly might
+	// mean that we keep duplicates instead of distinct values. However, this is necessary
+	// for the current algorithm to be DP.
+	if spec.testMode != noNoiseWithoutContributionBounding {
+		// First, rekey by kv.Pair{ID,K} and do per-partition contribution bounding.
+		rekeyed := beam.ParDo(
+			s,
+			newEncodeIDKFn(idT, pcol.codec),
+			pcol.col,
+			beam.TypeDefinition{Var: beam.VType, T: pcol.codec.VType.T}) // PCollection<kv.Pair{ID,K}, V>.
+		// Keep only maxContributionsPerPartition values per (privacyKey, partitionKey) pair.
+		sampled := boundContributions(s, rekeyed, params.MaxContributionsPerPartition)
+
+		// Collect all values per kv.Pair{ID,K} in a slice.
+		combined := beam.CombinePerKey(s,
+			newExpandValuesCombineFn(pcol.codec.VType),
+			sampled) // PCollection<kv.Pair{ID,K}, []codedV>, where codedV=[]byte
+
+		_, codedVSliceType := beam.ValidateKVType(combined)
+
+		decoded := beam.ParDo(
+			s,
+			newDecodeIDKFn(codedVSliceType, kv.NewCodec(idT.Type(), pcol.codec.KType.T)),
+			combined,
+			beam.TypeDefinition{Var: beam.WType, T: idT.Type()}) // PCollection<ID, kv.Pair{K,[]codedV}>, where codedV=[]byte
+
+		// Second, do cross-partition contribution bounding.
+		decoded = boundContributions(s, decoded, params.MaxPartitionsContributed)
+
+		rekeyed = beam.ParDo(
+			s,
+			newEncodeIDKFn(idT, kv.NewCodec(pcol.codec.KType.T, codedVSliceType.Type())),
+			decoded,
+			beam.TypeDefinition{Var: beam.VType, T: codedVSliceType.Type()}) // PCollection<kv.Pair{ID,K}, []codedV>, where codedV=[]byte
+
+		flattened := beam.ParDo(s, flattenValuesFn, rekeyed) // PCollection<kv.Pair{ID,K}, codedV>, where codedV=[]byte
+
+		pcol.col = beam.ParDo(
+			s,
+			newEncodeKVFn(kv.NewCodec(idT.Type(), pcol.codec.KType.T)),
+			flattened,
+			beam.TypeDefinition{Var: beam.WType, T: idT.Type()}) // PCollection<ID, kv.Pair{K,V}>
+	}
+
+	// Perform partition selection.
+	// We do partition selection after cross-partition contribution bounding because
+	// we want to keep the same contributions across partitions for partition selection
+	// and Count.
 	noiseEpsilon, partitionSelectionEpsilon, noiseDelta, partitionSelectionDelta := splitBudget(epsilon, delta, noiseKind)
 	partitions := SelectPartitions(s, pcol, SelectPartitionsParams{Epsilon: partitionSelectionEpsilon, Delta: partitionSelectionDelta, MaxPartitionsContributed: params.MaxPartitionsContributed})
 
-	// Deduplicate (partitionKey,value) pairs across users.
-	rekeyed := beam.SwapKV(s, pcol.col) // PCollection<kv.Pair{K,V}, ID>.
-	// Only keep one privacyKey per (partitionKey,value) pair.
-	sampled := boundContributions(s, rekeyed, 1)
+	// Keep only one privacyKey per (partitionKey, value) pair
+	// (i.e. remove duplicate values for each partition).
+	swapped := beam.SwapKV(s, pcol.col) // PCollection<kv.Pair{K,V}, ID>
+	sampled := boundContributions(s, swapped, 1)
 
 	// Drop V's, each <privacyKey, partitionKey> pair now corresponds to a unique V.
 	sampled = beam.SwapKV(s, sampled) // PCollection<ID, kv.Pair{K,V}>.

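boundContributions itself is not shown in this diff; conceptually, it keeps at most a fixed number of values per key by sampling uniformly at random. A stand-alone illustration of that idea (a conceptual sketch, not the library's implementation):

package main

import (
	"fmt"
	"math/rand"
)

// boundSlice keeps at most max elements of values, chosen uniformly at random.
// Dropping uniformly, rather than preferring distinct values, is exactly the
// utility cost the comment in DistinctPerKey above concedes: we may keep
// duplicates instead of distinct values, but the bound stays independent of
// the data, which the DP analysis requires.
func boundSlice(values []int, max int, rng *rand.Rand) []int {
	if len(values) <= max {
		return values
	}
	rng.Shuffle(len(values), func(i, j int) { values[i], values[j] = values[j], values[i] })
	return values[:max]
}

func main() {
	rng := rand.New(rand.NewSource(42))
	// Prints two randomly chosen elements, which may well be duplicates.
	fmt.Println(boundSlice([]int{1, 1, 2, 3, 3}, 2, rng))
}
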
privacy-on-beam/pbeam/distinct_per_key_test.go

Lines changed: 92 additions & 7 deletions
@@ -30,11 +30,15 @@ import (
 // are correctly counted (without duplicates).
 func TestDistinctPrivacyKeyNoNoise(t *testing.T) {
 	var triples []testutils.TripleWithIntValue
-	for i := 0; i < 100; i++ { // Add 400 values of which 200 are distinct to Partition 0.
+	for i := 0; i < 100; i++ { // Add 200 distinct values to Partition 0.
 		triples = append(triples, testutils.TripleWithIntValue{ID: i, Partition: 0, Value: i})
 		triples = append(triples, testutils.TripleWithIntValue{ID: i, Partition: 0, Value: 100 + i})
-		triples = append(triples, testutils.TripleWithIntValue{ID: i, Partition: 0, Value: i}) // Duplicate each value. Should be discarded by DistinctPerKey.
-		triples = append(triples, testutils.TripleWithIntValue{ID: i, Partition: 0, Value: 100 + i}) // Duplicate each value. Should be discarded by DistinctPerKey.
+	}
+	for i := 100; i < 200; i++ { // Add 200 additional values, all of which are duplicates of the existing distinct values, to Partition 0.
+		// The duplicates come from users different from the 100 users above in order to not discard
+		// any distinct values during the initial per-partition contribution bounding step.
+		triples = append(triples, testutils.TripleWithIntValue{ID: i, Partition: 0, Value: i - 100}) // Duplicate. Should be discarded by DistinctPerKey.
+		triples = append(triples, testutils.TripleWithIntValue{ID: i, Partition: 0, Value: i}) // Duplicate. Should be discarded by DistinctPerKey.
 	}
 	for i := 0; i < 50; i++ { // Add 200 values of which 100 are distinct to Partition 1.
 		triples = append(triples, testutils.TripleWithIntValue{ID: i, Partition: 1, Value: i})
@@ -183,6 +187,49 @@ func TestDistinctPerKeyPerKeyCrossPartitionContributionBounding(t *testing.T) {
 	}
 }
 
+// Checks that DistinctPrivacyKey bounds cross-partition contributions before doing deduplication of
+// values. This is to ensure we don't run into a contribution bounding-related privacy bug in some
+// rare cases.
+func TestDistinctPerKeyPerKeyCrossPartitionContributionBounding_IsAppliedBeforeDeduplication(t *testing.T) {
+	var triples []testutils.TripleWithIntValue
+	for i := 0; i < 100; i++ { // Add value=1 to 100 partitions.
+		triples = append(triples, testutils.TripleWithIntValue{ID: i, Partition: i, Value: 1})
+	}
+	for i := 0; i < 100; i++ { // Add a user that contributes value=1 to all 100 partitions.
+		triples = append(triples, testutils.TripleWithIntValue{ID: 100, Partition: i, Value: 1})
+	}
+	// Assume cross-partition contribution bounding is not done before deduplication of values.
+	// Each value=1 in each of the i ∈ {0, ..., 99} partitions would have two users associated
+	// with it: user with ID=i and user with ID=100. We pick one of these two users randomly,
+	// so in expectation about 50 of 100 partitions' deduplicated values would have user with ID=100
+	// associated with them. After cross-partition contribution bounding happens, we would be
+	// left with around 50 partitions with a single distinct value each and the test would fail.
+	result := []testutils.TestInt64Metric{}
+	for i := 0; i < 100; i++ {
+		result = append(result, testutils.TestInt64Metric{i, 1})
+	}
+	p, s, col, want := ptest.CreateList2(triples, result)
+	col = beam.ParDo(s, testutils.ExtractIDFromTripleWithIntValue, col)
+
+	// ε=50, δ=1-10⁻¹⁵ and l1Sensitivity=1 gives a threshold of ≈2.
+	// However, since δ is very large, a partition with a single user
+	// is kept with probability almost 1.
+	// We have 100 partitions. So, to get an overall flakiness of 10⁻²³,
+	// we can have each partition fail with 10⁻²⁵ probability (k=25).
+	epsilon, delta, k, l1Sensitivity := 50.0, 1-1e-15, 25.0, 1.0
+	// ε is split by 2 for noise and for partition selection, so we use 2*ε to get a Laplace noise with ε.
+	pcol := MakePrivate(s, col, NewPrivacySpec(2*epsilon, delta))
+	pcol = ParDo(s, testutils.TripleWithIntValueToKV, pcol)
+	got := DistinctPerKey(s, pcol, DistinctPerKeyParams{MaxPartitionsContributed: 1, NoiseKind: LaplaceNoise{}, MaxContributionsPerPartition: 1})
+	want = beam.ParDo(s, testutils.Int64MetricToKV, want)
+	if err := testutils.ApproxEqualsKVInt64(s, got, want, testutils.LaplaceTolerance(k, l1Sensitivity, epsilon)); err != nil {
+		t.Fatalf("TestDistinctPerKeyPerKeyCrossPartitionContributionBounding_IsAppliedBeforeDeduplication: %v", err)
+	}
+	if err := ptest.Run(p); err != nil {
+		t.Errorf("TestDistinctPerKeyPerKeyCrossPartitionContributionBounding_IsAppliedBeforeDeduplication: DistinctPerKey(%v) = %v, expected %v: %v", col, got, want, err)
+	}
+}
+
 // Checks that DistinctPrivacyKey bounds per-partition contributions correctly.
 func TestDistinctPrivacyKeyPerPartitionContributionBounding(t *testing.T) {
 	var triples []testutils.TripleWithIntValue
@@ -218,19 +265,57 @@ func TestDistinctPrivacyKeyPerPartitionContributionBounding(t *testing.T) {
 	// ε=50, δ=10⁻¹⁰⁰ and l1Sensitivity=6 gives a threshold of ≈33.
 	// We have 3 partitions. So, to get an overall flakiness of 10⁻²³,
 	// we can have each partition fail with 10⁻²⁵ probability (k=25).
-	// To see the logic and the math behind flakiness and tolerance calculation,
-	// See https://github.com/google/differential-privacy/blob/main/privacy-on-beam/docs/Tolerance_Calculation.pdf.
 	epsilon, delta, k, l1Sensitivity := 50.0, 1e-100, 25.0, 6.0
 	// ε is split by 2 for noise and for partition selection, so we use 2*ε to get a Laplace noise with ε.
 	pcol := MakePrivate(s, col, NewPrivacySpec(2*epsilon, delta))
 	pcol = ParDo(s, testutils.TripleWithIntValueToKV, pcol)
 	got := DistinctPerKey(s, pcol, DistinctPerKeyParams{MaxPartitionsContributed: 3, NoiseKind: LaplaceNoise{}, MaxContributionsPerPartition: 2})
 	want = beam.ParDo(s, testutils.Int64MetricToKV, want)
 	if err := testutils.ApproxEqualsKVInt64(s, got, want, testutils.LaplaceTolerance(k, l1Sensitivity, epsilon)); err != nil {
-		t.Fatalf("TestDistinctPerKeyNoNoise: %v", err)
+		t.Fatalf("TestDistinctPrivacyKeyPerPartitionContributionBounding: %v", err)
 	}
 	if err := ptest.Run(p); err != nil {
-		t.Errorf("TestDistinctPerKeyNoNoise: DistinctPerKey(%v) = %v, expected %v: %v", col, got, want, err)
+		t.Errorf("TestDistinctPrivacyKeyPerPartitionContributionBounding: DistinctPerKey(%v) = %v, expected %v: %v", col, got, want, err)
+	}
+}
+
+// Checks that DistinctPrivacyKey bounds per-partition contributions before doing deduplication of
+// values. This is to ensure we don't run into a contribution bounding-related privacy bug in some
+// rare cases.
+func TestDistinctPrivacyKeyPerPartitionContributionBounding_IsAppliedBeforeDeduplication(t *testing.T) {
+	var triples []testutils.TripleWithIntValue
+	for i := 0; i < 100; i++ { // Add 100 distinct values to Partition 0.
+		triples = append(triples, testutils.TripleWithIntValue{ID: i, Partition: 0, Value: i})
+	}
+	for i := 0; i < 100; i++ { // Add a user that contributes all these 100 distinct values to Partition 0.
+		triples = append(triples, testutils.TripleWithIntValue{ID: 100, Partition: 0, Value: i})
+	}
+	// Assume per-partition contribution bounding is not done before deduplication of values.
+	// Each value i ∈ {0, ..., 99} would have two users associated with it: user with ID=i and
+	// user with ID=100. We pick one of these two users randomly, so in expectation about 50
+	// of 100 deduplicated values would have user with ID=100 associated with them. After
+	// per-partition contribution bounding happens, we would be left with around 50 distinct
+	// values and the test would fail.
+	result := []testutils.TestInt64Metric{
+		{0, 100},
+	}
+	p, s, col, want := ptest.CreateList2(triples, result)
+	col = beam.ParDo(s, testutils.ExtractIDFromTripleWithIntValue, col)
+
+	// ε=50, δ=10⁻¹⁰⁰ and l1Sensitivity=1 gives a threshold of ≈6.
+	// We have 1 partition. So, to get an overall flakiness of 10⁻²³,
+	// we need to have each partition pass with 1-10⁻²³ probability (k=23).
+	epsilon, delta, k, l1Sensitivity := 50.0, 1e-100, 23.0, 1.0
+	// ε is split by 2 for noise and for partition selection, so we use 2*ε to get a Laplace noise with ε.
+	pcol := MakePrivate(s, col, NewPrivacySpec(2*epsilon, delta))
+	pcol = ParDo(s, testutils.TripleWithIntValueToKV, pcol)
+	got := DistinctPerKey(s, pcol, DistinctPerKeyParams{MaxPartitionsContributed: 1, NoiseKind: LaplaceNoise{}, MaxContributionsPerPartition: 1})
+	want = beam.ParDo(s, testutils.Int64MetricToKV, want)
+	if err := testutils.ApproxEqualsKVInt64(s, got, want, testutils.LaplaceTolerance(k, l1Sensitivity, epsilon)); err != nil {
+		t.Fatalf("TestDistinctPrivacyKeyPerPartitionContributionBounding_IsAppliedBeforeDeduplication: %v", err)
+	}
+	if err := ptest.Run(p); err != nil {
+		t.Errorf("TestDistinctPrivacyKeyPerPartitionContributionBounding_IsAppliedBeforeDeduplication: DistinctPerKey(%v) = %v, expected %v: %v", col, got, want, err)
 	}
 }

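As a sanity check on the flakiness arithmetic in the new tests' comments: with 100 partitions, each allowed to fail with probability at most 10⁻²⁵ (k=25), the union bound gives

P(test flakes) ≤ 100 × P(single partition fails) ≤ 100 × 10⁻²⁵ = 10⁻²³,

matching the stated overall flakiness target; the single-partition test reaches the same 10⁻²³ target directly with k=23.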