Python源码示例:binascii.Error()

示例1
def b64decode(s, altchars=None, validate=False):
    """Decode a Base64 encoded byte string.

    s is the byte string to decode.  Optional altchars must be a
    string of length 2 which specifies the alternative alphabet used
    instead of the '+' and '/' characters.

    The decoded string is returned.  A binascii.Error is raised if s is
    incorrectly padded.

    If validate is False (the default), non-base64-alphabet characters are
    discarded prior to the padding check.  If validate is True,
    non-base64-alphabet characters in the input (including a trailing
    newline) result in a binascii.Error.
    """
    s = _bytes_from_decode_data(s)
    if altchars is not None:
        altchars = _bytes_from_decode_data(altchars)
        assert len(altchars) == 2, repr(altchars)
        # Map the caller's two substitute characters back to '+' and '/'.
        s = s.translate(bytes.maketrans(altchars, b'+/'))
    # fullmatch() instead of match() with '$': '$' also matches just before
    # a trailing newline, which would let b'....\n' pass validation.
    if validate and not re.fullmatch(b'[A-Za-z0-9+/]*={0,2}', s):
        raise binascii.Error('Non-base64 digit found')
    return binascii.a2b_base64(s)
示例2
def urlsafe_b64decode(s):
    """Decode a byte string encoded with the URL-safe Base64 alphabet.

    s is the byte string to decode.  It is translated with the module's
    URL-safe decoding table (the alphabet uses '-' instead of '+' and
    '_' instead of '/') and then handed to b64decode(), whose result is
    returned unchanged.  Errors raised by b64decode() propagate to the
    caller.
    """
    raw = _bytes_from_decode_data(s)
    return b64decode(raw.translate(_urlsafe_decode_translation))



# Base32 encoding/decoding must be done in Python 
示例3
def b16decode(s, casefold=False):
    """Decode a Base16 (hexadecimal) encoded byte string.

    s is the byte string to decode.  When the optional casefold flag is
    true the input is upper-cased first, so a lowercase alphabet is
    accepted as well; it defaults to False.

    The decoded byte string is returned.  binascii.Error is raised if
    the input contains any character outside 0-9 and A-F, or if
    binascii.unhexlify() rejects it.
    """
    data = _bytes_from_decode_data(s)
    data = data.upper() if casefold else data
    # Reject anything outside the upper-case hex alphabet before decoding.
    if re.search(b'[^0-9A-F]', data) is not None:
        raise binascii.Error('Non-base16 digit found')
    return binascii.unhexlify(data)



# Legacy interface.  This code could be cleaned up since I don't believe
# binascii has any line length limitations.  It just doesn't seem worth it
# though.  The files should be opened in binary mode. 
示例4
def wrap_exception():
    """Return a decorator that logs exceptions raised by a method.

    The decorated callable is invoked as
    ``function(self, context, container, *args, **kwargs)`` and its
    return value is passed through.  If it raises, the error is logged
    inside ``excutils.save_and_reraise_exception(reraise=False)``;
    presumably that suppresses the exception so the wrapper returns
    None (verify against excutils).
    """
    def decorator(function):

        @functools.wraps(function)
        def wrapper(self, context, container, *args, **kwargs):
            try:
                result = function(self, context, container, *args, **kwargs)
            except exception.DockerError as e:
                # Docker API failures get a plain error line (no traceback).
                with excutils.save_and_reraise_exception(reraise=False):
                    LOG.error("Error occurred while calling Docker API: %s",
                              str(e))
            except Exception as e:
                # Anything else is logged with its traceback.
                with excutils.save_and_reraise_exception(reraise=False):
                    LOG.exception("Unexpected exception: %s", str(e))
            else:
                return result
        return wrapper
    return decorator
示例5
def _handle_request_exception(self, e):
        """Log an exception raised while handling a request and reply.

        A ``Finish`` exception just finishes the request (if it is not
        already finished) and is not logged.  Any other exception is
        passed to ``self.log_exception`` and, unless the request has
        already finished, answered with ``self.send_error``: the
        HTTPError's own status code when it is known (or a reason was
        supplied), otherwise 500.
        """
        if isinstance(e, Finish):
            # Not an error; just finish the request without logging.
            if not self._finished:
                self.finish(*e.args)
            return

        try:
            self.log_exception(*sys.exc_info())
        except Exception:
            # A failing logger must not prevent the best-effort
            # send_error() below, or the connection would leak.
            app_log.error("Error in exception logger", exc_info=True)

        if self._finished:
            # Errors after the response is complete are only logged;
            # there is nothing left to send.
            return

        status = 500
        if isinstance(e, HTTPError):
            if e.status_code in httputil.responses or e.reason:
                status = e.status_code
            else:
                gen_log.error("Bad HTTP status code: %d", e.status_code)
        self.send_error(status, exc_info=sys.exc_info())
示例6
def _handle_request_exception(self, e):
        """Log a request-handling exception and send an error response.

        ``Finish`` is treated as a normal way to end the request: the
        response is finished if still open and nothing is logged.  Other
        exceptions go to ``self.log_exception``; if the request is not
        finished yet, ``self.send_error`` is called with the HTTPError's
        status code, or with 500 for non-HTTP errors and for unknown
        status codes that carry no reason.
        """
        if isinstance(e, Finish):
            # Not an error; just finish the request without logging.
            if not self._finished:
                self.finish(*e.args)
            return

        try:
            self.log_exception(*sys.exc_info())
        except Exception:
            # Keep going so send_error() still gets a chance to run and
            # the connection is not leaked.
            app_log.error("Error in exception logger", exc_info=True)

        if self._finished:
            # Late errors are logged above, but no response can be sent.
            return

        if not isinstance(e, HTTPError):
            status_code = 500
        elif e.status_code not in httputil.responses and not e.reason:
            gen_log.error("Bad HTTP status code: %d", e.status_code)
            status_code = 500
        else:
            status_code = e.status_code
        self.send_error(status_code, exc_info=sys.exc_info())
示例7
def is_valid_user_id(b64_content: str) -> bool:
        """
        Return True if `b64_content` decodes to a string of ASCII digits.

        The content is padded with `utils.pad_base64`, decoded as URL-safe
        base64 and then as UTF-8; any decoding failure yields False.

        See: https://discordapp.com/developers/docs/reference#snowflakes
        """
        padded = utils.pad_base64(b64_content)

        try:
            text = base64.urlsafe_b64decode(padded).decode('utf-8')
        except (binascii.Error, ValueError):
            return False

        # isdigit() alone also accepts many non-ASCII digit characters,
        # so ASCII-ness is required as well.
        return text.isascii() and text.isdigit()
示例8
def is_valid_timestamp(b64_content: str) -> bool:
        """
        Return True if `b64_content` decodes to a valid timestamp.

        The decoded bytes are read as a big-endian integer; the timestamp
        is accepted when, after adding TOKEN_EPOCH, it is not smaller than
        DISCORD_EPOCH.  Decoding failures are logged and yield False.
        See: https://i.imgur.com/7WdehGn.png
        """
        b64_content = utils.pad_base64(b64_content)

        try:
            raw = base64.urlsafe_b64decode(b64_content)
            timestamp = int.from_bytes(raw, byteorder="big")
        except (binascii.Error, ValueError) as e:
            log.debug(f"Failed to decode token timestamp '{b64_content}': {e}")
            return False

        # Newer tokens seem not to need the epoch added, but since no upper
        # bound is checked, adding it anyway is harmless.
        if timestamp + TOKEN_EPOCH < DISCORD_EPOCH:
            log.debug(f"Invalid token timestamp '{b64_content}': smaller than Discord epoch")
            return False
        return True
示例9
def parse_netntlm_resp_msg(headers, resp_header, seq):
    '''
    Parse the client response to the challenge

    headers is a mapping of header names to values and resp_header the
    name of the header to inspect.  Returns the result of
    parse_ntlm_resp(decoded_message, seq) when the header carries an NTLM
    or Negotiate payload that base64-decodes, otherwise None (missing
    header, other scheme, no payload, or undecodable payload).
    '''
    try:
        header_val3 = headers[resp_header]
    except KeyError:
        return
    header_val3 = header_val3.split(' ', 1)

    # A bare scheme without a payload (e.g. "NTLM") has nothing to decode;
    # indexing [1] below would raise IndexError.
    if len(header_val3) != 2:
        return

    # The header value can either start with NTLM or Negotiate
    if header_val3[0] == 'NTLM' or header_val3[0] == 'Negotiate':
        try:
            # binascii.a2b_base64 is what base64.decodestring wrapped;
            # decodestring itself was removed in Python 3.9.
            msg3 = binascii.a2b_base64(header_val3[1])
        except (binascii.Error, ValueError):
            # ValueError covers non-ASCII text payloads on Python 3.
            return
        return parse_ntlm_resp(msg3, seq)
示例10
def parse_netntlm_resp_msg(headers, resp_header, seq):
    '''
    Parse the client response to the challenge

    Looks up resp_header in headers and, when its value is an NTLM or
    Negotiate scheme followed by a base64 payload, returns
    parse_ntlm_resp(decoded_payload, seq).  Returns None when the header
    is absent, uses another scheme, has no payload, or cannot be decoded.
    '''
    try:
        header_val3 = headers[resp_header]
    except KeyError:
        return
    header_val3 = header_val3.split(' ', 1)

    # Without a payload after the scheme there is nothing to decode, and
    # header_val3[1] would raise IndexError.
    if len(header_val3) != 2:
        return

    # The header value can either start with NTLM or Negotiate
    if header_val3[0] == 'NTLM' or header_val3[0] == 'Negotiate':
        try:
            # Call binascii directly: base64.decodestring (a thin wrapper
            # around this function) no longer exists in Python 3.9+.
            msg3 = binascii.a2b_base64(header_val3[1])
        except (binascii.Error, ValueError):
            # ValueError: non-ASCII text passed on Python 3.
            return
        return parse_ntlm_resp(msg3, seq)
示例11
def load_user_from_auth_header(header_val):
    """Authenticate a user from an HTTP Basic ``Authorization`` value.

    header_val is the header value, with or without the leading
    ``'Basic '`` prefix.  The base64 payload is decoded as UTF-8 and split
    into username and password at the first colon.  Returns the user
    object when the password is accepted (via LDAP bind when LDAP login is
    configured, or via the stored password hash), otherwise None.
    Malformed credentials are treated as empty username/password.
    """
    if header_val.startswith('Basic '):
        header_val = header_val.replace('Basic ', '', 1)
    basic_username = basic_password = ''
    try:
        header_val = base64.b64decode(header_val).decode('utf-8')
        # Split on the first colon only: RFC 7617 forbids ':' in the
        # user-id but allows it in the password.  Unpacking raises
        # ValueError when no colon is present at all.
        basic_username, basic_password = header_val.split(':', 1)
    except (TypeError, ValueError, UnicodeDecodeError, binascii.Error):
        # Undecodable or malformed credentials: continue with the empty
        # defaults so the lookup below simply fails to authenticate.
        pass
    user = _fetch_user_by_name(basic_username)
    if user and config.config_login_type == constants.LOGIN_LDAP and services.ldap:
        if services.ldap.bind_user(str(user.password), basic_password):
            return user
    if user and check_password_hash(str(user.password), basic_password):
        return user
    return None
示例12
def _get_user(self, environ):
        user = None
        http_auth = environ.get("HTTP_AUTHORIZATION")
        if http_auth and http_auth.startswith('Basic'):
            auth = http_auth.split(" ", 1)
            if len(auth) == 2:
                try:
                    # b64decode doesn't accept unicode in Python < 3.3
                    # so we need to convert it to a byte string
                    auth = base64.b64decode(auth[1].strip().encode('utf-8'))
                    if PY3:  # b64decode returns a byte string in Python 3
                        auth = auth.decode('utf-8')
                    auth = auth.split(":", 1)
                except TypeError as exc:
                    self.debug("Couldn't get username: %s", exc)
                    return user
                except binascii.Error as exc:
                    self.debug("Couldn't get username: %s", exc)
                    return user
                if len(auth) == 2:
                    user = auth[0]
        return user 
示例13
def b64decode(s, altchars=None):
    """Decode a Base64 encoded string.

    s is the string to decode.  Optional altchars must be a string of at least
    length 2 (additional characters are ignored) which specifies the
    alternative alphabet used instead of the '+' and '/' characters.

    The decoded string is returned.  A TypeError is raised when
    binascii.a2b_base64() rejects the input (for example because s is
    incorrectly padded).
    """
    if altchars is not None:
        s = _translate(s, {altchars[0]: '+', altchars[1]: '/'})
    try:
        return binascii.a2b_base64(s)
    except binascii.Error as msg:
        # Transform this exception for consistency.  The "as" form is
        # valid on Python 2.6+ and 3; the comma form is Python 2 only.
        raise TypeError(msg)
示例14
def loads(self, bstruct):
        """
        Verify the signature of ``bstruct`` (a bytestring) and return the
        deserialized payload.

        ``bstruct`` is URL-safe base64 (padding is re-added if missing).
        Once decoded, the first ``self.digest_size`` bytes are the
        signature and the remainder is the serialized value.

        A ``ValueError`` is raised if the base64 data is malformed or if
        the signature fails to validate.
        """
        try:
            missing = -len(bstruct) % 4
            decoded = base64.urlsafe_b64decode(bytes_(bstruct) + b'=' * missing)
        except (binascii.Error, TypeError) as e:
            raise ValueError('Badly formed base64 data: %s' % e)

        size = self.digest_size
        expected_sig, cstruct = decoded[:size], decoded[size:]

        # Recompute the HMAC over the payload and compare it with the
        # signature that travelled with it.
        mac = hmac.new(self.salted_secret, bytes_(cstruct), self.digestmod)
        sig = mac.digest()
        if strings_differ(sig, expected_sig):
            raise ValueError('Invalid signature')

        return self.serializer.loads(cstruct)
示例15
def b64decode(s, altchars=None):
    """Decode a Base64 encoded string.

    s is the string to decode.  Optional altchars must be a string of at least
    length 2 (additional characters are ignored) which specifies the
    alternative alphabet used instead of the '+' and '/' characters.

    The decoded string is returned.  A TypeError is raised if s is
    incorrectly padded.  Characters that are neither in the normal base-64
    alphabet nor the alternative alphabet are discarded prior to the padding
    check.
    """
    if altchars is not None:
        # Only the first two characters of altchars are used.
        s = s.translate(string.maketrans(altchars[:2], '+/'))
    try:
        return binascii.a2b_base64(s)
    except binascii.Error as msg:
        # Transform this exception for consistency.  The "as" form is
        # valid on Python 2.6+ and 3; the comma form is Python 2 only.
        raise TypeError(msg)
示例16
def _get_policies(self, access_control_policy_ids):

        def get_policy(aid):
            try:
                return self.api.handle_onem2m_request(
                    OneM2MRequest(
                        OneM2MOperation.retrieve, aid, fr=self._abs_cse_id
                    )
                ).get().content
            except OneM2MErrorResponse as error:
                if error.response_status_code == STATUS_NOT_FOUND:
                    self.logger.debug("Policy '%s' NOT FOUND.", aid)
                else:
                    self.logger.debug("Error getting policy: %s:", error)
                return None

        return [_f for _f in map(get_policy, access_control_policy_ids) if _f]

    # def _notify_das_server(self, notify_uri, payload): 
示例17
def b64decode(s, altchars=None):
    """Decode a Base64 encoded string.

    s is the string to decode.  Optional altchars must be a string of at least
    length 2 (additional characters are ignored) which specifies the
    alternative alphabet used instead of the '+' and '/' characters.

    The decoded string is returned.  A TypeError is raised when
    binascii.a2b_base64() rejects the input (for example because s is
    incorrectly padded).
    """
    if altchars is not None:
        s = _translate(s, {altchars[0]: '+', altchars[1]: '/'})
    try:
        return binascii.a2b_base64(s)
    except binascii.Error as msg:
        # Transform this exception for consistency.  "except ... as" is
        # accepted by Python 2.6+ and 3, unlike the old comma form.
        raise TypeError(msg)
示例18
def build_status_header(self):
        """Build org-admin header for internal status delivery.

        Reads the base64 JSON ``x-rh-identity`` value from
        ``self._identity_header``, takes its account number and returns
        ``{"x-rh-identity": <base64 identity>}`` for a fixed org-admin
        user on that account.  If decoding or lookup fails the error is
        logged and None is returned.
        """
        try:
            raw_header = self._identity_header.get("x-rh-identity")
            decoded_identity = json.loads(b64decode(raw_header))
            org_admin_user = {"username": "cost-mgmt", "email": "cost-mgmt@redhat.com", "is_org_admin": True}
            payload = {
                "identity": {
                    "account_number": decoded_identity["identity"]["account_number"],
                    "type": "User",
                    "user": org_admin_user,
                }
            }
            encoded = b64encode(json_dumps(payload).encode("utf-8"))
        except (binascii.Error, json.JSONDecodeError, TypeError, KeyError) as error:
            LOG.error(f"Unable to build internal status header. Error: {str(error)}")
        else:
            return {"x-rh-identity": encoded}
示例19
def _handle_request_exception(self, e: BaseException) -> None:
        """Log an exception raised while handling a request and reply.

        ``Finish`` simply finishes the request (if still open) without
        logging.  Every other exception is passed to
        ``self.log_exception``; if the request has not finished,
        ``self.send_error`` is called with the HTTPError's status code,
        or 500 for any other exception type.
        """
        if isinstance(e, Finish):
            # Not an error; just finish the request without logging.
            if not self._finished:
                self.finish(*e.args)
            return

        try:
            self.log_exception(*sys.exc_info())
        except Exception:
            # Even if logging fails, still attempt send_error() below so
            # the connection is not leaked.
            app_log.error("Error in exception logger", exc_info=True)

        if self._finished:
            # Errors after the response is done are logged only; no
            # further response can be sent.
            return

        status = e.status_code if isinstance(e, HTTPError) else 500
        self.send_error(status, exc_info=sys.exc_info())
示例20
def b64decode(s, altchars=None, validate=False):
    """Decode the Base64 encoded bytes-like object or ASCII string s.

    Optional altchars must be a bytes-like object or ASCII string of length 2
    which specifies the alternative alphabet used instead of the '+' and '/'
    characters.

    The result is returned as a bytes object.  A binascii.Error is raised if
    s is incorrectly padded.

    If validate is False (the default), characters that are neither in the
    normal base-64 alphabet nor the alternative alphabet are discarded prior
    to the padding check.  If validate is True, these non-alphabet characters
    in the input (including a trailing newline) result in a binascii.Error.
    """
    s = _bytes_from_decode_data(s)
    if altchars is not None:
        altchars = _bytes_from_decode_data(altchars)
        assert len(altchars) == 2, repr(altchars)
        # Translate the alternative alphabet back to the standard one.
        s = s.translate(bytes.maketrans(altchars, b'+/'))
    # Use fullmatch(): with match() and '$' a trailing b'\n' would be
    # accepted, because '$' also matches just before a final newline.
    if validate and not re.fullmatch(b'[A-Za-z0-9+/]*={0,2}', s):
        raise binascii.Error('Non-base64 digit found')
    return binascii.a2b_base64(s)
示例21
def get_decoded_data(self):
        """Return ``self.data`` decoded from base64, or None if invalid."""
        try:
            decoded = base64.b64decode(self.data)
        except binascii.Error:
            decoded = None
        return decoded
示例22
def is_valid(token):
        """
        Check whether an OTP token can be used to generate a code.

        The token is considered valid when ``TOTP(token).now()`` completes
        without raising binascii.Error, ValueError or TypeError.

        :param token: OTP token
        :type token: str

        :return: bool
        """
        try:
            TOTP(token).now()
        except (binascii.Error, ValueError, TypeError):
            return False
        else:
            return True
示例23
def update(self):
        """
        Refresh ``self.pin`` with the value of ``self.now()``.

        If generating the value raises binascii.Error, the pin is set to
        None instead.
        """
        try:
            fresh_pin = self.now()
            self.pin = fresh_pin
        except binascii.Error:
            self.pin = None
示例24
def standard_b64decode(s):
    """Decode a byte string encoded with the standard Base64 alphabet.

    s is the byte string to decode.  This is a thin wrapper that calls
    b64decode(s) with its default arguments and returns the result;
    any exception raised by b64decode() propagates unchanged.
    """
    decoded = b64decode(s)
    return decoded
示例25
def _load(self, jwt):
        """Split a compact token into its decoded parts.

        ``jwt`` may be text (encoded to UTF-8 first) or a byte string of
        the form ``header.payload.signature``.  Returns the tuple
        ``(payload, signing_input, header, signature)`` where payload and
        signature are base64url-decoded bytes, header is the parsed JSON
        mapping and signing_input is the raw ``header.payload`` bytes.

        Raises DecodeError for a wrong token type, too few segments,
        undecodable segments, or a header that is not a JSON object.
        """
        if isinstance(jwt, text_type):
            jwt = jwt.encode('utf-8')

        if not issubclass(type(jwt), binary_type):
            message = "Invalid token type. Token must be a {0}".format(binary_type)
            raise DecodeError(message)

        # Peel the signature off the right, then separate header/payload.
        try:
            signing_input, crypto_segment = jwt.rsplit(b'.', 1)
            header_segment, payload_segment = signing_input.split(b'.', 1)
        except ValueError:
            raise DecodeError('Not enough segments')

        try:
            raw_header = base64url_decode(header_segment)
        except (TypeError, binascii.Error):
            raise DecodeError('Invalid header padding')

        try:
            header = json.loads(raw_header.decode('utf-8'))
        except ValueError as e:
            raise DecodeError('Invalid header string: %s' % e)

        if not isinstance(header, Mapping):
            raise DecodeError('Invalid header string: must be a json object')

        try:
            payload = base64url_decode(payload_segment)
        except (TypeError, binascii.Error):
            raise DecodeError('Invalid payload padding')

        try:
            signature = base64url_decode(crypto_segment)
        except (TypeError, binascii.Error):
            raise DecodeError('Invalid crypto padding')

        return payload, signing_input, header, signature
示例26
def decode_b(encoded):
    """Decode base64 bytes leniently, reporting what had to be repaired.

    Returns ``(decoded_bytes, defects)``.  defects is a list of
    ``errors`` defect instances: InvalidBase64PaddingDefect when padding
    had to be added, InvalidBase64CharactersDefect when characters
    outside the base64 alphabet were present.
    """
    defects = []
    remainder = len(encoded) % 4
    padded = encoded
    if remainder:
        defects.append(errors.InvalidBase64PaddingDefect())
        padded = encoded + b'==='[:4 - remainder]
    try:
        # The validate kwarg to b64decode is not supported in Py2.x
        if re.match(b'^[A-Za-z0-9+/]*={0,2}$', padded) is None:
            raise binascii.Error('Non-base64 digit found')
        return base64.b64decode(padded), defects
    except binascii.Error:
        # Padding was already corrected, so this must be an invalid
        # character error; start the defect list over.
        defects = [errors.InvalidBase64CharactersDefect()]
        # Non-alphabet characters do not count towards padding, and their
        # number is unknown, so try each padding length in turn.
        for extra in range(4):
            try:
                return base64.b64decode(encoded + b'=' * extra), defects
            except (binascii.Error, TypeError):    # Py2 raises a TypeError
                if not extra:
                    defects.append(errors.InvalidBase64PaddingDefect())
        # Every padding length failed; this should never happen.
        raise AssertionError("unexpected binascii.Error")
示例27
def decode_file_data(data):
    """Return ``data`` decoded from base64.

    Raises exception.Base64Exception when the input cannot be decoded.
    """
    try:
        decoded = base64.b64decode(data)
    except (TypeError, binascii.Error):
        # Py27 signals bad input with TypeError, Py3 with binascii.Error.
        raise exception.Base64Exception()
    return decoded
示例28
async def validateUserPasswordDynamoDB(app, username, password):
    """
    validateUserPassword: verify user and password.
        throws exception if not valid

    When getPassword(app, username) returns None, the user is looked up in
    the DynamoDB table named by the "aws_dynamodb_users_table" config
    value and the stored password is compared with the supplied one.  On
    success the pair is stored with setPassword().

    Raises HTTPUnauthorized (401) when the user is unknown or the password
    does not match, and HTTPInternalServerError (500) when the table
    cannot be read or the item has an unexpected layout.

    Note: this is a coroutine because the DynamoDB lookup is awaited.
    """
    if getPassword(app, username) is None:
        # look up name in dynamodb table
        dynamodb = getDynamoDBClient(app)
        table_name = config.get("aws_dynamodb_users_table")
        log.info(f"looking for user: {username} in DynamoDB table: {table_name}")
        try:
            response = await dynamodb.get_item(
                TableName=table_name,
                Key={'username': {'S': username}}
            )
        except ClientError as e:
            log.error("Unable to read dyanamodb table: {}".format(e.response['Error']['Message']))
            raise HTTPInternalServerError()  # 500
        if "Item" not in response:
            log.info(f"user: {username} not found")
            raise HTTPUnauthorized()  # 401
        item = response['Item']
        if "password" not in item:
            log.error("Expected to find password key in DynamoDB table")
            raise HTTPInternalServerError()  # 500
        password_item = item["password"]
        if 'S' not in password_item:
            log.error("Expected to find 'S' key for password item")
            raise HTTPInternalServerError()  # 500
        # Never write the stored credential itself to the log.
        log.debug(f"password item found for user: {username}")
        if password_item['S'] != password:
            log.warn(f"user password is not valid for user: {username}")
            raise HTTPUnauthorized()  # 401
        # add user/password to user_db map
        setPassword(app, username, password)
示例29
def _decode_telegram_base64(string):
    """
    Decodes an url-safe base64-encoded string into its bytes
    by first adding the stripped necessary padding characters.

    This is the way Telegram shares binary data as strings,
    such as Bot API-style file IDs or invite links.

    Returns `None` if the input string was not valid.
    """
    try:
        return base64.urlsafe_b64decode(string + '=' * (len(string) % 4))
    except (binascii.Error, ValueError, TypeError):
        return None  # not valid base64, not valid ascii, not a string 
示例30
def _encode_telegram_base64(string):
    """
    Inverse for `_decode_telegram_base64`.
    """
    try:
        return base64.urlsafe_b64encode(string).rstrip(b'=').decode('ascii')
    except (binascii.Error, ValueError, TypeError):
        return None  # not valid base64, not valid ascii, not a string