beancount.plugins

用于过滤交易的示例插件。

这些是各种以创造性方式过滤条目的示例。

重要提示:这些插件并非旨在作为完整功能,而仅是使用 Beancount 解决问题的实验性尝试,属于可选安装的开发中功能,可通过 --plugin 选项启用,或作为邮件列表中回答问题的临时方案。

beancount.plugins.auto

一个包含所有自动和宽松插件的插件集合。

从某种意义上说,这与“严格模式”相反。在进行一些快速而粗糙的测试时非常有用。你只需导入“auto”插件,并将其放入宏中。

另请参阅:“pedantic”插件。

beancount.plugins.auto_accounts

此模块会自动为尚未开启的账户插入 Open 指令(在第一条条目日期处),并自动移除未使用的账户的 Open 指令。这在进行演示或设置初始交易时作为中间步骤非常方便。

beancount.plugins.auto_accounts.auto_insert_open(entries, unused_options_map)

为未开启的账户插入 Open 指令。

Open 指令将在第一条条目的日期处插入,未使用的账户的 Open 指令将被移除。

参数:
  • entries – 一组指令列表。

  • unused_options_map – 解析器选项字典。

返回:
  • 一组条目(可能比之前包含更多 Open 条目)以及一组错误信息。

源代码位于 beancount/plugins/auto_accounts.py
def auto_insert_open(entries, unused_options_map):
    """Insert Open directives for accounts not opened.

    Open directives are inserted at the date of the first entry. Open directives
    for unused accounts are removed.

    Args:
      entries: A list of directives.
      unused_options_map: A parser options dict.
    Returns:
      A list of entries, possibly with more Open entries than before, and a
      list of errors.
    """
    opened_accounts = {entry.account
                       for entry in entries
                       if isinstance(entry, data.Open)}

    new_entries = []
    accounts_first, _ = getters.get_accounts_use_map(entries)
    for index, (account, date_first_used) in enumerate(sorted(accounts_first.items())):
        if account not in opened_accounts:
            meta = data.new_metadata('<auto_accounts>', index)
            new_entries.append(data.Open(meta, date_first_used, account,
                                         None, None))

    if new_entries:
        new_entries.extend(entries)
        new_entries.sort(key=data.entry_sortkey)
    else:
        new_entries = entries

    return new_entries, []

beancount.plugins.check_average_cost

一个确保未记账交易中成本基础得以保留的插件。

该插件适用于使用“NONE”记账方式的账户,用于手动确保减少方的总成本基础与账户库存的平均值一致。这是实现“AVERAGE”记账方式的第一步。换句话说,此插件提供断言,强制你手动模拟“AVERAGE”记账方式的行为,避免因不匹配的记账而造成成本基础泄露。(注意此处的特殊语境:理想情况下,“NONE”记账方式本不应存在。)

beancount.plugins.check_average_cost.MatchBasisError (元组)

MatchBasisError(source, message, entry)

beancount.plugins.check_average_cost.MatchBasisError.__getnewargs__(self) 特殊

将自身返回为一个普通元组。供 copy 和 pickle 使用。

源代码位于 beancount/plugins/check_average_cost.py
def __getnewargs__(self):
    'Return self as a plain tuple.  Used by copy and pickle.'
    return _tuple(self)

beancount.plugins.check_average_cost.MatchBasisError.__new__(_cls, source, message, entry) 特殊 静态方法

创建 MatchBasisError(source, message, entry) 的新实例

beancount.plugins.check_average_cost.MatchBasisError.__replace__(/, self, **kwds) 特殊

返回一个新的 MatchBasisError 对象,用指定的新值替换字段

源代码位于 beancount/plugins/check_average_cost.py
def _replace(self, /, **kwds):
    result = self._make(_map(kwds.pop, field_names, self))
    if kwds:
        raise TypeError(f'Got unexpected field names: {list(kwds)!r}')
    return result

beancount.plugins.check_average_cost.MatchBasisError.__repr__(self) 特殊

返回一个格式良好的表示字符串

源代码位于 beancount/plugins/check_average_cost.py
def __repr__(self):
    'Return a nicely formatted representation string'
    return self.__class__.__name__ + repr_fmt % self

beancount.plugins.check_average_cost.validate_average_cost(entries, options_map, config_str=None)

检查未记账的分录的减少部分是否接近平均成本基础。

参数:
  • entries – 一组指令列表。

  • unused_options_map – 一个选项映射。

  • config_str – 配置项,以浮点数的字符串形式表示。

返回:
  • 如果发现任何错误,则返回新的错误列表。

源代码位于 beancount/plugins/check_average_cost.py
def validate_average_cost(entries, options_map, config_str=None):
    """Check that reducing legs on unbooked postings are near the average cost basis.

    Args:
      entries: A list of directives.
      unused_options_map: An options map.
      config_str: The configuration as a string version of a float.
    Returns:
      A list of new errors, if any were found.
    """
    # Initialize tolerance bounds.
    if config_str and config_str.strip():
        # pylint: disable=eval-used
        config_obj = eval(config_str, {}, {})
        if not isinstance(config_obj, float):
            raise RuntimeError("Invalid configuration for check_average_cost: "
                               "must be a float")
        tolerance = config_obj
    else:
        tolerance = DEFAULT_TOLERANCE
    min_tolerance = D(1 - tolerance)
    max_tolerance = D(1 + tolerance)

    errors = []
    ocmap = getters.get_account_open_close(entries)
    balances = collections.defaultdict(inventory.Inventory)
    for entry in entries:
        if isinstance(entry, Transaction):
            for posting in entry.postings:
                dopen = ocmap.get(posting.account, None)
                # Only process accounts with a NONE booking value.
                if dopen and dopen[0] and dopen[0].booking == Booking.NONE:
                    balance = balances[(posting.account,
                                        posting.units.currency,
                                        posting.cost.currency if posting.cost else None)]
                    if posting.units.number < ZERO:
                        average = balance.average().get_only_position()
                        if average is not None:
                            number = average.cost.number
                            min_valid = number * min_tolerance
                            max_valid = number * max_tolerance
                            if not (min_valid <= posting.cost.number <= max_valid):
                                errors.append(
                                    MatchBasisError(
                                        entry.meta,
                                        ("Cost basis on reducing posting is too far from "
                                         "the average cost ({} vs. {})".format(
                                             posting.cost.number, average.cost.number)),
                                        entry))
                    balance.add_position(posting)
    return entries, errors

beancount.plugins.check_closing

一个自动在标记为“关闭”的分录上插入余额检查的插件。

某些分录被用户知晓为“关闭交易”,这意味着交易完成后该工具的持仓应为零。例如,大多数普通期权交易属于这种情况:仅持有一份特定工具,最终会到期或被完全平仓。您希望确认这一点,而在 Beancount 中实现此目的的方法是插入一个余额检查。

此插件允许您通过添加元数据更简便地实现此功能。例如,以下交易:

2018-02-16 * "SOLD -14 QQQ 100 16 FEB 18 160 CALL @5.31"
  Assets:US:Brokerage:Main:Options     -1400 QQQ180216C160 {2.70 USD} @ 5.31 USD
    closing: TRUE
  Expenses:Financial:Commissions       17.45 USD
  Expenses:Financial:Fees               0.42 USD
  Assets:US:Brokerage:Main:Cash      7416.13 USD
  Income:US:Brokerage:Main:PnL

将扩展为以下两个指令:

2018-02-16 * "SOLD -14 QQQ 100 16 FEB 18 160 CALL @5.31"
  Assets:US:Brokerage:Main:Options     -1400 QQQ180216C160 {2.70 USD} @ 5.31 USD
  Expenses:Financial:Commissions       17.45 USD
  Expenses:Financial:Fees               0.42 USD
  Assets:US:Brokerage:Main:Cash      7416.13 USD
  Income:US:Brokerage:Main:PnL

2018-02-17 balance Assets:US:Brokerage:Main:Options  0 QQQ180216C160

当您知道正在关闭持仓时,请插入关闭行。

beancount.plugins.check_closing.check_closing(entries, options_map)

将“closing”元数据扩展为零余额检查。

参数:
  • entries – 一组指令列表。

  • unused_options_map – 一个选项映射。

返回:
  • 如果发现任何错误,则返回新的错误列表。

源代码位于 beancount/plugins/check_closing.py
def check_closing(entries, options_map):
    """Expand 'closing' metadata to a zero balance check.

    Args:
      entries: A list of directives.
      unused_options_map: An options map.
    Returns:
      A list of new errors, if any were found.
    """
    new_entries = []
    for entry in entries:
        if isinstance(entry, data.Transaction):
            for i, posting in enumerate(entry.postings):
                if posting.meta and posting.meta.get('closing', False):
                    # Remove the metadata.
                    meta = posting.meta.copy()
                    del meta['closing']
                    posting = posting._replace(meta=meta)
                    entry.postings[i] = posting

                    # Insert a balance.
                    date = entry.date + datetime.timedelta(days=1)
                    balance = data.Balance(data.new_metadata("<check_closing>", 0),
                                           date, posting.account,
                                           amount.Amount(ZERO, posting.units.currency),
                                           None, None)
                    new_entries.append(balance)
        new_entries.append(entry)
    return new_entries, []

beancount.plugins.check_commodity

一个验证所有遇到的商品是否都有 Commodity 指令的插件。

如果您比较严谨,希望确保为所使用的每种商品都声明了属性,此插件非常有用。例如,在使用投资组合导出功能时很有帮助。

您可以提供一个 (账户正则表达式, 货币正则表达式) 的映射作为选项,以有选择地忽略某些商品的检查。请谨慎使用,因为这会绕过该插件提供的检查。然而,在活跃的期权交易账户中,会生成大量产品,商品指令的数量可能过于庞大,逐一声明每个嵌入了行权价和到期日的期权合约(例如 'SPX_121622P3300')并不高效。

请注意,如果某个符号在至少一个账户中被忽略,则它在所有价格指令和元数据值中也将被忽略。

beancount.plugins.check_commodity.CheckCommodityError (元组)

CheckCommodityError(source, message, entry)

beancount.plugins.check_commodity.CheckCommodityError.__getnewargs__(self) 特殊

将自身返回为一个普通元组。供 copy 和 pickle 使用。

源代码位于 beancount/plugins/check_commodity.py
def __getnewargs__(self):
    'Return self as a plain tuple.  Used by copy and pickle.'
    return _tuple(self)

beancount.plugins.check_commodity.CheckCommodityError.__new__(_cls, source, message, entry) 特殊 静态方法

创建 CheckCommodityError(source, message, entry) 的新实例

beancount.plugins.check_commodity.CheckCommodityError.__replace__(/, self, **kwds) 特殊

返回一个新的 CheckCommodityError 对象,用指定的新值替换字段

源代码位于 beancount/plugins/check_commodity.py
def _replace(self, /, **kwds):
    result = self._make(_map(kwds.pop, field_names, self))
    if kwds:
        raise TypeError(f'Got unexpected field names: {list(kwds)!r}')
    return result

beancount.plugins.check_commodity.CheckCommodityError.__repr__(self) 特殊

返回一个格式良好的表示字符串

源代码位于 beancount/plugins/check_commodity.py
def __repr__(self):
    'Return a nicely formatted representation string'
    return self.__class__.__name__ + repr_fmt % self

beancount.plugins.check_commodity.ConfigError (元组)

ConfigError(source, message, entry)

beancount.plugins.check_commodity.ConfigError.__getnewargs__(self) 特殊

将自身返回为一个普通元组。供 copy 和 pickle 使用。

源代码位于 beancount/plugins/check_commodity.py
def __getnewargs__(self):
    'Return self as a plain tuple.  Used by copy and pickle.'
    return _tuple(self)

beancount.plugins.check_commodity.ConfigError.__new__(_cls, source, message, entry) 特殊 静态方法

创建 ConfigError(source, message, entry) 的新实例

beancount.plugins.check_commodity.ConfigError.__replace__(/, self, **kwds) 特殊

返回一个新的 ConfigError 对象,用指定的新值替换字段

源代码位于 beancount/plugins/check_commodity.py
def _replace(self, /, **kwds):
    result = self._make(_map(kwds.pop, field_names, self))
    if kwds:
        raise TypeError(f'Got unexpected field names: {list(kwds)!r}')
    return result

beancount.plugins.check_commodity.ConfigError.__repr__(self) 特殊

返回一个格式良好的表示字符串

源代码位于 beancount/plugins/check_commodity.py
def __repr__(self):
    'Return a nicely formatted representation string'
    return self.__class__.__name__ + repr_fmt % self

beancount.plugins.check_commodity.get_commodity_map_ex(entries, metadata=False)

在指令流中查找并提取商品。

源代码位于 beancount/plugins/check_commodity.py
def get_commodity_map_ex(entries, metadata=False):
    """Find and extract commodities in the stream of directives."""

    # Find commodity names in metadata.
    #
    # TODO(dnicolodi) Unfortunately detecting commodities in metadata
    # values may result in false positives: common used string are
    # matched by the regular expression. Revisit this when commodities
    # will be represented with their own type.
    ignore = set(['filename', 'lineno', '__automatic__'])
    regexp = re.compile(CURRENCY_RE)
    def currencies_in_meta(entry):
        if entry.meta is not None:
            for key, value in entry.meta.items():
                if isinstance(value, str) and key not in ignore:
                    if regexp.fullmatch(value):
                        yield value

    commodities_map = {}
    occurrences = set()
    for entry in entries:
        if isinstance(entry, data.Commodity):
            commodities_map[entry.currency] = entry

        elif isinstance(entry, data.Open):
            if entry.currencies:
                for currency in entry.currencies:
                    occurrences.add((entry.account, currency))

        elif isinstance(entry, data.Transaction):
            for posting in entry.postings:

                # Main currency.
                units = posting.units
                occurrences.add((posting.account, units.currency))

                # Currency in cost.
                cost = posting.cost
                if cost:
                    occurrences.add((posting.account, cost.currency))

                # Currency in price.
                price = posting.price
                if price:
                    occurrences.add((posting.account, price.currency))

                # Currency in posting metadata.
                if metadata:
                    for currency in currencies_in_meta(posting):
                        occurrences.add((posting.account, currency))

        elif isinstance(entry, data.Balance):
            occurrences.add((entry.account, entry.amount.currency))

        elif isinstance(entry, data.Price):
            occurrences.add((PRICE_CONTEXT, entry.currency))
            occurrences.add((PRICE_CONTEXT, entry.amount.currency))

        # Entry metadata.
        if metadata:
            for currency in currencies_in_meta(entry):
                occurrences.add((METADATA_CONTEXT, currency))

    return occurrences, commodities_map

beancount.plugins.check_commodity.validate_commodity_directives(entries, options_map, config_str=None)

查找所有使用过的商品,并确保它们都有对应的 Commodity 指令。

参数:
  • entries – 一组指令列表。

  • options_map – 一个选项映射。

  • config_str – 配置项,以浮点数的字符串形式表示。

返回:
  • 如果发现任何错误,则返回新的错误列表。

源代码位于 beancount/plugins/check_commodity.py
def validate_commodity_directives(entries, options_map, config_str=None):
    """Find all commodities used and ensure they have a corresponding Commodity directive.

    Args:
      entries: A list of directives.
      options_map: An options map.
      config_str: The configuration as a string version of a float.
    Returns:
      A list of new errors, if any were found.
    """
    errors = []

    # pylint: disable=eval-used
    if config_str:
        config_obj = eval(config_str, {}, {})
        if not isinstance(config_obj, dict):
            errors.append(ConfigError(
                data.new_metadata('<commodity_attr>', 0),
                "Invalid configuration for check_commodity plugin; skipping.", None))
            return entries, errors
    else:
        config_obj = {}

    # Compile the regular expressions, producing an error if invalid.
    ignore_map = {}
    for key, value in config_obj.items():
        kv = []
        for pattern in key, value:
            try:
                kv.append(re.compile(pattern).match)
            except re.error:
                meta = data.new_metadata('<check_commodity>', 0)
                errors.append(
                    CheckCommodityError(
                        meta, "Invalid regexp: '{}' for {}".format(value, key), None))
        if len(kv) == 2:
            ignore_map[kv[0]] = kv[1]

    # Get all the occurrences of commodities and a mapping of the directives.
    #
    # TODO(blais): Establish a distinction at the parser level for commodities
    # and strings, so that we can turn detection of them in metadata.
    occurrences, commodity_map = get_commodity_map_ex(entries, metadata=False)

    # Process all currencies with context.
    issued = set()
    ignored = set()
    anonymous = set()
    for context, currency in sorted(occurrences):
        if context in ANONYMOUS:
            anonymous.add((context, currency))
            continue
        commodity_entry = commodity_map.get(currency, None)

        # Skip if the commodity was declared, or if an error for that commodity
        # has already been issued.
        if commodity_entry is not None or currency in issued:
            continue

        # If any of the ignore patterns matches, ignore and record ignored.
        if any((context_re(context) and currency_re(currency))
                for context_re, currency_re in ignore_map.items()):
            ignored.add(currency)
            continue

        # Issue error.
        meta = data.new_metadata('<check_commodity>', 0)
        errors.append(
            CheckCommodityError(
                meta,
                "Missing Commodity directive for '{}' in '{}'".format(
                    currency, context),
                None))

        # Process it only once.
        issued.add(currency)

    # Process all currencies out of context, automatically ignoring those which
    # have already been issued with account context..
    for context, currency in sorted(anonymous):
        commodity_entry = commodity_map.get(currency, None)

        # Skip if (a) the commodity was declared, any of the ignore patterns
        # matches, or an error for that commodity has already been issued.
        if (commodity_entry is not None or
            currency in issued or
            currency in ignored):
            continue

        # Issue error.
        meta = data.new_metadata('<check_commodity>', 0)
        errors.append(
            CheckCommodityError(
                meta,
                "Missing Commodity directive for '{}' in '{}'".format(
                    currency, context),
                None))

    return entries, errors

beancount.plugins.check_drained

在资产负债表账户关闭前插入余额为零的检查。

对于带有 Close 指令的资产负债表账户(资产、负债和权益),在其关闭日期之后插入 Balance 指令,涵盖该账户中出现过且被声明为合法的所有商品。这相当于执行以下转换:

2018-02-01 open  Assets:Project:Cash      USD,CAD
...
2020-02-01 close Assets:Project:Cash

!!! to 2018-02-01 open Assets:Project:Cash USD,CAD ...

2020-02-01 close Assets:Project:Cash
2020-02-02 balance Assets:Project:Cash    0 USD
2020-02-02 balance Assets:Project:Cash    0 CAD

beancount.plugins.check_drained.check_drained(entries, options_map)

检查已关闭的账户是否为空。

参数:
  • entries – 一组指令列表。

  • unused_options_map – 一个选项映射。

返回:
  • 如果发现任何错误,则返回新的错误列表。

源代码位于 beancount/plugins/check_drained.py
def check_drained(entries, options_map):
    """Check that closed accounts are empty.

    Args:
      entries: A list of directives.
      unused_options_map: An options map.
    Returns:
      A list of new errors, if any were found.
    """
    acctypes = options.get_account_types(options_map)
    is_covered = functools.partial(
        account_types.is_balance_sheet_account, account_types=acctypes
    )

    new_entries = []
    currencies = collections.defaultdict(set)
    balances = collections.defaultdict(set)
    for entry in entries:
        if isinstance(entry, data.Transaction):
            # Accumulate all the currencies seen in each account over time.
            for posting in entry.postings:
                if is_covered(posting.account):
                    currencies[posting.account].add(posting.units.currency)

        elif isinstance(entry, data.Open):
            # Accumulate all the currencies declared in the account opening.
            if is_covered(entry.account) and entry.currencies:
                for currency in entry.currencies:
                    currencies[entry.account].add(currency)

        elif isinstance(entry, data.Balance):
            # Ignore balances where directives are present.
            if is_covered(entry.account):
                balances[entry.account].add((entry.date, entry.amount.currency))

        if isinstance(entry, data.Close):
            if is_covered(entry.account):
                for currency in currencies[entry.account]:
                    # Skip balance insertion due to the presence of an explicit one.
                    if (entry.date, currency) in balances[entry.account]:
                        continue

                    # Insert a balance directive.
                    balance_entry = data.Balance(
                        # Note: We use the close directive's meta so that
                        # balance errors direct the user to the corresponding
                        # close directive.
                        entry.meta,
                        entry.date + ONE_DAY,
                        entry.account,
                        amount.Amount(ZERO, currency),
                        None,
                        None,
                    )
                    new_entries.append(balance_entry)

        new_entries.append(entry)

    return new_entries, []

beancount.plugins.close_tree

此插件在账户关闭时为其所有子账户插入关闭指令。未打开的父账户也可以被关闭。任何显式指定的关闭指令将保持不变。

例如,给定以下内容:

2017-11-10 open Assets:Brokerage:AAPL
2017-11-10 open Assets:Brokerage:ORNG
2018-11-10 close Assets:Brokerage  ; this does not necessarily need to be opened

该插件将其转换为:

2017-11-10 open Assets:Brokerage:AAPL
2017-11-10 open Assets:Brokerage:ORNG
2018-11-10 close Assets:Brokerage:AAPL
2018-11-10 close Assets:Brokerage:ORNG

在任何生成您希望自动关闭的账户树的 open 指令的插件之后调用此插件。例如 Beancount 自带的 auto_accounts 插件:

plugin "beancount.plugins.auto_accounts"
plugin "beancount.plugins.close_tree"

beancount.plugins.close_tree.close_tree(entries, unused_options_map)

为已关闭账户的所有子账户插入关闭条目。

参数:
  • entries – 指令列表。我们仅关注 Open/Close 实例。

  • unused_options_map – 解析器选项字典。

返回:
  • 一个包含条目和错误的元组。

源代码位于 beancount/plugins/close_tree.py
def close_tree(entries, unused_options_map):
    """Insert close entries for all subaccounts of a closed account.

    Args:
      entries: A list of directives. We're interested only in the Open/Close instances.
      unused_options_map: A parser options dict.
    Returns:
      A tuple of entries and errors.
    """

    new_entries = []
    errors = []

    opens = set(entry.account for entry in entries if isinstance(entry, Open))
    closes = set(entry.account for entry in entries if isinstance(entry, Close))

    for entry in entries:
        if isinstance(entry, Close):
            subaccounts = [
                account
                for account in opens
                if account.startswith(entry.account + ":") and account not in closes
            ]
            for subacc in subaccounts:
                meta = data.new_metadata("<beancount.plugins.close_tree>", 0)
                close_entry = data.Close(meta, entry.date, subacc)
                new_entries.append(close_entry)
                # So we don't attempt to re-close a grandchild that a child closed
                closes.add(subacc)
            if entry.account in opens:
                new_entries.append(entry)
        else:
            new_entries.append(entry)

    return new_entries, errors

beancount.plugins.coherent_cost

此插件验证以成本持有货币时不会被以价格转换,反之亦然。通常情况下应如此,使用它可防止用户在未指定成本基础的情况下出售持仓。

CoherentCostError(source, message, entry)

CoherentCostError(source, message, entry)

beancount.plugins.coherent_cost.CoherentCostError.__getnewargs__(self) 特殊

将自身返回为一个普通元组。供 copy 和 pickle 使用。

源代码位于 beancount/plugins/coherent_cost.py
def __getnewargs__(self):
    'Return self as a plain tuple.  Used by copy and pickle.'
    return _tuple(self)

beancount.plugins.coherent_cost.CoherentCostError.__new__(_cls, source, message, entry) 特殊 静态方法

创建 CoherentCostError(source, message, entry) 的新实例

beancount.plugins.coherent_cost.CoherentCostError.__replace__(/, self, **kwds) 特殊

返回一个新的 CoherentCostError 对象,用指定的新值替换字段

源代码位于 beancount/plugins/coherent_cost.py
def _replace(self, /, **kwds):
    result = self._make(_map(kwds.pop, field_names, self))
    if kwds:
        raise TypeError(f'Got unexpected field names: {list(kwds)!r}')
    return result

beancount.plugins.coherent_cost.CoherentCostError.__repr__(self) 特殊

返回一个格式良好的表示字符串

源代码位于 beancount/plugins/coherent_cost.py
def __repr__(self):
    'Return a nicely formatted representation string'
    return self.__class__.__name__ + repr_fmt % self

beancount.plugins.coherent_cost.validate_coherent_cost(entries, unused_options_map)

检查所有货币要么全部使用成本,要么完全不使用成本,但不能两者兼有。

参数:
  • entries – 一组指令列表。

  • unused_options_map – 一个选项映射。

返回:
  • 如果发现任何错误,则返回新的错误列表。

源代码位于 beancount/plugins/coherent_cost.py
def validate_coherent_cost(entries, unused_options_map):
    """Check that all currencies are either used at cost or not at all, but never both.

    Args:
      entries: A list of directives.
      unused_options_map: An options map.
    Returns:
      A list of new errors, if any were found.
    """
    errors = []

    with_cost = {}
    without_cost = {}
    for entry in data.filter_txns(entries):
        for posting in entry.postings:
            target_set = without_cost if posting.cost is None else with_cost
            currency = posting.units.currency
            target_set.setdefault(currency, entry)

    for currency in set(with_cost) & set(without_cost):
        errors.append(
            CoherentCostError(
                without_cost[currency].meta,
                "Currency '{}' is used both with and without cost".format(currency),
                with_cost[currency]))
        # Note: We really ought to include both of the first transactions here.

    return entries, errors

beancount.plugins.commodity_attr

一个插件,用于断言所有 Commodity 指令都具有特定属性,且该属性的值属于一组预定义的枚举值。

配置必须是属性名称到有效值列表的映射,例如:

plugin "beancount.plugins.commodity_attr" "{
  'sector': ['Technology', 'Financials', 'Energy'],
  'name': None,
}"

如果 Commodity 指令缺少该属性,或属性值不在有效集合中,插件将报错。如果您只想确保属性已设置,可将有效值列表设为 None,如上例中的 'name' 属性所示。

beancount.plugins.commodity_attr.CommodityError (tuple)

CommodityError(source, message, entry)

beancount.plugins.commodity_attr.CommodityError.__getnewargs__(self) 特殊

将自身返回为一个普通元组。供 copy 和 pickle 使用。

源代码位于 beancount/plugins/commodity_attr.py
def __getnewargs__(self):
    'Return self as a plain tuple.  Used by copy and pickle.'
    return _tuple(self)

beancount.plugins.commodity_attr.CommodityError.__new__(_cls, source, message, entry) 特殊 静态方法

创建 CommodityError(source, message, entry) 的新实例

beancount.plugins.commodity_attr.CommodityError.__replace__(/, self, **kwds) 特殊

返回一个新的 CommodityError 对象,用指定的新值替换字段

源代码位于 beancount/plugins/commodity_attr.py
def _replace(self, /, **kwds):
    result = self._make(_map(kwds.pop, field_names, self))
    if kwds:
        raise TypeError(f'Got unexpected field names: {list(kwds)!r}')
    return result

beancount.plugins.commodity_attr.CommodityError.__repr__(self) 特殊

返回一个格式良好的表示字符串

源代码位于 beancount/plugins/commodity_attr.py
def __repr__(self):
    'Return a nicely formatted representation string'
    return self.__class__.__name__ + repr_fmt % self

beancount.plugins.commodity_attr.ConfigError (tuple)

ConfigError(source, message, entry)

beancount.plugins.commodity_attr.ConfigError.__getnewargs__(self) 特殊

将自身返回为一个普通元组。供 copy 和 pickle 使用。

源代码位于 beancount/plugins/commodity_attr.py
def __getnewargs__(self):
    'Return self as a plain tuple.  Used by copy and pickle.'
    return _tuple(self)

beancount.plugins.commodity_attr.ConfigError.__new__(_cls, source, message, entry) 特殊 静态方法

创建 ConfigError(source, message, entry) 的新实例

beancount.plugins.commodity_attr.ConfigError.__replace__(/, self, **kwds) 特殊

返回一个新的 ConfigError 对象,用指定的新值替换字段

源代码位于 beancount/plugins/commodity_attr.py
def _replace(self, /, **kwds):
    result = self._make(_map(kwds.pop, field_names, self))
    if kwds:
        raise TypeError(f'Got unexpected field names: {list(kwds)!r}')
    return result

beancount.plugins.commodity_attr.ConfigError.__repr__(self) 特殊

返回一个格式良好的表示字符串

源代码位于 beancount/plugins/commodity_attr.py
def __repr__(self):
    'Return a nicely formatted representation string'
    return self.__class__.__name__ + repr_fmt % self

beancount.plugins.commodity_attr.validate_commodity_attr(entries, unused_options_map, config_str)

检查所有 Commodity 指令是否具有有效的属性。

参数:
  • entries – 一组指令列表。

  • unused_options_map – 一个选项映射。

  • config_str – 一个配置字符串。

返回:
  • 如果发现任何错误,则返回新的错误列表。

源代码位于 beancount/plugins/commodity_attr.py
def validate_commodity_attr(entries, unused_options_map, config_str):
    """Check that all Commodity directives have a valid attribute.

    Args:
      entries: A list of directives.
      unused_options_map: An options map.
      config_str: A configuration string.
    Returns:
      A list of new errors, if any were found.
    """
    errors = []

    # pylint: disable=eval-used
    config_obj = eval(config_str, {}, {})
    if not isinstance(config_obj, dict):
        errors.append(ConfigError(
            data.new_metadata('<commodity_attr>', 0),
            "Invalid configuration for commodity_attr plugin; skipping.", None))
        return entries, errors

    validmap = {attr: frozenset(values) if values is not None else None
                for attr, values in config_obj.items()}
    for entry in entries:
        if not isinstance(entry, data.Commodity):
            continue
        for attr, values in validmap.items():
            value = entry.meta.get(attr, None)
            if value is None:
                errors.append(CommodityError(
                    entry.meta,
                    "Missing attribute '{}' for Commodity directive {}".format(
                        attr, entry.currency), None))
                continue
            if values and value not in values:
                errors.append(CommodityError(
                    entry.meta,
                    "Invalid value '{}' for attribute {}, Commodity".format(value, attr) +
                    " directive {}; valid options: {}".format(
                        entry.currency, ', '.join(values)), None))

    return entries, errors

beancount.plugins.currency_accounts

货币账户的实现。

这是对以下页面所述方法的自动实现:https://www.mathstat.dal.ca/~selinger/accounting/tutorial.html

您只需这样启用它:

plugin "beancount.plugins.currency_accounts" "Equity:CurrencyAccounts"

系统将在指定的基础账户下自动创建账户,并在其后追加货币名称,例如:

Equity:CurrencyAccounts:CAD
Equity:CurrencyAccounts:USD

等等。您可以使用以下查询查看账户余额:

bean-query $L "select account, sum(position), convert(sum(position), 'USD')
               where date &gt;= 2018-01-01 and  account ~ 'CurrencyAccounts' "

转换后金额的总和应为一个不太大的数值:

bean-query $L "select convert(sum(position), 'USD')
               where date &gt;= 2018-01-01 and  account ~ 'CurrencyAccounts'"

警告:这是一个原型。请参见下方代码中的 FIXME 注释,它们指出了潜在的问题。

beancount.plugins.currency_accounts.get_neutralizing_postings(curmap, base_account, new_accounts)

处理一条记录。

参数:
  • curmap —— 一个字典,键为货币,值为该交易的 Posting 列表。

  • base_account —— 字符串,表示要插入的基础账户名称。

  • new_accounts —— 一个集合,用于累积新账户名称的可变容器。

返回:
  • 返回修改后的记录,其中插入了新的 Posting 以平衡货币交易账户。

源代码位于 beancount/plugins/currency_accounts.py
def get_neutralizing_postings(curmap, base_account, new_accounts):
    """Process an entry.

    Args:
      curmap: A dict of currency to a list of Postings of this transaction.
      base_account: A string, the root account name to insert.
      new_accounts: A set, a mutable accumulator of new account names.
    Returns:
      A modified entry, with new postings inserted to rebalance currency trading
      accounts.
    """
    new_postings = []
    for currency, postings in curmap.items():
        # Compute the per-currency balance.
        inv = inventory.Inventory()
        for posting in postings:
            inv.add_amount(convert.get_cost(posting))
        if inv.is_empty():
            new_postings.extend(postings)
            continue

        # Re-insert original postings and remove price conversions.
        #
        # Note: This may cause problems if the implicit_prices plugin is
        # configured to run after this one, or if you need the price annotations
        # for some scripting or serious work.
        #
        # FIXME: We need to handle these important cases (they're not frivolous,
        # this is a prototype), probably by inserting some exceptions with
        # collaborating code in the booking (e.g. insert some metadata that
        # disables price conversions on those postings).
        #
        # FIXME(2): Ouch! Some of the residual seeps through here, where there
        # are more than a single currency block. This needs fixing too. You can
        # easily mitigate some of this to some extent, by excluding transactions
        # which don't have any price conversion in them.
        for pos in postings:
            if pos.price is not None:
                pos = pos._replace(price=None)
            new_postings.append(pos)

        # Insert the currency trading accounts postings.
        amount = inv.get_only_position().units
        acc = account.join(base_account, currency)
        new_accounts.add(acc)
        new_postings.append(
            Posting(acc, -amount, None, None, None, None))

    return new_postings

beancount.plugins.currency_accounts.group_postings_by_weight_currency(entry)

返回此记录可能需要调整的位置。

源代码位于 beancount/plugins/currency_accounts.py
def group_postings_by_weight_currency(entry: Transaction):
    """Return where this entry might require adjustment."""
    curmap = collections.defaultdict(list)
    has_price = False
    for posting in entry.postings:
        currency = posting.units.currency
        if posting.cost:
            currency = posting.cost.currency
            if posting.price:
                assert posting.price.currency == currency
            elif posting.price:
                has_price = True
                currency = posting.price.currency
        if posting.price:
            has_price = True
        curmap[currency].append(posting)
    return curmap, has_price

beancount.plugins.currency_accounts.insert_currency_trading_postings(entries, options_map, config)

插入货币交易 Posting。

参数:
  • entries – 一组指令列表。

  • unused_options_map – 一个选项映射。

  • config —— 货币交易账户的基础账户名称。

返回:
  • 如果发现任何错误,则返回新的错误列表。

源代码位于 beancount/plugins/currency_accounts.py
def insert_currency_trading_postings(entries, options_map, config):
    """Insert currency trading postings.

    Args:
      entries: A list of directives.
      unused_options_map: An options map.
      config: The base account name for currency trading accounts.
    Returns:
      A list of new errors, if any were found.
    """
    base_account = config.strip()
    if not account.is_valid(base_account):
        base_account = DEFAULT_BASE_ACCOUNT

    errors = []
    new_entries = []
    new_accounts = set()
    for entry in entries:
        if isinstance(entry, Transaction):
            curmap, has_price = group_postings_by_weight_currency(entry)
            if has_price and len(curmap) > 1:
                new_postings = get_neutralizing_postings(
                    curmap, base_account, new_accounts)
                entry = entry._replace(postings=new_postings)
                if META_PROCESSED:
                    entry.meta[META_PROCESSED] = True
        new_entries.append(entry)

    earliest_date = entries[0].date
    open_entries = [
        data.Open(data.new_metadata('<currency_accounts>', index),
                  earliest_date, acc, None, None)
        for index, acc in enumerate(sorted(new_accounts))]

    return open_entries + new_entries, errors

beancount.plugins.implicit_prices

此插件会为所有带有价格或指令的 Posting,或作为增补 Posting 且带有成本指令的 Posting 自动生成 Price 指令。

beancount.plugins.implicit_prices.ImplicitPriceError (元组)

ImplicitPriceError(source, message, entry)

beancount.plugins.implicit_prices.ImplicitPriceError.__getnewargs__(self) 特殊

将自身返回为一个普通元组。供 copy 和 pickle 使用。

源代码位于 beancount/plugins/implicit_prices.py
def __getnewargs__(self):
    'Return self as a plain tuple.  Used by copy and pickle.'
    return _tuple(self)

beancount.plugins.implicit_prices.ImplicitPriceError.__new__(_cls, source, message, entry) 特殊 静态方法

创建 ImplicitPriceError(source, message, entry) 的新实例

beancount.plugins.implicit_prices.ImplicitPriceError.__replace__(/, self, **kwds) 特殊

返回一个新的 ImplicitPriceError 对象,用指定的新值替换相应字段

源代码位于 beancount/plugins/implicit_prices.py
def _replace(self, /, **kwds):
    result = self._make(_map(kwds.pop, field_names, self))
    if kwds:
        raise TypeError(f'Got unexpected field names: {list(kwds)!r}')
    return result

beancount.plugins.implicit_prices.ImplicitPriceError.__repr__(self) 特殊

返回一个格式良好的表示字符串

源代码位于 beancount/plugins/implicit_prices.py
def __repr__(self):
    'Return a nicely formatted representation string'
    return self.__class__.__name__ + repr_fmt % self

beancount.plugins.implicit_prices.add_implicit_prices(entries, unused_options_map)

从交易中插入隐式定义的价格。

显式价格条目将直接保留在输出列表中。来自带有成本或交易条目中价格的过账项,将被合成新的价格条目并添加到输出列表中。

参数:
  • entries – 一组指令。我们仅关注其中的 Transaction 实例。

  • unused_options_map – 解析器选项字典。

返回:
  • 一组条目,可能比之前包含更多 Price 条目,以及一组错误。

源代码位于 beancount/plugins/implicit_prices.py
def add_implicit_prices(entries, unused_options_map):
    """Insert implicitly defined prices from Transactions.

    Explicit price entries are simply maintained in the output list. Prices from
    postings with costs or with prices from Transaction entries are synthesized
    as new Price entries in the list of entries output.

    Args:
      entries: A list of directives. We're interested only in the Transaction instances.
      unused_options_map: A parser options dict.
    Returns:
      A list of entries, possibly with more Price entries than before, and a
      list of errors.
    """
    new_entries = []
    errors = []

    # A dict of (date, currency, cost-currency) to price entry.
    new_price_entry_map = {}

    balances = collections.defaultdict(inventory.Inventory)
    for entry in entries:
        # Always replicate the existing entries.
        new_entries.append(entry)

        if isinstance(entry, Transaction):
            # Inspect all the postings in the transaction.
            for posting in entry.postings:
                units = posting.units
                cost = posting.cost

                # Check if the position is matching against an existing
                # position.
                _, booking = balances[posting.account].add_position(posting)

                # Add prices when they're explicitly specified on a posting. An
                # explicitly specified price may occur in a conversion, e.g.
                #      Assets:Account    100 USD @ 1.10 CAD
                # or, if a cost is also specified, as the current price of the
                # underlying instrument, e.g.
                #      Assets:Account    100 HOOL {564.20} @ {581.97} USD
                if posting.price is not None:
                    meta = data.new_metadata(entry.meta["filename"], entry.meta["lineno"])
                    meta[METADATA_FIELD] = "from_price"
                    price_entry = data.Price(meta, entry.date,
                                             units.currency,
                                             posting.price)

                # Add costs, when we're not matching against an existing
                # position. This happens when we're just specifying the cost,
                # e.g.
                #      Assets:Account    100 HOOL {564.20}
                elif (cost is not None and
                      booking != inventory.MatchResult.REDUCED):
                    # TODO(blais): What happens here if the account has no
                    # booking strategy? Do we end up inserting a price for the
                    # reducing leg?  Check.
                    meta = data.new_metadata(entry.meta["filename"], entry.meta["lineno"])
                    meta[METADATA_FIELD] = "from_cost"
                    price_entry = data.Price(meta, entry.date,
                                             units.currency,
                                             amount.Amount(cost.number, cost.currency))
                else:
                    price_entry = None

                if price_entry is not None:
                    key = (price_entry.date,
                           price_entry.currency,
                           price_entry.amount.number,  # Ideally should be removed.
                           price_entry.amount.currency)
                    try:
                        new_price_entry_map[key]

                        ## Do not fail for now. We still have many valid use
                        ## cases of duplicate prices on the same date, for
                        ## example, stock splits, or trades on two dates with
                        ## two separate reported prices. We need to figure out a
                        ## more elegant solution for this in the long term.
                        ## Keeping both for now. We should ideally not use the
                        ## number in the de-dup key above.
                        #
                        # dup_entry = new_price_entry_map[key]
                        # if price_entry.amount.number == dup_entry.amount.number:
                        #     # Skip duplicates.
                        #     continue
                        # else:
                        #     errors.append(
                        #         ImplicitPriceError(
                        #             entry.meta,
                        #             "Duplicate prices for {} on {}".format(entry,
                        #                                                    dup_entry),
                        #             entry))
                    except KeyError:
                        new_price_entry_map[key] = price_entry
                        new_entries.append(price_entry)

    return new_entries, errors

beancount.plugins.leafonly

一个在金额被记入非叶账户(即具有子账户的账户)时发出错误的插件。

这是一个可选的额外约束。安装此插件后,它将对所有向非叶账户过账的账户发出错误。某些用户可能希望禁止此类行为,并强制仅允许叶账户有过账。

beancount.plugins.leafonly.LeafOnlyError (tuple)

LeafOnlyError(source, message, entry)

beancount.plugins.leafonly.LeafOnlyError.__getnewargs__(self) 特殊

将自身返回为一个普通元组。供 copy 和 pickle 使用。

源代码位于 beancount/plugins/leafonly.py
def __getnewargs__(self):
    'Return self as a plain tuple.  Used by copy and pickle.'
    return _tuple(self)

beancount.plugins.leafonly.LeafOnlyError.__new__(_cls, source, message, entry) 特殊 静态方法

创建 LeafOnlyError(source, message, entry) 的新实例

beancount.plugins.leafonly.LeafOnlyError.__replace__(/, self, **kwds) 特殊

返回一个新的 LeafOnlyError 对象,用指定的新值替换字段

源代码位于 beancount/plugins/leafonly.py
def _replace(self, /, **kwds):
    result = self._make(_map(kwds.pop, field_names, self))
    if kwds:
        raise TypeError(f'Got unexpected field names: {list(kwds)!r}')
    return result

beancount.plugins.leafonly.LeafOnlyError.__repr__(self) 特殊

返回一个格式良好的表示字符串

源代码位于 beancount/plugins/leafonly.py
def __repr__(self):
    'Return a nicely formatted representation string'
    return self.__class__.__name__ + repr_fmt % self

beancount.plugins.leafonly.validate_leaf_only(entries, unused_options_map)

检查是否存在向非叶账户过账的情况。

参数:
  • entries – 一组指令列表。

  • unused_options_map – 一个选项映射。

返回:
  • 如果发现任何错误,则返回新的错误列表。

源代码位于 beancount/plugins/leafonly.py
def validate_leaf_only(entries, unused_options_map):
    """Check for non-leaf accounts that have postings on them.

    Args:
      entries: A list of directives.
      unused_options_map: An options map.
    Returns:
      A list of new errors, if any were found.
    """
    real_root = realization.realize(entries, compute_balance=False)

    default_meta = data.new_metadata('<leafonly>', 0)
    open_close_map = None # Lazily computed.
    errors = []
    for real_account in realization.iter_children(real_root):
        if len(real_account) > 0 and real_account.txn_postings:

            if open_close_map is None:
                open_close_map = getters.get_account_open_close(entries)

            try:
                open_entry = open_close_map[real_account.account][0]
            except KeyError:
                open_entry = None
            errors.append(LeafOnlyError(
                open_entry.meta if open_entry else default_meta,
                "Non-leaf account '{}' has postings on it".format(real_account.account),
                open_entry))

    return entries, errors

beancount.plugins.noduplicates

此插件验证不存在重复的交易。

beancount.plugins.noduplicates.validate_no_duplicates(entries, unused_options_map)

通过计算哈希值检查条目是否唯一。

参数:
  • entries – 一组指令列表。

  • unused_options_map – 一个选项映射。

返回:
  • 如果发现任何错误,则返回新的错误列表。

源代码位于 beancount/plugins/noduplicates.py
def validate_no_duplicates(entries, unused_options_map):
    """Check that the entries are unique, by computing hashes.

    Args:
      entries: A list of directives.
      unused_options_map: An options map.
    Returns:
      A list of new errors, if any were found.
    """
    unused_hashes, errors = compare.hash_entries(entries, exclude_meta=True)
    return entries, errors

beancount.plugins.nounused

此插件验证不存在未使用的账户。

beancount.plugins.nounused.UnusedAccountError (tuple)

UnusedAccountError(source, message, entry)

beancount.plugins.nounused.UnusedAccountError.__getnewargs__(self) 特殊

将自身返回为一个普通元组。供 copy 和 pickle 使用。

源代码位于 beancount/plugins/nounused.py
def __getnewargs__(self):
    'Return self as a plain tuple.  Used by copy and pickle.'
    return _tuple(self)

beancount.plugins.nounused.UnusedAccountError.__new__(_cls, source, message, entry) 特殊 静态方法

创建 UnusedAccountError(source, message, entry) 的新实例

beancount.plugins.nounused.UnusedAccountError.__replace__(/, self, **kwds) 特殊

返回一个新的 UnusedAccountError 对象,用指定的新值替换字段

源代码位于 beancount/plugins/nounused.py
def _replace(self, /, **kwds):
    result = self._make(_map(kwds.pop, field_names, self))
    if kwds:
        raise TypeError(f'Got unexpected field names: {list(kwds)!r}')
    return result

beancount.plugins.nounused.UnusedAccountError.__repr__(self) 特殊

返回一个格式良好的表示字符串

源代码位于 beancount/plugins/nounused.py
def __repr__(self):
    'Return a nicely formatted representation string'
    return self.__class__.__name__ + repr_fmt % self

beancount.plugins.nounused.validate_unused_accounts(entries, unused_options_map)

检查所有声明为开放的账户是否实际被使用。

我们检查所有已开放的账户是否至少被其他指令引用过。这些账户可能是未使用的,因此会发出警告(我们倾向于严格检查)。请注意,一个被打开后又关闭的账户被视为已使用——这在现实中是有效的使用场景。如果您有账户被打开但从未使用的用例,可以通过为该账户初始化余额断言、填充指令,甚至使用注释来静音该警告。

(这很可能是作为“严格模式”插件可选包含的合适候选。)

参数:
  • entries – 一组指令列表。

  • unused_options_map – 一个选项映射。

返回:
  • 如果发现任何错误,则返回新的错误列表。

源代码位于 beancount/plugins/nounused.py
def validate_unused_accounts(entries, unused_options_map):
    """Check that all accounts declared open are actually used.

    We check that all of the accounts that are open are at least referred to by
    another directive. These are probably unused, so issue a warning (we like to
    be pedantic). Note that an account that is open and then closed is
    considered used--this is a valid use case that may occur in reality. If you
    have a use case for an account to be open but never used, you can quiet that
    warning by initializing the account with a balance asserts or a pad
    directive, or even use a note will be sufficient.

    (This is probably a good candidate for optional inclusion as a "pedantic"
    plugin.)

    Args:
      entries: A list of directives.
      unused_options_map: An options map.
    Returns:
      A list of new errors, if any were found.
    """
    # Find all the accounts referenced by entries which are not Open, and the
    # open directives for error reporting below.
    open_map = {}
    referenced_accounts = set()
    for entry in entries:
        if isinstance(entry, data.Open):
            open_map[entry.account] = entry
            continue
        referenced_accounts.update(getters.get_entry_accounts(entry))

    # Create a list of suitable errors, with the location of the Open directives
    # corresponding to the unused accounts.
    errors = [UnusedAccountError(open_entry.meta,
                                 "Unused account '{}'".format(account),
                                 open_entry)
              for account, open_entry in open_map.items()
              if account not in referenced_accounts]
    return entries, errors

beancount.plugins.onecommodity

当一个账户中使用了多种商品时,该插件会发出错误。

对于投资或交易账户,通过将股票名称作为账户名称的末尾部分,可以更方便地围绕单一股票进行操作筛选。

注意:

  • 该插件会自动跳过在 Open 指令中显式声明了商品的账户。

  • 您也可以在账户的 Open 指令中设置元数据 "onecommodity: FALSE" 来跳过对该账户的检查。

  • 如果提供,配置应为一个正则表达式,用于限定需检查的账户集合。

beancount.plugins.onecommodity.OneCommodityError (元组)

OneCommodityError(source, message, entry)

beancount.plugins.onecommodity.OneCommodityError.__getnewargs__(self) 特殊

将自身返回为一个普通元组。供 copy 和 pickle 使用。

源代码位于 beancount/plugins/onecommodity.py
def __getnewargs__(self):
    'Return self as a plain tuple.  Used by copy and pickle.'
    return _tuple(self)

beancount.plugins.onecommodity.OneCommodityError.__new__(_cls, source, message, entry) 特殊 静态方法

创建 OneCommodityError(source, message, entry) 的新实例

beancount.plugins.onecommodity.OneCommodityError.__replace__(/, self, **kwds) 特殊

返回一个新的 OneCommodityError 对象,用指定的新值替换字段

源代码位于 beancount/plugins/onecommodity.py
def _replace(self, /, **kwds):
    result = self._make(_map(kwds.pop, field_names, self))
    if kwds:
        raise TypeError(f'Got unexpected field names: {list(kwds)!r}')
    return result

beancount.plugins.onecommodity.OneCommodityError.__repr__(self) 特殊

返回一个格式良好的表示字符串

源代码位于 beancount/plugins/onecommodity.py
def __repr__(self):
    'Return a nicely formatted representation string'
    return self.__class__.__name__ + repr_fmt % self

beancount.plugins.onecommodity.validate_one_commodity(entries, unused_options_map, config=None)

检查每个账户是否仅包含单一商品的单位。

这是一个可选的额外约束,尽管 Beancount 支持包含多种商品的库存和聚合。我认为这与 GnuCash 的模型一致,即每个账户仅关联一种商品。

参数:
  • entries – 一组指令列表。

  • unused_options_map – 一个选项映射。

  • config – 插件配置字符串,一个用于匹配需检查账户子集的正则表达式。

返回:
  • 如果发现任何错误,则返回新的错误列表。

源代码位于 beancount/plugins/onecommodity.py
def validate_one_commodity(entries, unused_options_map, config=None):
    """Check that each account has units in only a single commodity.

    This is an extra constraint that you may want to apply optionally, despite
    Beancount's ability to support inventories and aggregations with more than
    one commodity. I believe this also matches GnuCash's model, where each
    account has a single commodity attached to it.

    Args:
      entries: A list of directives.
      unused_options_map: An options map.
      config: The plugin configuration string, a regular expression to match
        against the subset of accounts to check.
    Returns:
      A list of new errors, if any were found.
    """
    accounts_re = re.compile(config) if config else None

    # Mappings of account name to lists of currencies for each units and cost.
    units_map = collections.defaultdict(set)
    cost_map = collections.defaultdict(set)

    # Mappings to use just for getting a relevant source.
    units_source_map = {}
    cost_source_map = {}

    # Gather the set of accounts to skip from the Open directives.
    skip_accounts = set()
    for entry in entries:
        if not isinstance(entry, data.Open):
            continue
        if (not entry.meta.get("onecommodity", True) or
            (accounts_re and not accounts_re.match(entry.account)) or
            (entry.currencies and len(entry.currencies) > 1)):
            skip_accounts.add(entry.account)

    # Accumulate all the commodities used.
    for entry in entries:
        if isinstance(entry, data.Transaction):
            for posting in entry.postings:
                if posting.account in skip_accounts:
                    continue

                units = posting.units
                units_map[posting.account].add(units.currency)
                if len(units_map[posting.account]) > 1:
                    units_source_map[posting.account] = entry

                cost = posting.cost
                if cost:
                    cost_map[posting.account].add(cost.currency)
                    if len(cost_map[posting.account]) > 1:
                        cost_source_map[posting.account] = entry

        elif isinstance(entry, data.Balance):
            if entry.account in skip_accounts:
                continue

            units_map[entry.account].add(entry.amount.currency)
            if len(units_map[entry.account]) > 1:
                units_source_map[entry.account] = entry

        elif isinstance(entry, data.Open):
            if entry.currencies and len(entry.currencies) > 1:
                skip_accounts.add(entry.account)

    # Check units.
    errors = []
    for account, currencies in units_map.items():
        if account in skip_accounts:
            continue
        if len(currencies) > 1:
            errors.append(OneCommodityError(
                units_source_map[account].meta,
                "More than one currency in account '{}': {}".format(
                    account, ','.join(currencies)),
                None))

    # Check costs.
    for account, currencies in cost_map.items():
        if account in skip_accounts:
            continue
        if len(currencies) > 1:
            errors.append(OneCommodityError(
                cost_source_map[account].meta,
                "More than one cost currency in account '{}': {}".format(
                    account, ','.join(currencies)),
                None))

    return entries, errors

beancount.plugins.pedantic

一个插件集合,用于触发所有严格模式插件。

beancount.plugins.sellgains

一个交叉验证已申报收益与持仓出售价格的插件。

当你卖出股票时,收益可以通过相应的现金金额自动推断。例如,在以下交易中,第二和第三条分录应与所售股票的价值匹配:

1999-07-31 * "Sell"
  Assets:US:BRS:Company:ESPP          -81 ADSK {26.3125 USD}
  Assets:US:BRS:Company:Cash      2141.36 USD
  Expenses:Financial:Fees            0.08 USD
  Income:US:Company:ESPP:PnL      -10.125 USD

成本基础将与以下数值进行核对:2141.36 + 0.08 + -10.125。即,余额检查计算:

-81 x 26.3125 = -2131.3125 + 2141.36 + 0.08 + -10.125

并检查残差是否低于一个小的容差值。

但通常,收入科目并不会在对账单中直接提供。Beancount 可以利用余额自动推断该值,非常方便,如下所示:

1999-07-31 * "Sell"
  Assets:US:BRS:Company:ESPP          -81 ADSK {26.3125 USD}
  Assets:US:BRS:Company:Cash      2141.36 USD
  Expenses:Financial:Fees            0.08 USD
  Income:US:Company:ESPP:PnL

此外,大多数情况下,你的交易确认单会提供卖出价格,因此你可以直接输入:

1999-07-31 * "Sell"
  Assets:US:BRS:Company:ESPP          -81 ADSK {26.3125 USD} @ 26.4375 USD
  Assets:US:BRS:Company:Cash      2141.36 USD
  Expenses:Financial:Fees            0.08 USD
  Income:US:Company:ESPP:PnL

因此,理论上,如果提供了价格(26.4375 USD),我们可以验证按该价格出售所得是否与非收入分录匹配。即验证:

-81 x 26.4375 = -2141.4375 + 2141.36 + 0.08 +

的差值是否低于一个小的容差值。该插件正是执行此验证。

总体而言,它执行以下操作:对于具有成本和价格的分录,验证所有非收入账户的持仓总和是否在容差范围内。

这提供了额外的验证层级,使你无需手动输入收入金额,同时利用价格信息作为额外的错误检查机制,以防输入错误。

beancount.plugins.sellgains.SellGainsError (元组)

SellGainsError(source, message, entry)

beancount.plugins.sellgains.SellGainsError.__getnewargs__(self) 特殊

将自身返回为一个普通元组。供 copy 和 pickle 使用。

源代码位于 beancount/plugins/sellgains.py
def __getnewargs__(self):
    'Return self as a plain tuple.  Used by copy and pickle.'
    return _tuple(self)

beancount.plugins.sellgains.SellGainsError.__new__(_cls, source, message, entry) 特殊 静态方法

创建一个新的 SellGainsError 实例 (source, message, entry)

beancount.plugins.sellgains.SellGainsError.__replace__(/, self, **kwds) 特殊

返回一个新的 SellGainsError 对象,用指定的新值替换某些字段

源代码位于 beancount/plugins/sellgains.py
def _replace(self, /, **kwds):
    result = self._make(_map(kwds.pop, field_names, self))
    if kwds:
        raise TypeError(f'Got unexpected field names: {list(kwds)!r}')
    return result

beancount.plugins.sellgains.SellGainsError.__repr__(self) 特殊

返回一个格式良好的表示字符串

源代码位于 beancount/plugins/sellgains.py
def __repr__(self):
    'Return a nicely formatted representation string'
    return self.__class__.__name__ + repr_fmt % self

beancount.plugins.sellgains.validate_sell_gains(entries, options_map)

检查已售出持仓且带有价格的资产账户总额。

参数:
  • entries – 一组指令列表。

  • unused_options_map – 一个选项映射。

返回:
  • 如果发现任何错误,则返回新的错误列表。

源代码位于 beancount/plugins/sellgains.py
def validate_sell_gains(entries, options_map):
    """Check the sum of asset account totals for lots sold with a price on them.

    Args:
      entries: A list of directives.
      unused_options_map: An options map.
    Returns:
      A list of new errors, if any were found.
    """
    errors = []
    acc_types = options.get_account_types(options_map)
    proceed_types = set([acc_types.assets,
                         acc_types.liabilities,
                         acc_types.equity,
                         acc_types.expenses])

    for entry in entries:
        if not isinstance(entry, data.Transaction):
            continue

        # Find transactions whose lots at cost all have a price.
        postings_at_cost = [posting
                            for posting in entry.postings
                            if posting.cost is not None]
        if not postings_at_cost or not all(posting.price is not None
                                           for posting in postings_at_cost):
            continue

        # Accumulate the total expected proceeds and the sum of the asset and
        # expenses legs.
        total_price = inventory.Inventory()
        total_proceeds = inventory.Inventory()
        for posting in entry.postings:
            # If the posting is held at cost, add the priced value to the balance.
            if posting.cost is not None:
                assert posting.price is not None
                price = posting.price
                total_price.add_amount(amount.mul(price, -posting.units.number))
            else:
                # Otherwise, use the weight and ignore postings to Income accounts.
                atype = account_types.get_account_type(posting.account)
                if atype in proceed_types:
                    total_proceeds.add_amount(convert.get_weight(posting))

        # Compare inventories, currency by currency.
        dict_price = {pos.units.currency: pos.units.number
                      for pos in total_price}
        dict_proceeds = {pos.units.currency: pos.units.number
                         for pos in total_proceeds}

        tolerances = interpolate.infer_tolerances(entry.postings, options_map)
        invalid = False
        for currency, price_number in dict_price.items():
            # Accept a looser than usual tolerance because rounding occurs
            # differently. Also, it would be difficult for the user to satisfy
            # two sets of constraints manually.
            tolerance = tolerances.get(currency) * EXTRA_TOLERANCE_MULTIPLIER

            proceeds_number = dict_proceeds.pop(currency, ZERO)
            diff = abs(price_number - proceeds_number)
            if diff > tolerance:
                invalid = True
                break

        if invalid or dict_proceeds:
            errors.append(
                SellGainsError(
                    entry.meta,
                    "Invalid price vs. proceeds/gains: {} vs. {}; difference: {}".format(
                        total_price, total_proceeds, (total_price + -total_proceeds)),
                    entry))

    return entries, errors

beancount.plugins.unique_prices

此模块验证每个日期和基础/报价货币组合仅定义一个价格。若声明了多个冲突的价格值,将生成错误。注意:相同数值的多个价格条目不会触发错误。

此功能适用于希望以极严格模式输入价格的场景,但可能不符合实际使用需求。例如,如果你在一天内(1)通过交易成本隐式生成了一个价格,同时(2)又单独声明了一个不同的收盘价,这将触发错误。我不确定此功能长期是否实用,因此将其作为插件提供。

beancount.plugins.unique_prices.UniquePricesError (元组)

UniquePricesError(source, message, entry)

beancount.plugins.unique_prices.UniquePricesError.__getnewargs__(self) 特殊

将自身返回为一个普通元组。供 copy 和 pickle 使用。

源代码位于 beancount/plugins/unique_prices.py
def __getnewargs__(self):
    'Return self as a plain tuple.  Used by copy and pickle.'
    return _tuple(self)

beancount.plugins.unique_prices.UniquePricesError.__new__(_cls, source, message, entry) 特殊 静态方法

创建 UniquePricesError(source, message, entry) 的新实例

beancount.plugins.unique_prices.UniquePricesError.__replace__(/, self, **kwds) 特殊

返回一个新的 UniquePricesError 对象,用指定的新值替换字段

源代码位于 beancount/plugins/unique_prices.py
def _replace(self, /, **kwds):
    result = self._make(_map(kwds.pop, field_names, self))
    if kwds:
        raise TypeError(f'Got unexpected field names: {list(kwds)!r}')
    return result

beancount.plugins.unique_prices.UniquePricesError.__repr__(self) 特殊

返回一个格式良好的表示字符串

源代码位于 beancount/plugins/unique_prices.py
def __repr__(self):
    'Return a nicely formatted representation string'
    return self.__class__.__name__ + repr_fmt % self

beancount.plugins.unique_prices.validate_unique_prices(entries, unused_options_map)

检查特定基础/报价货币对每天是否仅有一个价格。

参数:
  • entries – 一组指令。我们仅关注其中的 Transaction 实例。

  • unused_options_map – 解析器选项字典。

返回:
  • 输入条目的列表,以及生成的 UniquePricesError 实例列表。

源代码位于 beancount/plugins/unique_prices.py
def validate_unique_prices(entries, unused_options_map):
    """Check that there is only a single price per day for a particular base/quote.

    Args:
      entries: A list of directives. We're interested only in the Transaction instances.
      unused_options_map: A parser options dict.
    Returns:
      The list of input entries, and a list of new UniquePricesError instances generated.
    """
    new_entries = []
    errors = []

    prices = collections.defaultdict(list)
    for entry in entries:
        if not isinstance(entry, data.Price):
            continue
        key = (entry.date, entry.currency, entry.amount.currency)
        prices[key].append(entry)

    errors = []
    for price_entries in prices.values():
        if len(price_entries) > 1:
            number_map = {price_entry.amount.number: price_entry
                          for price_entry in price_entries}
            if len(number_map) > 1:
                # Note: This should be a list of entries for better error
                # reporting. (Later.)
                error_entry = next(iter(number_map.values()))
                errors.append(
                    UniquePricesError(error_entry.meta,
                                      "Disagreeing price entries",
                                      price_entries))

    return entries, errors