Python源码示例:binascii.a2b_hex()

示例1
def test_body_encoding(self):
        unicode_body = u("\xe9")
        byte_body = binascii.a2b_hex(b"e9")

        # unicode string in body gets converted to utf8
        response = self.fetch("/echopost", method="POST", body=unicode_body,
                              headers={"Content-Type": "application/blah"})
        self.assertEqual(response.headers["Content-Length"], "2")
        self.assertEqual(response.body, utf8(unicode_body))

        # byte strings pass through directly
        response = self.fetch("/echopost", method="POST",
                              body=byte_body,
                              headers={"Content-Type": "application/blah"})
        self.assertEqual(response.headers["Content-Length"], "1")
        self.assertEqual(response.body, byte_body)

        # Mixing unicode in headers and byte string bodies shouldn't
        # break anything
        response = self.fetch("/echopost", method="POST", body=byte_body,
                              headers={"Content-Type": "application/blah"},
                              user_agent=u("foo"))
        self.assertEqual(response.headers["Content-Length"], "1")
        self.assertEqual(response.body, byte_body) 
示例2
def test_body_encoding(self):
        unicode_body = u("\xe9")
        byte_body = binascii.a2b_hex(b"e9")

        # unicode string in body gets converted to utf8
        response = self.fetch("/echopost", method="POST", body=unicode_body,
                              headers={"Content-Type": "application/blah"})
        self.assertEqual(response.headers["Content-Length"], "2")
        self.assertEqual(response.body, utf8(unicode_body))

        # byte strings pass through directly
        response = self.fetch("/echopost", method="POST",
                              body=byte_body,
                              headers={"Content-Type": "application/blah"})
        self.assertEqual(response.headers["Content-Length"], "1")
        self.assertEqual(response.body, byte_body)

        # Mixing unicode in headers and byte string bodies shouldn't
        # break anything
        response = self.fetch("/echopost", method="POST", body=byte_body,
                              headers={"Content-Type": "application/blah"},
                              user_agent=u("foo"))
        self.assertEqual(response.headers["Content-Length"], "1")
        self.assertEqual(response.body, byte_body) 
示例3
def test_hash_password(tor_cmd):
    """
    Hash a controller password. It's salted so can't assert that we get a
    particular value. Also, tor's output is unnecessarily verbose so including
    hush to cut it down.
    """

    output = run_tor(tor_cmd, '--hush', '--hash-password', 'my_password').splitlines()[-1]

    if not re.match('^16:[0-9A-F]{58}$', output):
      raise AssertionError("Unexpected response from 'tor --hash-password my_password': %s" % output)

    # I'm not gonna even pretend to understand the following. Ported directly
    # from tor's test_cmdline_args.py.

    output_hex = binascii.a2b_hex(stem.util.str_tools._to_bytes(output).strip()[3:])
    salt, how, hashed = output_hex[:8], output_hex[8], output_hex[9:]

    count = (16 + (how & 15)) << ((how >> 4) + 6)
    stuff = salt + b'my_password'
    repetitions = count // len(stuff) + 1
    inp = (stuff * repetitions)[:count]
    assert_equal(hashlib.sha1(inp).digest(), hashed) 
示例4
def runTest(self):
        plaintext = a2b_hex(self.plaintext)
        ciphertext = a2b_hex(self.ciphertext)

        # The cipher should work like a stream cipher

        # Test counter mode encryption, 3 bytes at a time
        ct3 = []
        cipher = self._new()
        for i in range(0, len(plaintext), 3):
            ct3.append(cipher.encrypt(plaintext[i:i+3]))
        ct3 = b2a_hex(b("").join(ct3))
        self.assertEqual(self.ciphertext, ct3)  # encryption (3 bytes at a time)

        # Test counter mode decryption, 3 bytes at a time
        pt3 = []
        cipher = self._new()
        for i in range(0, len(ciphertext), 3):
            pt3.append(cipher.encrypt(ciphertext[i:i+3]))
        # PY3K: This is meant to be text, do not change to bytes (data)
        pt3 = b2a_hex(b("").join(pt3))
        self.assertEqual(self.plaintext, pt3)  # decryption (3 bytes at a time) 
示例5
def hex_decode(input,errors='strict'):

    """ Decodes the object input and returns a tuple (output
        object, length consumed).

        input must be an object which provides the bf_getreadbuf
        buffer slot. Python strings, buffer objects and memory
        mapped files are examples of objects providing this slot.

        errors defines the error handling to apply. It defaults to
        'strict' handling which is the only currently supported
        error handling for this codec.

    """
    assert errors == 'strict'
    output = binascii.a2b_hex(input)
    return (output, len(input)) 
示例6
def handle_email_opened(self, query):
		# image size: 43 Bytes
		img_data = '47494638396101000100800100000000ffffff21f90401000001002c00000000'
		img_data += '010001000002024c01003b'
		img_data = binascii.a2b_hex(img_data)
		self.send_response(200)
		self.send_header('Content-Type', 'image/gif')
		self.send_header('Content-Length', str(len(img_data)))
		self.end_headers()
		self.wfile.write(img_data)

		msg_id = self.get_query('id')
		if not msg_id:
			return
		self.semaphore_acquire()
		query = self._session.query(db_models.Message)
		query = query.filter_by(id=msg_id, opened=None)
		message = query.first()
		if message and not message.campaign.has_expired:
			message.opened = db_models.current_timestamp()
			message.opener_ip = self.get_client_ip()
			message.opener_user_agent = self.headers.get('user-agent', None)
			self._session.commit()
		signals.send_safe('email-opened', self.logger, self)
		self.semaphore_release() 
示例7
def hex_decode(input,errors='strict'):

    """ Decodes the object input and returns a tuple (output
        object, length consumed).

        input must be an object which provides the bf_getreadbuf
        buffer slot. Python strings, buffer objects and memory
        mapped files are examples of objects providing this slot.

        errors defines the error handling to apply. It defaults to
        'strict' handling which is the only currently supported
        error handling for this codec.

    """
    assert errors == 'strict'
    output = binascii.a2b_hex(input)
    return (output, len(input)) 
示例8
def YARACompile(ruledata):
    if ruledata.startswith('#'):
        if ruledata.startswith('#h#'):
            rule = binascii.a2b_hex(ruledata[3:])
        elif ruledata.startswith('#b#'):
            rule = binascii.a2b_base64(ruledata[3:])
        elif ruledata.startswith('#s#'):
            rule = 'rule string {strings: $a = "%s" ascii wide nocase condition: $a}' % ruledata[3:]
        elif ruledata.startswith('#q#'):
            rule = ruledata[3:].replace("'", '"')
        else:
            rule = ruledata[1:]
        return yara.compile(source=rule)
    else:
        dFilepaths = {}
        if os.path.isdir(ruledata):
            for root, dirs, files in os.walk(ruledata):
                for file in files:
                    filename = os.path.join(root, file)
                    dFilepaths[filename] = filename
        else:
            for filename in ProcessAt(ruledata):
                dFilepaths[filename] = filename
        return yara.compile(filepaths=dFilepaths) 
示例9
def test_dispatch_opcode_iquery(self):
        # DNS packet with IQUERY opcode
        payload = "271109000001000000000000076578616d706c6503636f6d0000010001"

        # expected response is an error code REFUSED.  The other fields are
        # id 10001
        # opcode IQUERY
        # rcode REFUSED
        # flags QR RD
        # ;QUESTION
        # example.com. IN A
        # ;ANSWER
        # ;AUTHORITY
        # ;ADDITIONAL
        expected_response = (b"271189050001000000000000076578616d706c6503636f"
                             b"6d0000010001")

        request = dns.message.from_wire(binascii.a2b_hex(payload))
        request.environ = {'addr': self.addr, 'context': self.context}
        response = next(self.handler(request)).to_wire()

        self.assertEqual(expected_response, binascii.b2a_hex(response)) 
示例10
def test_dispatch_opcode_status(self):
        # DNS packet with STATUS opcode
        payload = "271211000001000000000000076578616d706c6503636f6d0000010001"

        # expected response is an error code REFUSED.  The other fields are
        # id 10002
        # opcode STATUS
        # rcode REFUSED
        # flags QR RD
        # ;QUESTION
        # example.com. IN A
        # ;ANSWER
        # ;AUTHORITY
        # ;ADDITIONAL
        expected_response = (b"271291050001000000000000076578616d706c6503636f"
                             b"6d0000010001")

        request = dns.message.from_wire(binascii.a2b_hex(payload))
        request.environ = {'addr': self.addr, 'context': self.context}
        response = next(self.handler(request)).to_wire()

        self.assertEqual(expected_response, binascii.b2a_hex(response)) 
示例11
def test_dispatch_opcode_query_non_existent_zone(self):
        # DNS packet with QUERY opcode
        # query is for example.com. IN A
        payload = ("271501200001000000000001076578616d706c6503636f6d0000010001"
                   "0000291000000000000000")

        # expected_response is an error code REFUSED.  The other fields are
        # id 10005
        # opcode QUERY
        # rcode REFUSED
        # flags QR RD
        # edns 0
        # payload 8192
        # ;QUESTION
        # example.com. IN A
        # ;ANSWER
        # ;AUTHORITY
        # ;ADDITIONAL
        expected_response = (b"271581050001000000000001076578616d706c6503636f"
                             b"6d00000100010000292000000000000000")
        request = dns.message.from_wire(binascii.a2b_hex(payload))
        request.environ = {'addr': self.addr, 'context': self.context}
        response = next(self.handler(request)).to_wire()

        self.assertEqual(expected_response, binascii.b2a_hex(response)) 
示例12
def test_dispatch_opcode_query_unsupported_recordtype(self):
        # query is for example.com. IN DNAME
        payload = "271901000001000000000000076578616d706c6503636f6d0000270001"

        # expected_response is REFUSED.  The other fields are
        # id 10009
        # opcode QUERY
        # rcode REFUSED
        # flags QR RD
        # ;QUESTION
        # example.com. IN DNAME
        # ;ANSWER
        # ;AUTHORITY
        # ;ADDITIONAL
        expected_response = (b"271981050001000000000000076578616d706c6503636f"
                             b"6d0000270001")

        request = dns.message.from_wire(binascii.a2b_hex(payload))
        request.environ = {'addr': self.addr, 'context': self.context}
        response = next(self.handler(request)).to_wire()

        self.assertEqual(expected_response, binascii.b2a_hex(response)) 
示例13
def test_send_notify_message(self):
        # id 10001
        # opcode NOTIFY
        # rcode NOERROR
        # flags QR AA
        # ;QUESTION
        # example.com. IN SOA
        # ;ANSWER
        # ;AUTHORITY
        # ;ADDITIONAL
        expected_notify_response = ("2711a4000001000000000000076578616d706c650"
                                    "3636f6d0000060001")
        context = self.get_context()
        with patch.object(dns.query, 'udp', return_value=dns.message.from_wire(
                binascii.a2b_hex(expected_notify_response))):
            response, retry = self.notify.notify_zone_changed(
                context, objects.Zone.from_dict(self.test_zone),
                self.nameserver.host, self.nameserver.port, 0, 0, 2, 0)
            self.assertEqual(response, dns.message.from_wire(
                binascii.a2b_hex(expected_notify_response)))
            self.assertEqual(retry, 1) 
示例14
def test_poll_for_serial_number(self):
        # id 10001
        # opcode QUERY
        # rcode NOERROR
        # flags QR AA
        # ;QUESTION
        # example.com. IN SOA
        # ;ANSWER
        # example.com. 3600 IN SOA example-ns.com. admin.example.com. 100 3600
        #  600 86400 3600
        # ;AUTHORITY
        # ;ADDITIONAL
        poll_response = ("271184000001000100000000076578616d706c6503636f6d0000"
                         "060001c00c0006000100000e1000290a6578616d706c652d6e73"
                         "c0140561646d696ec00c0000006400000e100000025800015180"
                         "00000e10")
        context = self.get_context()
        with patch.object(dns.query, 'udp', return_value=dns.message.from_wire(
                binascii.a2b_hex(poll_response))):
            status, serial, retries = self.notify.get_serial_number(
                context, objects.Zone.from_dict(self.test_zone),
                self.nameserver.host, self.nameserver.port, 0, 0, 2, 0)
            self.assertEqual(status, 'SUCCESS')
            self.assertEqual(serial, self.test_zone['serial'])
            self.assertEqual(retries, 2) 
示例15
def test_poll_for_serial_number_lower_serial(self):
        # id 10001
        # opcode QUERY
        # rcode NOERROR
        # flags QR AA
        # ;QUESTION
        # example.com. IN SOA
        # ;ANSWER
        # example.com. 3600 IN SOA example-ns.com. admin.example.com. 99 3600
        #  600 86400 3600
        # ;AUTHORITY
        # ;ADDITIONAL
        poll_response = ("271184000001000100000000076578616d706c6503636f6d0000"
                         "060001c00c0006000100000e1000290a6578616d706c652d6e73"
                         "c0140561646d696ec00c0000006300000e100000025800015180"
                         "00000e10")
        context = self.get_context()
        with patch.object(dns.query, 'udp', return_value=dns.message.from_wire(
                binascii.a2b_hex(poll_response))):
            status, serial, retries = self.notify.get_serial_number(
                context, objects.Zone.from_dict(self.test_zone),
                self.nameserver.host, self.nameserver.port, 0, 0, 2, 0)
            self.assertEqual(status, 'ERROR')
            self.assertEqual(serial, 99)
            self.assertEqual(retries, 0) 
示例16
def test_poll_for_serial_number_higher_serial(self):
        # id 10001
        # opcode QUERY
        # rcode NOERROR
        # flags QR AA
        # ;QUESTION
        # example.com. IN SOA
        # ;ANSWER
        # example.com. 3600 IN SOA example-ns.com. admin.example.com. 101 3600
        #  600 86400 3600
        # ;AUTHORITY
        # ;ADDITIONAL
        poll_response = ("271184000001000100000000076578616d706c6503636f6d0000"
                         "060001c00c0006000100000e1000290a6578616d706c652d6e73"
                         "c0140561646d696ec00c0000006500000e100000025800015180"
                         "00000e10")
        context = self.get_context()
        with patch.object(dns.query, 'udp', return_value=dns.message.from_wire(
                binascii.a2b_hex(poll_response))):
            status, serial, retries = self.notify.get_serial_number(
                context, objects.Zone.from_dict(self.test_zone),
                self.nameserver.host, self.nameserver.port, 0, 0, 2, 0)
            self.assertEqual(status, 'SUCCESS')
            self.assertEqual(serial, 101)
            self.assertEqual(retries, 2) 
示例17
def test_receive_notify(self, mock_doaxfr, mock_query):
        """
        Get a NOTIFY and ensure the response is right,
        and an AXFR is triggered
        """
        payload = ('1a7220000001000000000000076578616d706c6503636f6d000006'
                   '0001')
        # expected response is NOERROR, other fields are
        # opcode NOTIFY
        # rcode NOERROR
        # flags QR AA
        # ;QUESTION
        # example.com. IN SOA
        # ;ANSWER
        # ;AUTHORITY
        # ;ADDITIONAL
        expected_response = (b'1a72a4000001000000000000076578616d706c6503'
                             b'636f6d0000060001')
        request = dns.message.from_wire(binascii.a2b_hex(payload))
        request.environ = {'addr': ['0.0.0.0', 1234]}
        response = next(self.handler(request)).to_wire()
        self.assertEqual(expected_response, binascii.b2a_hex(response)) 
示例18
def test_receive_notify_bad_notifier(self):
        payload = '243520000001000000000000076578616d706c6503636f6d0000060001'
        # expected response is REFUSED, other fields are
        # opcode NOTIFY
        # rcode REFUSED
        # flags QR
        # ;QUESTION
        # example.com. IN SOA
        # ;ANSWER
        # ;AUTHORITY
        # ;ADDITIONAL
        expected_response = (b'2435a0050001000000000000076578616d706c6503636f'
                             b'6d0000060001')
        request = dns.message.from_wire(binascii.a2b_hex(payload))
        # Bad 'requester'
        request.environ = {'addr': ['6.6.6.6', 1234]}
        response = next(self.handler(request)).to_wire()

        self.assertEqual(expected_response, binascii.b2a_hex(response)) 
示例19
def test_receive_create_bad_notifier(self):
        payload = '8dfd70000001000000000000076578616d706c6503636f6d00ff02ff00'
        # expected response is REFUSED, other fields are
        # opcode 14
        # rcode REFUSED
        # flags QR
        # ;QUESTION
        # example.com. CLASS65280 TYPE65282
        # ;ANSWER
        # ;AUTHORITY
        # ;ADDITIONAL
        expected_response = (b'8dfdf0050001000000000000076578616d706c6503636f'
                             b'6d00ff02ff00')
        request = dns.message.from_wire(binascii.a2b_hex(payload))
        # Bad 'requester'
        request.environ = {'addr': ['6.6.6.6', 1234]}
        response = next(self.handler(request)).to_wire()

        self.assertEqual(binascii.b2a_hex(response), expected_response) 
示例20
def test_receive_delete(self, mock_execute):
        payload = '3b9970000001000000000000076578616d706c6503636f6d00ff03ff00'
        # Expected NOERROR other fields are
        # opcode 14
        # rcode NOERROR
        # flags QR AA
        # ;QUESTION
        # example.com. CLASS65280 TYPE65283
        # ;ANSWER
        # ;AUTHORITY
        # ;ADDITIONAL
        expected_response = (b'3b99f4000001000000000000076578616d706c6503636f'
                             b'6d00ff03ff00')
        request = dns.message.from_wire(binascii.a2b_hex(payload))
        request.environ = {'addr': ['0.0.0.0', 1234]}
        response = next(self.handler(request)).to_wire()

        self.assertEqual(expected_response, binascii.b2a_hex(response)) 
示例21
def test_receive_delete_bad_notifier(self):
        payload = 'e6da70000001000000000000076578616d706c6503636f6d00ff03ff00'
        # expected response is REFUSED, other fields are
        # opcode 14
        # rcode REFUSED
        # flags QR
        # ;QUESTION
        # example.com. CLASS65280 TYPE65283
        # ;ANSWER
        # ;AUTHORITY
        # ;ADDITIONAL
        expected_response = (b'e6daf0050001000000000000076578616d706c6503636f'
                             b'6d00ff03ff00')
        request = dns.message.from_wire(binascii.a2b_hex(payload))
        # Bad 'requester'
        request.environ = {'addr': ['6.6.6.6', 1234]}
        response = next(self.handler(request)).to_wire()

        self.assertEqual(expected_response, binascii.b2a_hex(response)) 
示例22
def test_transfer_source(self, mock_doaxfr, mock_query):
        payload = '735d70000001000000000000076578616d706c6503636f6d00ff02ff00'
        # Expected NOERROR other fields are
        # opcode 14
        # rcode NOERROR
        # flags QR AA
        # ;QUESTION
        # example.com. CLASS65280 TYPE65282
        # ;ANSWER
        # ;AUTHORITY
        # ;ADDITIONAL
        expected_response = (b'735df4000001000000000000076578616d706c6503636f'
                             b'6d00ff02ff00')
        request = dns.message.from_wire(binascii.a2b_hex(payload))
        request.environ = {'addr': ['0.0.0.0', 1234]}
        with mock.patch.object(
                designate.backend.agent_backend.impl_fake.FakeBackend,
                'find_zone_serial', return_value=None):
            response = next(self.handler(request)).to_wire()
            mock_doaxfr.assert_called_with(
                'example.com.', [], source='1.2.3.4'
            )
            self.assertEqual(expected_response, binascii.b2a_hex(response)) 
示例23
def check(ip, port, timeout):
    try:
        socket.setdefaulttimeout(timeout)
        s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        s.connect((ip, int(port)))
        data = binascii.a2b_hex(
            "3a000000a741000000000000d40700000000000061646d696e2e24636d640000000000ffffffff130000001069736d6173746572000100000000")
        s.send(data)
        result = s.recv(1024)
        if "ismaster" in result:
            getlog_data = binascii.a2b_hex(
                "480000000200000000000000d40700000000000061646d696e2e24636d6400000000000100000021000000026765744c6f670010000000737461727475705761726e696e67730000")
            s.send(getlog_data)
            result = s.recv(1024)
            if "totalLinesWritten" in result:
                return u"未授权访问"
    except Exception, e:
        pass 
示例24
def hex_decode(input,errors='strict'):

    """ Decodes the object input and returns a tuple (output
        object, length consumed).

        input must be an object which provides the bf_getreadbuf
        buffer slot. Python strings, buffer objects and memory
        mapped files are examples of objects providing this slot.

        errors defines the error handling to apply. It defaults to
        'strict' handling which is the only currently supported
        error handling for this codec.

    """
    assert errors == 'strict'
    output = binascii.a2b_hex(input)
    return (output, len(input)) 
示例25
def hex_decode(input,errors='strict'):

    """ Decodes the object input and returns a tuple (output
        object, length consumed).

        input must be an object which provides the bf_getreadbuf
        buffer slot. Python strings, buffer objects and memory
        mapped files are examples of objects providing this slot.

        errors defines the error handling to apply. It defaults to
        'strict' handling which is the only currently supported
        error handling for this codec.

    """
    assert errors == 'strict'
    output = binascii.a2b_hex(input)
    return (output, len(input)) 
示例26
def number_to_bytes(num, num_bytes):
    padded_hex = '%0*x' % (2 * num_bytes, num)
    big_endian = binascii.a2b_hex(padded_hex.encode('ascii'))
    return big_endian 
示例27
def _readPage(self, page, tries=3):
        """Read a page from the flash and receive it's contents"""
        self._debug('Command: FLASH_READ_PAGE %d' % page)

        # Load page into the buffer
        crc = self._loadPageMultiple(page, tries)

        for _ in range(tries):
            # Dump the buffer
            self._sendCommand(COMMAND_BUFFER_LOAD)

            # Wait for data start
            if not self._waitForMessage(COMMAND_BUFFER_LOAD):
                self._debug('Invalid / no response for BUFFER_LOAD command')
                continue

            # Load successful -> read sector with 2 nibbles per byte
            page_data = self._readExactly(self.page_size * 2)
            if page_data is None:
                self._debug('Invalid / no response for page data')
                continue

            try:
                data = binascii.a2b_hex(page_data.decode(ENCODING))
                if crc == binascii.crc32(data):
                    self._debug('CRC did match with read data')
                    return data
                else:
                    self._debug('CRC did not match with read data')
                    continue

            except TypeError:
                self._debug('CRC could not be parsed')
                continue

        self._debug('Page read tries exceeded')
        return None 
示例28
def _read_register(self, cmd, name):
        """Generic read register function, send cmd and read a <CMD><LEN><DATA> response"""
        self._sendCommand(cmd)
        if not self._waitForMessage(cmd):
            self._debug('Invalid / no response for %s command' % (name,))
            logError('Invalid response')
            return None

        length_str = self._readExactly(2).decode(ENCODING)
        if length_str is None:
            self._debug('Invalid / no response for %s length' % (name,))
            logError('Invalid response')
            return None

        try:
            length = int(length_str, 16)
        except ValueError:
            self._debug('Could not decode %s length' % (name,))
            logError('Invalid register length')
            return None

        data_str = self._readExactly(length * 2).decode(ENCODING)
        if data_str is None:
            self._debug('Invalid / no response for %s check' % (name,))
            logError('Invalid response')
            return None

        try:  # Check if valid data
            decoded_data = binascii.a2b_hex(data_str)
        except TypeError:
            self._debug('Could not decode %s content' % (name))
            logError('Invalid response')
            return None

        return data_str 
示例29
def _decode_xsrf_token(self, cookie):
        """把_get_raw_xsrf_token返回的cookie字符串转换成元组形式.
        """

        try:
            m = _signed_value_version_re.match(utf8(cookie))

            if m:
                version = int(m.group(1))
                if version == 2:
                    _, mask, masked_token, timestamp = cookie.split("|")

                    mask = binascii.a2b_hex(utf8(mask))
                    token = _websocket_mask(
                        mask, binascii.a2b_hex(utf8(masked_token)))
                    timestamp = int(timestamp)
                    return version, token, timestamp
                else:
                    # Treat unknown versions as not present instead of failing.
                    raise Exception("Unknown xsrf cookie version")
            else:
                version = 1
                try:
                    token = binascii.a2b_hex(utf8(cookie))
                except (binascii.Error, TypeError):
                    token = utf8(cookie)
                # We don't have a usable timestamp in older versions.
                timestamp = int(time.time())
                return (version, token, timestamp)
        except Exception:
            # Catch exceptions and return nothing instead of failing.
            gen_log.debug("Uncaught exception in _decode_xsrf_token",
                          exc_info=True)
            return None, None, None 
示例30
def test_cookie_tampering_future_timestamp(self):
        handler = CookieTestRequestHandler()
        # this string base64-encodes to '12345678'
        handler.set_secure_cookie('foo', binascii.a2b_hex(b'd76df8e7aefc'),
                                  version=1)
        cookie = handler._cookies['foo']
        match = re.match(br'12345678\|([0-9]+)\|([0-9a-f]+)', cookie)
        self.assertTrue(match)
        timestamp = match.group(1)
        sig = match.group(2)
        self.assertEqual(
            _create_signature_v1(handler.application.settings["cookie_secret"],
                                 'foo', '12345678', timestamp),
            sig)
        # shifting digits from payload to timestamp doesn't alter signature
        # (this is not desirable behavior, just confirming that that's how it
        # works)
        self.assertEqual(
            _create_signature_v1(handler.application.settings["cookie_secret"],
                                 'foo', '1234', b'5678' + timestamp),
            sig)
        # tamper with the cookie
        handler._cookies['foo'] = utf8('1234|5678%s|%s' % (
            to_basestring(timestamp), to_basestring(sig)))
        # it gets rejected
        with ExpectLog(gen_log, "Cookie timestamp in future"):
            self.assertTrue(
                handler.get_secure_cookie('foo', min_version=1) is None)