Python源码示例:Crypto.Cipher.AES.MODE_CTR
示例1
def iter_transform(filename, key):
    """Lazily encrypt a file with AES-CTR, one chunk at a time.

    The file is read in pieces of ``CHUNK_SIZE`` bytes and every piece is
    fed through the same AES-CTR cipher object, so the keystream continues
    from one chunk to the next.

    :param filename: The name of the file to encrypt.
    :type filename: str
    :param key: The key used to encrypt the file.
    :type key: str
    :returns: A generator yielding ``(encrypted_chunk, file_object)`` pairs.
    :rtype: generator
    """
    # No IV/nonce is supplied: the 128-bit counter starts from
    # Counter.new()'s default initial value.
    cipher = AES.new(key, AES.MODE_CTR, counter=Counter.new(128))
    with open(filename, 'rb+') as f:
        while True:
            chunk = f.read(CHUNK_SIZE)
            if chunk == b'':
                # End of file reached.
                break
            # The open file object is handed out next to each chunk.
            yield cipher.encrypt(chunk), f
示例2
def decrypt_package(self, dec_material, data):
    """Decrypt ``data`` with AES-CTR using the material in ``dec_material``.

    ``dec_material.encMsgV3`` is split into a 32-byte salt followed by the
    counter IV. The AES key is derived from ``self._bkey`` and the salt
    with PBKDF2, using the ``Decryptor`` class parameters.

    Returns the decrypted data, or ``None`` when ``encMsgV3`` is empty.
    """
    if not self._good:
        # Only a warning: decryption is still attempted below.
        logging.warning('well, it is hard to decrypt with a wrong key.')

    enc_msg = dec_material.encMsgV3
    if not enc_msg:
        logging.error('cannot decrypt with an empty encMsgV3!')
        return None

    salt, counter_iv = enc_msg[:32], enc_msg[32:]
    key = PBKDF2(self._bkey, salt, Decryptor.dklen, Decryptor.count,
                 Decryptor.prf, hmac_hash_module=None)

    # The counter IV is read as a big-endian integer and used as the
    # starting value of a 128-bit big-endian counter.
    start = int.from_bytes(counter_iv, byteorder='big')
    counter_obj = Counter.new(128, initial_value=start, little_endian=False)
    return AES.new(key, mode=AES.MODE_CTR, counter=counter_obj).decrypt(data)
示例3
def decrypt_large_package(self, dec_material, entry):
    """Decrypt the file ``entry`` chunk by chunk with AES-CTR.

    This is a generator: nothing (including the log messages) happens
    until it is iterated. ``dec_material.encMsgV3`` is split into a
    32-byte salt followed by the counter IV; the key is derived from
    ``self._bkey`` and the salt with PBKDF2. The file is read in pieces of
    ``self.chunk_size`` bytes and each decrypted piece is yielded.

    If ``encMsgV3`` is empty the generator ends without yielding anything.
    """
    if not self._good:
        # Only a warning: decryption is still attempted below.
        logging.warning('well, it is hard to decrypt with a wrong key.')

    enc_msg = dec_material.encMsgV3
    if not enc_msg:
        logging.error('cannot decrypt with an empty encMsgV3!')
        return

    salt, counter_iv = enc_msg[:32], enc_msg[32:]
    key = PBKDF2(self._bkey, salt, Decryptor.dklen, Decryptor.count,
                 Decryptor.prf, hmac_hash_module=None)
    start = int.from_bytes(counter_iv, byteorder='big')
    counter_obj = Counter.new(128, initial_value=start, little_endian=False)
    decryptor = AES.new(key, mode=AES.MODE_CTR, counter=counter_obj)

    total_size = entry.stat().st_size
    with open(entry, 'rb') as entry_fd:
        # One read per chunk-sized step over the file size reported by stat().
        for offset in range(0, total_size, self.chunk_size):
            logging.debug('decrypting chunk %d of %s', offset, entry)
            yield decryptor.decrypt(entry_fd.read(self.chunk_size))
示例4
def decrypt_file(self, dec_material, data):
    """Decrypt ``data`` with AES-CTR keyed by ``self._bkey_sha256``.

    ``dec_material.iv`` is read as a big-endian integer and used as the
    starting value of a 128-bit big-endian counter.

    Returns the decrypted data, or ``None`` when the IV is empty.
    """
    if not self._good:
        # Only a warning: decryption is still attempted below.
        logging.warning('well, it is hard to decrypt with a wrong key.')

    iv = dec_material.iv
    if not iv:
        logging.error('cannot decrypt with an empty iv!')
        return None

    start = int.from_bytes(iv, byteorder='big')
    counter_obj = Counter.new(128, initial_value=start, little_endian=False)
    cipher = AES.new(self._bkey_sha256, mode=AES.MODE_CTR, counter=counter_obj)
    return cipher.decrypt(data)
# --- DecryptInfo -------------------------------------------------------------
示例5
def test_initial_value_bytes_parameter(self):
    """Check ``initial_value`` given as bytes: equivalence and length errors."""
    # A byte-string initial value gives the same result as the integer.
    from_bytes = AES.new(self.key_128, AES.MODE_CTR,
                         nonce=self.nonce_64,
                         initial_value=b"\x00"*6+b"\xFF\xFF")
    from_int = AES.new(self.key_128, AES.MODE_CTR,
                       nonce=self.nonce_64, initial_value=0xFFFF)
    pt = get_tag_random("plaintext", 65536)
    self.assertEqual(from_bytes.encrypt(pt), from_int.encrypt(pt))

    rejected = (
        # Fail if the iv is too large
        {"initial_value": b"5"*17},
        {"nonce": self.nonce_64, "initial_value": b"5"*9},
        # Fail if the iv is too short
        {"initial_value": b"5"*15},
        {"nonce": self.nonce_64, "initial_value": b"5"*7},
    )
    for kwargs in rejected:
        self.assertRaises(ValueError, AES.new, self.key_128, AES.MODE_CTR,
                          **kwargs)
示例6
def test_wrap_around(self):
    """Check that running past an 8-bit counter raises ``OverflowError``."""
    # Counter is only 8 bits, so we can only encrypt/decrypt 256 blocks (=4096 bytes)
    counter = Counter.new(8, prefix=bchr(9) * 15)
    max_bytes = 4096

    def fresh_cipher():
        return AES.new(self.key_128, AES.MODE_CTR, counter=counter)

    for operation in ('encrypt', 'decrypt'):
        # Exactly max_bytes is fine; one more byte afterwards overflows.
        cipher = fresh_cipher()
        getattr(cipher, operation)(b'9' * max_bytes)
        self.assertRaises(OverflowError, getattr(cipher, operation), b'9')

        # A single call that is one byte too long overflows as well.
        cipher = fresh_cipher()
        self.assertRaises(OverflowError, getattr(cipher, operation),
                          b'9' * (max_bytes + 1))
示例7
def test_output_param(self):
    """Check that ``output=`` receives the result and the call returns None."""
    key = b'4'*16
    pt = b'5' * 16

    # Reference ciphertext produced without an output buffer.
    ct = AES.new(key, AES.MODE_CTR, nonce=self.nonce_64).encrypt(pt)

    output = bytearray(16)

    encryptor = AES.new(key, AES.MODE_CTR, nonce=self.nonce_64)
    res = encryptor.encrypt(pt, output=output)
    self.assertEqual(ct, output)
    self.assertEqual(res, None)

    # The same buffer is reused for the decryption direction.
    decryptor = AES.new(key, AES.MODE_CTR, nonce=self.nonce_64)
    res = decryptor.decrypt(ct, output=output)
    self.assertEqual(pt, output)
    self.assertEqual(res, None)
示例8
def test_output_param_neg(self):
    """Check that unsuitable ``output=`` buffers are rejected."""
    key = b'4'*16
    pt = b'5' * 16

    def fresh_cipher():
        return AES.new(key, AES.MODE_CTR, nonce=self.nonce_64)

    ct = fresh_cipher().encrypt(pt)

    # An immutable bytes object as output must raise TypeError.
    self.assertRaises(TypeError, fresh_cipher().encrypt, pt, output=b'0'*16)
    self.assertRaises(TypeError, fresh_cipher().decrypt, ct, output=b'0'*16)

    # A writable buffer that is one byte too short must raise ValueError.
    shorter_output = bytearray(15)
    self.assertRaises(ValueError, fresh_cipher().encrypt, pt,
                      output=shorter_output)
    self.assertRaises(ValueError, fresh_cipher().decrypt, ct,
                      output=shorter_output)
示例9
def test_aes_128(self):
    """Check AES-128 CTR encryption and decryption against a fixed vector."""
    key = unhexlify('2b7e151628aed2a6abf7158809cf4f3c')
    plaintext = unhexlify(
        '6bc1bee22e409f96e93d7e117393172a'
        'ae2d8a571e03ac9c9eb76fac45af8e51'
        '30c81c46a35ce411e5fbc1191a0a52ef'
        'f69f2445df4f9b17ad2b417be66c3710')
    ciphertext = unhexlify(
        '874d6191b620e3261bef6864990db6ce'
        '9806f66b7970fdff8617187bb9fffdff'
        '5ae4df3edbd5d35e5b4f09020db03eab'
        '1e031dda2fbe03d1792170a0f3009cee')

    # 14-byte fixed prefix followed by a 16-bit counter starting at 0xfeff.
    counter = Counter.new(nbits=16,
                          prefix=unhexlify('f0f1f2f3f4f5f6f7f8f9fafbfcfd'),
                          initial_value=0xfeff)

    # A new cipher object is built for each direction.
    encryptor = AES.new(key, AES.MODE_CTR, counter=counter)
    self.assertEqual(encryptor.encrypt(plaintext), ciphertext)

    decryptor = AES.new(key, AES.MODE_CTR, counter=counter)
    self.assertEqual(decryptor.decrypt(ciphertext), plaintext)
示例10
def test_aes_192(self):
    """Check AES-192 CTR encryption and decryption against a fixed vector."""
    key = unhexlify('8e73b0f7da0e6452c810f32b809079e562f8ead2522c6b7b')
    plaintext = unhexlify(
        '6bc1bee22e409f96e93d7e117393172a'
        'ae2d8a571e03ac9c9eb76fac45af8e51'
        '30c81c46a35ce411e5fbc1191a0a52ef'
        'f69f2445df4f9b17ad2b417be66c3710')
    ciphertext = unhexlify(
        '1abc932417521ca24f2b0459fe7e6e0b'
        '090339ec0aa6faefd5ccc2c6f4ce8e94'
        '1e36b26bd1ebc670d1bd1d665620abf7'
        '4f78a7f6d29809585a97daec58c6b050')

    # 14-byte fixed prefix followed by a 16-bit counter starting at 0xfeff.
    counter = Counter.new(nbits=16,
                          prefix=unhexlify('f0f1f2f3f4f5f6f7f8f9fafbfcfd'),
                          initial_value=0xfeff)

    # A new cipher object is built for each direction.
    encryptor = AES.new(key, AES.MODE_CTR, counter=counter)
    self.assertEqual(encryptor.encrypt(plaintext), ciphertext)

    decryptor = AES.new(key, AES.MODE_CTR, counter=counter)
    self.assertEqual(decryptor.decrypt(ciphertext), plaintext)
示例11
def _set_key(self, key):
    """Remember ``key`` and build the AES-CTR cipher that uses it.

    The cipher is created with ``self.counter`` as its counter and stored
    in ``self._cipher``.
    """
    self.key = key
    ctr_cipher = AES.new(key, AES.MODE_CTR, counter=self.counter)
    self._cipher = ctr_cipher
示例12
def dosend(self, msg):
    """Send ``msg`` on ``self.request``, AES-CTR encrypted when a key is set.

    When ``self.k`` is not the empty string, the message counter ``self.r``
    is incremented and used as the initial value of a 128-bit counter. The
    message is padded to a multiple of 16 bytes (every pad byte holds the
    pad length; a full 16-byte pad is added when already aligned),
    encrypted, and sent. Otherwise ``msg`` is sent unchanged.

    :param msg: byte string to send.
    """
    if self.k != '':
        self.r += 1
        ctr_e = Counter.new(128, initial_value=self.r)
        cipher = AES.new(key=self.k, mode=AES.MODE_CTR, counter=ctr_e)
        # NOTE(review): the next message starts at counter r + 1, so a
        # message longer than one block shares counter values with its
        # successor -- confirm whether this keystream overlap is intended.
        pad = 16 - len(msg) % 16
        # bytes(bytearray(...)) yields a byte string on Python 2 and 3 alike.
        plain = msg + bytes(bytearray([pad]) * pad)
        # One call over the whole buffer produces the same CTR output as
        # the former 16-byte loop, whose range(len(plain)/16) received a
        # float on Python 3.
        msg = cipher.encrypt(plain)
    self.request.sendall(msg)
示例13
def dorecv(self):
    """Receive up to 1024 bytes from ``self.request`` and decrypt them if keyed.

    When ``self.k`` is not the empty string, the message counter ``self.r``
    is incremented and used as the initial value of a 128-bit counter. The
    data is AES-CTR decrypted and the trailing padding is removed, with the
    last byte giving the number of bytes to strip. The padding contents
    are not verified here.

    :returns: the received (and, when keyed, decrypted and unpadded) bytes.
    :raises AssertionError: if nothing was received, or if the encrypted
        data is not a multiple of 16 bytes long.
    """
    msg = self.request.recv(1024)
    # Explicit raises (instead of ``assert``) keep the checks under -O.
    if len(msg) == 0:
        raise AssertionError('received an empty message')
    if self.k != '':
        self.r += 1
        ctr_e = Counter.new(128, initial_value=self.r)
        cipher = AES.new(key=self.k, mode=AES.MODE_CTR, counter=ctr_e)
        if len(msg) % 16 != 0:
            raise AssertionError('ciphertext length is not a multiple of 16')
        # One call over the whole buffer produces the same CTR output as
        # the former 16-byte loop, whose range(len(cmsg)/16) received a
        # float on Python 3.
        msg = cipher.decrypt(msg)
        # msg[-1:] is a one-byte string on Python 2 and 3, so ord() works
        # on both. A pad byte of 0 yields an empty result, as before.
        msg = msg[:-ord(msg[-1:])]
    return msg
示例14
def dosend(self, msg):
    """Send ``msg`` on ``self.request``, AES-CTR encrypted when a key is set.

    When ``self.k`` is not the empty string, the message counter ``self.r``
    is incremented and used as the initial value of a 128-bit counter. The
    message is padded to a multiple of 16 bytes (every pad byte holds the
    pad length; a full 16-byte pad is added when already aligned),
    encrypted, and sent. Otherwise ``msg`` is sent unchanged.

    :param msg: byte string to send.
    """
    if self.k != '':
        self.r += 1
        ctr_e = Counter.new(128, initial_value=self.r)
        cipher = AES.new(key=self.k, mode=AES.MODE_CTR, counter=ctr_e)
        # NOTE(review): the next message starts at counter r + 1, so a
        # message longer than one block shares counter values with its
        # successor -- confirm whether this keystream overlap is intended.
        pad = 16 - len(msg) % 16
        # bytes(bytearray(...)) yields a byte string on Python 2 and 3 alike.
        plain = msg + bytes(bytearray([pad]) * pad)
        # One call over the whole buffer produces the same CTR output as
        # the former 16-byte loop, whose range(len(plain)/16) received a
        # float on Python 3.
        msg = cipher.encrypt(plain)
    self.request.sendall(msg)
示例15
def dorecv(self):
    """Receive up to 1024 bytes from ``self.request`` and decrypt them if keyed.

    When ``self.k`` is not the empty string, the message counter ``self.r``
    is incremented and used as the initial value of a 128-bit counter. The
    data is AES-CTR decrypted, the padding is verified (the last byte gives
    the pad length and every pad byte must equal it), and the padding is
    removed.

    :returns: the received (and, when keyed, decrypted and unpadded) bytes.
    :raises AssertionError: if nothing was received, if the encrypted data
        is not a multiple of 16 bytes long, or if a pad byte is wrong.
    """
    msg = self.request.recv(1024)
    # Explicit raises (instead of ``assert``) keep the checks under -O.
    if len(msg) == 0:
        raise AssertionError('received an empty message')
    if self.k != '':
        self.r += 1
        ctr_e = Counter.new(128, initial_value=self.r)
        cipher = AES.new(key=self.k, mode=AES.MODE_CTR, counter=ctr_e)
        if len(msg) % 16 != 0:
            raise AssertionError('ciphertext length is not a multiple of 16')
        # One call over the whole buffer produces the same CTR output as
        # the former 16-byte loop, whose range(len(cmsg)/16) received a
        # float on Python 3.
        msg = cipher.decrypt(msg)
        # A bytearray indexes to integers on Python 2 and 3, replacing the
        # Python-2-only ord(msg[i]).
        raw = bytearray(msg)
        pad = raw[-1]
        for i in range(len(raw) - pad, len(raw)):
            if raw[i] != pad:
                raise AssertionError('invalid padding')
        # A pad byte of 0 yields an empty result, as before.
        msg = msg[:-pad]
    return msg
示例16
def try_use_pycrypto_or_pycryptodome_module():
    """Return AES-CTR and AES-CBC cipher factories built on ``Crypto``.

    The ``Crypto`` imports happen inside this function, so an ImportError
    is raised from the call itself when the package is unavailable.

    :returns: the pair ``(create_aes_ctr, create_aes_cbc)``; each takes
        ``(key, iv)`` and returns a new cipher object.
    """
    from Crypto.Cipher import AES
    from Crypto.Util import Counter

    def create_aes_ctr(key, iv):
        # iv is used as the initial value of a 128-bit counter.
        return AES.new(key, AES.MODE_CTR,
                       counter=Counter.new(128, initial_value=iv))

    def create_aes_cbc(key, iv):
        cbc_mode = AES.MODE_CBC
        return AES.new(key, cbc_mode, iv)

    return create_aes_ctr, create_aes_cbc
示例17
def encrypt(self, data):
    """Encrypt ``data`` with AES-CTR and protect it with an HMAC-SHA256.

    A fresh 32-byte random salt is generated and passed with the password
    to ``self._gen_key_initctr`` to obtain the cipher key, the HMAC key and
    the initial counter value. The plaintext is padded to the AES block
    size (every pad byte holds the pad length), encrypted, and the HMAC of
    the ciphertext is computed. The hex-encoded salt, HMAC and ciphertext
    are joined with newlines and hex-encoded once more.

    :param data: object to encrypt; converted with ``obj2bytes``.
    :returns: ``(False, message)`` when the instance check failed or the
        password is empty; the failing ``obj2bytes`` result when a
        conversion fails; otherwise ``self._handle_result(payload)``.
    """
    if not self.check_result:
        self.logger.error('使用AES256解密加密数据失败,原因:' + self.err_msg)
        return (False, self.err_msg)

    if not self.b_password:
        return (False, "加密密码为空")

    # obj2bytes returns a sequence: [0] is the success flag, [1] the bytes.
    converted = obj2bytes(data)
    if not converted[0]:
        return converted
    b_plaintext = converted[1]

    b_salt = os.urandom(32)
    b_key1, b_key2, b_iv = self._gen_key_initctr(self.b_password, b_salt)

    bs = AES.block_size
    # The pad length must come from the plaintext itself. It was previously
    # computed from the obj2bytes result object, not the plaintext bytes.
    # The value is always in 1..bs, so decrypt (which strips the count
    # found in the last byte) handles old and new output alike.
    padding_length = bs - len(b_plaintext) % bs
    padding = obj2bytes(padding_length * chr(padding_length))
    if not padding[0]:
        return padding
    b_plaintext += padding[1]

    # b_iv is a hexadecimal string holding the initial counter value.
    ctr = Counter.new(128, initial_value=int(b_iv, 16))
    cipher = AES.new(b_key1, AES.MODE_CTR, counter=ctr)
    b_ciphertext = cipher.encrypt(b_plaintext)

    # The HMAC covers the ciphertext.
    hmac = HMAC.new(b_key2, b_ciphertext, SHA256)
    b_temp_ciphertext = b'\n'.join([
        hexlify(b_salt),
        string2bytes(hmac.hexdigest()),
        hexlify(b_ciphertext),
    ])
    b_new_ciphertext = hexlify(b_temp_ciphertext)
    return self._handle_result(b_new_ciphertext)
示例18
def decrypt(self, data):
    """Verify and decrypt a hex-encoded payload of salt, HMAC and ciphertext.

    The payload is hex-decoded and split on newlines into the hex salt,
    the stored HMAC and the hex ciphertext. Keys and the initial counter
    value are derived from the password and salt with
    ``self._gen_key_initctr``; the HMAC-SHA256 of the ciphertext is checked
    before anything is decrypted. After AES-CTR decryption, the trailing
    padding is removed: the last byte gives the number of bytes to strip.

    :param data: object holding the payload; converted with ``obj2bytes``.
    :returns: ``(False, message)`` when the instance check failed, the
        password is empty or the HMAC does not match; the failing
        ``obj2bytes`` result when the conversion fails; otherwise
        ``self._handle_result(plaintext)``.
    """
    if not self.check_result:
        self.logger.error('使用AES256解密加密数据失败,原因:' + self.err_msg)
        return (False, self.err_msg)

    if not self.b_password:
        return (False, "加密密码为空")

    # obj2bytes returns a sequence: [0] is the success flag, [1] the bytes.
    converted = obj2bytes(data)
    if not converted[0]:
        return converted

    payload = unhexlify(converted[1])
    hex_salt, b_cryptedHmac, hex_ciphertext = payload.split(b"\n", 2)
    b_salt = unhexlify(hex_salt)
    b_ciphertext = unhexlify(hex_ciphertext)

    b_key1, b_key2, b_iv = self._gen_key_initctr(self.b_password, b_salt)

    # Authenticate the ciphertext before decrypting it.
    expected_hmac = HMAC.new(b_key2, b_ciphertext, SHA256)
    hmac_matches = self._is_equal(
        b_cryptedHmac, bytes2string(expected_hmac.hexdigest()))
    if not hmac_matches:
        self.logger.error('使用AES256解密加密数据失败,原因:密码错误')
        return (False, "解密失败,密码错误")

    # b_iv is a hexadecimal string holding the initial counter value.
    ctr = Counter.new(128, initial_value=int(b_iv, 16))
    padded = AES.new(b_key1, AES.MODE_CTR, counter=ctr).decrypt(b_ciphertext)

    # The last byte holds the number of padding bytes to strip.
    pad_count = padded[-1]
    return self._handle_result(padded[:-pad_count])
示例19
def __init__(self, key, iv, counter_wraparound=False):
    """Initialize AES with the given key and IV.

    If counter_wraparound is set to True, the AES-CTR counter will
    wraparound to 0 when it overflows.

    Both ``key`` and ``iv`` must be exactly 16 bytes long; this is checked
    with assertions. The IV is interpreted as a big-endian integer and
    used as the initial value of a 128-bit counter.
    """
    from binascii import hexlify

    assert len(key) == 16
    assert len(iv) == 16

    # int(hexlify(iv), 16) gives the same value as the former
    # long(iv.encode('hex'), 16) but also works on Python 3, where neither
    # ``long`` nor the 'hex' codec on byte strings exists.
    initial_value = int(hexlify(iv), 16)
    self.ctr = Counter.new(128, initial_value=initial_value,
                           allow_wraparound=counter_wraparound)
    self.cipher = AES.new(key, AES.MODE_CTR, counter=self.ctr)
示例20
def test_nist(self):
    """Run four consecutive AES-128 CTR test vectors through the cipher.

    The cipher and counter are stored on ``self`` and each vector
    (input block, output block, plaintext, ciphertext) is handed to
    ``self._helper_test_vector`` in order, so the counter advances from
    one vector to the next.
    """
    from binascii import hexlify

    # Prepare the cipher. Byte literals are identical to plain strings on
    # Python 2 and are real byte strings on Python 3.
    key = b"\x2b\x7e\x15\x16\x28\xae\xd2\xa6\xab\xf7\x15\x88\x09\xcf\x4f\x3c"
    iv = b"\xf0\xf1\xf2\xf3\xf4\xf5\xf6\xf7\xf8\xf9\xfa\xfb\xfc\xfd\xfe\xff"
    # int(hexlify(iv), 16) replaces the Python-2-only
    # long(iv.encode('hex'), 16) and yields the same integer.
    self.ctr = Counter.new(128, initial_value=int(hexlify(iv), 16))
    self.cipher = AES.new(key, AES.MODE_CTR, counter=self.ctr)

    # (input_block, output_block, plaintext, ciphertext) per vector.
    vectors = (
        (b"\xf0\xf1\xf2\xf3\xf4\xf5\xf6\xf7\xf8\xf9\xfa\xfb\xfc\xfd\xfe\xff",
         b"\xec\x8c\xdf\x73\x98\x60\x7c\xb0\xf2\xd2\x16\x75\xea\x9e\xa1\xe4",
         b"\x6b\xc1\xbe\xe2\x2e\x40\x9f\x96\xe9\x3d\x7e\x11\x73\x93\x17\x2a",
         b"\x87\x4d\x61\x91\xb6\x20\xe3\x26\x1b\xef\x68\x64\x99\x0d\xb6\xce"),
        (b"\xf0\xf1\xf2\xf3\xf4\xf5\xf6\xf7\xf8\xf9\xfa\xfb\xfc\xfd\xff\x00",
         b"\x36\x2b\x7c\x3c\x67\x73\x51\x63\x18\xa0\x77\xd7\xfc\x50\x73\xae",
         b"\xae\x2d\x8a\x57\x1e\x03\xac\x9c\x9e\xb7\x6f\xac\x45\xaf\x8e\x51",
         b"\x98\x06\xf6\x6b\x79\x70\xfd\xff\x86\x17\x18\x7b\xb9\xff\xfd\xff"),
        (b"\xf0\xf1\xf2\xf3\xf4\xf5\xf6\xf7\xf8\xf9\xfa\xfb\xfc\xfd\xff\x01",
         b"\x6a\x2c\xc3\x78\x78\x89\x37\x4f\xbe\xb4\xc8\x1b\x17\xba\x6c\x44",
         b"\x30\xc8\x1c\x46\xa3\x5c\xe4\x11\xe5\xfb\xc1\x19\x1a\x0a\x52\xef",
         b"\x5a\xe4\xdf\x3e\xdb\xd5\xd3\x5e\x5b\x4f\x09\x02\x0d\xb0\x3e\xab"),
        (b"\xf0\xf1\xf2\xf3\xf4\xf5\xf6\xf7\xf8\xf9\xfa\xfb\xfc\xfd\xff\x02",
         b"\xe8\x9c\x39\x9f\xf0\xf1\x98\xc6\xd4\x0a\x31\xdb\x15\x6c\xab\xfe",
         b"\xf6\x9f\x24\x45\xdf\x4f\x9b\x17\xad\x2b\x41\x7b\xe6\x6c\x37\x10",
         b"\x1e\x03\x1d\xda\x2f\xbe\x03\xd1\x79\x21\x70\xa0\xf3\x00\x9c\xee"),
    )
    for input_block, output_block, plaintext, ciphertext in vectors:
        self._helper_test_vector(input_block, output_block,
                                 plaintext, ciphertext)
示例21
def _set_key(self, key):
    """Store ``key`` on the instance and create its AES-CTR cipher.

    The new cipher uses ``self.counter`` and is kept in ``self._cipher``.
    """
    self.key = key
    counter = self.counter
    self._cipher = AES.new(key, AES.MODE_CTR, counter=counter)
示例22
def decrypt_aes_ctr(ciphertext, key, iv):
    """Decrypt ``ciphertext`` with AES in CTR mode.

    ``iv`` is passed as the initial value of a 128-bit counter that is
    allowed to wrap around.

    :returns: the decrypted data.
    """
    wrapping_counter = Counter.new(128, initial_value=iv,
                                   allow_wraparound=True)
    cipher = AES.new(key, AES.MODE_CTR, counter=wrapping_counter)
    return cipher.decrypt(ciphertext)
示例23
def encrypt_aes_ctr(value, key, iv):
    """Encrypt ``value`` with AES in CTR mode.

    ``iv`` is passed as the initial value of a 128-bit counter that is
    allowed to wrap around.

    :returns: the ciphertext.
    """
    wrapping_counter = Counter.new(128, initial_value=iv,
                                   allow_wraparound=True)
    cipher = AES.new(key, AES.MODE_CTR, counter=wrapping_counter)
    return cipher.encrypt(value)
#
# Utility
#
示例24
def get_ssl_key(self):
    """Decrypt the stored RSA private exponent and return the SSL private key.

    The 16 bytes at offset 0x3AE0 of ``self.data`` are the initial counter
    value and the following 0x100 bytes are the encrypted exponent. It is
    decrypted with AES-CTR using the ``ssl_rsa_kek`` key from the keyset,
    combined with the modulus and public exponent of the SSL certificate,
    exported as DER and parsed into an ``ssl.SSLPrivateKey``.
    """
    self.check(0x3AE0, 0x140)

    counter_start = self.data[0x3AE0 : 0x3AF0]
    encrypted_exponent = self.data[0x3AF0 : 0x3BF0]

    kek = self.keyset.get("ssl_rsa_kek")
    # Empty nonce: the whole counter block comes from initial_value.
    ctr_cipher = AES.new(kek, AES.MODE_CTR, nonce=b"",
                         initial_value=counter_start)
    d = int.from_bytes(ctr_cipher.decrypt(encrypted_exponent), "big")

    pubkey = self.get_ssl_cert().public_key()
    der = RSA.construct((pubkey.n, pubkey.e, d)).export_key("DER")
    return ssl.SSLPrivateKey.parse(der, ssl.TYPE_DER)
示例25
def new_cipher(self, key, iv):
    """
    Build a cipher object for this algorithm.

    For CTR and GCM modes the trailing ``nonce_size`` bytes of ``key`` are
    split off and used as the nonce; the cipher is created with a 32-bit
    counter prefixed by ``nonce + iv``. For every other mode the key and
    IV are passed to the cipher unchanged.

    @param key:    the secret key, a byte string
    @param iv:     the initialization vector, a byte string
    @return:    an initialized cipher object for this algo
    """
    uses_counter = (
        hasattr(self.cipher, 'MODE_CTR') and self.mode == self.cipher.MODE_CTR
        or hasattr(self.cipher, 'MODE_GCM') and self.mode == self.cipher.MODE_GCM)
    if not uses_counter:
        return self.cipher.new(key, self.mode, iv)

    # in counter mode, the "iv" must be incremented for each block
    # it is calculated like this:
    # +---------+------------------+---------+
    # |  nonce  |        IV        | counter |
    # +---------+------------------+---------+
    #   m bytes       n bytes        4 bytes
    # <-------------------------------------->
    #               block_size
    nonce_size = self.cipher.block_size - self.iv_size - 4

    # instead of asking for an extra parameter, we extract the last
    # nonce_size bytes of the key and use them as the nonce.
    # +----------------------------+---------+
    # |        cipher key          |  nonce  |
    # +----------------------------+---------+
    #                              <--------->
    #                               nonce_size
    if nonce_size:
        cipher_key, nonce = key[:-nonce_size], key[-nonce_size:]
    else:
        # key[:-0] would be empty and key[-0:] the whole key, which swaps
        # the roles: with no nonce bytes the entire key is the cipher key.
        cipher_key, nonce = key, key[:0]

    return self.cipher.new(cipher_key, self.mode,
                           counter=Counter.new(4 * 8, prefix=nonce + iv))
示例26
def encrypt(hstr, payload, encryptionKey):
    """Encrypt ``hstr`` and then ``payload`` with one AES-CTR cipher.

    The AES key is the base64 encoding of ``encryptionKey`` padded to the
    AES block size. No nonce is passed to ``AES.new``; the one exposed by
    the cipher object is returned so the data can be decrypted later.
    Both values go through the same cipher object, ``hstr`` first.

    :returns: ``[hstr_ciphertext, payload_ciphertext, nonce]``, each a
        base64 text string.
    """
    key = base64.b64encode(pad(encryptionKey, AES.block_size))
    cipher = AES.new(key, AES.MODE_CTR)

    def _b64_text(raw):
        return base64.b64encode(raw).decode('utf-8')

    # Order matters: the keystream continues from hstr into payload.
    hstrBytes = _b64_text(cipher.encrypt(hstr))
    payloadBytes = _b64_text(cipher.encrypt(payload))
    nonce = _b64_text(cipher.nonce)
    return [hstrBytes, payloadBytes, nonce]
示例27
def decrypt_flag(data, creation_time, charset='0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz'):
    """Decrypt ``data`` with a key regenerated from ``creation_time``.

    ``srand`` is seeded with ``creation_time`` and 32 characters are drawn
    from ``charset`` with ``rand()`` to form the AES key. Decryption uses
    AES-CTR with a fixed 128-bit initial counter value.

    :returns: the decrypted data.
    """
    srand(creation_time)
    # 32 successive rand() draws, each reduced modulo the charset length.
    key = ''.join([charset[rand() % len(charset)] for _ in range(32)])
    fixed_counter = Counter.new(
        128, initial_value=0xe4b8c75accb877dab0f5a6f0a7aa0e67)
    cipher = AES.new(key.encode('ascii'), AES.MODE_CTR, counter=fixed_counter)
    return cipher.decrypt(data)
示例28
def encrypt_flag(data, creation_time, charset='0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz'):
    """Encrypt ``data`` with a key generated from ``creation_time``.

    ``srand`` is seeded with ``creation_time`` and 32 characters are drawn
    from ``charset`` with ``rand()`` to form the AES key. Encryption uses
    AES-CTR with a fixed 128-bit initial counter value.

    :returns: the encrypted data.
    """
    srand(creation_time)
    # 32 successive rand() draws, each reduced modulo the charset length.
    key = ''.join([charset[rand() % len(charset)] for _ in range(32)])
    # A disabled earlier variant used a 32-bit counter with the prefix
    # b'\x00\x00\x00\x00\x0B\xAD\xC0\xDE\xDE\xC0\xAD\x0B'.
    fixed_counter = Counter.new(
        128, initial_value=0xe4b8c75accb877dab0f5a6f0a7aa0e67)
    cipher = AES.new(key.encode('ascii'), AES.MODE_CTR, counter=fixed_counter)
    return cipher.encrypt(data)
示例29
def encrypt(self, init_value, plaintext, auth_data=b''):
    """Encrypt ``plaintext`` and compute its authentication tag.

    The plaintext is encrypted with AES-CTR: the 12-byte big-endian
    ``init_value`` is the counter prefix, followed by a 32-bit counter that
    starts at 2. The tag is the GHASH of ``auth_data`` and the ciphertext,
    XORed with the AES-ECB encryption of the block
    ``(init_value << 32) | 1``.

    :param init_value: integer IV, must be below 2**96 and differ from the
        one used in the previous call.
    :param plaintext: bytes to encrypt (may be empty).
    :param auth_data: additional data covered by the tag only.
    :returns: ``(ciphertext, auth_tag)`` with ``auth_tag`` as an integer.
    :raises InvalidInputException: if the IV is too large or was reused.
    """
    if init_value >= (1 << 96):
        raise InvalidInputException('IV should be 96-bit')
    # Naive IV-reuse check: only the immediately preceding IV is remembered.
    if init_value == self.prev_init_value:
        raise InvalidInputException('IV must not be reused!')
    self.prev_init_value = init_value

    size = len(plaintext)
    ciphertext = b''
    if size > 0:
        # The counter starts at 2 because counter value 1 is the block
        # encrypted below to mask the tag.
        counter = Counter.new(
            nbits=32,
            prefix=long_to_bytes(init_value, 12),
            initial_value=2,
            allow_wraparound=False)
        aes_ctr = AES.new(self.__master_key, AES.MODE_CTR, counter=counter)

        # Zero-pad to a whole number of blocks, then cut the output back
        # to the original length.
        remainder = size % 16
        if remainder == 0:
            padded_plaintext = plaintext
        else:
            padded_plaintext = plaintext + b'\x00' * (16 - remainder)
        ciphertext = aes_ctr.encrypt(padded_plaintext)[:size]

    auth_tag = self.__ghash(auth_data, ciphertext)
    tag_mask_block = long_to_bytes((init_value << 32) | 1, 16)
    auth_tag ^= bytes_to_long(self.__aes_ecb.encrypt(tag_mask_block))

    assert auth_tag < (1 << 128)
    return ciphertext, auth_tag
示例30
def decrypt(self, init_value, ciphertext, auth_tag, auth_data=b''):
    """Verify ``auth_tag`` and decrypt ``ciphertext``.

    The expected tag is the GHASH of ``auth_data`` and the ciphertext,
    XORed with the AES-ECB encryption of the block
    ``(init_value << 32) | 1``. After the tag matches, the ciphertext is
    decrypted with AES-CTR: the 12-byte big-endian ``init_value`` is the
    counter prefix, followed by a 32-bit wrapping counter starting at 2.

    No range validation of ``init_value`` or ``auth_tag`` is performed.

    :param init_value: integer IV used for encryption.
    :param ciphertext: bytes to decrypt (may be empty).
    :param auth_tag: integer authentication tag to verify.
    :param auth_data: additional data covered by the tag only.
    :returns: the plaintext bytes.
    :raises InvalidTagException: if the tag does not match.
    """
    tag_mask_block = long_to_bytes((init_value << 32) | 1, 16)
    expected_tag = self.__ghash(auth_data, ciphertext) ^ \
        bytes_to_long(self.__aes_ecb.encrypt(tag_mask_block))
    if auth_tag != expected_tag:
        raise InvalidTagException

    size = len(ciphertext)
    if size <= 0:
        return b''

    counter = Counter.new(
        nbits=32,
        prefix=long_to_bytes(init_value, 12),
        initial_value=2,
        allow_wraparound=True)
    aes_ctr = AES.new(self.__master_key, AES.MODE_CTR, counter=counter)

    # Zero-pad to a whole number of blocks, then cut the output back to
    # the original length.
    remainder = size % 16
    if remainder == 0:
        padded_ciphertext = ciphertext
    else:
        padded_ciphertext = ciphertext + b'\x00' * (16 - remainder)
    return aes_ctr.decrypt(padded_ciphertext)[:size]