
def take(self, index, dimension=None):
        """Take entries of tensor along a dimension according to the index.

        This function is identical to torch.take() when dimension=None,
        otherwise, it is identical to ONNX gather() function.

        Args:
            index: tensor of positions to select; it is converted with
                ``.long()`` before use.
            dimension: axis to index along, or ``None`` to index into the
                flattened ``share``.

        Returns:
            The object produced by ``self.shallow_copy()`` with its ``share``
            replaced by the selected entries.
        """
        result = self.shallow_copy()
        index = index.long()
        if dimension is None:
            result.share = torch.take(self.share, index)
        else:
            # Keep every axis whole except `dimension`, which is indexed by
            # `index`.
            all_indices = [slice(0, x) for x in self.size()]
            all_indices[dimension] = index
            # Multidimensional indexing must use a tuple; a list of
            # slices/tensors is a deprecated, ambiguous form.
            result.share = self.share[tuple(all_indices)]
        return result

    # negation and reciprocal: 
def test_forward_take():
    """Run ``verify_model`` on ``torch.take`` with built-in and passed-in indices."""

    class Take1(Module):
        """Calls ``torch.take`` with an index tensor created inside ``forward``."""

        def forward(self, *args):
            source = args[0]
            indices = torch.tensor([[0, 0], [1, 0]])
            # Move the constant index to the GPU when one is present.
            indices = indices.cuda() if torch.cuda.is_available() else indices
            return torch.take(source, indices)

    class Take2(Module):
        """Calls ``torch.take`` with the index tensor given as second input."""

        def forward(self, *args):
            source, positions = args[0], args[1]
            return torch.take(source, positions)

    input_data = torch.tensor([[1, 2], [3, 4]])

    # Case 1: indices are created inside the module.
    constant_index_model = Take1().float().eval()
    verify_model(constant_index_model, input_data=input_data)

    # Case 2: indices are supplied as a second model input.
    indices = torch.tensor([[0, 0], [1, 0]])
    runtime_index_model = Take2().float().eval()
    verify_model(runtime_index_model, input_data=[input_data, indices])
def _find_max_per_frame(
        nccf: Tensor,
        sample_rate: int,
        freq_high: int
) -> Tensor:
    r"""
    For each frame, take the lag at which NCCF is highest.

    Only lags from ``ceil(sample_rate / freq_high)`` onwards are searched.
    Smoothing and conversion to frequency are not performed here; the
    function returns lag indices only.

    Note: If the max among all the lags is very close
    to the first half of lags, then the latter is taken.
    (That choice is delegated to ``_combine_max``.)

    Args:
        nccf: tensor whose last dimension enumerates lags.
        sample_rate: sample rate used to derive the minimal lag.
        freq_high: highest frequency considered; bounds the minimal lag.

    Returns:
        Tensor of selected lag indices, offset by the minimal lag plus one.
    """

    lag_min = int(math.ceil(sample_rate / freq_high))

    # Find near enough max that is smallest

    # Maximum over every admissible lag.
    best = torch.max(nccf[..., lag_min:], -1)

    # Maximum restricted to the first half of the lag axis.
    half_size = nccf.shape[-1] // 2
    half = torch.max(nccf[..., lag_min:half_size], -1)

    best = _combine_max(half, best)
    # Element 1 of the (values, indices) pair holds the argmax positions.
    indices = best[1]

    # Add back minimal lag
    indices += lag_min
    # Add 1 empirical calibration offset
    indices += 1

    return indices
def compute_focal_class_loss(anchor_matches, class_pred_logits, gamma=2.):
    """ Focal Loss :math:`FL = -(1-q)^g log(q)` with q = pred class probability, g = gamma hyperparameter.

    :param anchor_matches: (n_anchors). [-1, 0, class] for negative, neutral, and positive matched anchors.
    :param class_pred_logits: (n_anchors, n_classes). logits from classifier sub-network.
    :param gamma: g in above formula, good results with g=2 in original paper.
    :return: focal loss: scalar torch tensor, averaged over all non-neutral anchors
        (zero when there are none).
    """
    # Positive and Negative anchors contribute to the loss but neutral anchors (match value = 0) don't.
    pos_indices = torch.nonzero(anchor_matches > 0).squeeze(-1)  # dim=-1 instead of 1 or 0 to cover empty matches.
    neg_indices = torch.nonzero(anchor_matches == -1).squeeze(-1)
    non_neutral_indices = torch.cat((pos_indices, neg_indices))

    if non_neutral_indices.numel() == 0:
        # Nothing to average over: return a zero that stays attached to the
        # graph instead of the NaN produced by 0 / 0.
        return class_pred_logits.sum() * 0.

    # Negative anchors are trained towards class 0; the zeros are created on
    # the input's device rather than unconditionally on CUDA.
    background = torch.zeros(neg_indices.shape[0], dtype=torch.long, device=anchor_matches.device)
    target_classes = torch.cat((anchor_matches[pos_indices].long(), background))

    # q shape: (n_non_neutral_anchors, n_classes)
    q = F.softmax(class_pred_logits[non_neutral_indices], dim=1)

    # one-hot encoded target classes: keep only the pred probs of the correct class.
    # that class will receive the incentive to be maximized.
    # log(q_i) where i = target class --> FL shape (n_non_neutral_anchors,)
    # need to transform to indices into flattened tensor to use torch.take
    row_starts = q.shape[1] * torch.arange(q.shape[0], device=q.device)
    target_locs_flat = row_starts + target_classes.to(q.device)
    q = torch.take(q, target_locs_flat)

    # element-wise focal term
    FL = -(1. - q) ** gamma * torch.log(q)

    # take mean over all considered anchors
    return FL.sum() / FL.shape[0]
def split(self, data: torch.Tensor) -> torch.Tensor:
        """Gather ``data`` by ``self._sorted_indices`` and blank out padding.

        Elements are picked from the flattened ``data`` at the positions in
        ``self._sorted_indices``; every position where ``self._padding_mask``
        is set is then overwritten with NaN in place.

        Raises:
            AssertionError: if the gathered tensor has a signed integer dtype
                (int8/int16/int32/int64).
        """
        gathered = torch.take(data, self._sorted_indices)
        integer_dtypes = {torch.int8, torch.int16, torch.int32, torch.int64}
        assert gathered.dtype not in integer_dtypes, \
            'tensor cannot be any type of int, recommended to use float32'
        # NaN marks the padded slots.
        gathered.masked_fill_(self._padding_mask, np.nan)
        return gathered
def revert(self, split_data: torch.Tensor, dbg_str='None') -> torch.Tensor:
        """Reorder ``split_data`` using ``self._inverse_indices``.

        Args:
            split_data: tensor whose shape must equal ``self._data_shape``.
            dbg_str: name of the producing factor, used only in error messages.

        Returns:
            ``torch.take(split_data, self._inverse_indices)``.

        Raises:
            ValueError: if the shape of ``split_data`` differs from
                ``self._data_shape``. A dedicated message is used when only
                the dimensions after the first two differ.
        """
        actual_shape = tuple(split_data.shape)
        if actual_shape != self._data_shape:
            if tuple(split_data.shape[:2]) == self._data_shape[:2]:
                # Leading dimensions agree, so the extra trailing dimension(s)
                # most likely come from a multi-output factor.
                raise ValueError('The downstream needs shape{2}, and the input factor "{1}" is '
                                 'shape{0}. Look like this factor has multiple return values, '
                                 'use slice to select a value before using it, for example: '
                                 '`factor[0]`.'
                                 .format(actual_shape, dbg_str, self._data_shape))
            # Any other mismatch must also be rejected; without this branch the
            # bad tensor would be passed straight to torch.take.
            raise ValueError('The return data shape{} of Factor `{}` must same as input{}.'
                             .format(actual_shape, dbg_str, self._data_shape))
        return torch.take(split_data, self._inverse_indices)
def quantile(data, bins, dim=1):
    """Label each element with the index of the quantile bin it falls into.

    Bin edges are computed per row from the non-NaN values of that row, and
    every element is labelled ``0 .. bins - 1`` according to the half-open
    interval ``(edge[k], edge[k + 1]]`` containing it. Elements matching no
    interval (e.g. NaN) stay NaN.

    :param data: input tensor; bool input is converted with ``.char()``.
    :param bins: number of bins.
    :param dim: dimension to sort and count along.
        NOTE(review): the flat-index arithmetic below appears to assume a
        2-D tensor with ``dim`` as the last axis -- confirm before using
        other layouts.
    :return: float32 tensor with the shape of ``data`` holding bin labels.
    """
    if data.dtype == torch.bool:
        data = data.char()
    if data.shape[1] == 1:  # only one column (one asset in universe)
        return data.new_full(data.shape, 0, dtype=torch.float32)

    sorted_vals = torch.sort(data, dim=dim)[0]
    # number of non-NaN entries in each row
    valid_count = data.shape[dim] - torch.isnan(data).sum(dim=dim)

    # cut positions of each bin edge, scaled by each row's non-NaN count
    fractions = torch.linspace(0, 1, bins + 1, device=data.device)
    fractions = fractions.view(-1, *([1] * dim))
    lower_idx = (fractions * (valid_count - 1)).long()
    # weight used to blend the two neighbouring sorted values
    interp_weight = fractions % 1
    upper_idx = lower_idx + 1
    # the last edge must not run past the final valid element
    upper_idx[-1] = valid_count - 1

    # convert per-row positions into positions in the flattened sorted tensor
    row_len = data.shape[dim]
    row_offset = torch.arange(0, lower_idx[0].nelement(), device=data.device) * row_len
    row_offset = row_offset.reshape(lower_idx[0].shape)
    lower_idx += row_offset
    upper_idx += row_offset

    lower_edge = sorted_vals.take(lower_idx)
    upper_edge = sorted_vals.take(upper_idx)
    edges = lower_edge + (upper_edge - lower_edge) * interp_weight
    # lower the first edge so the row minimum lands inside the first bin
    edges[0] -= 1
    edges = edges.unsqueeze(-1)

    labels = data.new_full(data.shape, np.nan, dtype=torch.float32)
    for tile in range(bins):
        in_bin = (data > edges[tile]) & (data <= edges[tile + 1])
        labels[in_bin] = tile
    return labels
def finalize(self):
        """
        Finalize training with an eigendecomposition of the accumulated matrix.

        Reads ``self.xTx``, ``self.xTx_avg`` and ``self.tlen`` (passed through
        ``self._fix``), then stores:

        * ``self.avg``: the fixed average with a leading dimension added,
        * ``self.d``: the ``output_dim`` largest eigenvalues, descending,
        * ``self.v``: the matching eigenvectors as columns,
        * ``self.total_variance``: the trace of the fixed matrix,
        * ``self.explained_variance``: share of the total variance carried by
          the retained eigenvalues.

        Raises:
            Exception: if ``self.tlen`` is smaller than ``self.input_dim``.
        """
        # Reshape average
        xTx, avg, _ = self._fix(self.xTx, self.xTx_avg, self.tlen)

        # Reshape
        self.avg = avg.unsqueeze(0)

        # We need more observations than variables
        if self.tlen < self.input_dim:
            raise Exception(u"The number of observations ({}) is smaller than the number of input variables ({})".format(self.tlen, self.input_dim))
        # end if

        # Total variance
        total_var = torch.diag(xTx).sum()

        # Compute eigenvalues (ascending) and eigenvectors.
        # torch.symeig is gone from current PyTorch; use torch.linalg.eigh when
        # it exists and fall back to symeig on older releases.
        if hasattr(torch, "linalg") and hasattr(torch.linalg, "eigh"):
            d, v = torch.linalg.eigh(xTx)
        else:
            d, v = torch.symeig(xTx, eigenvectors=True)
        # end if

        # Negative eigenvalues are tolerated: the original check for them
        # had its raise disabled, so no error is raised here.

        # Indexes, built on the eigenvalues' device so indexing also works
        # when the data is not on the CPU
        indexes = torch.arange(d.size(0) - 1, -1, -1, device=d.device)

        # Sort by descending order
        d = torch.take(d, indexes)
        v = v[:, indexes]

        # Store eigenvalues
        self.d = d[:self.output_dim]

        # Store eigenvectors
        self.v = v[:, :self.output_dim]

        # Explained variance: only the retained eigenvalues count, otherwise
        # the ratio is always ~1 because all eigenvalues sum to the trace.
        self.explained_variance = torch.sum(self.d) / total_var

        # Total variance
        self.total_variance = total_var

        # NOTE(review): the original ends with a "Stop training" comment but
        # no statement; confirm whether a mode switch belongs here.
    # end finalize

    # Get explained variance