beancount.plugins
用于过滤交易的示例插件。
这些是各种以创造性方式过滤条目的示例。
重要提示:这些插件并非旨在作为完整功能,而仅是使用 Beancount 解决问题的实验性尝试,属于可选安装的开发中功能,可通过 --plugin 选项启用,或作为邮件列表中回答问题的临时方案。
beancount.plugins.auto
一个包含所有自动和宽松插件的插件集合。
从某种意义上说,这与“严格模式”相反。在进行一些快速而粗糙的测试时非常有用。你只需导入“auto”插件,并将其放入宏中。
另请参阅:“pedantic”插件。
beancount.plugins.auto_accounts
此模块会自动为尚未开启的账户插入 Open 指令(在第一条条目日期处),并自动移除未使用的账户的 Open 指令。这在进行演示或设置初始交易时作为中间步骤非常方便。
beancount.plugins.auto_accounts.auto_insert_open(entries, unused_options_map)
为未开启的账户插入 Open 指令。
Open 指令将在相应账户首次被使用的日期处插入。注意:下方所示的实现只会新增 Open 指令,并不会移除未使用账户的 Open 指令。
参数:
- entries – 指令列表。
- unused_options_map – 解析器选项字典(未使用)。

返回:
- 一个二元组:条目列表(可能比原先多出若干 Open 条目)以及错误列表。
源代码位于 beancount/plugins/auto_accounts.py
def auto_insert_open(entries, unused_options_map):
    """Insert Open directives for accounts that are used but never opened.

    Each synthesized Open directive carries the date that
    getters.get_accounts_use_map() reports as the first use of its account.
    Existing directives are passed through untouched; nothing is removed.

    Args:
      entries: A list of directives.
      unused_options_map: A parser options dict (ignored).
    Returns:
      A pair of (entries, errors). The entries are the input list itself when
      nothing had to be inserted, otherwise a new list re-sorted with
      data.entry_sortkey. The list of errors is always empty.
    """
    already_open = set()
    for entry in entries:
        if isinstance(entry, data.Open):
            already_open.add(entry.account)

    first_use_by_account, _ = getters.get_accounts_use_map(entries)

    inserted = []
    for index, (account, date_first_used) in enumerate(
            sorted(first_use_by_account.items())):
        if account in already_open:
            continue
        meta = data.new_metadata('<auto_accounts>', index)
        inserted.append(data.Open(meta, date_first_used, account, None, None))

    # Nothing to add: hand back the caller's list as-is.
    if not inserted:
        return entries, []

    inserted.extend(entries)
    inserted.sort(key=data.entry_sortkey)
    return inserted, []
beancount.plugins.check_average_cost
一个确保未记账交易中成本基础得以保留的插件。
该插件适用于使用“NONE”记账方式的账户,用于手动确保减少方的总成本基础与账户库存的平均值一致。这是实现“AVERAGE”记账方式的第一步。换句话说,此插件提供断言,强制你手动模拟“AVERAGE”记账方式的行为,避免因不匹配的记账而造成成本基础泄露。(注意此处的特殊语境:理想情况下,“NONE”记账方式本不应存在。)
beancount.plugins.check_average_cost.MatchBasisError (元组)
MatchBasisError(source, message, entry)
beancount.plugins.check_average_cost.MatchBasisError.__getnewargs__(self)
特殊
将自身返回为一个普通元组。供 copy 和 pickle 使用。
源代码位于 beancount/plugins/check_average_cost.py
def __getnewargs__(self):
    """Return the instance's fields as a plain tuple (used by copy and pickle)."""
    plain = _tuple(self)
    return plain
beancount.plugins.check_average_cost.MatchBasisError.__new__(_cls, source, message, entry)
特殊
静态方法
创建 MatchBasisError(source, message, entry) 的新实例
beancount.plugins.check_average_cost.MatchBasisError.__replace__(/, self, **kwds)
特殊
返回一个新的 MatchBasisError 对象,用指定的新值替换字段
源代码位于 beancount/plugins/check_average_cost.py
def _replace(self, /, **kwds):
    """Return a new instance with the fields named in kwds replaced.

    Raises:
      TypeError: If kwds contains a name that is not a field.
    """
    # Each field takes its value from kwds when given, else the current value
    # (presumably _map is the builtin map, pairing field names with values).
    replaced = self._make(_map(kwds.pop, field_names, self))
    if not kwds:
        return replaced
    # Whatever is left in kwds did not match any field name.
    raise TypeError(f'Got unexpected field names: {list(kwds)!r}')
beancount.plugins.check_average_cost.MatchBasisError.__repr__(self)
特殊
返回一个格式良好的表示字符串
源代码位于 beancount/plugins/check_average_cost.py
def __repr__(self):
    """Return a nicely formatted representation string."""
    class_name = self.__class__.__name__
    return class_name + repr_fmt % self
beancount.plugins.check_average_cost.validate_average_cost(entries, options_map, config_str=None)
检查未记账的分录的减少部分是否接近平均成本基础。
参数:
- entries – 指令列表。
- options_map – 选项映射。
- config_str – 配置字符串,其内容为一个浮点数(容差)的字符串形式。

返回:
- 一个二元组:原条目列表,以及新发现的错误列表(如有)。
源代码位于 beancount/plugins/check_average_cost.py
def validate_average_cost(entries, options_map, config_str=None):
    """Check that reducing legs on unbooked postings are near the average cost basis.

    Only postings on accounts whose Open directive declares Booking.NONE are
    inspected. For each reducing posting (negative units), the cost on the
    posting must lie within the relative tolerance of the running average cost
    accumulated for its (account, currency, cost currency) group.

    Args:
      entries: A list of directives.
      options_map: An options map (not used by this function).
      config_str: Optional configuration, a string that evaluates to a float
        giving the relative tolerance. DEFAULT_TOLERANCE is used when it is
        missing or blank.
    Returns:
      A pair of the unmodified list of entries and a list of MatchBasisError.
    Raises:
      RuntimeError: If the configuration does not evaluate to a float.
    """
    # Initialize tolerance bounds.
    if config_str and config_str.strip():
        # NOTE(review): eval() runs arbitrary expressions from the plugin
        # configuration string; only acceptable for trusted ledger files.
        # pylint: disable=eval-used
        config_obj = eval(config_str, {}, {})
        if not isinstance(config_obj, float):
            raise RuntimeError("Invalid configuration for check_average_cost: "
                               "must be a float")
        tolerance = config_obj
    else:
        tolerance = DEFAULT_TOLERANCE
    min_tolerance = D(1 - tolerance)
    max_tolerance = D(1 + tolerance)

    errors = []
    ocmap = getters.get_account_open_close(entries)
    balances = collections.defaultdict(inventory.Inventory)
    for entry in entries:
        if not isinstance(entry, Transaction):
            continue
        for posting in entry.postings:
            dopen = ocmap.get(posting.account, None)
            # Only process accounts with a NONE booking value.
            if not (dopen and dopen[0] and dopen[0].booking == Booking.NONE):
                continue

            balance = balances[(posting.account,
                                posting.units.currency,
                                posting.cost.currency if posting.cost else None)]
            if posting.units.number < ZERO:
                average = balance.average().get_only_position()
                # A posting may carry no cost (see the balance key above), and
                # the averaged position may then have none either; there is no
                # basis to compare in that case. Dereferencing `.cost.number`
                # unconditionally used to raise AttributeError here.
                if (average is not None and
                        average.cost is not None and
                        posting.cost is not None):
                    number = average.cost.number
                    min_valid = number * min_tolerance
                    max_valid = number * max_tolerance
                    if not (min_valid <= posting.cost.number <= max_valid):
                        errors.append(
                            MatchBasisError(
                                entry.meta,
                                ("Cost basis on reducing posting is too far from "
                                 "the average cost ({} vs. {})".format(
                                     posting.cost.number, average.cost.number)),
                                entry))
            balance.add_position(posting)
    return entries, errors
beancount.plugins.check_closing
一个自动在标记为“关闭”的分录上插入余额检查的插件。
某些分录被用户知晓为“关闭交易”,这意味着交易完成后该工具的持仓应为零。例如,大多数普通期权交易属于这种情况:仅持有一份特定工具,最终会到期或被完全平仓。您希望确认这一点,而在 Beancount 中实现此目的的方法是插入一个余额检查。
此插件允许您通过添加元数据更简便地实现此功能。例如,以下交易:
2018-02-16 * "SOLD -14 QQQ 100 16 FEB 18 160 CALL @5.31"
Assets:US:Brokerage:Main:Options -1400 QQQ180216C160 {2.70 USD} @ 5.31 USD
closing: TRUE
Expenses:Financial:Commissions 17.45 USD
Expenses:Financial:Fees 0.42 USD
Assets:US:Brokerage:Main:Cash 7416.13 USD
Income:US:Brokerage:Main:PnL
将扩展为以下两个指令:
2018-02-16 * "SOLD -14 QQQ 100 16 FEB 18 160 CALL @5.31"
Assets:US:Brokerage:Main:Options -1400 QQQ180216C160 {2.70 USD} @ 5.31 USD
Expenses:Financial:Commissions 17.45 USD
Expenses:Financial:Fees 0.42 USD
Assets:US:Brokerage:Main:Cash 7416.13 USD
Income:US:Brokerage:Main:PnL
2018-02-17 balance Assets:US:Brokerage:Main:Options 0 QQQ180216C160
当您知道正在关闭持仓时,请插入关闭行。
beancount.plugins.check_closing.check_closing(entries, options_map)
将“closing”元数据扩展为零余额检查。
参数:
- entries – 指令列表。
- options_map – 选项映射。

返回:
- 一个二元组:条目列表(包含新插入的 Balance 指令)以及错误列表(此处始终为空)。
源代码位于 beancount/plugins/check_closing.py
def check_closing(entries, options_map):
    """Expand 'closing' metadata to a zero balance check.

    For every posting whose metadata has a truthy 'closing' key, the key is
    stripped and a Balance directive asserting zero units of the posting's
    currency is emitted for that account, dated one day after the transaction.

    The input transactions are left untouched: a transaction with closing
    postings is replaced in the output by a copy holding the cleaned postings.
    (The previous implementation assigned into entry.postings in place, which
    altered the caller's directives.)

    Args:
      entries: A list of directives.
      options_map: An options map (not used by this function).
    Returns:
      A pair of the new list of entries and an empty list of errors.
    """
    new_entries = []
    for entry in entries:
        if isinstance(entry, data.Transaction):
            # Copy of the postings list, created only if something changes.
            cleaned_postings = None
            for i, posting in enumerate(entry.postings):
                if posting.meta and posting.meta.get('closing', False):
                    # Remove the metadata.
                    meta = posting.meta.copy()
                    del meta['closing']
                    if cleaned_postings is None:
                        cleaned_postings = list(entry.postings)
                    cleaned_postings[i] = posting._replace(meta=meta)

                    # Insert a balance.
                    date = entry.date + datetime.timedelta(days=1)
                    balance = data.Balance(data.new_metadata("<check_closing>", 0),
                                           date, posting.account,
                                           amount.Amount(ZERO, posting.units.currency),
                                           None, None)
                    new_entries.append(balance)
            if cleaned_postings is not None:
                entry = entry._replace(postings=cleaned_postings)
        new_entries.append(entry)
    return new_entries, []
beancount.plugins.check_commodity
一个验证所有遇到的商品是否都有 Commodity 指令的插件。
如果您比较严谨,希望确保为所使用的每种商品都声明了属性,此插件非常有用。例如,在使用投资组合导出功能时很有帮助。
您可以提供一个 (账户正则表达式, 货币正则表达式) 的映射作为选项,以有选择地忽略某些商品的检查。请谨慎使用,因为这会绕过该插件提供的检查。然而,在活跃的期权交易账户中,会生成大量产品,商品指令的数量可能过于庞大,逐一声明每个嵌入了行权价和到期日的期权合约(例如 'SPX_121622P3300')并不高效。
请注意,如果某个符号在至少一个账户中被忽略,则它在所有价格指令和元数据值中也将被忽略。
beancount.plugins.check_commodity.CheckCommodityError (元组)
CheckCommodityError(source, message, entry)
beancount.plugins.check_commodity.CheckCommodityError.__getnewargs__(self)
特殊
将自身返回为一个普通元组。供 copy 和 pickle 使用。
源代码位于 beancount/plugins/check_commodity.py
def __getnewargs__(self):
    """Return the instance's fields as a plain tuple (used by copy and pickle)."""
    plain = _tuple(self)
    return plain
beancount.plugins.check_commodity.CheckCommodityError.__new__(_cls, source, message, entry)
特殊
静态方法
创建 CheckCommodityError(source, message, entry) 的新实例
beancount.plugins.check_commodity.CheckCommodityError.__replace__(/, self, **kwds)
特殊
返回一个新的 CheckCommodityError 对象,用指定的新值替换字段
源代码位于 beancount/plugins/check_commodity.py
def _replace(self, /, **kwds):
    """Return a new instance with the fields named in kwds replaced.

    Raises:
      TypeError: If kwds contains a name that is not a field.
    """
    # Each field takes its value from kwds when given, else the current value
    # (presumably _map is the builtin map, pairing field names with values).
    replaced = self._make(_map(kwds.pop, field_names, self))
    if not kwds:
        return replaced
    # Whatever is left in kwds did not match any field name.
    raise TypeError(f'Got unexpected field names: {list(kwds)!r}')
beancount.plugins.check_commodity.CheckCommodityError.__repr__(self)
特殊
返回一个格式良好的表示字符串
源代码位于 beancount/plugins/check_commodity.py
def __repr__(self):
    """Return a nicely formatted representation string."""
    class_name = self.__class__.__name__
    return class_name + repr_fmt % self
beancount.plugins.check_commodity.ConfigError (元组)
ConfigError(source, message, entry)
beancount.plugins.check_commodity.ConfigError.__getnewargs__(self)
特殊
将自身返回为一个普通元组。供 copy 和 pickle 使用。
源代码位于 beancount/plugins/check_commodity.py
def __getnewargs__(self):
    """Return the instance's fields as a plain tuple (used by copy and pickle)."""
    plain = _tuple(self)
    return plain
beancount.plugins.check_commodity.ConfigError.__new__(_cls, source, message, entry)
特殊
静态方法
创建 ConfigError(source, message, entry) 的新实例
beancount.plugins.check_commodity.ConfigError.__replace__(/, self, **kwds)
特殊
返回一个新的 ConfigError 对象,用指定的新值替换字段
源代码位于 beancount/plugins/check_commodity.py
def _replace(self, /, **kwds):
    """Return a new instance with the fields named in kwds replaced.

    Raises:
      TypeError: If kwds contains a name that is not a field.
    """
    # Each field takes its value from kwds when given, else the current value
    # (presumably _map is the builtin map, pairing field names with values).
    replaced = self._make(_map(kwds.pop, field_names, self))
    if not kwds:
        return replaced
    # Whatever is left in kwds did not match any field name.
    raise TypeError(f'Got unexpected field names: {list(kwds)!r}')
beancount.plugins.check_commodity.ConfigError.__repr__(self)
特殊
返回一个格式良好的表示字符串
源代码位于 beancount/plugins/check_commodity.py
def __repr__(self):
    """Return a nicely formatted representation string."""
    class_name = self.__class__.__name__
    return class_name + repr_fmt % self
beancount.plugins.check_commodity.get_commodity_map_ex(entries, metadata=False)
在指令流中查找并提取商品。
源代码位于 beancount/plugins/check_commodity.py
def get_commodity_map_ex(entries, metadata=False):
    """Find and extract commodities in the stream of directives.

    Args:
      entries: A list of directives.
      metadata: If true, also collect string metadata values that fully match
        CURRENCY_RE, from both postings and entries.
    Returns:
      A pair of (occurrences, commodities_map): a set of (context, currency)
      pairs, where the context is an account name, PRICE_CONTEXT or
      METADATA_CONTEXT, and a dict mapping each declared currency to its
      Commodity directive.
    """
    # TODO(dnicolodi) Unfortunately detecting commodities in metadata values
    # may result in false positives: commonly used strings are matched by the
    # regular expression. Revisit this when commodities are represented with
    # their own type.
    skipped_keys = {'filename', 'lineno', '__automatic__'}
    currency_pattern = re.compile(CURRENCY_RE)

    def currencies_in_meta(directive):
        """Yield string metadata values that look like a currency name."""
        if directive.meta is None:
            return
        for key, value in directive.meta.items():
            if key in skipped_keys or not isinstance(value, str):
                continue
            if currency_pattern.fullmatch(value):
                yield value

    declared = {}
    found = set()
    for entry in entries:
        if isinstance(entry, data.Commodity):
            declared[entry.currency] = entry

        elif isinstance(entry, data.Open):
            for currency in entry.currencies or ():
                found.add((entry.account, currency))

        elif isinstance(entry, data.Transaction):
            for posting in entry.postings:
                account_name = posting.account
                # Currency of the units, then of the cost and price if any.
                found.add((account_name, posting.units.currency))
                if posting.cost:
                    found.add((account_name, posting.cost.currency))
                if posting.price:
                    found.add((account_name, posting.price.currency))
                # Currency-looking values in the posting metadata.
                if metadata:
                    found.update((account_name, currency)
                                 for currency in currencies_in_meta(posting))

        elif isinstance(entry, data.Balance):
            found.add((entry.account, entry.amount.currency))

        elif isinstance(entry, data.Price):
            found.add((PRICE_CONTEXT, entry.currency))
            found.add((PRICE_CONTEXT, entry.amount.currency))

        # Entry-level metadata, for every kind of directive.
        if metadata:
            found.update((METADATA_CONTEXT, currency)
                         for currency in currencies_in_meta(entry))

    return found, declared
beancount.plugins.check_commodity.validate_commodity_directives(entries, options_map, config_str=None)
查找所有使用过的商品,并确保它们都有对应的 Commodity 指令。
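参数:
- entries – 指令列表。
- options_map – 选项映射。
- config_str – 配置字符串,求值后应为一个由“账户正则表达式 → 货币正则表达式”构成的字典。

返回:
- 一个二元组:原条目列表,以及新发现的错误列表(如有)。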
源代码位于 beancount/plugins/check_commodity.py
def validate_commodity_directives(entries, options_map, config_str=None):
    """Find all commodities used and ensure they have a corresponding Commodity directive.

    Args:
      entries: A list of directives.
      options_map: An options map (not used by this function).
      config_str: Optional configuration, a string that evaluates to a dict
        mapping account regexps to currency regexps. A (context, currency)
        occurrence matched by such a pair is exempt from the check.
    Returns:
      A pair of the unmodified list of entries and a list of errors
      (ConfigError or CheckCommodityError instances).
    """
    errors = []

    def missing_commodity_error(currency, context):
        """Build the error reported for an undeclared commodity."""
        meta = data.new_metadata('<check_commodity>', 0)
        return CheckCommodityError(
            meta,
            "Missing Commodity directive for '{}' in '{}'".format(
                currency, context),
            None)

    if config_str:
        # NOTE(review): eval() runs arbitrary expressions from the plugin
        # configuration string; only acceptable for trusted ledger files.
        # pylint: disable=eval-used
        config_obj = eval(config_str, {}, {})
        if not isinstance(config_obj, dict):
            # The metadata used to name the unrelated '<commodity_attr>' plugin.
            errors.append(ConfigError(
                data.new_metadata('<check_commodity>', 0),
                "Invalid configuration for check_commodity plugin; skipping.", None))
            return entries, errors
    else:
        config_obj = {}

    # Compile the regular expressions, producing an error if invalid.
    ignore_map = {}
    for key, value in config_obj.items():
        kv = []
        for pattern in key, value:
            try:
                kv.append(re.compile(pattern).match)
            except (re.error, TypeError):
                # TypeError covers patterns that are not strings. Report the
                # pattern that actually failed, not always the value.
                meta = data.new_metadata('<check_commodity>', 0)
                errors.append(
                    CheckCommodityError(
                        meta, "Invalid regexp: '{}' for {}".format(pattern, key), None))
        # Only keep the pair when both sides compiled.
        if len(kv) == 2:
            ignore_map[kv[0]] = kv[1]

    # Get all the occurrences of commodities and a mapping of the directives.
    #
    # TODO(blais): Establish a distinction at the parser level for commodities
    # and strings, so that we can turn detection of them in metadata.
    occurrences, commodity_map = get_commodity_map_ex(entries, metadata=False)

    # Process all currencies with context.
    issued = set()
    ignored = set()
    anonymous = set()
    for context, currency in sorted(occurrences):
        if context in ANONYMOUS:
            anonymous.add((context, currency))
            continue

        # Skip if the commodity was declared, or if an error for that commodity
        # has already been issued.
        if commodity_map.get(currency, None) is not None or currency in issued:
            continue

        # If any of the ignore patterns matches, ignore and record ignored.
        if any((context_re(context) and currency_re(currency))
               for context_re, currency_re in ignore_map.items()):
            ignored.add(currency)
            continue

        errors.append(missing_commodity_error(currency, context))
        # Process it only once.
        issued.add(currency)

    # Process all currencies out of context, automatically ignoring those
    # which have already been issued or ignored with an account context.
    for context, currency in sorted(anonymous):
        if (commodity_map.get(currency, None) is not None or
                currency in issued or
                currency in ignored):
            continue
        errors.append(missing_commodity_error(currency, context))

    return entries, errors
beancount.plugins.check_drained
在资产负债表账户关闭前插入余额为零的检查。
对于带有 Close 指令的资产负债表账户(资产、负债和权益),在其关闭日期之后插入 Balance 指令,涵盖该账户中出现过的、以及在其 Open 指令中被声明为合法的所有商品。这相当于执行以下转换:
2018-02-01 open Assets:Project:Cash USD,CAD
...
2020-02-01 close Assets:Project:Cash
转换为:
2018-02-01 open Assets:Project:Cash USD,CAD
...
2020-02-01 close Assets:Project:Cash
2020-02-02 balance Assets:Project:Cash 0 USD
2020-02-02 balance Assets:Project:Cash 0 CAD
beancount.plugins.check_drained.check_drained(entries, options_map)
检查已关闭的账户是否为空。
参数:
- entries – 指令列表。
- options_map – 选项映射。

返回:
- 一个二元组:条目列表(包含新插入的 Balance 指令)以及错误列表(此处始终为空)。
源代码位于 beancount/plugins/check_drained.py
def check_drained(entries, options_map):
    """Check that closed accounts are empty.

    For every Close directive on a balance sheet account, a zero Balance
    directive dated one day later is emitted (just before the Close in the
    output list) for each currency seen in the account's postings or declared
    on its Open directive so far. A currency is skipped when a Balance
    directive for it already exists on the close date.

    Args:
      entries: A list of directives.
      options_map: An options map, used to obtain the account types.
    Returns:
      A pair of the new list of entries and an empty list of errors.
    """
    acctypes = options.get_account_types(options_map)
    is_covered = functools.partial(
        account_types.is_balance_sheet_account, account_types=acctypes
    )

    new_entries = []
    currencies = collections.defaultdict(set)
    balances = collections.defaultdict(set)
    for entry in entries:
        if isinstance(entry, data.Transaction):
            # Accumulate all the currencies seen in each account over time.
            for posting in entry.postings:
                if is_covered(posting.account):
                    currencies[posting.account].add(posting.units.currency)

        elif isinstance(entry, data.Open):
            # Accumulate all the currencies declared in the account opening.
            if is_covered(entry.account) and entry.currencies:
                currencies[entry.account].update(entry.currencies)

        elif isinstance(entry, data.Balance):
            # Remember explicit balance directives so they are not duplicated.
            if is_covered(entry.account):
                balances[entry.account].add((entry.date, entry.amount.currency))

        if isinstance(entry, data.Close) and is_covered(entry.account):
            # Sorted so the generated directives come out in the same order on
            # every run; plain set iteration order depends on string hashing.
            for currency in sorted(currencies[entry.account]):
                # Skip balance insertion due to the presence of an explicit one.
                if (entry.date, currency) in balances[entry.account]:
                    continue

                # Insert a balance directive.
                balance_entry = data.Balance(
                    # Note: We use the close directive's meta so that
                    # balance errors direct the user to the corresponding
                    # close directive.
                    entry.meta,
                    entry.date + ONE_DAY,
                    entry.account,
                    amount.Amount(ZERO, currency),
                    None,
                    None,
                )
                new_entries.append(balance_entry)

        new_entries.append(entry)

    return new_entries, []
beancount.plugins.close_tree
此插件在账户关闭时为其所有子账户插入关闭指令。未打开的父账户也可以被关闭。任何显式指定的关闭指令将保持不变。
例如,给定以下内容:
2017-11-10 open Assets:Brokerage:AAPL
2017-11-10 open Assets:Brokerage:ORNG
2018-11-10 close Assets:Brokerage ; this does not necessarily need to be opened
该插件将其转换为:
2017-11-10 open Assets:Brokerage:AAPL
2017-11-10 open Assets:Brokerage:ORNG
2018-11-10 close Assets:Brokerage:AAPL
2018-11-10 close Assets:Brokerage:ORNG
在任何生成您希望自动关闭的账户树的 open 指令的插件之后调用此插件。例如 Beancount 自带的 auto_accounts 插件:
plugin "beancount.plugins.auto_accounts"
plugin "beancount.plugins.close_tree"
beancount.plugins.close_tree.close_tree(entries, unused_options_map)
为已关闭账户的所有子账户插入关闭条目。
参数:
- entries – 指令列表。此处仅关注其中的 Open/Close 指令。
- unused_options_map – 解析器选项字典(未使用)。

返回:
- 由条目列表和错误列表组成的元组。
源代码位于 beancount/plugins/close_tree.py
def close_tree(entries, unused_options_map):
    """Insert close entries for all subaccounts of a closed account.

    A Close directive is kept only when its own account has an Open directive.
    Each opened subaccount without a Close directive receives a new Close
    directive at the same date, inserted ahead of the parent's.

    Args:
      entries: A list of directives. We're interested only in the Open/Close instances.
      unused_options_map: A parser options dict.
    Returns:
      A tuple of entries and errors. The list of errors is always empty.
    """
    new_entries = []
    errors = []

    opens = set(entry.account for entry in entries if isinstance(entry, Open))
    closes = set(entry.account for entry in entries if isinstance(entry, Close))
    # Sorted once so the generated Close directives come out in the same order
    # on every run; plain set iteration order depends on string hashing.
    ordered_opens = sorted(opens)

    for entry in entries:
        if not isinstance(entry, Close):
            new_entries.append(entry)
            continue

        prefix = entry.account + ":"
        subaccounts = [
            account
            for account in ordered_opens
            if account.startswith(prefix) and account not in closes
        ]
        for subacc in subaccounts:
            meta = data.new_metadata("<beancount.plugins.close_tree>", 0)
            new_entries.append(data.Close(meta, entry.date, subacc))
            # So we don't attempt to re-close a grandchild that a child closed
            closes.add(subacc)

        # A Close on a parent that was never opened is dropped from the output.
        if entry.account in opens:
            new_entries.append(entry)

    return new_entries, errors
beancount.plugins.coherent_cost
此插件验证以成本持有货币时不会被以价格转换,反之亦然。通常情况下应如此,使用它可防止用户在未指定成本基础的情况下出售持仓。
beancount.plugins.coherent_cost.CoherentCostError (元组)
CoherentCostError(source, message, entry)
beancount.plugins.coherent_cost.CoherentCostError.__getnewargs__(self)
特殊
将自身返回为一个普通元组。供 copy 和 pickle 使用。
源代码位于 beancount/plugins/coherent_cost.py
def __getnewargs__(self):
    """Return the instance's fields as a plain tuple (used by copy and pickle)."""
    plain = _tuple(self)
    return plain
beancount.plugins.coherent_cost.CoherentCostError.__new__(_cls, source, message, entry)
特殊
静态方法
创建 CoherentCostError(source, message, entry) 的新实例
beancount.plugins.coherent_cost.CoherentCostError.__replace__(/, self, **kwds)
特殊
返回一个新的 CoherentCostError 对象,用指定的新值替换字段
源代码位于 beancount/plugins/coherent_cost.py
def _replace(self, /, **kwds):
    """Return a new instance with the fields named in kwds replaced.

    Raises:
      TypeError: If kwds contains a name that is not a field.
    """
    # Each field takes its value from kwds when given, else the current value
    # (presumably _map is the builtin map, pairing field names with values).
    replaced = self._make(_map(kwds.pop, field_names, self))
    if not kwds:
        return replaced
    # Whatever is left in kwds did not match any field name.
    raise TypeError(f'Got unexpected field names: {list(kwds)!r}')
beancount.plugins.coherent_cost.CoherentCostError.__repr__(self)
特殊
返回一个格式良好的表示字符串
源代码位于 beancount/plugins/coherent_cost.py
def __repr__(self):
    """Return a nicely formatted representation string."""
    class_name = self.__class__.__name__
    return class_name + repr_fmt % self
beancount.plugins.coherent_cost.validate_coherent_cost(entries, unused_options_map)
检查所有货币要么全部使用成本,要么完全不使用成本,但不能两者兼有。
参数:
- entries – 指令列表。
- unused_options_map – 选项映射(未使用)。

返回:
- 一个二元组:原条目列表,以及新发现的错误列表(如有)。
源代码位于 beancount/plugins/coherent_cost.py
def validate_coherent_cost(entries, unused_options_map):
    """Check that all currencies are either used at cost or not at all, but never both.

    Args:
      entries: A list of directives.
      unused_options_map: An options map.
    Returns:
      A pair of the unmodified list of entries and a list of
      CoherentCostError, one per offending currency, ordered by currency name.
    """
    errors = []

    # First transaction seen using each currency with / without a cost.
    with_cost = {}
    without_cost = {}
    for entry in data.filter_txns(entries):
        for posting in entry.postings:
            target_set = without_cost if posting.cost is None else with_cost
            currency = posting.units.currency
            target_set.setdefault(currency, entry)

    # Sorted so the errors are reported in the same order on every run; plain
    # set iteration order depends on string hashing.
    for currency in sorted(set(with_cost) & set(without_cost)):
        errors.append(
            CoherentCostError(
                without_cost[currency].meta,
                "Currency '{}' is used both with and without cost".format(currency),
                with_cost[currency]))
        # Note: We really ought to include both of the first transactions here.

    return entries, errors
beancount.plugins.commodity_attr
一个插件,用于断言所有 Commodity 指令都具有特定属性,且该属性的值属于一组预定义的枚举值。
配置必须是属性名称到有效值列表的映射,例如:
plugin "beancount.plugins.commodity_attr" "{
'sector': ['Technology', 'Financials', 'Energy'],
'name': None,
}"
如果 Commodity 指令缺少该属性,或属性值不在有效集合中,插件将报错。如果您只想确保属性已设置,可将有效值列表设为 None,如上例中的 'name' 属性所示。
beancount.plugins.commodity_attr.CommodityError (元组)
CommodityError(source, message, entry)
beancount.plugins.commodity_attr.CommodityError.__getnewargs__(self)
特殊
将自身返回为一个普通元组。供 copy 和 pickle 使用。
源代码位于 beancount/plugins/commodity_attr.py
def __getnewargs__(self):
    """Return the instance's fields as a plain tuple (used by copy and pickle)."""
    plain = _tuple(self)
    return plain
beancount.plugins.commodity_attr.CommodityError.__new__(_cls, source, message, entry)
特殊
静态方法
创建 CommodityError(source, message, entry) 的新实例
beancount.plugins.commodity_attr.CommodityError.__replace__(/, self, **kwds)
特殊
返回一个新的 CommodityError 对象,用指定的新值替换字段
源代码位于 beancount/plugins/commodity_attr.py
def _replace(self, /, **kwds):
    """Return a new instance with the fields named in kwds replaced.

    Raises:
      TypeError: If kwds contains a name that is not a field.
    """
    # Each field takes its value from kwds when given, else the current value
    # (presumably _map is the builtin map, pairing field names with values).
    replaced = self._make(_map(kwds.pop, field_names, self))
    if not kwds:
        return replaced
    # Whatever is left in kwds did not match any field name.
    raise TypeError(f'Got unexpected field names: {list(kwds)!r}')
beancount.plugins.commodity_attr.CommodityError.__repr__(self)
特殊
返回一个格式良好的表示字符串
源代码位于 beancount/plugins/commodity_attr.py
def __repr__(self):
    """Return a nicely formatted representation string."""
    class_name = self.__class__.__name__
    return class_name + repr_fmt % self
beancount.plugins.commodity_attr.ConfigError (元组)
ConfigError(source, message, entry)
beancount.plugins.commodity_attr.ConfigError.__getnewargs__(self)
特殊
将自身返回为一个普通元组。供 copy 和 pickle 使用。
源代码位于 beancount/plugins/commodity_attr.py
def __getnewargs__(self):
    """Return the instance's fields as a plain tuple (used by copy and pickle)."""
    plain = _tuple(self)
    return plain
beancount.plugins.commodity_attr.ConfigError.__new__(_cls, source, message, entry)
特殊
静态方法
创建 ConfigError(source, message, entry) 的新实例
beancount.plugins.commodity_attr.ConfigError.__replace__(/, self, **kwds)
特殊
返回一个新的 ConfigError 对象,用指定的新值替换字段
源代码位于 beancount/plugins/commodity_attr.py
def _replace(self, /, **kwds):
    """Return a new instance with the fields named in kwds replaced.

    Raises:
      TypeError: If kwds contains a name that is not a field.
    """
    # Each field takes its value from kwds when given, else the current value
    # (presumably _map is the builtin map, pairing field names with values).
    replaced = self._make(_map(kwds.pop, field_names, self))
    if not kwds:
        return replaced
    # Whatever is left in kwds did not match any field name.
    raise TypeError(f'Got unexpected field names: {list(kwds)!r}')
beancount.plugins.commodity_attr.ConfigError.__repr__(self)
特殊
返回一个格式良好的表示字符串
源代码位于 beancount/plugins/commodity_attr.py
def __repr__(self):
    """Return a nicely formatted representation string."""
    class_name = self.__class__.__name__
    return class_name + repr_fmt % self
beancount.plugins.commodity_attr.validate_commodity_attr(entries, unused_options_map, config_str)
检查所有 Commodity 指令是否具有有效的属性。
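参数:
- entries – 指令列表。
- unused_options_map – 选项映射(未使用)。
- config_str – 配置字符串,求值后应为“属性名 → 有效值列表(或 None)”的字典。

返回:
- 一个二元组:原条目列表,以及新发现的错误列表(如有)。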
源代码位于 beancount/plugins/commodity_attr.py
def validate_commodity_attr(entries, unused_options_map, config_str):
    """Check that all Commodity directives have a valid attribute.

    Args:
      entries: A list of directives.
      unused_options_map: An options map.
      config_str: A configuration string that evaluates to a dict mapping
        attribute names to a collection of valid values, or to None when the
        attribute only has to be present.
    Returns:
      A pair of the unmodified list of entries and a list of errors
      (ConfigError or CommodityError instances).
    """
    errors = []

    # NOTE(review): eval() runs arbitrary expressions from the plugin
    # configuration string; only acceptable for trusted ledger files.
    # pylint: disable=eval-used
    config_obj = eval(config_str, {}, {})
    if not isinstance(config_obj, dict):
        errors.append(ConfigError(
            data.new_metadata('<commodity_attr>', 0),
            "Invalid configuration for commodity_attr plugin; skipping.", None))
        return entries, errors

    validmap = {attr: frozenset(values) if values is not None else None
                for attr, values in config_obj.items()}
    for entry in entries:
        if not isinstance(entry, data.Commodity):
            continue
        for attr, values in validmap.items():
            value = entry.meta.get(attr, None)
            if value is None:
                errors.append(CommodityError(
                    entry.meta,
                    "Missing attribute '{}' for Commodity directive {}".format(
                        attr, entry.currency), None))
                continue
            if values and value not in values:
                # Stringify and sort the options: joining the frozenset
                # directly listed them in arbitrary order and raised TypeError
                # for options that are not strings.
                valid_options = ', '.join(sorted(str(option) for option in values))
                errors.append(CommodityError(
                    entry.meta,
                    "Invalid value '{}' for attribute {}, Commodity".format(value, attr) +
                    " directive {}; valid options: {}".format(
                        entry.currency, valid_options), None))

    return entries, errors
beancount.plugins.currency_accounts
货币账户的实现。
这是对以下页面所述方法的自动实现:https://www.mathstat.dal.ca/~selinger/accounting/tutorial.html
您只需这样启用它:
plugin "beancount.plugins.currency_accounts" "Equity:CurrencyAccounts"
系统将在指定的基础账户下自动创建账户,并在其后追加货币名称,例如:
Equity:CurrencyAccounts:CAD
Equity:CurrencyAccounts:USD
等等。您可以使用以下查询查看账户余额:
bean-query $L "select account, sum(position), convert(sum(position), 'USD')
where date >= 2018-01-01 and account ~ 'CurrencyAccounts' "
转换后金额的总和应为一个不太大的数值:
bean-query $L "select convert(sum(position), 'USD')
where date >= 2018-01-01 and account ~ 'CurrencyAccounts'"
警告:这是一个原型。请参见下方代码中的 FIXME 注释,它们指出了潜在的问题。
beancount.plugins.currency_accounts.get_neutralizing_postings(curmap, base_account, new_accounts)
处理一条记录。
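参数:
- curmap – 字典,将货币映射到本交易中对应的 Posting 列表。
- base_account – 字符串,要插入的根账户名。
- new_accounts – 集合,用于累积新账户名的可变累加器。

返回:
- Posting 列表,其中已插入用于重新平衡货币交易账户的新 Posting。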
源代码位于 beancount/plugins/currency_accounts.py
def get_neutralizing_postings(curmap, base_account, new_accounts):
    """Build the postings of a transaction rebalanced per currency.

    Args:
      curmap: A dict of currency to a list of Postings of this transaction.
      base_account: A string, the root account name to insert.
      new_accounts: A set, a mutable accumulator of new account names.
    Returns:
      A list of postings: the original ones, with price annotations removed in
      every currency group that does not balance on its own, plus one posting
      to a currency trading account per such group.
    """
    rebalanced = []
    for currency, group in curmap.items():
        # Compute the per-currency balance.
        residual = inventory.Inventory()
        for posting in group:
            residual.add_amount(convert.get_cost(posting))

        # A group that already balances is passed through unchanged.
        if residual.is_empty():
            rebalanced.extend(group)
            continue

        # Re-insert original postings and remove price conversions.
        #
        # Note: This may cause problems if the implicit_prices plugin is
        # configured to run after this one, or if you need the price annotations
        # for some scripting or serious work.
        #
        # FIXME: We need to handle these important cases (they're not frivolous,
        # this is a prototype), probably by inserting some exceptions with
        # collaborating code in the booking (e.g. insert some metadata that
        # disables price conversions on those postings).
        #
        # FIXME(2): Ouch! Some of the residual seeps through here, where there
        # are more than a single currency block. This needs fixing too. You can
        # easily mitigate some of this to some extent, by excluding transactions
        # which don't have any price conversion in them.
        rebalanced.extend(
            posting if posting.price is None else posting._replace(price=None)
            for posting in group)

        # Insert the currency trading accounts postings.
        residual_units = residual.get_only_position().units
        trading_account = account.join(base_account, currency)
        new_accounts.add(trading_account)
        rebalanced.append(
            Posting(trading_account, -residual_units, None, None, None, None))

    return rebalanced
beancount.plugins.currency_accounts.group_postings_by_weight_currency(entry)
返回此记录可能需要调整的位置。
源代码位于 beancount/plugins/currency_accounts.py
def group_postings_by_weight_currency(entry: Transaction):
    """Group the postings of a transaction by the currency of their weight.

    The weight currency is the cost currency when the posting has a cost,
    otherwise the price currency when it has a price, otherwise the currency
    of its units.

    Args:
      entry: A Transaction.
    Returns:
      A pair of (curmap, has_price): a dict of currency to the list of its
      postings, and whether any posting carries a price.
    """
    curmap = collections.defaultdict(list)
    has_price = False
    for posting in entry.postings:
        price = posting.price
        weight_currency = posting.units.currency
        if posting.cost:
            weight_currency = posting.cost.currency
            if price:
                assert price.currency == weight_currency
        elif price:
            weight_currency = price.currency

        if price:
            has_price = True
        curmap[weight_currency].append(posting)
    return curmap, has_price
beancount.plugins.currency_accounts.insert_currency_trading_postings(entries, options_map, config)
插入货币交易 Posting。
参数:
- entries – 指令列表。
- options_map – 选项映射。
- config – 货币交易账户的基础账户名。

返回:
- 一个二元组:新的条目列表(开头为新建货币账户的 Open 指令)以及错误列表。
源代码位于 beancount/plugins/currency_accounts.py
def insert_currency_trading_postings(entries, options_map, config):
    """Insert currency trading postings.

    Transactions that carry a price and span more than one weight currency get
    their postings replaced by the output of get_neutralizing_postings(). Open
    directives for the currency trading accounts created along the way are
    placed at the front of the output, dated like the first input entry.

    Args:
      entries: A list of directives.
      options_map: An options map (not used by this function).
      config: The base account name for currency trading accounts;
        DEFAULT_BASE_ACCOUNT is used when it is not a valid account name.
    Returns:
      A pair of the new list of entries and a list of errors (always empty).
    """
    base_account = config.strip()
    if not account.is_valid(base_account):
        base_account = DEFAULT_BASE_ACCOUNT

    errors = []
    new_entries = []
    new_accounts = set()
    for entry in entries:
        if isinstance(entry, Transaction):
            curmap, has_price = group_postings_by_weight_currency(entry)
            if has_price and len(curmap) > 1:
                new_postings = get_neutralizing_postings(
                    curmap, base_account, new_accounts)
                entry = entry._replace(postings=new_postings)
                if META_PROCESSED:
                    entry.meta[META_PROCESSED] = True
        new_entries.append(entry)

    # Without new accounts there is nothing to open. Returning here also keeps
    # an empty list of entries from raising IndexError on entries[0] below.
    if not new_accounts:
        return new_entries, errors

    # NOTE(review): assumes the entries are sorted by date, so that the first
    # one is the earliest; confirm against the caller.
    earliest_date = entries[0].date
    open_entries = [
        data.Open(data.new_metadata('<currency_accounts>', index),
                  earliest_date, acc, None, None)
        for index, acc in enumerate(sorted(new_accounts))]

    return open_entries + new_entries, errors
beancount.plugins.implicit_prices
此插件会为所有带有价格注解的 Posting,以及带有成本且属于增加持仓(而非减少已有持仓)的 Posting,自动生成 Price 指令。
beancount.plugins.implicit_prices.ImplicitPriceError (元组)
ImplicitPriceError(source, message, entry)
beancount.plugins.implicit_prices.ImplicitPriceError.__getnewargs__(self)
特殊
将自身返回为一个普通元组。供 copy 和 pickle 使用。
源代码位于 beancount/plugins/implicit_prices.py
def __getnewargs__(self):
    """Return the instance's fields as a plain tuple (used by copy and pickle)."""
    plain = _tuple(self)
    return plain
beancount.plugins.implicit_prices.ImplicitPriceError.__new__(_cls, source, message, entry)
特殊
静态方法
创建 ImplicitPriceError(source, message, entry) 的新实例
beancount.plugins.implicit_prices.ImplicitPriceError.__replace__(/, self, **kwds)
特殊
返回一个新的 ImplicitPriceError 对象,用指定的新值替换相应字段
源代码位于 beancount/plugins/implicit_prices.py
def _replace(self, /, **kwds):
    """Return a new instance with the fields named in kwds replaced.

    Raises:
      TypeError: If kwds contains a name that is not a field.
    """
    # Each field takes its value from kwds when given, else the current value
    # (presumably _map is the builtin map, pairing field names with values).
    replaced = self._make(_map(kwds.pop, field_names, self))
    if not kwds:
        return replaced
    # Whatever is left in kwds did not match any field name.
    raise TypeError(f'Got unexpected field names: {list(kwds)!r}')
beancount.plugins.implicit_prices.ImplicitPriceError.__repr__(self)
特殊
返回一个格式良好的表示字符串
源代码位于 beancount/plugins/implicit_prices.py
def __repr__(self):
    """Return a nicely formatted representation string."""
    class_name = self.__class__.__name__
    return class_name + repr_fmt % self
beancount.plugins.implicit_prices.add_implicit_prices(entries, unused_options_map)
从交易中插入隐式定义的价格。
显式价格条目将直接保留在输出列表中。来自带有成本或交易条目中价格的过账项,将被合成新的价格条目并添加到输出列表中。
参数:
- entries – 指令列表。此处仅关注其中的 Transaction 实例。
- unused_options_map – 解析器选项字典(未使用)。

返回:
- 一个二元组:条目列表(可能比原先多出若干 Price 条目)以及错误列表。
源代码位于 beancount/plugins/implicit_prices.py
def add_implicit_prices(entries, unused_options_map):
    """Synthesize Price entries from the prices and costs found on postings.

    Every input entry is copied to the output. After each Transaction, a Price
    entry is added for each posting that has an explicit price, or that has a
    cost and was not booked as a reduction of an existing position.

    Args:
      entries: A list of directives. We're interested only in the Transaction instances.
      unused_options_map: A parser options dict.
    Returns:
      A list of entries, possibly with more Price entries than before, and a
      list of errors (always empty for now).
    """
    output = []
    errors = []

    # Price entries synthesized so far, keyed by
    # (date, currency, price number, price currency).
    seen_prices = {}

    balances = collections.defaultdict(inventory.Inventory)
    for entry in entries:
        # Always replicate the existing entries.
        output.append(entry)
        if not isinstance(entry, Transaction):
            continue

        for posting in entry.postings:
            units = posting.units
            cost = posting.cost

            # Find out whether the posting matches against an existing position.
            _, booking = balances[posting.account].add_position(posting)

            if posting.price is not None:
                # An explicit price, either from a conversion, e.g.
                #      Assets:Account    100 USD @ 1.10 CAD
                # or, next to a cost, the current price of the instrument, e.g.
                #      Assets:Account    100 HOOL {564.20} @ {581.97} USD
                origin = "from_price"
            elif cost is not None and booking != inventory.MatchResult.REDUCED:
                # Just a cost, on a posting that is not reducing a position, e.g.
                #      Assets:Account    100 HOOL {564.20}
                #
                # TODO(blais): What happens here if the account has no
                # booking strategy? Do we end up inserting a price for the
                # reducing leg? Check.
                origin = "from_cost"
            else:
                continue

            meta = data.new_metadata(entry.meta["filename"], entry.meta["lineno"])
            meta[METADATA_FIELD] = origin
            if origin == "from_price":
                price_amount = posting.price
            else:
                price_amount = amount.Amount(cost.number, cost.currency)
            price_entry = data.Price(meta, entry.date, units.currency, price_amount)

            # Only exact duplicates are dropped: differing prices on the same
            # date are legitimate (e.g. stock splits, or trades with separate
            # reported prices), so both are kept rather than flagged as
            # errors. Ideally the number would not be part of this key.
            key = (price_entry.date,
                   price_entry.currency,
                   price_entry.amount.number,
                   price_entry.amount.currency)
            if key not in seen_prices:
                seen_prices[key] = price_entry
                output.append(price_entry)

    return output, errors
beancount.plugins.leafonly
一个在金额被记入非叶账户(即具有子账户的账户)时发出错误的插件。
这是一个可选的额外约束。安装此插件后,它将对所有向非叶账户过账的账户发出错误。某些用户可能希望禁止此类行为,并强制仅允许叶账户有过账。
beancount.plugins.leafonly.LeafOnlyError (元组)
LeafOnlyError(source, message, entry)
beancount.plugins.leafonly.LeafOnlyError.__getnewargs__(self)
特殊
将自身返回为一个普通元组。供 copy 和 pickle 使用。
源代码位于 beancount/plugins/leafonly.py
def __getnewargs__(self):
    """Return this record as a plain tuple, for use by copy and pickle."""
    plain = _tuple(self)
    return plain
beancount.plugins.leafonly.LeafOnlyError.__new__(_cls, source, message, entry)
特殊
静态方法
创建 LeafOnlyError(source, message, entry) 的新实例
beancount.plugins.leafonly.LeafOnlyError.__replace__(/, self, **kwds)
特殊
返回一个新的 LeafOnlyError 对象,用指定的新值替换字段
源代码位于 beancount/plugins/leafonly.py
def _replace(self, /, **kwds):
    """Return a new record with the fields named in kwds replaced.

    Raises:
      TypeError: If entries remain in kwds after the field replacements
        have been popped out of it.
    """
    # kwds.pop is applied to each (field name, current value) pair, so a
    # field keeps its current value unless a replacement was passed
    # (presumably `_map` is the builtin map -- confirm against the generator).
    replaced = self._make(_map(kwds.pop, field_names, self))
    if not kwds:
        return replaced
    raise TypeError(f'Got unexpected field names: {list(kwds)!r}')
beancount.plugins.leafonly.LeafOnlyError.__repr__(self)
特殊
返回一个格式良好的表示字符串
源代码位于 beancount/plugins/leafonly.py
def __repr__(self):
    """Return a nicely formatted representation string."""
    type_name = self.__class__.__name__
    return type_name + repr_fmt % self
beancount.plugins.leafonly.validate_leaf_only(entries, unused_options_map)
检查是否存在向非叶账户过账的情况。
参数:
- entries – 指令列表。
- unused_options_map – 选项映射。

返回:
- 输入的条目列表,以及发现的新错误列表(如有)。
源代码位于 beancount/plugins/leafonly.py
def validate_leaf_only(entries, unused_options_map):
    """Flag accounts that are not leaves yet have postings on them.

    Args:
      entries: A list of directives.
      unused_options_map: An options map.
    Returns:
      A pair of the input entries, unchanged, and a list of LeafOnlyError
      instances, one for each offending account found.
    """
    tree_root = realization.realize(entries, compute_balance=False)
    fallback_meta = data.new_metadata('<leafonly>', 0)
    open_close_by_account = None  # Built on first use only.
    found = []
    for node in realization.iter_children(tree_root):
        # Only non-empty nodes (presumably: nodes with child accounts) that
        # also carry transaction postings are of interest.
        if not (len(node) > 0 and node.txn_postings):
            continue
        if open_close_by_account is None:
            open_close_by_account = getters.get_account_open_close(entries)
        # Attach the error to the account's Open directive when one is known;
        # otherwise fall back to the synthetic '<leafonly>' metadata.
        try:
            opening = open_close_by_account[node.account][0]
        except KeyError:
            opening = None
        source = opening.meta if opening else fallback_meta
        message = "Non-leaf account '{}' has postings on it".format(node.account)
        found.append(LeafOnlyError(source, message, opening))
    return entries, found
beancount.plugins.noduplicates
此插件验证不存在重复的交易。
beancount.plugins.noduplicates.validate_no_duplicates(entries, unused_options_map)
通过计算哈希值检查条目是否唯一。
参数:
- entries – 指令列表。
- unused_options_map – 选项映射。

返回:
- 输入的条目列表,以及发现的新错误列表(如有)。
源代码位于 beancount/plugins/noduplicates.py
def validate_no_duplicates(entries, unused_options_map):
    """Detect duplicated entries by hashing them, with metadata excluded.

    Args:
      entries: A list of directives.
      unused_options_map: An options map.
    Returns:
      A pair of the input entries, unchanged, and the list of errors reported
      by the hashing step.
    """
    hashing_result = compare.hash_entries(entries, exclude_meta=True)
    # The hashes themselves are not needed; only the errors are reported.
    _, duplicate_errors = hashing_result
    return entries, duplicate_errors
beancount.plugins.nounused
此插件验证不存在未使用的账户。
beancount.plugins.nounused.UnusedAccountError (元组)
UnusedAccountError(source, message, entry)
beancount.plugins.nounused.UnusedAccountError.__getnewargs__(self)
特殊
将自身返回为一个普通元组。供 copy 和 pickle 使用。
源代码位于 beancount/plugins/nounused.py
def __getnewargs__(self):
    """Return this record as a plain tuple, for use by copy and pickle."""
    plain = _tuple(self)
    return plain
beancount.plugins.nounused.UnusedAccountError.__new__(_cls, source, message, entry)
特殊
静态方法
创建 UnusedAccountError(source, message, entry) 的新实例
beancount.plugins.nounused.UnusedAccountError.__replace__(/, self, **kwds)
特殊
返回一个新的 UnusedAccountError 对象,用指定的新值替换字段
源代码位于 beancount/plugins/nounused.py
def _replace(self, /, **kwds):
    """Return a new record with the fields named in kwds replaced.

    Raises:
      TypeError: If entries remain in kwds after the field replacements
        have been popped out of it.
    """
    # kwds.pop is applied to each (field name, current value) pair, so a
    # field keeps its current value unless a replacement was passed
    # (presumably `_map` is the builtin map -- confirm against the generator).
    replaced = self._make(_map(kwds.pop, field_names, self))
    if not kwds:
        return replaced
    raise TypeError(f'Got unexpected field names: {list(kwds)!r}')
beancount.plugins.nounused.UnusedAccountError.__repr__(self)
特殊
返回一个格式良好的表示字符串
源代码位于 beancount/plugins/nounused.py
def __repr__(self):
    """Return a nicely formatted representation string."""
    type_name = self.__class__.__name__
    return type_name + repr_fmt % self
beancount.plugins.nounused.validate_unused_accounts(entries, unused_options_map)
检查所有声明为开放的账户是否实际被使用。
我们检查所有已开放的账户是否至少被另一条指令引用过。未被引用的账户很可能是未使用的,因此会发出警告(我们倾向于严格检查)。请注意,先开户后又关闭的账户被视为已使用——这在现实中是合理的使用场景。如果您确实需要某个账户保持开放但从不使用,可以为该账户添加余额断言(balance)或填充指令(pad)来消除该警告,甚至只添加一条 note 指令也足够。
(这很可能是作为“严格模式”插件可选包含的合适候选。)
参数:
- entries – 指令列表。
- unused_options_map – 选项映射。

返回:
- 输入的条目列表,以及发现的新错误列表(如有)。
源代码位于 beancount/plugins/nounused.py
def validate_unused_accounts(entries, unused_options_map):
    """Report accounts that are opened but never referenced afterwards.

    Every account with an Open directive must be referred to by at least one
    directive that is not an Open; otherwise it is probably unused and an error
    is issued (we like to be pedantic). Note that an account that is opened and
    then closed is considered used--this is a valid use case that may occur in
    reality. If you have a use case for an account to be open but never used,
    you can quiet the error by adding a balance assertion or a pad directive
    for the account; even a note will be sufficient.

    (This is probably a good candidate for optional inclusion as a "pedantic"
    plugin.)

    Args:
      entries: A list of directives.
      unused_options_map: An options map.
    Returns:
      A pair of the input entries, unchanged, and a list of UnusedAccountError
      instances, one per opened account that is never referenced.
    """
    # Split the stream in one pass: remember the Open directive of each
    # account (for error locations) and collect the accounts mentioned by
    # every other kind of directive.
    opens_by_account = {}
    used = set()
    for directive in entries:
        if isinstance(directive, data.Open):
            opens_by_account[directive.account] = directive
        else:
            used.update(getters.get_entry_accounts(directive))

    # Each error points at the Open directive of the unused account.
    unused_errors = []
    for name, opening in opens_by_account.items():
        if name in used:
            continue
        unused_errors.append(
            UnusedAccountError(opening.meta,
                               "Unused account '{}'".format(name),
                               opening))
    return entries, unused_errors
beancount.plugins.onecommodity
当一个账户中使用了多种商品时,该插件会发出错误。
对于投资或交易账户,通过将股票名称作为账户名称的末尾部分,可以更方便地围绕单一股票进行操作筛选。
注意:
- 该插件会自动跳过在 Open 指令中显式声明了多种商品的账户。
- 您也可以在账户的 Open 指令中设置元数据 "onecommodity: FALSE" 来跳过对该账户的检查。
- 如果提供,配置应为一个正则表达式,用于限定需检查的账户集合。
beancount.plugins.onecommodity.OneCommodityError (元组)
OneCommodityError(source, message, entry)
beancount.plugins.onecommodity.OneCommodityError.__getnewargs__(self)
特殊
将自身返回为一个普通元组。供 copy 和 pickle 使用。
源代码位于 beancount/plugins/onecommodity.py
def __getnewargs__(self):
    """Return this record as a plain tuple, for use by copy and pickle."""
    plain = _tuple(self)
    return plain
beancount.plugins.onecommodity.OneCommodityError.__new__(_cls, source, message, entry)
特殊
静态方法
创建 OneCommodityError(source, message, entry) 的新实例
beancount.plugins.onecommodity.OneCommodityError.__replace__(/, self, **kwds)
特殊
返回一个新的 OneCommodityError 对象,用指定的新值替换字段
源代码位于 beancount/plugins/onecommodity.py
def _replace(self, /, **kwds):
    """Return a new record with the fields named in kwds replaced.

    Raises:
      TypeError: If entries remain in kwds after the field replacements
        have been popped out of it.
    """
    # kwds.pop is applied to each (field name, current value) pair, so a
    # field keeps its current value unless a replacement was passed
    # (presumably `_map` is the builtin map -- confirm against the generator).
    replaced = self._make(_map(kwds.pop, field_names, self))
    if not kwds:
        return replaced
    raise TypeError(f'Got unexpected field names: {list(kwds)!r}')
beancount.plugins.onecommodity.OneCommodityError.__repr__(self)
特殊
返回一个格式良好的表示字符串
源代码位于 beancount/plugins/onecommodity.py
def __repr__(self):
    """Return a nicely formatted representation string."""
    type_name = self.__class__.__name__
    return type_name + repr_fmt % self
beancount.plugins.onecommodity.validate_one_commodity(entries, unused_options_map, config=None)
检查每个账户是否仅包含单一商品的单位。
这是一个可选的额外约束,尽管 Beancount 支持包含多种商品的库存和聚合。我认为这与 GnuCash 的模型一致,即每个账户仅关联一种商品。
参数:
- entries – 指令列表。
- unused_options_map – 选项映射。
- config – 插件配置字符串,一个正则表达式,用于匹配需要检查的账户子集。

返回:
- 输入的条目列表,以及发现的新错误列表(如有)。
源代码位于 beancount/plugins/onecommodity.py
def validate_one_commodity(entries, unused_options_map, config=None):
    """Check that each account has units in only a single commodity.

    This is an extra constraint that you may want to apply optionally, despite
    Beancount's ability to support inventories and aggregations with more than
    one commodity. I believe this also matches GnuCash's model, where each
    account has a single commodity attached to it.

    An account is exempt from the check when its Open directive either sets the
    "onecommodity" metadata to a false value, does not match the configured
    regular expression, or declares more than one currency.

    Args:
      entries: A list of directives.
      unused_options_map: An options map.
      config: The plugin configuration string, a regular expression to match
        against the subset of accounts to check.
    Returns:
      A pair of the input entries, unchanged, and a list of OneCommodityError
      instances: one for each account holding units in more than one currency,
      followed by one for each account with costs in more than one currency.
    """
    accounts_re = re.compile(config) if config else None

    # Mappings of account name to sets of currencies for each units and cost.
    units_map = collections.defaultdict(set)
    cost_map = collections.defaultdict(set)

    # Mappings to use just for getting a relevant source.
    units_source_map = {}
    cost_source_map = {}

    # Gather the complete set of accounts to skip from the Open directives.
    # This is done up front so the accumulation below never records anything
    # for a skipped account, wherever its Open directive sits in the stream.
    skip_accounts = set()
    for entry in entries:
        if not isinstance(entry, data.Open):
            continue
        if (not entry.meta.get("onecommodity", True) or
                (accounts_re and not accounts_re.match(entry.account)) or
                (entry.currencies and len(entry.currencies) > 1)):
            skip_accounts.add(entry.account)

    # Accumulate all the commodities used.
    for entry in entries:
        if isinstance(entry, data.Transaction):
            for posting in entry.postings:
                if posting.account in skip_accounts:
                    continue

                units = posting.units
                units_map[posting.account].add(units.currency)
                if len(units_map[posting.account]) > 1:
                    units_source_map[posting.account] = entry

                cost = posting.cost
                if cost:
                    cost_map[posting.account].add(cost.currency)
                    if len(cost_map[posting.account]) > 1:
                        cost_source_map[posting.account] = entry

        elif isinstance(entry, data.Balance):
            if entry.account in skip_accounts:
                continue

            units_map[entry.account].add(entry.amount.currency)
            if len(units_map[entry.account]) > 1:
                units_source_map[entry.account] = entry

    # Check units. The currencies are sorted so that the message does not
    # depend on set iteration order, which varies between runs.
    errors = []
    for account, currencies in units_map.items():
        if len(currencies) > 1:
            errors.append(OneCommodityError(
                units_source_map[account].meta,
                "More than one currency in account '{}': {}".format(
                    account, ','.join(sorted(currencies))),
                None))

    # Check costs.
    for account, currencies in cost_map.items():
        if len(currencies) > 1:
            errors.append(OneCommodityError(
                cost_source_map[account].meta,
                "More than one cost currency in account '{}': {}".format(
                    account, ','.join(sorted(currencies))),
                None))

    return entries, errors
beancount.plugins.pedantic
一个插件集合,用于触发所有严格模式插件。
beancount.plugins.sellgains
一个交叉验证已申报收益与持仓出售价格的插件。
当你卖出股票时,收益可以通过相应的现金金额自动推断。例如,在以下交易中,第二和第三条分录应与所售股票的价值匹配:
1999-07-31 * "Sell"
Assets:US:BRS:Company:ESPP -81 ADSK {26.3125 USD}
Assets:US:BRS:Company:Cash 2141.36 USD
Expenses:Financial:Fees 0.08 USD
Income:US:Company:ESPP:PnL -10.125 USD
成本基础将与以下数值进行核对:2141.36 + 0.08 + -10.125。即,余额检查计算:
(-81 x 26.3125 = -2131.3125) + 2141.36 + 0.08 + -10.125
并检查残差是否低于一个小的容差值。
但通常,收入科目并不会在对账单中直接提供。Beancount 可以利用余额自动推断该值,非常方便,如下所示:
1999-07-31 * "Sell"
Assets:US:BRS:Company:ESPP -81 ADSK {26.3125 USD}
Assets:US:BRS:Company:Cash 2141.36 USD
Expenses:Financial:Fees 0.08 USD
Income:US:Company:ESPP:PnL
此外,大多数情况下,你的交易确认单会提供卖出价格,因此你可以直接输入:
1999-07-31 * "Sell"
Assets:US:BRS:Company:ESPP -81 ADSK {26.3125 USD} @ 26.4375 USD
Assets:US:BRS:Company:Cash 2141.36 USD
Expenses:Financial:Fees 0.08 USD
Income:US:Company:ESPP:PnL
因此,理论上,如果提供了价格(26.4375 USD),我们可以验证按该价格出售所得是否与非收入分录匹配。即验证:
(-81 x 26.4375 = -2141.4375) + 2141.36 + 0.08
的差值是否低于一个小的容差值。该插件正是执行此验证。
总体而言,它执行以下操作:对于具有成本和价格的分录,验证所有非收入账户的持仓总和是否在容差范围内。
这提供了额外的验证层级,使你无需手动输入收入金额,同时利用价格信息作为额外的错误检查机制,以防输入错误。
beancount.plugins.sellgains.SellGainsError (元组)
SellGainsError(source, message, entry)
beancount.plugins.sellgains.SellGainsError.__getnewargs__(self)
特殊
将自身返回为一个普通元组。供 copy 和 pickle 使用。
源代码位于 beancount/plugins/sellgains.py
def __getnewargs__(self):
    """Return this record as a plain tuple, for use by copy and pickle."""
    plain = _tuple(self)
    return plain
beancount.plugins.sellgains.SellGainsError.__new__(_cls, source, message, entry)
特殊
静态方法
创建 SellGainsError(source, message, entry) 的新实例
beancount.plugins.sellgains.SellGainsError.__replace__(/, self, **kwds)
特殊
返回一个新的 SellGainsError 对象,用指定的新值替换某些字段
源代码位于 beancount/plugins/sellgains.py
def _replace(self, /, **kwds):
    """Return a new record with the fields named in kwds replaced.

    Raises:
      TypeError: If entries remain in kwds after the field replacements
        have been popped out of it.
    """
    # kwds.pop is applied to each (field name, current value) pair, so a
    # field keeps its current value unless a replacement was passed
    # (presumably `_map` is the builtin map -- confirm against the generator).
    replaced = self._make(_map(kwds.pop, field_names, self))
    if not kwds:
        return replaced
    raise TypeError(f'Got unexpected field names: {list(kwds)!r}')
beancount.plugins.sellgains.SellGainsError.__repr__(self)
特殊
返回一个格式良好的表示字符串
源代码位于 beancount/plugins/sellgains.py
def __repr__(self):
    """Return a nicely formatted representation string."""
    type_name = self.__class__.__name__
    return type_name + repr_fmt % self
beancount.plugins.sellgains.validate_sell_gains(entries, options_map)
检查已售出持仓且带有价格的资产账户总额。
参数:
- entries – 指令列表。
- options_map – 选项映射。

返回:
- 输入的条目列表,以及发现的新错误列表(如有)。
源代码位于 beancount/plugins/sellgains.py
def validate_sell_gains(entries, options_map):
    """Cross-check the priced value of lots sold against the other legs.

    For each transaction whose postings held at cost all carry a price, the
    priced value of those postings is compared, currency by currency, with the
    weight of the postings made to Assets, Liabilities, Equity and Expenses
    accounts. Postings to any other account type are ignored.

    Args:
      entries: A list of directives.
      options_map: An options map.
    Returns:
      A pair of the input entries, unchanged, and a list of SellGainsError
      instances, one per transaction that fails the check.
    """
    found_errors = []
    type_names = options.get_account_types(options_map)
    proceed_types = {type_names.assets,
                     type_names.liabilities,
                     type_names.equity,
                     type_names.expenses}

    for entry in entries:
        if not isinstance(entry, data.Transaction):
            continue

        # Only transactions that have lots at cost, all of them priced.
        at_cost = [leg for leg in entry.postings if leg.cost is not None]
        if not (at_cost and all(leg.price is not None for leg in at_cost)):
            continue

        # Accumulate the total expected proceeds and the sum of the asset and
        # expenses legs.
        total_price = inventory.Inventory()
        total_proceeds = inventory.Inventory()
        for posting in entry.postings:
            if posting.cost is None:
                # Not held at cost: use the weight, and ignore postings to
                # Income accounts.
                kind = account_types.get_account_type(posting.account)
                if kind in proceed_types:
                    total_proceeds.add_amount(convert.get_weight(posting))
                continue
            # Held at cost: add the priced value to the balance.
            assert posting.price is not None
            sale_price = posting.price
            total_price.add_amount(amount.mul(sale_price, -posting.units.number))

        # Compare inventories, currency by currency.
        price_by_currency = {position.units.currency: position.units.number
                             for position in total_price}
        proceeds_by_currency = {position.units.currency: position.units.number
                                for position in total_proceeds}

        tolerances = interpolate.infer_tolerances(entry.postings, options_map)
        mismatch = False
        for currency, price_number in price_by_currency.items():
            # Accept a looser than usual tolerance because rounding occurs
            # differently. Also, it would be difficult for the user to satisfy
            # two sets of constraints manually.
            tolerance = tolerances.get(currency) * EXTRA_TOLERANCE_MULTIPLIER
            proceeds_number = proceeds_by_currency.pop(currency, ZERO)
            if abs(price_number - proceeds_number) > tolerance:
                mismatch = True
                break

        # Proceeds left over in a currency with no priced value also count as
        # a failure.
        if not (mismatch or proceeds_by_currency):
            continue
        found_errors.append(
            SellGainsError(
                entry.meta,
                "Invalid price vs. proceeds/gains: {} vs. {}; difference: {}".format(
                    total_price, total_proceeds, (total_price + -total_proceeds)),
                entry))

    return entries, found_errors
beancount.plugins.unique_prices
此模块验证每个日期和基础/报价货币组合仅定义一个价格。若声明了多个冲突的价格值,将生成错误。注意:相同数值的多个价格条目不会触发错误。
此功能适用于希望以极严格模式输入价格的场景,但可能不符合实际使用需求。例如,如果你在一天内(1)通过交易成本隐式生成了一个价格,同时(2)又单独声明了一个不同的收盘价,这将触发错误。我不确定此功能长期是否实用,因此将其作为插件提供。
beancount.plugins.unique_prices.UniquePricesError (元组)
UniquePricesError(source, message, entry)
beancount.plugins.unique_prices.UniquePricesError.__getnewargs__(self)
特殊
将自身返回为一个普通元组。供 copy 和 pickle 使用。
源代码位于 beancount/plugins/unique_prices.py
def __getnewargs__(self):
    """Return this record as a plain tuple, for use by copy and pickle."""
    plain = _tuple(self)
    return plain
beancount.plugins.unique_prices.UniquePricesError.__new__(_cls, source, message, entry)
特殊
静态方法
创建 UniquePricesError(source, message, entry) 的新实例
beancount.plugins.unique_prices.UniquePricesError.__replace__(/, self, **kwds)
特殊
返回一个新的 UniquePricesError 对象,用指定的新值替换字段
源代码位于 beancount/plugins/unique_prices.py
def _replace(self, /, **kwds):
    """Return a new record with the fields named in kwds replaced.

    Raises:
      TypeError: If entries remain in kwds after the field replacements
        have been popped out of it.
    """
    # kwds.pop is applied to each (field name, current value) pair, so a
    # field keeps its current value unless a replacement was passed
    # (presumably `_map` is the builtin map -- confirm against the generator).
    replaced = self._make(_map(kwds.pop, field_names, self))
    if not kwds:
        return replaced
    raise TypeError(f'Got unexpected field names: {list(kwds)!r}')
beancount.plugins.unique_prices.UniquePricesError.__repr__(self)
特殊
返回一个格式良好的表示字符串
源代码位于 beancount/plugins/unique_prices.py
def __repr__(self):
    """Return a nicely formatted representation string."""
    type_name = self.__class__.__name__
    return type_name + repr_fmt % self
beancount.plugins.unique_prices.validate_unique_prices(entries, unused_options_map)
检查特定基础/报价货币对每天是否仅有一个价格。
参数:
- entries – 指令列表。此处只检查其中的 Price 条目。
- unused_options_map – 解析器选项字典。

返回:
- 输入的条目列表,以及新生成的 UniquePricesError 实例列表。
源代码位于 beancount/plugins/unique_prices.py
def validate_unique_prices(entries, unused_options_map):
    """Check that there is only a single price per day for a particular base/quote.

    Price directives are grouped by (date, base currency, quote currency). A
    group whose entries carry more than one distinct number produces an error;
    repeated entries that all carry the same number are accepted.

    Args:
      entries: A list of directives. Only the Price instances are examined.
      unused_options_map: A parser options dict.
    Returns:
      A pair of the input entries, unchanged, and a list of new
      UniquePricesError instances generated.
    """
    prices = collections.defaultdict(list)
    for entry in entries:
        if not isinstance(entry, data.Price):
            continue
        key = (entry.date, entry.currency, entry.amount.currency)
        prices[key].append(entry)

    errors = []
    for price_entries in prices.values():
        if len(price_entries) > 1:
            # One representative entry per distinct number; more than one
            # distinct number means the prices disagree.
            number_map = {price_entry.amount.number: price_entry
                          for price_entry in price_entries}
            if len(number_map) > 1:
                # Note: This should be a list of entries for better error
                # reporting. (Later.)
                error_entry = next(iter(number_map.values()))
                errors.append(
                    UniquePricesError(error_entry.meta,
                                      "Disagreeing price entries",
                                      price_entries))
    return entries, errors