Source code for cryptnox_cli.lib.cryptos.keystore

# flake8: noqa
# -*- coding: utf-8 -*-
"""
HD wallet structures, serialization, derivation, and key store helpers.
"""
#!/usr/bin/env python2
# -*- mode: python -*-
#
# Electrum - lightweight Bitcoin client
# Copyright (C) 2016  The Electrum developers with changes by pycryptotools developers
#
# Permission is hereby granted, free of charge, to any person
# obtaining a copy of this software and associated documentation files
# (the "Software"), to deal in the Software without restriction,
# including without limitation the rights to use, copy, modify, merge,
# publish, distribute, sublicense, and/or sell copies of the Software,
# and to permit persons to whom the Software is furnished to do so,
# subject to the following conditions:
#
# The above copyright notice and this permission notice shall be
# included in all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
# MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
# BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
# ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
# CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.

from unicodedata import normalize

from .wallet_utils import pw_encode, pw_decode, hfu, InvalidPassword
from .mnemonic import *
from .deterministic import *
from .main import *



[docs] class KeyStore(object):
[docs] def __init__(self, coin, addresses=()): self.coin=coin self.addresses = list(addresses) self.root_derivation = None self.bip39_prefixes = () self.xtype = '' self.electrum = False
[docs] def has_seed(self): return False
[docs] def is_watching_only(self): return False
[docs] def can_import(self): return False
[docs] def get_tx_derivations(self, tx): keypairs = {} for txin in tx.inputs(): num_sig = txin.get('num_sig') if num_sig is None: continue x_signatures = txin['signatures'] signatures = [sig for sig in x_signatures if sig] if len(signatures) == num_sig: # input is complete continue for k, x_pubkey in enumerate(txin['x_pubkeys']): if x_signatures[k] is not None: # this pubkey already signed continue derivation = self.get_pubkey_derivation(x_pubkey) if not derivation: continue keypairs[x_pubkey] = derivation return keypairs
[docs] def can_sign(self, tx): if self.is_watching_only(): return False return bool(self.get_tx_derivations(tx))
[docs] class Software_KeyStore(KeyStore):
[docs] def may_have_password(self): return not self.is_watching_only()
"""TODO def sign_message(self, sequence, message, password=None): privkey, compressed = self.get_private_key(sequence, password) return ecdsa_raw_sign(message, privkey) def decrypt_message(self, sequence, message, password=None): privkey, compressed = self.get_private_key(sequence, password) return ecdsa_raw_recover(message) def sign_transaction(self, tx, password=None): if self.is_watching_only(): return # Raise if password is not correct. self.check_password(password) # Add private keys keypairs = self.get_tx_derivations(tx) for k, v in keypairs.items(): keypairs[k] = self.get_private_key(v, password) # Sign if keypairs: tx.sign(keypairs)"""
[docs] class Imported_KeyStore(Software_KeyStore): # keystore for imported private keys
[docs] def __init__(self, d, coin): Software_KeyStore.__init__(self, coin) self.keypairs = d.get('keypairs', {})
[docs] def is_deterministic(self): return False
[docs] def can_change_password(self): return True
[docs] def get_master_public_key(self): return None
[docs] def dump(self): return { 'type': 'imported', 'keypairs': self.keypairs, }
[docs] def can_import(self): return True
[docs] def check_password(self, password=None): pubkey = list(self.keypairs.keys())[0] self.get_private_key(pubkey, password)
[docs] def import_privkey(self, sec, password=None): pubkey = bip32_privtopub(sec, self.bip39_prefixes) self.keypairs[pubkey] = pw_encode(sec, password) return "p2pkh", pubkey
[docs] def delete_imported_key(self, key): self.keypairs.pop(key)
[docs] def get_private_key(self, pubkey, password=None): sec = pw_decode(self.keypairs[pubkey], password) privkey = bip32_extract_key(sec, self.bip39_prefixes) # this checks the password if pubkey != privtopub(privkey): raise InvalidPassword() return privkey, True
[docs] def get_pubkey_derivation(self, x_pubkey): if get_pubkey_format(x_pubkey) in ['bin', 'bin_compressed']: if x_pubkey in self.keypairs.keys(): return x_pubkey elif x_pubkey[0:2] == 'fd': addr = self.coin.p2sh_scriptaddr(x_pubkey[2:]) if addr in self.addresses: return self.addresses[addr].get('pubkey')
[docs] def update_password(self, old_password, new_password): self.check_password(old_password) if new_password == '': new_password = None for k, v in self.keypairs.items(): b = pw_decode(v, old_password) c = pw_encode(b, new_password) self.keypairs[k] = c
[docs] class Deterministic_KeyStore(Software_KeyStore):
[docs] def __init__(self, d, coin): Software_KeyStore.__init__(self, coin) self.seed = d.get('seed', '') self.passphrase = d.get('passphrase', '')
[docs] def is_deterministic(self): return True
[docs] def dump(self): d = {} if self.seed: d['seed'] = self.seed if self.passphrase: d['passphrase'] = self.passphrase return d
[docs] def has_seed(self): return bool(self.seed)
[docs] def is_watching_only(self): return not self.has_seed()
[docs] def can_change_password(self): return not self.is_watching_only()
[docs] def add_seed(self, seed): if self.seed: raise Exception("a seed exists") self.seed = self.format_seed(seed)
[docs] def get_seed(self, password): return pw_decode(self.seed, password)
[docs] def get_passphrase(self, password): return pw_decode(self.passphrase, password) if self.passphrase else ''
[docs] class Xpub:
[docs] def __init__(self): self.xpub = None self.xpub_receive = None self.xpub_change = None
[docs] def get_master_public_key(self): return self.xpub
[docs] def derive_pubkey(self, for_change, n): xpub = self.xpub_change if for_change else self.xpub_receive if xpub is None: xpub = bip32_ckd(self.xpub, 1 if for_change else 0, self.bip39_prefixes) if for_change: self.xpub_change = xpub else: self.xpub_receive = xpub return self.get_pubkey_from_xpub(xpub, (n,), self.bip39_prefixes)
[docs] @classmethod def get_pubkey_from_xpub(self, xpub, sequence, bip39_prefixes): return bip32_derive_key(xpub, sequence, bip39_prefixes)
"""needed? def get_xpubkey(self, c, i): s = ''.join(map(lambda x: bitcoin.int_to_hex(x,2), (c, i))) return 'ff' + bh2u(b58check_to_hex(self.xpub)) + s @classmethod def parse_xpubkey(self, pubkey): assert pubkey[0:2] == 'ff' pk = bfh(pubkey) pk = pk[1:] xkey = bin_to_b58check(pk[0:78]) dd = pk[78:] s = [] while dd: n = int(bitcoin.rev_hex(bh2u(dd[0:2])), 16) dd = dd[2:] s.append(n) assert len(s) == 2 return xkey, s def get_pubkey_derivation(self, x_pubkey): if x_pubkey[0:2] != 'ff': return xpub, derivation = self.parse_xpubkey(x_pubkey) if self.xpub != xpub: return return derivation"""
[docs] class BIP32_KeyStore(Deterministic_KeyStore, Xpub):
[docs] def __init__(self, d, coin): Xpub.__init__(self) Deterministic_KeyStore.__init__(self, d, coin) self.xpub = d.get('xpub') self.xprv = d.get('xprv')
[docs] def format_seed(self, seed): return ' '.join(seed.split())
[docs] def dump(self): d = Deterministic_KeyStore.dump(self) d['type'] = 'bip32' d['xpub'] = self.xpub d['xprv'] = self.xprv return d
[docs] def get_master_private_key(self, password=None): return pw_decode(self.xprv, password)
[docs] def check_password(self, password=None): xprv = pw_decode(self.xprv, password) if bip32_deserialize(xprv, self.bip39_prefixes)[4] != bip32_deserialize(self.xpub, self.bip39_prefixes)[4]: raise InvalidPassword()
[docs] def update_password(self, old_password, new_password,): self.check_password(old_password) if new_password == '': new_password = None if self.has_seed(): decoded = self.get_seed(old_password) self.seed = pw_encode(decoded, new_password) if self.passphrase: decoded = self.get_passphrase(old_password) self.passphrase = pw_encode(decoded, new_password) if self.xprv is not None: b = pw_decode(self.xprv, old_password) self.xprv = pw_encode(b, new_password)
[docs] def is_watching_only(self): return self.xprv is None
[docs] def add_xprv(self, xprv): self.xprv = xprv self.xpub = bip32_privtopub(xprv, self.bip39_prefixes)
[docs] def add_xpub(self, xpub, xtype, electrum=False): self.xtype = xtype self.electrum = electrum self.bip39_prefixes = (encode(self.coin.electrum_xprv_headers[xtype], 256, 4), encode(self.coin.electrum_xpub_headers[xtype], 256, 4)) if electrum else ( encode(self.coin.xprv_headers[xtype], 256, 4), encode(self.coin.xpub_headers[xtype], 256, 4)) self.xpub = xpub
[docs] def add_xprv_from_seed(self, bip32_seed, xtype, derivation, electrum=False): self.root_derivation = derivation self.xtype = xtype self.electrum = electrum self.bip39_prefixes = (encode(self.coin.electrum_xprv_headers[xtype], 256, 4), encode(self.coin.electrum_xpub_headers[xtype], 256, 4)) if electrum else ( encode(self.coin.xprv_headers[xtype], 256, 4), encode(self.coin.xpub_headers[xtype], 256, 4)) xprv = bip32_master_key(bip32_seed, self.bip39_prefixes) xprv = bip32_ckd(xprv, derivation, self.bip39_prefixes) self.add_xprv(xprv)
[docs] def get_private_key(self, sequence, password): xprv = self.get_master_private_key(password) pk = bip32_derive_key(xprv, sequence, self.bip39_prefixes) return pk, True
[docs] class Hardware_KeyStore(KeyStore, Xpub): # Derived classes must set: # - device # - DEVICE_IDS # - wallet_type #restore_wallet_class = BIP32_RD_Wallet max_change_outputs = 1
[docs] def __init__(self, d, coin): Xpub.__init__(self, coin) KeyStore.__init__(self, coin) # Errors and other user interaction is done through the wallet's # handler. The handler is per-window and preserved across # device reconnects self.xpub = d.get('xpub') self.label = d.get('label') self.derivation = d.get('derivation') self.handler = None
[docs] def set_label(self, label): self.label = label
[docs] def may_have_password(self): return False
[docs] def is_deterministic(self): return True
[docs] def dump(self): return { 'type': 'hardware', 'hw_type': self.hw_type, 'xpub': self.xpub, 'derivation':self.derivation, 'label':self.label, }
[docs] def unpaired(self): '''A device paired with the wallet was diconnected. This can be called in any thread context.''' print("unpaired")
[docs] def paired(self): '''A device paired with the wallet was (re-)connected. This can be called in any thread context.''' print("paired")
[docs] def can_export(self): return False
[docs] def is_watching_only(self): '''The wallet is not watching-only; the user will be prompted for pin and passphrase as appropriate when needed.''' assert not self.has_seed() return False
[docs] def can_change_password(self): return False
[docs] def bip39_to_seed(mnemonic, passphrase): return mnemonic_to_seed(mnemonic, passphrase)
# returns tuple (is_checksum_valid, is_wordlist_valid)
[docs] def bip39_is_checksum_valid(mnemonic): words = [ normalize('NFKD', word) for word in mnemonic.split()] words_len = len(words) wordlist = wordlist_english n = len(wordlist) checksum_length = 11*words_len//33 entropy_length = 32*checksum_length i = 0 words.reverse() while words: w = words.pop() try: k = wordlist.index(w) except ValueError: return False, False i = i*n + k if words_len not in [12, 15, 18, 21, 24]: return False, True entropy = i >> checksum_length checksum = i % 2**checksum_length h = '{:x}'.format(entropy) while len(h) < entropy_length/4: h = '0'+h b = bytearray.fromhex(h) hashed = int(hfu(hashlib.sha256(b).digest()), 16) calculated_checksum = hashed >> (256 - checksum_length) return checksum == calculated_checksum, True
[docs] def from_bip39_seed(seed, passphrase, derivation, coin): k = BIP32_KeyStore({}, coin) bip32_seed = bip39_to_seed(seed, passphrase) xtype = xtype_from_derivation(derivation) k.add_xprv_from_seed(bip32_seed, xtype, derivation) return k
[docs] def standard_from_bip39_seed(seed, passphrase, coin): derivation = "m/44'/%s'/0'" % coin.hd_path return from_bip39_seed(seed, passphrase, derivation, coin)
[docs] def p2wpkh_from_bip39_seed(seed, passphrase, coin): derivation = "m/84'/%s'/0'" % coin.hd_path return from_bip39_seed(seed, passphrase, derivation, coin)
[docs] def p2wpkh_p2sh_from_bip39_seed(seed, passphrase, coin): derivation = "m/49'/%s'/0'" % coin.hd_path return from_bip39_seed(seed, passphrase, derivation, coin)
[docs] def xtype_from_derivation(derivation): """Returns the script type to be used for this derivation.""" if derivation.startswith("m/84'"): return 'p2wpkh' elif derivation.startswith("m/49'"): return 'p2wpkh-p2sh' else: return 'p2pkh'
# extended pubkeys
[docs] def is_xpubkey(x_pubkey): return x_pubkey[0:2] == 'ff'
[docs] def parse_xpubkey(x_pubkey): assert x_pubkey[0:2] == 'ff' return BIP32_KeyStore.parse_xpubkey(x_pubkey)
[docs] def xpubkey_to_address(x_pubkey, coin): if x_pubkey[0:2] == 'fd': address = coin.p2sh_scriptaddr(x_pubkey[2:]) return x_pubkey, address if x_pubkey[0:2] in ['02', '03', '04']: pubkey = x_pubkey elif x_pubkey[0:2] == 'ff': xpub, s = BIP32_KeyStore.parse_xpubkey(x_pubkey) pubkey = BIP32_KeyStore.get_pubkey_from_xpub(xpub, s) else: raise BaseException("Cannot parse pubkey") address = coin.pubtoaddr(pubkey) return pubkey, address
[docs] def xpubkey_to_pubkey(x_pubkey, coin): pubkey, address = xpubkey_to_address(x_pubkey, coin) return pubkey
hw_keystores = {}
[docs] def register_keystore(hw_type, constructor): hw_keystores[hw_type] = constructor
[docs] def hardware_keystore(d): hw_type = d['hw_type'] if hw_type in hw_keystores: constructor = hw_keystores[hw_type] return constructor(d) raise BaseException('unknown hardware type', hw_type)
[docs] def is_address_list(text, coin): parts = text.split() return bool(parts) and all(coin.is_address(x) for x in parts)
[docs] def get_private_keys(text): parts = text.split('\n') parts = map(lambda x: ''.join(x.split()), parts) parts = list(filter(bool, parts)) if bool(parts) and all(bitcoin.is_private_key(x) for x in parts): return parts
[docs] def is_private_key_list(text): return bool(get_private_keys(text))
is_mpk = lambda x: is_xpub(x) is_private = lambda x: is_seed(x) or is_xprv(x) or is_private_key_list(x) is_master_key = lambda x: is_xprv(x) or is_xpub(x) is_private_key = lambda x: is_xprv(x) or is_private_key_list(x) is_bip32_key = lambda x: is_xprv(x) or is_xpub(x)
[docs] def from_electrum_seed(seed, passphrase, is_p2sh, coin): t = seed_type(seed) if t in ['standard', 'segwit']: keystore = BIP32_KeyStore({}, coin) keystore.add_seed(seed) keystore.passphrase = passphrase bip32_seed = electrum_mnemonic_to_seed(seed, passphrase) if t == 'standard': der = "m/" xtype = 'p2pkh' else: der = "m/1'/" if is_p2sh else "m/0'/" xtype = 'p2wsh' if is_p2sh else 'p2wpkh' keystore.add_xprv_from_seed(bip32_seed, xtype, der, electrum=True) else: raise BaseException(t) return keystore
[docs] def from_private_key_list(text, coin): keystore = Imported_KeyStore({}, coin) for x in get_private_keys(text): keystore.import_key(x, None) return keystore
[docs] def from_xpub(xpub, coin, xtype, electrum=False): k = BIP32_KeyStore({}, coin) k.add_xpub(xpub, xtype, electrum=electrum) return k
[docs] def from_xprv(xprv, coin): xpub = bip32_privtopub(xprv, coin.bip39_prefixes) k = BIP32_KeyStore({}, coin) k.xprv = xprv k.xpub = xpub return k
[docs] def from_master_key(text, coin): prefixes = coin.bip39_prefixes if is_xprv(text, prefixes): k = from_xprv(text, coin) elif is_xpub(text, prefixes): k = from_xpub(text, coin) else: raise BaseException('Invalid key') return k