Source code for cashscript_py.helpers.script

"""Script assembly, parsing, and serialization helpers for BCH Script."""

from typing import Any

from cashscript_py.helpers.bch_opcodes import OpcodesBCH
from cashscript_py.helpers.data_encoding import flatten_bin_array, hex_to_bin

# Widths, in bytes, of the length prefix that follows each OP_PUSHDATA opcode
# (returned by length_bytes_for_push_opcode for OP_PUSHDATA1/2/4 respectively).
uint8Bytes = 1
uint16Bytes = 2
uint32Bytes = 4

# Opcode values of the explicit-length push operations; every opcode below
# OP_PUSHDATA1 is treated as an immediate push by the parser in this module.
OP_PUSHDATA1 = 0x4C  # 76 in decimal
OP_PUSHDATA2 = 0x4D  # 77 in decimal
OP_PUSHDATA4 = 0x4E  # 78 in decimal


# A script is a list of tokens: an int is an opcode, bytes are a data push.
Script = list[bytes | int]


def encode_int(integer: int) -> bytes:
    """Encode an integer as a minimally-encoded VM Script number (little-endian).

    The magnitude is written least-significant byte first and the sign is
    carried by the high bit (0x80) of the last byte.

    Args:
        integer: Signed integer to encode.

    Returns:
        Minimally-encoded number as immutable ``bytes`` (matching the declared
        return type):
            - Zero encodes to empty bytes.
            - Positive values are little-endian with minimal length.
            - Negative values set the sign bit in the most significant byte.
    """
    if integer == 0:
        return b""

    is_negative = integer < 0
    magnitude = -integer if is_negative else integer

    # Emit the magnitude one byte at a time, least-significant byte first.
    encoded = bytearray()
    while magnitude > 0:
        encoded.append(int(magnitude & 0xFF))
        magnitude >>= 8

    sign_bit = 0x80
    if encoded[-1] & sign_bit:
        # The top bit is already used by the magnitude, so the sign needs a
        # dedicated extra byte (0x80 for negative, 0x00 for positive).
        encoded.append(sign_bit if is_negative else 0x00)
    elif is_negative:
        encoded[-1] |= sign_bit

    # Freeze the working buffer so callers get the annotated, hashable type.
    return bytes(encoded)
def length_bytes_for_push_opcode(opcode: int) -> int: """Return the number of length bytes used by a push opcode. Args: opcode: Opcode value. Returns: 0 for immediate pushes (< OP_PUSHDATA1), or 1/2/4 for OP_PUSHDATA1/2/4. Raises: ValueError: If the opcode is not a valid push opcode. """ if opcode < OP_PUSHDATA1: return 0 elif opcode == OP_PUSHDATA1: return uint8Bytes elif opcode == OP_PUSHDATA2: return uint16Bytes elif opcode == OP_PUSHDATA4: return uint32Bytes else: raise ValueError("Invalid opcode for push operation") def read_little_endian_number(script: bytes, index: int, length: int) -> int: """Read a little-endian unsigned integer from a byte sequence. Args: script: Source bytecode. index: Starting offset. length: Number of bytes to read (1, 2, or 4). Returns: Decoded integer. Raises: ValueError: If length is not one of {1, 2, 4}. """ if length not in [1, 2, 4]: raise ValueError("Invalid length for little endian number") slice_of_bytes = script[index : index + length] return int.from_bytes(bytes(slice_of_bytes), byteorder="little") def read_authentication_instruction(script: bytes, index: int) -> dict[str, Any]: """Parse a single authentication instruction from bytecode. Behavior: - Non-push opcode: returns just {'opcode': <int>} and advances by 1. - Push opcode: reads length and data; returns {'opcode', 'data'} and next index. - Malformed length or data marks the instruction with 'malformed' and includes 'expectedLengthBytes' or 'expectedDataBytes'. Args: script: Full bytecode. index: Current parse position. Returns: A dict with keys: - 'instruction': data for the parsed instruction. - 'nextIndex': index at which the next instruction begins. 
""" opcode = script[index] if opcode > OP_PUSHDATA4: return {"instruction": {"opcode": opcode}, "nextIndex": index + 1} length_bytes = length_bytes_for_push_opcode(opcode) if length_bytes != 0 and index + length_bytes >= len(script): slice_start = index + 1 slice_end = slice_start + length_bytes return { "instruction": { "expectedLengthBytes": length_bytes, "length": script[slice_start:slice_end], "malformed": True, "opcode": opcode, }, "nextIndex": slice_end, } data_bytes = opcode if length_bytes == 0 else read_little_endian_number(script, index + 1, length_bytes) data_start = index + 1 + length_bytes data_end = data_start + data_bytes instruction_data = {"data": script[data_start:data_end], "opcode": opcode} if data_end > len(script): instruction_data.update({"expectedDataBytes": data_end - data_start, "malformed": True}) return {"instruction": instruction_data, "nextIndex": data_end} def serialize_script_token(token: bytes | int) -> bytes: """Serialize an opcode or data push to bytecode. Args: token: Opcode (int) or data bytes. Returns: Serialized bytes following BCH push rules: - Empty data -> OP_0 - Single-byte small integers (1..16) -> OP_1..OP_16, 0x81 -> OP_1NEGATE - Otherwise immediate push, or PUSHDATA1/2/4 as required by length. 
""" if isinstance(token, int): return bytes([token]) else: maximum_push_byte_operation_size = 75 push_number_opcodes = 16 negative_one = 129 OP_1NEGATE = 79 maximum_push_data1_size = 255 maximum_push_data2_size = 65535 push_number_opcodes_offset = 80 if len(token) <= maximum_push_byte_operation_size: if len(token) == 0: return bytes([0]) elif len(token) == 1: if 0 < token[0] <= push_number_opcodes: return bytes([token[0] + push_number_opcodes_offset]) elif token[0] == negative_one: return bytes([OP_1NEGATE]) else: return bytes([1]) + token else: return bytes([len(token)]) + token elif len(token) <= maximum_push_data1_size: return bytes([OP_PUSHDATA1, len(token)]) + token elif len(token) <= maximum_push_data2_size: return bytes([OP_PUSHDATA2]) + len(token).to_bytes(2, byteorder="little") + token else: return bytes([OP_PUSHDATA4]) + len(token).to_bytes(4, byteorder="little") + token
def serialize_script(script: Script) -> bytes:
    """Convert a Script token list to raw bytecode.

    Every token is serialized individually with serialize_script_token and the
    resulting pieces are handed to flatten_bin_array to be joined.

    Args:
        script: Sequence of tokens (opcodes or data pushes).

    Returns:
        Concatenated bytecode for the script.
    """
    token_chunks = list(map(serialize_script_token, script))
    return flatten_bin_array(token_chunks)
[docs] def parse_bytecode(bytecode: bytes) -> list[dict[str, bytes | int]]: """Parse bytecode into a list of instruction descriptors. Args: bytecode: Raw script bytecode. Returns: List of instruction dicts as produced by read_authentication_instruction. """ instructions = [] i = 0 while i < len(bytecode): result = read_authentication_instruction(bytecode, i) instruction = result["instruction"] i = result["nextIndex"] instructions.append(instruction) return instructions
def asm_to_script(asm: str) -> Script:
    """Convert ASM text into a Script list.

    Notes:
        - Tokens are separated by any run of whitespace; leading and trailing
          whitespace is ignored.
        - Blank input (empty or whitespace-only) yields an empty script rather
          than a single empty data push.
        - Tokens starting with 'OP_' are looked up in OpcodesBCH; every other
          token is passed to hex_to_bin and stored as a data push.

    Args:
        asm: Assembly text (e.g., 'OP_DUP OP_HASH160 <hex> OP_EQUALVERIFY OP_CHECKSIG').

    Returns:
        Script token list.

    Raises:
        ValueError: If an 'OP_' token is not present in OpcodesBCH. Errors
            raised by hex_to_bin for invalid hex propagate unchanged.
    """
    script: Script = []

    # str.split() without a separator collapses whitespace runs and never
    # produces empty tokens, so no separate normalization pass is needed.
    for token in asm.split():
        if token.startswith("OP_"):
            opcode = OpcodesBCH.get(token)
            if opcode is None:
                raise ValueError(f"Opcode {token} not found in OpcodesBCH")
            script.append(opcode)
        else:
            script.append(hex_to_bin(token))

    return script
def generate_redeem_script(base_script: Script, encoded_args: list[bytes]) -> Script:
    """Build a redeem script by placing constructor args in front of a base script.

    Args:
        base_script: Compiled script tokens.
        encoded_args: Constructor args (ABI-encoded bytes).

    Returns:
        A new list holding the arguments in reverse order, followed by the
        tokens of ``base_script``. Neither input list is modified.
    """
    args_last_to_first: Script = [*reversed(encoded_args)]
    return args_last_to_first + base_script
def create_input_script(redeem_script: Script, complete_args: list[bytes], selector: int | None = None) -> bytes:
    """Assemble the unlocking bytecode for a contract input.

    Args:
        redeem_script: Contract redeem script.
        complete_args: ABI-encoded function args (including signatures).
        selector: Optional function selector; when given it is encoded as a
            script number and pushed after the (reversed) arguments.

    Returns:
        Serialized unlocking bytecode: the argument pushes, the optional
        selector, and finally the serialized redeem script as one data push.
    """
    # Arguments go on in reverse order, with the selector (if any) last.
    pushes: Script = [*reversed(complete_args)]
    if selector is not None:
        pushes.append(encode_int(int(selector)))

    # The redeem script is itself serialized and pushed as a single data token.
    redeem_bytecode = serialize_script(redeem_script)
    return serialize_script(pushes + [redeem_bytecode])
def count_opcodes(script: Script) -> int:
    """Count the int tokens of a script whose value is greater than OP_16.

    Data pushes (bytes tokens) and opcodes up to and including OP_16 are not
    counted.

    Args:
        script: Script token list.

    Returns:
        The number of non-push opcodes.
    """
    threshold = OpcodesBCH["OP_16"]
    return sum(1 for token in script if isinstance(token, int) and token > threshold)
def calculate_bytesize(script: Script) -> int:
    """Return how many bytes a script occupies once serialized.

    Args:
        script: Script token list.

    Returns:
        Length in bytes of the output of serialize_script for ``script``.
    """
    return len(serialize_script(script))
# Public API of this module; the push-opcode and instruction-reading helpers
# defined above are intentionally not listed here.
__all__ = [ "asm_to_script", "calculate_bytesize", "count_opcodes", "create_input_script", "encode_int", "generate_redeem_script", "parse_bytecode", "serialize_script", ]