# Source code for cashscript_py.helpers.transaction

"""Provides functionality for transaction serialization."""

import os

from contextlib import suppress

from bitcash.cashaddress import InvalidAddress
from bitcash.format import address_to_public_key_hash

from cashscript_py.helpers.bch_opcodes import OpcodesBCH
from cashscript_py.helpers.cashaddress import (
    address_string_to_locking_bytecode,
)
from cashscript_py.helpers.cashtoken import is_token_address, serialize_token_prefix
from cashscript_py.helpers.crypto import hash256
from cashscript_py.helpers.data_encoding import int_to_bytes, var_int_bytes
from cashscript_py.interfaces import HashType, Output, Transaction, UnlockableUtxo


def _get_sequence(utxo: UnlockableUtxo) -> bytes:
    """Encode an input's sequence number as 4 little-endian bytes.

    Args:
        utxo: Input whose ``sequence`` attribute is read.

    Returns:
        ``int_to_bytes(sequence, 4)``, where ``sequence`` is ``utxo.sequence``
        or 0xFFFFFFFE when ``utxo.sequence`` is ``None``.
    """
    if utxo.sequence is not None:
        return int_to_bytes(utxo.sequence, 4)
    # No explicit sequence on the input: fall back to the default value.
    return int_to_bytes(0xFFFFFFFE, 4)


def serialize_input(
    unlockable_utxo: UnlockableUtxo,
    tx: Transaction,
    source_outputs: list[Output],
    input_index: int,
) -> bytes:
    """Serialize a single transaction input.

    The encoding is, in order: the byte-reversed txid, the 4-byte output
    index, the var-int length of the unlocking bytecode, the unlocking
    bytecode itself, and the 4-byte sequence number.

    Args:
        unlockable_utxo: Input to encode. Its ``txid`` (hex string), ``vout``,
            ``unlocking_bytecode`` and ``sequence`` attributes are read.
        tx: Accepted as part of the signature; not read by this function.
        source_outputs: Accepted as part of the signature; not read here.
        input_index: Accepted as part of the signature; not read here.

    Returns:
        The serialized input.
    """
    # The txid hex string is decoded and then byte-reversed before the vout is appended.
    outpoint = bytes.fromhex(unlockable_utxo.txid)[::-1] + int_to_bytes(unlockable_utxo.vout, 4)

    script = unlockable_utxo.unlocking_bytecode
    script_size = var_int_bytes(len(script))
    sequence_bytes = _get_sequence(unlockable_utxo)

    return outpoint + script_size + script + sequence_bytes
def serialize_output(output: Output) -> bytes:
    """Serialize a single output to bytes for transaction use.

    The encoding is the 8-byte amount, followed by the var-int length of the
    script and then the script. The script is the token prefix (only when the
    output carries a token) followed by the locking bytecode.

    Args:
        output: Output to encode. ``output.to`` may already be locking
            bytecode (``bytes``); any other value is converted with
            ``address_string_to_locking_bytecode``.

    Returns:
        The serialized output.
    """
    amount_bytes = int_to_bytes(output.amount, 8)

    # Resolve the destination to locking bytecode unless it already is bytes.
    if isinstance(output.to, bytes):
        destination = output.to
    else:
        destination = address_string_to_locking_bytecode(output.to)

    # The token prefix is only emitted for outputs that carry a token.
    prefix = b""
    if output.token:
        prefix = serialize_token_prefix(output.token)

    script = prefix + destination
    script_size = var_int_bytes(len(script))
    return amount_bytes + script_size + script
def serialize_transaction(tx: Transaction) -> bytes:
    """Encode a transaction into a byte string suitable for transmission or signing.

    Layout: 4-byte version, var-int input count, serialized inputs, var-int
    output count, serialized outputs, 4-byte locktime.

    Args:
        tx: Transaction whose ``version``, ``inputs``, ``outputs`` and
            ``locktime`` are encoded.

    Returns:
        The serialized transaction.
    """
    version = int_to_bytes(tx.version, 4)

    # Rebuild the outputs being spent from each input's unlocker; they are
    # handed to serialize_input alongside the transaction.
    source_outputs = []
    for utxo in tx.inputs:
        locking_bytecode = utxo.unlocker.generate_locking_bytecode()
        source_outputs.append(Output(locking_bytecode, utxo.satoshis, utxo.token))

    encoded_inputs = b"".join(
        serialize_input(utxo, tx, source_outputs, position) for position, utxo in enumerate(tx.inputs)
    )
    encoded_outputs = b"".join(serialize_output(out) for out in tx.outputs)

    n_inputs = var_int_bytes(len(tx.inputs))
    n_outputs = var_int_bytes(len(tx.outputs))
    locktime = int_to_bytes(tx.locktime, 4)

    return version + n_inputs + encoded_inputs + n_outputs + encoded_outputs + locktime
def create_op_return_output(op_return_data: list[str]) -> Output:
    """Create an OP_RETURN output from a list of UTF-8 or hex (0x-prefixed) chunks.

    Args:
        op_return_data: Chunks to embed. A chunk starting with ``"0x"`` is
            decoded as hex (without the prefix); anything else is encoded as
            UTF-8.

    Returns:
        An ``Output`` whose destination is the OP_RETURN locking script and
        whose amount is 0.

    Raises:
        ValueError: If a decoded chunk is 256 bytes or longer, or if a
            ``0x``-prefixed chunk is not valid hex.
    """

    def decode_chunk(text: str) -> bytes:
        # "0x..." means raw hex; everything else is treated as text.
        if text.startswith("0x"):
            return bytes.fromhex(text[2:])
        return text.encode("utf-8")

    def push_prefix(payload: bytes) -> bytes:
        size = len(payload)
        if size >= 256:
            raise ValueError("Pushdata too large")
        if 0 < size < 76:
            # Short pushes are announced by a single length byte.
            return bytes([size])
        # Empty payloads and 76..255-byte payloads both use OP_PUSHDATA_1
        # followed by a one-byte length.
        return bytes([OpcodesBCH["OP_PUSHDATA_1"], size])

    pieces = [bytes([OpcodesBCH["OP_RETURN"]])]
    for item in op_return_data:
        payload = decode_chunk(item)
        pieces.append(push_prefix(payload))
        pieces.append(payload)

    return Output(b"".join(pieces), 0)
# See https://documentation.cash/protocol/forks/replay-protected-sighash.html
def create_preimage(
    tx: Transaction,
    source_outputs: list[Output],
    input_index: int,
    covered_bytecode: bytes,
    hash_type: int,  # int (e.g., 0x41 for SIGHASH_ALL | FORKID)
) -> bytes:
    """Build the sighash preimage for a given input.

    The preimage is the concatenation, in order, of: version, hashPrevouts,
    hashUtxos (only with SIGHASH_UTXOS), hashSequence, the outpoint being
    spent, the input's token prefix, the length-prefixed covered bytecode,
    the input value, the input sequence, hashOutputs, locktime and the hash
    type.

    Args:
        tx: Transaction template.
        source_outputs: Source outputs for inputs (used for SIGHASH_UTXOS, and
            for the token prefix and value of the input being signed).
        input_index: Index of the input being signed.
        covered_bytecode: Locking script being covered by the signature.
        hash_type: Full hashtype flags.

    Returns:
        The serialized preimage bytes.
    """
    zero_hash = bytes(32)

    # Extract flags for conditional hashing
    base_type = hash_type & 0x1F  # SIGHASH_ALL/NONE/SINGLE
    is_anyone_can_pay = (hash_type & HashType.SIGHASH_ANYONECANPAY.value) != 0
    is_utxos = (hash_type & HashType.SIGHASH_UTXOS.value) != 0
    is_single = base_type == HashType.SIGHASH_SINGLE.value
    is_none = base_type == HashType.SIGHASH_NONE.value

    # 1. Version (4 bytes LE)
    parts = [int_to_bytes(tx.version, 4)]

    # 2. hashPrevouts (32 bytes): zero if ANYONECANPAY
    if is_anyone_can_pay:
        prevouts_len = 0
        hash_prevouts = zero_hash
    else:
        prevouts_ser = b"".join(bytes.fromhex(inp.txid)[::-1] + int_to_bytes(inp.vout, 4) for inp in tx.inputs)
        prevouts_len = len(prevouts_ser)
        hash_prevouts = hash256(prevouts_ser)
    parts.append(hash_prevouts)

    # 3. hashUtxos (32 bytes, if SIGHASH_UTXOS)
    if is_utxos:
        utxos_ser = b"".join(serialize_output(so) for so in source_outputs)
        utxos_len = len(utxos_ser)
        parts.append(hash256(utxos_ser))
    else:
        utxos_len = 0

    # 4. hashSequence (32 bytes): zero if ANYONECANPAY or not SIGHASH_ALL
    if is_anyone_can_pay or base_type != HashType.SIGHASH_ALL.value:
        sequences_len = 0
        hash_sequence = zero_hash
    else:
        sequences_ser = b"".join(_get_sequence(inp) for inp in tx.inputs)
        sequences_len = len(sequences_ser)
        hash_sequence = hash256(sequences_ser)
    parts.append(hash_sequence)

    # 5. Outpoint (32 txid LE + 4 vout LE)
    current_input = tx.inputs[input_index]
    outpoint = bytes.fromhex(current_input.txid)[::-1] + int_to_bytes(current_input.vout, 4)
    parts.append(outpoint)

    # 6. Input token prefix (if any, for this input)
    # NOTE(review): serialize_token_prefix is called even when the token is
    # None, unlike serialize_output which guards the call - presumably the
    # helper returns b"" for None; confirm.
    input_token = source_outputs[input_index].token
    parts.append(serialize_token_prefix(input_token))

    # 7. Script (varint len + bytecode)
    parts.append(var_int_bytes(len(covered_bytecode)) + covered_bytecode)

    # 8. Value (8 bytes LE, from source_outputs)
    parts.append(int_to_bytes(source_outputs[input_index].amount, 8))

    # 9. Sequence (4 bytes LE)
    parts.append(_get_sequence(current_input))

    # 10. hashOutputs (32 bytes): depends on base_type
    if is_single and input_index < len(tx.outputs):
        # Hash of the single output at the same index as the input
        outputs_ser = serialize_output(tx.outputs[input_index])
        outputs_len = len(outputs_ser)
        hash_outputs = hash256(outputs_ser)
    elif is_single or is_none:
        # SIGHASH_NONE, or SIGHASH_SINGLE without a matching output: the
        # sighash specification mandates all zeros here, not a hash of every
        # output.
        outputs_len = 0
        hash_outputs = zero_hash
    else:
        # Hash of all outputs
        outputs_ser = b"".join(serialize_output(out) for out in tx.outputs)
        outputs_len = len(outputs_ser)
        hash_outputs = hash256(outputs_ser)
    parts.append(hash_outputs)

    # 11. Locktime (4 bytes LE)
    parts.append(int_to_bytes(tx.locktime, 4))

    # 12. Hash type (4 bytes LE)
    parts.append(int_to_bytes(hash_type, 4))

    # Concatenate all parts
    preimage = b"".join(parts)

    # Debugging info, enabled with CASHSCRIPT_PY_DEBUG=1
    if os.environ.get("CASHSCRIPT_PY_DEBUG") == "1":
        # Diagnostics are best-effort: a failure while printing must not
        # prevent the preimage from being returned.
        with suppress(Exception):
            covered_hash = hash256(covered_bytecode)
            preimage_hash = hash256(preimage)
            print(
                f"[preimage-info] prevouts_len={prevouts_len} utxos_len={utxos_len} "
                f"sequences_len={sequences_len} outputs_len={outputs_len}"
            )
            print(
                f"[preimage-info] covered_script_len={len(covered_bytecode)} covered_script_hash={covered_hash.hex()}"
            )
            print(f"[preimage-info] preimage_len={len(preimage)} preimage_hash={preimage_hash.hex()}")
            print(f"[preimage] {preimage.hex()}")

    return preimage
class OutputSatoshisNonPositiveError(Exception):
    """Raised when an output amount is zero or negative."""

    def __init__(self, amount: int):
        """Initialize with the offending amount."""
        super().__init__(f"Output amount {amount} must be greater than zero")
        # The rejected amount, kept for programmatic access.
        self.amount = amount

    def __reduce__(self):
        """Describe how to rebuild this exception when it is copied or pickled.

        The default reduction re-invokes ``__init__`` with ``args``, which
        holds the already formatted message, so the message would be wrapped
        a second time. Rebuilding from ``amount`` keeps it intact.
        """
        return type(self), (self.amount,), vars(self)
class OutputSatoshisTooSmallError(Exception):
    """Raised when an output amount is below the dust threshold."""

    def __init__(self, amount: int, minimum: int):
        """Initialize with the offending amount and computed minimum."""
        super().__init__(f"Output amount {amount} is below dust minimum {minimum}")
        # The rejected amount and the minimum it was compared against.
        self.amount = amount
        self.minimum = minimum

    def __reduce__(self):
        """Describe how to rebuild this exception when it is copied or pickled.

        The default reduction re-invokes ``__init__`` with ``args``, which
        holds only the formatted message. The constructor needs two
        arguments, so copying or unpickling would raise ``TypeError``.
        """
        return type(self), (self.amount, self.minimum), vars(self)
class TokensToNonTokenAddressError(Exception):
    """Raised when attempting to send CashTokens to a non-token-aware address."""

    def __init__(self, address: str):
        """Initialize with the destination address."""
        super().__init__(f"Tokens cannot be sent to non-token address: {address}")
        # The destination address that was rejected.
        self.address = address

    def __reduce__(self):
        """Describe how to rebuild this exception when it is copied or pickled.

        The default reduction re-invokes ``__init__`` with ``args``, which
        holds the already formatted message, so the message would be wrapped
        a second time. Rebuilding from ``address`` keeps it intact.
        """
        return type(self), (self.address,), vars(self)
class OutputTokenAmountTooSmallError(Exception):
    """Raised when a token amount is negative."""

    def __init__(self, amount: int):
        """Initialize with the offending token amount."""
        super().__init__(f"Output token amount must be non-negative, got {amount}")
        # The rejected token amount.
        self.amount = amount

    def __reduce__(self):
        """Describe how to rebuild this exception when it is copied or pickled.

        The default reduction re-invokes ``__init__`` with ``args``, which
        holds the already formatted message, so the message would be wrapped
        a second time. Rebuilding from ``amount`` keeps it intact.
        """
        return type(self), (self.amount,), vars(self)
class OutputAddressInvalid(Exception):
    """Raised when an output address cannot be parsed."""

    def __init__(self, address: str):
        """Initialize with the invalid address string."""
        super().__init__(f"Output address is invalid, got {address}.")
        # The address string that failed to parse.
        self.address = address

    def __reduce__(self):
        """Describe how to rebuild this exception when it is copied or pickled.

        The default reduction re-invokes ``__init__`` with ``args``, which
        holds the already formatted message, so the message would be wrapped
        a second time. Rebuilding from ``address`` keeps it intact.
        """
        return type(self), (self.address,), vars(self)
def validate_output(output: Output) -> None:
    """Validate an Output (address parsing, dust threshold, token rules).

    Checks run in this order and stop at the first failure: address parsing
    (only when ``output.to`` is a string), positive amount, dust minimum,
    then token rules (only when the output carries a token).

    Args:
        output: The output to validate.

    Raises:
        OutputAddressInvalid: ``output.to`` is a string that cannot be parsed
            as an address. The parser's error is chained as ``__cause__``.
        OutputSatoshisNonPositiveError: ``output.amount`` is zero or negative.
        OutputSatoshisTooSmallError: ``output.amount`` is below the value
            returned by ``calculate_dust``.
        TokensToNonTokenAddressError: The output carries a token but its
            string address is not a token address.
        OutputTokenAmountTooSmallError: The token amount is negative.
    """
    if isinstance(output.to, str):
        try:
            address_to_public_key_hash(output.to)
        except InvalidAddress as e:
            # Pass the raw address: the exception formats its own message and
            # exposes the value as ``.address``. Parser details stay available
            # through the chained cause.
            raise OutputAddressInvalid(output.to) from e

    if output.amount <= 0:
        raise OutputSatoshisNonPositiveError(output.amount)

    minimum_amount = calculate_dust(output)
    if output.amount < minimum_amount:
        raise OutputSatoshisTooSmallError(output.amount, int(minimum_amount))

    if output.token:
        # Raw bytecode destinations cannot be checked for token awareness,
        # so the address rule applies to string destinations only.
        if isinstance(output.to, str) and not is_token_address(output.to):
            raise TokensToNonTokenAddressError(output.to)
        if output.token.amount < 0:
            raise OutputTokenAmountTooSmallError(output.token.amount)
def get_output_size(output: Output) -> int:
    """Return the number of bytes ``serialize_output`` produces for ``output``."""
    serialized = serialize_output(output)
    return len(serialized)
def calculate_dust(output: Output) -> int:
    """Compute the dust minimum for a given output.

    Returns:
        ``444`` plus three times the serialized size of ``output`` in bytes.
    """
    size_in_bytes = get_output_size(output)
    size_component = size_in_bytes * 3
    return 444 + size_component