beancount.ops

对核心模块中定义的条目执行操作。

此包包含多种对条目列表进行操作的函数。

beancount.ops.balance

自动填充条目之间的空白。

beancount.ops.balance.BalanceError (元组)

BalanceError(source, message, entry)

beancount.ops.balance.BalanceError.__getnewargs__(self) 特殊

将自身返回为一个普通元组。供 copy 和 pickle 使用。

源代码位于 beancount/ops/balance.py
def __getnewargs__(self):
    'Return self as a plain tuple.  Used by copy and pickle.'
    return _tuple(self)

beancount.ops.balance.BalanceError.__new__(_cls, source, message, entry) 特殊 静态方法

创建 BalanceError(source, message, entry) 的新实例

beancount.ops.balance.BalanceError.__replace__(/, self, **kwds) 特殊

返回一个新的 BalanceError 对象,用指定的新值替换字段

源代码位于 beancount/ops/balance.py
def _replace(self, /, **kwds):
    result = self._make(_map(kwds.pop, field_names, self))
    if kwds:
        raise TypeError(f'Got unexpected field names: {list(kwds)!r}')
    return result

beancount.ops.balance.BalanceError.__repr__(self) 特殊

返回一个格式良好的表示字符串

源代码位于 beancount/ops/balance.py
def __repr__(self):
    'Return a nicely formatted representation string'
    return self.__class__.__name__ + repr_fmt % self

beancount.ops.balance.check(entries, options_map)

处理余额断言指令。

对于每个 Balance 指令,检查其预期余额是否与当时计算的实际余额一致,并将失败的指令替换为带有失败标志的新指令。

参数:
  • entries – 一组指令列表。

  • options_map – 一个字典,包含从输入文件解析的选项。

返回:
  • 一个包含指令列表和余额检查错误列表的元组。

源代码位于 beancount/ops/balance.py
def check(entries, options_map):
    """Process the balance assertion directives.

    For each Balance directive, check that their expected balance corresponds to
    the actual balance computed at that time and replace failing ones by new
    ones with a flag that indicates failure.

    Args:
      entries: A list of directives.
      options_map: A dict of options, parsed from the input file.
    Returns:
      A pair of a list of directives and a list of balance check errors.
    """
    new_entries = []
    check_errors = []

    # This is similar to realization, but performed in a different order, and
    # where we only accumulate inventories for accounts that have balance
    # assertions in them (this saves on time). Here we process the entries one
    # by one along with the balance checks. We use a temporary realization in
    # order to hold the incremental tree of balances, so that we can easily get
    # the amounts of an account's subaccounts for making checks on parent
    # accounts.
    real_root = realization.RealAccount('')

    # Figure out the set of accounts for which we need to compute a running
    # inventory balance.
    asserted_accounts = {entry.account
                         for entry in entries
                         if isinstance(entry, Balance)}

    # Add all children accounts of an asserted account to be calculated as well,
    # and pre-create these accounts, and only those (we're just being tight to
    # make sure).
    asserted_match_list = [account.parent_matcher(account_)
                           for account_ in asserted_accounts]
    for account_ in getters.get_accounts(entries):
        if (account_ in asserted_accounts or
            any(match(account_) for match in asserted_match_list)):
            realization.get_or_create(real_root, account_)

    # Get the Open directives for each account.
    open_close_map = getters.get_account_open_close(entries)

    for entry in entries:
        if isinstance(entry, Transaction):
            # For each of the postings' accounts, update the balance inventory.
            for posting in entry.postings:
                real_account = realization.get(real_root, posting.account)

                # The account will have been created only if we're meant to track it.
                if real_account is not None:
                    # Note: Always allow negative lots for the purpose of balancing.
                    # This error should show up somewhere else than here.
                    real_account.balance.add_position(posting)

        elif isinstance(entry, Balance):
            # Check that the currency of the balance check is one of the allowed
            # currencies for that account.
            expected_amount = entry.amount
            try:
                open, _ = open_close_map[entry.account]
            except KeyError:
                check_errors.append(
                    BalanceError(entry.meta,
                                 "Invalid reference to unknown account '{}'".format(
                                     entry.account), entry))
                continue

            if (expected_amount is not None and
                open and open.currencies and
                expected_amount.currency not in open.currencies):
                check_errors.append(
                    BalanceError(entry.meta,
                                 "Invalid currency '{}' for Balance directive: ".format(
                                     expected_amount.currency),
                                 entry))

            # Sum up the current balances for this account and its
            # sub-accounts. We want to support checks for parent accounts
            # for the total sum of their subaccounts.
            #
            # FIXME: Improve the performance further by computing the balance
            # for the desired currency only. This won't allow us to cache in
            # this way but may be faster, if we're not asserting all the
            # currencies. Furthermore, we could probably avoid recomputing the
            # balance if a subtree of positions hasn't been invalidated by a new
            # position added to the realization. Do this.
            real_account = realization.get(real_root, entry.account)
            assert real_account is not None, "Missing {}".format(entry.account)
            subtree_balance = realization.compute_balance(real_account, leaf_only=False)

            # Get only the amount in the desired currency.
            balance_amount = subtree_balance.get_currency_units(expected_amount.currency)

            # Check if the amount is within bounds of the expected amount.
            diff_amount = amount.sub(balance_amount, expected_amount)

            # Use the specified tolerance or automatically infer it.
            tolerance = get_balance_tolerance(entry, options_map)

            if abs(diff_amount.number) > tolerance:
                check_errors.append(
                    BalanceError(entry.meta,
                                 ("Balance failed for '{}': "
                                  "expected {} != accumulated {} ({} {})").format(
                                      entry.account, expected_amount, balance_amount,
                                      abs(diff_amount.number),
                                      ('too much'
                                       if diff_amount.number > 0
                                       else 'too little')),
                                 entry))

                # Substitute the entry by a failing entry, with the diff_amount
                # field set on it. I'm not entirely sure that this is the best
                # of ideas, maybe leaving the original check intact and insert a
                # new error entry might be more functional or easier to
                # understand.
                entry = entry._replace(
                    meta=entry.meta.copy(),
                    diff_amount=diff_amount)

        new_entries.append(entry)

    return new_entries, check_errors

beancount.ops.balance.get_balance_tolerance(balance_entry, options_map)

获取单个条目的容差金额。

参数:
  • balance_entry – data.Balance 的一个实例

  • options_map – 一个选项字典,格式如解析器所要求。

返回:
  • 一个 Decimal 类型的值,表示该指令所隐含的容差金额。

源代码位于 beancount/ops/balance.py
def get_balance_tolerance(balance_entry, options_map):
    """Get the tolerance amount for a single entry.

    Args:
      balance_entry: An instance of data.Balance
      options_map: An options dict, as per the parser.
    Returns:
      A Decimal, the amount of tolerance implied by the directive.
    """
    if balance_entry.tolerance is not None:
        # Use the balance-specific tolerance override if it is provided.
        tolerance = balance_entry.tolerance

    else:
        expo = balance_entry.amount.number.as_tuple().exponent
        if expo < 0:
            # Be generous and always allow twice the multiplier on Balance and
            # Pad because the user creates these and the rounding of those
            # balances may often be further off than those used within a single
            # transaction.
            tolerance = options_map["inferred_tolerance_multiplier"] * 2
            tolerance = ONE.scaleb(expo) * tolerance
        else:
            tolerance = ZERO

    return tolerance

beancount.ops.basicops

对条目列表进行基本过滤和聚合操作。

本模块包含一些较为复杂的常用条目操作,因其复杂性而不适合放在 core/data.py 中。

生成所有具有指定链接的条目。

参数:
  • link – 一个字符串,表示感兴趣的链接。

生成:所有链接到 'link' 的条目。

源代码位于 beancount/ops/basicops.py
def filter_link(link, entries):
    """Yield all the entries which have the given link.

    Args:
      link: A string, the link we are interested in.
    Yields:
      Every entry in 'entries' that links to 'link.
    """
    for entry in entries:
        if (isinstance(entry, data.Transaction) and
            entry.links and link in entry.links):
            yield entry

beancount.ops.basicops.filter_tag(tag, entries)

生成所有具有指定标签的条目。

参数:
  • tag – 一个字符串,表示感兴趣的标签。

返回:'entries' 中所有标记为 'tag' 的条目。

源代码位于 beancount/ops/basicops.py
def filter_tag(tag, entries):
    """Yield all the entries which have the given tag.

    Args:
      tag: A string, the tag we are interested in.
    Yields:
      Every entry in 'entries' that tags to 'tag.
    """
    for entry in entries:
        if (isinstance(entry, data.Transaction) and
            entry.tags and
            tag in entry.tags):
            yield entry

beancount.ops.basicops.get_common_accounts(entries)

计算给定条目中账户的交集。

参数:
  • entries – 要处理的交易条目列表。

返回:
  • 一个字符串集合,包含这些条目中的公共账户名称。

源代码位于 beancount/ops/basicops.py
def get_common_accounts(entries):
    """Compute the intersection of the accounts on the given entries.

    Args:
      entries: A list of Transaction entries to process.
    Returns:
      A set of strings, the names of the common accounts from these
      entries.
    """
    assert all(isinstance(entry, data.Transaction) for entry in entries)

    # If there is a single entry, the common accounts to it is all its accounts.
    # Note that this also works with no entries (yields an empty set).
    if len(entries) < 2:
        if entries:
            intersection = {posting.account for posting in entries[0].postings}
        else:
            intersection = set()
    else:
        entries_iter = iter(entries)
        intersection = set(posting.account for posting in next(entries_iter).postings)
        for entry in entries_iter:
            accounts = set(posting.account for posting in entry.postings)
            intersection &= accounts
            if not intersection:
                break
    return intersection

按链接对条目列表进行分组。

参数:
  • entries – 要处理的指令/交易列表。

返回:
  • 一个字典,键为链接名称,值为对应的条目列表。

源代码位于 beancount/ops/basicops.py
def group_entries_by_link(entries):
    """Group the list of entries by link.

    Args:
      entries: A list of directives/transactions to process.
    Returns:
      A dict of link-name to list of entries.
    """
    link_groups = defaultdict(list)
    for entry in entries:
        if not (isinstance(entry, data.Transaction) and entry.links):
            continue
        for link in entry.links:
            link_groups[link].append(entry)
    return link_groups

beancount.ops.compress

将多个条目压缩为单个条目。

在导入过程中,此功能可用于压缩有效输出,尤其适用于具有大量相似条目的账户。例如,我曾有一个交易账户,每天都会支付利息。我并不希望导入这些每日利息的全部明细,而将这些仅含利息的条目压缩为月度条目更为合理。以下代码即用于实现此功能。

beancount.ops.compress.compress(entries, predicate)

将多个交易合并为单个交易。

将满足指定谓词的连续交易条目序列替换为一个在最后一个匹配条目日期处的新条目。'predicate' 是用于判断条目是否应被压缩的函数。

此功能可用于简化频繁发生且相似的交易列表。例如,在零售外汇交易账户中,每天都会支付金额极小的差额利息;除非存在其他交易,否则无需查看这些利息的详细信息。您可以使用此功能将这些利息压缩为其他类型交易之间的单个条目。

参数:
  • entries – 一组指令列表。

  • predicate – 一个可调用对象,接受一个条目并返回 True(若该条目应被压缩)。

返回:
  • 一个指令列表,其中可压缩的交易已被其摘要等效项替换。

源代码位于 beancount/ops/compress.py
def compress(entries, predicate):
    """Compress multiple transactions into single transactions.

    Replace consecutive sequences of Transaction entries that fulfill the given
    predicate by a single entry at the date of the last matching entry.
    'predicate' is the function that determines if an entry should be
    compressed.

    This can be used to simply a list of transactions that are similar and occur
    frequently. As an example, in a retail FOREX trading account, differential
    interest of very small amounts is paid every day; it is not relevant to look
    at the full detail of this interest unless there are other transactions. You
    can use this to compress it into single entries between other types of
    transactions.

    Args:
      entries: A list of directives.
      predicate: A callable which accepts an entry and return true if the entry
          is intended to be compressed.
    Returns:
      A list of directives, with compressible transactions replaced by a summary
      equivalent.
    """
    new_entries = []
    pending = []
    for entry in entries:
        if isinstance(entry, data.Transaction) and predicate(entry):
            # Save for compressing later.
            pending.append(entry)
        else:
            # Compress and output all the pending entries.
            if pending:
                new_entries.append(merge(pending, pending[-1]))
                pending.clear()

            # Output the differing entry.
            new_entries.append(entry)

    if pending:
        new_entries.append(merge(pending, pending[-1]))

    return new_entries

beancount.ops.compress.merge(entries, prototype_txn)

将一组交易的过账项合并为单个交易。

将给定条目的过账项合并为一个新条目,其交易属性继承自原型。返回新条目。若过账项的所有属性均相同,仅金额不同,则合并这些过账项。

参数:
  • entries – 一组指令列表。

  • prototype_txn – 用于创建压缩后交易实例的交易对象。其过账项列表将被忽略。

返回:
  • 一个新的交易实例,包含所有输入条目的过账项合并后的结果。

源代码位于 beancount/ops/compress.py
def merge(entries, prototype_txn):
    """Merge the postings of a list of Transactions into a single one.

    Merge postings the given entries into a single entry with the Transaction
    attributes of the prototype. Return the new entry. The combined list of
    postings are merged if everything about the postings is the same except the
    number.

    Args:
      entries: A list of directives.
      prototype_txn: A Transaction which is used to create the compressed
          Transaction instance. Its list of postings is ignored.
    Returns:
      A new Transaction instance which contains all the postings from the input
      entries merged together.

    """
    # Aggregate the postings together. This is a mapping of numberless postings
    # to their number of units.
    postings_map = collections.defaultdict(Decimal)
    for entry in data.filter_txns(entries):
        for posting in entry.postings:
            # We strip the number off the posting to act as an aggregation key.
            key = data.Posting(posting.account,
                               Amount(None, posting.units.currency),
                               posting.cost,
                               posting.price,
                               posting.flag,
                               None)
            postings_map[key] += posting.units.number

    # Create a new transaction with the aggregated postings.
    new_entry = data.Transaction(prototype_txn.meta,
                                 prototype_txn.date,
                                 prototype_txn.flag,
                                 prototype_txn.payee,
                                 prototype_txn.narration,
                                 data.EMPTY_SET, data.EMPTY_SET, [])

    # Sort for at least some stability of output.
    sorted_items = sorted(postings_map.items(),
                          key=lambda item: (item[0].account,
                                            item[0].units.currency,
                                            item[1]))

    # Issue the merged postings.
    for posting, number in sorted_items:
        units = Amount(number, posting.units.currency)
        new_entry.postings.append(
            data.Posting(posting.account, units, posting.cost, posting.price,
                         posting.flag, posting.meta))

    return new_entry

beancount.ops.documents

与创建 Document 指令相关的所有内容。

beancount.ops.documents.DocumentError (元组)

DocumentError(source, message, entry)

beancount.ops.documents.DocumentError.__getnewargs__(self) 特殊

将自身返回为一个普通元组。供 copy 和 pickle 使用。

源代码位于 beancount/ops/documents.py
def __getnewargs__(self):
    'Return self as a plain tuple.  Used by copy and pickle.'
    return _tuple(self)

beancount.ops.documents.DocumentError.__new__(_cls, source, message, entry) 特殊 静态方法

创建 DocumentError(source, message, entry) 的新实例

beancount.ops.documents.DocumentError.__replace__(/, self, **kwds) 特殊

返回一个新的 DocumentError 对象,用指定的新值替换某些字段

源代码位于 beancount/ops/documents.py
def _replace(self, /, **kwds):
    result = self._make(_map(kwds.pop, field_names, self))
    if kwds:
        raise TypeError(f'Got unexpected field names: {list(kwds)!r}')
    return result

beancount.ops.documents.DocumentError.__repr__(self) 特殊

返回一个格式良好的表示字符串

源代码位于 beancount/ops/documents.py
def __repr__(self):
    'Return a nicely formatted representation string'
    return self.__class__.__name__ + repr_fmt % self

beancount.ops.documents.find_documents(directory, input_filename, accounts_only=None, strict=False)

在给定目录下查找带日期的文档文件。

如果在 'accounts_only' 中提供了账户限制集合,则仅返回与指定账户之一对应的条目。

参数:
  • directory – 一个字符串,表示要搜索的目录层次结构的根目录名称。

  • input_filename – 用于 Document 指令的文件名称。此名称也用于解析相对目录路径。

  • accounts_only – 一个有效的账户字符串集合,用于搜索。

  • strict – 一个布尔值,当设置为 True 时,若在 accounts_only 未包含的账户中发现文档,则生成错误。仅当指定了 accounts_only 时此参数有效。

返回:
  • 一个包含从找到的文件创建的新 Document 对象的列表,以及在此过程中生成的错误列表。

源代码位于 beancount/ops/documents.py
def find_documents(directory, input_filename, accounts_only=None, strict=False):
    """Find dated document files under the given directory.

    If a restricting set of accounts is provided in 'accounts_only', only return
    entries that correspond to one of the given accounts.

    Args:
      directory: A string, the name of the root of the directory hierarchy to be searched.
      input_filename: The name of the file to be used for the Document directives. This is
        also used to resolve relative directory names.
      accounts_only: A set of valid accounts strings to search for.
      strict: A boolean, set to true if you want to generate errors on documents
        found in accounts not provided in accounts_only. This is only meaningful
        if accounts_only is specified.
    Returns:
      A list of new Document objects that were created from the files found, and a list
      of new errors generated.

    """
    errors = []

    # Compute the documents directory name relative to the beancount input
    # file itself.
    if not path.isabs(directory):
        input_directory = path.dirname(input_filename)
        directory = path.abspath(path.normpath(path.join(input_directory,
                                                         directory)))

    # If the directory does not exist, just generate an error and return.
    if not path.exists(directory):
        meta = data.new_metadata(input_filename, 0)
        error = DocumentError(
            meta, "Document root '{}' does not exist".format(directory), None)
        return ([], [error])

    # Walk the hierarchy of files.
    entries = []
    for root, account_name, dirs, files in account.walk(directory):

        # Look for files that have a dated filename.
        for filename in files:
            match = re.match(r'(\d\d\d\d)-(\d\d)-(\d\d).(.*)', filename)
            if not match:
                continue

            # If a restricting set of accounts was specified, skip document
            # directives found in accounts with no corresponding account name.
            if accounts_only is not None and not account_name in accounts_only:
                if strict:
                    if any(account_name.startswith(account) for account in accounts_only):
                        errors.append(DocumentError(
                            data.new_metadata(input_filename, 0),
                            "Document '{}' found in child account {}".format(
                                filename, account_name), None))
                    elif any(account.startswith(account_name) for account in accounts_only):
                        errors.append(DocumentError(
                            data.new_metadata(input_filename, 0),
                            "Document '{}' found in parent account {}".format(
                                filename, account_name), None))
                continue

            # Create a new directive.
            meta = data.new_metadata(input_filename, 0)
            try:
                date = datetime.date(*map(int, match.group(1, 2, 3)))
            except ValueError as exc:
                errors.append(DocumentError(
                    data.new_metadata(input_filename, 0),
                    "Invalid date on document file '{}': {}".format(
                        filename, exc), None))
            else:
                entry = data.Document(meta, date, account_name, path.join(root, filename),
                                      data.EMPTY_SET, data.EMPTY_SET)
                entries.append(entry)

    return (entries, errors)

beancount.ops.documents.process_documents(entries, options_map)

检查文件中的文档指令,并自动创建文档指令。

参数:
  • entries – 从文件中解析出的所有指令列表。

  • options_map – 一个选项字典,由解析器输出。我们使用其 'filename' 选项来确定搜索文档的相对路径。

返回:
  • 一个包含所有条目(包括新增的)的列表,以及在创建文档指令过程中生成的错误列表。

源代码位于 beancount/ops/documents.py
def process_documents(entries, options_map):
    """Check files for document directives and create documents directives automatically.

    Args:
      entries: A list of all directives parsed from the file.
      options_map: An options dict, as is output by the parser.
        We're using its 'filename' option to figure out relative path to
        search for documents.
    Returns:
      A pair of list of all entries (including new ones), and errors
      generated during the process of creating document directives.
    """
    filename = options_map["filename"]

    # Detect filenames that should convert into entries.
    autodoc_entries = []
    autodoc_errors = []
    document_dirs = options_map['documents']
    if document_dirs:
        # Restrict to the list of valid accounts only.
        accounts = getters.get_accounts(entries)

        # Accumulate all the entries.
        for directory in map(path.normpath, document_dirs):
            new_entries, new_errors = find_documents(directory, filename, accounts)
            autodoc_entries.extend(new_entries)
            autodoc_errors.extend(new_errors)

    # Merge the two lists of entries and errors. Keep the entries sorted.
    entries.extend(autodoc_entries)
    entries.sort(key=data.entry_sortkey)

    return (entries, autodoc_errors)

beancount.ops.documents.verify_document_files_exist(entries, unused_options_map)

验证文档条目指向的文件是否存在。

参数:
  • entries – 需要验证其文档的指令列表。

  • unused_options_map – 解析器选项字典。本函数不使用它。

返回:
  • 相同的条目列表,以及在验证过程中遇到的任何新错误列表。

源代码位于 beancount/ops/documents.py
def verify_document_files_exist(entries, unused_options_map):
    """Verify that the document entries point to existing files.

    Args:
      entries: a list of directives whose documents need to be validated.
      unused_options_map: A parser options dict. We're not using it.
    Returns:
      The same list of entries, and a list of new errors, if any were encountered.
    """
    errors = []
    for entry in entries:
        if not isinstance(entry, data.Document):
            continue
        if not path.exists(entry.filename):
            errors.append(
                DocumentError(entry.meta,
                              'File does not exist: "{}"'.format(entry.filename),
                              entry))
    return entries, errors

beancount.ops.find_prices

一个用于从字符串和文件创建价格获取任务的代码库。

beancount.ops.find_prices.find_balance_currencies(entries, date=None)

返回与指定日期相关的货币。

此函数计算截至该日期的账户余额,并返回以下两者的并集:a) 以成本价持有的货币,以及 b) 之前的兑换货币对,但仅限余额非零的货币。

此函数旨在根据历史记录,生成在特定日期相关的货币列表。

参数:
  • entries – 一组指令列表。

  • date – 一个 datetime.date 实例。

返回:
  • 一组(基础货币,报价货币)货币对。

源代码位于 beancount/ops/find_prices.py
def find_balance_currencies(entries, date=None):
    """Return currencies relevant for the given date.

    This computes the account balances as of the date, and returns the union of:
    a) The currencies held at cost, and
    b) Currency pairs from previous conversions, but only for currencies with
       non-zero balances.

    This is intended to produce the list of currencies whose prices are relevant
    at a particular date, based on previous history.

    Args:
      entries: A list of directives.
      date: A datetime.date instance.
    Returns:
      A set of (base, quote) currencies.
    """
    # Compute the balances.
    currencies = set()
    currencies_on_books = set()
    balances, _ = summarize.balance_by_account(entries, date)
    for _, balance in balances.items():
        for pos in balance:
            if pos.cost is not None:
                # Add currencies held at cost.
                currencies.add((pos.units.currency, pos.cost.currency))
            else:
                # Add regular currencies.
                currencies_on_books.add(pos.units.currency)

    # Create currency pairs from the currencies which are on account balances.
    # In order to figure out the quote currencies, we use the list of price
    # conversions until this date.
    converted = (find_currencies_converted(entries, date) |
                 find_currencies_priced(entries, date))
    for cbase in currencies_on_books:
        for base_quote in converted:
            base, quote = base_quote
            if base == cbase:
                currencies.add(base_quote)

    return currencies

beancount.ops.find_prices.find_currencies_at_cost(entries, date=None)

返回所有曾经以成本价持有的货币。

此函数返回所有货币,即使在特定时间点未出现在账本中。此代码不检查账户余额。

参数:
  • entries – 一组指令列表。

  • date – 一个 datetime.date 实例。

返回:
  • 一个(基础货币,报价货币)货币对的列表。

源代码位于 beancount/ops/find_prices.py
def find_currencies_at_cost(entries, date=None):
    """Return all currencies that were held at cost at some point.

    This returns all of them, even if not on the books at a particular point in
    time. This code does not look at account balances.

    Args:
      entries: A list of directives.
      date: A datetime.date instance.
    Returns:
      A list of (base, quote) currencies.
    """
    currencies = set()
    for entry in entries:
        if not isinstance(entry, data.Transaction):
            continue
        if date and entry.date >= date:
            break
        for posting in entry.postings:
            if posting.cost is not None and posting.cost.number is not None:
                currencies.add((posting.units.currency, posting.cost.currency))
    return currencies

beancount.ops.find_prices.find_currencies_converted(entries, date=None)

返回来自价格转换的货币。

此函数检查截至某个日期发生的所有价格转换,并生成一个列表。注意:此函数不包含 Price 指令,仅包含带有价格转换的条目。

参数:
  • entries – 一组指令列表。

  • date – 一个 datetime.date 实例。

返回:
  • 一个(基础货币,报价货币)货币对的列表。

源代码位于 beancount/ops/find_prices.py
def find_currencies_converted(entries, date=None):
    """Return currencies from price conversions.

    This function looks at all price conversions that occurred until some date
    and produces a list of them. Note: This does not include Price directives,
    only postings with price conversions.

    Args:
      entries: A list of directives.
      date: A datetime.date instance.
    Returns:
      A list of (base, quote) currencies.
    """
    currencies = set()
    for entry in entries:
        if not isinstance(entry, data.Transaction):
            continue
        if date and entry.date >= date:
            break
        for posting in entry.postings:
            price = posting.price
            if posting.cost is not None or price is None:
                continue
            currencies.add((posting.units.currency, price.currency))
    return currencies

beancount.ops.find_prices.find_currencies_priced(entries, date=None)

返回在 Price 指令中出现的货币。

参数:
  • entries – 一组指令列表。

  • date – 一个 datetime.date 实例。

返回:
  • 一个(基础货币,报价货币)货币对的列表。

源代码位于 beancount/ops/find_prices.py
def find_currencies_priced(entries, date=None):
    """Return currencies seen in Price directives.

    Args:
      entries: A list of directives.
      date: A datetime.date instance.
    Returns:
      A list of (base, quote) currencies.
    """
    currencies = set()
    for entry in entries:
        if not isinstance(entry, data.Price):
            continue
        if date and entry.date >= date:
            break
        currencies.add((entry.currency, entry.amount.currency))
    return currencies

beancount.ops.lifetimes

给定一个 Beancount 账本,计算每种商品持有的时间区间。

此脚本为每种商品计算其所需的时间区间,可用于确定需要获取价格的日期列表,以便正确填充价格数据库。

beancount.ops.lifetimes.compress_intervals_days(intervals, num_days)

压缩日期对列表,忽略短时间的未使用天数。

参数:
  • intervals — 一组 datetime.date 实例的元组列表。

  • num_days — 整数,表示两个区间之间必须间隔的最少未使用天数。

返回:
  • 一个新的生命周期映射字典,其中部分区间可能已被合并。

源代码位于 beancount/ops/lifetimes.py
def compress_intervals_days(intervals, num_days):
    """Compress a list of date pairs to ignore short stretches of unused days.

    Args:
      intervals: A list of pairs of datetime.date instances.
      num_days: An integer, the number of unused days to require for intervals
        to be distinct, to allow a gap.
    Returns:
      A new dict of lifetimes map where some intervals may have been joined.
    """
    ignore_interval = datetime.timedelta(days=num_days)
    new_intervals = []
    iter_intervals = iter(intervals)
    last_begin, last_end = next(iter_intervals)
    for date_begin, date_end in iter_intervals:
        if date_begin - last_end < ignore_interval:
            # Compress.
            last_end = date_end
            continue
        new_intervals.append((last_begin, last_end))
        last_begin, last_end = date_begin, date_end
    new_intervals.append((last_begin, last_end))
    return new_intervals

beancount.ops.lifetimes.compress_lifetimes_days(lifetimes_map, num_days)

压缩生命周期映射,忽略短时间的未使用天数。

参数:
  • lifetimes_map — 由 get_commodity_lifetimes 返回的货币区间字典。

  • num_days — 整数,表示要忽略的未使用天数。

返回:
  • 一个新的生命周期映射字典,其中部分区间可能已被合并。

源代码位于 beancount/ops/lifetimes.py
def compress_lifetimes_days(lifetimes_map, num_days):
    """Compress a lifetimes map to ignore short stretches of unused days.

    Args:
      lifetimes_map: A dict of currency intervals as returned by get_commodity_lifetimes.
      num_days: An integer, the number of unused days to ignore.
    Returns:
      A new dict of lifetimes map where some intervals may have been joined.
    """
    return {currency_pair: compress_intervals_days(intervals, num_days)
            for currency_pair, intervals in lifetimes_map.items()}

beancount.ops.lifetimes.get_commodity_lifetimes(entries)

给定一组指令,确定每种商品的存续期。

参数:
  • entries – 一组指令列表。

返回:
  • 一个字典,键为(货币,成本货币)的商品字符串,值为(开始,结束)datetime.date 对的列表。日期包含商品被观察到的当天;结束/最后日期为最后观察到日期的后一天。

源代码位于 beancount/ops/lifetimes.py
def get_commodity_lifetimes(entries):
    """Given a list of directives, figure out the life of each commodity.

    Args:
      entries: A list of directives.
    Returns:
      A dict of (currency, cost-currency) commodity strings to lists of (start,
      end) datetime.date pairs. The dates are inclusive of the day the commodity
      was seen; the end/last dates are one day _after_ the last date seen.
    """
    lifetimes = collections.defaultdict(list)

    # The current set of active commodities.
    commodities = set()

    # The current balances across all accounts.
    balances = collections.defaultdict(inventory.Inventory)

    for entry in entries:
        # Process only transaction entries.
        if not isinstance(entry, data.Transaction):
            continue

        # Update the balance of affected accounts and check locally whether that
        # triggered a change in the set of commodities.
        commodities_changed = False
        for posting in entry.postings:
            balance = balances[posting.account]
            commodities_before = balance.currency_pairs()
            balance.add_position(posting)
            commodities_after = balance.currency_pairs()
            if commodities_after != commodities_before:
                commodities_changed = True

        # If there was a change in one of the affected account's list of
        # commodities, recompute the total set globally. This should not
        # occur very frequently.
        if commodities_changed:
            new_commodities = set(
                itertools.chain(*(inv.currency_pairs() for inv in balances.values())))
            if new_commodities != commodities:
                # The new global set of commodities has changed; update our
                # the dictionary of intervals.
                for currency in new_commodities - commodities:
                    lifetimes[currency].append((entry.date, None))

                for currency in commodities - new_commodities:
                    lifetime = lifetimes[currency]
                    begin_date, end_date = lifetime.pop(-1)
                    assert end_date is None
                    lifetime.append((begin_date, entry.date + ONEDAY))

                # Update our current set.
                commodities = new_commodities

    return lifetimes

beancount.ops.lifetimes.required_daily_prices(lifetimes_map, date_last, weekdays_only=False)

枚举所有需要价格的货币和日期。

给定一组商品的生命周期映射,枚举每种商品处于活跃状态的所有日期。此功能可用于连接历史价格获取例程,以根据现有账本填充缺失的价格条目。

参数:
  • lifetimes_map – 一个字典,键为货币,值为由 get_commodity_lifetimes() 返回的活跃区间。

  • date_last – 一个 datetime.date 实例,表示我们关心的最后日期。

  • weekdays_only – 仅限获取工作日数据的选项。

返回:
  • 元组格式为 (日期, 货币, 成本货币)。

源代码位于 beancount/ops/lifetimes.py
def required_daily_prices(lifetimes_map, date_last, weekdays_only=False):
    """Enumerate all the commodities and days where the price is required.

    Given a map of lifetimes for a set of commodities, enumerate all the days
    for each commodity where it is active. This can be used to connect to a
    historical price fetcher routine to fill in missing price entries from an
    existing ledger.

    Args:
      lifetimes_map: A dict of currency to active intervals as returned by
        get_commodity_lifetimes().
      date_last: A datetime.date instance, the last date which we're interested in.
      weekdays_only: Option to limit fetching to weekdays only.
    Returns:
      Tuples of (date, currency, cost-currency).
    """
    results = []
    for currency_pair, intervals in lifetimes_map.items():
        if currency_pair[1] is None:
            continue
        for date_begin, date_end in intervals:
            # Find first Weekday starting on or before minimum date.
            date = date_begin
            if(weekdays_only):
                diff_days = 4 - date_begin.weekday()
                if diff_days < 0:
                    date += datetime.timedelta(days=diff_days)

            # Iterate over all weekdays.
            if date_end is None:
                date_end = date_last
            while date < date_end:
                results.append((date, currency_pair[0], currency_pair[1]))
                if weekdays_only and date.weekday() == 4:
                    date += 3 * ONEDAY
                else:
                    date += ONEDAY

    return sorted(results)

beancount.ops.lifetimes.required_weekly_prices(lifetimes_map, date_last)

枚举所有需要价格的货币和星期五。

给定一组商品的生命周期映射,枚举每个商品处于活跃状态的每个星期五。这可用于连接历史价格获取例程,以填充现有账本中缺失的价格条目。

参数:
  • lifetimes_map – 一个字典,键为货币,值为由 get_commodity_lifetimes() 返回的活跃区间。

  • date_last – 一个 datetime.date 实例,表示我们关心的最后日期。

返回:
  • 元组格式为 (日期, 货币, 成本货币)。

源代码位于 beancount/ops/lifetimes.py
def required_weekly_prices(lifetimes_map, date_last):
    """Enumerate all the commodities and Fridays where the price is required.

    Given a map of lifetimes for a set of commodities, enumerate all the Fridays
    for each commodity where it is active. This can be used to connect to a
    historical price fetcher routine to fill in missing price entries from an
    existing ledger.

    Args:
      lifetimes_map: A dict of currency to active intervals as returned by
        get_commodity_lifetimes().
      date_last: A datetime.date instance, the last date which we're interested in.
    Returns:
      Tuples of (date, currency, cost-currency).
    """
    results = []
    for currency_pair, intervals in lifetimes_map.items():
        if currency_pair[1] is None:
            continue
        for date_begin, date_end in intervals:
            # Find first Friday before the minimum date.
            diff_days = 4 - date_begin.weekday()
            if diff_days >= 1:
                diff_days -= 7
            date = date_begin + datetime.timedelta(days=diff_days)

            # Iterate over all Fridays.
            if date_end is None:
                date_end = date_last
            while date < date_end:
                results.append((date, currency_pair[0], currency_pair[1]))
                date += ONE_WEEK
    return sorted(results)

beancount.ops.lifetimes.trim_intervals(intervals, trim_start=None, trim_end=None)

将日期对列表裁剪至指定的起始和结束日期范围内。在更新式价格获取中非常有用。

参数:
  • intervals – 一组 datetime.date 实例的配对列表。

  • trim_start – 包含起始日期。

  • trim_end – 不包含结束日期。

返回:
  • 一组新的区间(由 (日期, 日期) 组成的配对列表)。

源代码位于 beancount/ops/lifetimes.py
def trim_intervals(intervals, trim_start=None, trim_end=None):
    """Trim a list of date pairs to be within a start and end date.
    Useful in update-style price fetching.

    Args:
      intervals: A list of pairs of datetime.date instances
      trim_start: An inclusive starting date.
      trim_end: An exclusive starting date.
    Returns:
      A list of new intervals (pairs of (date, date)).
    """
    new_intervals = []
    iter_intervals = iter(intervals)
    if(trim_start is not None and
       trim_end is not None and
       trim_end < trim_start):
        raise ValueError('Trim end date is before start date')

    for date_begin, date_end in iter_intervals:
        if(trim_start is not None and
           trim_start > date_begin):
            date_begin = trim_start
        if(trim_end is not None):
            if(date_end is None or
               trim_end < date_end):
                date_end = trim_end

        if(date_end is None or
           date_begin <= date_end):
            new_intervals.append((date_begin, date_end))
    return new_intervals

beancount.ops.pad

自动填充条目之间的空白。

beancount.ops.pad.PadError (元组)

PadError(source, message, entry)

beancount.ops.pad.PadError.__getnewargs__(self) 特殊

将自身返回为一个普通元组。供 copy 和 pickle 使用。

源代码位于 beancount/ops/pad.py
def __getnewargs__(self):
    'Return self as a plain tuple.  Used by copy and pickle.'
    return _tuple(self)

beancount.ops.pad.PadError.__new__(_cls, source, message, entry) 特殊 静态方法

创建一个新的 PadError 实例 (source, message, entry)

beancount.ops.pad.PadError.__replace__(/, self, **kwds) 特殊

返回一个新的 PadError 对象,用指定的新值替换相应字段。

源代码位于 beancount/ops/pad.py
def _replace(self, /, **kwds):
    result = self._make(_map(kwds.pop, field_names, self))
    if kwds:
        raise TypeError(f'Got unexpected field names: {list(kwds)!r}')
    return result

beancount.ops.pad.PadError.__repr__(self) 特殊

返回一个格式良好的表示字符串

源代码位于 beancount/ops/pad.py
def __repr__(self):
    'Return a nicely formatted representation string'
    return self.__class__.__name__ + repr_fmt % self

beancount.ops.pad.pad(entries, options_map)

插入交易条目以满足后续余额检查。

在 Pad 条目之后立即合成并插入交易条目,以满足填充账户的检查。返回一个新的条目列表。请注意,此操作不会跨父-子关系进行填充,它是一种非常简单的填充方式。(实践中我发现这已足够,且更易于实现和理解。)

此外,此操作仅针对单一货币进行填充,即余额检查仅针对一种货币指定,填充条目也仅针对这些货币插入。

参数:
  • entries – 一组指令列表。

  • options_map – 解析器选项字典。

返回:
  • 一个新的指令列表,包含插入的 Pad 条目,以及生成的新错误列表。

源代码位于 beancount/ops/pad.py
def pad(entries, options_map):
    """Insert transaction entries for to fulfill a subsequent balance check.

    Synthesize and insert Transaction entries right after Pad entries in order
    to fulfill checks in the padded accounts. Returns a new list of entries.
    Note that this doesn't pad across parent-child relationships, it is a very
    simple kind of pad. (I have found this to be sufficient in practice, and
    simpler to implement and understand.)

    Furthermore, this pads for a single currency only, that is, balance checks
    are specified only for one currency at a time, and pads will only be
    inserted for those currencies.

    Args:
      entries: A list of directives.
      options_map: A parser options dict.
    Returns:
      A new list of directives, with Pad entries inserted, and a list of new
      errors produced.
    """
    pad_errors = []

    # Find all the pad entries and group them by account.
    pads = list(misc_utils.filter_type(entries, data.Pad))
    pad_dict = misc_utils.groupby(lambda x: x.account, pads)

    # Partially realize the postings, so we can iterate them by account.
    by_account = realization.postings_by_account(entries)

    # A dict of pad -> list of entries to be inserted.
    new_entries = {id(pad): [] for pad in pads}

    # Process each account that has a padding group.
    for account_, pad_list in sorted(pad_dict.items()):

        # Last encountered / currency active pad entry.
        active_pad = None

        # Gather all the postings for the account and its children.
        postings = []
        is_child = account.parent_matcher(account_)
        for item_account, item_postings in by_account.items():
            if is_child(item_account):
                postings.extend(item_postings)
        postings.sort(key=data.posting_sortkey)

        # A set of currencies already padded so far in this account.
        padded_lots = set()

        pad_balance = inventory.Inventory()
        for entry in postings:

            assert not isinstance(entry, data.Posting)
            if isinstance(entry, data.TxnPosting):
                # This is a transaction; update the running balance for this
                # account.
                pad_balance.add_position(entry.posting)

            elif isinstance(entry, data.Pad):
                if entry.account == account_:
                    # Mark this newly encountered pad as active and allow all lots
                    # to be padded heretofore.
                    active_pad = entry
                    padded_lots = set()

            elif isinstance(entry, data.Balance):
                check_amount = entry.amount

                # Compare the current balance amount to the expected one from
                # the check entry. IMPORTANT: You need to understand that this
                # does not check a single position, but rather checks that the
                # total amount for a particular currency (which itself is
                # distinct from the cost).
                balance_amount = pad_balance.get_currency_units(check_amount.currency)
                diff_amount = amount.sub(balance_amount, check_amount)

                # Use the specified tolerance or automatically infer it.
                tolerance = balance.get_balance_tolerance(entry, options_map)

                if abs(diff_amount.number) > tolerance:
                    # The check fails; we need to pad.

                    # Pad only if pad entry is active and we haven't already
                    # padded that lot since it was last encountered.
                    if active_pad and (check_amount.currency not in padded_lots):

                        # Note: we decide that it's an error to try to pad
                        # positions at cost; we check here that all the existing
                        # positions with that currency have no cost.
                        positions = [pos
                                     for pos in pad_balance.get_positions()
                                     if pos.units.currency == check_amount.currency]
                        for position_ in positions:
                            if position_.cost is not None:
                                pad_errors.append(
                                    PadError(entry.meta,
                                             ("Attempt to pad an entry with cost for "
                                              "balance: {}".format(pad_balance)),
                                             active_pad))

                        # Thus our padding lot is without cost by default.
                        diff_position = position.Position.from_amounts(
                            amount.Amount(check_amount.number - balance_amount.number,
                                          check_amount.currency))

                        # Synthesize a new transaction entry for the difference.
                        narration = ('(Padding inserted for Balance of {} for '
                                     'difference {})').format(check_amount, diff_position)
                        new_entry = data.Transaction(
                            active_pad.meta.copy(), active_pad.date, flags.FLAG_PADDING,
                            None, narration, data.EMPTY_SET, data.EMPTY_SET, [])

                        new_entry.postings.append(
                            data.Posting(active_pad.account,
                                         diff_position.units, diff_position.cost,
                                         None, None, {}))
                        neg_diff_position = -diff_position
                        new_entry.postings.append(
                            data.Posting(active_pad.source_account,
                                         neg_diff_position.units, neg_diff_position.cost,
                                         None, None, {}))

                        # Save it for later insertion after the active pad.
                        new_entries[id(active_pad)].append(new_entry)

                        # Fixup the running balance.
                        pos, _ = pad_balance.add_position(diff_position)
                        if pos is not None and pos.is_negative_at_cost():
                            raise ValueError(
                                "Position held at cost goes negative: {}".format(pos))

                # Mark this lot as padded. Further checks should not pad this lot.
                padded_lots.add(check_amount.currency)

    # Insert the newly created entries right after the pad entries that created them.
    padded_entries = []
    for entry in entries:
        padded_entries.append(entry)
        if isinstance(entry, data.Pad):
            entry_list = new_entries[id(entry)]
            if entry_list:
                padded_entries.extend(entry_list)
            else:
                # Generate errors on unused pad entries.
                pad_errors.append(
                    PadError(entry.meta, "Unused Pad entry", entry))

    return padded_entries, pad_errors

beancount.ops.summarize

条目摘要。

此代码用于将一系列条目(例如在某个时间段内)汇总为几个"期初余额"条目。在计算特定时间段的资产负债表时:我们不希望看到某个时间段之前的条目,因此将它们合并为每个账户一个交易,该交易包含该账户的总金额。

beancount.ops.summarize.balance_by_account(entries, date=None, compress_unbooked=False)

对所有严格早于 'date' 的条目按账户汇总余额。

参数:
  • entries – 一组指令列表。

  • date – 可选的 datetime.date 实例。如果提供,则在该日期及之后停止累加。这在对特定日期之前的条目进行摘要时非常有用。

  • compress_unbooked – 对于使用 NONE 记账方式的账户,将其头寸压缩为一个平均头寸。当导出完整头寸列表时,此选项很有用,因为这些账户通常会因费用以负成本产生大量微小头寸。

返回:
  • 一个包含两个元素的元组:第一个是账户字符串到 Inventory 实例的字典(表示在给定日期之前的账户余额),第二个是列表中遇到该日期时的条目索引。如果所有条目都位于截止日期之前,则返回比最后一个条目索引大一的值。

源代码位于 beancount/ops/summarize.py
def balance_by_account(entries, date=None, compress_unbooked=False):
    """Sum up the balance per account for all entries strictly before 'date'.

    Args:
      entries: A list of directives.
      date: An optional datetime.date instance. If provided, stop accumulating
        on and after this date. This is useful for summarization before a
        specific date.
      compress_unbooked: For accounts that have a booking method of NONE,
        compress their positions into a single average position. This can be
        used when you export the full list of positions, because those accounts
        will have a myriad of small positions from fees at negative cost and
        what-not.
    Returns:
      A pair of a dict of account string to instance Inventory (the balance of
      this account before the given date), and the index in the list of entries
      where the date was encountered. If all entries are located before the
      cutoff date, an index one beyond the last entry is returned.

    """
    balances = collections.defaultdict(inventory.Inventory)
    for index, entry in enumerate(entries):
        if date and entry.date >= date:
            break

        if isinstance(entry, Transaction):
            for posting in entry.postings:
                account_balance = balances[posting.account]

                # Note: We must allow negative lots at cost, because this may be
                # used to reduce a filtered list of entries which may not
                # include the entries necessary to keep units at cost always
                # above zero. The only summation that is guaranteed to be above
                # zero is if all the entries are being summed together, no
                # entries are filtered, at least for a particular account's
                # postings.
                account_balance.add_position(posting)
    else:
        index = len(entries)

    # If the account has "NONE" booking method, merge all its postings
    # together in order to obtain an accurate cost basis and balance of
    # units.
    #
    # (This is a complex issue.) If you accrued positions without having them
    # booked properly against existing cost bases, you have not properly accounted
    # for the profit/loss to other postings. This means that the resulting
    # profit/loss is merged in the cost basis of the positive and negative
    # postings.
    if compress_unbooked:
        oc_map = getters.get_account_open_close(entries)
        accounts_map = {account: dopen for account, (dopen, _) in oc_map.items()}

        for account, balance in balances.items():
            dopen = accounts_map.get(account, None)
            if dopen is not None and dopen.booking is data.Booking.NONE:
                average_balance = balance.average()
                balances[account] = inventory.Inventory(pos for pos in average_balance)

    return balances, index

beancount.ops.summarize.cap(entries, account_types, conversion_currency, account_earnings, account_conversions)

将净收益转入权益,并插入最终的汇兑条目。

此操作用于将收入和费用账户的余额转移至权益账户,以生成一个余额精确为零的资产负债表。

参数:
  • entries – 一组指令列表。

  • account_types – AccountTypes 实例。

  • conversion_currency – 字符串,用于汇兑条目中零价格的转账货币。

  • account_earnings – 字符串,用于接收收入和费用账户最终余额的权益账户名称。

  • account_conversions – 字符串,用于货币汇兑的权益账户名称。

返回:
  • 修改后的条目列表,其中收入和费用账户的余额已被转移。

源代码位于 beancount/ops/summarize.py
def cap(entries,
        account_types,
        conversion_currency,
        account_earnings,
        account_conversions):
    """Transfer net income to equity and insert a final conversion entry.

    This is used to move and nullify balances from the income and expense
    accounts to an equity account in order to draw up a balance sheet with a
    balance of precisely zero.

    Args:
      entries: A list of directives.
      account_types: An instance of AccountTypes.
      conversion_currency: A string, the transfer currency to use for zero prices
        on the conversion entry.
      account_earnings: A string, the name of the equity account to transfer
        final balances of the income and expense accounts to.
      account_conversions: A string, the name of the equity account to use as
        the source for currency conversions.
    Returns:
      A modified list of entries, with the income and expense accounts
      transferred.
    """

    # Transfer the balances of income and expense accounts as earnings / net
    # income.
    income_statement_account_pred = (
        lambda account: is_income_statement_account(account, account_types))
    entries = transfer_balances(entries, None,
                                income_statement_account_pred,
                                account_earnings)

    # Insert final conversion entries.
    entries = conversions(entries, account_conversions, conversion_currency, None)

    return entries

beancount.ops.summarize.cap_opt(entries, options_map)

通过选项映射获取所有参数来关闭账户。

详情请参见 cap()。

参数:
  • entries – 参见 cap()。

  • options_map – 解析器的选项映射。

返回:
  • 与 close() 相同。

源代码位于 beancount/ops/summarize.py
def cap_opt(entries, options_map):
    """Close by getting all the parameters from an options map.

    See cap() for details.

    Args:
      entries: See cap().
      options_map: A parser's option_map.
    Returns:
      Same as close().
    """
    account_types = options.get_account_types(options_map)
    current_accounts = options.get_current_accounts(options_map)
    conversion_currency = options_map['conversion_currency']
    return cap(entries,
               account_types,
               conversion_currency,
               *current_accounts)

beancount.ops.summarize.clamp(entries, begin_date, end_date, account_types, conversion_currency, account_earnings, account_opening, account_conversions)

筛选条目,仅保留指定时间段内的条目。

首先,此方法会将给定期间开始日期之前的所有收入和费用账户余额转移至 'account_earnings' 账户(期间之前的收益,或称"留存收益"),并将该日期之前的所有交易汇总至 'account_opening' 账户(通常为"期初余额")。经过此操作后,收入和费用账户应不再包含任何交易(因为其余额已被转移,而零余额的汇总不会新增任何交易)。

其次,所有在期间结束日期之后的条目将被截断,并为在期间开始和结束之间发生的交易添加一个转换条目,以反映这些变化。所有账户的最终余额应为空。

参数:
  • entries – 一组指令元组的列表。

  • begin_date – 一个 datetime.date 实例,表示期间的开始日期。

  • end_date – 一个 datetime.date 实例,表示期间结束日期的后一天。

  • account_types – AccountTypes 实例。

  • conversion_currency – 字符串,用于汇兑条目中零价格的转账货币。

  • account_earnings – 一个字符串,指定将损益表账户中之前收益转入资产负债表的账户名称。

  • account_opening – 一个字符串,指定用于转移前期余额以初始化期间开始时账户余额的权益账户名称。该账户通常称为期初余额账户。

  • account_conversions – 一个字符串,指定用于记录货币转换的权益账户名称。

返回:
  • 返回一个新的条目列表,以及指向期间开始日期之后第一个原始交易的索引。该索引可用于生成期初余额报表,该报表仅使用汇总后的条目作为资产负债表数据。

源代码位于 beancount/ops/summarize.py
def clamp(entries,
          begin_date, end_date,
          account_types,
          conversion_currency,
          account_earnings,
          account_opening,
          account_conversions):
    """Filter entries to include only those during a specified time period.

    Firstly, this method will transfer all balances for the income and expense
    accounts occurring before the given period begin date to the
    'account_earnings' account (earnings before the period, or "retained
    earnings") and summarize all of the transactions before that date against
    the 'account_opening' account (usually "opening balances"). The resulting
    income and expense accounts should have no transactions (since their
    balances have been transferred out and summarization of zero balances should
    not add any transactions).

    Secondly, all the entries after the period end date will be truncated and a
    conversion entry will be added for the resulting transactions that reflect
    changes occurring between the beginning and end of the exercise period. The
    resulting balance of all account should be empty.

    Args:
      entries: A list of directive tuples.
      begin_date: A datetime.date instance, the beginning of the period.
      end_date: A datetime.date instance, one day beyond the end of the period.
      account_types: An instance of AccountTypes.
      conversion_currency: A string, the transfer currency to use for zero prices
        on the conversion entry.
      account_earnings: A string, the name of the account to transfer
        previous earnings from the income statement accounts to the balance
        sheet.
      account_opening: A string, the name of the account in equity
        to transfer previous balances from, in order to initialize account
        balances at the beginning of the period. This is typically called an
        opening balances account.
      account_conversions: A string, the name of the equity account to
        book currency conversions against.
    Returns:
      A new list of entries is returned, and the index that points to the first
      original transaction after the beginning date of the period. This index
      can be used to generate the opening balances report, which is a balance
      sheet fed with only the summarized entries.
    """
    # Transfer income and expenses before the period to equity.
    income_statement_account_pred = (
        lambda account: is_income_statement_account(account, account_types))
    entries = transfer_balances(entries, begin_date,
                                income_statement_account_pred, account_earnings)

    # Summarize all the previous balances, after transferring the income and
    # expense balances, so all entries for those accounts before the begin date
    # should now disappear.
    entries, index = summarize(entries, begin_date, account_opening)

    # Truncate the entries after this.
    entries = truncate(entries, end_date)

    # Insert conversion entries.
    entries = conversions(entries, account_conversions, conversion_currency, end_date)

    return entries, index

beancount.ops.summarize.clamp_opt(entries, begin_date, end_date, options_map)

通过从选项映射中获取所有参数来截断数据。

详情请参见 clamp()。

参数:
  • entries – 请参见 clamp()。

  • begin_date – 请参见 clamp()。

  • end_date – 请参见 clamp()。

  • options_map – 解析器的选项映射。

返回:
  • 与 clamp() 相同。

源代码位于 beancount/ops/summarize.py
def clamp_opt(entries, begin_date, end_date, options_map):
    """Clamp by getting all the parameters from an options map.

    See clamp() for details.

    Args:
      entries: See clamp().
      begin_date: See clamp().
      end_date: See clamp().
      options_map: A parser's option_map.
    Returns:
      Same as clamp().
    """
    account_types = options.get_account_types(options_map)
    previous_earnings, previous_balances, _ = options.get_previous_accounts(options_map)
    _, current_conversions = options.get_current_accounts(options_map)

    conversion_currency = options_map['conversion_currency']
    return clamp(entries, begin_date, end_date,
                 account_types,
                 conversion_currency,
                 previous_earnings,
                 previous_balances,
                 current_conversions)

beancount.ops.summarize.clear(entries, date, account_types, account_earnings)

将指定日期的收入和费用余额转入权益账户。

此方法插入条目以清零收入和费用账户的余额,并将这些余额转移至一个权益账户。

参数:
  • entries – 一组指令元组的列表。

  • date – 一个 datetime.date 实例,表示期间结束日期的后一天。此日期可选设为 None,以便在条目列表末尾关闭。

  • account_types – AccountTypes 实例。

  • account_earnings – 一个字符串,指定将损益表账户中之前收益转入资产负债表的账户名称。

返回:
  • 返回一个新的条目列表,以及指向转移操作之前最后一个原始交易的前一个索引。

源代码位于 beancount/ops/summarize.py
def clear(entries,
          date,
          account_types,
          account_earnings):
    """Transfer income and expenses balances at the given date to the equity accounts.

    This method insert entries to zero out balances on income and expenses
    accounts by transferring them to an equity account.

    Args:
      entries: A list of directive tuples.
      date: A datetime.date instance, one day beyond the end of the period. This
        date can be optionally left to None in order to close at the end of the
        list of entries.
      account_types: An instance of AccountTypes.
      account_earnings: A string, the name of the account to transfer
        previous earnings from the income statement accounts to the balance
        sheet.
    Returns:
      A new list of entries is returned, and the index that points to one before
      the last original transaction before the transfers.
    """
    index = len(entries)

    # Transfer income and expenses before the period to equity.
    income_statement_account_pred = (
        lambda account: is_income_statement_account(account, account_types))
    new_entries = transfer_balances(entries, date,
                                    income_statement_account_pred, account_earnings)

    return new_entries, index

beancount.ops.summarize.clear_opt(entries, date, options_map)

使用选项映射简化调用 clear() 的便捷函数。

源代码位于 beancount/ops/summarize.py
def clear_opt(entries, date, options_map):
    """Convenience function to clear() using an options map.
    """
    account_types = options.get_account_types(options_map)
    current_accounts = options.get_current_accounts(options_map)
    return clear(entries, date, account_types, current_accounts[0])

beancount.ops.summarize.close(entries, date, conversion_currency, account_conversions)

截断指定日期之后的条目并确保余额为零。

此方法本质上会删除指定日期之后的所有条目,即截断未来。为此,它将

  1. 移除所有在 'date' 之后发生的条目(如果提供了该日期)。

  2. 在条目列表末尾插入转换交易,以确保所有分录的总余额为零。

结果是一个总余额为零的条目列表,但收入/费用账户的余额可能非零。要生成最终的资产负债表,请使用 transfer() 将净收益转入权益账户。

参数:
  • entries – 一组指令元组的列表。

  • date – 一个 datetime.date 实例,表示期间结束日期的后一天。此日期可选设为 None,以便在条目列表末尾关闭。

  • conversion_currency – 字符串,用于汇兑条目中零价格的转账货币。

  • account_conversions – 一个字符串,指定用于记录货币转换的权益账户名称。

返回:
  • 返回一个新的条目列表,以及指向所提供最后一个原始交易之后一个位置的索引。可能已插入额外条目以标准化转换并确保总余额为零。

源代码位于 beancount/ops/summarize.py
def close(entries,
          date,
          conversion_currency,
          account_conversions):
    """Truncate entries that occur after a particular date and ensure balance.

    This method essentially removes entries after a date. It truncates the
    future. To do so, it will

    1. Remove all entries which occur after 'date', if given.

    2. Insert conversion transactions at the end of the list of entries to
       ensure that the total balance of all postings sums up to empty.

    The result is a list of entries with a total balance of zero, with possibly
    non-zero balances for the income/expense accounts. To produce a final
    balance sheet, use transfer() to move the net income to the equity accounts.

    Args:
      entries: A list of directive tuples.
      date: A datetime.date instance, one day beyond the end of the period. This
        date can be optionally left to None in order to close at the end of the
        list of entries.
      conversion_currency: A string, the transfer currency to use for zero prices
        on the conversion entry.
      account_conversions: A string, the name of the equity account to
        book currency conversions against.
    Returns:
      A new list of entries is returned, and the index that points to one beyond
      the last original transaction that was provided. Further entries may have
      been inserted to normalize conversions and ensure the total balance sums
      to zero.
    """

    # Truncate the entries after the date, if a date has been provided.
    if date is not None:
        entries = truncate(entries, date)

    # Keep an index to the truncated list of entries (before conversions).
    index = len(entries)

    # Insert a conversions entry to ensure the total balance of all accounts is
    # flush zero.
    entries = conversions(entries, account_conversions, conversion_currency, date)

    return entries, index

beancount.ops.summarize.close_opt(entries, date, options_map)

用于使用选项映射关闭的便捷函数。

源代码位于 beancount/ops/summarize.py
def close_opt(entries, date, options_map):
    """Convenience function to close() using an options map.
    """
    conversion_currency = options_map['conversion_currency']
    current_accounts = options.get_current_accounts(options_map)
    return close(entries, date, conversion_currency, current_accounts[1])

beancount.ops.summarize.conversions(entries, conversion_account, conversion_currency, date=None)

在指定账户的日期 'date' 处插入一条兑换条目。

参数:
  • entries – 条目列表。

  • conversion_account – 字符串,用于记账的账户。

  • conversion_currency – 字符串,用于汇兑条目中零价格的转账货币。

  • date – 在此日期之前插入兑换条目的时间点。新条目将被插入为该日期前一个日期的最后一条条目。

返回:
  • 修改后的条目列表。

源代码位于 beancount/ops/summarize.py
def conversions(entries, conversion_account, conversion_currency, date=None):
    """Insert a conversion entry at date 'date' at the given account.

    Args:
      entries: A list of entries.
      conversion_account: A string, the account to book against.
      conversion_currency: A string, the transfer currency to use for zero prices
        on the conversion entry.
      date: The date before which to insert the conversion entry. The new
        entry will be inserted as the last entry of the date just previous
        to this date.
    Returns:
      A modified list of entries.
    """
    # Compute the balance at the given date.
    conversion_balance = interpolate.compute_entries_balance(entries, date=date)

    # Early exit if there is nothing to do.
    conversion_cost_balance = conversion_balance.reduce(convert.get_cost)
    if conversion_cost_balance.is_empty():
        return entries

    # Calculate the index and the date for the new entry. We want to store it as
    # the last transaction of the day before.
    if date is not None:
        index = bisect_key.bisect_left_with_key(entries, date, key=lambda entry: entry.date)
        last_date = date - datetime.timedelta(days=1)
    else:
        index = len(entries)
        last_date = entries[-1].date

    meta = data.new_metadata('<conversions>', -1)
    narration = 'Conversion for {}'.format(conversion_balance)
    conversion_entry = Transaction(meta, last_date, flags.FLAG_CONVERSIONS,
                                   None, narration, data.EMPTY_SET, data.EMPTY_SET, [])
    for position in conversion_cost_balance.get_positions():
        # Important note: Set the cost to zero here to maintain the balance
        # invariant. (This is the only single place we cheat on the balance rule
        # in the entire system and this is necessary; see documentation on
        # Conversions.)
        price = amount.Amount(ZERO, conversion_currency)
        neg_pos = -position
        conversion_entry.postings.append(
            data.Posting(conversion_account, neg_pos.units, neg_pos.cost,
                         price, None, None))

    # Make a copy of the list of entries and insert the new transaction into it.
    new_entries = list(entries)
    new_entries.insert(index, conversion_entry)

    return new_entries

beancount.ops.summarize.create_entries_from_balances(balances, date, source_account, direction, meta, flag, narration_template)

从余额字典创建一组条目。

此方法创建一组新条目,将 'balances' 字典中的金额转移到/从 'source_account' 指定的账户中转移。

平衡的记账项将以成本等价形式创建。换句话说,如果您尝试平衡 10 HOOL {500 USD},则会在一端生成包含此头寸的记账项,而在 'source_account' 端生成 5000 USD 的记账项。

参数:
  • balances – 账户名称字符串到 Inventory 实例的字典。

  • date – datetime.date 对象,用于创建交易的日期。

  • source_account – 字符串,从中提取余额的账户名称。这是魔术师的帽子,从中取出兔子。

  • direction – 如果 'direction' 为 True,则新条目将金额从源账户转移到余额账户;否则,新条目将金额从余额账户转移到源账户。

  • meta – 用作交易元数据的字典。

  • flag – 字符串,用于交易的标记。

  • narration_template – 用于生成说明的格式字符串。它将使用 'account' 和 'date' 作为替换变量进行格式化。

返回:
  • 新合成的 Transaction 条目列表。

源代码位于 beancount/ops/summarize.py
def create_entries_from_balances(balances, date, source_account, direction,
                                 meta, flag, narration_template):
    """"Create a list of entries from a dict of balances.

    This method creates a list of new entries to transfer the amounts in the
    'balances' dict to/from another account specified in 'source_account'.

    The balancing posting is created with the equivalent at cost. In other
    words, if you attempt to balance 10 HOOL {500 USD}, this will synthesize a
    posting with this position on one leg, and with 5000 USD on the
    'source_account' leg.

    Args:
      balances: A dict of account name strings to Inventory instances.
      date: A datetime.date object, the date at which to create the transaction.
      source_account: A string, the name of the account to pull the balances
        from. This is the magician's hat to pull the rabbit from.
      direction: If 'direction' is True, the new entries transfer TO the
        balances account from the source account; otherwise the new entries
        transfer FROM the balances into the source account.
      meta: A dict to use as metadata for the transactions.
      flag: A string, the flag to use for the transactions.
      narration_template: A format string for creating the narration. It is
        formatted with 'account' and 'date' replacement variables.
    Returns:
      A list of newly synthesizes Transaction entries.
    """
    new_entries = []
    for account, account_balance in sorted(balances.items()):

        # Don't create new entries where there is no balance.
        if account_balance.is_empty():
            continue

        narration = narration_template.format(account=account, date=date)

        if not direction:
            account_balance = -account_balance

        postings = []
        new_entry = Transaction(
            meta, date, flag, None, narration, data.EMPTY_SET, data.EMPTY_SET, postings)

        for position in account_balance.get_positions():
            postings.append(data.Posting(account, position.units, position.cost,
                                         None, None, None))
            cost = -convert.get_cost(position)
            postings.append(data.Posting(source_account, cost, None,
                                         None, None, None))

        new_entries.append(new_entry)

    return new_entries

beancount.ops.summarize.get_open_entries(entries, date)

收集在指定日期仍有效的 Open 条目列表。

返回在给定日期尚未关闭的 Open 条目列表,顺序与在文档中观察到的顺序一致。

参数:
  • entries – 一组指令列表。

  • date – 查找开放条目的日期。如未指定,则返回在最新日期仍保持开放的条目。

返回:
  • Open 指令列表。

源代码位于 beancount/ops/summarize.py
def get_open_entries(entries, date):
    """Gather the list of active Open entries at date.

    This returns the list of Open entries that have not been closed at the given
    date, in the same order they were observed in the document.

    Args:
      entries: A list of directives.
      date: The date at which to look for an open entry. If not specified, will
        return the entries still open at the latest date.
    Returns:
      A list of Open directives.
    """
    open_entries = {}
    for index, entry in enumerate(entries):
        if date is not None and entry.date >= date:
            break

        if isinstance(entry, Open):
            try:
                ex_index, ex_entry = open_entries[entry.account]
                if entry.date < ex_entry.date:
                    open_entries[entry.account] = (index, entry)
            except KeyError:
                open_entries[entry.account] = (index, entry)

        elif isinstance(entry, Close):
            # If there is no corresponding open, don't raise an error.
            open_entries.pop(entry.account, None)

    return [entry for (index, entry) in sorted(open_entries.values())]

beancount.ops.summarize.open(entries, date, account_types, conversion_currency, account_earnings, account_opening, account_conversions)

在指定日期前汇总条目,并将收入/支出转入权益账户。

此方法本质上准备一个仅包含特定日期之后交易的指令列表。它截断过去的数据。为此,它将

  1. 在指定的开放日期插入兑换交易,然后

  2. 在该日期插入交易,将此前累积的余额从收入和支出账户转移到权益账户,最后

  3. 删除所有早于该日期的交易,并用开仓余额条目替换,以使余额保持相同金额。

结果是一个条目列表,其中收入和支出账户的期初余额为零,其他所有账户的期初都以一笔交易将其余额调整至预期金额。在此之前的所有记录均已汇总。

返回一个索引,指向余额期初交易之后的第一笔交易,因此你可以仅保留这些交易,以便仅渲染期初余额的资产负债表。

参数:
  • entries – 一组指令元组的列表。

  • date – 一个 datetime.date 实例,表示执行此操作的日期。

  • account_types – AccountTypes 实例。

  • conversion_currency – 字符串,用于汇兑条目中零价格的转账货币。

  • account_earnings – 一个字符串,指定将损益表账户中之前收益转入资产负债表的账户名称。

  • account_opening – 一个字符串,指定用于转移前期余额以初始化期间开始时账户余额的权益账户名称。该账户通常称为期初余额账户。

  • account_conversions – 一个字符串,指定用于记录货币转换的权益账户名称。

返回:
  • 返回一个新的条目列表,以及指向期间开始日期之后第一个原始交易的索引。该索引可用于生成期初余额报表,该报表仅使用汇总后的条目作为资产负债表数据。

源代码位于 beancount/ops/summarize.py
def open(entries,
         date,
         account_types,
         conversion_currency,
         account_earnings,
         account_opening,
         account_conversions):
    """Summarize entries before a date and transfer income/expenses to equity.

    This method essentially prepares a list of directives to contain only
    transactions that occur after a particular date. It truncates the past. To
    do so, it will

    1. Insert conversion transactions at the given open date, then

    2. Insert transactions at that date to move accumulated balances from before
       that date from the income and expenses accounts to an equity account, and
       finally

    3. It removes all the transactions previous to the date and replaces them by
       opening balances entries to bring the balances to the same amount.

    The result is a list of entries for which the income and expense accounts
    are beginning with a balance of zero, and all other accounts begin with a
    transaction that brings their balance to the expected amount. All the past
    has been summarized at that point.

    An index is returned to the first transaction past the balance opening
    transactions, so you can keep just those in order to render a balance sheet
    for only the opening balances.

    Args:
      entries: A list of directive tuples.
      date: A datetime.date instance, the date at which to do this.
      account_types: An instance of AccountTypes.
      conversion_currency: A string, the transfer currency to use for zero prices
        on the conversion entry.
      account_earnings: A string, the name of the account to transfer
        previous earnings from the income statement accounts to the balance
        sheet.
      account_opening: A string, the name of the account in equity
        to transfer previous balances from, in order to initialize account
        balances at the beginning of the period. This is typically called an
        opening balances account.
      account_conversions: A string, the name of the equity account to
        book currency conversions against.
    Returns:
      A new list of entries is returned, and the index that points to the first
      original transaction after the beginning date of the period. This index
      can be used to generate the opening balances report, which is a balance
      sheet fed with only the summarized entries.

    """
    # Insert conversion entries.
    entries = conversions(entries, account_conversions, conversion_currency, date)

    # Transfer income and expenses before the period to equity.
    entries, _ = clear(entries, date, account_types, account_earnings)

    # Summarize all the previous balances, after transferring the income and
    # expense balances, so all entries for those accounts before the begin date
    # should now disappear.
    entries, index = summarize(entries, date, account_opening)

    return entries, index

beancount.ops.summarize.open_opt(entries, date, options_map)

使用选项映射的 open() 函数的便捷封装。

源代码位于 beancount/ops/summarize.py
def open_opt(entries, date, options_map):
    """Convenience function to open() using an options map.
    """
    account_types = options.get_account_types(options_map)
    previous_accounts = options.get_previous_accounts(options_map)
    conversion_currency = options_map['conversion_currency']
    return open(entries, date, account_types, conversion_currency, *previous_accounts)

beancount.ops.summarize.summarize(entries, date, account_opening)

通过用汇总条目替换指定日期之前的全部条目来汇总数据。

该函数将截至(但不包括)指定日期的所有交易替换为每个账户的期初余额交易。它返回一组新的条目,所有在指定日期之前的交易均被替换为每个账户对应的少量汇总条目。

注意:- 活跃账户的开户条目将被保留。- 每个(基础货币,报价货币)对的最后一个相关价格条目将被保留。- 截止日期之前的所有其他条目将被移除。

参数:
  • entries – 一组指令列表。

  • date – 一个 datetime.date 实例,表示在此日期之前进行汇总的截止日期。

  • account_opening – 一个字符串,表示用于记录汇总条目的源账户名称。

返回:
  • 该函数返回一个新的条目列表,以及一个整数索引,该索引表示从截止日期或之后的条目开始的位置。

源代码位于 beancount/ops/summarize.py
def summarize(entries, date, account_opening):
    """Summarize all entries before a date by replacing then with summarization entries.

    This function replaces the transactions up to (and not including) the given
    date with a opening balance transactions, one for each account. It returns
    new entries, all of the transactions before the given date having been
    replaced by a few summarization entries, one for each account.

    Notes:
    - Open entries are preserved for active accounts.
    - The last relevant price entry for each (base, quote) pair is preserved.
    - All other entries before the cutoff date are culled.

    Args:
      entries: A list of directives.
      date: A datetime.date instance, the cutoff date before which to summarize.
      account_opening: A string, the name of the source account to book summarization
        entries against.
    Returns:
      The function returns a list of new entries and the integer index at which
      the entries on or after the cutoff date begin.
    """
    # Compute balances at date.
    balances, index = balance_by_account(entries, date)

    # We need to insert the entries with a date previous to subsequent checks,
    # to maintain ensure the open directives show up before any transaction.
    summarize_date = date - datetime.timedelta(days=1)

    # Create summarization / opening balance entries.
    summarizing_entries = create_entries_from_balances(
        balances, summarize_date, account_opening, True,
        data.new_metadata('<summarize>', 0), flags.FLAG_SUMMARIZE,
        "Opening balance for '{account}' (Summarization)")

    # Insert the last price entry for each commodity from before the date.
    price_entries = prices.get_last_price_entries(entries, date)

    # Gather the list of active open entries at date.
    open_entries = get_open_entries(entries, date)

    # Compute entries before the date and preserve the entries after the date.
    before_entries = sorted(open_entries + price_entries + summarizing_entries,
                            key=data.entry_sortkey)
    after_entries = entries[index:]

    # Return a new list of entries and the index that points after the entries
    # were inserted.
    return (before_entries + after_entries), len(before_entries)

beancount.ops.summarize.transfer_balances(entries, date, account_pred, transfer_account)

合成交易,以在指定日期将某些账户的余额转移。

对于所有匹配 'account_pred' 谓词的账户,创建新条目,将该日期的账户余额转移至指定的转账账户。此功能常用于将上一期间的收入和支出余额转移至“留存收益”账户。此操作通过创建新条目实现。

请注意,插入转账会破坏这些账户后续的余额校验。因此,输出列表中这些账户在截止日期之后的所有余额断言条目都将被移除。

参数:
  • entries – 一组指令列表。

  • date – 一个 datetime.date 实例,表示执行转账的日期。

  • account_pred – 一个谓词函数,接收账户字符串作为参数,若该账户需被转移则返回 True。

  • transfer_account – 一个字符串,表示用于接收指定日期余额的转账账户名称。

返回:
  • 一个包含新增转账条目的新条目列表。

源代码位于 beancount/ops/summarize.py
def transfer_balances(entries, date, account_pred, transfer_account):
    """Synthesize transactions to transfer balances from some accounts at a given date.

    For all accounts that match the 'account_pred' predicate, create new entries
    to transfer the balance at the given date from the account to the transfer
    account. This is used to transfer balances from income and expenses from a
    previous period to a "retained earnings" account. This is accomplished by
    creating new entries.

    Note that inserting transfers breaks any following balance checks that are
    in the transferred accounts. For this reason, all balance assertion entries
    following the cutoff date for those accounts are removed from the list in
    output.

    Args:
      entries: A list of directives.
      date: A datetime.date instance, the date at which to make the transfer.
      account_pred: A predicate function that, given an account string, returns
        true if the account is meant to be transferred.
      transfer_account: A string, the name of the source account to be used on
        the transfer entries to receive balances at the given date.
    Returns:
      A new list of entries, with the new transfer entries added in.
    """
    # Don't bother doing anything if there are no entries.
    if not entries:
        return entries

    # Compute balances at date.
    balances, index = balance_by_account(entries, date)

    # Filter out to keep only the accounts we want.
    transfer_balances = {account: balance
                         for account, balance in balances.items()
                         if account_pred(account)}

    # We need to insert the entries at the end of the previous day.
    if date:
        transfer_date = date - datetime.timedelta(days=1)
    else:
        transfer_date = entries[-1].date

    # Create transfer entries.
    transfer_entries = create_entries_from_balances(
        transfer_balances, transfer_date, transfer_account, False,
        data.new_metadata('<transfer_balances>', 0), flags.FLAG_TRANSFER,
        "Transfer balance for '{account}' (Transfer balance)")

    # Remove balance assertions that occur after a transfer on an account that
    # has been transferred away; they would break.
    after_entries = [entry
                     for entry in entries[index:]
                     if not (isinstance(entry, data.Balance) and
                             entry.account in transfer_balances)]

    # Split the new entries in a new list.
    return (entries[:index] + transfer_entries + after_entries)

beancount.ops.summarize.truncate(entries, date)

过滤掉指定日期及之后的所有条目。返回一个新的条目列表。

参数:
  • entries – 一个已排序的指令列表。

  • date – 一个 datetime.date 实例。

返回:
  • 一个被截断的指令列表。

源代码位于 beancount/ops/summarize.py
def truncate(entries, date):
    """Filter out all the entries at and after date. Returns a new list of entries.

    Args:
      entries: A sorted list of directives.
      date: A datetime.date instance.
    Returns:
      A truncated list of directives.
    """
    index = bisect_key.bisect_left_with_key(entries, date,
                                            key=lambda entry: entry.date)
    return entries[:index]

beancount.ops.validation

验证检查。

这些检查应在所有插件完成对条目列表的转换后、在提供或生成报告前运行。其目的是确保一组合理的不变量,并在这些不变量被违反时生成错误。它们并非简单的健全性检查——用户数据可能受约束限制,这些约束应在此处被检测并以错误形式反馈给用户。

beancount.ops.validation.ValidationError (元组)

ValidationError(source, message, entry)

beancount.ops.validation.ValidationError.__getnewargs__(self) 特殊

将自身返回为一个普通元组。供 copy 和 pickle 使用。

源代码位于 beancount/ops/validation.py
def __getnewargs__(self):
    'Return self as a plain tuple.  Used by copy and pickle.'
    return _tuple(self)

beancount.ops.validation.ValidationError.__new__(_cls, source, message, entry) 特殊 静态方法

创建 ValidationError(source, message, entry) 的新实例

beancount.ops.validation.ValidationError.__replace__(/, self, **kwds) 特殊

返回一个新的 ValidationError 对象,用指定的新值替换字段

源代码位于 beancount/ops/validation.py
def _replace(self, /, **kwds):
    result = self._make(_map(kwds.pop, field_names, self))
    if kwds:
        raise TypeError(f'Got unexpected field names: {list(kwds)!r}')
    return result

beancount.ops.validation.ValidationError.__repr__(self) 特殊

返回一个格式良好的表示字符串

源代码位于 beancount/ops/validation.py
def __repr__(self):
    'Return a nicely formatted representation string'
    return self.__class__.__name__ + repr_fmt % self

beancount.ops.validation.validate(entries, options_map, log_timings=None, extra_validations=None)

对解析后的内容执行所有标准检查。

参数:
  • entries – 一组指令列表。

  • unused_options_map – 一个选项映射。

  • log_timings – 一个可选函数,用于记录各个操作的耗时。

  • extra_validations – 在加载条目列表后要运行的额外验证函数列表。

返回:
  • 如果发现任何错误,则返回新的错误列表。

源代码位于 beancount/ops/validation.py
def validate(entries, options_map, log_timings=None, extra_validations=None):
    """Perform all the standard checks on parsed contents.

    Args:
      entries: A list of directives.
      unused_options_map: An options map.
      log_timings: An optional function to use for logging the time of individual
        operations.
      extra_validations: A list of extra validation functions to run after loading
        this list of entries.
    Returns:
      A list of new errors, if any were found.
    """
    validation_tests = VALIDATIONS
    if extra_validations:
        validation_tests += extra_validations

    # Run various validation routines define above.
    errors = []
    for validation_function in validation_tests:
        with misc_utils.log_time('function: {}'.format(validation_function.__name__),
                                 log_timings, indent=2):
            new_errors = validation_function(entries, options_map)
        errors.extend(new_errors)

    return errors

beancount.ops.validation.validate_active_accounts(entries, unused_options_map)

检查所有账户引用是否都指向活跃账户。

我们基本上检查除 Open 和 Close 外的所有指令中对账户的引用,是否都发生在该账户的开闭区间内。对于所有能提取账户名称的指令类型,这都适用。

请注意,此检查比单纯比较日期更为严格:我们实际上会检查在账户的 Open 指令出现之前,是否在同一天有对该账户的任何引用。这是一个很好的特性,我们的自定义排序例程也支持这一点:在日期相同时,会将 Open 条目排在交易条目前。

参数:
  • entries – 一组指令列表。

  • unused_options_map – 一个选项映射。

返回:
  • 如果发现任何错误,则返回新的错误列表。

源代码位于 beancount/ops/validation.py
def validate_active_accounts(entries, unused_options_map):
    """Check that all references to accounts occurs on active accounts.

    We basically check that references to accounts from all directives other
    than Open and Close occur at dates the open-close interval of that account.
    This should be good for all of the directive types where we can extract an
    account name.

    Note that this is more strict a check than comparing the dates: we actually
    check that no references to account are made on the same day before the open
    directive appears for that account. This is a nice property to have, and is
    supported by our custom sorting routine that will sort open entries before
    transaction entries, given the same date.

    Args:
      entries: A list of directives.
      unused_options_map: An options map.
    Returns:
      A list of new errors, if any were found.
    """
    error_pairs = []
    active_set = set()
    opened_accounts = set()
    for entry in entries:
        if isinstance(entry, data.Open):
            active_set.add(entry.account)
            opened_accounts.add(entry.account)

        elif isinstance(entry, data.Close):
            active_set.discard(entry.account)

        else:
            for account in getters.get_entry_accounts(entry):
                if account not in active_set:
                    # Allow document and note directives that occur after an
                    # account is closed.
                    if (isinstance(entry, ALLOW_AFTER_CLOSE) and
                        account in opened_accounts):
                        continue

                    # Register an error to be logged later, with an appropriate
                    # message.
                    error_pairs.append((account, entry))

    # Refine the error message to disambiguate between the case of an account
    # that has never been seen and one that was simply not active at the time.
    errors = []
    for account, entry in error_pairs:
        if account in opened_accounts:
            message = "Invalid reference to inactive account '{}'".format(account)
        else:
            message = "Invalid reference to unknown account '{}'".format(account)
        errors.append(ValidationError(entry.meta, message, entry))

    return errors

beancount.ops.validation.validate_check_transaction_balances(entries, options_map)

再次检查所有交易分录是否平衡,因为用户可能已修改过交易。

参数:
  • entries – 一组指令列表。

  • unused_options_map – 一个选项映射。

返回:
  • 如果发现任何错误,则返回新的错误列表。

源代码位于 beancount/ops/validation.py
def validate_check_transaction_balances(entries, options_map):
    """Check again that all transaction postings balance, as users may have
    transformed transactions.

    Args:
      entries: A list of directives.
      unused_options_map: An options map.
    Returns:
      A list of new errors, if any were found.
    """
    # Note: this is a bit slow; we could limit our checks to the original
    # transactions by using the hash function in the loader.
    errors = []
    for entry in entries:
        if isinstance(entry, Transaction):
            # IMPORTANT: This validation is _crucial_ and cannot be skipped.
            # This is where we actually detect and warn on unbalancing
            # transactions. This _must_ come after the user routines, because
            # unbalancing input is legal, as those types of transactions may be
            # "fixed up" by a user-plugin. In other words, we want to allow
            # users to input unbalancing transactions as long as the final
            # transactions objects that appear on the stream (after processing
            # the plugins) are balanced. See {9e6c14b51a59}.
            #
            # Detect complete sets of postings that have residual balance;
            residual = interpolate.compute_residual(entry.postings)
            tolerances = interpolate.infer_tolerances(entry.postings, options_map)
            if not residual.is_small(tolerances):
                errors.append(
                    ValidationError(entry.meta,
                                    "Transaction does not balance: {}".format(residual),
                                    entry))

    return errors

beancount.ops.validation.validate_currency_constraints(entries, options_map)

检查账户开立声明中的货币约束。

Open 指令允许指定一个可选的货币列表,用于限定该账户余额中允许包含的唯一商品类型。此函数检查所有分录是否仅使用这些指定的商品。

参数:
  • entries – 一组指令列表。

  • unused_options_map – 一个选项映射。

返回:
  • 如果发现任何错误,则返回新的错误列表。

源代码位于 beancount/ops/validation.py
def validate_currency_constraints(entries, options_map):
    """Check the currency constraints from account open declarations.

    Open directives admit an optional list of currencies that specify the only
    types of commodities that the running inventory for this account may
    contain. This function checks that all postings are only made in those
    commodities.

    Args:
      entries: A list of directives.
      unused_options_map: An options map.
    Returns:
      A list of new errors, if any were found.
    """

    # Get all the open entries with currency constraints.
    open_map = {entry.account: entry
                for entry in entries
                if isinstance(entry, Open) and entry.currencies}

    errors = []
    for entry in entries:
        if not isinstance(entry, Transaction):
            continue

        for posting in entry.postings:
            # Look up the corresponding account's valid currencies; skip the
            # check if there are none specified.
            try:
                open_entry = open_map[posting.account]
                valid_currencies = open_entry.currencies
                if not valid_currencies:
                    continue
            except KeyError:
                continue

            # Perform the check.
            if posting.units.currency not in valid_currencies:
                errors.append(
                    ValidationError(
                        entry.meta,
                        "Invalid currency {} for account '{}'".format(
                            posting.units.currency, posting.account),
                        entry))

    return errors

beancount.ops.validation.validate_data_types(entries, options_map)

检查所有条目属性的数据类型是否符合预期。

用户可通过某种方式过滤条目列表,并能编写代码直接操作这些元组对象而无需类型约束。虽然在有纪律的情况下这通常可行,但我更清楚:还是检查一下更稳妥。此例程会检查所有条目的数据类型和假设。

参数:
  • entries – 一组指令列表。

  • unused_options_map – 一个选项映射。

返回:
  • 如果发现任何错误,则返回新的错误列表。

源代码位于 beancount/ops/validation.py
def validate_data_types(entries, options_map):
    """Check that all the data types of the attributes of entries are as expected.

    Users are provided with a means to filter the list of entries. They're able to
    write code that manipulates those tuple objects without any type constraints.
    With discipline, this mostly works, but I know better: check, just to make sure.
    This routine checks all the data types and assumptions on entries.

    Args:
      entries: A list of directives.
      unused_options_map: An options map.
    Returns:
      A list of new errors, if any were found.
    """
    errors = []
    for entry in entries:
        try:
            data.sanity_check_types(
                entry, options_map["allow_deprecated_none_for_tags_and_links"])
        except AssertionError as exc:
            errors.append(
                ValidationError(entry.meta,
                                "Invalid data types: {}".format(exc),
                                entry))
    return errors

beancount.ops.validation.validate_documents_paths(entries, options_map)

检查所有已解析的 Document 条目中的文件名是否为绝对路径。

文档条目的处理假定结果为绝对路径。相对路径在解析阶段已被解析,此时我们希望确保无需再进行任何进一步处理。

参数:
  • entries – 一组指令列表。

  • unused_options_map – 一个选项映射。

返回:
  • 如果发现任何错误,则返回新的错误列表。

源代码位于 beancount/ops/validation.py
def validate_documents_paths(entries, options_map):
    """Check that all filenames in resolved Document entries are absolute filenames.

    The processing of document entries is assumed to result in absolute paths.
    Relative paths are resolved at the parsing stage and at point we want to
    make sure we don't have to do any further processing on them.

    Args:
      entries: A list of directives.
      unused_options_map: An options map.
    Returns:
      A list of new errors, if any were found.
    """
    return [ValidationError(entry.meta, "Invalid relative path for entry", entry)
            for entry in entries
            if (isinstance(entry, Document) and
                not path.isabs(entry.filename))]

beancount.ops.validation.validate_duplicate_balances(entries, unused_options_map)

检查每日的余额条目是否仅出现一次。

由于我们不支持时间,且条目的声明顺序应保持无关,因此文件中不应出现金额不同的两个平衡条目。然而,我们允许出现两个完全相同的平衡断言,因为在导入过程中可能会发生这种情况。

参数:
  • entries – 一组指令列表。

  • unused_options_map – 一个选项映射。

返回:
  • 如果发现任何错误,则返回新的错误列表。

源代码位于 beancount/ops/validation.py
def validate_duplicate_balances(entries, unused_options_map):
    """Check that balance entries occur only once per day.

    Because we do not support time, and the declaration order of entries is
    meant to be kept irrelevant, two balance entries with different amounts
    should not occur in the file. We do allow two identical balance assertions,
    however, because this may occur during import.

    Args:
      entries: A list of directives.
      unused_options_map: An options map.
    Returns:
      A list of new errors, if any were found.
    """
    errors = []

    # Mapping of (account, currency, date) to Balance entry.
    balance_entries = {}
    for entry in entries:
        if not isinstance(entry, data.Balance):
            continue

        key = (entry.account, entry.amount.currency, entry.date)
        try:
            previous_entry = balance_entries[key]
            if entry.amount != previous_entry.amount:
                errors.append(
                    ValidationError(
                        entry.meta,
                        "Duplicate balance assertion with different amounts",
                        entry))
        except KeyError:
            balance_entries[key] = entry

    return errors

beancount.ops.validation.validate_duplicate_commodities(entries, unused_options_map)

检查每种商品的条目是否唯一。

参数:
  • entries – 一组指令列表。

  • unused_options_map – 一个选项映射。

返回:
  • 如果发现任何错误,则返回新的错误列表。

源代码位于 beancount/ops/validation.py
def validate_duplicate_commodities(entries, unused_options_map):
    """Check that commodity entries are unique for each commodity.

    Args:
      entries: A list of directives.
      unused_options_map: An options map.
    Returns:
      A list of new errors, if any were found.
    """
    errors = []

    # Mapping of (account, currency, date) to Balance entry.
    commodity_entries = {}
    for entry in entries:
        if not isinstance(entry, data.Commodity):
            continue

        key = entry.currency
        try:
            previous_entry = commodity_entries[key]
            if previous_entry:
                errors.append(
                    ValidationError(
                        entry.meta,
                        "Duplicate commodity directives for '{}'".format(key),
                        entry))
        except KeyError:
            commodity_entries[key] = entry

    return errors

beancount.ops.validation.validate_open_close(entries, unused_options_map)

检查开立和关闭指令本身的约束条件。

此方法检查两种类型的约束:

  1. 每个账户只能出现一次开立或关闭指令。如果检测到重复,将生成错误。

  2. 只有在先前(按时间顺序)已出现开立指令的情况下,才能出现关闭指令。

  3. 关闭指令的日期必须严格大于其对应的开立指令的日期。

参数:
  • entries – 一组指令列表。

  • unused_options_map – 一个选项映射。

返回:
  • 如果发现任何错误,则返回新的错误列表。

源代码位于 beancount/ops/validation.py
def validate_open_close(entries, unused_options_map):
    """Check constraints on open and close directives themselves.

    This method checks two kinds of constraints:

    1. An open or a close directive may only show up once for each account. If a
       duplicate is detected, an error is generated.

    2. Close directives may only appear if an open directive has been seen
       previously (chronologically).

    3. The date of close directives must be strictly greater than their
       corresponding open directive.

    Args:
      entries: A list of directives.
      unused_options_map: An options map.
    Returns:
      A list of new errors, if any were found.
    """
    errors = []
    open_map = {}
    close_map = {}
    for entry in entries:

        if isinstance(entry, Open):
            if entry.account in open_map:
                errors.append(
                    ValidationError(
                        entry.meta,
                        "Duplicate open directive for {}".format(entry.account),
                        entry))
            else:
                open_map[entry.account] = entry

        elif isinstance(entry, Close):
            if entry.account in close_map:
                errors.append(
                    ValidationError(
                        entry.meta,
                        "Duplicate close directive for {}".format(entry.account),
                        entry))
            else:
                try:
                    open_entry = open_map[entry.account]
                    if entry.date < open_entry.date:
                        errors.append(
                            ValidationError(
                                entry.meta,
                                "Internal error: closing date for {} "
                                "appears before opening date".format(entry.account),
                                entry))
                except KeyError:
                    errors.append(
                        ValidationError(
                            entry.meta,
                            "Unopened account {} is being closed".format(entry.account),
                            entry))

                close_map[entry.account] = entry

    return errors