Python源码示例:Crypto.Cipher.AES.MODE_EAX

示例1
def unlock(vault_path, key):
    """
        Unlock legacy vault and retrieve content
    """

    f = open(vault_path, "rb")
    try:
        nonce, tag, ciphertext = [f.read(x) for x in (16, 16, -1)]
    finally:
        f.close()

    # Unlock Vault with key
    cipher = AES.new(get_hash(key), AES.MODE_EAX, nonce)
    data = cipher.decrypt_and_verify(ciphertext, tag)

    # Set vault content to class level var
    return json.loads(data.decode("utf-8")) 
示例2
def decrypt(key, passphrase, encrypted_file_path):
    """
    Decrypts the specified file using a RSA key and its bound passphrase
    :param key: an RSA key
    :param passphrase: str
    :param encrypted_file_path: str path of the file to be decrypted
    :return: bytes decrypted data
    """
    print('Decrypting file {} ...'.format(encrypted_file_path))
    rsa_key = RSA.import_key(key, passphrase=passphrase)
    with open(encrypted_file_path, 'rb') as f:
        # Read the encoded session key, nonce, digest and encrypted data
        enc_session_key, nonce, digest, ciphertext = \
            [ f.read(x) for x in (rsa_key.size_in_bytes(), 16, 16, -1) ]

        # decode the session key
        cipher_rsa = PKCS1_OAEP.new(rsa_key)
        session_key = cipher_rsa.decrypt(enc_session_key)
        cipher_aes = AES.new(session_key, AES.MODE_EAX, nonce)

        # finally decrypt data
        data = cipher_aes.decrypt_and_verify(ciphertext, digest)
        print('Done')
        return data 
示例3
def test_mac_len(self):
        # Invalid MAC length
        self.assertRaises(ValueError, AES.new, self.key_128, AES.MODE_EAX,
                          nonce=self.nonce_96, mac_len=3)
        self.assertRaises(ValueError, AES.new, self.key_128, AES.MODE_EAX,
                          nonce=self.nonce_96, mac_len=16+1)

        # Valid MAC length
        for mac_len in range(5, 16 + 1):
            cipher = AES.new(self.key_128, AES.MODE_EAX, nonce=self.nonce_96,
                             mac_len=mac_len)
            _, mac = cipher.encrypt_and_digest(self.data_128)
            self.assertEqual(len(mac), mac_len)

        # Default MAC length
        cipher = AES.new(self.key_128, AES.MODE_EAX, nonce=self.nonce_96)
        _, mac = cipher.encrypt_and_digest(self.data_128)
        self.assertEqual(len(mac), 16) 
示例4
def test_output_param_neg(self):

        pt = b'5' * 16
        cipher = AES.new(self.key_128, AES.MODE_EAX, nonce=self.nonce_96)
        ct = cipher.encrypt(pt)

        cipher = AES.new(self.key_128, AES.MODE_EAX, nonce=self.nonce_96)
        self.assertRaises(TypeError, cipher.encrypt, pt, output=b'0'*16)
        
        cipher = AES.new(self.key_128, AES.MODE_EAX, nonce=self.nonce_96)
        self.assertRaises(TypeError, cipher.decrypt, ct, output=b'0'*16)

        shorter_output = bytearray(15)
        cipher = AES.new(self.key_128, AES.MODE_EAX, nonce=self.nonce_96)
        self.assertRaises(ValueError, cipher.encrypt, pt, output=shorter_output)
        cipher = AES.new(self.key_128, AES.MODE_EAX, nonce=self.nonce_96)
        self.assertRaises(ValueError, cipher.decrypt, ct, output=shorter_output) 
示例5
def test_valid_multiple_encrypt_or_decrypt(self):
        for method_name in "encrypt", "decrypt":
            for auth_data in (None, b"333", self.data_128,
                              self.data_128 + b"3"):
                if auth_data is None:
                    assoc_len = None
                else:
                    assoc_len = len(auth_data)
                cipher = AES.new(self.key_128, AES.MODE_EAX,
                                 nonce=self.nonce_96)
                if auth_data is not None:
                    cipher.update(auth_data)
                method = getattr(cipher, method_name)
                method(self.data_128)
                method(self.data_128)
                method(self.data_128)
                method(self.data_128) 
示例6
def test_invalid_decrypt_or_update_after_verify(self):
        cipher = AES.new(self.key_128, AES.MODE_EAX, nonce=self.nonce_96)
        ct = cipher.encrypt(self.data_128)
        mac = cipher.digest()

        for method_name in "decrypt", "update":
            cipher = AES.new(self.key_128, AES.MODE_EAX, nonce=self.nonce_96)
            cipher.decrypt(ct)
            cipher.verify(mac)
            self.assertRaises(TypeError, getattr(cipher, method_name),
                              self.data_128)

            cipher = AES.new(self.key_128, AES.MODE_EAX, nonce=self.nonce_96)
            cipher.decrypt_and_verify(ct, mac)
            self.assertRaises(TypeError, getattr(cipher, method_name),
                              self.data_128) 
示例7
def test_decrypt(self, tv):
        self._id = "Wycheproof Decrypt EAX Test #" + str(tv.id)
        
        try:
            cipher = AES.new(tv.key, AES.MODE_EAX, tv.iv, mac_len=tv.tag_size)
        except ValueError as e:
            assert len(tv.iv) == 0 and "Nonce cannot be empty" in str(e)
            return

        cipher.update(tv.aad)
        try:
            pt = cipher.decrypt_and_verify(tv.ct, tv.tag)
        except ValueError:
            assert not tv.valid
        else:
            assert tv.valid
            self.assertEqual(pt, tv.msg)
            self.warn(tv) 
示例8
def create_test(cls, name, factory, key_size):

        def test_template(self, factory=factory, key_size=key_size):
            cipher = factory.new(get_tag_random("cipher", key_size),
                                 factory.MODE_EAX,
                                 nonce=b"nonce")
            ct, mac = cipher.encrypt_and_digest(b"plaintext")

            cipher = factory.new(get_tag_random("cipher", key_size),
                                 factory.MODE_EAX,
                                 nonce=b"nonce")
            pt2 = cipher.decrypt_and_verify(ct, mac)

            self.assertEqual(b"plaintext", pt2)

        setattr(cls, "test_" + name, test_template) 
示例9
def test_mac_len(self):
        # Invalid MAC length
        self.assertRaises(ValueError, AES.new, self.key_128, AES.MODE_EAX,
                          nonce=self.nonce_96, mac_len=3)
        self.assertRaises(ValueError, AES.new, self.key_128, AES.MODE_EAX,
                          nonce=self.nonce_96, mac_len=16+1)

        # Valid MAC length
        for mac_len in range(5, 16 + 1):
            cipher = AES.new(self.key_128, AES.MODE_EAX, nonce=self.nonce_96,
                             mac_len=mac_len)
            _, mac = cipher.encrypt_and_digest(self.data_128)
            self.assertEqual(len(mac), mac_len)

        # Default MAC length
        cipher = AES.new(self.key_128, AES.MODE_EAX, nonce=self.nonce_96)
        _, mac = cipher.encrypt_and_digest(self.data_128)
        self.assertEqual(len(mac), 16) 
示例10
def test_output_param_neg(self):

        pt = b'5' * 16
        cipher = AES.new(self.key_128, AES.MODE_EAX, nonce=self.nonce_96)
        ct = cipher.encrypt(pt)

        cipher = AES.new(self.key_128, AES.MODE_EAX, nonce=self.nonce_96)
        self.assertRaises(TypeError, cipher.encrypt, pt, output=b'0'*16)
        
        cipher = AES.new(self.key_128, AES.MODE_EAX, nonce=self.nonce_96)
        self.assertRaises(TypeError, cipher.decrypt, ct, output=b'0'*16)

        shorter_output = bytearray(15)
        cipher = AES.new(self.key_128, AES.MODE_EAX, nonce=self.nonce_96)
        self.assertRaises(ValueError, cipher.encrypt, pt, output=shorter_output)
        cipher = AES.new(self.key_128, AES.MODE_EAX, nonce=self.nonce_96)
        self.assertRaises(ValueError, cipher.decrypt, ct, output=shorter_output) 
示例11
def test_valid_multiple_encrypt_or_decrypt(self):
        for method_name in "encrypt", "decrypt":
            for auth_data in (None, b"333", self.data_128,
                              self.data_128 + b"3"):
                if auth_data is None:
                    assoc_len = None
                else:
                    assoc_len = len(auth_data)
                cipher = AES.new(self.key_128, AES.MODE_EAX,
                                 nonce=self.nonce_96)
                if auth_data is not None:
                    cipher.update(auth_data)
                method = getattr(cipher, method_name)
                method(self.data_128)
                method(self.data_128)
                method(self.data_128)
                method(self.data_128) 
示例12
def test_invalid_decrypt_or_update_after_verify(self):
        cipher = AES.new(self.key_128, AES.MODE_EAX, nonce=self.nonce_96)
        ct = cipher.encrypt(self.data_128)
        mac = cipher.digest()

        for method_name in "decrypt", "update":
            cipher = AES.new(self.key_128, AES.MODE_EAX, nonce=self.nonce_96)
            cipher.decrypt(ct)
            cipher.verify(mac)
            self.assertRaises(TypeError, getattr(cipher, method_name),
                              self.data_128)

            cipher = AES.new(self.key_128, AES.MODE_EAX, nonce=self.nonce_96)
            cipher.decrypt_and_verify(ct, mac)
            self.assertRaises(TypeError, getattr(cipher, method_name),
                              self.data_128) 
示例13
def test_decrypt(self, tv):
        self._id = "Wycheproof Decrypt EAX Test #" + str(tv.id)
        
        try:
            cipher = AES.new(tv.key, AES.MODE_EAX, tv.iv, mac_len=tv.tag_size)
        except ValueError as e:
            assert len(tv.iv) == 0 and "Nonce cannot be empty" in str(e)
            return

        cipher.update(tv.aad)
        try:
            pt = cipher.decrypt_and_verify(tv.ct, tv.tag)
        except ValueError:
            assert not tv.valid
        else:
            assert tv.valid
            self.assertEqual(pt, tv.msg)
            self.warn(tv) 
示例14
def create_test(cls, name, factory, key_size):

        def test_template(self, factory=factory, key_size=key_size):
            cipher = factory.new(get_tag_random("cipher", key_size),
                                 factory.MODE_EAX,
                                 nonce=b"nonce")
            ct, mac = cipher.encrypt_and_digest(b"plaintext")

            cipher = factory.new(get_tag_random("cipher", key_size),
                                 factory.MODE_EAX,
                                 nonce=b"nonce")
            pt2 = cipher.decrypt_and_verify(ct, mac)

            self.assertEqual(b"plaintext", pt2)

        setattr(cls, "test_" + name, test_template) 
示例15
def encrypt(key, src_file_path, encrypted_file_path):
    """
    Encrypts the specified source file to the target path using AES and the
    specified RSA key
    :param key: an RSA key
    :param src_file_path: str path of file to be encrypted
    :param encrypted_file_path: str path of target encrypted file
    :return: None
    """
    print('Encrypting file {} to {} using AES'.format(src_file_path,
                                                      encrypted_file_path))
    rsa_key = RSA.import_key(key)
    with open(encrypted_file_path, "wb") as outfile:
        # Create a random session key and encrypt it with the input RSA key
        session_key = get_random_bytes(16)
        cipher_rsa = PKCS1_OAEP.new(rsa_key)
        outfile.write(cipher_rsa.encrypt(session_key))

        # Create an AES session key
        cipher_aes = AES.new(session_key, AES.MODE_EAX)

        with open(src_file_path ,'rb') as infile:
            # Use AES session key to encrypt input file data
            data = infile.read()
            ciphertext, digest = cipher_aes.encrypt_and_digest(data)

            # write to target file
            outfile.write(cipher_aes.nonce)
            outfile.write(digest)
            outfile.write(ciphertext)
    print('Done') 
示例16
def encode_aes(text_input: str) -> str:
    """Encode a string and output an json in string form.\n"""
    secret = b'4n4nk353hlli5w311d0n3andI1ik3it!'
    cipher = AES.new(secret, AES.MODE_EAX)
    ciphertext, tag = cipher.encrypt_and_digest(bytes(text_input, 'utf-8'))
    lista = [ciphertext, tag, cipher.nonce]
    json_k = ['ciphertext', 'tag', 'nonce']
    json_v = [b64encode(x).decode('utf-8') for x in lista]
    return json.dumps(dict(zip(json_k, json_v))) 
示例17
def decode_aes(json_input: str) -> str:
    """Decode a string in json form and output a string.\n"""
    try:
        b64 = json.loads(json_input)
        json_k = ['ciphertext', 'tag', 'nonce']
        jv = {k: b64decode(b64[k]) for k in json_k}
        secret = b'4n4nk353hlli5w311d0n3andI1ik3it!'
        cipher = AES.new(secret, AES.MODE_EAX, nonce=jv['nonce'])
        cleared = (cipher.decrypt_and_verify(jv['ciphertext'], jv['tag'])).decode('utf-8')
        return cleared
    except Exception as exception_decode:
        print(exception_decode)
        print("Incorrect decryption") 
示例18
def test_loopback_128(self):
        cipher = AES.new(self.key_128, AES.MODE_EAX, nonce=self.nonce_96)
        pt = get_tag_random("plaintext", 16 * 100)
        ct = cipher.encrypt(pt)

        cipher = AES.new(self.key_128, AES.MODE_EAX, nonce=self.nonce_96)
        pt2 = cipher.decrypt(ct)
        self.assertEqual(pt, pt2) 
示例19
def test_loopback_64(self):
        cipher = DES3.new(self.key_192, DES3.MODE_EAX, nonce=self.nonce_96)
        pt = get_tag_random("plaintext", 8 * 100)
        ct = cipher.encrypt(pt)

        cipher = DES3.new(self.key_192, DES3.MODE_EAX, nonce=self.nonce_96)
        pt2 = cipher.decrypt(ct)
        self.assertEqual(pt, pt2) 
示例20
def test_nonce(self):
        # If not passed, the nonce is created randomly
        cipher = AES.new(self.key_128, AES.MODE_EAX)
        nonce1 = cipher.nonce
        cipher = AES.new(self.key_128, AES.MODE_EAX)
        nonce2 = cipher.nonce
        self.assertEqual(len(nonce1), 16)
        self.assertNotEqual(nonce1, nonce2)

        cipher = AES.new(self.key_128, AES.MODE_EAX, self.nonce_96)
        ct = cipher.encrypt(self.data_128)

        cipher = AES.new(self.key_128, AES.MODE_EAX, nonce=self.nonce_96)
        self.assertEquals(ct, cipher.encrypt(self.data_128)) 
示例21
def test_nonce_must_be_bytes(self):
        self.assertRaises(TypeError, AES.new, self.key_128, AES.MODE_EAX,
                          nonce=u'test12345678') 
示例22
def test_nonce_length(self):
        # nonce can be of any length (but not empty)
        self.assertRaises(ValueError, AES.new, self.key_128, AES.MODE_EAX,
                          nonce=b"")

        for x in range(1, 128):
            cipher = AES.new(self.key_128, AES.MODE_EAX, nonce=bchr(1) * x)
            cipher.encrypt(bchr(1)) 
示例23
def test_block_size_64(self):
        cipher = DES3.new(self.key_192, AES.MODE_EAX, nonce=self.nonce_96)
        self.assertEqual(cipher.block_size, DES3.block_size) 
示例24
def test_nonce_attribute(self):
        cipher = AES.new(self.key_128, AES.MODE_EAX, nonce=self.nonce_96)
        self.assertEqual(cipher.nonce, self.nonce_96)

        # By default, a 16 bytes long nonce is randomly generated
        nonce1 = AES.new(self.key_128, AES.MODE_EAX).nonce
        nonce2 = AES.new(self.key_128, AES.MODE_EAX).nonce
        self.assertEqual(len(nonce1), 16)
        self.assertNotEqual(nonce1, nonce2) 
示例25
def test_unknown_parameters(self):
        self.assertRaises(TypeError, AES.new, self.key_128, AES.MODE_EAX,
                          self.nonce_96, 7)
        self.assertRaises(TypeError, AES.new, self.key_128, AES.MODE_EAX,
                          nonce=self.nonce_96, unknown=7)

        # But some are only known by the base cipher
        # (e.g. use_aesni consumed by the AES module)
        AES.new(self.key_128, AES.MODE_EAX, nonce=self.nonce_96,
                use_aesni=False) 
示例26
def test_null_encryption_decryption(self):
        for func in "encrypt", "decrypt":
            cipher = AES.new(self.key_128, AES.MODE_EAX, nonce=self.nonce_96)
            result = getattr(cipher, func)(b"")
            self.assertEqual(result, b"") 
示例27
def test_either_encrypt_or_decrypt(self):
        cipher = AES.new(self.key_128, AES.MODE_EAX, nonce=self.nonce_96)
        cipher.encrypt(b"")
        self.assertRaises(TypeError, cipher.decrypt, b"")

        cipher = AES.new(self.key_128, AES.MODE_EAX, nonce=self.nonce_96)
        cipher.decrypt(b"")
        self.assertRaises(TypeError, cipher.encrypt, b"") 
示例28
def test_invalid_mac(self):
        from Crypto.Util.strxor import strxor_c
        cipher = AES.new(self.key_128, AES.MODE_EAX, nonce=self.nonce_96)
        ct, mac = cipher.encrypt_and_digest(self.data_128)

        invalid_mac = strxor_c(mac, 0x01)

        cipher = AES.new(self.key_128, AES.MODE_EAX, nonce=self.nonce_96)
        self.assertRaises(ValueError, cipher.decrypt_and_verify, ct,
                          invalid_mac) 
示例29
def test_hex_mac(self):
        cipher = AES.new(self.key_128, AES.MODE_EAX, nonce=self.nonce_96)
        mac_hex = cipher.hexdigest()
        self.assertEqual(cipher.digest(), unhexlify(mac_hex))

        cipher = AES.new(self.key_128, AES.MODE_EAX, nonce=self.nonce_96)
        cipher.hexverify(mac_hex) 
示例30
def test_message_chunks(self):
        # Validate that both associated data and plaintext/ciphertext
        # can be broken up in chunks of arbitrary length

        auth_data = get_tag_random("authenticated data", 127)
        plaintext = get_tag_random("plaintext", 127)

        cipher = AES.new(self.key_128, AES.MODE_EAX, nonce=self.nonce_96)
        cipher.update(auth_data)
        ciphertext, ref_mac = cipher.encrypt_and_digest(plaintext)

        def break_up(data, chunk_length):
            return [data[i:i+chunk_length] for i in range(0, len(data),
                    chunk_length)]

        # Encryption
        for chunk_length in 1, 2, 3, 7, 10, 13, 16, 40, 80, 128:

            cipher = AES.new(self.key_128, AES.MODE_EAX, nonce=self.nonce_96)

            for chunk in break_up(auth_data, chunk_length):
                cipher.update(chunk)
            pt2 = b""
            for chunk in break_up(ciphertext, chunk_length):
                pt2 += cipher.decrypt(chunk)
            self.assertEqual(plaintext, pt2)
            cipher.verify(ref_mac)

        # Decryption
        for chunk_length in 1, 2, 3, 7, 10, 13, 16, 40, 80, 128:

            cipher = AES.new(self.key_128, AES.MODE_EAX, nonce=self.nonce_96)

            for chunk in break_up(auth_data, chunk_length):
                cipher.update(chunk)
            ct2 = b""
            for chunk in break_up(plaintext, chunk_length):
                ct2 += cipher.encrypt(chunk)
            self.assertEqual(ciphertext, ct2)
            self.assertEquals(cipher.digest(), ref_mac)