Python源码示例:Crypto.Cipher.AES.new()
示例1
def encrypt(self, text, appid):
"""对明文进行加密
@param text: 需要加密的明文
@return: 加密得到的字符串
"""
# 16位随机字符串添加到明文开头
len_str = struct.pack("I", socket.htonl(len(text.encode())))
# text = self.get_random_str() + binascii.b2a_hex(len_str).decode() + text + appid
text = self.get_random_str() + len_str + text.encode() + appid
# 使用自定义的填充方式对明文进行补位填充
pkcs7 = PKCS7Encoder()
text = pkcs7.encode(text)
# 加密
cryptor = AES.new(self.key, self.mode, self.key[:16])
try:
ciphertext = cryptor.encrypt(text)
# 使用BASE64对加密后的字符串进行编码
return ierror.WXBizMsgCrypt_OK, base64.b64encode(ciphertext).decode('utf8')
except Exception as e:
return ierror.WXBizMsgCrypt_EncryptAES_Error, None
示例2
def unlock(vault_path, key):
"""
Unlock legacy vault and retrieve content
"""
f = open(vault_path, "rb")
try:
nonce, tag, ciphertext = [f.read(x) for x in (16, 16, -1)]
finally:
f.close()
# Unlock Vault with key
cipher = AES.new(get_hash(key), AES.MODE_EAX, nonce)
data = cipher.decrypt_and_verify(ciphertext, tag)
# Set vault content to class level var
return json.loads(data.decode("utf-8"))
示例3
def prepare_items(secrets, categories):
"""
Prepare all secrets to the new import format
"""
out = []
for secret in secrets:
out.append({
'name': secret.get('name'),
'url': None, # Not supported in legacy database
'login': secret.get('login'),
'password': secret.get('password'),
'notes': secret.get('notes'),
'category': get_category_name(secret.get('category'), categories),
})
return out
示例4
def decrypt_secret(data, key):
if not data:
return None
aeskey = ""
sha256 = SHA256.new()
sha256.update(key)
for i in range(1000):
sha256.update(data[28:60])
aeskey = sha256.digest()
secret = ""
aes = AES.new(aeskey)
for key_offset in range(0, len(data) - 60, 16):
if (key_offset + 16) <= len(data) - 60:
secret = secret + aes.decrypt(data[60 + key_offset:60 + key_offset + 16])
return secret
示例5
def decrypt_file(key, filename, chunk_size=24*1024):
output_filename = os.path.splitext(filename)[0]
with open(filename, 'rb') as infile:
origsize = struct.unpack('<Q', infile.read(struct.calcsize('Q')))[0]
iv = infile.read(16)
decryptor = AES.new(key, AES.MODE_CBC, iv)
with open(output_filename, 'wb') as outfile:
while True:
chunk = infile.read(chunk_size)
if len(chunk) == 0:
break
outfile.write(decryptor.decrypt(chunk))
outfile.truncate(origsize)
示例6
def decrypt(self, ciphertext: bytes) -> bytes:
"""Return plaintext for given ciphertext."""
# Split out the nonce, tag, and encrypted data.
nonce = ciphertext[:12]
if len(nonce) != 12:
raise DataIntegrityError("Cipher text is damaged: invalid nonce length")
tag = ciphertext[12:28]
if len(tag) != 16:
raise DataIntegrityError("Cipher text is damaged: invalid tag length")
encrypted = ciphertext[28:]
# Construct AES cipher, with old nonce.
cipher = AES.new(self.cipher_key, AES.MODE_GCM, nonce)
# Decrypt and verify.
try:
plaintext = cipher.decrypt_and_verify(encrypted, tag) # type: ignore
except ValueError as e:
raise DataIntegrityError("Cipher text is damaged: {}".format(e))
return plaintext
示例7
def __init__(self, key, iv=None):
iv = iv or key[:16]
super().__init__(AES.new(key, AES.MODE_CBC, iv))
示例8
def __init__(self, key):
super().__init__(AES.new(key, AES.MODE_ECB))
示例9
def _aes_decrypt(aes_key, aes_text) -> Text:
"""
使用密钥解密文本信息,将会自动填充空白字符
:param aes_key: 解密密钥
:param aes_text: 需要解密的文本
:return: 经过解密的数据
"""
# 初始化解码器
cipher = AES.new(_fill_16(aes_key), AES.MODE_ECB)
# 优先逆向解密十六进制为bytes
converted = a2b_base64(aes_text.replace('-', '/').replace("%3D", "=").encode())
# 使用aes解密密文
decrypt_text = str(cipher.decrypt(converted), encoding='utf-8').replace('\0', '')
# 返回执行结果
return decrypt_text.strip()
示例10
def _aes_encrypt(aes_key, aes_text) -> Text:
"""
使用密钥加密文本信息,将会自动填充空白字符
:param aes_key: 加密密钥
:param aes_text: 需要加密的文本
:return: 经过加密的数据
"""
# 初始化加密器
cipher = AES.new(_fill_16(aes_key), AES.MODE_ECB)
# 先进行aes加密
aes_encrypted = cipher.encrypt(_fill_16(aes_text))
# 使用十六进制转成字符串形式
encrypt_text = b2a_base64(aes_encrypted).decode().replace('/', '-').strip()
# 返回执行结果
return encrypt_text
示例11
def decrypt(self, text, appid):
"""对解密后的明文进行补位删除
@param text: 密文
@return: 删除填充补位后的明文
"""
try:
cryptor = AES.new(self.key, self.mode, self.key[:16])
# 使用BASE64对密文进行解码,然后AES-CBC解密
plain_text = cryptor.decrypt(base64.b64decode(text))
except Exception as e:
print(e)
return ierror.WXBizMsgCrypt_DecryptAES_Error, None
try:
# pad = ord(plain_text[-1])
pad = plain_text[-1]
# 去掉补位字符串
# pkcs7 = PKCS7Encoder()
# plain_text = pkcs7.encode(plain_text)
# 去除16位随机字符串
content = plain_text[16:-pad]
xml_len = socket.ntohl(struct.unpack("I", content[: 4])[0])
xml_content = content[4: xml_len + 4]
from_appid = content[xml_len + 4:]
except Exception as e:
return ierror.WXBizMsgCrypt_IllegalBuffer, None
if from_appid != appid:
return ierror.WXBizMsgCrypt_ValidateAppid_Error, None
return 0, xml_content.decode()
示例12
def encrypt(plaintext, key, blockSize):
iv = bytearray(os.urandom(blockSize))
iv = str(iv)
reIV = bytearray(os.urandom(blockSize))
reIV = str(reIV)
rivPlaintext = pad(reIV + str(plaintext), blockSize)
#print "rivPlaintext: " + base64.b64encode(rivPlaintext)
cipher = AES.new(key, AES.MODE_CBC, IV=iv)
return iv + str(cipher.encrypt(rivPlaintext))
示例13
def decrypt(ciphertext, key, blockSize):
#print "ciphertext: " + base64.b64encode(ciphertext)
iv = ciphertext[0:blockSize]
#print "iv: " + base64.b64encode(iv)
#print "ciphertext: " + base64.b64encode(ciphertext)
rivCiphertext = ciphertext[blockSize:]
#print "rivCiphertext: " + base64.b64encode(rivCiphertext)
rivCiphertext = str(rivCiphertext)
#print "rivCiphertext: " + base64.b64encode(rivCiphertext)
cipher = AES.new(key, AES.MODE_CBC, IV=iv)
rivPlaintext = cipher.decrypt(rivCiphertext)
#print "rivPlaintext: " + base64.b64encode(rivPlaintext)
rivPlaintext = str(rivPlaintext)
#print "rivPlaintext: " + base64.b64encode(rivPlaintext)
rivPlaintext = unpad(rivPlaintext)
#print "rivPlaintext: " + base64.b64encode(rivPlaintext)
#rivPlaintext = str(cipher.decrypt(str(rivCiphertext)))
#print "rivPlaintext: " + base64.b64encode(rivPlaintext)
#print "rivPlaintext: " + base64.b64encode(rivPlaintext[blockSize:])
return rivPlaintext[blockSize:]
#return rivPlaintext
示例14
def get_hash(key):
"""
Returns a 32 bytes hash for a given master key
"""
h = SHA256.new()
for i in range(1, 10000):
h.update(str.encode(
str(i) + config.salt + key))
return base64.b64decode(str.encode(h.hexdigest()[:32]))
示例15
def digest_key(self):
"""
Use SHA-256 over our key to get a proper-sized AES key
"""
# Add optional salt to key
key = self.key
if self.salted_key:
key = self.salted_key
return SHA256.new(key).digest()
示例16
def get_aes(self, IV):
"""
AES instance
"""
return AES.new(self.digest_key(), AES.MODE_CBC, IV)
示例17
def decrypt(self, hex):
encoded = binascii.unhexlify(hex)
secret = self.get_secret()
BLOCK_SIZE = 16
mode = AES.MODE_ECB
cipher = AES.new(secret, mode)
return cipher.decrypt(encoded).split('\x00')[0]
示例18
def aes_encrypt(self, message, passphrase):
IV = '\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00'
aes = AES.new(passphrase, AES.MODE_CBC, IV)
return aes.encrypt(message)
# get value used to build the salt
示例19
def encrypt(self, key, msg):
from Crypto.Cipher import AES
secret = self.getSecret(key)
Initial16bytes = '0123456789012345'
cipher = AES.new(secret[0:32], AES.MODE_CBC, Initial16bytes)
enc = encodestring(cipher.encrypt(self.pad(msg)))
return enc
示例20
def decrypt(self, key, msg):
from Crypto.Cipher import AES
try:
secret = self.getSecret(key)
Initial16bytes = '0123456789012345'
cipher = AES.new(secret[0:32], AES.MODE_CBC, Initial16bytes)
plain = self.depad(cipher.decrypt(decodestring(msg)))
except:
return msg
try:
return eval(plain)
except SyntaxError:
return plain
示例21
def encrypt(self, key, msg):
from Crypto.Cipher import AES
secret = self.getSecret(key)
Initial16bytes = '0123456789012345'
cipher = AES.new(secret[0:32], AES.MODE_CBC, Initial16bytes)
return encodestring(
cipher.encrypt(self.pad(msg.encode('utf-8')))).decode('utf-8')
示例22
def decrypt(self, key, msg):
from Crypto.Cipher import AES
secret = self.getSecret(key)
Initial16bytes = '0123456789012345'
cipher = AES.new(secret[0:32], AES.MODE_CBC, Initial16bytes)
return (cipher.decrypt(
decodestring(msg.encode('utf-8')))).decode('utf-8')
示例23
def _pam_sign(self, msg):
return urlsafe_b64encode(hmac.new(
self.secret_key.encode("utf-8"),
msg.encode("utf-8"),
sha256
).digest())
示例24
def test_add_break(bugbuzz_client, source_file):
source, uploaded_file = source_file
local_vars = dict(foo='bar', eggs='spam')
created_break_ = bugbuzz_client.add_break(
73,
uploaded_file['id'],
local_vars,
)
url = urlparse.urljoin(
bugbuzz_client.base_url,
'/breaks/{}'.format(created_break_['id']),
)
resp = requests.get(url)
break_ = resp.json()['break']
assert break_['session'] == bugbuzz_client.session_id
assert break_['lineno'] == 73
assert break_['file'] == uploaded_file['id']
iv = base64.b64decode(break_['aes_iv'].encode('latin1'))
encrypted = base64.b64decode(break_['local_vars'].encode('latin1'))
aes = AES.new(bugbuzz_client.aes_key, AES.MODE_CBC, iv)
parsed_local_vars = json.loads(
pkcs5_unpad(aes.decrypt(encrypted)).decode('latin1')
)
assert parsed_local_vars == local_vars
示例25
def test_encrypt_decrypt(bugbuzz_client):
plaintext = b'super foobar'
iv, encrypted = bugbuzz_client.encrypt(plaintext)
assert encrypted != plaintext
aes = AES.new(bugbuzz_client.aes_key, AES.MODE_CBC, iv)
assert pkcs5_unpad(aes.decrypt(encrypted)) == plaintext
示例26
def encrypt(self, key, msg):
from Crypto.Cipher import AES
secret = self.getSecret(key)
Initial16bytes = '0123456789012345'
cipher = AES.new(secret[0:32], AES.MODE_CBC, Initial16bytes)
enc = encodestring(cipher.encrypt(self.pad(msg)))
return enc
示例27
def decrypt(self, key, msg):
from Crypto.Cipher import AES
try:
secret = self.getSecret(key)
Initial16bytes = '0123456789012345'
cipher = AES.new(secret[0:32], AES.MODE_CBC, Initial16bytes)
plain = self.depad(cipher.decrypt(decodestring(msg)))
except:
return msg
try:
return eval(plain)
except SyntaxError:
return plain
示例28
def encrypt(self, key, msg):
from Crypto.Cipher import AES
secret = self.getSecret(key)
Initial16bytes = '0123456789012345'
cipher = AES.new(secret[0:32], AES.MODE_CBC, Initial16bytes)
return encodestring(
cipher.encrypt(self.pad(msg.encode('utf-8')))).decode('utf-8')
示例29
def _pam_sign(self, msg):
return urlsafe_b64encode(hmac.new(
self.secret_key.encode("utf-8"),
msg.encode("utf-8"),
sha256
).digest())
示例30
def get_cipher(self, key, iv):
if len(key) > 32:
raise ValueError('Key length cannot exceed 32 bytes.')
key = key + ' ' * (32 - len(key))
return AES.new(key, AES.MODE_CFB, iv)