Skip to content

Commit 3c9bcd7

Browse files
committed
fix major bug when receiving empty ouput
1 parent 1ff3a62 commit 3c9bcd7

File tree

1 file changed

+14
-11
lines changed

1 file changed

+14
-11
lines changed

sinker/sinker.go

Lines changed: 14 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -103,19 +103,22 @@ func (s *SQLSinker) HandleBlockScopedData(ctx context.Context, data *pbsubstream
103103

104104
dbChanges := &pbdatabase.DatabaseChanges{}
105105
mapOutput := output.GetMapOutput()
106-
if !mapOutput.MessageIs(dbChanges) && mapOutput.TypeUrl != "type.googleapis.com/sf.substreams.database.v1.DatabaseChanges" {
107-
return fmt.Errorf("mismatched message type: trying to unmarshal unknown type %q", mapOutput.MessageName())
108-
}
109106

110-
// We do not use UnmarshalTo here because we need to parse an older proto type and
111-
// UnmarshalTo enforces the type check. So we check manually the `TypeUrl` above and we use
112-
// `Unmarshal` instead which only deals with the bytes value.
113-
if err := proto.Unmarshal(mapOutput.Value, dbChanges); err != nil {
114-
return fmt.Errorf("unmarshal database changes: %w", err)
115-
}
107+
if mapOutput.String() != "" {
108+
if !mapOutput.MessageIs(dbChanges) && mapOutput.TypeUrl != "type.googleapis.com/sf.substreams.database.v1.DatabaseChanges" {
109+
return fmt.Errorf("mismatched message type: trying to unmarshal unknown type %q", mapOutput.MessageName())
110+
}
116111

117-
if err := s.applyDatabaseChanges(dbChanges, data.Clock.Number, data.FinalBlockHeight); err != nil {
118-
return fmt.Errorf("apply database changes: %w", err)
112+
// We do not use UnmarshalTo here because we need to parse an older proto type and
113+
// UnmarshalTo enforces the type check. So we check manually the `TypeUrl` above and we use
114+
// `Unmarshal` instead which only deals with the bytes value.
115+
if err := proto.Unmarshal(mapOutput.Value, dbChanges); err != nil {
116+
return fmt.Errorf("unmarshal database changes: %w", err)
117+
}
118+
119+
if err := s.applyDatabaseChanges(dbChanges, data.Clock.Number, data.FinalBlockHeight); err != nil {
120+
return fmt.Errorf("apply database changes: %w", err)
121+
}
119122
}
120123

121124
if data.Clock.Number%s.batchBlockModulo(data, isLive) == 0 {

0 commit comments

Comments
 (0)