Skip to content

zkevm: add ECRECOVER, SHA2-256, IDENTITY and RIPEMD precompiles #1524

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 12 commits into from
May 16, 2025
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
172 changes: 142 additions & 30 deletions tests/zkevm/test_worst_compute.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,35 +10,37 @@
import pytest

from ethereum_test_forks import Fork
from ethereum_test_tools import Alloc, Block, BlockchainTestFiller, Environment, Transaction
from ethereum_test_tools import (
Address,
Alloc,
Block,
BlockchainTestFiller,
Bytecode,
Environment,
Transaction,
)
from ethereum_test_tools.vm.opcode import Opcodes as Op

REFERENCE_SPEC_GIT_PATH = "TODO"
REFERENCE_SPEC_VERSION = "TODO"

MAX_CODE_SIZE = 24 * 1024
KECCAK_RATE = 136
ECRECOVER_GAS_COST = 3_000


@pytest.mark.valid_from("Cancun")
@pytest.mark.parametrize(
"gas_limit",
[
36_000_000,
],
)
def test_worst_keccak(
blockchain_test: BlockchainTestFiller,
pre: Alloc,
fork: Fork,
gas_limit: int,
):
"""Test running a block with as many KECCAK256 permutations as possible."""
env = Environment(gas_limit=gas_limit)
env = Environment()

# Intrinsic gas cost is paid once.
intrinsic_gas_calculator = fork.transaction_intrinsic_cost_calculator()
available_gas = gas_limit - intrinsic_gas_calculator()
available_gas = env.gas_limit - intrinsic_gas_calculator()

gsc = fork.gas_costs()
mem_exp_gas_calculator = fork.memory_expansion_gas_calculator()
Expand Down Expand Up @@ -90,11 +92,8 @@ def test_worst_keccak(

tx = Transaction(
to=code_address,
gas_limit=gas_limit,
gas_price=10,
gas_limit=env.gas_limit,
sender=pre.fund_eoa(),
data=[],
value=0,
)

blockchain_test(
Expand All @@ -105,22 +104,94 @@ def test_worst_keccak(
)


@pytest.mark.zkevm
@pytest.mark.valid_from("Cancun")
@pytest.mark.parametrize(
"gas_limit",
"address,static_cost,per_word_dynamic_cost,bytes_per_unit_of_work",
[
Environment().gas_limit,
pytest.param(0x02, 60, 12, 64, id="SHA2-256"),
pytest.param(0x03, 600, 120, 64, id="RIPEMD-160"),
pytest.param(0x04, 15, 3, 1, id="IDENTITY"),
],
)
def test_worst_precompile_only_data_input(
blockchain_test: BlockchainTestFiller,
pre: Alloc,
fork: Fork,
address: Address,
static_cost: int,
per_word_dynamic_cost: int,
bytes_per_unit_of_work: int,
):
"""Test running a block with as many precompile calls which have a single `data` input."""
env = Environment()

# Intrinsic gas cost is paid once.
intrinsic_gas_calculator = fork.transaction_intrinsic_cost_calculator()
available_gas = env.gas_limit - intrinsic_gas_calculator()

gsc = fork.gas_costs()
mem_exp_gas_calculator = fork.memory_expansion_gas_calculator()

# Discover the optimal input size to maximize precompile work, not precompile calls.
max_work = 0
optimal_input_length = 0
for input_length in range(1, 1_000_000, 32):
parameters_gas = (
gsc.G_BASE # PUSH0 = arg offset
+ gsc.G_BASE # PUSH0 = arg size
+ gsc.G_BASE # PUSH0 = arg size
+ gsc.G_VERY_LOW # PUSH0 = arg offset
+ gsc.G_VERY_LOW # PUSHN = address
+ gsc.G_BASE # GAS
)
iteration_gas_cost = (
parameters_gas
+ +static_cost # Precompile static cost
+ math.ceil(input_length / 32) * per_word_dynamic_cost # Precompile dynamic cost
+ gsc.G_BASE # POP
)
# From the available gas, we substract the mem expansion costs considering we know the
# current input size length.
available_gas_after_expansion = max(
0, available_gas - mem_exp_gas_calculator(new_bytes=input_length)
)
# Calculate how many calls we can do.
num_calls = available_gas_after_expansion // iteration_gas_cost
total_work = num_calls * math.ceil(input_length / bytes_per_unit_of_work)

# If we found an input size that is better (reg permutations/gas), then save it.
if total_work > max_work:
max_work = total_work
optimal_input_length = input_length

calldata = Op.CODECOPY(0, 0, optimal_input_length)
attack_block = Op.POP(Op.STATICCALL(Op.GAS, address, 0, optimal_input_length, 0, 0))
code = code_loop_precompile_call(calldata, attack_block)

code_address = pre.deploy_contract(code=code)

tx = Transaction(
to=code_address,
gas_limit=env.gas_limit,
sender=pre.fund_eoa(),
)

blockchain_test(
env=env,
pre=pre,
post={},
blocks=[Block(txs=[tx])],
)


@pytest.mark.valid_from("Cancun")
def test_worst_modexp(
blockchain_test: BlockchainTestFiller,
pre: Alloc,
fork: Fork,
gas_limit: int,
):
"""Test running a block with as many MODEXP calls as possible."""
env = Environment(gas_limit=gas_limit)
env = Environment()

base_mod_length = 32
exp_length = 32
Expand All @@ -144,23 +215,48 @@ def test_worst_modexp(
iter_complexity = exp.bit_length() - 1
gas_cost = math.floor((mul_complexity * iter_complexity) / 3)
attack_block = Op.POP(Op.STATICCALL(gas_cost, 0x5, 0, 32 * 6, 0, 0))
code = code_loop_precompile_call(calldata, attack_block)

# The attack contract is: JUMPDEST + [attack_block]* + PUSH0 + JUMP
jumpdest = Op.JUMPDEST
jump_back = Op.JUMP(len(calldata))
max_iters_loop = (MAX_CODE_SIZE - len(calldata) - len(jumpdest) - len(jump_back)) // len(
attack_block
code_address = pre.deploy_contract(code=code)

tx = Transaction(
to=code_address,
gas_limit=env.gas_limit,
sender=pre.fund_eoa(),
)
code = calldata + jumpdest + sum([attack_block] * max_iters_loop) + jump_back
if len(code) > MAX_CODE_SIZE:
# Must never happen, but keep it as a sanity check.
raise ValueError(f"Code size {len(code)} exceeds maximum code size {MAX_CODE_SIZE}")

code_address = pre.deploy_contract(code=code)
blockchain_test(
env=env,
pre=pre,
post={},
blocks=[Block(txs=[tx])],
)


@pytest.mark.valid_from("Cancun")
def test_worst_ecrecover(
blockchain_test: BlockchainTestFiller,
pre: Alloc,
fork: Fork,
):
"""Test running a block with as many ECRECOVER calls as possible."""
env = Environment()

# Calldata
calldata = (
Op.MSTORE(0 * 32, 0x38D18ACB67D25C8BB9942764B62F18E17054F66A817BD4295423ADF9ED98873E)
+ Op.MSTORE(1 * 32, 27)
+ Op.MSTORE(2 * 32, 0x38D18ACB67D25C8BB9942764B62F18E17054F66A817BD4295423ADF9ED98873E)
+ Op.MSTORE(3 * 32, 0x789D1DD423D25F0772D2748D60F7E4B81BB14D086EBA8E8E8EFB6DCFF8A4AE02)
)

attack_block = Op.POP(Op.STATICCALL(ECRECOVER_GAS_COST, 0x1, 0, 32 * 4, 0, 0))
code = code_loop_precompile_call(calldata, attack_block)
code_address = pre.deploy_contract(code=bytes(code))

tx = Transaction(
to=code_address,
gas_limit=gas_limit,
gas_limit=env.gas_limit,
sender=pre.fund_eoa(),
)

Expand All @@ -170,3 +266,19 @@ def test_worst_modexp(
post={},
blocks=[Block(txs=[tx])],
)


def code_loop_precompile_call(calldata: Bytecode, attack_block: Bytecode):
"""Create a code loop that calls a precompile with the given calldata."""
# The attack contract is: CALLDATA_PREP + #JUMPDEST + [attack_block]* + JUMP(#)
jumpdest = Op.JUMPDEST
jump_back = Op.JUMP(len(calldata))
max_iters_loop = (MAX_CODE_SIZE - len(calldata) - len(jumpdest) - len(jump_back)) // len(
attack_block
)
code = calldata + jumpdest + sum([attack_block] * max_iters_loop) + jump_back
if len(code) > MAX_CODE_SIZE:
# Must never happen, but keep it as a sanity check.
raise ValueError(f"Code size {len(code)} exceeds maximum code size {MAX_CODE_SIZE}")

return code
Loading