"""Script assembly, parsing, and serialization helpers for BCH Script."""
from typing import Any
from cashscript_py.helpers.bch_opcodes import OpcodesBCH
from cashscript_py.helpers.data_encoding import flatten_bin_array, hex_to_bin
# Widths, in bytes, of the length prefix that follows each OP_PUSHDATA opcode
# (returned by length_bytes_for_push_opcode).
uint8Bytes = 1
uint16Bytes = 2
uint32Bytes = 4
# Opcode values of the three variable-length push operations.
OP_PUSHDATA1 = 0x4C # 76 in decimal
OP_PUSHDATA2 = 0x4D # 77 in decimal
OP_PUSHDATA4 = 0x4E # 78 in decimal
# A script is a list of tokens: an int is an opcode, bytes is data to push.
Script = list[bytes | int]
[docs]
def encode_int(integer: int) -> bytes:
    """Encode an integer as a minimally-encoded VM Script number.

    The encoding is sign-magnitude, little-endian: the magnitude is written
    with the fewest possible bytes and the top bit of the last byte carries
    the sign.

    Args:
        integer: Signed integer to encode.

    Returns:
        Immutable ``bytes`` holding the encoded number:
        - Zero encodes to empty bytes.
        - Positive values are little-endian with minimal length; an extra
          0x00 byte is appended when the top bit of the magnitude is set.
        - Negative values set the sign bit (0x80) in the most significant
          byte; an extra 0x80 byte is appended when that bit is already
          used by the magnitude.
    """
    if integer == 0:
        return b""

    is_negative = integer < 0
    magnitude = -integer if is_negative else integer

    # Minimal little-endian representation of the absolute value.
    byte_count = (magnitude.bit_length() + 7) // 8
    encoded = bytearray(magnitude.to_bytes(byte_count, byteorder="little"))

    sign_bit = 0x80
    if encoded[-1] & sign_bit:
        # The top bit is taken by the magnitude, so the sign needs its own byte.
        encoded.append(sign_bit if is_negative else 0x00)
    elif is_negative:
        encoded[-1] |= sign_bit

    # Return real bytes (hashable, immutable) as promised by the annotation.
    return bytes(encoded)
def length_bytes_for_push_opcode(opcode: int) -> int:
    """Tell how many length-prefix bytes follow a push opcode.

    Args:
        opcode: Opcode value.

    Returns:
        0 when the opcode is below OP_PUSHDATA1, otherwise the prefix width
        configured for OP_PUSHDATA1, OP_PUSHDATA2 or OP_PUSHDATA4.

    Raises:
        ValueError: If the opcode is above OP_PUSHDATA4.
    """
    # Opcodes below OP_PUSHDATA1 have no separate length prefix.
    if opcode < OP_PUSHDATA1:
        return 0

    prefix_widths = {
        OP_PUSHDATA1: uint8Bytes,
        OP_PUSHDATA2: uint16Bytes,
        OP_PUSHDATA4: uint32Bytes,
    }
    width = prefix_widths.get(opcode)
    if width is None:
        raise ValueError("Invalid opcode for push operation")
    return width
def read_little_endian_number(script: bytes, index: int, length: int) -> int:
    """Decode an unsigned little-endian integer embedded in a byte sequence.

    Args:
        script: Source bytecode.
        index: Offset of the first byte to read.
        length: Width of the number in bytes; must be 1, 2 or 4.

    Returns:
        The decoded integer. The bytes are taken with a slice, so if fewer
        than ``length`` bytes remain, only the available ones are decoded.

    Raises:
        ValueError: If length is not one of 1, 2 or 4.
    """
    supported_widths = (1, 2, 4)
    if length in supported_widths:
        raw = script[index : index + length]
        return int.from_bytes(bytes(raw), byteorder="little")
    raise ValueError("Invalid length for little endian number")
def read_authentication_instruction(script: bytes, index: int) -> dict[str, Any]:
    """Decode the instruction that starts at ``index`` in ``script``.

    Three outcomes are possible:
    - The opcode is above OP_PUSHDATA4: the instruction is {'opcode'} only
      and the next index is ``index + 1``.
    - The opcode is a push whose length prefix does not fit in the script:
      the instruction carries 'expectedLengthBytes', the partial 'length'
      bytes, 'malformed' and 'opcode'.
    - Otherwise the instruction carries 'data' and 'opcode'; when the data
      runs past the end of the script it is additionally tagged with
      'expectedDataBytes' and 'malformed'.

    Args:
        script: Full bytecode.
        index: Offset of the opcode to decode.

    Returns:
        A dict with keys:
        - 'instruction': the decoded instruction dict described above.
        - 'nextIndex': offset at which the following instruction starts
          (may lie beyond the end of the script for malformed pushes).
    """
    opcode = script[index]

    # Everything above OP_PUSHDATA4 is a plain one-byte operation.
    if opcode > OP_PUSHDATA4:
        return {"instruction": {"opcode": opcode}, "nextIndex": index + 1}

    prefix_width = length_bytes_for_push_opcode(opcode)
    prefix_start = index + 1
    payload_start = prefix_start + prefix_width

    # The length prefix itself is cut off by the end of the script.
    if prefix_width != 0 and index + prefix_width >= len(script):
        truncated = {
            "expectedLengthBytes": prefix_width,
            "length": script[prefix_start:payload_start],
            "malformed": True,
            "opcode": opcode,
        }
        return {"instruction": truncated, "nextIndex": payload_start}

    # Without a prefix the opcode value itself is the payload size.
    if prefix_width == 0:
        payload_size = opcode
    else:
        payload_size = read_little_endian_number(script, prefix_start, prefix_width)
    payload_end = payload_start + payload_size

    parsed = {"data": script[payload_start:payload_end], "opcode": opcode}
    if payload_end > len(script):
        # Fewer payload bytes are available than the push announced.
        parsed["expectedDataBytes"] = payload_size
        parsed["malformed"] = True
    return {"instruction": parsed, "nextIndex": payload_end}
def serialize_script_token(token: bytes | int) -> bytes:
"""Serialize an opcode or data push to bytecode.
Args:
token: Opcode (int) or data bytes.
Returns:
Serialized bytes following BCH push rules:
- Empty data -> OP_0
- Single-byte small integers (1..16) -> OP_1..OP_16, 0x81 -> OP_1NEGATE
- Otherwise immediate push, or PUSHDATA1/2/4 as required by length.
"""
if isinstance(token, int):
return bytes([token])
else:
maximum_push_byte_operation_size = 75
push_number_opcodes = 16
negative_one = 129
OP_1NEGATE = 79
maximum_push_data1_size = 255
maximum_push_data2_size = 65535
push_number_opcodes_offset = 80
if len(token) <= maximum_push_byte_operation_size:
if len(token) == 0:
return bytes([0])
elif len(token) == 1:
if 0 < token[0] <= push_number_opcodes:
return bytes([token[0] + push_number_opcodes_offset])
elif token[0] == negative_one:
return bytes([OP_1NEGATE])
else:
return bytes([1]) + token
else:
return bytes([len(token)]) + token
elif len(token) <= maximum_push_data1_size:
return bytes([OP_PUSHDATA1, len(token)]) + token
elif len(token) <= maximum_push_data2_size:
return bytes([OP_PUSHDATA2]) + len(token).to_bytes(2, byteorder="little") + token
else:
return bytes([OP_PUSHDATA4]) + len(token).to_bytes(4, byteorder="little") + token
[docs]
def serialize_script(script: Script) -> bytes:
    """Convert a Script token list to bytecode.

    Each token is serialized with serialize_script_token and the resulting
    pieces are combined by flatten_bin_array.

    Args:
        script: Sequence of tokens (opcodes or data pushes).

    Returns:
        The value produced by flatten_bin_array for the serialized tokens.
    """
    pieces = list(map(serialize_script_token, script))
    return flatten_bin_array(pieces)
[docs]
def parse_bytecode(bytecode: bytes) -> list[dict[str, bytes | int]]:
"""Parse bytecode into a list of instruction descriptors.
Args:
bytecode: Raw script bytecode.
Returns:
List of instruction dicts as produced by read_authentication_instruction.
"""
instructions = []
i = 0
while i < len(bytecode):
result = read_authentication_instruction(bytecode, i)
instruction = result["instruction"]
i = result["nextIndex"]
instructions.append(instruction)
return instructions
[docs]
def asm_to_script(asm: str) -> Script:
    """Translate ASM text into a Script token list.

    Notes:
        - Runs of whitespace are collapsed to single spaces before splitting.
        - Tokens starting with 'OP_' are looked up in OpcodesBCH.
        - Every other token is passed to hex_to_bin and appended as data.
        - Empty or whitespace-only input still produces one empty-string
          token, which is passed to hex_to_bin like any other data token.

    Args:
        asm: Assembly text (e.g., 'OP_DUP OP_HASH160 <hex> OP_EQUALVERIFY OP_CHECKSIG').

    Returns:
        Script token list.

    Raises:
        ValueError: If an 'OP_' token is not present in OpcodesBCH.
            Errors raised by hex_to_bin for bad data tokens propagate as-is.
    """
    words = " ".join(asm.split()).split(" ")
    script: Script = []
    for word in words:
        if not word.startswith("OP_"):
            script.append(hex_to_bin(word))
            continue
        opcode = OpcodesBCH.get(word)
        if opcode is None:
            raise ValueError(f"Opcode {word} not found in OpcodesBCH")
        script.append(opcode)
    return script
[docs]
def generate_redeem_script(base_script: Script, encoded_args: list[bytes]) -> Script:
    """Build a redeem script by placing constructor args before a base script.

    Args:
        base_script: Compiled script tokens.
        encoded_args: Constructor args (ABI-encoded bytes).

    Returns:
        A new list holding the arguments in reverse order, followed by the
        tokens of base_script. Neither input list is modified.
    """
    args_last_first: Script = [*reversed(encoded_args)]
    return args_last_first + base_script
[docs]
def count_opcodes(script: Script) -> int:
    """Count the int tokens in a script whose value is above OP_16.

    Data pushes (bytes tokens) and int tokens up to and including OP_16 are
    not counted.

    Args:
        script: Script token list.

    Returns:
        The number of counted opcodes.
    """
    threshold = OpcodesBCH["OP_16"]
    return sum(1 for token in script if isinstance(token, int) and token > threshold)
[docs]
def calculate_bytesize(script: Script) -> int:
    """Measure how many bytes a script occupies once serialized.

    Args:
        script: Script token list.

    Returns:
        Length of the value returned by serialize_script for this script.
    """
    return len(serialize_script(script))
# Names exported by a star-import of this module.
# NOTE(review): "create_input_script" is not defined or imported in the visible
# part of this module. Unless it is defined elsewhere in the file, a star-import
# raises AttributeError - confirm, then either add the function or drop the entry.
__all__ = [
"asm_to_script",
"calculate_bytesize",
"count_opcodes",
"create_input_script",
"encode_int",
"generate_redeem_script",
"parse_bytecode",
"serialize_script",
]