Python源码示例:Crypto.Cipher.AES.new()

示例1
def encrypt(self, text, appid):
    """Encrypt a plaintext message and return it Base64-encoded.

    The buffer handed to AES is laid out as
    ``self.get_random_str() + 4-byte length + encoded text + appid``,
    where the length is the byte length of the encoded text.

    @param text: plaintext ``str`` to encrypt
    @param appid: value appended after the message; it is concatenated
        to bytes, so presumably ``bytes`` -- TODO confirm against callers
    @return: ``(ierror.WXBizMsgCrypt_OK, base64_text)`` on success, or
        ``(ierror.WXBizMsgCrypt_EncryptAES_Error, None)`` when the
        encryption step raises
    """
    payload = text.encode()
    # htonl followed by a native-order pack leaves the four length bytes
    # in network byte order.
    length_prefix = struct.pack("I", socket.htonl(len(payload)))
    framed = self.get_random_str() + length_prefix + payload + appid
    # Pad the framed buffer with the project's PKCS7 helper.
    padded = PKCS7Encoder().encode(framed)
    # The first 16 items of the key double as the IV.
    cipher = AES.new(self.key, self.mode, self.key[:16])
    try:
        encrypted = cipher.encrypt(padded)
        # Base64 keeps the ciphertext printable.
        return ierror.WXBizMsgCrypt_OK, base64.b64encode(encrypted).decode('utf8')
    except Exception:
        return ierror.WXBizMsgCrypt_EncryptAES_Error, None
示例2
def unlock(vault_path, key):
    """
    Decrypt a legacy vault file and return its parsed JSON content.

    The file is read as a 16-byte nonce, a 16-byte tag and then the
    ciphertext (everything that remains). The AES-EAX key comes from
    ``get_hash(key)``. Errors raised by ``decrypt_and_verify`` or by the
    JSON parser propagate to the caller.
    """
    with open(vault_path, "rb") as vault_file:
        nonce = vault_file.read(16)
        tag = vault_file.read(16)
        ciphertext = vault_file.read(-1)

    # Decrypt and authenticate in one step.
    cipher = AES.new(get_hash(key), AES.MODE_EAX, nonce)
    plaintext = cipher.decrypt_and_verify(ciphertext, tag)

    return json.loads(plaintext.decode("utf-8"))
示例3
def prepare_items(secrets, categories):
    """
    Convert legacy secrets to the new import format.

    Each secret becomes a dict with the keys ``name``, ``url``, ``login``,
    ``password``, ``notes`` and ``category``. ``url`` is always ``None``
    because the legacy database does not store it; ``category`` is
    resolved through ``get_category_name``.
    """
    return [
        {
            'name': item.get('name'),
            'url': None,  # Not supported in legacy database
            'login': item.get('login'),
            'password': item.get('password'),
            'notes': item.get('notes'),
            'category': get_category_name(item.get('category'), categories),
        }
        for item in secrets
    ]
示例4
def decrypt_secret(data, key):
    """Decrypt the payload of ``data`` with a key derived from ``key``.

    The AES key is the SHA-256 digest of ``key`` followed by 1000
    repetitions of ``data[28:60]``. Everything from offset 60 onwards is
    decrypted in independent 16-byte blocks (ECB); a trailing partial
    block is ignored.

    :param data: raw byte string holding the encrypted secret
    :param key: byte string fed into the key derivation
    :return: the decrypted bytes, or ``None`` when ``data`` is empty
    """
    if not data:
        return None

    sha256 = SHA256.new()
    sha256.update(key)
    for _ in range(1000):
        sha256.update(data[28:60])
    aeskey = sha256.digest()

    # The mode is spelled out because AES.new(key) without a mode is not
    # accepted by every Crypto implementation.
    aes = AES.new(aeskey, AES.MODE_ECB)

    payload_len = len(data) - 60
    blocks = []
    for offset in range(0, payload_len, 16):
        # Only whole 16-byte blocks are decrypted.
        if offset + 16 <= payload_len:
            blocks.append(aes.decrypt(data[60 + offset:60 + offset + 16]))

    # Joined as bytes: concatenating decrypted bytes onto a text string
    # raises TypeError on Python 3.
    return b"".join(blocks)
示例5
def decrypt_file(key, filename, chunk_size=24*1024):
    """Decrypt ``filename`` (AES-CBC) into a file named without its extension.

    The input starts with an 8-byte little-endian plaintext size followed
    by a 16-byte IV; the rest is ciphertext. The output is truncated to
    the stored size after all chunks are written.

    :param key: AES key
    :param filename: path of the encrypted file
    :param chunk_size: bytes read per iteration; presumably must be a
        multiple of 16 for CBC decryption -- TODO confirm
    """
    target_path, _ = os.path.splitext(filename)
    header_len = struct.calcsize('Q')

    with open(filename, 'rb') as src:
        (plain_size,) = struct.unpack('<Q', src.read(header_len))
        iv = src.read(16)
        cipher = AES.new(key, AES.MODE_CBC, iv)

        with open(target_path, 'wb') as dst:
            # Read until the source returns an empty chunk (EOF).
            for block in iter(lambda: src.read(chunk_size), b''):
                dst.write(cipher.decrypt(block))

            # Cut the output back to the recorded plaintext size.
            dst.truncate(plain_size)
示例6
def decrypt(self, ciphertext: bytes) -> bytes:
    """Return the plaintext for ``ciphertext``.

    The input is split into a 12-byte nonce, a 16-byte tag and the
    encrypted data. A ``DataIntegrityError`` is raised when the nonce or
    tag is too short, or when ``decrypt_and_verify`` raises ``ValueError``.
    """
    nonce, tag, body = ciphertext[:12], ciphertext[12:28], ciphertext[28:]

    if len(nonce) != 12:
        raise DataIntegrityError("Cipher text is damaged: invalid nonce length")
    if len(tag) != 16:
        raise DataIntegrityError("Cipher text is damaged: invalid tag length")

    # Rebuild the GCM cipher with the nonce stored in the message.
    cipher = AES.new(self.cipher_key, AES.MODE_GCM, nonce)

    try:
        return cipher.decrypt_and_verify(body, tag)  # type: ignore
    except ValueError as error:
        raise DataIntegrityError("Cipher text is damaged: {}".format(error))
示例7
def __init__(self, key, iv=None):
    """Initialise the parent with an AES-CBC cipher.

    A falsy ``iv`` (``None`` or empty) falls back to ``key[:16]``.
    """
    if not iv:
        iv = key[:16]
    cbc_cipher = AES.new(key, AES.MODE_CBC, iv)
    super().__init__(cbc_cipher)
示例8
def __init__(self, key):
    """Initialise the parent with an AES cipher in ECB mode built from ``key``."""
    ecb_cipher = AES.new(key, AES.MODE_ECB)
    super().__init__(ecb_cipher)
示例9
def _aes_decrypt(aes_key, aes_text) -> Text:
    """
    Decrypt an encoded ciphertext string with the given key.

    The input is base64 text in which ``/`` was written as ``-`` and ``=``
    may appear as ``%3D``. After AES-ECB decryption the bytes are decoded
    as UTF-8, NUL characters are removed and surrounding whitespace is
    stripped.
    :param aes_key: decryption key (normalised through ``_fill_16``)
    :param aes_text: encoded ciphertext string
    :return: the decrypted text
    """
    # Build the cipher first.
    cipher = AES.new(_fill_16(aes_key), AES.MODE_ECB)
    # Undo the character substitutions, then base64-decode to bytes.
    b64_text = aes_text.replace('-', '/').replace("%3D", "=")
    raw = a2b_base64(b64_text.encode())
    # Decrypt and drop NUL characters.
    plain = str(cipher.decrypt(raw), encoding='utf-8').replace('\0', '')
    return plain.strip()
示例10
def _aes_encrypt(aes_key, aes_text) -> Text:
    """
    Encrypt text with the given key and return it as an encoded string.

    Both key and text are passed through ``_fill_16`` before AES-ECB
    encryption. The ciphertext is base64-encoded, every ``/`` is replaced
    with ``-`` and surrounding whitespace (including the newline added by
    ``b2a_base64``) is stripped.
    :param aes_key: encryption key
    :param aes_text: text to encrypt
    :return: the encoded ciphertext string
    """
    cipher = AES.new(_fill_16(aes_key), AES.MODE_ECB)
    ciphertext = cipher.encrypt(_fill_16(aes_text))
    # Base64 (not hex) is used for the textual form.
    b64_text = b2a_base64(ciphertext).decode()
    return b64_text.replace('/', '-').strip()
示例11
def decrypt(self, text, appid):
    """Decrypt a Base64 ciphertext and extract the message it carries.

    After decryption the first 16 bytes are skipped and the trailing
    padding (its length is the value of the last byte) is removed. The
    remainder holds a 4-byte length, the message of that length, and
    finally an appid that must equal ``appid``.

    @param text: Base64-encoded ciphertext
    @param appid: expected appid, compared against the decrypted bytes
    @return: ``(0, message_text)`` on success; otherwise ``(code, None)``
        with ``ierror.WXBizMsgCrypt_DecryptAES_Error``,
        ``ierror.WXBizMsgCrypt_IllegalBuffer`` or
        ``ierror.WXBizMsgCrypt_ValidateAppid_Error``
    """
    try:
        cipher = AES.new(self.key, self.mode, self.key[:16])
        # Base64-decode first, then decrypt.
        decrypted = cipher.decrypt(base64.b64decode(text))
    except Exception as e:
        print(e)
        return ierror.WXBizMsgCrypt_DecryptAES_Error, None

    try:
        # The last byte gives the padding length.
        pad_len = decrypted[-1]
        # Drop the 16 leading bytes and the padding.
        body = decrypted[16:-pad_len]
        (net_len,) = struct.unpack("I", body[:4])
        xml_len = socket.ntohl(net_len)
        xml_end = xml_len + 4
        xml_content = body[4:xml_end]
        from_appid = body[xml_end:]
    except Exception:
        return ierror.WXBizMsgCrypt_IllegalBuffer, None

    if from_appid != appid:
        return ierror.WXBizMsgCrypt_ValidateAppid_Error, None
    return 0, xml_content.decode()
示例12
def encrypt(plaintext, key, blockSize):
    """Encrypt ``plaintext`` with AES-CBC under a fresh random IV.

    A second random block of ``blockSize`` bytes is placed in front of the
    plaintext before padding. The result is the IV followed by the
    ciphertext.

    :param plaintext: bytes-like data to encrypt
    :param key: AES key
    :param blockSize: number of random bytes for the IV and the prefix
    :return: ``iv + ciphertext`` as bytes
    """
    iv = os.urandom(blockSize)
    random_prefix = os.urandom(blockSize)
    # bytes() instead of str(): on Python 3 str() of a bytes-like object
    # yields its repr text, which would corrupt the data. On Python 2
    # bytes is str, so the result is unchanged there.
    padded = pad(random_prefix + bytes(plaintext), blockSize)
    cipher = AES.new(key, AES.MODE_CBC, IV=iv)
    return iv + bytes(cipher.encrypt(padded))
示例13
def decrypt(ciphertext, key, blockSize):
    """Decrypt data laid out as ``iv + AES-CBC ciphertext``.

    The first ``blockSize`` bytes are the IV. After decryption the result
    is passed through ``unpad`` and its first ``blockSize`` bytes are
    discarded.

    :param ciphertext: bytes-like input starting with the IV
    :param key: AES key
    :param blockSize: length of the IV and of the discarded leading block
    :return: the remaining plaintext
    """
    # bytes() instead of str(): on Python 3 str() of a bytes-like object
    # yields its repr text instead of the raw data. On Python 2 bytes is
    # str, so the behaviour is unchanged there.
    iv = bytes(ciphertext[:blockSize])
    body = bytes(ciphertext[blockSize:])
    cipher = AES.new(key, AES.MODE_CBC, IV=iv)
    padded = bytes(cipher.decrypt(body))
    return unpad(padded)[blockSize:]
示例14
def get_hash(key):
    """
    Derive the binary vault key from a master key.

    SHA-256 is fed ``str(i) + config.salt + key`` for i from 1 to 9999.
    The first 32 hex characters of the digest are then base64-decoded,
    which yields 24 bytes (not 32).
    """
    hasher = SHA256.new()
    for counter in range(1, 10000):
        material = str(counter) + config.salt + key
        hasher.update(material.encode())
    hex_prefix = hasher.hexdigest()[:32]
    return base64.b64decode(hex_prefix.encode())
示例15
def digest_key(self):
    """
    Return the SHA-256 digest of the key material.

    ``self.salted_key`` is hashed when it is truthy; otherwise
    ``self.key`` is used.
    """
    material = self.key
    if self.salted_key:
        # Prefer the salted variant when one is set.
        material = self.salted_key
    hasher = SHA256.new(material)
    return hasher.digest()
示例16
def get_aes(self, IV):
    """
    Build an AES-CBC cipher keyed with ``self.digest_key()`` and using
    ``IV`` as the initialisation vector.
    """
    aes_key = self.digest_key()
    return AES.new(aes_key, AES.MODE_CBC, IV)
示例17
def decrypt(self, hex):
    """Decrypt a hex-encoded AES-ECB ciphertext.

    :param hex: hexadecimal representation of the ciphertext
    :return: the decrypted bytes up to (not including) the first NUL byte
    """
    ciphertext = binascii.unhexlify(hex)
    cipher = AES.new(self.get_secret(), AES.MODE_ECB)
    # A bytes separator is required: splitting decrypted bytes on a text
    # string raises TypeError on Python 3 (on Python 2 the two are equal).
    return cipher.decrypt(ciphertext).split(b'\x00')[0]
示例18
def aes_encrypt(self, message, passphrase):
    """Encrypt ``message`` with AES-CBC using ``passphrase`` as the key.

    The IV is sixteen zero bytes.
    NOTE(review): a constant IV makes equal plaintext prefixes encrypt
    identically; it is kept because changing it would change the output
    -- confirm whether the receiving side depends on it.

    :param message: data to encrypt; passed to ``encrypt`` unpadded
    :param passphrase: AES key
    :return: the ciphertext returned by the cipher
    """
    # Bytes literal: a text-string IV is rejected on Python 3; on
    # Python 2 this is the same value as before.
    zero_iv = b'\x00' * 16
    cipher = AES.new(passphrase, AES.MODE_CBC, zero_iv)
    return cipher.encrypt(message)

    # get value used to build the salt 
示例19
def encrypt(self, key, msg):
    """Encrypt ``msg`` with AES-CBC and return it through ``encodestring``.

    The AES key is the first 32 items of ``self.getSecret(key)`` and the
    IV is a fixed 16-character value. ``msg`` is padded with ``self.pad``
    before encryption.
    """
    from Crypto.Cipher import AES
    iv = '0123456789012345'
    aes_key = self.getSecret(key)[0:32]
    cipher = AES.new(aes_key, AES.MODE_CBC, iv)
    padded = self.pad(msg)
    return encodestring(cipher.encrypt(padded))
示例20
def decrypt(self, key, msg):
    """Decrypt ``msg`` and return the value it encodes.

    ``msg`` is passed through ``decodestring``, decrypted with AES-CBC
    (key: first 32 items of ``self.getSecret(key)``, fixed IV) and
    unpadded with ``self.depad``. If any of that fails, ``msg`` is
    returned unchanged. Otherwise the plaintext is evaluated as a Python
    expression; if it is not valid syntax, the plaintext itself is
    returned.
    """
    from Crypto.Cipher import AES
    try:
        secret = self.getSecret(key)
        Initial16bytes = '0123456789012345'
        cipher = AES.new(secret[0:32], AES.MODE_CBC, Initial16bytes)
        plain = self.depad(cipher.decrypt(decodestring(msg)))
    except Exception:
        # Best effort: hand back the input when it cannot be decrypted.
        # Exception (not a bare except) so that KeyboardInterrupt and
        # SystemExit are no longer swallowed here.
        return msg
    try:
        # SECURITY: eval() executes whatever the decrypted text contains.
        # This is only safe if every ciphertext comes from a trusted
        # writer -- review before exposing to external input.
        return eval(plain)
    except SyntaxError:
        return plain
示例21
def encrypt(self, key, msg):
    """Encrypt the text ``msg`` with AES-CBC and return it as text.

    ``msg`` is UTF-8 encoded, padded with ``self.pad`` and encrypted with
    the first 32 items of ``self.getSecret(key)`` and a fixed IV. The
    ciphertext goes through ``encodestring`` and is decoded as UTF-8.
    """
    from Crypto.Cipher import AES
    secret = self.getSecret(key)
    # Bytes literal: this Python 3 code path cannot hand a text-string IV
    # to PyCryptodome. The byte values are the same as before.
    iv = b'0123456789012345'
    cipher = AES.new(secret[0:32], AES.MODE_CBC, iv)
    padded = self.pad(msg.encode('utf-8'))
    return encodestring(cipher.encrypt(padded)).decode('utf-8')
示例22
def decrypt(self, key, msg):
    """Decrypt the text ``msg`` with AES-CBC and return it as text.

    ``msg`` is UTF-8 encoded, passed through ``decodestring`` and
    decrypted with the first 32 items of ``self.getSecret(key)`` and a
    fixed IV. The result is decoded as UTF-8.

    NOTE(review): no unpadding is performed here, so any padding present
    in the plaintext is returned to the caller -- confirm that is
    intended.
    """
    from Crypto.Cipher import AES
    secret = self.getSecret(key)
    # Bytes literal: this Python 3 code path cannot hand a text-string IV
    # to PyCryptodome. The byte values are the same as before.
    iv = b'0123456789012345'
    cipher = AES.new(secret[0:32], AES.MODE_CBC, iv)
    ciphertext = decodestring(msg.encode('utf-8'))
    return cipher.decrypt(ciphertext).decode('utf-8')
示例23
def _pam_sign(self, msg):
    """Sign ``msg`` with ``self.secret_key``.

    Both values are UTF-8 encoded, an HMAC is computed with the
    ``sha256`` digest, and the raw digest is returned URL-safe
    base64-encoded (as bytes).
    """
    key_bytes = self.secret_key.encode("utf-8")
    msg_bytes = msg.encode("utf-8")
    mac = hmac.new(key_bytes, msg_bytes, sha256)
    return urlsafe_b64encode(mac.digest())
示例24
def test_add_break(bugbuzz_client, source_file):
    """Create a break, fetch it back over HTTP and verify its contents.

    Checks the session, line number and file id of the stored break, then
    decrypts its ``local_vars`` (AES-CBC with the client's key and the
    returned IV) and compares them with what was sent.
    """
    _, uploaded = source_file
    expected_vars = {'foo': 'bar', 'eggs': 'spam'}
    created = bugbuzz_client.add_break(73, uploaded['id'], expected_vars)

    break_path = '/breaks/{}'.format(created['id'])
    break_url = urlparse.urljoin(bugbuzz_client.base_url, break_path)
    response = requests.get(break_url)
    fetched = response.json()['break']
    assert fetched['session'] == bugbuzz_client.session_id
    assert fetched['lineno'] == 73
    assert fetched['file'] == uploaded['id']

    # IV and payload are both stored base64-encoded.
    iv = base64.b64decode(fetched['aes_iv'].encode('latin1'))
    ciphertext = base64.b64decode(fetched['local_vars'].encode('latin1'))
    cipher = AES.new(bugbuzz_client.aes_key, AES.MODE_CBC, iv)
    plaintext = pkcs5_unpad(cipher.decrypt(ciphertext))
    decoded_vars = json.loads(plaintext.decode('latin1'))
    assert decoded_vars == expected_vars
示例25
def test_encrypt_decrypt(bugbuzz_client):
    """Check that the client's ``encrypt`` output decrypts to the input.

    The ciphertext must differ from the plaintext, and AES-CBC decryption
    with the client's key and the returned IV, followed by
    ``pkcs5_unpad``, must give the plaintext back.
    """
    original = b'super foobar'
    iv, ciphertext = bugbuzz_client.encrypt(original)
    assert ciphertext != original

    cipher = AES.new(bugbuzz_client.aes_key, AES.MODE_CBC, iv)
    recovered = pkcs5_unpad(cipher.decrypt(ciphertext))
    assert recovered == original
示例26
def encrypt(self, key, msg):
    """Pad, AES-CBC encrypt and ``encodestring``-encode ``msg``.

    The cipher key is ``self.getSecret(key)[0:32]``; the IV is a fixed
    16-character value.
    """
    from Crypto.Cipher import AES
    secret = self.getSecret(key)
    cipher = AES.new(secret[0:32], AES.MODE_CBC, '0123456789012345')
    ciphertext = cipher.encrypt(self.pad(msg))
    return encodestring(ciphertext)
示例27
def decrypt(self, key, msg):
    """Return the value stored in the encrypted message ``msg``.

    Steps: ``decodestring`` the input, AES-CBC decrypt it with
    ``self.getSecret(key)[0:32]`` and a fixed IV, then strip the padding
    with ``self.depad``. On any failure in those steps the original
    ``msg`` is returned. The plaintext is then evaluated as a Python
    expression; plaintext that is not valid syntax is returned as is.
    """
    from Crypto.Cipher import AES
    try:
        secret = self.getSecret(key)
        Initial16bytes = '0123456789012345'
        cipher = AES.new(secret[0:32], AES.MODE_CBC, Initial16bytes)
        decoded = decodestring(msg)
        plain = self.depad(cipher.decrypt(decoded))
    except Exception:
        # Narrowed from a bare except so interpreter-exit signals
        # (KeyboardInterrupt, SystemExit) propagate; ordinary failures
        # still fall back to returning the input untouched.
        return msg
    try:
        # SECURITY: eval() runs the decrypted text as code. Only safe
        # when the ciphertext was produced by a trusted party -- review.
        return eval(plain)
    except SyntaxError:
        return plain
示例28
def encrypt(self, key, msg):
    """Return ``msg`` encrypted with AES-CBC, as encoded text.

    The message is UTF-8 encoded and padded via ``self.pad``; the key is
    ``self.getSecret(key)[0:32]`` and the IV is fixed. The ciphertext is
    run through ``encodestring`` and decoded as UTF-8.
    """
    from Crypto.Cipher import AES
    secret = self.getSecret(key)
    # The IV is given as bytes: this method already works on bytes
    # (Python 3), where PyCryptodome does not accept a text-string IV.
    # The byte values are unchanged.
    cipher = AES.new(secret[0:32], AES.MODE_CBC, b'0123456789012345')
    ciphertext = cipher.encrypt(self.pad(msg.encode('utf-8')))
    return encodestring(ciphertext).decode('utf-8')
示例29
def _pam_sign(self, msg):
    """Return the URL-safe base64 form of the HMAC of ``msg``.

    The HMAC key is ``self.secret_key``; key and message are UTF-8
    encoded and ``sha256`` is the digest constructor.
    """
    signature = hmac.new(
        key=self.secret_key.encode("utf-8"),
        msg=msg.encode("utf-8"),
        digestmod=sha256,
    ).digest()
    return urlsafe_b64encode(signature)
示例30
def get_cipher(self, key, iv):
    """Build an AES-CFB cipher from a key of at most 32 characters/bytes.

    Shorter keys are right-padded with spaces to a length of 32.

    :param key: ``str`` or ``bytes`` key, at most 32 long
    :param iv: initialisation vector passed straight to ``AES.new``
    :return: the cipher object
    :raises ValueError: if ``key`` is longer than 32
    """
    if len(key) > 32:
        raise ValueError('Key length cannot exceed 32 bytes.')
    # ljust pads with spaces for both str and bytes, whereas
    # ``key + ' ' * n`` raises TypeError for bytes keys on Python 3.
    padded_key = key.ljust(32)
    return AES.new(padded_key, AES.MODE_CFB, iv)