beancount.parser
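Parser module for Beancount input files.
beancount.parser.booking
Algorithms for 'booking' inventory, that is, the process of finding a matching lot when reducing the content of an inventory.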
beancount.parser.booking.BookingError (tuple)
BookingError(source, message, entry)
beancount.parser.booking.BookingError.__getnewargs__(self)
special
Return self as a plain tuple. Used by copy and pickle.
Source code in beancount/parser/booking.py
def __getnewargs__(self):
    'Return self as a plain tuple. Used by copy and pickle.'
    return _tuple(self)
beancount.parser.booking.BookingError.__new__(_cls, source, message, entry)
special
staticmethod
Create new instance of BookingError(source, message, entry)
beancount.parser.booking.BookingError.__replace__(self, /, **kwds)
special
Return a new BookingError object replacing specified fields with new values
Source code in beancount/parser/booking.py
def _replace(self, /, **kwds):
    result = self._make(_map(kwds.pop, field_names, self))
    if kwds:
        raise TypeError(f'Got unexpected field names: {list(kwds)!r}')
    return result
beancount.parser.booking.BookingError.__repr__(self)
special
Return a nicely formatted representation string
Source code in beancount/parser/booking.py
def __repr__(self):
    'Return a nicely formatted representation string'
    return self.__class__.__name__ + repr_fmt % self
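The error types in this module are plain named tuples, so the generated special methods above behave like those of any `collections.namedtuple`. The following is a minimal sketch that assumes a locally defined stand-in with the same three fields; the metadata dict and the messages are invented illustration values, not Beancount output.

```python
import collections
import copy

# Hypothetical stand-in mirroring BookingError(source, message, entry).
BookingError = collections.namedtuple("BookingError", "source message entry")

# 'source' normally carries the metadata of the offending entry; this dict is invented.
error = BookingError({"filename": "example.beancount", "lineno": 12},
                     "Amount is zero", None)

# _replace() returns a new tuple; the original is left untouched.
updated = error._replace(message="Cost is negative")

# copy.copy() rebuilds an equal tuple from the same field values.
clone = copy.copy(error)
```

Because the tuples are immutable, code that wants to adjust an error builds a new one with `_replace()` rather than mutating it in place.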
beancount.parser.booking.book(incomplete_entries, options_map, initial_balances=None)
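Book inventory lots and complete all positions with incomplete numbers.
| Parameters: |
|---|
| incomplete_entries: A list of directives, with some postings possibly left with incomplete amounts as produced by the parser. |
| options_map: An options dict as produced by the parser. |
| initial_balances: A dict of (account, inventory) pairs to start booking from. This is useful when attempting to book on top of an existing state. |

| Returns: |
|---|
| entries: A list of completed entries with all their postings completed. |
| errors: New errors produced during interpolation. |
Source code in beancount/parser/booking.py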
def book(incomplete_entries, options_map, initial_balances=None):
    """Book inventory lots and complete all positions with incomplete numbers.
    Args:
      incomplete_entries: A list of directives, with some postings possibly left
        with incomplete amounts as produced by the parser.
      options_map: An options dict as produced by the parser.
      initial_balances: A dict of (account, inventory) pairs to start booking from.
        This is useful when attempting to book on top of an existing state.
    Returns:
      A pair of
        entries: A list of completed entries with all their postings completed.
        errors: New errors produced during interpolation.
    """
    # Get the list of booking methods for each account.
    booking_methods = collections.defaultdict(lambda: options_map["booking_method"])
    for entry in incomplete_entries:
        if isinstance(entry, data.Open) and entry.booking:
            booking_methods[entry.account] = entry.booking
    # Do the booking here!
    entries, booking_errors = booking_full.book(incomplete_entries, options_map,
                                                booking_methods, initial_balances)
    # Check for MISSING elements remaining.
    missing_errors = validate_missing_eliminated(entries, options_map)
    return entries, (booking_errors + missing_errors)
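The first step of `book()` resolves a booking method per account: the global option is the default, and an Open directive with an explicit method overrides it. The sketch below isolates that step. It assumes a simplified `Open` stand-in and uses plain strings for the methods; `resolve_booking_methods` is a hypothetical helper name, not part of Beancount.

```python
import collections

# Hypothetical stand-in for beancount.core.data.Open with only the fields used here.
Open = collections.namedtuple("Open", "account booking")

def resolve_booking_methods(entries, default_method):
    """Map each account to its booking method, falling back to the default."""
    methods = collections.defaultdict(lambda: default_method)
    for entry in entries:
        # An Open directive with an explicit booking method overrides the default.
        if isinstance(entry, Open) and entry.booking:
            methods[entry.account] = entry.booking
    return methods

entries = [
    Open("Assets:Investments", "FIFO"),
    Open("Assets:Cash", None),  # No explicit method: the default applies.
]
methods = resolve_booking_methods(entries, "STRICT")
```

Using a `defaultdict` means accounts that never appear in an Open directive still resolve to the default method on lookup.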
beancount.parser.booking.convert_lot_specs_to_lots(entries)
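For all the entries, convert the CostSpec of each posting's position to a Cost instance. In the simple method, the data provided in the CostSpec must unambiguously provide a way to compute the cost amount.
This essentially replicates the way the old parser used to work, but allows positions to carry fuzzy lot specifications instead of resolved ones. Costs used to be computed locally; this function gets rid of the CostSpec and produces the Cost without fuzzy matching. It exists only for the sake of the transition to the new matching logic.
| Parameters: |
|---|
| entries: A list of incomplete directives as per the parser. |

| Returns: |
|---|
| A list of entries whose postings' position costs have been converted to Cost instances but that may still be incomplete. The code returns this list together with a list of errors. |

| Raises: |
|---|
| ValueError: If there's an unacceptable number. |
Source code in beancount/parser/booking.py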
def convert_lot_specs_to_lots(entries):
    """For all the entries, convert the posting's position's CostSpec to Cost
    instances. In the simple method, the data provided in the CostSpec must
    unambiguously provide a way to compute the cost amount.
    This essentially replicates the way the old parser used to work, but
    allowing positions to have the fuzzy lot specifications instead of the
    resolved ones. We used to simply compute the costs locally, and this gets
    rid of the CostSpec to produce the Cost without fuzzy matching. This is only
    there for the sake of transition to the new matching logic.
    Args:
      entries: A list of incomplete directives as per the parser.
    Returns:
      A list of entries whose postings's position costs have been converted to
      Cost instances but that may still be incomplete.
    Raises:
      ValueError: If there's a unacceptable number.
    """
    new_entries = []
    errors = []
    for entry in entries:
        if not isinstance(entry, data.Transaction):
            new_entries.append(entry)
            continue
        new_postings = []
        for posting in entry.postings:
            try:
                units = posting.units
                cost_spec = posting.cost
                cost = convert_spec_to_cost(units, cost_spec)
                if cost_spec is not None and cost is None:
                    errors.append(
                        BookingError(entry.meta,
                                     "Cost syntax not supported; cost spec ignored",
                                     None))
                if cost and isinstance(units, amount.Amount):
                    # If there is a cost, we don't allow either a cost value of
                    # zero, nor a zero number of units. Note that we allow a price
                    # of zero as the only special case (for conversion entries), but
                    # never for costs.
                    if units.number == ZERO:
                        raise ValueError('Amount is zero: "{}"'.format(units))
                    if cost.number is not None and cost.number < ZERO:
                        raise ValueError('Cost is negative: "{}"'.format(cost))
            except ValueError as exc:
                errors.append(BookingError(entry.meta, str(exc), None))
                cost = None
            new_postings.append(posting._replace(cost=cost))
        new_entries.append(entry._replace(postings=new_postings))
    return new_entries, errors
beancount.parser.booking.convert_spec_to_cost(units, cost_spec)
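Convert a posting's CostSpec instance to a Cost.
| Parameters: |
|---|
| units: An instance of Amount. |
| cost_spec: An instance of CostSpec. |

| Returns: |
|---|
| An instance of Cost. |
Source code in beancount/parser/booking.py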
def convert_spec_to_cost(units, cost_spec):
    """Convert a posting's CostSpec instance to a Cost.
    Args:
      units: An instance of Amount.
      cost_spec: An instance of CostSpec.
    Returns:
      An instance of Cost.
    """
    cost = cost_spec
    errors = []
    if isinstance(units, amount.Amount):
        currency = units.currency
        if cost_spec is not None:
            number_per, number_total, cost_currency, date, label, merge = cost_spec
            # Compute the cost.
            if number_per is not MISSING or number_total is not None:
                if number_total is not None:
                    # Compute the per-unit cost if there is some total cost
                    # component involved.
                    units_num = units.number
                    cost_total = number_total
                    if number_per is not MISSING:
                        cost_total += number_per * units_num
                    unit_cost = cost_total / abs(units_num)
                else:
                    unit_cost = number_per
                cost = position.Cost(unit_cost, cost_currency, date, label)
            else:
                cost = None
    return cost
beancount.parser.booking.validate_inventory_booking(entries, unused_options_map, booking_methods)
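Validate that no position at cost is allowed to go negative.
This routine checks that when a posting reduces a position, existing or not, the subsequent inventory does not result in a position with a negative number of units. A negative number of units would only be required for short trades of trading spreads on futures, and right now this is not supported. It would not be difficult to support, but we want to be strict about it, because being pedantic here is a great way to detect user data entry mistakes.
| Parameters: |
|---|
| entries: A list of directives. |
| unused_options_map: An options map. |
| booking_methods: A mapping of account name to booking method, accumulated in the main loop. |

| Returns: |
|---|
| A list of errors. |
Source code in beancount/parser/booking.py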
def validate_inventory_booking(entries, unused_options_map, booking_methods):
    """Validate that no position at cost is allowed to go negative.
    This routine checks that when a posting reduces a position, existing or not,
    that the subsequent inventory does not result in a position with a negative
    number of units. A negative number of units would only be required for short
    trades of trading spreads on futures, and right now this is not supported.
    It would not be difficult to support this, however, but we want to be strict
    about it, because being pedantic about this is otherwise a great way to
    detect user data entry mistakes.
    Args:
      entries: A list of directives.
      unused_options_map: An options map.
      booking_methods: A mapping of account name to booking method, accumulated
        in the main loop.
    Returns:
      A list of errors.
    """
    errors = []
    balances = collections.defaultdict(inventory.Inventory)
    for entry in entries:
        if isinstance(entry, data.Transaction):
            for posting in entry.postings:
                # Update the balance of each posting on its respective account
                # without allowing booking to a negative position, and if an error
                # is encountered, catch it and return it.
                running_balance = balances[posting.account]
                position_, _ = running_balance.add_position(posting)
                # Skip this check if the booking method is set to ignore it.
                if booking_methods.get(posting.account, None) == data.Booking.NONE:
                    continue
                # Check if the resulting inventory is mixed, which is not
                # allowed under the STRICT method.
                if running_balance.is_mixed():
                    errors.append(
                        BookingError(
                            entry.meta,
                            ("Reducing position results in inventory with positive "
                             "and negative lots: {}").format(position_),
                            entry))
    return errors
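The check above hinges on whether an inventory is "mixed", meaning it holds both positive and negative lots of the same commodity. The sketch below illustrates that idea on a deliberately simplified model: the list-of-pairs inventory and the `is_mixed` helper are assumptions made for illustration, not the implementation behind `Inventory.is_mixed()`.

```python
from decimal import Decimal

def is_mixed(lots):
    """Return True if lots of the same currency carry both signs.

    'lots' is a hypothetical simplification of an inventory: a list of
    (currency, units) pairs, one per lot held at cost.
    """
    signs = {}
    for currency, units in lots:
        sign = units > 0
        # A second lot of the same currency with the opposite sign means "mixed".
        if signs.setdefault(currency, sign) != sign:
            return True
    return False

long_only = [("HOOL", Decimal("10")), ("HOOL", Decimal("5"))]
mixed = long_only + [("HOOL", Decimal("-3"))]
```

In this model, a reduction that matches no existing lot shows up as an extra negative lot, which is the situation the validation reports as an error.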
beancount.parser.booking.validate_missing_eliminated(entries, unused_options_map)
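Validate that all the missing bits of postings have been eliminated.
| Parameters: |
|---|
| entries: A list of directives. |
| unused_options_map: An options map. |

| Returns: |
|---|
| A list of errors. |
Source code in beancount/parser/booking.py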
def validate_missing_eliminated(entries, unused_options_map):
    """Validate that all the missing bits of postings have been eliminated.
    Args:
      entries: A list of directives.
      unused_options_map: An options map.
    Returns:
      A list of errors.
    """
    errors = []
    for entry in entries:
        if isinstance(entry, data.Transaction):
            for posting in entry.postings:
                units = posting.units
                cost = posting.cost
                if (MISSING in (units.number, units.currency) or
                    cost is not None and MISSING in (cost.number, cost.currency,
                                                     cost.date, cost.label)):
                    errors.append(
                        BookingError(entry.meta,
                                     "Transaction has incomplete elements",
                                     entry))
                    break
    return errors
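The validation relies on a sentinel value that marks numbers and currencies the parser left blank. Below is a minimal sketch of the same test on stand-in types. The `MISSING` object, the two named tuples and `has_missing` are hypothetical, and the sketch compares by identity where the original uses `in`.

```python
import collections

# Hypothetical sentinel and types standing in for Beancount's MISSING, Amount and Cost.
MISSING = object()
Amount = collections.namedtuple("Amount", "number currency")
Cost = collections.namedtuple("Cost", "number currency date label")

def has_missing(units, cost):
    """Return True if any component of the units or cost is still MISSING."""
    # Identity comparison avoids accidental equality with real values.
    if any(value is MISSING for value in units):
        return True
    return cost is not None and any(value is MISSING for value in cost)

complete = has_missing(Amount(10, "HOOL"), Cost(100, "USD", "2015-09-30", None))
incomplete_units = has_missing(Amount(MISSING, "USD"), None)
incomplete_cost = has_missing(Amount(10, "HOOL"), Cost(MISSING, "USD", None, None))
```

A dedicated sentinel keeps "not yet filled in" distinct from `None`, which already means "no cost" or "no label".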
beancount.parser.booking_full
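Full (new) booking implementation.
Problem description:
Interpolation and booking feed on each other: numbers filled in by interpolation might affect the booking process, and numbers derived from the booking process may help carry out an interpolation that would otherwise be under-defined. Here is an example of interpolation helping the booking process:
Assume the ante-inventory of Assets:Investments contains two lots of shares of HOOL, one at 100.00 USD and the other at 101.00 USD, and apply this transaction:
2015-09-30 *
  Assets:Investments -10 HOOL {USD}
  Assets:Cash 1000 USD
  Income:Gains -200 USD
Interpolation is unambiguously able to back out a cost of 100 USD / HOOL, which then results in an unambiguous booking result.
On the other hand, consider this transaction:
2015-09-30 *
  Assets:Investments -10 HOOL {USD}
  Assets:Cash 1000 USD
  Income:Gains
Now the interpolation cannot succeed. If the Assets:Investments account is configured to use the FIFO method, the 10 oldest shares would be selected for the cost, and we could then interpolate the capital gains correctly.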
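The FIFO case can be sketched as follows. This is an illustration only, not Beancount's implementation: the lot tuples, the acquisition dates, the lot sizes and the `reduce_fifo` helper are all assumptions, and only the two unit costs and the 1000 USD of proceeds come from the example above.

```python
from decimal import Decimal

def reduce_fifo(lots, quantity):
    """Take 'quantity' units from the oldest lots first.

    'lots' is a hypothetical simplification: a list of
    (acquisition_date, units, unit_cost) tuples. Returns the
    (units, unit_cost) pairs consumed by the reduction.
    """
    remaining = quantity
    consumed = []
    # ISO date strings sort chronologically, so sorting puts the oldest lot first.
    for _date, units, unit_cost in sorted(lots):
        if remaining <= 0:
            break
        taken = min(units, remaining)
        consumed.append((taken, unit_cost))
        remaining -= taken
    if remaining > 0:
        raise ValueError("Not enough units to reduce")
    return consumed

# Invented dates and sizes; the 100.00 lot is assumed to be the older one.
lots = [
    ("2015-02-01", Decimal("10"), Decimal("101.00")),
    ("2015-01-01", Decimal("10"), Decimal("100.00")),
]
consumed = reduce_fifo(lots, Decimal("10"))
cost_basis = sum(units * cost for units, cost in consumed)
# Whatever the proceeds leave over after the booked cost basis is the
# amount that interpolation fills in on Income:Gains.
gains = Decimal("1000") - cost_basis
```

Once booking has fixed the cost basis, the gains posting is the only unknown left, so interpolation can fill it in.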
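First observation: the second case is much more frequent than the first, and the first is easily resolved manually by requiring that a particular cost be specified. Moreover, in many cases there isn't just a single lot of shares to reduce from, and figuring out the correct set of shares given a target cost is an underspecified problem.
Second observation: booking can only be achieved for inventory reductions, not for augmentations. Therefore, we should carry out booking on inventory reductions and fail early if a reduction is undefined, and leave inventory augmentations with missing numbers undefined, so that interpolation can fill them in at a later stage.
Note that one case we would like to handle, but may not be able to, is a reduction with an interpolated price, like this:
2015-09-30 *
  Assets:Investments -10 HOOL {100.00 # USD}
  Expenses:Commission 9.95 USD
  Assets:Cash 990.05 USD
Therefore we choose to:
1) Carry out booking first, on inventory reductions only, and leave inventory augmentations as they are, possibly undefined. The 'cost' attributes of booked postings are converted from CostSpec to Cost. Augmenting postings with missing amounts are left as CostSpec instances in order to allow for interpolation of total vs. per-unit amounts.
2) Compute interpolations on the resulting postings. Undefined costs for inventory augmentations may be filled in by interpolation at this stage (if possible).
3) Finally, convert the interpolated CostSpec instances to Cost instances.
Improving on this algorithm would require running a loop over the booking and interpolation steps until all numbers are resolved or no more inference can occur. We may consider that later, as an experimental feature. My hunch is that there are so few cases for which this would be useful that we won't bother improving on the algorithm above.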
beancount.parser.booking_full.CategorizationError (tuple)
CategorizationError(source, message, entry)
beancount.parser.booking_full.CategorizationError.__getnewargs__(self)
special
Return self as a plain tuple. Used by copy and pickle.
Source code in beancount/parser/booking_full.py
def __getnewargs__(self):
    'Return self as a plain tuple. Used by copy and pickle.'
    return _tuple(self)
beancount.parser.booking_full.CategorizationError.__new__(_cls, source, message, entry)
special
staticmethod
Create new instance of CategorizationError(source, message, entry)
beancount.parser.booking_full.CategorizationError.__replace__(self, /, **kwds)
special
Return a new CategorizationError object replacing specified fields with new values
Source code in beancount/parser/booking_full.py
def _replace(self, /, **kwds):
    result = self._make(_map(kwds.pop, field_names, self))
    if kwds:
        raise TypeError(f'Got unexpected field names: {list(kwds)!r}')
    return result
beancount.parser.booking_full.CategorizationError.__repr__(self)
special
Return a nicely formatted representation string
Source code in beancount/parser/booking_full.py
def __repr__(self):
    'Return a nicely formatted representation string'
    return self.__class__.__name__ + repr_fmt % self
beancount.parser.booking_full.InterpolationError (tuple)
InterpolationError(source, message, entry)
beancount.parser.booking_full.InterpolationError.__getnewargs__(self)
special
Return self as a plain tuple. Used by copy and pickle.
Source code in beancount/parser/booking_full.py
def __getnewargs__(self):
    'Return self as a plain tuple. Used by copy and pickle.'
    return _tuple(self)
beancount.parser.booking_full.InterpolationError.__new__(_cls, source, message, entry)
special
staticmethod
Create new instance of InterpolationError(source, message, entry)
beancount.parser.booking_full.InterpolationError.__replace__(self, /, **kwds)
special
Return a new InterpolationError object replacing specified fields with new values
Source code in beancount/parser/booking_full.py
def _replace(self, /, **kwds):
    result = self._make(_map(kwds.pop, field_names, self))
    if kwds:
        raise TypeError(f'Got unexpected field names: {list(kwds)!r}')
    return result
beancount.parser.booking_full.InterpolationError.__repr__(self)
special
Return a nicely formatted representation string
Source code in beancount/parser/booking_full.py
def __repr__(self):
    'Return a nicely formatted representation string'
    return self.__class__.__name__ + repr_fmt % self
beancount.parser.booking_full.MissingType (enum)
The type of missing number.
beancount.parser.booking_full.ReductionError (tuple)
ReductionError(source, message, entry)
beancount.parser.booking_full.ReductionError.__getnewargs__(self)
special
Return self as a plain tuple. Used by copy and pickle.
Source code in beancount/parser/booking_full.py
def __getnewargs__(self):
    'Return self as a plain tuple. Used by copy and pickle.'
    return _tuple(self)
beancount.parser.booking_full.ReductionError.__new__(_cls, source, message, entry)
special
staticmethod
Create new instance of ReductionError(source, message, entry)
beancount.parser.booking_full.ReductionError.__replace__(self, /, **kwds)
special
Return a new ReductionError object replacing specified fields with new values
Source code in beancount/parser/booking_full.py
def _replace(self, /, **kwds):
    result = self._make(_map(kwds.pop, field_names, self))
    if kwds:
        raise TypeError(f'Got unexpected field names: {list(kwds)!r}')
    return result
beancount.parser.booking_full.ReductionError.__repr__(self)
special
Return a nicely formatted representation string
Source code in beancount/parser/booking_full.py
def __repr__(self):
    'Return a nicely formatted representation string'
    return self.__class__.__name__ + repr_fmt % self
beancount.parser.booking_full.Refer (tuple)
Refer(index, units_currency, cost_currency, price_currency)
beancount.parser.booking_full.Refer.__getnewargs__(self)
special
Return self as a plain tuple. Used by copy and pickle.
Source code in beancount/parser/booking_full.py
def __getnewargs__(self):
    'Return self as a plain tuple. Used by copy and pickle.'
    return _tuple(self)
beancount.parser.booking_full.Refer.__new__(_cls, index, units_currency, cost_currency, price_currency)
special
staticmethod
Create new instance of Refer(index, units_currency, cost_currency, price_currency)
beancount.parser.booking_full.Refer.__replace__(self, /, **kwds)
special
Return a new Refer object replacing specified fields with new values
Source code in beancount/parser/booking_full.py
def _replace(self, /, **kwds):
    result = self._make(_map(kwds.pop, field_names, self))
    if kwds:
        raise TypeError(f'Got unexpected field names: {list(kwds)!r}')
    return result
beancount.parser.booking_full.Refer.__repr__(self)
special
Return a nicely formatted representation string
Source code in beancount/parser/booking_full.py
def __repr__(self):
    'Return a nicely formatted representation string'
    return self.__class__.__name__ + repr_fmt % self
beancount.parser.booking_full.SelfReduxError (tuple)
SelfReduxError(source, message, entry)
beancount.parser.booking_full.SelfReduxError.__getnewargs__(self)
special
Return self as a plain tuple. Used by copy and pickle.
Source code in beancount/parser/booking_full.py
def __getnewargs__(self):
    'Return self as a plain tuple. Used by copy and pickle.'
    return _tuple(self)
beancount.parser.booking_full.SelfReduxError.__new__(_cls, source, message, entry)
special
staticmethod
Create new instance of SelfReduxError(source, message, entry)
beancount.parser.booking_full.SelfReduxError.__replace__(self, /, **kwds)
special
Return a new SelfReduxError object replacing specified fields with new values
Source code in beancount/parser/booking_full.py
def _replace(self, /, **kwds):
    result = self._make(_map(kwds.pop, field_names, self))
    if kwds:
        raise TypeError(f'Got unexpected field names: {list(kwds)!r}')
    return result
beancount.parser.booking_full.SelfReduxError.__repr__(self)
special
Return a nicely formatted representation string
Source code in beancount/parser/booking_full.py
def __repr__(self):
    'Return a nicely formatted representation string'
    return self.__class__.__name__ + repr_fmt % self
beancount.parser.booking_full.book(entries, options_map, methods, initial_balances=None)
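Interpolate missing data from the entries using the full historical algorithm. See the internal implementation _book() for details. This function only strips some of the return values.
See _book() for arguments and return values.
Source code in beancount/parser/booking_full.py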
def book(entries, options_map, methods, initial_balances=None):
    """Interpolate missing data from the entries using the full historical algorithm.
    See the internal implementation _book() for details.
    This method only stripes some of the return values.
    See _book() for arguments and return values.
    """
    entries, errors, _ = _book(entries, options_map, methods, initial_balances)
    return entries, errors
beancount.parser.booking_full.book_reductions(entry, group_postings, balances, methods)
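Book inventory reductions against the ante-balances.
This function accepts a dict of (account, Inventory balance) and, for each posting that is a reduction against its inventory, attempts to find a corresponding lot or list of lots to reduce the balance with.
- For reducing lots, the CostSpec instance of the posting is replaced by a Cost instance.
- For augmenting lots, the CostSpec instance of the posting is left alone, except for its date, which is inherited from the parent Transaction.
| Parameters: |
|---|
| entry: An instance of Transaction. This is only used to refer to when logging errors. |
| group_postings: A list of Posting instances for the group. |
| balances: A dict of account name to inventory contents. |
| methods: A mapping of account name to their corresponding booking method enum. |

| Returns: |
|---|
| booked_postings: A list of booked postings, with reducing lots resolved against specific positions in the corresponding accounts' ante-inventory balances. Note that a single reducing posting in the input may result in multiple postings in the output. Also note that augmenting postings held at cost will still refer to 'cost' instances of CostSpec, left to be interpolated later. |
| errors: A list of errors, if there were any. |
Source code in beancount/parser/booking_full.py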
def book_reductions(entry, group_postings, balances,
                    methods):
    """Book inventory reductions against the ante-balances.
    This function accepts a dict of (account, Inventory balance) and for each
    posting that is a reduction against its inventory, attempts to find a
    corresponding lot or list of lots to reduce the balance with.
    * For reducing lots, the CostSpec instance of the posting is replaced by a
      Cost instance.
    * For augmenting lots, the CostSpec instance of the posting is left alone,
      except for its date, which is inherited from the parent Transaction.
    Args:
      entry: An instance of Transaction. This is only used to refer to when
        logging errors.
      group_postings: A list of Posting instances for the group.
      balances: A dict of account name to inventory contents.
      methods: A mapping of account name to their corresponding booking
        method enum.
    Returns:
      A pair of
        booked_postings: A list of booked postings, with reducing lots resolved
          against specific position in the corresponding accounts'
          ante-inventory balances. Note single reducing posting in the input may
          result in multiple postings in the output. Also note that augmenting
          postings held-at-cost will still refer to 'cost' instances of
          CostSpec, left to be interpolated later.
        errors: A list of errors, if there were any.
    """
    errors = []
    # A local copy of the balances dictionary which is updated just for the
    # duration of this function's updates, in order to take into account the
    # cumulative effect of all the postings inferred here
    local_balances = {}
    empty = inventory.Inventory()
    booked_postings = []
    for posting in group_postings:
        # Process a single posting.
        units = posting.units
        costspec = posting.cost
        account = posting.account
        # Note: We ensure there is no mutation on 'balances' to keep this
        # function without side-effects. Note that we may be able to optimize
        # performance later on by giving up this property.
        #
        # Also note that if there is no existing balance, then won't be any lot
        # reduction because none of the postings will be able to match against
        # any currencies of the balance.
        if account not in local_balances:
            previous_balance = balances.get(account, empty)
            local_balances[account] = copy.copy(previous_balance)
        balance = local_balances[account]
        # Check if this is a lot held at cost.
        if costspec is None or units.number is MISSING:
            # This posting is not held at cost; we do nothing.
            booked_postings.append(posting)
        else:
            # This posting is held at cost; figure out if it's a reduction or an
            # augmentation.
            method = methods[account]
            if (method is not Booking.NONE and
                balance is not None and
                balance.is_reduced_by(units)):
                # This posting is a reduction.
                # Match the positions.
                cost_number = compute_cost_number(costspec, units)
                matches = []
                for position in balance:
                    # Skip inventory contents of a different currency.
                    if (units.currency and
                        position.units.currency != units.currency):
                        continue
                    # Skip balance positions not held at cost.
                    if position.cost is None:
                        continue
                    if (cost_number is not None and
                        position.cost.number != cost_number):
                        continue
                    if (isinstance(costspec.currency, str) and
                        position.cost.currency != costspec.currency):
                        continue
                    if (costspec.date and
                        position.cost.date != costspec.date):
                        continue
                    if (costspec.label and
                        position.cost.label != costspec.label):
                        continue
                    matches.append(position)
                # Check for ambiguous matches.
                if len(matches) == 0:
                    errors.append(
                        ReductionError(entry.meta,
                                       'No position matches "{}" against balance {}'.format(
                                           posting, balance),
                                       entry))
                    return [], errors  # This is irreconcilable, remove these postings.
                # TODO(blais): We'll have to change this, as we want to allow
                # positions crossing from negative to positive and vice-versa in
                # a simple application. See {d3cbd78f1029}.
                reduction_postings, matched_postings, ambi_errors = (
                    booking_method.handle_ambiguous_matches(entry, posting, matches,
                                                            method))
                if ambi_errors:
                    errors.extend(ambi_errors)
                    return [], errors
                # Add the reductions to the resulting list of booked postings.
                booked_postings.extend(reduction_postings)
                # Update the local balance in order to avoid matching against
                # the same postings twice when processing multiple postings in
                # the same transaction. Note that we only do this for postings
                # held at cost because the other postings may need interpolation
                # in order to be resolved properly.
                for posting in reduction_postings:
                    balance.add_position(posting)
            else:
                # This posting is an augmentation.
                #
                # Note that we do not convert the CostSpec instances to Cost
                # instances, because we want to let the subsequent interpolation
                # process able to interpolate either the cost per-unit or the
                # total cost, separately.
                # Put in the date of the parent Transaction if there is no
                # explicit date specified on the spec.
                if costspec.date is None:
                    dated_costspec = costspec._replace(date=entry.date)
                    posting = posting._replace(cost=dated_costspec)
                # FIXME: Insert unique ids for trade tracking; right now this
                # creates ambiguous matches errors (and it shouldn't).
                # # Insert a unique label if there isn't one.
                # if posting.cost is not None and posting.cost.label is None:
                #     posting = posting._replace(
                #         cost=posting.cost._replace(label=unique_label()))
                booked_postings.append(posting)
    return booked_postings, errors
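The matching loop in `book_reductions()` treats every field of the cost specification as an optional filter: a field that is not specified places no constraint on the candidate lots. The sketch below shows that filtering on simplified stand-ins. The `Lot` and `Spec` tuples and the `match_lots` helper are hypothetical, and the lots are invented.

```python
import collections

# Hypothetical, simplified stand-ins for a held lot and a cost specification.
Lot = collections.namedtuple("Lot", "currency cost_number cost_currency date label")
Spec = collections.namedtuple("Spec", "cost_number cost_currency date label")

def match_lots(lots, currency, spec):
    """Return the lots compatible with every field that 'spec' constrains."""
    matches = []
    for lot in lots:
        if lot.currency != currency:
            continue
        # A field left as None in the spec places no constraint on the lot.
        if spec.cost_number is not None and lot.cost_number != spec.cost_number:
            continue
        if spec.cost_currency is not None and lot.cost_currency != spec.cost_currency:
            continue
        if spec.date is not None and lot.date != spec.date:
            continue
        if spec.label is not None and lot.label != spec.label:
            continue
        matches.append(lot)
    return matches

lots = [
    Lot("HOOL", 100, "USD", "2015-01-01", None),
    Lot("HOOL", 101, "USD", "2015-02-01", None),
]
by_currency = match_lots(lots, "HOOL", Spec(None, "USD", None, None))
by_number = match_lots(lots, "HOOL", Spec(101, None, None, None))
no_match = match_lots(lots, "HOOL", Spec(99, None, None, None))
```

An empty result corresponds to the ReductionError branch, while more than one match is what the booking method (for example FIFO) has to disambiguate.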
beancount.parser.booking_full.categorize_by_currency(entry, balances)
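Group the postings by the currency they declare.
This is used to prepare the postings for the next stages: interpolation and booking are then carried out separately on each currency group. At the outset of this routine, we should have distinct groups of currencies without any ambiguities regarding which currency they need to balance against.
Here is how this works.
- First we apply the constraint that the cost currency and the price currency must match, if there is both a cost and a price. This reduces the space of possibilities somewhat.
- If the currency is explicitly specified, we put the posting in that currency's bucket.
- If not, we have a few methods left to disambiguate the currency:
  - We look at the remaining postings. If they are all of a single currency, the posting must be in that currency too.
  - If we cannot do that, we inspect the contents of the inventory of the account for the posting. If all the contents are of a single currency, we use that one.
| Parameters: |
|---|
| entry: The transaction whose incomplete postings are categorized (the docstring names this argument `postings`). |
| balances: A dict of inventory contents before the transaction is applied. The docstring describes the keys as currencies; the code looks balances up by account name. |

| Returns: |
|---|
| A list of (currency string, list of tuples) items describing each posting and its interpolated currencies, and a list of generated errors for currency interpolation. The entry's original postings are left unmodified. Each tuple in the value list contains: index, the posting index in the original entry; units_currency, the interpolated currency for units; cost_currency, the interpolated currency for cost; price_currency, the interpolated currency for price. |
Source code in beancount/parser/booking_full.py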
def categorize_by_currency(entry, balances):
    """Group the postings by the currency they declare.
    This is used to prepare the postings for the next stages: Interpolation and
    booking will then be carried out separately on each currency group. At the
    outset of this routine, we should have distinct groups of currencies without
    any ambiguities regarding which currency they need to balance against.
    Here's how this works.
    - First we apply the constraint that cost-currency and price-currency must
      match, if there is both a cost and a price. This reduces the space of
      possibilities somewhat.
    - If the currency is explicitly specified, we put the posting in that
      currency's bucket.
    - If not, we have a few methods left to disambiguate the currency:
      1. We look at the remaining postings... if they are all of a single
         currency, the posting must be in that currency too.
      2. If we cannot do that, we inspect the contents of the inventory of the
         account for the posting. If all the contents are of a single currency,
         we use that one.
    Args:
      postings: A list of incomplete postings to categorize.
      balances: A dict of currency to inventory contents before the transaction is
        applied.
    Returns:
      A list of (currency string, list of tuples) items describing each postings
      and its interpolated currencies, and a list of generated errors for
      currency interpolation. The entry's original postings are left unmodified.
      Each tuple in the value-list contains:
        index: The posting index in the original entry.
        units_currency: The interpolated currency for units.
        cost_currency: The interpolated currency for cost.
        price_currency: The interpolated currency for price.
    """
    errors = []
    groups = collections.defaultdict(list)
    sortdict = {}
    auto_postings = []
    unknown = []
    for index, posting in enumerate(entry.postings):
        units = posting.units
        cost = posting.cost
        price = posting.price
        # Extract and override the currencies locally.
        units_currency = (units.currency
                          if units is not MISSING and units is not None
                          else None)
        cost_currency = (cost.currency
                         if cost is not MISSING and cost is not None
                         else None)
        price_currency = (price.currency
                          if price is not MISSING and price is not None
                          else None)
        # First we apply the constraint that cost-currency and price-currency
        # must match, if there is both a cost and a price. This reduces the
        # space of possibilities somewhat.
        if cost_currency is MISSING and isinstance(price_currency, str):
            cost_currency = price_currency
        if price_currency is MISSING and isinstance(cost_currency, str):
            price_currency = cost_currency
        refer = Refer(index, units_currency, cost_currency, price_currency)
        if units is MISSING and price_currency is None:
            # Bucket auto-postings separately from unknown.
            auto_postings.append(refer)
        else:
            # Bucket with what we know so far.
            currency = get_bucket_currency(refer)
            if currency is not None:
                sortdict.setdefault(currency, index)
                groups[currency].append(refer)
            else:
                # If we need to infer the currency, store in unknown.
                unknown.append(refer)
    # We look at the remaining postings... if they are all of a single currency,
    # the posting must be in that currency too.
    if unknown and len(unknown) == 1 and len(groups) == 1:
        (index, units_currency, cost_currency, price_currency) = unknown.pop()
        other_currency = next(iter(groups.keys()))
        if price_currency is None and cost_currency is None:
            # Infer to the units currency.
            units_currency = other_currency
        else:
            # Infer to the cost and price currencies.
            if price_currency is MISSING:
                price_currency = other_currency
            if cost_currency is MISSING:
                cost_currency = other_currency
        refer = Refer(index, units_currency, cost_currency, price_currency)
        currency = get_bucket_currency(refer)
        assert currency is not None
        sortdict.setdefault(currency, index)
        groups[currency].append(refer)
    # Finally, try to resolve all the unknown legs using the inventory contents
    # of each account.
    for refer in unknown:
        (index, units_currency, cost_currency, price_currency) = refer
        posting = entry.postings[index]
        balance = balances.get(posting.account, None)
        if balance is None:
            balance = inventory.Inventory()
        if units_currency is MISSING:
            balance_currencies = balance.currencies()
            if len(balance_currencies) == 1:
                units_currency = balance_currencies.pop()
        if cost_currency is MISSING or price_currency is MISSING:
            balance_cost_currencies = balance.cost_currencies()
            if len(balance_cost_currencies) == 1:
                balance_cost_currency = balance_cost_currencies.pop()
                if price_currency is MISSING:
                    price_currency = balance_cost_currency
                if cost_currency is MISSING:
                    cost_currency = balance_cost_currency
        refer = Refer(index, units_currency, cost_currency, price_currency)
        currency = get_bucket_currency(refer)
        if currency is not None:
            sortdict.setdefault(currency, index)
            groups[currency].append(refer)
        else:
            errors.append(
                CategorizationError(posting.meta,
                                    "Failed to categorize posting {}".format(index + 1),
                                    entry))
    # Fill in missing units currencies if some remain as missing. This may occur
    # if we used the cost or price to bucket the currency but the units currency
    # was missing.
    for currency, refers in groups.items():
        for rindex, refer in enumerate(refers):
            if refer.units_currency is MISSING:
                posting = entry.postings[refer.index]
                balance = balances.get(posting.account, None)
                if balance is None:
                    continue
                balance_currencies = balance.currencies()
                if len(balance_currencies) == 1:
                    refers[rindex] = refer._replace(units_currency=balance_currencies.pop())
    # Deal with auto-postings.
    if len(auto_postings) > 1:
        refer = auto_postings[-1]
        posting = entry.postings[refer.index]
        errors.append(
            CategorizationError(posting.meta,
                                "You may not have more than one auto-posting per currency",
                                entry))
        auto_postings = auto_postings[0:1]
    for refer in auto_postings:
        for currency, glist in groups.items():
            sortdict.setdefault(currency, refer.index)
            glist.append(Refer(refer.index, currency, None, None))
    # Issue error for all currencies which we could not resolve.
    for currency, refers in groups.items():
        for refer in refers:
            posting = entry.postings[refer.index]
            for currency, name in [(refer.units_currency, 'units'),
                                   (refer.cost_currency, 'cost'),
                                   (refer.price_currency, 'price')]:
                if currency is MISSING:
                    errors.append(CategorizationError(
                        posting.meta,
                        "Could not resolve {} currency".format(name),
                        entry))
    sorted_groups = sorted(groups.items(), key=lambda item: sortdict[item[0]])
    return sorted_groups, errors
beancount.parser.booking_full.compute_cost_number(costspec, units)
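Given a CostSpec, return the cost number, if possible to compute.
| Parameters: |
|---|
| costspec: A parsed instance of CostSpec. |
| units: An instance of Amount for the units of the position. |

| Returns: |
|---|
| None if it is not possible to calculate the cost; otherwise a Decimal instance, the per-unit cost. |
Source code in beancount/parser/booking_full.py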
def compute_cost_number(costspec, units):
    """Given a CostSpec, return the cost number, if possible to compute.
    Args:
      costspec: A parsed instance of CostSpec.
      units: An instance of Amount for the units of the position.
    Returns:
      If it is not possible to calculate the cost, return None.
      Otherwise, returns a Decimal instance, the per-unit cost.
    """
    number_per = costspec.number_per
    number_total = costspec.number_total
    if MISSING in (number_per, number_total):
        return None
    if number_total is not None:
        # Compute the per-unit cost if there is some total cost
        # component involved.
        cost_total = number_total
        units_number = abs(units.number)
        if number_per is not None:
            cost_total += number_per * units_number
        unit_cost = cost_total / units_number
    elif number_per is None:
        return None
    else:
        unit_cost = number_per
    return unit_cost
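A worked example may help with the arithmetic: when a total cost component is present, it is added to the per-unit component multiplied by the absolute number of units, and the sum is divided by that same number of units. The sketch below restates this on plain Decimals. The `MISSING` sentinel and the `per_unit_cost` helper are hypothetical stand-ins, and the figures are invented.

```python
from decimal import Decimal

# Hypothetical sentinel standing in for Beancount's MISSING marker.
MISSING = object()

def per_unit_cost(number_per, number_total, units_number):
    """Combine per-unit and total cost components into a per-unit cost.

    Returns None when the cost cannot be computed from the given numbers.
    """
    if number_per is MISSING or number_total is MISSING:
        return None
    if number_total is not None:
        quantity = abs(units_number)
        cost_total = number_total
        if number_per is not None:
            cost_total += number_per * quantity
        return cost_total / quantity
    return number_per  # May itself be None when no number was given.

# Ten units with a per-unit component of 5.00 plus a total component of 20.00:
# (20.00 + 5.00 * 10) / 10 = 7.00 per unit.
combined = per_unit_cost(Decimal("5.00"), Decimal("20.00"), Decimal("10"))
```

Taking the absolute value of the units keeps the per-unit cost positive for reductions, where the number of units is negative.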
beancount.parser.booking_full.convert_costspec_to_cost(posting)
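Convert an instance of CostSpec to Cost, if present on the posting.
If the posting has no cost, the posting itself is returned unchanged.
| Parameters: |
|---|
| posting: An instance of Posting. |

| Returns: |
|---|
| An instance of Posting with a possibly replaced 'cost' attribute. |
Source code in beancount/parser/booking_full.py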
def convert_costspec_to_cost(posting):
    """Convert an instance of CostSpec to Cost, if present on the posting.
    If the posting has no cost, it itself is just returned.
    Args:
      posting: An instance of Posting.
    Returns:
      An instance of Posting with a possibly replaced 'cost' attribute.
    """
    cost = posting.cost
    if isinstance(cost, position.CostSpec):
        if cost is not None:
            number_per = cost.number_per
            number_total = cost.number_total
            if number_total is not None:
                # Compute the per-unit cost if there is some total cost
                # component involved.
                units_number = abs(posting.units.number)
                cost_total = number_total
                if number_per is not MISSING:
                    cost_total += number_per * units_number
                unit_cost = cost_total / units_number
            else:
                unit_cost = number_per
            new_cost = Cost(unit_cost, cost.currency, cost.date, cost.label)
            posting = posting._replace(units=posting.units, cost=new_cost)
    return posting
beancount.parser.booking_full.get_bucket_currency(refer)
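Given currency references for a posting, return the bucket currency.
| Parameters: |
|---|
| refer: An instance of Refer. |

| Returns: |
|---|
| A currency string. The code returns None when no bucket currency can be determined. |
Source code in beancount/parser/booking_full.py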
def get_bucket_currency(refer):
    """Given currency references for a posting, return the bucket currency.
    Args:
      refer: An instance of Refer.
    Returns:
      A currency string.
    """
    currency = None
    if isinstance(refer.cost_currency, str):
        currency = refer.cost_currency
    elif isinstance(refer.price_currency, str):
        currency = refer.price_currency
    elif (refer.cost_currency is None and
          refer.price_currency is None and
          isinstance(refer.units_currency, str)):
        currency = refer.units_currency
    return currency
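The precedence is easier to see on concrete inputs: the cost currency wins, then the price currency, and the units currency is used only when the posting has neither a cost nor a price. The sketch below re-implements the rule on a local `Refer` stand-in so that it runs without Beancount. The `MISSING` sentinel, the `bucket_currency` name and the sample postings in the comments are assumptions made for illustration.

```python
import collections

# Hypothetical stand-ins; MISSING marks a currency still to be inferred.
MISSING = object()
Refer = collections.namedtuple(
    "Refer", "index units_currency cost_currency price_currency")

def bucket_currency(refer):
    """Pick the currency a posting balances in: cost, then price, then units."""
    if isinstance(refer.cost_currency, str):
        return refer.cost_currency
    if isinstance(refer.price_currency, str):
        return refer.price_currency
    # Units decide only when there is neither a cost nor a price at all.
    if (refer.cost_currency is None and refer.price_currency is None
            and isinstance(refer.units_currency, str)):
        return refer.units_currency
    return None

held_at_cost = bucket_currency(Refer(0, "HOOL", "USD", None))   # e.g. 10 HOOL {100 USD}
converted = bucket_currency(Refer(1, "EUR", None, "USD"))       # e.g. 100 EUR @ 1.10 USD
plain = bucket_currency(Refer(2, "USD", None, None))            # e.g. 1000 USD
unresolved = bucket_currency(Refer(3, "HOOL", MISSING, None))   # cost currency unknown
```

The last case returns None because a cost is present but its currency is not yet known, so the posting cannot be bucketed by its units currency.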
beancount.parser.booking_full.has_self_reduction(postings, methods)
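Return true if the postings potentially reduce each other at cost.
| Parameters: |
|---|
| postings: A list of postings with uninterpolated CostSpec cost instances. |
| methods: A mapping of account name to their corresponding booking method. |

| Returns: |
|---|
| A boolean, true if there's a potential for self-reduction. |
Source code in beancount/parser/booking_full.py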
def has_self_reduction(postings, methods):
    """Return true if the postings potentially reduce each other at cost.
    Args:
      postings: A list of postings with uninterpolated CostSpec cost instances.
      methods: A mapping of account name to their corresponding booking
        method.
    Returns:
      A boolean, true if there's a potential for self-reduction.
    """
    # A mapping of (currency, cost-currency) and sign.
    cost_changes = {}
    for posting in postings:
        cost = posting.cost
        if cost is None:
            continue
        if methods[posting.account] is Booking.NONE:
            continue
        key = (posting.account, posting.units.currency)
        sign = 1 if posting.units.number > ZERO else -1
        if cost_changes.setdefault(key, sign) != sign:
            return True
    return False
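Self-reduction means that, within one transaction, the same account both adds and removes units of the same commodity at cost. The sketch below shows the sign-tracking idea on a flattened stand-in. The `Posting` tuple and the `may_self_reduce` helper are hypothetical, and the sketch leaves out the skip for accounts whose booking method is NONE.

```python
import collections
from decimal import Decimal

# Hypothetical, flattened stand-in for a posting; 'has_cost' replaces the cost spec.
Posting = collections.namedtuple("Posting", "account currency number has_cost")

def may_self_reduce(postings):
    """Return True if one account sees both signs of the same commodity at cost."""
    seen_signs = {}
    for posting in postings:
        if not posting.has_cost:
            continue
        key = (posting.account, posting.currency)
        sign = 1 if posting.number > 0 else -1
        # setdefault() stores the first sign seen and returns it on later lookups.
        if seen_signs.setdefault(key, sign) != sign:
            return True
    return False

buy_and_sell = [
    Posting("Assets:Investments", "HOOL", Decimal("10"), True),
    Posting("Assets:Investments", "HOOL", Decimal("-4"), True),
]
buy_only = [
    Posting("Assets:Investments", "HOOL", Decimal("10"), True),
    Posting("Assets:Cash", "USD", Decimal("-1000"), False),
]
```

The check is conservative: it reports a potential for self-reduction without verifying that the two postings would actually match the same lot.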
beancount.parser.booking_full.interpolate_group(postings, balances, currency, tolerances)
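Interpolate missing numbers in the set of postings.
| Parameters: |
|---|
| postings: A list of Posting instances. |
| balances: A dict of account to its ante-inventory. |
| currency: The weight currency of this group, used for reporting errors. |
| tolerances: A dict of currency to tolerance values. |

| Returns: |
|---|
| postings: A list of new posting instances. |
| errors: A list of errors generated during interpolation. |
| interpolated: A boolean, true if we did have to interpolate. |
| In the case of an error, this returns the original list of postings, which is still incomplete. If an error is returned, you should probably skip the transaction altogether, or just not include the postings in it. (An alternative behaviour would be to return only the list of valid postings, but that would likely result in an unbalanced transaction. We do it this way by choice.) |
Source code in beancount/parser/booking_full.py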
def interpolate_group(postings, balances, currency, tolerances):
    """Interpolate missing numbers in the set of postings.
    Args:
      postings: A list of Posting instances.
      balances: A dict of account to its ante-inventory.
      currency: The weight currency of this group, used for reporting errors.
      tolerances: A dict of currency to tolerance values.
    Returns:
      A tuple of
        postings: A list of new posting instances.
        errors: A list of errors generated during interpolation.
        interpolated: A boolean, true if we did have to interpolate.
      In the case of an error, this returns the original list of postings, which
      is still incomplete. If an error is returned, you should probably skip the
      transaction altogether, or just not include the postings in it. (An
      alternative behaviour would be to return only the list of valid postings,
      but that would likely result in an unbalanced transaction. We do it this
      way by choice.)
    """
    errors = []
    # Figure out which type of amount is missing, by creating a list of
    # incomplete postings and which type of units is missing.
    incomplete = []
    for index, posting in enumerate(postings):
        units = posting.units
        cost = posting.cost
        price = posting.price
        # Identify incomplete parts of the Posting components.
        if units.number is MISSING:
            incomplete.append((MissingType.UNITS, index))
        if isinstance(cost, CostSpec):
            if cost and cost.number_per is MISSING:
                incomplete.append((MissingType.COST_PER, index))
            if cost and cost.number_total is MISSING:
                incomplete.append((MissingType.COST_TOTAL, index))
        else:
            # Check that a resolved instance of Cost never needs interpolation.
            #
            # Note that in theory we could support the interpolation of regular
            # per-unit costs in these if we wanted to; but because they're all
            # reducing postings that have been booked earlier, those never need
            # to be interpolated.
            if cost is not None:
                assert isinstance(cost.number, Decimal), (
                    "Internal error: cost has no number: {}; on postings: {}".format(
                        cost, postings))
        if price and price.number is MISSING:
            incomplete.append((MissingType.PRICE, index))
    # The replacement posting for the incomplete posting of this group.
    new_posting = None
    if len(incomplete) == 0:
        # If there are no missing numbers, just convert the CostSpec to Cost and
        # return that.
        out_postings = [convert_costspec_to_cost(posting)
                        for posting in postings]
    elif len(incomplete) > 1:
        # If there is more than a single value to be interpolated, generate an
# error and return no postings.
_, posting_index = incomplete[0]
errors.append(InterpolationError(
postings[posting_index].meta,
"Too many missing numbers for currency group '{}'".format(currency),
None))
out_postings = []
else:
# If there is a single missing number, calculate it and fill it in here.
missing, index = incomplete[0]
incomplete_posting = postings[index]
# Convert augmenting postings' costs from CostSpec to corresponding Cost
# instances, except for the incomplete posting.
new_postings = [(posting
if posting is incomplete_posting
else convert_costspec_to_cost(posting))
for posting in postings]
# Compute the balance of the other postings.
residual = interpolate.compute_residual(posting
for posting in new_postings
if posting is not incomplete_posting)
assert len(residual) < 2, "Internal error in grouping postings by currencies."
if not residual.is_empty():
respos = next(iter(residual))
assert respos.cost is None, (
"Internal error; cost appears in weight calculation.")
assert respos.units.currency == currency, (
"Internal error; residual different than currency group.")
weight = -respos.units.number
weight_currency = respos.units.currency
else:
weight = ZERO
weight_currency = currency
if missing == MissingType.UNITS:
units = incomplete_posting.units
cost = incomplete_posting.cost
if cost:
# Handle the special case where we only have total cost.
if cost.number_per == ZERO:
errors.append(InterpolationError(
incomplete_posting.meta,
"Cannot infer per-unit cost only from total", None))
return postings, errors, True
assert cost.currency == weight_currency, (
"Internal error; residual currency different than missing currency.")
cost_total = cost.number_total or ZERO
units_number = (weight - cost_total) / cost.number_per
elif incomplete_posting.price:
assert incomplete_posting.price.currency == weight_currency, (
"Internal error; residual currency different than missing currency.")
units_number = weight / incomplete_posting.price.number
else:
assert units.currency == weight_currency, (
"Internal error; residual currency different than missing currency.")
units_number = weight
# Quantize the interpolated units if necessary.
units_number = interpolate.quantize_with_tolerance(tolerances,
units.currency,
units_number)
if weight != ZERO:
new_pos = Position(Amount(units_number, units.currency), cost)
new_posting = incomplete_posting._replace(units=new_pos.units,
cost=new_pos.cost)
else:
new_posting = None
elif missing == MissingType.COST_PER:
units = incomplete_posting.units
cost = incomplete_posting.cost
assert cost.currency == weight_currency, (
"Internal error; residual currency different than missing currency.")
if units.number != ZERO:
number_per = (weight - (cost.number_total or ZERO)) / units.number
new_cost = cost._replace(number_per=number_per)
new_pos = Position(units, new_cost)
new_posting = incomplete_posting._replace(units=new_pos.units,
cost=new_pos.cost)
else:
new_posting = None
elif missing == MissingType.COST_TOTAL:
units = incomplete_posting.units
cost = incomplete_posting.cost
assert cost.currency == weight_currency, (
"Internal error; residual currency different than missing currency.")
number_total = (weight - cost.number_per * units.number)
new_cost = cost._replace(number_total=number_total)
new_pos = Position(units, new_cost)
new_posting = incomplete_posting._replace(units=new_pos.units,
cost=new_pos.cost)
elif missing == MissingType.PRICE:
units = incomplete_posting.units
cost = incomplete_posting.cost
if cost is not None:
errors.append(InterpolationError(
incomplete_posting.meta,
"Cannot infer price for postings with units held at cost", None))
return postings, errors, True
else:
price = incomplete_posting.price
assert price.currency == weight_currency, (
"Internal error; residual currency different than missing currency.")
new_price_number = abs(weight / units.number)
new_posting = incomplete_posting._replace(price=Amount(new_price_number,
price.currency))
else:
assert False, "Internal error; Invalid missing type."
# Replace the number in the posting.
if new_posting is not None:
# Set meta-data on the new posting to indicate it was interpolated.
if new_posting.meta is None:
new_posting = new_posting._replace(meta={})
new_posting.meta[interpolate.AUTOMATIC_META] = True
# Convert augmenting posting costs from CostSpec to a corresponding
# Cost instance.
new_postings[index] = convert_costspec_to_cost(new_posting)
else:
del new_postings[index]
out_postings = new_postings
assert all(not isinstance(posting.cost, CostSpec)
for posting in out_postings)
# Check that units are non-zero and that no cost remains negative; issue an
# error if this is the case.
for posting in out_postings:
if posting.cost is None:
continue
# If there is a cost, we don't allow either a cost value of zero,
# nor a zero number of units. Note that we allow a price of zero as
# the only special case allowed (for conversion entries), but never
# for costs.
if posting.units.number == ZERO:
errors.append(InterpolationError(
posting.meta,
'Amount is zero: "{}"'.format(posting.units), None))
if posting.cost.number is not None and posting.cost.number < ZERO:
errors.append(InterpolationError(
posting.meta,
'Cost is negative: "{}"'.format(posting.cost), None))
return out_postings, errors, (new_posting is not None)
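The four interpolation cases in the listing above reduce to simple arithmetic on the weight, which is the negated residual of the other postings in the currency group. The helper names below are hypothetical and exist only for this sketch; quantization, error handling and the zero-weight special cases of the real function are left out.

```python
from decimal import Decimal

ZERO = Decimal(0)


def interpolate_units(weight, number_per=None, number_total=ZERO, price=None):
    """MissingType.UNITS: divide the weight by the per-unit cost or the price."""
    if number_per is not None:
        return (weight - number_total) / number_per
    if price is not None:
        return weight / price
    return weight


def interpolate_cost_per(weight, units_number, number_total=ZERO):
    """MissingType.COST_PER: spread the weight (minus any total cost) over the units."""
    return (weight - number_total) / units_number


def interpolate_cost_total(weight, units_number, number_per):
    """MissingType.COST_TOTAL: whatever the per-unit cost does not account for."""
    return weight - number_per * units_number


def interpolate_price(weight, units_number):
    """MissingType.PRICE: prices are always stored as positive numbers."""
    return abs(weight / units_number)


# The other postings sum to -1000.00 USD, so the incomplete one weighs +1000.00 USD.
weight = -Decimal("-1000.00")

units = interpolate_units(weight, number_per=Decimal("125.00"))
cost_per = interpolate_cost_per(weight, Decimal("8"))
cost_total = interpolate_cost_total(weight, Decimal("8"), Decimal("120.00"))
# Selling 800 EUR for 1000 USD: the other posting is +1000 USD, so weight is -1000.
price = interpolate_price(Decimal("-1000.00"), Decimal("-800"))

print(units, cost_per, cost_total, price)
```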
beancount.parser.booking_full.replace_currencies(postings, refer_groups)
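Replace resolved currencies in the entry's Postings.
This essentially applies the findings of categorize_by_currency() to produce new postings with all currencies resolved.
| Parameters: | postings: A list of Posting instances to replace; refer_groups: A list of (currency, list of posting references) items as returned by categorize_by_currency(). |
|---|---|
| Returns: | A new list of items of (currency, list of Postings), postings for which the currencies have been replaced by their interpolated currency values. |
Source code in beancount/parser/booking_full.py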
def replace_currencies(postings, refer_groups):
"""Replace resolved currencies in the entry's Postings.
This essentially applies the findings of categorize_by_currency() to produce
new postings with all currencies resolved.
Args:
postings: A list of Posting instances to replace.
refer_groups: A list of (currency, list of posting references) items as
returned by categorize_by_currency().
Returns:
A new list of items of (currency, list of Postings), postings for which the
currencies have been replaced by their interpolated currency values.
"""
new_groups = []
for currency, refers in refer_groups:
new_postings = []
for refer in sorted(refers, key=lambda r: r.index):
posting = postings[refer.index]
units = posting.units
if units is MISSING or units is None:
posting = posting._replace(units=Amount(MISSING, refer.units_currency))
else:
replace = False
cost = posting.cost
price = posting.price
if units.currency is MISSING:
units = Amount(units.number, refer.units_currency)
replace = True
if cost and cost.currency is MISSING:
cost = cost._replace(currency=refer.cost_currency)
replace = True
if price and price.currency is MISSING:
price = Amount(price.number, refer.price_currency)
replace = True
if replace:
posting = posting._replace(units=units, cost=cost, price=price)
new_postings.append(posting)
new_groups.append((currency, new_postings))
return new_groups
beancount.parser.booking_full.unique_label()
Return a globally unique label for cost entries.
Source code in beancount/parser/booking_full.py
def unique_label() -> str:
"Return a globally unique label for cost entries."
return str(uuid.uuid4())
beancount.parser.booking_method
All the specific booking method implementations. This code is used by the full booking algorithm.
beancount.parser.booking_method.AmbiguousMatchError (tuple)
AmbiguousMatchError(source, message, entry)
beancount.parser.booking_method.AmbiguousMatchError.__getnewargs__(self)
special
Return self as a plain tuple. Used by copy and pickle.
Source code in beancount/parser/booking_method.py
def __getnewargs__(self):
'Return self as a plain tuple. Used by copy and pickle.'
return _tuple(self)
beancount.parser.booking_method.AmbiguousMatchError.__new__(_cls, source, message, entry)
special
staticmethod
Create new instance of AmbiguousMatchError(source, message, entry)
beancount.parser.booking_method.AmbiguousMatchError.__replace__(self, /, **kwds)
special
Return a new AmbiguousMatchError object replacing specified fields with new values
Source code in beancount/parser/booking_method.py
def _replace(self, /, **kwds):
result = self._make(_map(kwds.pop, field_names, self))
if kwds:
raise TypeError(f'Got unexpected field names: {list(kwds)!r}')
return result
beancount.parser.booking_method.AmbiguousMatchError.__repr__(self)
special
Return a nicely formatted representation string
Source code in beancount/parser/booking_method.py
def __repr__(self):
'Return a nicely formatted representation string'
return self.__class__.__name__ + repr_fmt % self
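Since AmbiguousMatchError is a namedtuple with the fields (source, message, entry), the generated methods listed above behave like those of any `collections.namedtuple`. The sketch below defines a local stand-in with the same field names instead of importing Beancount; the metadata dict and messages are invented for illustration.

```python
from collections import namedtuple

# Local stand-in with the same field names as the error type documented above.
AmbiguousMatchError = namedtuple("AmbiguousMatchError", "source message entry")

err = AmbiguousMatchError(
    {"filename": "ledger.beancount", "lineno": 42},  # made-up metadata
    "Ambiguous matches",
    None,
)

# _replace() returns a new tuple; the original is left untouched.
updated = err._replace(message="Not enough lots")

# Errors unpack like plain tuples.
source, message, entry = err

# Unknown field names are rejected. The exception type depends on the Python
# version (ValueError in older releases, TypeError in the listing above).
try:
    err._replace(severity="high")
    rejected = False
except (TypeError, ValueError):
    rejected = True

print(updated.message, "|", message, "|", rejected)
```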
beancount.parser.booking_method.booking_method_AVERAGE(entry, posting, matches)
AVERAGE booking method implementation.
Source code in beancount/parser/booking_method.py
def booking_method_AVERAGE(entry, posting, matches):
"""AVERAGE booking method implementation."""
booked_reductions = []
booked_matches = []
errors = [AmbiguousMatchError(entry.meta, "AVERAGE method is not supported", entry)]
return booked_reductions, booked_matches, errors, False
# FIXME: Future implementation here.
# pylint: disable=unreachable
if False: # pylint: disable=using-constant-test
# DISABLED - This is the code for AVERAGE, which is currently disabled.
# If there is more than a single match we need to ultimately merge the
# postings. Also, if the reducing posting provides a specific cost, we
# need to update the cost basis as well. Both of these cases are carried
# out by removing all the matches and readding them later on.
if len(matches) == 1 and (
not isinstance(posting.cost.number_per, Decimal) and
not isinstance(posting.cost.number_total, Decimal)):
# There is no cost. Just reduce the one leg. This should be the
# normal case if we always merge augmentations and the user lets
# Beancount deal with the cost.
match = matches[0]
sign = -1 if posting.units.number < ZERO else 1
number = min(abs(match.units.number), abs(posting.units.number))
match_units = Amount(number * sign, match.units.currency)
booked_reductions.append(posting._replace(units=match_units, cost=match.cost))
insufficient = (match_units.number != posting.units.number)
else:
# Merge the matching postings to a single one.
merged_units = inventory.Inventory()
merged_cost = inventory.Inventory()
for match in matches:
merged_units.add_amount(match.units)
merged_cost.add_amount(convert.get_weight(match))
if len(merged_units) != 1 or len(merged_cost) != 1:
errors.append(
AmbiguousMatchError(
entry.meta,
'Cannot merge positions in multiple currencies: {}'.format(
', '.join(position.to_string(match_posting)
for match_posting in matches)), entry))
else:
if (isinstance(posting.cost.number_per, Decimal) or
isinstance(posting.cost.number_total, Decimal)):
errors.append(
AmbiguousMatchError(
entry.meta,
"Explicit cost reductions aren't supported yet: {}".format(
position.to_string(posting)), entry))
else:
# Insert postings to remove all the matches.
booked_reductions.extend(
posting._replace(units=-match.units, cost=match.cost,
flag=flags.FLAG_MERGING)
for match in matches)
units = merged_units[0].units
date = matches[0].cost.date ## FIXME: Select which one,
## oldest or latest.
cost_units = merged_cost[0].units
cost = Cost(cost_units.number/units.number, cost_units.currency,
date, None)
# Insert a posting to refill those with a replacement match.
booked_reductions.append(
posting._replace(units=units, cost=cost, flag=flags.FLAG_MERGING))
# Now, match the reducing request against this lot.
booked_reductions.append(
posting._replace(units=posting.units, cost=cost))
insufficient = abs(posting.units.number) > abs(units.number)
beancount.parser.booking_method.booking_method_FIFO(entry, posting, matches)
FIFO booking method implementation.
Source code in beancount/parser/booking_method.py
def booking_method_FIFO(entry, posting, matches):
"""FIFO booking method implementation."""
return _booking_method_xifo(entry, posting, matches, "date", False)
beancount.parser.booking_method.booking_method_HIFO(entry, posting, matches)
HIFO booking method implementation.
Source code in beancount/parser/booking_method.py
def booking_method_HIFO(entry, posting, matches):
"""HIFO booking method implementation."""
return _booking_method_xifo(entry, posting, matches, "number", True)
beancount.parser.booking_method.booking_method_LIFO(entry, posting, matches)
LIFO booking method implementation.
Source code in beancount/parser/booking_method.py
def booking_method_LIFO(entry, posting, matches):
"""LIFO booking method implementation."""
return _booking_method_xifo(entry, posting, matches, "date", True)
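FIFO, HIFO and LIFO differ only in the sort attribute ("date" or "number") and the sort direction they pass to `_booking_method_xifo`, whose body is not shown on this page. The following is therefore only a guess at the general idea, using a hypothetical `Lot` tuple and a hypothetical `reduce_lots` helper: sort the candidate lots, then consume them in order until the requested number of units is covered.

```python
from collections import namedtuple
from datetime import date
from decimal import Decimal

# Hypothetical lot record: units held, per-unit cost ("number"), acquisition date.
Lot = namedtuple("Lot", "units number date")


def reduce_lots(lots, units_to_reduce, sortattr, reverse_order):
    """Consume lots in sorted order; return (list of (lot, units taken), insufficient)."""
    remaining = units_to_reduce
    taken = []
    ordered = sorted(lots, key=lambda lot: getattr(lot, sortattr),
                     reverse=reverse_order)
    for lot in ordered:
        if remaining <= 0:
            break
        size = min(lot.units, remaining)
        taken.append((lot, size))
        remaining -= size
    return taken, remaining > 0


lots = [
    Lot(Decimal("10"), Decimal("100"), date(2020, 1, 1)),
    Lot(Decimal("10"), Decimal("150"), date(2020, 6, 1)),
    Lot(Decimal("10"), Decimal("120"), date(2021, 1, 1)),
]

fifo, _ = reduce_lots(lots, Decimal("15"), "date", False)    # oldest first
lifo, _ = reduce_lots(lots, Decimal("15"), "date", True)     # newest first
hifo, _ = reduce_lots(lots, Decimal("15"), "number", True)   # highest cost first

for name, result in (("FIFO", fifo), ("LIFO", lifo), ("HIFO", hifo)):
    print(name, [(lot.date.isoformat(), str(size)) for lot, size in result])
```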
beancount.parser.booking_method.booking_method_NONE(entry, posting, matches)
NONE booking method implementation.
Source code in beancount/parser/booking_method.py
def booking_method_NONE(entry, posting, matches):
"""NONE booking method implementation."""
# This never needs to match against any existing positions... we
# disregard the matches, there's never any error. Note that this never
# gets called in practice, we want to treat NONE postings as
# augmentations. Default behaviour is to return them with their original
# CostSpec, and the augmentation code will handle signaling an error if
# there is insufficient detail to carry out the conversion to an
# instance of Cost.
# Note that it's an interesting question whether a reduction on an
# account with NONE method which happens to match a single position
# ought to be matched against it. We don't allow it for now.
return [posting], [], False
beancount.parser.booking_method.booking_method_STRICT(entry, posting, matches)
Strict booking method. This method fails if there are ambiguous matches.
Source code in beancount/parser/booking_method.py
def booking_method_STRICT(entry, posting, matches):
"""Strict booking method. This method fails if there are ambiguous matches.
"""
booked_reductions = []
booked_matches = []
errors = []
insufficient = False
# In strict mode, we require at most a single matching posting.
if len(matches) > 1:
# If the total requested to reduce matches the sum of all the
# ambiguous postings, match against all of them.
sum_matches = sum(p.units.number for p in matches)
if sum_matches == -posting.units.number:
booked_reductions.extend(
posting._replace(units=-match.units, cost=match.cost)
for match in matches)
else:
errors.append(
AmbiguousMatchError(entry.meta,
'Ambiguous matches for "{}": {}'.format(
position.to_string(posting),
', '.join(position.to_string(match_posting)
for match_posting in matches)),
entry))
else:
# Replace the posting's units and cost values.
match = matches[0]
sign = -1 if posting.units.number < ZERO else 1
number = min(abs(match.units.number), abs(posting.units.number))
match_units = Amount(number * sign, match.units.currency)
booked_reductions.append(posting._replace(units=match_units, cost=match.cost))
booked_matches.append(match)
insufficient = (match_units.number != posting.units.number)
return booked_reductions, booked_matches, errors, insufficient
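The three outcomes of the strict method (a single match, several matches that are sold off in full, and a genuinely ambiguous reduction) can be reproduced with plain numbers. `strict_match` is a hypothetical reduction of the listing above that works on lot sizes only and returns an error string instead of an AmbiguousMatchError.

```python
from decimal import Decimal


def strict_match(reduction, lot_sizes):
    """Return (booked unit numbers, error message or None, insufficient)."""
    if len(lot_sizes) > 1:
        # Several candidate lots: only acceptable if the reduction empties all of them.
        if sum(lot_sizes) == -reduction:
            return [-size for size in lot_sizes], None, False
        return [], "Ambiguous matches", False
    # Exactly one candidate: reduce it by at most its own size.
    lot = lot_sizes[0]
    sign = -1 if reduction < 0 else 1
    booked = min(abs(lot), abs(reduction)) * sign
    return [booked], None, booked != reduction


single = strict_match(Decimal("-4"), [Decimal("10")])
too_big = strict_match(Decimal("-15"), [Decimal("10")])
sell_all = strict_match(Decimal("-20"), [Decimal("10"), Decimal("10")])
ambiguous = strict_match(Decimal("-5"), [Decimal("10"), Decimal("10")])

print(single, too_big, sell_all, ambiguous, sep="\n")
```

In the second case only 10 of the requested 15 units can be booked, so the insufficient flag is set and the caller is expected to report it.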
beancount.parser.booking_method.booking_method_STRICT_WITH_SIZE(entry, posting, matches)
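Strict booking method, but disambiguate further with sizes.
This booking method applies the same algorithm as the STRICT method, but if any of the ambiguous lots matches the desired size exactly, the oldest such lot is selected automatically.
Source code in beancount/parser/booking_method.py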
def booking_method_STRICT_WITH_SIZE(entry, posting, matches):
"""Strict booking method, but disambiguate further with sizes.
This booking method applies the same algorithm as the STRICT method, but if
only one of the ambiguous lots matches the desired size, select that one
automatically.
"""
(booked_reductions, booked_matches, errors,
insufficient) = booking_method_STRICT(entry, posting, matches)
# If we couldn't match strictly, attempt to find a match with the same
# number of units. If there is one or more of these, accept the oldest lot.
if errors and len(matches) > 1:
number = -posting.units.number
matching_units = [match
for match in matches
if number == match.units.number]
if matching_units:
matching_units.sort(key=lambda match: match.cost.date)
# Replace the posting's units and cost values.
match = matching_units[0]
booked_reductions.append(posting._replace(units=-match.units, cost=match.cost))
booked_matches.append(match)
insufficient = False
errors = []
return booked_reductions, booked_matches, errors, insufficient
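The size-based tie-break can be shown in isolation. In this sketch `Lot` and `pick_by_size` are hypothetical names; the function mirrors the filter-then-sort step of the listing above by keeping the lots whose size equals the reduction and returning the oldest of them.

```python
from collections import namedtuple
from datetime import date
from decimal import Decimal

# Hypothetical lot record: units held and acquisition date.
Lot = namedtuple("Lot", "units date")


def pick_by_size(reduction, lots):
    """Return the oldest lot whose size equals the reduction, or None."""
    wanted = -reduction
    same_size = [lot for lot in lots if lot.units == wanted]
    if not same_size:
        return None
    # Equivalent to sorting by date and taking the first element.
    return min(same_size, key=lambda lot: lot.date)


lots = [
    Lot(Decimal("10"), date(2021, 3, 1)),
    Lot(Decimal("5"), date(2020, 7, 1)),
    Lot(Decimal("10"), date(2020, 1, 1)),
]

print(pick_by_size(Decimal("-5"), lots))   # the only 5-unit lot
print(pick_by_size(Decimal("-10"), lots))  # two candidates, the 2020-01-01 lot wins
print(pick_by_size(Decimal("-7"), lots))   # no lot of that size
```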
beancount.parser.booking_method.handle_ambiguous_matches(entry, posting, matches, method)
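Handle ambiguous matches by dispatching to a particular method.
| Parameters: | entry: The parent Transaction instance; posting: An instance of Posting, the reducing posting which we're attempting to match; matches: A list of matching Position instances from the ante-inventory, known to already match the 'posting' spec; method: The Booking method to dispatch to. |
|---|---|
| Returns: | A triple of booked_reductions (a list of matched Posting instances, whose 'cost' attributes are ensured to be of type Cost), booked_matches (the matches that were booked against) and errors (a list of errors to be generated). |
Source code in beancount/parser/booking_method.py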
def handle_ambiguous_matches(entry, posting, matches, method):
"""Handle ambiguous matches by dispatching to a particular method.
Args:
entry: The parent Transaction instance.
posting: An instance of Posting, the reducing posting which we're
attempting to match.
matches: A list of matching Position instances from the ante-inventory.
Those positions are known to already match the 'posting' spec.
method: A Booking enum value, the booking method to dispatch to.
Returns:
A triple of
booked_reductions: A list of matched Posting instances, whose 'cost'
attributes are ensured to be of type Cost.
booked_matches: A list of the positions from 'matches' that were booked
against.
errors: A list of errors to be generated. An error is added if we could
not find enough matches to cover the entire position.
"""
assert isinstance(method, Booking), (
"Invalid type: {}".format(method))
assert matches, "Internal error: Invalid call with no matches"
#method = globals()['booking_method_{}'.format(method.name)]
method = _BOOKING_METHODS[method]
(booked_reductions,
booked_matches, errors, insufficient) = method(entry, posting, matches)
if insufficient:
errors.append(
AmbiguousMatchError(entry.meta,
'Not enough lots to reduce "{}": {}'.format(
position.to_string(posting),
', '.join(position.to_string(match_posting)
for match_posting in matches)),
entry))
return booked_reductions, booked_matches, errors
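The dispatch pattern used here is a lookup table from enum value to handler, followed by a shared post-processing step that turns the handler's insufficient flag into an error. The sketch below is a toy version under stated assumptions: the `Booking` enum, the handler, and the use of plain numbers and strings for postings and errors are all invented for illustration.

```python
from enum import Enum


class Booking(Enum):  # stand-in for the real enum
    FIFO = "FIFO"


def method_first_only(entry, posting, matches):
    """Toy handler: book against the first match only."""
    first = matches[0]
    insufficient = abs(posting) > abs(first)
    return [first], [first], [], insufficient


# Lookup table from booking method to its handler.
_METHODS = {Booking.FIFO: method_first_only}


def handle(entry, posting, matches, method):
    """Dispatch to the handler, then report insufficient lots as an error."""
    assert isinstance(method, Booking), "Invalid type: {}".format(method)
    assert matches, "Invalid call with no matches"
    handler = _METHODS[method]
    booked_reductions, booked_matches, errors, insufficient = handler(
        entry, posting, matches)
    if insufficient:
        errors.append('Not enough lots to reduce "{}"'.format(posting))
    return booked_reductions, booked_matches, errors


ok = handle(None, -5, [10, 10], Booking.FIFO)
short = handle(None, -15, [10], Booking.FIFO)
print(ok)
print(short)
```

Note that the caller receives three values, not four: the insufficient flag never leaves the dispatcher.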
beancount.parser.cmptest
Support utilities for testing scripts.
beancount.parser.cmptest.TestCase (TestCase)
beancount.parser.cmptest.TestCase.assertEqualEntries(self, expected_entries, actual_entries, allow_incomplete=False)
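Check that two lists of entries are equal.
Entries can be provided either as a list of directives or as a string. In the latter case, the string is parsed with beancount.parser.parse_string() and the resulting list of directives is used. If allow_incomplete is True, light-weight booking is performed before the directive lists are compared, which makes it possible to compare transactions with incomplete postings.
| Parameters: | expected_entries: Expected entries; actual_entries: Actual entries; allow_incomplete: Perform booking before comparison. |
|---|---|
| Raises: | AssertionError: If the assertion fails. |
Source code in beancount/parser/cmptest.py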
def assertEqualEntries(self, expected_entries, actual_entries, allow_incomplete=False):
"""Check that two lists of entries are equal.
Entries can be provided either as a list of directives or as a
string. In the latter case, the string is parsed with
beancount.parser.parse_string() and the resulting directives
list is used. If allow_incomplete is True, light-weight
booking is performed before comparing the directive lists,
allowing to compare transactions with incomplete postings.
Args:
expected_entries: Expected entries.
actual_entries: Actual entries.
allow_incomplete: Perform booking before comparison.
Raises:
AssertionError: If the assertion fails.
"""
expected_entries = read_string_or_entries(expected_entries, allow_incomplete)
actual_entries = read_string_or_entries(actual_entries, allow_incomplete)
same, expected_missing, actual_missing = \
compare.compare_entries(expected_entries, actual_entries)
if not same:
assert expected_missing or actual_missing, \
"Missing is missing: {}, {}".format(expected_missing, actual_missing)
oss = io.StringIO()
if expected_missing:
oss.write("Present in expected set and not in actual set:\n\n")
for entry in expected_missing:
oss.write(printer.format_entry(entry))
oss.write('\n')
if actual_missing:
oss.write("Present in actual set and not in expected set:\n\n")
for entry in actual_missing:
oss.write(printer.format_entry(entry))
oss.write('\n')
self.fail(oss.getvalue())
beancount.parser.cmptest.TestCase.assertExcludesEntries(self, subset_entries, entries, allow_incomplete=False)
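Check that subset_entries is not included in entries.
Entries can be provided either as a list of directives or as a string. In the latter case, the string is parsed with beancount.parser.parse_string() and the resulting list of directives is used. If allow_incomplete is True, light-weight booking is performed before the directive lists are compared, which makes it possible to compare transactions with incomplete postings.
| Parameters: | subset_entries: Subset entries; entries: Entries; allow_incomplete: Perform booking before comparison. |
|---|---|
| Raises: | AssertionError: If the assertion fails. |
Source code in beancount/parser/cmptest.py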
def assertExcludesEntries(self, subset_entries, entries, allow_incomplete=False):
"""Check that subset_entries is not included in entries.
Entries can be provided either as a list of directives or as a
string. In the latter case, the string is parsed with
beancount.parser.parse_string() and the resulting directives
list is used. If allow_incomplete is True, light-weight
booking is performed before comparing the directive lists,
allowing to compare transactions with incomplete postings.
Args:
subset_entries: Subset entries.
entries: Entries.
allow_incomplete: Perform booking before comparison.
Raises:
AssertionError: If the assertion fails.
"""
subset_entries = read_string_or_entries(subset_entries, allow_incomplete)
entries = read_string_or_entries(entries)
excludes, extra = compare.excludes_entries(subset_entries, entries)
if not excludes:
assert extra, "Extra is empty: {}".format(extra)
oss = io.StringIO()
if extra:
oss.write("Extra from from first/excluded set:\n\n")
for entry in extra:
oss.write(printer.format_entry(entry))
oss.write('\n')
self.fail(oss.getvalue())
beancount.parser.cmptest.TestCase.assertIncludesEntries(self, subset_entries, entries, allow_incomplete=False)
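Check that subset_entries is included in entries.
Entries can be provided either as a list of directives or as a string. In the latter case, the string is parsed with beancount.parser.parse_string() and the resulting list of directives is used. If allow_incomplete is True, light-weight booking is performed before the directive lists are compared, which makes it possible to compare transactions with incomplete postings.
| Parameters: | subset_entries: Subset entries; entries: Entries; allow_incomplete: Perform booking before comparison. |
|---|---|
| Raises: | AssertionError: If the assertion fails. |
Source code in beancount/parser/cmptest.py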
def assertIncludesEntries(self, subset_entries, entries, allow_incomplete=False):
"""Check that subset_entries is included in entries.
Entries can be provided either as a list of directives or as a
string. In the latter case, the string is parsed with
beancount.parser.parse_string() and the resulting directives
list is used. If allow_incomplete is True, light-weight
booking is performed before comparing the directive lists,
allowing to compare transactions with incomplete postings.
Args:
subset_entries: Subset entries.
entries: Entries.
allow_incomplete: Perform booking before comparison.
Raises:
AssertionError: If the assertion fails.
"""
subset_entries = read_string_or_entries(subset_entries, allow_incomplete)
entries = read_string_or_entries(entries)
includes, missing = compare.includes_entries(subset_entries, entries)
if not includes:
assert missing, "Missing is empty: {}".format(missing)
oss = io.StringIO()
if missing:
oss.write("Missing from from expected set:\n\n")
for entry in missing:
oss.write(printer.format_entry(entry))
oss.write('\n')
self.fail(oss.getvalue())
beancount.parser.cmptest.TestError (Exception)
Errors within the test implementation itself. These should never occur.
beancount.parser.cmptest.read_string_or_entries(entries_or_str, allow_incomplete=False)
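Read a string of entries or just entries.
| Parameters: | entries_or_str: Either a list of directives, or a string containing directives; allow_incomplete: A boolean, true if we allow incomplete inputs and perform light-weight booking. |
|---|---|
| Returns: | A list of directives. |
Source code in beancount/parser/cmptest.py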
def read_string_or_entries(entries_or_str, allow_incomplete=False):
"""Read a string of entries or just entries.
Args:
entries_or_str: Either a list of directives, or a string containing directives.
allow_incomplete: A boolean, true if we allow incomplete inputs and perform
light-weight booking.
Returns:
A list of directives.
"""
if isinstance(entries_or_str, str):
entries, errors, options_map = parser.parse_string(
textwrap.dedent(entries_or_str))
if allow_incomplete:
# Do a simplistic local conversion in order to call the comparison.
entries = [_local_booking(entry) for entry in entries]
else:
# Don't accept incomplete entries either.
if any(parser.is_entry_incomplete(entry) for entry in entries):
raise TestError("Entries in assertions may not use interpolation.")
entries, booking_errors = booking.book(entries, options_map)
errors = errors + booking_errors
# Don't tolerate errors.
if errors:
oss = io.StringIO()
printer.print_errors(errors, file=oss)
raise TestError("Unexpected errors in expected: {}".format(oss.getvalue()))
else:
assert isinstance(entries_or_str, list), "Expecting list: {}".format(entries_or_str)
entries = entries_or_str
return entries
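The call to `textwrap.dedent()` in the listing above is what lets test code pass indented triple-quoted strings. The sketch below shows only that normalization step with the standard library; `normalize` is a hypothetical helper, the ledger text is made up, and the parsing and booking stages are left out.

```python
import textwrap


def normalize(entries_or_str):
    """Dedent string input; pass lists of directives through unchanged."""
    if isinstance(entries_or_str, str):
        return textwrap.dedent(entries_or_str)
    assert isinstance(entries_or_str, list), "Expecting list: {}".format(entries_or_str)
    return entries_or_str


text = """
    2014-01-27 * "UNION MARKET"
      Liabilities:US:Amex:BlueCash    -22.02 USD
      Expenses:Food:Grocery            22.02 USD
"""

dedented = normalize(text)
print(dedented)
```

Only the common leading whitespace is removed, so the postings keep their indentation relative to the transaction line.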
beancount.parser.context
Produce a rendering of the account balances just before and after a particular entry is applied.
beancount.parser.context.render_entry_context(entries, options_map, entry, parsed_entry=None)
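Render the context before and after a particular transaction is applied.
| Parameters: | entries: A list of directives; options_map: A dict of options, as produced by the parser; entry: The entry instance which should be rendered (this object is expected to be in the set of entries, not just structurally equal); parsed_entry: An optional incomplete entry, parsed but neither booked nor interpolated. If provided, it is used for inspecting the list of prior accounts and is also rendered. |
|---|---|
| Returns: | A multiline string of text, which consists of the context before the transaction is applied, the transaction itself, and the context after it is applied. |
Source code in beancount/parser/context.py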
def render_entry_context(entries, options_map, entry, parsed_entry=None):
"""Render the context before and after a particular transaction is applied.
Args:
entries: A list of directives.
options_map: A dict of options, as produced by the parser.
entry: The entry instance which should be rendered. (Note that this object is
expected to be in the set of entries, not just structurally equal.)
parsed_entry: An optional incomplete, parsed but not booked nor interpolated
entry. If this is provided, this is used for inspecting the list of prior
accounts and it is also rendered.
Returns:
A multiline string of text, which consists of the context before the
transaction is applied, the transaction itself, and the context after it
is applied. You can just print that, it is in form that is intended to be
consumed by the user.
"""
oss = io.StringIO()
pr = functools.partial(print, file=oss)
header = "** {} --------------------------------"
meta = entry.meta
pr(header.format("Transaction Id"))
pr()
pr("Hash:{}".format(compare.hash_entry(entry)))
pr("Location: {}:{}".format(meta["filename"], meta["lineno"]))
pr()
pr()
# Get the list of accounts sorted by the order in which they appear in the
# closest entry.
order = {}
if parsed_entry is None:
parsed_entry = entry
if isinstance(parsed_entry, data.Transaction):
order = {posting.account: index
for index, posting in enumerate(parsed_entry.postings)}
accounts = sorted(getters.get_entry_accounts(parsed_entry),
key=lambda account: order.get(account, 10000))
# Accumulate the balances of these accounts up to the entry.
balance_before, balance_after = interpolate.compute_entry_context(
entries, entry, additional_accounts=accounts)
# Create a format line for printing the contents of account balances.
max_account_width = max(map(len, accounts)) if accounts else 1
position_line = '{{:1}} {{:{width}}} {{:>49}}'.format(width=max_account_width)
# Print the context before.
pr(header.format("Balances before transaction"))
pr()
before_hashes = set()
average_costs = {}
for account in accounts:
balance = balance_before[account]
pc_balances = balance.split()
for currency, pc_balance in pc_balances.items():
if len(pc_balance) > 1:
average_costs[account] = pc_balance.average()
positions = balance.get_positions()
for position in positions:
before_hashes.add((account, hash(position)))
pr(position_line.format('', account, str(position)))
if not positions:
pr(position_line.format('', account, ''))
pr()
pr()
# Print average cost per account, if relevant.
if average_costs:
pr(header.format("Average Costs"))
pr()
for account, average_cost in sorted(average_costs.items()):
for position in average_cost:
pr(position_line.format('', account, str(position)))
pr()
pr()
# Print the entry itself.
dcontext = options_map['dcontext']
pr(header.format("Unbooked Transaction"))
pr()
if parsed_entry:
printer.print_entry(parsed_entry, dcontext, render_weights=True, file=oss)
pr()
pr(header.format("Transaction"))
pr()
printer.print_entry(entry, dcontext, render_weights=True, file=oss)
pr()
if isinstance(entry, data.Transaction):
pr(header.format("Residual and Tolerances"))
pr()
# Print residuals.
residual = interpolate.compute_residual(entry.postings)
if not residual.is_empty():
# Note: We render the residual at maximum precision, for debugging.
pr('Residual: {}'.format(residual))
# Dump the tolerances used.
tolerances = interpolate.infer_tolerances(entry.postings, options_map)
if tolerances:
pr('Tolerances: {}'.format(
', '.join('{}={}'.format(key, value)
for key, value in sorted(tolerances.items()))))
# Compute the total cost basis.
cost_basis = inventory.Inventory(
pos for pos in entry.postings if pos.cost is not None
).reduce(convert.get_cost)
if not cost_basis.is_empty():
pr('Basis: {}'.format(cost_basis))
pr()
pr()
# Print the context after.
pr(header.format("Balances after transaction"))
pr()
for account in accounts:
positions = balance_after[account].get_positions()
for position in positions:
changed = (account, hash(position)) not in before_hashes
print(position_line.format('*' if changed else '', account, str(position)),
file=oss)
if not positions:
pr(position_line.format('', account, ''))
pr()
return oss.getvalue()
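The `position_line` template in the listing above is built in two steps: the doubled braces survive the first `format()` call, which fills in only the account column width. A short standalone check with made-up account names and positions:

```python
accounts = ["Assets:Cash", "Assets:US:Broker:HOOL"]

# First format(): only {width} is substituted; '{{' and '}}' become '{' and '}'.
max_account_width = max(map(len, accounts)) if accounts else 1
position_line = '{{:1}} {{:{width}}} {{:>49}}'.format(width=max_account_width)
print(repr(position_line))

# Second format(): change marker, left-aligned account, right-aligned position.
changed = position_line.format('*', "Assets:Cash", "10 HOOL {100 USD}")
unchanged = position_line.format('', "Assets:US:Broker:HOOL", "5 HOOL {90 USD}")
print(changed)
print(unchanged)
```

Every rendered line has the same length, so the balances printed before and after the transaction line up in columns.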
beancount.parser.context.render_file_context(entries, options_map, filename, lineno)
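Render the context before and after a particular transaction is applied.
| Parameters: | entries: A list of directives; options_map: A dict of options, as produced by the parser; filename: A string, the name of the file from which the transaction was parsed; lineno: An integer, the line number in the file the transaction was parsed from. |
|---|---|
| Returns: | A multiline string of text, which consists of the context before the transaction is applied, the transaction itself, and the context after it is applied. |
Source code in beancount/parser/context.py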
def render_file_context(entries, options_map, filename, lineno):
"""Render the context before and after a particular transaction is applied.
Args:
entries: A list of directives.
options_map: A dict of options, as produced by the parser.
filename: A string, the name of the file from which the transaction was parsed.
lineno: An integer, the line number in the file the transaction was parsed from.
Returns:
A multiline string of text, which consists of the context before the
transaction is applied, the transaction itself, and the context after it
is applied. You can just print that, it is in form that is intended to be
consumed by the user.
"""
# Find the closest entry.
closest_entry = data.find_closest(entries, filename, lineno)
if closest_entry is None:
raise SystemExit("No entry could be found before {}:{}".format(filename, lineno))
# Run just the parser stage (no booking nor interpolation, which would
# remove the postings) on the input file to produce the corresponding
# unbooked transaction, so that we can get the list of accounts.
if path.exists(filename):
parsed_entries, _, __ = parser.parse_file(filename)
# Note: We cannot bisect as we cannot rely on sorting behavior from the parser.
lineno = closest_entry.meta['lineno']
closest_parsed_entries = [parsed_entry
for parsed_entry in parsed_entries
if parsed_entry.meta['lineno'] == lineno]
if len(closest_parsed_entries) != 1:
# This is an internal error, this should never occur.
raise RuntimeError(
"Parsed entry corresponding to real entry not found in original filename.")
closest_parsed_entry = next(iter(closest_parsed_entries))
else:
closest_parsed_entry = None
return render_entry_context(entries, options_map, closest_entry, closest_parsed_entry)
beancount.parser.grammar
Builder for Beancount grammar.
beancount.parser.grammar.Builder (LexBuilder)
A builder used by the lexer and grammar parser as callbacks to create the data objects corresponding to rules parsed from the input file.
beancount.parser.grammar.Builder.account(self, filename, lineno, account)
Check account name validity.
| Parameters: | account: a str, the account name. |
|---|---|
| Returns: | A string, the account name. |
Source code in beancount/parser/grammar.py
def account(self, filename, lineno, account):
"""Check account name validity.
Args:
account: a str, the account name.
Returns:
A string, the account name.
"""
if not self.account_regexp.match(account):
meta = new_metadata(filename, lineno)
self.errors.append(
ParserError(meta, "Invalid account name: {}".format(account), None))
# Intern account names. This should reduces memory usage a
# fair bit because these strings are repeated liberally.
return self.accounts.setdefault(account, account)
beancount.parser.grammar.Builder.amount(self, filename, lineno, number, currency)
Process an amount grammar rule.
Source code in beancount/parser/grammar.py
def amount(self, filename, lineno, number, currency):
    """Process an amount grammar rule.
    Args:
      number: a Decimal instance, the number of the amount.
      currency: a currency object (a str, really, see CURRENCY above)
    Returns:
      An instance of Amount.
    """
    # Update the mapping that stores the parsed precisions.
    # Note: This is relatively slow, adds about 70ms because of number.as_tuple().
    self._dcupdate(number, currency)
    return Amount(number, currency)
beancount.parser.grammar.Builder.balance(self, filename, lineno, date, account, amount, tolerance, kvlist)
Process a balance assertion directive.
We produce no errors here by default. Failing assertions are replaced later, in the routine that verifies whether they have succeeded or failed.
Source code in beancount/parser/grammar.py
def balance(self, filename, lineno, date, account, amount, tolerance, kvlist):
    """Process an assertion directive.
    We produce no errors here by default. We replace the failing ones in the
    routine that does the verification later on, that these have succeeded
    or failed.
    Args:
      filename: The current filename.
      lineno: The current line number.
      date: A datetime object.
      account: A string, the account to balance.
      amount: The expected amount, to be checked.
      tolerance: The tolerance number.
      kvlist: a list of KeyValue instances.
    Returns:
      A new Balance object.
    """
    diff_amount = None
    meta = new_metadata(filename, lineno, kvlist)
    return Balance(meta, date, account, amount, tolerance, diff_amount)
beancount.parser.grammar.Builder.build_grammar_error(self, filename, lineno, exc_value, exc_type=None, exc_traceback=None)
Build a grammar error and append it to the list of pending errors.
Source code in beancount/parser/grammar.py
def build_grammar_error(self, filename, lineno, exc_value,
                        exc_type=None, exc_traceback=None):
    """Build a grammar error and append it to the list of pending errors.
    Args:
      filename: The current filename
      lineno: The current line number
      exc_value: The exception value, or a str, the message of the error.
      exc_type: An exception type, if an exception occurred.
      exc_traceback: A traceback object.
    """
    if exc_type is not None:
        assert not isinstance(exc_value, str)
        strings = traceback.format_exception_only(exc_type, exc_value)
        tblist = traceback.extract_tb(exc_traceback)
        filename, lineno, _, __ = tblist[0]
        message = '{} ({}:{})'.format(strings[0], filename, lineno)
    else:
        message = str(exc_value)
    meta = new_metadata(filename, lineno)
    self.errors.append(
        ParserSyntaxError(meta, message, None))
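The exception branch above relies on two standard-library calls, `traceback.format_exception_only` and `traceback.extract_tb`. As a rough sketch of what that branch computes, the hypothetical helper `describe_exception` below (not a beancount function) builds the same kind of message from a live exception.

```python
import sys
import traceback


def describe_exception(exc_type, exc_value, exc_traceback):
    """Format an exception the way the exc_type branch above does."""
    # format_exception_only returns a list of strings, each ending in a newline.
    strings = traceback.format_exception_only(exc_type, exc_value)
    # extract_tb returns frame summaries; the first one is the outermost frame.
    tblist = traceback.extract_tb(exc_traceback)
    filename, lineno, _, __ = tblist[0]
    return '{} ({}:{})'.format(strings[0], filename, lineno)


try:
    int("not-a-number")
except ValueError:
    message = describe_exception(*sys.exc_info())
```

Because `strings[0]` keeps its trailing newline, the location suffix lands on a second line of the message.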
beancount.parser.grammar.Builder.close(self, filename, lineno, date, account, kvlist)
Process a close directive.
Source code in beancount/parser/grammar.py
def close(self, filename, lineno, date, account, kvlist):
    """Process a close directive.
    Args:
      filename: The current filename.
      lineno: The current line number.
      date: A datetime object.
      account: A string, the name of the account.
      kvlist: a list of KeyValue instances.
    Returns:
      A new Close object.
    """
    meta = new_metadata(filename, lineno, kvlist)
    return Close(meta, date, account)
beancount.parser.grammar.Builder.commodity(self, filename, lineno, date, currency, kvlist)
Process a commodity directive.
Source code in beancount/parser/grammar.py
def commodity(self, filename, lineno, date, currency, kvlist):
    """Process a commodity directive.
    Args:
      filename: The current filename.
      lineno: The current line number.
      date: A datetime object.
      currency: A string, the commodity being declared.
      kvlist: a list of KeyValue instances.
    Returns:
      A new Commodity object.
    """
    meta = new_metadata(filename, lineno, kvlist)
    return Commodity(meta, date, currency)
beancount.parser.grammar.Builder.compound_amount(self, filename, lineno, number_per, number_total, currency)
Process an amount grammar rule.
Source code in beancount/parser/grammar.py
def compound_amount(self, filename, lineno, number_per, number_total, currency):
    """Process an amount grammar rule.
    Args:
      number_per: a Decimal instance, the number of the cost per share.
      number_total: a Decimal instance, the number of the cost over all shares.
      currency: a currency object (a str, really, see CURRENCY above)
    Returns:
      A triple of (Decimal, Decimal, currency string) to be processed further when
      creating the final per-unit cost number.
    """
    # Update the mapping that stores the parsed precisions.
    # Note: This is relatively slow, adds about 70ms because of number.as_tuple().
    self._dcupdate(number_per, currency)
    self._dcupdate(number_total, currency)
    # Note that we are not able to reduce the value to a number per-share
    # here because we only get the number of units in the full lot spec.
    return CompoundAmount(number_per, number_total, currency)
beancount.parser.grammar.Builder.cost_merge(self, filename, lineno, _)
Create a 'merge cost' token.
Source code in beancount/parser/grammar.py
def cost_merge(self, filename, lineno, _):
    """Create a 'merge cost' token."""
    return MERGE_COST
beancount.parser.grammar.Builder.cost_spec(self, filename, lineno, cost_comp_list, is_total)
Process a cost_spec grammar rule.
Source code in beancount/parser/grammar.py
def cost_spec(self, filename, lineno, cost_comp_list, is_total):
    """Process a cost_spec grammar rule.
    Args:
      cost_comp_list: A list of CompoundAmount, a datetime.date, or
        label ID strings.
      is_total: Assume only the total cost is specified; reject the <number> # <number>
        syntax, that is, no compound amounts may be specified. This is used to support
        the {{...}} syntax.
    Returns:
      A cost-info tuple of CompoundAmount, lot date and label string. Any of these
      may be set to a sentinel indicating "unset".
    """
    if not cost_comp_list:
        return CostSpec(MISSING, None, MISSING, None, None, False)
    assert isinstance(cost_comp_list, list), (
        "Internal error in parser: {}".format(cost_comp_list))
    compound_cost = None
    date_ = None
    label = None
    merge = None
    for comp in cost_comp_list:
        if isinstance(comp, CompoundAmount):
            if compound_cost is None:
                compound_cost = comp
            else:
                self.errors.append(
                    ParserError(new_metadata(filename, lineno),
                                "Duplicate cost: '{}'.".format(comp), None))
        elif isinstance(comp, date):
            if date_ is None:
                date_ = comp
            else:
                self.errors.append(
                    ParserError(new_metadata(filename, lineno),
                                "Duplicate date: '{}'.".format(comp), None))
        elif comp is MERGE_COST:
            if merge is None:
                merge = True
                self.errors.append(
                    ParserError(new_metadata(filename, lineno),
                                "Cost merging is not supported yet", None))
            else:
                self.errors.append(
                    ParserError(new_metadata(filename, lineno),
                                "Duplicate merge-cost spec", None))
        else:
            assert isinstance(comp, str), (
                "Currency component is not string: '{}'".format(comp))
            if label is None:
                label = comp
            else:
                self.errors.append(
                    ParserError(new_metadata(filename, lineno),
                                "Duplicate label: '{}'.".format(comp), None))
    # If there was a cost_comp_list, thus a "{...}" cost basis spec, you must
    # indicate that by creating a CompoundAmount(), always.
    if compound_cost is None:
        number_per, number_total, currency = MISSING, None, MISSING
    else:
        number_per, number_total, currency = compound_cost
        if is_total:
            if number_total is not None:
                self.errors.append(
                    ParserError(
                        new_metadata(filename, lineno),
                        ("Per-unit cost may not be specified using total cost "
                         "syntax: '{}'; ignoring per-unit cost").format(compound_cost),
                        None))
                # Ignore per-unit number.
                number_per = ZERO
            else:
                # There's a single number specified; interpret it as a total cost.
                number_total = number_per
                number_per = ZERO
    if merge is None:
        merge = False
    return CostSpec(number_per, number_total, currency, date_, label, merge)
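The `is_total` handling at the end of `cost_spec()` is easier to follow on bare numbers. The sketch below isolates only that step, assuming a compound cost is present; the helper name `normalize_total` is hypothetical, and the error reporting and the `MISSING` sentinel of the real method are left out.

```python
from decimal import Decimal

ZERO = Decimal(0)


def normalize_total(number_per, number_total, is_total):
    """Mimic the is_total branch: a lone number becomes the total cost."""
    if is_total:
        if number_total is not None:
            # Both numbers were given; the per-unit one is ignored.
            number_per = ZERO
        else:
            # A single number was given; treat it as the total cost.
            number_total = number_per
            number_per = ZERO
    return number_per, number_total


# A single number under the total-cost ({{...}}) syntax moves to the total slot.
double_brace = normalize_total(Decimal("500.00"), None, is_total=True)
# The same shape of input under the per-unit syntax is left untouched.
single_brace = normalize_total(Decimal("50.00"), None, is_total=False)
```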
beancount.parser.grammar.Builder.custom(self, filename, lineno, date, dir_type, custom_values, kvlist)
Process a custom directive.
Source code in beancount/parser/grammar.py
def custom(self, filename, lineno, date, dir_type, custom_values, kvlist):
    """Process a custom directive.
    Args:
      filename: the current filename.
      lineno: the current line number.
      date: a datetime object.
      dir_type: A string, a type for the custom directive being parsed.
      custom_values: A list of the various tokens seen on the same line.
      kvlist: a list of KeyValue instances.
    Returns:
      A new Custom object.
    """
    meta = new_metadata(filename, lineno, kvlist)
    return Custom(meta, date, dir_type, custom_values)
beancount.parser.grammar.Builder.custom_value(self, filename, lineno, value, dtype=None)
Create a custom value object, along with its type.
Source code in beancount/parser/grammar.py
def custom_value(self, filename, lineno, value, dtype=None):
    """Create a custom value object, along with its type.
    Args:
      value: One of the accepted custom values.
    Returns:
      A pair of (value, dtype) where 'dtype' is the datatype of the value.
    """
    if dtype is None:
        dtype = type(value)
    return ValueType(value, dtype)
beancount.parser.grammar.Builder.document(self, filename, lineno, date, account, document_filename, tags_links, kvlist)
Process a document directive.
Source code in beancount/parser/grammar.py
def document(self, filename, lineno, date, account, document_filename, tags_links,
             kvlist):
    """Process a document directive.
    Args:
      filename: the current filename.
      lineno: the current line number.
      date: a datetime object.
      account: an Account instance.
      document_filename: a str, the name of the document file.
      tags_links: The current TagsLinks accumulator.
      kvlist: a list of KeyValue instances.
    Returns:
      A new Document object.
    """
    meta = new_metadata(filename, lineno, kvlist)
    if not path.isabs(document_filename):
        document_filename = path.abspath(path.join(path.dirname(filename),
                                                   document_filename))
    tags, links = self._finalize_tags_links(tags_links.tags, tags_links.links)
    return Document(meta, date, account, document_filename, tags, links)
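The path handling in `document()` resolves a relative document name against the directory of the ledger file that declares it. Here is a small sketch of that step alone; the function name `resolve_document` and the example paths are made up for illustration.

```python
from os import path


def resolve_document(ledger_filename, document_filename):
    """Resolve a relative document path against the ledger file's directory."""
    if not path.isabs(document_filename):
        document_filename = path.abspath(
            path.join(path.dirname(ledger_filename), document_filename))
    return document_filename


# A relative name is anchored at the directory containing the ledger file.
resolved = resolve_document("/home/user/ledger/main.beancount", "docs/invoice.pdf")

# An already absolute name is returned unchanged.
absolute = path.abspath("statement.pdf")
unchanged = resolve_document("/home/user/ledger/main.beancount", absolute)
```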
beancount.parser.grammar.Builder.event(self, filename, lineno, date, event_type, description, kvlist)
Process an event directive.
Source code in beancount/parser/grammar.py
def event(self, filename, lineno, date, event_type, description, kvlist):
    """Process an event directive.
    Args:
      filename: the current filename.
      lineno: the current line number.
      date: a datetime object.
      event_type: a str, the name of the event type.
      description: a str, the event value, the contents.
      kvlist: a list of KeyValue instances.
    Returns:
      A new Event object.
    """
    meta = new_metadata(filename, lineno, kvlist)
    return Event(meta, date, event_type, description)
beancount.parser.grammar.Builder.finalize(self)
Finalize the parser, check for final errors and return the triple.
Source code in beancount/parser/grammar.py
def finalize(self):
    """Finalize the parser, check for final errors and return the triple.
    Returns:
      A triple of
        entries: A list of parsed directives, which may need completion.
        errors: A list of errors, hopefully empty.
        options_map: A dict of options.
    """
    # If the user left some tags unbalanced, issue an error.
    for tag in self.tags:
        meta = new_metadata(self.options['filename'], 0)
        self.errors.append(
            ParserError(meta, "Unbalanced pushed tag: '{}'".format(tag), None))
    # If the user left some metadata unpopped, issue an error.
    for key, value_list in self.meta.items():
        meta = new_metadata(self.options['filename'], 0)
        self.errors.append(
            ParserError(meta, (
                "Unbalanced metadata key '{}'; leftover metadata '{}'").format(
                    key, ', '.join(value_list)), None))
    # Weave the commas option in the DisplayContext itself, so it propagates
    # everywhere it is used automatically.
    self.dcontext.set_commas(self.options['render_commas'])
    return (self.get_entries(), self.errors, self.get_options())
beancount.parser.grammar.Builder.get_entries(self)
Return the accumulated entries.
Source code in beancount/parser/grammar.py
def get_entries(self):
    """Return the accumulated entries.
    Returns:
      A list of sorted directives.
    """
    return sorted(self.entries, key=data.entry_sortkey)
beancount.parser.grammar.Builder.get_long_string_maxlines(self)
See base class.
Source code in beancount/parser/grammar.py
def get_long_string_maxlines(self):
    """See base class."""
    return self.options['long_string_maxlines']
beancount.parser.grammar.Builder.get_options(self)
Return the final options map.
Source code in beancount/parser/grammar.py
def get_options(self):
    """Return the final options map.
    Returns:
      A dict of option names to options.
    """
    # Build and store the inferred DisplayContext instance.
    self.options['dcontext'] = self.dcontext
    return self.options
beancount.parser.grammar.Builder.handle_list(self, filename, lineno, object_list, new_object)
Handle a recursive list grammar rule, generically.
Source code in beancount/parser/grammar.py
def handle_list(self, filename, lineno, object_list, new_object):
    """Handle a recursive list grammar rule, generically.
    Args:
      object_list: the current list of objects.
      new_object: the new object to be added.
    Returns:
      The new, updated list of objects.
    """
    if object_list is None:
        object_list = []
    if new_object is not None:
        object_list.append(new_object)
    return object_list
beancount.parser.grammar.Builder.include(self, filename, lineno, include_filename)
Process an include directive.
Source code in beancount/parser/grammar.py
def include(self, filename, lineno, include_filename):
    """Process an include directive.
    Args:
      filename: current filename.
      lineno: current line number.
      include_filename: A string, the name of the file to include.
    """
    self.options['include'].append(include_filename)
beancount.parser.grammar.Builder.key_value(self, filename, lineno, key, value)
Process a key-value pair.
Source code in beancount/parser/grammar.py
def key_value(self, filename, lineno, key, value):
    """Process a key-value pair.
    Args:
      filename: The current filename.
      lineno: The current line number.
      key: The key of the pair.
      value: The value of the pair.
    Returns:
      A new KeyValue object.
    """
    return KeyValue(key, value)
beancount.parser.grammar.Builder.note(self, filename, lineno, date, account, comment, tags_links, kvlist)
Process a note directive.
Source code in beancount/parser/grammar.py
def note(self, filename, lineno, date, account, comment, tags_links, kvlist):
    """Process a note directive.
    Args:
      filename: The current filename.
      lineno: The current line number.
      date: A datetime object.
      account: A string, the account to attach the note to.
      comment: A str, the note's comments contents.
      kvlist: a list of KeyValue instances.
    Returns:
      A new Note object.
    """
    meta = new_metadata(filename, lineno, kvlist)
    tags, links = self._finalize_tags_links(tags_links.tags, tags_links.links)
    return Note(meta, date, account, comment, tags, links)
beancount.parser.grammar.Builder.open(self, filename, lineno, date, account, currencies, booking_str, kvlist)
Process an open directive.
Source code in beancount/parser/grammar.py
def open(self, filename, lineno, date, account, currencies, booking_str, kvlist):
    """Process an open directive.
    Args:
      filename: The current filename.
      lineno: The current line number.
      date: A datetime object.
      account: A string, the name of the account.
      currencies: A list of constraint currencies.
      booking_str: A string, the booking method, or None if none was specified.
      kvlist: a list of KeyValue instances.
    Returns:
      A new Open object.
    """
    meta = new_metadata(filename, lineno, kvlist)
    error = False
    if booking_str:
        try:
            # Note: Somehow the 'in' membership operator is not defined on Enum.
            booking = Booking[booking_str]
        except KeyError:
            # If the per-account method is invalid, set it to the global
            # default method and continue.
            booking = self.options['booking_method']
            error = True
    else:
        booking = None
    entry = Open(meta, date, account, currencies, booking)
    if error:
        self.errors.append(ParserError(meta,
                                       "Invalid booking method: {}".format(booking_str),
                                       entry))
    return entry
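The booking-method lookup in `open()` uses name-based indexing on an `Enum` and falls back to a default when the name is unknown. The sketch below reproduces that pattern with a stand-in `Booking` enum that lists only two illustrative members, and a hypothetical `parse_booking` helper that returns the error flag instead of appending to an error list.

```python
import enum


class Booking(enum.Enum):
    # Stand-in enum for illustration; only two members are listed here.
    STRICT = 'STRICT'
    FIFO = 'FIFO'


def parse_booking(booking_str, default):
    """Return (booking, error) following the lookup-with-fallback pattern."""
    if not booking_str:
        # No per-account method was given.
        return None, False
    try:
        # Indexing an Enum class by name raises KeyError for unknown names.
        return Booking[booking_str], False
    except KeyError:
        # Fall back to the global default and flag the error.
        return default, True


valid = parse_booking("FIFO", Booking.STRICT)
invalid = parse_booking("BOGUS", Booking.STRICT)
absent = parse_booking(None, Booking.STRICT)
```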
beancount.parser.grammar.Builder.option(self, filename, lineno, key, value)
Process an option directive.
Source code in beancount/parser/grammar.py
def option(self, filename, lineno, key, value):
    """Process an option directive.
    Args:
      filename: current filename.
      lineno: current line number.
      key: option's key (str)
      value: option's value
    """
    if key not in self.options:
        meta = new_metadata(filename, lineno)
        self.errors.append(
            ParserError(meta, "Invalid option: '{}'".format(key), None))
    elif key in options.READ_ONLY_OPTIONS:
        meta = new_metadata(filename, lineno)
        self.errors.append(
            ParserError(meta, "Option '{}' may not be set".format(key), None))
    else:
        option_descriptor = options.OPTIONS[key]
        # Issue a warning if the option is deprecated.
        if option_descriptor.deprecated:
            assert isinstance(option_descriptor.deprecated, str), "Internal error."
            meta = new_metadata(filename, lineno)
            self.errors.append(
                DeprecatedError(meta, option_descriptor.deprecated, None))
        # Rename the option if it has an alias.
        if option_descriptor.alias:
            key = option_descriptor.alias
            option_descriptor = options.OPTIONS[key]
        # Convert the value, if necessary.
        if option_descriptor.converter:
            try:
                value = option_descriptor.converter(value)
            except ValueError as exc:
                meta = new_metadata(filename, lineno)
                self.errors.append(
                    ParserError(meta,
                                "Error for option '{}': {}".format(key, exc),
                                None))
                return
        option = self.options[key]
        if isinstance(option, list):
            # Append to a list of values.
            option.append(value)
        elif isinstance(option, dict):
            # Set to a dict of values.
            if not (isinstance(value, tuple) and len(value) == 2):
                # 'meta' may not have been assigned on this path; create it here.
                meta = new_metadata(filename, lineno)
                self.errors.append(
                    ParserError(
                        meta, "Error for option '{}': {}".format(key, value), None))
                return
            dict_key, dict_value = value
            option[dict_key] = dict_value
        elif isinstance(option, bool):
            # Convert to a boolean.
            if not isinstance(value, bool):
                value = (value.lower() in {'true', 'on'}) or (value == '1')
            self.options[key] = value
        else:
            # Set the value.
            self.options[key] = value
        # Refresh the list of valid account regexps as we go along.
        if key.startswith('name_'):
            # Update the set of valid account types.
            self.account_regexp = valid_account_regexp(self.options)
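The boolean branch of `option()` accepts a few spellings of "true" and treats everything else as false. A minimal sketch of that conversion on its own follows; `to_bool` is a hypothetical name used only here.

```python
def to_bool(value):
    """Coerce an option value to bool using the same rule as the bool branch."""
    if not isinstance(value, bool):
        # Only 'true' and 'on' (in any letter case) and the string '1' are truthy.
        value = (value.lower() in {'true', 'on'}) or (value == '1')
    return value


results = {text: to_bool(text) for text in ["TRUE", "on", "1", "yes", "0"]}
```

Note that under this rule a value such as `"yes"` converts to `False` rather than raising an error.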
beancount.parser.grammar.Builder.pad(self, filename, lineno, date, account, source_account, kvlist)
Process a pad directive.
Source code in beancount/parser/grammar.py
def pad(self, filename, lineno, date, account, source_account, kvlist):
    """Process a pad directive.
    Args:
      filename: The current filename.
      lineno: The current line number.
      date: A datetime object.
      account: A string, the account to be padded.
      source_account: A string, the account to pad from.
      kvlist: a list of KeyValue instances.
    Returns:
      A new Pad object.
    """
    meta = new_metadata(filename, lineno, kvlist)
    return Pad(meta, date, account, source_account)
beancount.parser.grammar.Builder.pipe_deprecated_error(self, filename, lineno)
Issue a 'Pipe deprecated' error.
Source code in beancount/parser/grammar.py
def pipe_deprecated_error(self, filename, lineno):
    """Issue a 'Pipe deprecated' error.
    Args:
      filename: The current filename
      lineno: The current line number
    """
    if self.options['allow_pipe_separator']:
        return
    meta = new_metadata(filename, lineno)
    self.errors.append(
        ParserSyntaxError(meta, "Pipe symbol is deprecated.", None))
beancount.parser.grammar.Builder.plugin(self, filename, lineno, plugin_name, plugin_config)
Process a plugin directive.
Source code in beancount/parser/grammar.py
def plugin(self, filename, lineno, plugin_name, plugin_config):
    """Process a plugin directive.
    Args:
      filename: current filename.
      lineno: current line number.
      plugin_name: A string, the name of the plugin module to import.
      plugin_config: A string or None, an optional configuration string to
        pass in to the plugin module.
    """
    self.options['plugin'].append((plugin_name, plugin_config))
beancount.parser.grammar.Builder.popmeta(self, filename, lineno, key)
Remove a key from the current set of metadata stacks.
Source code in beancount/parser/grammar.py
def popmeta(self, filename, lineno, key):
    """Remove a key off the current set of stacks.
    Args:
      key: A string, a key to be removed from the meta dict.
    """
    try:
        if key not in self.meta:
            raise IndexError
        value_list = self.meta[key]
        value_list.pop(-1)
        if not value_list:
            self.meta.pop(key)
    except IndexError:
        meta = new_metadata(filename, lineno)
        self.errors.append(
            ParserError(meta,
                        "Attempting to pop absent metadata key: '{}'".format(key),
                        None))
beancount.parser.grammar.Builder.poptag(self, filename, lineno, tag)
Pop a tag off the current set of stacks.
Source code in beancount/parser/grammar.py
def poptag(self, filename, lineno, tag):
    """Pop a tag off the current set of stacks.
    Args:
      tag: A string, a tag to be removed from the current set of tags.
    """
    try:
        self.tags.remove(tag)
    except ValueError:
        meta = new_metadata(filename, lineno)
        self.errors.append(
            ParserError(meta, "Attempting to pop absent tag: '{}'".format(tag), None))
beancount.parser.grammar.Builder.posting(self, filename, lineno, account, units, cost, price, istotal, flag)
Process a posting grammar rule.
Source code in beancount/parser/grammar.py
def posting(self, filename, lineno, account, units, cost, price, istotal, flag):
    """Process a posting grammar rule.
    Args:
      filename: the current filename.
      lineno: the current line number.
      account: A string, the account of the posting.
      units: An instance of Amount for the units.
      cost: An instance of CostSpec for the cost.
      price: Either None, or an instance of Amount that is the cost of the position.
      istotal: A bool, True if the price is for the total amount being parsed, or
        False if the price is for each lot of the position.
      flag: A string, one-character, the flag associated with this posting.
    Returns:
      A new Posting object, with no parent entry.
    """
    meta = new_metadata(filename, lineno)
    # Prices may not be negative.
    if price and isinstance(price.number, Decimal) and price.number < ZERO:
        self.errors.append(
            ParserError(meta, (
                "Negative prices are not allowed: {} "
                "(see http://furius.ca/beancount/doc/bug-negative-prices "
                "for workaround)"
            ).format(price), None))
        # Fix it and continue.
        price = Amount(abs(price.number), price.currency)
    # If the price is specified for the entire amount, compute the effective
    # price here and forget about that detail of the input syntax.
    if istotal:
        if units.number is MISSING:
            # Note: we could potentially do a better job and attempt to fix
            # this up after interpolation, but this syntax is pretty rare
            # anyway.
            self.errors.append(ParserError(
                meta, ("Total price on a posting without units: {}.").format(price),
                None))
            price = None
        else:
            price_number = price.number
            if price_number is not MISSING:
                price_number = (ZERO
                                if units.number == ZERO
                                else price_number/abs(units.number))
                price = Amount(price_number, price.currency)
    # Note: Allow zero prices because we need them for round-trips for
    # conversion entries.
    #
    # if price is not None and price.number == ZERO:
    #     self.errors.append(
    #         ParserError(meta, "Price is zero: {}".format(price), None))
    # If both cost and price are specified, the currencies must match, or
    # that is an error.
    if (cost is not None and
        price is not None and
        isinstance(cost.currency, str) and
        isinstance(price.currency, str) and
        cost.currency != price.currency):
        self.errors.append(
            ParserError(meta,
                        "Cost and price currencies must match: {} != {}".format(
                            cost.currency, price.currency), None))
    return Posting(account, units, cost, price, chr(flag) if flag else None, meta)
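When `istotal` is true, `posting()` converts a total price into a per-unit price by dividing by the absolute number of units, and guards against division by zero. The arithmetic can be sketched on its own as follows; `per_unit_price` is a hypothetical helper, and the `MISSING` handling of the real method is omitted.

```python
from decimal import Decimal

ZERO = Decimal(0)


def per_unit_price(total_price, units_number):
    """Convert a total price into a per-unit price, as the istotal branch does."""
    return ZERO if units_number == ZERO else total_price / abs(units_number)


# Selling 10 units for a total of 1500.00 gives a per-unit price of 150.
unit_price = per_unit_price(Decimal("1500.00"), Decimal("-10"))
# With zero units the price falls back to zero instead of dividing by zero.
zero_units = per_unit_price(Decimal("1500.00"), ZERO)
```

Taking `abs()` of the units keeps the price positive whether the posting adds or removes units.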
beancount.parser.grammar.Builder.price(self, filename, lineno, date, currency, amount, kvlist)
Process a price directive.
Source code in beancount/parser/grammar.py
def price(self, filename, lineno, date, currency, amount, kvlist):
    """Process a price directive.
    Args:
      filename: the current filename.
      lineno: the current line number.
      date: a datetime object.
      currency: the currency to be priced.
      amount: an instance of Amount, that is the price of the currency.
      kvlist: a list of KeyValue instances.
    Returns:
      A new Price object.
    """
    meta = new_metadata(filename, lineno, kvlist)
    return Price(meta, date, currency, amount)
beancount.parser.grammar.Builder.pushmeta(self, filename, lineno, key_value)
Set a metadata field on the current key-value pairs to be added to transactions.
Source code in beancount/parser/grammar.py
def pushmeta(self, filename, lineno, key_value):
    """Set a metadata field on the current key-value pairs to be added to transactions.
    Args:
      key_value: A KeyValue instance, to be added to the dict of metadata.
    """
    key, value = key_value
    self.meta[key].append(value)
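Together, `pushmeta()` and `popmeta()` maintain one stack of values per metadata key, and the most recently pushed value is the active one. The sketch below models this with a `collections.defaultdict(list)`, which is an assumption about how `self.meta` is stored; the free functions and the boolean return value of `popmeta` are simplifications for illustration.

```python
import collections

# Assumed storage: one list (used as a stack) per metadata key.
meta = collections.defaultdict(list)


def pushmeta(key, value):
    """Push a value on the stack for `key`."""
    meta[key].append(value)


def popmeta(key):
    """Pop the latest value for `key`; return False if the key is absent."""
    if key not in meta:
        return False
    value_list = meta[key]
    value_list.pop(-1)
    if not value_list:
        # Drop the key entirely once its stack is empty.
        meta.pop(key)
    return True


pushmeta("trip", "paris")
pushmeta("trip", "rome")
active = meta["trip"][-1]        # the most recent push is the active value
popmeta("trip")
after_pop = meta["trip"][-1]     # the previous value becomes active again
popmeta("trip")
emptied = "trip" not in meta     # the key disappears with its last value
missing = popmeta("trip")        # popping an absent key is reported
```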
beancount.parser.grammar.Builder.pushtag(self, filename, lineno, tag)
Push a tag on the current set of tags.
Note that this does not need to be stack ordered.
Source code in beancount/parser/grammar.py
def pushtag(self, filename, lineno, tag):
    """Push a tag on the current set of tags.
    Note that this does not need to be stack ordered.
    Args:
      tag: A string, a tag to be added.
    """
    self.tags.append(tag)
beancount.parser.grammar.Builder.query(self, filename, lineno, date, query_name, query_string, kvlist)
Process a query directive.
Source code in beancount/parser/grammar.py
def query(self, filename, lineno, date, query_name, query_string, kvlist):
    """Process a query directive.
    Args:
      filename: the current filename.
      lineno: the current line number.
      date: a datetime object.
      query_name: a str, the name of the query.
      query_string: a str, the SQL query itself.
      kvlist: a list of KeyValue instances.
    Returns:
      A new Query object.
    """
    meta = new_metadata(filename, lineno, kvlist)
    return Query(meta, date, query_name, query_string)
beancount.parser.grammar.Builder.store_result(self, filename, lineno, entries)
The start rule stores the final result here.
Source code in beancount/parser/grammar.py
def store_result(self, filename, lineno, entries):
    """Start rule stores the final result here.
    Args:
      entries: A list of entries to store.
    """
    if entries:
        self.entries = entries
    # Also record the name of the processed file.
    self.options['filename'] = filename
beancount.parser.grammar.Builder.tag_link_LINK(self, filename, lineno, tags_links, link)
Add a link to the TagsLinks accumulator.
Source code in beancount/parser/grammar.py
def tag_link_LINK(self, filename, lineno, tags_links, link):
    """Add a link to the TagsLinks accumulator.
    Args:
      tags_links: The current TagsLinks accumulator.
      link: A string, the new link to insert.
    Returns:
      An updated TagsLinks instance.
    """
    tags_links.links.add(link)
    return tags_links
beancount.parser.grammar.Builder.tag_link_TAG(self, filename, lineno, tags_links, tag)
Add a tag to the TagsLinks accumulator.
Source code in beancount/parser/grammar.py
def tag_link_TAG(self, filename, lineno, tags_links, tag):
    """Add a tag to the TagsLinks accumulator.
    Args:
      tags_links: The current TagsLinks accumulator.
      tag: A string, the new tag to insert.
    Returns:
      An updated TagsLinks instance.
    """
    tags_links.tags.add(tag)
    return tags_links
beancount.parser.grammar.Builder.tag_link_new(self, filename, lineno)
Create a new TagsLinks instance.
Source code in beancount/parser/grammar.py
def tag_link_new(self, filename, lineno):
    """Create a new TagsLinks instance.
    Returns:
      An instance of TagsLinks, initialized with expected attributes.
    """
    return TagsLinks(set(), set())
beancount.parser.grammar.Builder.transaction(self, filename, lineno, date, flag, txn_strings, tags_links, posting_or_kv_list)
Process a transaction directive.
All the postings of the transaction are available at this point, so the transaction is balanced here, incomplete postings are completed with the appropriate position, and errors are accumulated on the builder to be reported later on.
This is the main routine that takes up most of the parsing time; be very careful with modifications here, as they have an impact on performance.
Source code in beancount/parser/grammar.py
def transaction(self, filename, lineno, date, flag, txn_strings, tags_links,
                posting_or_kv_list):
    """Process a transaction directive.
    All the postings of the transaction are available at this point, and so
    the transaction is balanced here, incomplete postings are completed with the
    appropriate position, and errors are being accumulated on the builder to be
    reported later on.
    This is the main routine that takes up most of the parsing time; be very
    careful with modifications here, they have an impact on performance.
    Args:
      filename: the current filename.
      lineno: the current line number.
      date: a datetime object.
      flag: a str, one-character, the flag associated with this transaction.
      txn_strings: A list of strings, possibly empty, possibly longer.
      tags_links: A TagsLinks namedtuple of tags, and/or links.
      posting_or_kv_list: a list of Posting or KeyValue instances, to be inserted in
        this transaction, or None, if no postings have been declared.
    Returns:
      A new Transaction object.
    """
    meta = new_metadata(filename, lineno)
    # Separate postings and key-values.
    explicit_meta = {}
    postings = []
    tags, links = tags_links.tags, tags_links.links
    if posting_or_kv_list:
        last_posting = None
        for posting_or_kv in posting_or_kv_list:
            if isinstance(posting_or_kv, Posting):
                postings.append(posting_or_kv)
                last_posting = posting_or_kv
            elif isinstance(posting_or_kv, TagsLinks):
                if postings:
                    self.errors.append(ParserError(
                        meta,
                        "Tags or links not allowed after first " +
                        "Posting: {}".format(posting_or_kv), None))
                else:
                    tags.update(posting_or_kv.tags)
                    links.update(posting_or_kv.links)
            else:
                if last_posting is None:
                    value = explicit_meta.setdefault(posting_or_kv.key,
                                                     posting_or_kv.value)
                    if value is not posting_or_kv.value:
                        self.errors.append(ParserError(
                            meta, "Duplicate metadata field on entry: {}".format(
                                posting_or_kv), None))
                else:
                    if last_posting.meta is None:
                        last_posting = last_posting._replace(meta={})
                        postings.pop(-1)
                        postings.append(last_posting)
                    value = last_posting.meta.setdefault(posting_or_kv.key,
                                                         posting_or_kv.value)
                    if value is not posting_or_kv.value:
                        self.errors.append(ParserError(
                            meta, "Duplicate posting metadata field: {}".format(
                                posting_or_kv), None))
    # Freeze the tags & links or set to default empty values.
    tags, links = self._finalize_tags_links(tags, links)
    # Initialize the metadata fields from the set of active values.
    if self.meta:
        for key, value_list in self.meta.items():
            meta[key] = value_list[-1]
    # Add on explicitly defined values.
    if explicit_meta:
        meta.update(explicit_meta)
    # Unpack the transaction fields.
    payee_narration = self._unpack_txn_strings(txn_strings, meta)
    if payee_narration is None:
        return None
    payee, narration = payee_narration
    # We now allow a single posting when its balance is zero, so we
    # commented out the check below. If a transaction has a single posting
    # with a non-zero balance, it'll get caught below in the booking code.
    #
    # # Detect when a transaction does not have at least two legs.
    # if postings is None or len(postings) < 2:
    #     self.errors.append(
    #         ParserError(meta,
    #                     "Transaction with only one posting: {}".format(postings),
    #                     None))
    #     return None
    # If there are no postings, make sure we insert a list object.
    if postings is None:
        postings = []
    # Create the transaction.
    return Transaction(meta, date, chr(flag),
                       payee, narration, tags, links, postings)
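The first loop of `transaction()` decides where each key-value pair belongs: pairs seen before any posting go to the transaction, and later pairs attach to the most recent posting. The reduced sketch below shows only that routing. The two-field `Posting` tuple and the `split_items` helper are hypothetical simplifications, and the TagsLinks handling and duplicate-key errors of the real method are omitted.

```python
import collections

# Reduced stand-ins for illustration; the real Posting has more fields.
Posting = collections.namedtuple("Posting", "account meta")
KeyValue = collections.namedtuple("KeyValue", "key value")


def split_items(items):
    """Route key-values to the entry or to the posting that precedes them."""
    entry_meta = {}
    postings = []
    for item in items:
        if isinstance(item, Posting):
            postings.append(item)
        elif not postings:
            # No posting seen yet: the pair belongs to the transaction itself.
            entry_meta.setdefault(item.key, item.value)
        else:
            last = postings[-1]
            if last.meta is None:
                # Namedtuples are immutable, so swap in a copy with a dict.
                last = last._replace(meta={})
                postings[-1] = last
            last.meta.setdefault(item.key, item.value)
    return entry_meta, postings


entry_meta, postings = split_items([
    KeyValue("invoice", "A-17"),
    Posting("Assets:Cash", None),
    KeyValue("receipt", "yes"),
    Posting("Expenses:Food", None),
])
```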
beancount.parser.grammar.CompoundAmount (tuple)
CompoundAmount(number_per, number_total, currency)
beancount.parser.grammar.CompoundAmount.__getnewargs__(self)
special
Return self as a plain tuple. Used by copy and pickle.
Source code in beancount/parser/grammar.py
def __getnewargs__(self):
    'Return self as a plain tuple. Used by copy and pickle.'
    return _tuple(self)
beancount.parser.grammar.CompoundAmount.__new__(_cls, number_per, number_total, currency)
special
staticmethod
Create a new instance of CompoundAmount(number_per, number_total, currency).
beancount.parser.grammar.CompoundAmount.__replace__(/, self, **kwds)
special
Return a new CompoundAmount object replacing specified fields with new values.
Source code in beancount/parser/grammar.py
def _replace(self, /, **kwds):
    result = self._make(_map(kwds.pop, field_names, self))
    if kwds:
        raise TypeError(f'Got unexpected field names: {list(kwds)!r}')
    return result
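These generated methods are the standard `collections.namedtuple` machinery. As a brief sketch of how `_replace` behaves, the example below defines its own stand-in `CompoundAmount` with the three fields listed above instead of importing the beancount class.

```python
import collections
from decimal import Decimal

# Stand-in definition with the same field names as documented above.
CompoundAmount = collections.namedtuple(
    'CompoundAmount', 'number_per number_total currency')

original = CompoundAmount(Decimal("10"), None, "USD")
# _replace returns a new tuple; the original instance is left unchanged.
updated = original._replace(number_total=Decimal("100"))
```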
beancount.parser.grammar.CompoundAmount.__repr__(self)
special
Return a nicely formatted representation string.
Source code in beancount/parser/grammar.py
def __repr__(self):
    'Return a nicely formatted representation string'
    return self.__class__.__name__ + repr_fmt % self
beancount.parser.grammar.DeprecatedError (tuple)
DeprecatedError(source, message, entry)
beancount.parser.grammar.DeprecatedError.__getnewargs__(self)
特殊
将自身返回为一个普通元组。供 copy 和 pickle 使用。
源代码位于 beancount/parser/grammar.py
def __getnewargs__(self):
'Return self as a plain tuple. Used by copy and pickle.'
return _tuple(self)
beancount.parser.grammar.DeprecatedError.__new__(_cls, source, message, entry)
special
staticmethod
Create new instance of DeprecatedError(source, message, entry)
beancount.parser.grammar.DeprecatedError.__replace__(/, self, **kwds)
special
Return a new DeprecatedError object replacing specified fields with new values
Source code in beancount/parser/grammar.py
def _replace(self, /, **kwds):
result = self._make(_map(kwds.pop, field_names, self))
if kwds:
raise TypeError(f'Got unexpected field names: {list(kwds)!r}')
return result
beancount.parser.grammar.DeprecatedError.__repr__(self)
special
Return a nicely formatted representation string
Source code in beancount/parser/grammar.py
def __repr__(self):
'Return a nicely formatted representation string'
return self.__class__.__name__ + repr_fmt % self
beancount.parser.grammar.KeyValue (tuple)
KeyValue(key, value)
beancount.parser.grammar.KeyValue.__getnewargs__(self)
special
Return self as a plain tuple. Used by copy and pickle.
Source code in beancount/parser/grammar.py
def __getnewargs__(self):
'Return self as a plain tuple. Used by copy and pickle.'
return _tuple(self)
beancount.parser.grammar.KeyValue.__new__(_cls, key, value)
special
staticmethod
Create new instance of KeyValue(key, value)
beancount.parser.grammar.KeyValue.__replace__(/, self, **kwds)
special
Return a new KeyValue object replacing specified fields with new values
Source code in beancount/parser/grammar.py
def _replace(self, /, **kwds):
result = self._make(_map(kwds.pop, field_names, self))
if kwds:
raise TypeError(f'Got unexpected field names: {list(kwds)!r}')
return result
beancount.parser.grammar.KeyValue.__repr__(self)
special
Return a nicely formatted representation string
Source code in beancount/parser/grammar.py
def __repr__(self):
'Return a nicely formatted representation string'
return self.__class__.__name__ + repr_fmt % self
beancount.parser.grammar.ParserError (tuple)
ParserError(source, message, entry)
beancount.parser.grammar.ParserError.__getnewargs__(self)
special
Return self as a plain tuple. Used by copy and pickle.
Source code in beancount/parser/grammar.py
def __getnewargs__(self):
'Return self as a plain tuple. Used by copy and pickle.'
return _tuple(self)
beancount.parser.grammar.ParserError.__new__(_cls, source, message, entry)
special
staticmethod
Create new instance of ParserError(source, message, entry)
beancount.parser.grammar.ParserError.__replace__(/, self, **kwds)
special
Return a new ParserError object replacing specified fields with new values
Source code in beancount/parser/grammar.py
def _replace(self, /, **kwds):
result = self._make(_map(kwds.pop, field_names, self))
if kwds:
raise TypeError(f'Got unexpected field names: {list(kwds)!r}')
return result
beancount.parser.grammar.ParserError.__repr__(self)
special
Return a nicely formatted representation string
Source code in beancount/parser/grammar.py
def __repr__(self):
'Return a nicely formatted representation string'
return self.__class__.__name__ + repr_fmt % self
beancount.parser.grammar.ParserSyntaxError (tuple)
ParserSyntaxError(source, message, entry)
beancount.parser.grammar.ParserSyntaxError.__getnewargs__(self)
special
Return self as a plain tuple. Used by copy and pickle.
Source code in beancount/parser/grammar.py
def __getnewargs__(self):
'Return self as a plain tuple. Used by copy and pickle.'
return _tuple(self)
beancount.parser.grammar.ParserSyntaxError.__new__(_cls, source, message, entry)
special
staticmethod
Create new instance of ParserSyntaxError(source, message, entry)
beancount.parser.grammar.ParserSyntaxError.__replace__(/, self, **kwds)
special
Return a new ParserSyntaxError object replacing specified fields with new values
Source code in beancount/parser/grammar.py
def _replace(self, /, **kwds):
result = self._make(_map(kwds.pop, field_names, self))
if kwds:
raise TypeError(f'Got unexpected field names: {list(kwds)!r}')
return result
beancount.parser.grammar.ParserSyntaxError.__repr__(self)
special
Return a nicely formatted representation string
Source code in beancount/parser/grammar.py
def __repr__(self):
'Return a nicely formatted representation string'
return self.__class__.__name__ + repr_fmt % self
beancount.parser.grammar.TagsLinks (tuple)
TagsLinks(tags, links)
beancount.parser.grammar.TagsLinks.__getnewargs__(self)
special
Return self as a plain tuple. Used by copy and pickle.
Source code in beancount/parser/grammar.py
def __getnewargs__(self):
'Return self as a plain tuple. Used by copy and pickle.'
return _tuple(self)
beancount.parser.grammar.TagsLinks.__new__(_cls, tags, links)
special
staticmethod
Create new instance of TagsLinks(tags, links)
beancount.parser.grammar.TagsLinks.__replace__(/, self, **kwds)
special
Return a new TagsLinks object replacing specified fields with new values
Source code in beancount/parser/grammar.py
def _replace(self, /, **kwds):
result = self._make(_map(kwds.pop, field_names, self))
if kwds:
raise TypeError(f'Got unexpected field names: {list(kwds)!r}')
return result
beancount.parser.grammar.TagsLinks.__repr__(self)
special
Return a nicely formatted representation string
Source code in beancount/parser/grammar.py
def __repr__(self):
'Return a nicely formatted representation string'
return self.__class__.__name__ + repr_fmt % self
beancount.parser.grammar.ValueType (tuple)
ValueType(value, dtype)
beancount.parser.grammar.ValueType.__getnewargs__(self)
special
Return self as a plain tuple. Used by copy and pickle.
Source code in beancount/parser/grammar.py
def __getnewargs__(self):
'Return self as a plain tuple. Used by copy and pickle.'
return _tuple(self)
beancount.parser.grammar.ValueType.__new__(_cls, value, dtype)
special
staticmethod
Create new instance of ValueType(value, dtype)
beancount.parser.grammar.ValueType.__replace__(/, self, **kwds)
special
Return a new ValueType object replacing specified fields with new values
Source code in beancount/parser/grammar.py
def _replace(self, /, **kwds):
result = self._make(_map(kwds.pop, field_names, self))
if kwds:
raise TypeError(f'Got unexpected field names: {list(kwds)!r}')
return result
beancount.parser.grammar.ValueType.__repr__(self)
special
Return a nicely formatted representation string
Source code in beancount/parser/grammar.py
def __repr__(self):
'Return a nicely formatted representation string'
return self.__class__.__name__ + repr_fmt % self
beancount.parser.grammar.valid_account_regexp(options)
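Build a regular expression that validates account names, based on the options.
| Parameters: | options – A dict of options, as per beancount.parser.options. |
|---|---|
| Returns: | A string, a regular expression that will match all account names. |
Source code in beancount/parser/grammar.py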
def valid_account_regexp(options):
"""Build a regexp to validate account names from the options.
Args:
options: A dict of options, as per beancount.parser.options.
Returns:
A string, a regular expression that will match all account names.
"""
names = map(options.__getitem__, ('name_assets',
'name_liabilities',
'name_equity',
'name_income',
'name_expenses'))
# Replace the first term of the account regular expression with the specific
# names allowed under the options configuration. This code is kept in sync
# with {5672c7270e1e}.
return regex.compile("(?:{})(?:{}{})+".format('|'.join(names),
account.sep,
account.ACC_COMP_NAME_RE))
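The pattern construction above can be tried out in isolation. The following is a minimal sketch using only the standard `re` module; `SEP` and `COMPONENT_RE` are simplified stand-ins assumed here for `account.sep` and `account.ACC_COMP_NAME_RE`, and `build_account_regexp` is a hypothetical name, not part of Beancount.

```python
import re

# Assumed stand-ins for beancount's account.sep and account.ACC_COMP_NAME_RE;
# the real component pattern is more permissive than this ASCII-only one.
SEP = ":"
COMPONENT_RE = r"[A-Z0-9][A-Za-z0-9\-]*"

ROOT_OPTION_KEYS = ("name_assets", "name_liabilities", "name_equity",
                    "name_income", "name_expenses")


def build_account_regexp(options):
    """Compile a pattern: one configured root name, then one or more components."""
    names = [options[key] for key in ROOT_OPTION_KEYS]
    return re.compile("(?:{})(?:{}{})+".format("|".join(names), SEP, COMPONENT_RE))


options = {"name_assets": "Assets", "name_liabilities": "Liabilities",
           "name_equity": "Equity", "name_income": "Income",
           "name_expenses": "Expenses"}
regexp = build_account_regexp(options)

print(bool(regexp.fullmatch("Assets:US:Checking")))   # True
print(bool(regexp.fullmatch("Savings:US:Checking")))  # False: unknown root name
print(bool(regexp.fullmatch("Assets")))               # False: no sub-component
```

Because the root names come from the options, renaming a root (for example setting `name_assets` to a localized word) changes which account names the pattern accepts.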
beancount.parser.hashsrc
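Compute a hash of the source files in order to warn when the source goes out of date.
beancount.parser.hashsrc.check_parser_source_files(parser_module)
Check the extension module's source hash and issue a warning if the current source differs from that of the module.
If the source files aren't located in the Python source directory, ignore the warning: we're probably running from an installed base, in which case we don't need to check anything (this check is useful only for people running directly from source).
Source code in beancount/parser/hashsrc.py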
def check_parser_source_files(parser_module: types.ModuleType):
"""Check the extension module's source hash and issue a warning if the
current source differs from that of the module.
If the source files aren't located in the Python source directory, ignore
the warning, we're probably running this from an installed base, in which
case we don't need to check anything (this check is useful only for people
running directly from source).
"""
parser_source_hash = hash_parser_source_files()
if parser_source_hash is None:
return
if parser_module.SOURCE_HASH and parser_module.SOURCE_HASH != parser_source_hash:
warnings.warn(
("The Beancount parser C extension module is out-of-date ('{}' != '{}'). "
"You need to rebuild.").format(parser_module.SOURCE_HASH, parser_source_hash))
beancount.parser.hashsrc.gen_include()
Generate an include file for the parser source hash.
Source code in beancount/parser/hashsrc.py
def gen_include():
"""Generate an include file for the parser source hash."""
return textwrap.dedent("""\
#ifndef __BEANCOUNT_PARSER_PARSE_SOURCE_HASH_H__
#define __BEANCOUNT_PARSER_PARSE_SOURCE_HASH_H__
#define PARSER_SOURCE_HASH {source_hash}
#endif // __BEANCOUNT_PARSER_PARSE_SOURCE_HASH_H__
""".format(source_hash=hash_parser_source_files()))
beancount.parser.hashsrc.hash_parser_source_files()
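Compute a unique hash of the parser's Python code in order to bake that into the extension module. This is used at load time to verify that the extension module and the corresponding Python code match each other. If they do not, a warning is issued telling you to rebuild your extension module.
| Returns: | A string, the hexadecimal unique hash of relevant source code that should trigger a recompilation. |
|---|---|
Source code in beancount/parser/hashsrc.py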
def hash_parser_source_files():
"""Compute a unique hash of the parser's Python code in order to bake that into
the extension module. This is used at load-time to verify that the extension
module and the corresponding Python codes match each other. If not, it
issues a warning that you should rebuild your extension module.
Returns:
A string, the hexadecimal unique hash of relevant source code that should
trigger a recompilation.
"""
md5 = hashlib.md5()
for filename in PARSER_SOURCE_FILES:
fullname = path.join(path.dirname(__file__), filename)
if not path.exists(fullname):
return None
with open(fullname, 'rb') as file:
md5.update(file.read())
# Note: Prepend a character in front of the hash because under Windows MSDEV
# removes escapes, and if the hash starts with a number it fails to
# recognize this is a string. A small compromise for portability.
return md5.hexdigest()
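To illustrate the hashing scheme, here is a small standalone sketch that digests a list of files the same way: one running MD5 over the concatenated bytes, with `None` returned when any file is absent. The helper name `hash_files` and the throwaway file names are assumptions for the demonstration only.

```python
import hashlib
import os
import tempfile


def hash_files(filenames):
    """Return one MD5 hex digest over all files' bytes, or None if one is missing."""
    md5 = hashlib.md5()
    for filename in filenames:
        if not os.path.exists(filename):
            return None
        with open(filename, "rb") as infile:
            md5.update(infile.read())
    return md5.hexdigest()


# Demonstration with two throwaway files (hypothetical names).
with tempfile.TemporaryDirectory() as tmpdir:
    paths = []
    for name, content in [("lexer.l", b"tokens"), ("grammar.y", b"rules")]:
        path = os.path.join(tmpdir, name)
        with open(path, "wb") as outfile:
            outfile.write(content)
        paths.append(path)

    digest = hash_files(paths)
    missing = hash_files(paths + [os.path.join(tmpdir, "absent.c")])

    # Editing any of the files produces a different digest.
    with open(paths[0], "ab") as outfile:
        outfile.write(b" changed")
    changed = hash_files(paths)

print(digest != changed, missing)
```

Comparing a digest stored at build time against one recomputed at load time is what lets the check detect a stale extension module.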
beancount.parser.lexer
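Beancount syntax lexer.
beancount.parser.lexer.LexBuilder
A builder used only for building lexer objects.
beancount.parser.lexer.LexBuilder.build_lexer_error(self, filename, lineno, message)
Build a lexer error and append it to the list of pending errors.
| Parameters: | message – The message of the error. |
|---|---|
Source code in beancount/parser/lexer.py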
def build_lexer_error(self, filename, lineno, message): # {0e31aeca3363}
"""Build a lexer error and appends it to the list of pending errors.
Args:
message: The message of the error.
"""
self.errors.append(
LexerError(new_metadata(filename, lineno), str(message), None))
beancount.parser.lexer.LexerError (tuple)
LexerError(source, message, entry)
beancount.parser.lexer.LexerError.__getnewargs__(self)
special
Return self as a plain tuple. Used by copy and pickle.
Source code in beancount/parser/lexer.py
def __getnewargs__(self):
'Return self as a plain tuple. Used by copy and pickle.'
return _tuple(self)
beancount.parser.lexer.LexerError.__new__(_cls, source, message, entry)
special
staticmethod
Create new instance of LexerError(source, message, entry)
beancount.parser.lexer.LexerError.__replace__(/, self, **kwds)
special
Return a new LexerError object replacing specified fields with new values
Source code in beancount/parser/lexer.py
def _replace(self, /, **kwds):
result = self._make(_map(kwds.pop, field_names, self))
if kwds:
raise TypeError(f'Got unexpected field names: {list(kwds)!r}')
return result
beancount.parser.lexer.LexerError.__repr__(self)
special
Return a nicely formatted representation string
Source code in beancount/parser/lexer.py
def __repr__(self):
'Return a nicely formatted representation string'
return self.__class__.__name__ + repr_fmt % self
beancount.parser.lexer.lex_iter(file, builder=None)
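An iterator that yields all the tokens in the given file.
| Parameters: | file – A string, the filename to run the lexer on, or a file object. |
|---|---|
| | builder – A builder of your choice. If not specified, a LexBuilder is used and discarded (along with its errors). |
| Yields: | All the tokens in the input file as (token, lineno, text, value) tuples, where token is a string representing the token kind, lineno is the line number in the input file where the token was matched, text is a bytes object containing the exact text matched, and value is the semantic value of the token or None. |
Source code in beancount/parser/lexer.py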
def lex_iter(file, builder=None):
"""An iterator that yields all the tokens in the given file.
Args:
file: A string, the filename to run the lexer on, or a file object.
builder: A builder of your choice. If not specified, a LexBuilder is
used and discarded (along with its errors).
Yields:
All the tokens in the input file as ``(token, lineno, text,
value)`` tuples where ``token`` is a string representing the
token kind, ``lineno`` is the line number in the input file
where the token was matched, ``text`` is a bytes object
containing the exact text matched, and ``value`` is the semantic
value of the token or None.
"""
with contextlib.ExitStack() as ctx:
# It would be more appropriate here to check for io.RawIOBase but
# that does not work for io.BytesIO despite it implementing the
# readinto() method.
if not isinstance(file, io.IOBase):
file = ctx.enter_context(open(file, 'rb'))
if builder is None:
builder = LexBuilder()
parser = _parser.Parser(builder)
yield from parser.lex(file)
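The "filename or file object" handling above relies on `contextlib.ExitStack`: a file is registered for closing only when the function opened it itself. A minimal sketch of that pattern follows, with a hypothetical `read_lines` generator standing in for the lexer.

```python
import contextlib
import io
import os
import tempfile


def read_lines(file):
    """Yield lines from a filename or from an already-open binary file object."""
    with contextlib.ExitStack() as ctx:
        if not isinstance(file, io.IOBase):
            # We opened it, so register it to be closed when the generator ends.
            file = ctx.enter_context(open(file, "rb"))
        yield from file


# A caller-owned in-memory file is consumed but left open.
buffer = io.BytesIO(b"2024-01-01 open Assets:Cash\n")
from_object = list(read_lines(buffer))

# A path is opened and closed by the generator itself.
with tempfile.TemporaryDirectory() as tmpdir:
    path = os.path.join(tmpdir, "example.beancount")
    with open(path, "wb") as outfile:
        outfile.write(b"2024-01-01 open Assets:Cash\n")
    from_path = list(read_lines(path))

print(from_object == from_path, buffer.closed)
```

The design keeps ownership clear: objects passed in by the caller are never closed behind the caller's back.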
beancount.parser.lexer.lex_iter_string(string, builder=None, **kwargs)
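An iterator that yields all the tokens in the given string.
| Parameters: | string – A str or bytes, the contents of the ledger to be parsed. |
|---|---|
| Returns: | An iterator, see lex_iter() for details. |
Source code in beancount/parser/lexer.py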
def lex_iter_string(string, builder=None, **kwargs):
"""An iterator that yields all the tokens in the given string.
Args:
string: a str or bytes, the contents of the ledger to be parsed.
Returns:
An iterator, see ``lex_iter()`` for details.
"""
if not isinstance(string, bytes):
string = string.encode('utf8')
file = io.BytesIO(string)
yield from lex_iter(file, builder=builder, **kwargs)
beancount.parser.options
Declaration of options and their default values.
beancount.parser.options.OptDesc (tuple)
OptDesc(name, default_value, example_value, converter, deprecated, alias)
beancount.parser.options.OptDesc.__getnewargs__(self)
special
Return self as a plain tuple. Used by copy and pickle.
Source code in beancount/parser/options.py
def __getnewargs__(self):
'Return self as a plain tuple. Used by copy and pickle.'
return _tuple(self)
beancount.parser.options.OptDesc.__new__(_cls, name, default_value, example_value, converter, deprecated, alias)
special
staticmethod
Create new instance of OptDesc(name, default_value, example_value, converter, deprecated, alias)
beancount.parser.options.OptDesc.__replace__(/, self, **kwds)
special
Return a new OptDesc object replacing specified fields with new values
Source code in beancount/parser/options.py
def _replace(self, /, **kwds):
result = self._make(_map(kwds.pop, field_names, self))
if kwds:
raise TypeError(f'Got unexpected field names: {list(kwds)!r}')
return result
beancount.parser.options.OptDesc.__repr__(self)
special
Return a nicely formatted representation string
Source code in beancount/parser/options.py
def __repr__(self):
'Return a nicely formatted representation string'
return self.__class__.__name__ + repr_fmt % self
beancount.parser.options.OptGroup (tuple)
OptGroup(description, options)
beancount.parser.options.OptGroup.__getnewargs__(self)
special
Return self as a plain tuple. Used by copy and pickle.
Source code in beancount/parser/options.py
def __getnewargs__(self):
'Return self as a plain tuple. Used by copy and pickle.'
return _tuple(self)
beancount.parser.options.OptGroup.__new__(_cls, description, options)
special
staticmethod
Create new instance of OptGroup(description, options)
beancount.parser.options.OptGroup.__replace__(/, self, **kwds)
special
Return a new OptGroup object replacing specified fields with new values
Source code in beancount/parser/options.py
def _replace(self, /, **kwds):
result = self._make(_map(kwds.pop, field_names, self))
if kwds:
raise TypeError(f'Got unexpected field names: {list(kwds)!r}')
return result
beancount.parser.options.OptGroup.__repr__(self)
special
Return a nicely formatted representation string
Source code in beancount/parser/options.py
def __repr__(self):
'Return a nicely formatted representation string'
return self.__class__.__name__ + repr_fmt % self
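beancount.parser.options.Opt(name, default_value, example_value=UNSET, converter=None, deprecated=False, alias=None)
Alternative constructor for OptDesc, with default values.
| Parameters: | name – See OptDesc. |
|---|---|
| | default_value – See OptDesc. |
| | example_value – See OptDesc. |
| | converter – See OptDesc. |
| | deprecated – See OptDesc. |
| | alias – See OptDesc. |
| Returns: | An instance of OptDesc. |
Source code in beancount/parser/options.py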
def Opt(name, default_value,
example_value=UNSET,
converter=None,
deprecated=False,
alias=None):
"""Alternative constructor for OptDesc, with default values.
Args:
name: See OptDesc.
default_value: See OptDesc.
example_value: See OptDesc.
converter: See OptDesc.
deprecated: See OptDesc.
alias: See OptDesc.
Returns:
An instance of OptDesc.
"""
if example_value is UNSET:
example_value = default_value
return OptDesc(name, default_value, example_value, converter, deprecated, alias)
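The `UNSET` default is a sentinel object: it lets the constructor tell "no example given" apart from an explicit `None`. A minimal sketch of the idiom, using a reduced `Desc` tuple and a `make_desc` helper that are assumptions for illustration rather than Beancount names:

```python
import collections

# Simplified stand-in for OptDesc with fewer fields (an assumption for brevity).
Desc = collections.namedtuple("Desc", "name default_value example_value")

# A unique object: no value supplied by a caller can be identical to it.
UNSET = object()


def make_desc(name, default_value, example_value=UNSET):
    """Use the default as the example unless an example was given explicitly."""
    if example_value is UNSET:
        example_value = default_value
    return Desc(name, default_value, example_value)


implicit = make_desc("title", "Beancount")
explicit_none = make_desc("title", "Beancount", example_value=None)

print(implicit.example_value)       # Beancount
print(explicit_none.example_value)  # None
```

Using `None` as the default instead would make it impossible to pass `None` as a deliberate example value.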
beancount.parser.options.get_account_types(options)
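Extract the account type names from the parser's options.
| Parameters: | options – A dict of ledger options. |
|---|---|
| Returns: | An instance of AccountTypes, that contains all the prefixes. |
Source code in beancount/parser/options.py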
def get_account_types(options):
"""Extract the account type names from the parser's options.
Args:
options: a dict of ledger options.
Returns:
An instance of AccountTypes, that contains all the prefixes.
"""
return account_types.AccountTypes(
*[options[key]
for key in ("name_assets",
"name_liabilities",
"name_equity",
"name_income",
"name_expenses")])
beancount.parser.options.get_current_accounts(options)
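Return account names for the current earnings and conversion accounts.
| Parameters: | options – A dict of ledger options. |
|---|---|
| Returns: | A tuple of 2 account objects, one for booking current earnings, and one for current conversions. |
Source code in beancount/parser/options.py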
def get_current_accounts(options):
"""Return account names for the current earnings and conversion accounts.
Args:
options: a dict of ledger options.
Returns:
A tuple of 2 account objects, one for booking current earnings, and one
for current conversions.
"""
equity = options['name_equity']
account_current_earnings = account.join(equity,
options['account_current_earnings'])
account_current_conversions = account.join(equity,
options['account_current_conversions'])
return (account_current_earnings,
account_current_conversions)
beancount.parser.options.get_previous_accounts(options)
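Return account names for the previous earnings, balances and conversion accounts.
| Parameters: | options – A dict of ledger options. |
|---|---|
| Returns: | A tuple of 3 account objects, for booking previous earnings, previous balances, and previous conversions. |
Source code in beancount/parser/options.py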
def get_previous_accounts(options):
"""Return account names for the previous earnings, balances and conversion accounts.
Args:
options: a dict of ledger options.
Returns:
A tuple of 3 account objects, for booking previous earnings,
previous balances, and previous conversions.
"""
equity = options['name_equity']
account_previous_earnings = account.join(equity,
options['account_previous_earnings'])
account_previous_balances = account.join(equity,
options['account_previous_balances'])
account_previous_conversions = account.join(equity,
options['account_previous_conversions'])
return (account_previous_earnings,
account_previous_balances,
account_previous_conversions)
beancount.parser.options.get_unrealized_account(options)
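Return the full account name for the unrealized account.
| Parameters: | options – A dict of ledger options. |
|---|---|
| Returns: | A string, the full name of the account used for unrealized gains. |
Source code in beancount/parser/options.py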
def get_unrealized_account(options):
"""Return the full account name for the unrealized account.
Args:
options: a dict of ledger options.
Returns:
A string, the full name of the account used for unrealized gains.
"""
income = options['name_income']
return account.join(income, options['account_unrealized_gains'])
beancount.parser.options.list_options()
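Produce a formatted text of the available options and their description.
| Returns: | A string, formatted nicely to be printed in 80 columns. |
|---|---|
Source code in beancount/parser/options.py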
def list_options():
"""Produce a formatted text of the available options and their description.
Returns:
A string, formatted nicely to be printed in 80 columns.
"""
oss = io.StringIO()
for group in PUBLIC_OPTION_GROUPS:
for desc in group.options:
oss.write('option "{}" "{}"\n'.format(desc.name, desc.example_value))
if desc.deprecated:
oss.write(textwrap.fill(
"THIS OPTION IS DEPRECATED: {}".format(desc.deprecated),
initial_indent=" ",
subsequent_indent=" "))
oss.write('\n\n')
description = ' '.join(line.strip()
for line in group.description.strip().splitlines())
oss.write(textwrap.fill(description,
initial_indent=' ',
subsequent_indent=' '))
oss.write('\n')
if isinstance(desc.default_value, (list, dict, set)):
oss.write('\n')
oss.write(' (This option may be supplied multiple times.)\n')
oss.write('\n\n')
return oss.getvalue()
beancount.parser.options.options_validate_booking_method(value)
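Validate a booking method name.
| Parameters: | value – A string, the value provided as option. |
|---|---|
| Returns: | The new value, converted, if the conversion is successful. |
| Raises: | ValueError – If the value is invalid. |
Source code in beancount/parser/options.py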
def options_validate_booking_method(value):
"""Validate a booking method name.
Args:
value: A string, the value provided as option.
Returns:
The new value, converted, if the conversion is successful.
Raises:
ValueError: If the value is invalid.
"""
try:
return data.Booking[value]
except KeyError as exc:
raise ValueError(str(exc)) from exc
beancount.parser.options.options_validate_boolean(value)
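Validate a boolean option.
| Parameters: | value – A string, the value provided as option. |
|---|---|
| Returns: | The new value, converted, if the conversion is successful. |
| Raises: | ValueError – If the value is invalid. |
Source code in beancount/parser/options.py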
def options_validate_boolean(value):
"""Validate a boolean option.
Args:
value: A string, the value provided as option.
Returns:
The new value, converted, if the conversion is successful.
Raises:
ValueError: If the value is invalid.
"""
return value.lower() in ('1', 'true', 'yes')
beancount.parser.options.options_validate_plugin(value)
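Validate the plugin option.
| Parameters: | value – A string, the value provided as option. |
|---|---|
| Returns: | The new value, converted, if the conversion is successful. |
| Raises: | ValueError – If the value is invalid. |
Source code in beancount/parser/options.py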
def options_validate_plugin(value):
"""Validate the plugin option.
Args:
value: A string, the value provided as option.
Returns:
The new value, converted, if the conversion is successful.
Raises:
ValueError: If the value is invalid.
"""
# Process the 'plugin' option specially: accept an optional
# argument from it. NOTE: We will eventually phase this out and
# replace it by a dedicated 'plugin' directive.
match = re.match('(.*):(.*)', value)
if match:
plugin_name, plugin_config = match.groups()
else:
plugin_name, plugin_config = value, None
return (plugin_name, plugin_config)
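The regular expression `(.*):(.*)` used above is greedy, so when a value contains several colons the split happens at the last one. The sketch below reproduces just that splitting logic with the standard `re` module; `split_plugin_option` is a hypothetical name used for the demonstration.

```python
import re


def split_plugin_option(value):
    """Split 'name:config' at the last colon; config is None without a colon."""
    match = re.match('(.*):(.*)', value)
    if match:
        return match.groups()
    return (value, None)


no_config = split_plugin_option("beancount.plugins.auto_accounts")
with_config = split_plugin_option("my.plugin:strict")
two_colons = split_plugin_option("my.plugin:a:b")

print(no_config)    # ('beancount.plugins.auto_accounts', None)
print(with_config)  # ('my.plugin', 'strict')
print(two_colons)   # ('my.plugin:a', 'b')
```

The last case shows the greedy first group: everything up to the final colon ends up in the plugin name.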
beancount.parser.options.options_validate_processing_mode(value)
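Validate the options processing mode.
| Parameters: | value – A string, the value provided as option. |
|---|---|
| Returns: | The new value, converted, if the conversion is successful. |
| Raises: | ValueError – If the value is invalid. |
Source code in beancount/parser/options.py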
def options_validate_processing_mode(value):
"""Validate the options processing mode.
Args:
value: A string, the value provided as option.
Returns:
The new value, converted, if the conversion is successful.
Raises:
ValueError: If the value is invalid.
"""
if value not in ('raw', 'default'):
raise ValueError("Invalid value '{}'".format(value))
return value
beancount.parser.options.options_validate_tolerance(value)
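Validate the tolerance option.
| Parameters: | value – A string, the value provided as option. |
|---|---|
| Returns: | The new value, converted, if the conversion is successful. |
| Raises: | ValueError – If the value is invalid. |
Source code in beancount/parser/options.py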
def options_validate_tolerance(value):
"""Validate the tolerance option.
Args:
value: A string, the value provided as option.
Returns:
The new value, converted, if the conversion is successful.
Raises:
ValueError: If the value is invalid.
"""
return D(value)
beancount.parser.options.options_validate_tolerance_map(value)
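Validate an option with a map of currency/tolerance pairs in a string.
| Parameters: | value – A string, the value provided as option. |
|---|---|
| Returns: | The new value, converted, if the conversion is successful. |
| Raises: | ValueError – If the value is invalid. |
Source code in beancount/parser/options.py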
def options_validate_tolerance_map(value):
"""Validate an option with a map of currency/tolerance pairs in a string.
Args:
value: A string, the value provided as option.
Returns:
The new value, converted, if the conversion is successful.
Raises:
ValueError: If the value is invalid.
"""
# Process the setting of a key-value, whereby the value is a Decimal
# representation.
match = re.match('(.*):(.*)', value)
if not match:
raise ValueError("Invalid value '{}'".format(value))
currency, tolerance_str = match.groups()
return (currency, D(tolerance_str))
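As a rough illustration of the key/value parsing above, the sketch below converts a `CURRENCY:TOLERANCE` string into a pair. It assumes the standard library's `Decimal` in place of Beancount's `D()` helper, and `parse_tolerance_pair` is a hypothetical name.

```python
import re
from decimal import Decimal


def parse_tolerance_pair(value):
    """Parse 'CURRENCY:TOLERANCE' into a (str, Decimal) pair."""
    match = re.match('(.*):(.*)', value)
    if not match:
        raise ValueError("Invalid value '{}'".format(value))
    currency, tolerance_str = match.groups()
    # Assumption: plain Decimal stands in for beancount.core.number.D here.
    return (currency, Decimal(tolerance_str))


pair = parse_tolerance_pair("USD:0.005")

# A value without a colon is rejected.
try:
    parse_tolerance_pair("USD 0.005")
except ValueError as exc:
    error = str(exc)

print(pair, error)
```

Keeping the tolerance as a `Decimal` avoids the rounding surprises that a binary float would introduce when comparing small residuals.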
beancount.parser.parser
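Beancount syntax parser.
IMPORTANT: The parser (and its grammar builder) produces "incomplete" Transaction objects. This means that some of the data may be missing from the output of the parser, and some of the data types vary slightly. Missing components are replaced not by None but by a special constant 'NA', which helps diagnose problems if a user inadvertently attempts to work on an incomplete posting instead of a complete one. Those incomplete entries are then run through the "booking" routines, which do two things simultaneously:
- They find matching lots for reducing inventory positions, and
- They interpolate missing numbers.
In doing so, they normalize the entries to "complete" entries by converting a position/lot's "cost" attribute from a CostSpec to a Cost. A Cost is similar to an Amount in that it shares the "number" and "currency" attributes, but it also has a label and a lot date. A CostSpec is similar to a Cost, but all of its data is optional; it is a specification for matching against a particular inventory lot.
Other parts of a posting may also be missing, not just parts of the cost. Leaving out some parts of the input is used to invoke interpolation, telling Beancount to automatically compute the missing numbers (if possible).
Missing components are set to the special value "beancount.core.number.MISSING" until inventory booking and number interpolation have been completed. The "MISSING" value should never appear in completed, loaded transaction postings.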
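For instance, all of the units may be missing:
INPUT: Assets:Account
posting.units = MISSING
Or just the number of the units:
INPUT: Assets:Account USD
posting.units = Amount(MISSING, "USD")
You must always specify the currency.
If a price annotation is simply absent, it appears as None:
INPUT: Assets:Account 2 MXN
posting.price = None
However, you may indicate that there is a price but have Beancount compute it automatically:
INPUT: Assets:Account 2 MXN @
posting.price = Amount(MISSING, MISSING)
Specifying the conversion currency is also possible (and recommended):
INPUT: Assets:Account 2 MXN @ USD
posting.price = Amount(MISSING, "USD")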
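If a cost specification is provided, a "cost" attribute is set, but it does not refer to a Cost instance (as in complete entries); it refers to a CostSpec instance instead. Some of the fields of a CostSpec may be MISSING if they were not specified in the input. For example:
INPUT: Assets:Account 1 HOOL {100 # 5 USD}
posting.cost = CostSpec(Decimal("100"), Decimal("5"), "USD", None, None, False)
Note how we never consider the label or date override to be MISSING; this is because those inputs are optional: a missing label is simply left unset in the computed Cost, and a missing date override uses the date of the transaction that contains the posting.
You can indicate that there is a total number to be filled in like this:
INPUT: Assets:Account 1 HOOL {100 # USD}
posting.cost = CostSpec(Decimal("100"), MISSING, "USD", None, None, False)
This is in contrast to the total value simply not being used:
INPUT: Assets:Account 1 HOOL {100 USD}
posting.cost = CostSpec(Decimal("100"), None, "USD", None, None, False)
Both the per-unit and total numbers may be omitted as well, in which case only the per-unit portion of the CostSpec appears as MISSING:
INPUT: Assets:Account 1 HOOL {USD}
posting.cost = CostSpec(MISSING, None, "USD", None, None, False)
And furthermore, all of the cost basis may be missing:
INPUT: Assets:Account 1 HOOL {}
posting.cost = CostSpec(MISSING, None, MISSING, None, None, False)
If you ask for the lots to be merged, you get this:
INPUT: Assets:Account 1 HOOL {*}
posting.cost = CostSpec(MISSING, None, MISSING, None, None, True)
The numbers have to be computed by Beancount, so we output this with MISSING values.
Of course, you can provide only the non-basis information, such as just the date or label:
INPUT: Assets:Account 1 HOOL {2015-09-21}
posting.cost = CostSpec(MISSING, None, MISSING, date(2015, 9, 21), None, True)
See the test beancount.parser.grammar_test.TestIncompleteInputs for examples and corresponding expected values.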
beancount.parser.parser.is_entry_incomplete(entry)
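Detect the presence of elided amounts in Transactions.
| Parameters: | entry – A directive. |
|---|---|
| Returns: | A boolean, true if there are some missing portions of any postings found. |
Source code in beancount/parser/parser.py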
def is_entry_incomplete(entry):
"""Detect the presence of elided amounts in Transactions.
Args:
entry: A directive.
Returns:
A boolean, true if there are some missing portions of any postings found.
"""
if isinstance(entry, data.Transaction):
if any(is_posting_incomplete(posting) for posting in entry.postings):
return True
return False
beancount.parser.parser.is_posting_incomplete(posting)
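Detect the presence of any elided amounts in a Posting.
If any of the possible amounts are missing, this returns True.
| Parameters: | posting – An instance of Posting. |
|---|---|
| Returns: | A boolean, true if there are some missing portions of any postings found. |
Source code in beancount/parser/parser.py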
def is_posting_incomplete(posting):
"""Detect the presence of any elided amounts in a Posting.
If any of the possible amounts are missing, this returns True.
Args:
posting: An instance of Posting.
Returns:
A boolean, true if there are some missing portions of any postings found.
"""
units = posting.units
if (units is MISSING or
units.number is MISSING or
units.currency is MISSING):
return True
price = posting.price
if (price is MISSING or
price is not None and (price.number is MISSING or
price.currency is MISSING)):
return True
cost = posting.cost
if cost is not None and (cost.number_per is MISSING or
cost.number_total is MISSING or
cost.currency is MISSING):
return True
return False
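The check above depends on `MISSING` being a distinct sentinel compared by identity, so that it can never be confused with `None` (which means "not present at all"). The sketch below mimics the units and price parts of the check with stand-in types; `MISSING`, `Amount`, `Posting` and `has_missing` are simplified assumptions here, not the Beancount definitions, and the cost check is left out for brevity.

```python
import collections
from decimal import Decimal

# Stand-ins for beancount's types (assumptions, simplified for illustration).
MISSING = object()
Amount = collections.namedtuple("Amount", "number currency")
Posting = collections.namedtuple("Posting", "account units cost price")


def has_missing(posting):
    """Return True if the units or the price contain the MISSING sentinel."""
    units = posting.units
    if units is MISSING or units.number is MISSING or units.currency is MISSING:
        return True
    price = posting.price
    if price is not None and (price.number is MISSING or
                              price.currency is MISSING):
        return True
    return False


complete = Posting("Assets:Account", Amount(Decimal("2"), "MXN"), None, None)
elided_units = Posting("Assets:Account", MISSING, None, None)
elided_price = Posting("Assets:Account", Amount(Decimal("2"), "MXN"), None,
                       Amount(MISSING, "USD"))

print(has_missing(complete), has_missing(elided_units), has_missing(elided_price))
```

A price of `None` counts as complete because it says there is no price annotation, whereas `Amount(MISSING, "USD")` says a price exists and still has to be interpolated.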
beancount.parser.parser.parse_doc(expect_errors=False, allow_incomplete=False)
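Factory of decorators that parse the function's docstring as an argument.
Note that the decorators thus generated only run the parser on the tests, not the loader, so no validation, balance checks, or plugins are applied to the parsed text.
| Parameters: | expect_errors – A boolean or None, with the following semantics. True: expect errors and fail if there are none. False: expect no errors and fail if there are some. None: do nothing, no check. |
|---|---|
| | allow_incomplete – A boolean; if true, allow incomplete input. Otherwise fail if the input would require interpolation. The default is not to allow it, because we want to minimize the features tests depend on. |
| Returns: | A decorator for test functions. |
Source code in beancount/parser/parser.py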
def parse_doc(expect_errors=False, allow_incomplete=False):
"""Factory of decorators that parse the function's docstring as an argument.
Note that the decorators thus generated only run the parser on the tests,
not the loader, so there is no validation, balance checks, nor plugins applied to
the parsed text.
Args:
expect_errors: A boolean or None, with the following semantics,
True: Expect errors and fail if there are none.
False: Expect no errors and fail if there are some.
None: Do nothing, no check.
allow_incomplete: A boolean, if true, allow incomplete input. Otherwise
barf if the input would require interpolation. The default value is set
not to allow it because we want to minimize the features tests depend on.
Returns:
A decorator for test functions.
"""
def decorator(fun):
"""A decorator that parses the function's docstring as an argument.
Args:
fun: the function object to be decorated.
Returns:
A decorated test function.
"""
filename = inspect.getfile(fun)
lines, lineno = inspect.getsourcelines(fun)
# Skip over decorator invocation and function definition. This
# is imperfect as it assumes that each consumes exactly one
# line, but this is by far the most common case, and this is
# mainly used in test, thus it is good enough.
lineno += 2
@functools.wraps(fun)
def wrapper(self):
assert fun.__doc__ is not None, (
"You need to insert a docstring on {}".format(fun.__name__))
entries, errors, options_map = parse_string(fun.__doc__,
report_filename=filename,
report_firstline=lineno,
dedent=True)
if not allow_incomplete and any(is_entry_incomplete(entry)
for entry in entries):
self.fail("parse_doc() may not use interpolation.")
if expect_errors is not None:
if expect_errors is False and errors:
oss = io.StringIO()
printer.print_errors(errors, file=oss)
self.fail("Unexpected errors found:\n{}".format(oss.getvalue()))
elif expect_errors is True and not errors:
self.fail("Expected errors, none found:")
return fun(self, entries, errors, options_map)
return wrapper
return decorator
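The decorator-factory structure above (an outer function taking settings, an inner decorator, and a wrapper that feeds the docstring to the test) can be sketched without the parser. In the following illustration, `with_docstring_lines` is a hypothetical helper that passes the dedented docstring lines where `parse_doc` would pass parsed entries.

```python
import functools
import textwrap


def with_docstring_lines(strip_blank=True):
    """Factory of decorators that pass the function's docstring lines as an argument."""
    def decorator(fun):
        @functools.wraps(fun)
        def wrapper(self):
            assert fun.__doc__ is not None, (
                "You need to insert a docstring on {}".format(fun.__name__))
            lines = textwrap.dedent(fun.__doc__).splitlines()
            if strip_blank:
                lines = [line for line in lines if line.strip()]
            return fun(self, lines)
        return wrapper
    return decorator


class Example:
    @with_docstring_lines()
    def check(self, lines):
        """
        2024-01-01 open Assets:Cash
        2024-01-02 open Expenses:Food
        """
        return lines


result = Example().check()
print(result)
```

`functools.wraps` keeps the original function name on the wrapper, which matters for test runners that discover tests by name.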
beancount.parser.parser.parse_file(file, report_filename=None, report_firstline=1, encoding=None, debug=False, **kw)
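Parse a Beancount input file and return the entries, errors, and options parsed from it.
| Parameters: | file – File object or path to the file to be parsed. |
|---|---|
| | kw – A dict of keywords to be applied to the C parser. |
| Returns: | A tuple of (list of entries parsed in the file, list of errors that were encountered during parsing, and a dict of the option values that were parsed from the file). |
Source code in beancount/parser/parser.py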
def parse_file(file, report_filename=None, report_firstline=1,
encoding=None, debug=False, **kw):
"""Parse a beancount input file and return Ledger with the list of
transactions and tree of accounts.
Args:
file: file object or path to the file to be parsed.
kw: a dict of keywords to be applied to the C parser.
Returns:
A tuple of (
list of entries parsed in the file,
list of errors that were encountered during parsing, and
a dict of the option values that were parsed from the file.)
"""
if encoding is not None and codecs.lookup(encoding).name != 'utf-8':
raise ValueError('Only UTF-8 encoded files are supported.')
with contextlib.ExitStack() as ctx:
if file == '-':
file = sys.stdin.buffer
# It would be more appropriate here to check for io.RawIOBase but
# that does not work for io.BytesIO despite it implementing the
# readinto() method.
elif not isinstance(file, io.IOBase):
file = ctx.enter_context(open(file, 'rb'))
builder = grammar.Builder()
parser = _parser.Parser(builder, debug=debug)
parser.parse(file, filename=report_filename, lineno=report_firstline, **kw)
return builder.finalize()
beancount.parser.parser.parse_many(string, level=0)
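Parse a string with a snippet of Beancount input and replace vars from caller.
| Parameters: | string – A string with some Beancount input. |
|---|---|
| | level – The number of extra stacks to ignore. |
| Returns: | A list of entries. |
| Raises: | AssertionError – If there are any errors. |
Source code in beancount/parser/parser.py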
def parse_many(string, level=0):
"""Parse a string with a snippet of Beancount input and replace vars from caller.
Args:
string: A string with some Beancount input.
level: The number of extra stacks to ignore.
Returns:
A list of entries.
Raises:
AssertionError: If there are any errors.
"""
# Get the locals in the stack for the callers and produce the final text.
frame = inspect.stack()[level+1]
varkwds = frame[0].f_locals
input_string = textwrap.dedent(string.format(**varkwds))
# Parse entries and check there are no errors.
entries, errors, __ = parse_string(input_string)
assert not errors
return entries
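The variable substitution above works by reading the caller's local variables from the interpreter stack and passing them to `str.format`. A minimal sketch of that mechanism alone, with the parsing step left out; `format_from_caller` and `make_snippet` are hypothetical names.

```python
import inspect
import textwrap


def format_from_caller(template, level=0):
    """Fill {names} in `template` from the caller's locals, then dedent it."""
    # Index 0 is this function's own frame; index 1 is its direct caller.
    frame = inspect.stack()[level + 1]
    caller_locals = frame[0].f_locals
    return textwrap.dedent(template.format(**caller_locals))


def make_snippet():
    account = "Assets:Cash"
    date = "2024-01-01"
    return format_from_caller("""
        {date} open {account}
    """)


snippet = make_snippet()
print(snippet.strip())  # 2024-01-01 open Assets:Cash
```

Since `str.format` treats braces specially, literal braces in the template (such as a cost specification) have to be doubled as `{{` and `}}`.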
beancount.parser.parser.parse_one(string)
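Parse a string with a single Beancount directive and replace vars from caller.
| Parameters: | string – A string with some Beancount input. |
|---|---|
| Returns: | The single parsed directive. |
| Raises: | AssertionError – If there are any errors, or if the input does not contain exactly one directive. |
Source code in beancount/parser/parser.py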
def parse_one(string):
"""Parse a string with single Beancount directive and replace vars from caller.
Args:
string: A string with some Beancount input.
Returns:
The single parsed directive.
Raises:
AssertionError: If there are any errors.
"""
entries = parse_many(string, level=1)
assert len(entries) == 1
return entries[0]
beancount.parser.parser.parse_string(string, report_filename=None, dedent=False, **kw)
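Parse a string of Beancount input and return the entries, errors, and options parsed from it.
| Parameters: | string – A string, the contents to be parsed instead of a file's. |
|---|---|
| | report_filename – A string, the source filename from which this string has been extracted, if any. This is stored in the metadata of the parsed entries. |
| | dedent – Whether to run textwrap.dedent() on the string before parsing. |
| | **kw – See parse.c. |
| Returns: | Same as the output of parse_file(). |
Source code in beancount/parser/parser.py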
def parse_string(string, report_filename=None, dedent=False, **kw):
"""Parse a beancount input file and return Ledger with the list of
transactions and tree of accounts.
Args:
string: A string, the contents to be parsed instead of a file's.
report_filename: A string, the source filename from which this string
has been extracted, if any. This is stored in the metadata of the
parsed entries.
dedent: Whether to run textwrap.dedent() on the string before parsing.
**kw: See parse.c.
Returns:
Same as the output of parse_file().
"""
if dedent:
string = textwrap.dedent(string)
if isinstance(string, str):
string = string.encode('utf8')
if report_filename is None:
report_filename = '<string>'
file = io.BytesIO(string)
return parse_file(file, report_filename=report_filename, **kw)
beancount.parser.printer
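Conversion from internal data structures to text.
beancount.parser.printer.EntryPrinter
A multi-method interface for printing all directive types.
Attributes:
| Name | Type | Description |
|---|---|---|
| dcontext | | An instance of DisplayContext with which to render all the numbers. |
| render_weight | | A boolean, true if we should render the weight of the postings as a comment, for debugging. |
| min_width_account | | An integer, the minimum width to leave for the account name. |
| prefix | | User-specific prefix for custom indentation (for Fava). |
| stringify_invalid_types | | If a metadata value is invalid, force a conversion to string for printout. |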
beancount.parser.printer.EntryPrinter.__call__(self, obj)
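special
Render a directive.
| Parameters: | obj – The directive to be rendered. |
|---|---|
| Returns: | A string, the rendered directive. |
Source code in beancount/parser/printer.py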
def __call__(self, obj):
"""Render a directive.
Args:
obj: The directive to be rendered.
Returns:
A string, the rendered directive.
"""
oss = io.StringIO()
# We write optional entry source for every entry type, hence writing it here
self.write_entry_source(obj.meta, oss, prefix="")
method = getattr(self, obj.__class__.__name__)
method(obj, oss)
return oss.getvalue()
beancount.parser.printer.EntryPrinter.render_posting_strings(self, posting)
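Render the three components of a posting: the account and its optional posting flag, the position, and finally the weight of the position. The purpose is to align these in the caller.
| Parameters: | posting – An instance of Posting, the posting to render. |
|---|---|
| Returns: | A tuple of flag_account (a string, the account name including the flag), position_str (a string, the rendered position string), and weight_str (a string, the rendered weight of the posting). |
Source code in beancount/parser/printer.py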
def render_posting_strings(self, posting):
    """This renders the three components of a posting: the account and its optional
    posting flag, the position, and finally, the weight of the position. The
    purpose is to align these in the caller.
    Args:
      posting: An instance of Posting, the posting to render.
    Returns:
      A tuple of
        flag_account: A string, the account name including the flag.
        position_str: A string, the rendered position string.
        weight_str: A string, the rendered weight of the posting.
    """
    # Render a string of the flag and the account.
    flag = '{} '.format(render_flag(posting.flag)) if posting.flag else ''
    flag_account = flag + posting.account
    # Render a string with the amount and cost and optional price, if
    # present. Also render a string with the weight.
    weight_str = ''
    if isinstance(posting.units, amount.Amount):
        position_str = position.to_string(posting, self.dformat)
        # Note: we render weights at maximum precision, for debugging.
        if posting.cost is None or (isinstance(posting.cost, position.Cost) and
                                    isinstance(posting.cost.number, Decimal)):
            weight_str = str(convert.get_weight(posting))
    else:
        position_str = ''
    if posting.price is not None:
        position_str += ' @ {}'.format(posting.price.to_string(self.dformat_max))
    return flag_account, position_str, weight_str
beancount.parser.printer.EntryPrinter.write_entry_source(self, meta, oss, prefix=None)
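Write the source file and line number in a format that Emacs, VSCode or other editors can interpret as a message location. Because this is for debugging purposes, the information is commented out with a semicolon.

| Parameters: | Description |
|---|---|
| meta | A dict that contains the metadata for this directive. |
| oss | A file object to write to. |
| prefix | User-specific prefix for custom indentation. |

Source code in beancount/parser/printer.py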
def write_entry_source(self, meta, oss, prefix=None):
    """Write source file and line number in a format interpretable as a message
    location for Emacs, VSCode or other editors. As this is for
    "debugging" purposes, this information will be commented out by a
    semicolon.
    Args:
      meta: A dict that contains the metadata for this directive.
      oss: A file object to write to.
      prefix: User-specific prefix for custom indentation
    """
    if not self.write_source:
        return
    if prefix is None:
        prefix = self.prefix
    oss.write('{}; source: {}\n'.format(prefix, render_source(meta)))
beancount.parser.printer.EntryPrinter.write_metadata(self, meta, oss, prefix=None)
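Write metadata to the file object, excluding the filename and line number.

| Parameters: | Description |
|---|---|
| meta | A dict that contains the metadata for this directive. |
| oss | A file object to write to. |

Source code in beancount/parser/printer.py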
def write_metadata(self, meta, oss, prefix=None):
    """Write metadata to the file object, excluding filename and line number.
    Args:
      meta: A dict that contains the metadata for this directive.
      oss: A file object to write to.
    """
    if meta is None:
        return
    if prefix is None:
        prefix = self.prefix
    # Note: meta.items() is assumed stable from 3.7 onwards; we're not sorting
    # on purpose in order to keep the original insertion order in print.
    for key, value in meta.items():
        if key not in self.META_IGNORE and not key.startswith('__'):
            value_str = None
            if isinstance(value, str):
                value_str = '"{}"'.format(misc_utils.escape_string(value))
            elif isinstance(value, (Decimal, datetime.date, amount.Amount, enum.Enum)):
                value_str = str(value)
            elif isinstance(value, bool):
                value_str = 'TRUE' if value else 'FALSE'
            elif isinstance(value, (dict, inventory.Inventory)):
                pass  # Ignore dicts, don't print them out.
            elif value is None:
                value_str = ''  # Render null metadata as empty, on purpose.
            else:
                if self.stringify_invalid_types:
                    # This is only intended to be used during development,
                    # when debugging for custom values of data types
                    # attached directly and not coming from the parser.
                    value_str = str(value)
                else:
                    raise ValueError("Unexpected value: '{!r}'".format(value))
            if value_str is not None:
                oss.write("{}{}: {}\n".format(prefix, key, value_str))
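The value-to-string rules in the loop above can be illustrated with a stand-alone sketch restricted to standard-library types. Everything named here is an assumption made for illustration: `render_meta_value` is a hypothetical helper, the escaping is a simplified substitute for `misc_utils.escape_string()`, the ignored keys are guessed from the docstring ("excluding filename and line number"), and the two-space prefix is arbitrary.

```python
import datetime
from decimal import Decimal


def render_meta_value(value):
    """Hypothetical stand-alone version of the rules; None means 'skip'."""
    if isinstance(value, str):
        # Simplified escaping, standing in for misc_utils.escape_string().
        return '"{}"'.format(value.replace('\\', '\\\\').replace('"', '\\"'))
    if isinstance(value, (Decimal, datetime.date)):
        return str(value)
    if isinstance(value, bool):
        return 'TRUE' if value else 'FALSE'
    if isinstance(value, dict):
        return None  # Dicts are not printed.
    if value is None:
        return ''  # Null metadata renders as an empty value.
    raise ValueError("Unexpected value: '{!r}'".format(value))


meta = {'filename': 'x.beancount', 'lineno': 3, 'note': 'lunch',
        'rate': Decimal('1.10'), 'cleared': True, 'empty': None}
lines = []
for key, value in meta.items():
    # Assumed ignore list: the source location keys, plus dunder keys.
    if key in ('filename', 'lineno') or key.startswith('__'):
        continue
    value_str = render_meta_value(value)
    if value_str is not None:
        lines.append('  {}: {}'.format(key, value_str))
print('\n'.join(lines))
```

Keys come out in insertion order because nothing is sorted, which matches the note in the source about keeping the original order.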
beancount.parser.printer.align_position_strings(strings)
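A helper used to align rendered amount positions on their first currency character (an uppercase letter). This function accepts a list of rendered positions and calculates the width needed to render them stacked in a column so that the first currency word aligns. It does not align any further currencies (e.g. those in the price or cost).
This is perhaps best explained with an example. The following positions will be aligned around the column marked with '^':

           45 HOOL {504.30 USD}
            4 HOOL {504.30 USD, 2014-11-11}
         9.95 USD
    -22473.32 CAD @ 1.10 USD
              ^

Strings without a currency character will be rendered flush left.

| Parameters: | Description |
|---|---|
| strings | A list of rendered position or amount strings. |

| Returns: | A pair of a list of aligned strings and the width of the aligned strings. |
|---|---|

Source code in beancount/parser/printer.py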
def align_position_strings(strings):
    """A helper used to align rendered amounts positions to their first currency
    character (an uppercase letter). This function accepts a list of rendered
    positions and calculates the necessary width to render them stacked in a
    column so that the first currency word aligns. It does not go beyond that
    (further currencies, e.g. for the price or cost, are not aligned).
    This is perhaps best explained with an example. The following positions will
    be aligned around the column marked with '^':
             45 HOOL {504.30 USD}
              4 HOOL {504.30 USD, 2014-11-11}
           9.95 USD
      -22473.32 CAD @ 1.10 USD
                ^
    Strings without a currency character will be rendered flush left.
    Args:
      strings: A list of rendered position or amount strings.
    Returns:
      A pair of a list of aligned strings and the width of the aligned strings.
    """
    # Maximum length before the alignment character.
    max_before = 0
    # Maximum length after the alignment character.
    max_after = 0
    # Maximum length of unknown strings.
    max_unknown = 0
    string_items = []
    search = re.compile('[A-Z]').search
    for string in strings:
        match = search(string)
        if match:
            index = match.start()
            if index != 0:
                max_before = max(index, max_before)
                max_after = max(len(string) - index, max_after)
                string_items.append((index, string))
                continue
        # else
        max_unknown = max(len(string), max_unknown)
        string_items.append((None, string))
    # Compute formatting string.
    max_total = max(max_before + max_after, max_unknown)
    max_after_prime = max_total - max_before
    fmt = "{{:>{0}}}{{:{1}}}".format(max_before, max_after_prime).format
    fmt_unknown = "{{:<{0}}}".format(max_total).format
    # Align the strings and return them.
    # pylint: disable=format-string-without-interpolation
    aligned_strings = []
    for index, string in string_items:
        # pylint: disable=format-string-without-interpolation
        if index is not None:
            string = fmt(string[:index], string[index:])
        else:
            string = fmt_unknown(string)
        aligned_strings.append(string)
    return aligned_strings, max_total
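The core of the alignment idea can be shown with a much smaller function. The sketch below is a simplified re-implementation, not the library code: `align_on_first_uppercase` is a hypothetical name, and unlike the function above it neither pads the right-hand side nor returns the total width.

```python
import re


def align_on_first_uppercase(strings):
    """Simplified sketch: right-justify the text before the first uppercase letter."""
    search = re.compile('[A-Z]').search
    splits = []
    for string in strings:
        match = search(string)
        splits.append((match.start() if match else None, string))
    # Widest number part among the strings that contain a currency.
    max_before = max((index for index, _ in splits if index is not None), default=0)
    aligned = []
    for index, string in splits:
        if index is None:
            aligned.append(string)  # No currency: leave flush left.
        else:
            aligned.append(string[:index].rjust(max_before) + string[index:])
    return aligned


positions = [
    '45 HOOL {504.30 USD}',
    '4 HOOL {504.30 USD, 2014-11-11}',
    '9.95 USD',
    '-22473.32 CAD @ 1.10 USD',
]
aligned = align_on_first_uppercase(positions)
for line in aligned:
    print(line)
```

After alignment, the first uppercase letter of every line sits in the same column, which is the property the docstring example marks with '^'.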
beancount.parser.printer.format_entry(entry, dcontext=None, render_weights=False, prefix=None, write_source=False)
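Format an entry into a string in the same input syntax the parser accepts.

| Parameters: | Description |
|---|---|
| entry | An entry instance. |
| dcontext | An instance of DisplayContext used to format the numbers. |
| render_weights | A boolean, true to render the weights for debugging. |
| write_source | If true, a source file and line number are written for each entry, commented out with a semicolon, in a format that Emacs, VSCode or other editors can interpret as a message location. |

| Returns: | A string, the formatted entry. |
|---|---|

Source code in beancount/parser/printer.py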
def format_entry(entry, dcontext=None, render_weights=False, prefix=None,
                 write_source=False):
    """Format an entry into a string in the same input syntax the parser accepts.
    Args:
      entry: An entry instance.
      dcontext: An instance of DisplayContext used to format the numbers.
      render_weights: A boolean, true to render the weights for debugging.
      write_source: If true a source file and line number will be written for
        each entry in a format interpretable as a message location for Emacs,
        VSCode or other editors. As this is for
        "debugging" purposes, this information will be commented out by a
        semicolon.
    Returns:
      A string, the formatted entry.
    """
    return EntryPrinter(dcontext, render_weights, prefix=prefix,
                        write_source=write_source)(entry)
beancount.parser.printer.format_error(error)
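Given an error object, return a formatted string for it.

| Parameters: | Description |
|---|---|
| error | A namedtuple object representing an error. It must have an 'entry' attribute that may be either a single directive object or a list of directive objects. |

| Returns: | A string, the rendered error. |
|---|---|

Source code in beancount/parser/printer.py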
def format_error(error):
    """Given an error object, return a formatted string for it.
    Args:
      error: a namedtuple object representing an error. It has to have an
        'entry' attribute that may be either a single directive object or a
        list of directive objects.
    Returns:
      A string, the errors rendered.
    """
    oss = io.StringIO()
    oss.write('{} {}\n'.format(render_source(error.source), error.message))
    if error.entry is not None:
        entries = error.entry if isinstance(error.entry, list) else [error.entry]
        error_string = '\n'.join(format_entry(entry) for entry in entries)
        oss.write('\n')
        oss.write(textwrap.indent(error_string, ' '))
        oss.write('\n')
    return oss.getvalue()
beancount.parser.printer.print_entries(entries, dcontext=None, render_weights=False, file=None, prefix=None, write_source=False)
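A convenience function that prints a list of entries to a file.

| Parameters: | Description |
|---|---|
| entries | A list of directives. |
| dcontext | An instance of DisplayContext used to format the numbers. |
| render_weights | A boolean, true to render the weights for debugging. |
| file | An optional file object to write the entries to. |
| prefix | User-specific prefix for custom indentation (for Fava). |
| write_source | If true, a source file and line number are written for each entry in a format that Emacs, VSCode or other editors can interpret as a message location. Useful for debugging, especially in a multi-file setup. |

Source code in beancount/parser/printer.py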
def print_entries(entries, dcontext=None, render_weights=False, file=None, prefix=None,
                  write_source=False):
    """A convenience function that prints a list of entries to a file.
    Args:
      entries: A list of directives.
      dcontext: An instance of DisplayContext used to format the numbers.
      render_weights: A boolean, true to render the weights for debugging.
      file: An optional file object to write the entries to.
      prefix: User-specific prefix for custom indentation (for Fava).
      write_source: If true a source file and line number will be written for
        each entry in a format interpretable as a message location for Emacs,
        VSCode or other editors. This is useful for "debugging" purposes,
        especially in a multi-file setup.
    """
    assert isinstance(entries, list), "Entries is not a list: {}".format(entries)
    output = file or (codecs.getwriter("utf-8")(sys.stdout.buffer)
                      if hasattr(sys.stdout, 'buffer') else
                      sys.stdout)
    if prefix:
        output.write(prefix)
    previous_type = type(entries[0]) if entries else None
    eprinter = EntryPrinter(dcontext, render_weights, write_source=write_source)
    for entry in entries:
        # Insert a newline between transactions and between blocks of directives
        # of the same type.
        entry_type = type(entry)
        if (entry_type in (data.Transaction, data.Commodity) or
            entry_type is not previous_type or write_source):
            output.write('\n')
            previous_type = entry_type
        string = eprinter(entry)
        output.write(string)
beancount.parser.printer.print_entry(entry, dcontext=None, render_weights=False, file=None, write_source=False)
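A convenience function that prints a single entry to a file.

| Parameters: | Description |
|---|---|
| entry | A directive entry. |
| dcontext | An instance of DisplayContext used to format the numbers. |
| render_weights | A boolean, true to render the weights for debugging. |
| file | An optional file object to write the entries to. |
| write_source | If true, a source file and line number are written for each entry in a format that Emacs, VSCode or other editors can interpret as a message location. Useful for debugging, especially in a multi-file setup. |

Source code in beancount/parser/printer.py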
def print_entry(entry, dcontext=None, render_weights=False, file=None,
                write_source=False):
    """A convenience function that prints a single entry to a file.
    Args:
      entry: A directive entry.
      dcontext: An instance of DisplayContext used to format the numbers.
      render_weights: A boolean, true to render the weights for debugging.
      file: An optional file object to write the entries to.
      write_source: If true a source file and line number will be written for
        each entry in a format interpretable as a message location for Emacs,
        VSCode or other editors. This is useful for "debugging" purposes,
        especially in a multi-file setup.
    """
    # TODO(blais): DO remove this now, it's a huge annoyance not to be able to
    # print in-between other statements.
    output = file or (codecs.getwriter("utf-8")(sys.stdout.buffer)
                      if hasattr(sys.stdout, 'buffer') else
                      sys.stdout)
    output.write(format_entry(entry, dcontext, render_weights,
                              write_source=write_source))
    output.write('\n')
beancount.parser.printer.print_error(error, file=None)
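A convenience function that prints a single error to a file.

| Parameters: | Description |
|---|---|
| error | An error object. |
| file | An optional file object to write the errors to. |

Source code in beancount/parser/printer.py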
def print_error(error, file=None):
    """A convenience function that prints a single error to a file.
    Args:
      error: An error object.
      file: An optional file object to write the errors to.
    """
    output = file or sys.stdout
    output.write(format_error(error))
    output.write('\n')
beancount.parser.printer.print_errors(errors, file=None, prefix=None)
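A convenience function that prints a list of errors to a file.

| Parameters: | Description |
|---|---|
| errors | A list of errors. |
| file | An optional file object to write the errors to. |

Source code in beancount/parser/printer.py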
def print_errors(errors, file=None, prefix=None):
    """A convenience function that prints a list of errors to a file.
    Args:
      errors: A list of errors.
      file: An optional file object to write the errors to.
    """
    output = file or sys.stdout
    if prefix:
        output.write(prefix)
    for error in errors:
        output.write(format_error(error))
        output.write('\n')
beancount.parser.printer.render_flag(inflag)
Render a flag to a string. The flag can be None or a single-character symbol.
Source code in beancount/parser/printer.py
def render_flag(inflag: Optional[str]) -> str:
    """Render a flag, which can be None, a symbol of a character to a string."""
    if not inflag:
        return ''
    return inflag
beancount.parser.printer.render_source(meta)
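Render the source location for errors in a way that is both detected by Emacs and aligned and displayed nicely.

| Parameters: | Description |
|---|---|
| meta | A dict with the metadata. |

| Returns: | A string, rendered to be interpretable as a message location for Emacs or other editors. |
|---|---|

Source code in beancount/parser/printer.py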
def render_source(meta):
    """Render the source for errors in a way that it will be both detected by
    Emacs and align and rendered nicely.
    Args:
      meta: A dict with the metadata.
    Returns:
      A string, rendered to be interpretable as a message location for Emacs or
      other editors.
    """
    return '{}:{:8}'.format(meta['filename'], '{}:'.format(meta['lineno']))
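The `{:8}` field pads the "line number plus colon" part to eight characters, so the text that follows the location starts in the same column for short and long line numbers. The sketch below copies the format expression into a hypothetical `render_source_sketch` so it can be run without beancount; the file name and line numbers are made up.

```python
def render_source_sketch(meta):
    """Same format expression as in render_source(), copied for experimentation."""
    return '{}:{:8}'.format(meta['filename'], '{}:'.format(meta['lineno']))


short = render_source_sketch({'filename': 'ledger.beancount', 'lineno': 7})
long = render_source_sketch({'filename': 'ledger.beancount', 'lineno': 12345})
print(repr(short))
print(repr(long))
```

Both results have the same length here, because `'7:'` and `'12345:'` are each left-aligned and padded with spaces to a width of eight.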
beancount.parser.version
Implement common options across all programs.
beancount.parser.version.compute_version_string(version, changeset, timestamp)
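Compute a version string from the changeset and timestamp baked into the parser.

| Parameters: | Description |
|---|---|
| version | A string, the version number. |
| changeset | A string, a version control string identifying the commit of the version. |
| timestamp | An integer, the UNIX epoch timestamp of the changeset. |

| Returns: | A human-readable string for the version. |
|---|---|

Source code in beancount/parser/version.py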
def compute_version_string(version, changeset, timestamp):
    """Compute a version string from the changeset and timestamp baked in the parser.
    Args:
      version: A string, the version number.
      changeset: A string, a version control string identifying the commit of the version.
      timestamp: An integer, the UNIX epoch timestamp of the changeset.
    Returns:
      A human-readable string for the version.
    """
    # Shorten changeset.
    if changeset:
        if re.match('hg:', changeset):
            changeset = changeset[:15]
        elif re.match('git:', changeset):
            changeset = changeset[:12]
    # Convert timestamp to a date.
    date = None
    if timestamp > 0:
        date = datetime.datetime.fromtimestamp(timestamp, datetime.UTC).date()
    version = 'Beancount {}'.format(version)
    if changeset or date:
        version = '{} ({})'.format(
            version, '; '.join(map(str, filter(None, [changeset, date]))))
    return version
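A worked example of the two transformations above, written as a stand-alone sketch. The version number, commit id and timestamp are made-up inputs, and `datetime.timezone.utc` is substituted for `datetime.UTC` only so the sketch also runs on Python versions older than 3.11.

```python
import datetime

changeset = 'git:0123456789abcdef0123456789abcdef01234567'  # Made-up commit id.
timestamp = 1700000000  # Made-up UNIX epoch timestamp.

# For a git changeset, keep 'git:' plus the first eight characters of the hash.
short_changeset = changeset[:12]
# Convert the timestamp to a calendar date in UTC.
date = datetime.datetime.fromtimestamp(timestamp, datetime.timezone.utc).date()

version = 'Beancount {} ({})'.format(
    '3.0.0', '; '.join(map(str, [short_changeset, date])))
print(version)
```

The parenthesised suffix is omitted entirely by the function above when neither a changeset nor a positive timestamp is available.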