Skip to content

Commit 4748f93

Browse files
authored
Fix revocation accum sync when endorsement txn fails (#3547) (#3555)
Signed-off-by: jamshale <jamiehalebc@gmail.com>
1 parent 09b6c52 commit 4748f93

File tree

5 files changed

+277
-3
lines changed

5 files changed

+277
-3
lines changed

acapy_agent/protocols/endorse_transaction/v1_0/handlers/endorsed_transaction_response_handler.py

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
HandlerException,
77
RequestContext,
88
)
9+
from .....revocation.util import notify_rev_reg_entry_txn_failed
910
from .....storage.error import StorageError
1011
from ..manager import TransactionManager, TransactionManagerError
1112
from ..messages.endorsed_transaction_response import EndorsedTransactionResponse
@@ -30,12 +31,15 @@ async def handle(self, context: RequestContext, responder: BaseResponder):
3031
if not context.connection_ready:
3132
raise HandlerException("No connection established")
3233

33-
# profile_session = await context.session()
34+
async def send_failed_transaction_event(err_msg: str):
35+
await notify_rev_reg_entry_txn_failed(context.profile, err_msg)
36+
3437
mgr = TransactionManager(context.profile)
3538
try:
3639
transaction = await mgr.receive_endorse_response(context.message)
3740
except TransactionManagerError as err:
3841
self._logger.exception("Error receiving endorsed transaction response")
42+
await send_failed_transaction_event(str(err))
3943
raise HandlerException(str(err))
4044

4145
# Automatically write transaction if flag is set
@@ -52,4 +56,5 @@ async def handle(self, context: RequestContext, responder: BaseResponder):
5256
)
5357
except (StorageError, TransactionManagerError) as err:
5458
self._logger.exception(err)
59+
await send_failed_transaction_event(str(err))
5560
raise HandlerException(str(err))

acapy_agent/revocation/manager.py

Lines changed: 153 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,26 +1,40 @@
11
"""Classes to manage credential revocation."""
22

3+
import asyncio
34
import json
45
import logging
56
from typing import Mapping, NamedTuple, Optional, Sequence, Text, Tuple
67

8+
from ..cache.base import BaseCache
79
from ..connections.models.conn_record import ConnRecord
810
from ..core.error import BaseError
9-
from ..core.profile import Profile
11+
from ..core.profile import Profile, ProfileSession
12+
from ..indy.credx.issuer import CATEGORY_REV_REG
1013
from ..indy.issuer import IndyIssuer
14+
from ..ledger.base import BaseLedger
15+
from ..messaging.responder import BaseResponder
16+
from ..protocols.endorse_transaction.v1_0.manager import (
17+
TransactionManager,
18+
TransactionManagerError,
19+
)
20+
from ..protocols.endorse_transaction.v1_0.util import (
21+
get_endorser_connection_id,
22+
)
1123
from ..protocols.issue_credential.v1_0.models.credential_exchange import (
1224
V10CredentialExchange,
1325
)
1426
from ..protocols.issue_credential.v2_0.models.cred_ex_record import V20CredExRecord
1527
from ..protocols.revocation_notification.v1_0.models.rev_notification_record import (
1628
RevNotificationRecord,
1729
)
18-
from ..storage.error import StorageNotFoundError
30+
from ..storage.error import StorageError, StorageNotFoundError
1931
from .indy import IndyRevocation
2032
from .models.issuer_cred_rev_record import IssuerCredRevRecord
2133
from .models.issuer_rev_reg_record import IssuerRevRegRecord
2234
from .util import notify_pending_cleared_event, notify_revocation_published_event
2335

36+
LOGGER = logging.getLogger(__name__)
37+
2438

2539
class RevocationManagerError(BaseError):
2640
"""Revocation manager error."""
@@ -498,3 +512,140 @@ async def set_cred_revoked_state(
498512
await txn.commit()
499513
except StorageNotFoundError:
500514
pass
515+
516+
async def _get_endorser_info(self) -> Tuple[Optional[str], Optional[ConnRecord]]:
517+
connection_id = await get_endorser_connection_id(self._profile)
518+
519+
endorser_did = None
520+
async with self._profile.session() as session:
521+
connection_record = await ConnRecord.retrieve_by_id(session, connection_id)
522+
endorser_info = await connection_record.metadata_get(session, "endorser_info")
523+
endorser_did = endorser_info.get("endorser_did")
524+
525+
return endorser_did, connection_record
526+
527+
async def fix_and_publish_from_invalid_accum_err(self, err_msg: str):
528+
"""Fix and publish revocation registry entries from invalid accumulator error."""
529+
cache = self._profile.inject_or(BaseCache)
530+
531+
async def check_retry(accum):
532+
"""Used to manage retries for fixing revocation registry entries."""
533+
retry_value = await cache.get(accum)
534+
if not retry_value:
535+
await cache.set(accum, 5)
536+
else:
537+
if retry_value > 0:
538+
await cache.set(accum, retry_value - 1)
539+
else:
540+
LOGGER.error(
541+
f"Revocation registry entry transaction failed for {accum}"
542+
)
543+
544+
def get_genesis_transactions():
545+
"""Get the genesis transactions needed for fixing broken accum."""
546+
genesis_transactions = self._profile.context.settings.get(
547+
"ledger.genesis_transactions"
548+
)
549+
if not genesis_transactions:
550+
write_ledger = self._profile.context.injector.inject(BaseLedger)
551+
pool = write_ledger.pool
552+
genesis_transactions = pool.genesis_txns
553+
return genesis_transactions
554+
555+
async def sync_accumulator(session: ProfileSession):
556+
"""Sync the local accumulator with the ledger and create recovery txn."""
557+
rev_reg_record = await IssuerRevRegRecord.retrieve_by_id(
558+
session, rev_reg_entry.name
559+
)
560+
561+
# Fix and get the recovery transaction
562+
(
563+
rev_reg_delta,
564+
recovery_txn,
565+
applied_txn,
566+
) = await rev_reg_record.fix_ledger_entry(
567+
self._profile, False, genesis_transactions
568+
)
569+
570+
# Update locally assuming ledger write will succeed
571+
rev_reg = await session.handle.fetch(
572+
CATEGORY_REV_REG,
573+
rev_reg_entry.value_json["revoc_reg_id"],
574+
for_update=True,
575+
)
576+
new_value_json = rev_reg.value_json
577+
new_value_json["value"]["accum"] = recovery_txn["value"]["accum"]
578+
await session.handle.replace(
579+
CATEGORY_REV_REG,
580+
rev_reg.name,
581+
json.dumps(new_value_json),
582+
rev_reg.tags,
583+
)
584+
585+
return rev_reg_record, recovery_txn
586+
587+
async def create_and_send_endorser_txn():
588+
"""Create and send the endorser transaction again."""
589+
async with ledger:
590+
# Create the revocation registry entry
591+
rev_entry_res = await ledger.send_revoc_reg_entry(
592+
rev_reg_entry.value_json["revoc_reg_id"],
593+
"CL_ACCUM",
594+
recovery_txn,
595+
rev_reg_record.issuer_did,
596+
write_ledger=False,
597+
endorser_did=endorser_did,
598+
)
599+
600+
# Send the transaction to the endorser again with recovery txn
601+
transaction_manager = TransactionManager(self._profile)
602+
try:
603+
revo_transaction = await transaction_manager.create_record(
604+
messages_attach=rev_entry_res["result"],
605+
connection_id=connection.connection_id,
606+
)
607+
(
608+
revo_transaction,
609+
revo_transaction_request,
610+
) = await transaction_manager.create_request(transaction=revo_transaction)
611+
except (StorageError, TransactionManagerError) as err:
612+
raise RevocationManagerError(err.roll_up) from err
613+
614+
responder = self._profile.inject_or(BaseResponder)
615+
if not responder:
616+
raise RevocationManagerError(
617+
"No responder found. Unable to send transaction request"
618+
)
619+
await responder.send(
620+
revo_transaction_request,
621+
connection_id=connection.connection_id,
622+
)
623+
624+
async with self._profile.session() as session:
625+
rev_reg_records = await session.handle.fetch_all(
626+
IssuerRevRegRecord.RECORD_TYPE
627+
)
628+
# Cycle through all rev_rev_def records to find the offending accumulator
629+
for rev_reg_entry in rev_reg_records:
630+
ledger = session.inject_or(BaseLedger)
631+
# Get the value from the ledger
632+
async with ledger:
633+
(accum_response, _) = await ledger.get_revoc_reg_delta(
634+
rev_reg_entry.value_json["revoc_reg_id"]
635+
)
636+
accum = accum_response.get("value", {}).get("accum")
637+
638+
# If the accum from the ledger matches the error message, fix it
639+
if accum and accum in err_msg:
640+
await check_retry(accum)
641+
642+
# Get the genesis transactions needed for fix
643+
genesis_transactions = get_genesis_transactions()
644+
645+
# We know this needs endorsement
646+
endorser_did, connection = await self._get_endorser_info()
647+
rev_reg_record, recovery_txn = await sync_accumulator(session=session)
648+
await create_and_send_endorser_txn()
649+
650+
# Some time in between re-tries
651+
await asyncio.sleep(1)

acapy_agent/revocation/routes.py

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1557,6 +1557,10 @@ def register_events(event_bus: EventBus):
15571557
re.compile(f"^{REVOCATION_EVENT_PREFIX}{REVOCATION_ENTRY_EVENT}.*"),
15581558
on_revocation_entry_event,
15591559
)
1560+
event_bus.subscribe(
1561+
re.compile(f"^{REVOCATION_EVENT_PREFIX}REV_REG_ENTRY_TXN_FAILED.*"),
1562+
on_rev_reg_entry_txn_failed,
1563+
)
15601564

15611565

15621566
async def on_revocation_registry_init_event(profile: Profile, event: Event):
@@ -1747,6 +1751,12 @@ async def on_revocation_registry_endorsed_event(profile: Profile, event: Event):
17471751
)
17481752

17491753

1754+
async def on_rev_reg_entry_txn_failed(profile: Profile, event: Event):
1755+
"""Handle revocation registry entry transaction failed event."""
1756+
manager = RevocationManager(profile)
1757+
await manager.fix_and_publish_from_invalid_accum_err(event.payload.get("msg"))
1758+
1759+
17501760
class TailsDeleteResponseSchema(OpenAPISchema):
17511761
"""Return schema for tails deletion."""
17521762

acapy_agent/revocation/tests/test_manager.py

Lines changed: 99 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,15 @@
11
import json
22
from unittest import IsolatedAsyncioTestCase
33

4+
from uuid_utils import uuid4
5+
6+
from ...cache.base import BaseCache
7+
from ...cache.in_memory import InMemoryCache
48
from ...connections.models.conn_record import ConnRecord
9+
from ...indy.credx.issuer import CATEGORY_REV_REG
510
from ...indy.issuer import IndyIssuer
11+
from ...ledger.base import BaseLedger
12+
from ...messaging.responder import BaseResponder
613
from ...protocols.issue_credential.v1_0.models.credential_exchange import (
714
V10CredentialExchange,
815
)
@@ -14,6 +21,7 @@
1421
from ...utils.testing import create_test_profile
1522
from .. import manager as test_module
1623
from ..manager import RevocationManager, RevocationManagerError
24+
from ..models.issuer_rev_reg_record import IssuerRevRegRecord
1725

1826
TEST_DID = "LjgpST2rjsoxYegQDRm7EL"
1927
SCHEMA_NAME = "bc-reg"
@@ -889,3 +897,94 @@ async def test_set_revoked_state_v2(self):
889897
session, crev_record.record_id
890898
)
891899
assert check_crev_record.state == IssuerCredRevRecord.STATE_REVOKED
900+
901+
@mock.patch.object(
902+
ConnRecord,
903+
"retrieve_by_id",
904+
mock.CoroutineMock(
905+
return_value=mock.MagicMock(
906+
connection_id="endorser-id",
907+
metadata_get=mock.CoroutineMock(
908+
return_value={"endorser_did": "test_endorser_did"}
909+
),
910+
)
911+
),
912+
)
913+
@mock.patch.object(
914+
IssuerRevRegRecord,
915+
"fix_ledger_entry",
916+
mock.CoroutineMock(
917+
return_value=(
918+
"1 ...",
919+
{
920+
"ver": "1.0",
921+
"value": {
922+
"prevAccum": "1 ...",
923+
"accum": "fixed-accum",
924+
"issued": [1],
925+
},
926+
},
927+
[1],
928+
)
929+
),
930+
)
931+
async def test_fix_and_publish_from_invalid_accum_err(
932+
self,
933+
):
934+
# Setup
935+
self.profile.context.injector.bind_instance(BaseCache, InMemoryCache())
936+
self.profile.context.injector.bind_instance(
937+
BaseResponder, mock.MagicMock(BaseResponder, autospec=True)
938+
)
939+
mock_ledger = mock.MagicMock(BaseLedger, autospec=True)
940+
mock_ledger.get_revoc_reg_delta = mock.CoroutineMock(
941+
side_effect=[
942+
({"value": {"accum": "other-accum"}}, None),
943+
({"value": {"accum": "invalid-accum"}}, None),
944+
]
945+
)
946+
mock_ledger.send_revoc_reg_entry = mock.CoroutineMock(
947+
return_value={"result": {"txn": "..."}}
948+
)
949+
self.profile.context.injector.bind_instance(BaseLedger, mock_ledger)
950+
self.profile.context.settings.set_value(
951+
"ledger.genesis_transactions", {"txn": "..."}
952+
)
953+
self.profile.context.settings.set_value("endorser.endorser_alias", "endorser")
954+
955+
async with self.profile.session() as session:
956+
# Add an endorser connection
957+
await session.handle.insert(
958+
ConnRecord.RECORD_TYPE,
959+
name="endorser",
960+
value_json={"connection_id": "endorser", "alias": "endorser"},
961+
)
962+
record = ConnRecord(
963+
alias="endorser",
964+
)
965+
await record.save(session)
966+
967+
# Add a couple rev reg records
968+
for _ in range(2):
969+
await session.handle.insert(
970+
IssuerRevRegRecord.RECORD_TYPE,
971+
name=str(uuid4()),
972+
value_json={
973+
"revoc_reg_id": "test-rr-id",
974+
},
975+
)
976+
977+
# Need a matching revocation_reg record
978+
await session.handle.insert(
979+
CATEGORY_REV_REG,
980+
name="test-rr-id",
981+
value_json={
982+
"value": {
983+
"accum": "invalid-accum",
984+
"revoked": [1],
985+
}
986+
},
987+
)
988+
989+
# Execute
990+
await self.manager.fix_and_publish_from_invalid_accum_err("invalid-accum")

acapy_agent/revocation/util.py

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -76,3 +76,12 @@ async def notify_pending_cleared_event(
7676
"""Send notification of credential revoked as issuer."""
7777
topic = f"{REVOCATION_EVENT_PREFIX}{REVOCATION_CLEAR_PENDING_EVENT}::{rev_reg_id}"
7878
await profile.notify(topic, {"rev_reg_id": rev_reg_id})
79+
80+
81+
async def notify_rev_reg_entry_txn_failed(
82+
profile: Profile,
83+
msg: str,
84+
):
85+
"""Send notification that a revocation registry entry transaction failed."""
86+
topic = f"{REVOCATION_EVENT_PREFIX}REV_REG_ENTRY_TXN_FAILED"
87+
await profile.notify(topic, {"msg": msg})

0 commit comments

Comments
 (0)