beancount.parser

Beancount 输入文件的解析器模块。

beancount.parser.booking

用于‘预订’库存的算法,即在减少库存内容时查找匹配批次的过程。

beancount.parser.booking.BookingError (元组)

BookingError(source, message, entry)

beancount.parser.booking.BookingError.__getnewargs__(self) 特殊

将自身返回为一个普通元组。供 copy 和 pickle 使用。

源代码位于 beancount/parser/booking.py
def __getnewargs__(self):
    'Return self as a plain tuple.  Used by copy and pickle.'
    return _tuple(self)

beancount.parser.booking.BookingError.__new__(_cls, source, message, entry) 特殊 静态方法

创建 BookingError(source, message, entry) 的新实例

beancount.parser.booking.BookingError.__replace__(/, self, **kwds) 特殊

返回一个新的 BookingError 对象,用指定的新值替换某些字段

源代码位于 beancount/parser/booking.py
def _replace(self, /, **kwds):
    result = self._make(_map(kwds.pop, field_names, self))
    if kwds:
        raise TypeError(f'Got unexpected field names: {list(kwds)!r}')
    return result

beancount.parser.booking.BookingError.__repr__(self) 特殊

返回一个格式良好的表示字符串

源代码位于 beancount/parser/booking.py
def __repr__(self):
    'Return a nicely formatted representation string'
    return self.__class__.__name__ + repr_fmt % self

beancount.parser.booking.book(incomplete_entries, options_map, initial_balances=None)

预订库存批次,并补全所有不完整的仓位。

参数:
  • incomplete_entries – 一组指令列表,其中某些过账可能保留了由解析器生成的不完整金额。

  • options_map – 由解析器生成的选项字典。

  • initial_balances – 一个 (账户, 库存) 键值对字典,用于从该状态开始预订。当尝试在现有状态基础上进行预订时非常有用。

返回:
  • 一对条目 – 一个所有过账均已补全的完成条目列表。errors:插值过程中产生的新错误。

源代码位于 beancount/parser/booking.py
def book(incomplete_entries, options_map, initial_balances=None):
    """Book inventory lots and complete all positions with incomplete numbers.

    Args:
      incomplete_entries: A list of directives, with some postings possibly left
        with incomplete amounts as produced by the parser.
      options_map: An options dict as produced by the parser.
      initial_balances: A dict of (account, inventory) pairs to start booking from.
        This is useful when attempting to book on top of an existing state.
    Returns:
      A pair of
        entries: A list of completed entries with all their postings completed.
        errors: New errors produced during interpolation.
    """
    # Get the list of booking methods for each account.
    booking_methods = collections.defaultdict(lambda: options_map["booking_method"])
    for entry in incomplete_entries:
        if isinstance(entry, data.Open) and entry.booking:
            booking_methods[entry.account] = entry.booking

    # Do the booking here!
    entries, booking_errors = booking_full.book(incomplete_entries, options_map,
                                                booking_methods, initial_balances)

    # Check for MISSING elements remaining.
    missing_errors = validate_missing_eliminated(entries, options_map)

    return entries, (booking_errors + missing_errors)

beancount.parser.booking.convert_lot_specs_to_lots(entries)

对所有条目,将过账仓位的 CostSpec 转换为 Cost 实例。在简单方法中,CostSpec 提供的数据必须能明确计算出成本金额。

这本质上复制了旧解析器的工作方式,但允许仓位使用模糊的批次规格而非已解析的规格。我们过去通常在本地计算成本,而此方法会移除 CostSpec,直接生成不含模糊匹配的 Cost。此功能仅用于过渡到新的匹配逻辑。

参数:
  • entries – 按解析器定义的不完整指令列表。

返回:
  • 一组条目,其过账仓位的成本已转换为 Cost 实例,但仍可能不完整。

异常:
  • ValueError – 如果存在不可接受的数值。

源代码位于 beancount/parser/booking.py
def convert_lot_specs_to_lots(entries):
    """For all the entries, convert the posting's position's CostSpec to Cost
    instances. In the simple method, the data provided in the CostSpec must
    unambiguously provide a way to compute the cost amount.

    This essentially replicates the way the old parser used to work, but
    allowing positions to have the fuzzy lot specifications instead of the
    resolved ones. We used to simply compute the costs locally, and this gets
    rid of the CostSpec to produce the Cost without fuzzy matching. This is only
    there for the sake of transition to the new matching logic.

    Args:
      entries: A list of incomplete directives as per the parser.
    Returns:
      A list of entries whose postings's position costs have been converted to
      Cost instances but that may still be incomplete.
    Raises:
      ValueError: If there's a unacceptable number.
    """
    new_entries = []
    errors = []
    for entry in entries:
        if not isinstance(entry, data.Transaction):
            new_entries.append(entry)
            continue

        new_postings = []
        for posting in entry.postings:
            try:
                units = posting.units
                cost_spec = posting.cost
                cost = convert_spec_to_cost(units, cost_spec)
                if cost_spec is not None and cost is None:
                    errors.append(
                        BookingError(entry.meta,
                                     "Cost syntax not supported; cost spec ignored",
                                     None))

                if cost and isinstance(units, amount.Amount):
                    # If there is a cost, we don't allow either a cost value of
                    # zero, nor a zero number of units. Note that we allow a price
                    # of zero as the only special case (for conversion entries), but
                    # never for costs.
                    if units.number == ZERO:
                        raise ValueError('Amount is zero: "{}"'.format(units))
                    if cost.number is not None and cost.number < ZERO:
                        raise ValueError('Cost is negative: "{}"'.format(cost))
            except ValueError as exc:
                errors.append(BookingError(entry.meta, str(exc), None))
                cost = None
            new_postings.append(posting._replace(cost=cost))
        new_entries.append(entry._replace(postings=new_postings))
    return new_entries, errors

beancount.parser.booking.convert_spec_to_cost(units, cost_spec)

将一个记账项的 CostSpec 实例转换为 Cost。

参数:
  • units – 一个 Amount 实例。

  • cost_spec – 一个 CostSpec 实例。

返回:
  • 一个 Cost 实例。

源代码位于 beancount/parser/booking.py
def convert_spec_to_cost(units, cost_spec):
    """Convert a posting's CostSpec instance to a Cost.

    Args:
      units: An instance of Amount.
      cost_spec: An instance of CostSpec.
    Returns:
      An instance of Cost.
    """
    cost = cost_spec
    errors = []
    if isinstance(units, amount.Amount):
        currency = units.currency
        if cost_spec is not None:
            number_per, number_total, cost_currency, date, label, merge = cost_spec

            # Compute the cost.
            if number_per is not MISSING or number_total is not None:
                if number_total is not None:
                    # Compute the per-unit cost if there is some total cost
                    # component involved.
                    units_num = units.number
                    cost_total = number_total
                    if number_per is not MISSING:
                        cost_total += number_per * units_num
                    unit_cost = cost_total / abs(units_num)
                else:
                    unit_cost = number_per
                cost = position.Cost(unit_cost, cost_currency, date, label)
            else:
                cost = None
    return cost

beancount.parser.booking.validate_inventory_booking(entries, unused_options_map, booking_methods)

验证不允许任何按成本计算的持仓变为负数。

此例程检查当记账项减少一个持仓(无论是否存在)时,后续的库存不会导致单位数量为负的持仓。负的单位数量仅在期货价差交易中做空时可能出现,而目前尚不支持此功能。虽然实现此功能并不困难,但我们希望对此保持严格,因为对这一点的严谨性是检测用户数据输入错误的绝佳方式。

参数:
  • entries – 一组指令列表。

  • unused_options_map – 一个选项映射。

  • booking_methods – 账户名称到记账方法的映射,在主循环中累积。

返回:
  • 一组错误信息。

源代码位于 beancount/parser/booking.py
def validate_inventory_booking(entries, unused_options_map, booking_methods):
    """Validate that no position at cost is allowed to go negative.

    This routine checks that when a posting reduces a position, existing or not,
    that the subsequent inventory does not result in a position with a negative
    number of units. A negative number of units would only be required for short
    trades of trading spreads on futures, and right now this is not supported.
    It would not be difficult to support this, however, but we want to be strict
    about it, because being pedantic about this is otherwise a great way to
    detect user data entry mistakes.

    Args:
      entries: A list of directives.
      unused_options_map: An options map.
      booking_methods: A mapping of account name to booking method, accumulated
        in the main loop.
    Returns:
      A list of errors.

    """
    errors = []
    balances = collections.defaultdict(inventory.Inventory)
    for entry in entries:
        if isinstance(entry, data.Transaction):
            for posting in entry.postings:
                # Update the balance of each posting on its respective account
                # without allowing booking to a negative position, and if an error
                # is encountered, catch it and return it.
                running_balance = balances[posting.account]
                position_, _ = running_balance.add_position(posting)

                # Skip this check if the booking method is set to ignore it.
                if booking_methods.get(posting.account, None) == data.Booking.NONE:
                    continue

                # Check if the resulting inventory is mixed, which is not
                # allowed under the STRICT method.
                if running_balance.is_mixed():
                    errors.append(
                        BookingError(
                            entry.meta,
                            ("Reducing position results in inventory with positive "
                             "and negative lots: {}").format(position_),
                            entry))

    return errors

beancount.parser.booking.validate_missing_eliminated(entries, unused_options_map)

验证所有缺失的记账项部分都已被消除。

参数:
  • entries – 一组指令列表。

  • unused_options_map – 一个选项映射。

返回:
  • 一组错误信息。

源代码位于 beancount/parser/booking.py
def validate_missing_eliminated(entries, unused_options_map):
    """Validate that all the missing bits of postings have been eliminated.

    Args:
      entries: A list of directives.
      unused_options_map: An options map.
    Returns:
      A list of errors.
    """
    errors = []
    for entry in entries:
        if isinstance(entry, data.Transaction):
            for posting in entry.postings:
                units = posting.units
                cost = posting.cost
                if (MISSING in (units.number, units.currency) or
                    cost is not None and MISSING in (cost.number, cost.currency,
                                                     cost.date, cost.label)):
                    errors.append(
                        BookingError(entry.meta,
                                     "Transaction has incomplete elements",
                                     entry))
                    break
    return errors

beancount.parser.booking_full

完整的(新)记账实现。

问题描述:

插值与记账相互依赖:通过插值填充的数值可能影响记账过程,而由记账过程得出的数值也可能帮助完成原本定义不足的插值。以下是插值辅助记账的一个示例:

假设 Assets:Investments 的期初库存包含两笔 HOOL 股票仓位,分别以 100.00 美元和 101.00 美元的成本持有,并应用以下交易:

2015-09-30 *
  Assets:Investments   -10 HOOL {USD}
  Assets:Cash               1000 USD
  Income:Gains              -200 USD

插值能够明确推导出每单位 HOOL 的成本为 100 美元,从而得出明确的记账结果。

另一方面,考虑以下交易:

2015-09-30 *
  Assets:Investments    -10 HOOL {USD}
  Assets:Cash               1000 USD
  Income:Gains

此时插值无法成功。如果 Assets:Investments 账户配置为使用先进先出(FIFO)方法,则会选择最旧的 10 股作为成本,从而可以正确插值资本利得。

第一点观察:第二种情况比第一种更常见,而第一种情况可通过明确指定特定成本轻松手动解决。此外,在许多情况下,并非仅存在单一仓位可供减少,且在给定目标成本的情况下确定正确的股票集合是一个未明确定义的问题。

第二点观察:记账仅适用于库存减少,而不适用于库存增加。因此,我们应在库存减少时执行记账,若减少部分未明确定义则尽早报错;而对于库存增加中缺失的数值,则保持其未定义状态,以便在后续阶段由插值填充。

请注意,我们希望处理但可能无法处理的一种情况是带有插值价格的减少,例如:

2015-09-30 *
  Assets:Investments        -10 HOOL {100.00 # USD}
  Expenses:Commission      9.95 USD
  Assets:Cash            990.05 USD

因此我们选择:

1) 首先对库存减少执行记账,保留库存增加部分不变(可能未定义)。已记账记账项的 'cost' 属性将从 CostSpec 转换为 Cost。对于缺少金额的增加记账项,则保留为 CostSpec 实例,以便插值能够填充总额与单位金额。

2) 对生成的条目进行插值计算。在此阶段(如果可能)可通过插值填补库存增加的未定义成本。

3) 最后,将插值后的 CostSpec 实例转换为 Cost 实例。

改进此算法需要在记账和插值步骤上循环,直到所有数值都已确定或无法再进行推断为止。我们可将其作为实验性功能留待后续考虑。我推测,这种情况极少,因此我们不会对上述算法进行改进。

beancount.parser.booking_full.CategorizationError (元组)

CategorizationError(source, message, entry)

beancount.parser.booking_full.CategorizationError.__getnewargs__(self) 特殊

将自身返回为一个普通元组。供 copy 和 pickle 使用。

源代码位于 beancount/parser/booking_full.py
def __getnewargs__(self):
    'Return self as a plain tuple.  Used by copy and pickle.'
    return _tuple(self)

beancount.parser.booking_full.CategorizationError.__new__(_cls, source, message, entry) 特殊 静态方法

创建 CategorizationError(source, message, entry) 的新实例

beancount.parser.booking_full.CategorizationError.__replace__(/, self, **kwds) 特殊

返回一个新的 CategorizationError 对象,用指定的新值替换字段

源代码位于 beancount/parser/booking_full.py
def _replace(self, /, **kwds):
    result = self._make(_map(kwds.pop, field_names, self))
    if kwds:
        raise TypeError(f'Got unexpected field names: {list(kwds)!r}')
    return result

beancount.parser.booking_full.CategorizationError.__repr__(self) 特殊

返回一个格式良好的表示字符串

源代码位于 beancount/parser/booking_full.py
def __repr__(self):
    'Return a nicely formatted representation string'
    return self.__class__.__name__ + repr_fmt % self

beancount.parser.booking_full.InterpolationError (元组)

InterpolationError(source, message, entry)

beancount.parser.booking_full.InterpolationError.__getnewargs__(self) 特殊

将自身返回为一个普通元组。供 copy 和 pickle 使用。

源代码位于 beancount/parser/booking_full.py
def __getnewargs__(self):
    'Return self as a plain tuple.  Used by copy and pickle.'
    return _tuple(self)

beancount.parser.booking_full.InterpolationError.__new__(_cls, source, message, entry) 特殊 静态方法

创建 InterpolationError(source, message, entry) 的新实例

beancount.parser.booking_full.InterpolationError.__replace__(/, self, **kwds) 特殊

返回一个新的 InterpolationError 对象,用指定的新值替换字段

源代码位于 beancount/parser/booking_full.py
def _replace(self, /, **kwds):
    result = self._make(_map(kwds.pop, field_names, self))
    if kwds:
        raise TypeError(f'Got unexpected field names: {list(kwds)!r}')
    return result

beancount.parser.booking_full.InterpolationError.__repr__(self) 特殊

返回一个格式良好的表示字符串

源代码位于 beancount/parser/booking_full.py
def __repr__(self):
    'Return a nicely formatted representation string'
    return self.__class__.__name__ + repr_fmt % self

beancount.parser.booking_full.MissingType (枚举)

缺失数值的类型。

beancount.parser.booking_full.ReductionError (元组)

ReductionError(source, message, entry)

beancount.parser.booking_full.ReductionError.__getnewargs__(self) 特殊

将自身返回为一个普通元组。供 copy 和 pickle 使用。

源代码位于 beancount/parser/booking_full.py
def __getnewargs__(self):
    'Return self as a plain tuple.  Used by copy and pickle.'
    return _tuple(self)

beancount.parser.booking_full.ReductionError.__new__(_cls, source, message, entry) 特殊 静态方法

创建 ReductionError(source, message, entry) 的新实例

beancount.parser.booking_full.ReductionError.__replace__(/, self, **kwds) 特殊

返回一个新的 ReductionError 对象,用指定的新值替换字段

源代码位于 beancount/parser/booking_full.py
def _replace(self, /, **kwds):
    result = self._make(_map(kwds.pop, field_names, self))
    if kwds:
        raise TypeError(f'Got unexpected field names: {list(kwds)!r}')
    return result

beancount.parser.booking_full.ReductionError.__repr__(self) 特殊

返回一个格式良好的表示字符串

源代码位于 beancount/parser/booking_full.py
def __repr__(self):
    'Return a nicely formatted representation string'
    return self.__class__.__name__ + repr_fmt % self

beancount.parser.booking_full.Refer (元组)

Refer(index, units_currency, cost_currency, price_currency)

beancount.parser.booking_full.Refer.__getnewargs__(self) 特殊

将自身返回为一个普通元组。供 copy 和 pickle 使用。

源代码位于 beancount/parser/booking_full.py
def __getnewargs__(self):
    'Return self as a plain tuple.  Used by copy and pickle.'
    return _tuple(self)

beancount.parser.booking_full.Refer.__new__(_cls, index, units_currency, cost_currency, price_currency) 特殊 静态方法

创建 Refer(index, units_currency, cost_currency, price_currency) 的新实例

beancount.parser.booking_full.Refer.__replace__(/, self, **kwds) 特殊

返回一个新的 Refer 对象,用指定的新值替换相应字段

源代码位于 beancount/parser/booking_full.py
def _replace(self, /, **kwds):
    result = self._make(_map(kwds.pop, field_names, self))
    if kwds:
        raise TypeError(f'Got unexpected field names: {list(kwds)!r}')
    return result

beancount.parser.booking_full.Refer.__repr__(self) 特殊

返回一个格式良好的表示字符串

源代码位于 beancount/parser/booking_full.py
def __repr__(self):
    'Return a nicely formatted representation string'
    return self.__class__.__name__ + repr_fmt % self

beancount.parser.booking_full.SelfReduxError (元组)

SelfReduxError(source, message, entry)

beancount.parser.booking_full.SelfReduxError.__getnewargs__(self) 特殊

将自身返回为一个普通元组。供 copy 和 pickle 使用。

源代码位于 beancount/parser/booking_full.py
def __getnewargs__(self):
    'Return self as a plain tuple.  Used by copy and pickle.'
    return _tuple(self)

beancount.parser.booking_full.SelfReduxError.__new__(_cls, source, message, entry) 特殊 静态方法

创建 SelfReduxError(source, message, entry) 的新实例

beancount.parser.booking_full.SelfReduxError.__replace__(/, self, **kwds) 特殊

返回一个新的 SelfReduxError 对象,用指定的新值替换相应字段

源代码位于 beancount/parser/booking_full.py
def _replace(self, /, **kwds):
    result = self._make(_map(kwds.pop, field_names, self))
    if kwds:
        raise TypeError(f'Got unexpected field names: {list(kwds)!r}')
    return result

beancount.parser.booking_full.SelfReduxError.__repr__(self) 特殊

返回一个格式良好的表示字符串

源代码位于 beancount/parser/booking_full.py
def __repr__(self):
    'Return a nicely formatted representation string'
    return self.__class__.__name__ + repr_fmt % self

beancount.parser.booking_full.book(entries, options_map, methods, initial_balances=None)

使用完整的历史算法从条目中插值缺失的数据。详情请参见内部实现 _book()。此方法仅剥离部分返回值。

有关参数和返回值,请参见 _book()。

源代码位于 beancount/parser/booking_full.py
def book(entries, options_map, methods, initial_balances=None):
    """Interpolate missing data from the entries using the full historical algorithm.
    See the internal implementation _book() for details.
    This method only stripes some of the return values.

    See _book() for arguments and return values.
    """
    entries, errors, _ = _book(entries, options_map, methods, initial_balances)
    return entries, errors

beancount.parser.booking_full.book_reductions(entry, group_postings, balances, methods)

将库存减少与前期余额进行匹配。

此函数接受一个 (账户, 库存余额) 字典,对于每个作为库存减少的记账项,尝试查找对应的批次或批次列表以抵减余额。

  • 对于减少批次,记账项的 CostSpec 实例将被替换为 Cost 实例。

  • 对于增加批次,记账项的 CostSpec 实例保持不变,仅其日期继承自父交易。

参数:
  • entry – Transaction 的一个实例。仅在记录错误时引用。

  • group_postings – 一组 Posting 实例。

  • balances – 账户名称到库存内容的字典。

  • methods – 账户名称到其对应记账方法枚举的映射。

返回:
  • 已记账的记账项对 – 一组已记账的记账项,其中减少批次已与相应账户的前期库存余额中的具体仓位匹配。请注意,输入中的单个减少记账项可能在输出中产生多个记账项。同时请注意,持有成本的增加记账项仍会引用 CostSpec 的 'cost' 实例,留待后续插值。errors: 如果有错误,则为错误列表。

源代码位于 beancount/parser/booking_full.py
def book_reductions(entry, group_postings, balances,
                    methods):
    """Book inventory reductions against the ante-balances.

    This function accepts a dict of (account, Inventory balance) and for each
    posting that is a reduction against its inventory, attempts to find a
    corresponding lot or list of lots to reduce the balance with.

    * For reducing lots, the CostSpec instance of the posting is replaced by a
      Cost instance.

    * For augmenting lots, the CostSpec instance of the posting is left alone,
      except for its date, which is inherited from the parent Transaction.

    Args:
      entry: An instance of Transaction. This is only used to refer to when
        logging errors.
      group_postings: A list of Posting instances for the group.
      balances: A dict of account name to inventory contents.
      methods: A mapping of account name to their corresponding booking
        method enum.
    Returns:
      A pair of
        booked_postings: A list of booked postings, with reducing lots resolved
          against specific position in the corresponding accounts'
          ante-inventory balances. Note single reducing posting in the input may
          result in multiple postings in the output. Also note that augmenting
          postings held-at-cost will still refer to 'cost' instances of
          CostSpec, left to be interpolated later.
        errors: A list of errors, if there were any.
    """
    errors = []

    # A local copy of the balances dictionary which is updated just for the
    # duration of this function's updates, in order to take into account the
    # cumulative effect of all the postings inferred here
    local_balances = {}

    empty = inventory.Inventory()
    booked_postings = []
    for posting in group_postings:
        # Process a single posting.
        units = posting.units
        costspec = posting.cost
        account = posting.account

        # Note: We ensure there is no mutation on 'balances' to keep this
        # function without side-effects. Note that we may be able to optimize
        # performance later on by giving up this property.
        #
        # Also note that if there is no existing balance, then won't be any lot
        # reduction because none of the postings will be able to match against
        # any currencies of the balance.
        if account not in local_balances:
            previous_balance = balances.get(account, empty)
            local_balances[account] = copy.copy(previous_balance)
        balance = local_balances[account]

        # Check if this is a lot held at cost.
        if costspec is None or units.number is MISSING:
            # This posting is not held at cost; we do nothing.
            booked_postings.append(posting)
        else:
            # This posting is held at cost; figure out if it's a reduction or an
            # augmentation.
            method = methods[account]
            if (method is not Booking.NONE and
                balance is not None and
                balance.is_reduced_by(units)):
                # This posting is a reduction.

                # Match the positions.
                cost_number = compute_cost_number(costspec, units)
                matches = []
                for position in balance:
                    # Skip inventory contents of a different currency.
                    if (units.currency and
                        position.units.currency != units.currency):
                        continue
                    # Skip balance positions not held at cost.
                    if position.cost is None:
                        continue
                    if (cost_number is not None and
                        position.cost.number != cost_number):
                        continue
                    if (isinstance(costspec.currency, str) and
                        position.cost.currency != costspec.currency):
                        continue
                    if (costspec.date and
                        position.cost.date != costspec.date):
                        continue
                    if (costspec.label and
                        position.cost.label != costspec.label):
                        continue
                    matches.append(position)

                # Check for ambiguous matches.
                if len(matches) == 0:
                    errors.append(
                        ReductionError(entry.meta,
                                       'No position matches "{}" against balance {}'.format(
                                           posting, balance),
                                       entry))
                    return [], errors  # This is irreconcilable, remove these postings.

                # TODO(blais): We'll have to change this, as we want to allow
                # positions crossing from negative to positive and vice-versa in
                # a simple application. See {d3cbd78f1029}.
                reduction_postings, matched_postings, ambi_errors = (
                    booking_method.handle_ambiguous_matches(entry, posting, matches,
                                                            method))
                if ambi_errors:
                    errors.extend(ambi_errors)
                    return [], errors

                # Add the reductions to the resulting list of booked postings.
                booked_postings.extend(reduction_postings)

                # Update the local balance in order to avoid matching against
                # the same postings twice when processing multiple postings in
                # the same transaction. Note that we only do this for postings
                # held at cost because the other postings may need interpolation
                # in order to be resolved properly.
                for posting in reduction_postings:
                    balance.add_position(posting)
            else:
                # This posting is an augmentation.
                #
                # Note that we do not convert the CostSpec instances to Cost
                # instances, because we want to let the subsequent interpolation
                # process able to interpolate either the cost per-unit or the
                # total cost, separately.

                # Put in the date of the parent Transaction if there is no
                # explicit date specified on the spec.
                if costspec.date is None:
                    dated_costspec = costspec._replace(date=entry.date)
                    posting = posting._replace(cost=dated_costspec)

                # FIXME: Insert unique ids for trade tracking; right now this
                # creates ambiguous matches errors (and it shouldn't).
                # # Insert a unique label if there isn't one.
                # if posting.cost is not None and posting.cost.label is None:
                #     posting = posting._replace(
                #         cost=posting.cost._replace(label=unique_label()))

                booked_postings.append(posting)

    return booked_postings, errors

beancount.parser.booking_full.categorize_by_currency(entry, balances)

按其声明的货币对记账项进行分组。

这用于为下一阶段(插值和记账)准备记账项:随后将分别对每个货币组执行插值和记账。在本流程开始时,我们应已拥有明确的货币分组,且不存在关于其需与何种货币平衡的任何歧义。

以下是其工作原理。

  • 首先,我们应用约束条件:如果同时存在成本和价格,则成本货币和价格货币必须一致。这在一定程度上缩小了可能性范围。

  • 如果货币已明确指定,则将该记账项放入对应货币的分组中。

  • 如果未明确指定,则我们还有几种方法可用于消除歧义:

  • 我们查看其余的记账项……如果它们全部属于同一种货币,则该记账项也必须属于该货币。

  • 如果无法通过上述方式确定,则检查该记账项账户的库存内容。如果所有库存内容均为同一种货币,则使用该货币。

参数:
  • postings – 待分类的不完整记账项列表。

  • balances – 交易应用前各货币对应的库存内容字典。

返回:
  • 由 (货币字符串, 元组列表) 项组成的列表,描述每个记账项及其插值货币,以及货币插值过程中生成的错误列表。条目的原始记账项保持不变。值列表中的每个元组包含 – index:原始条目中记账项的索引。units_currency:单位的插值货币。cost_currency:成本的插值货币。price_currency:价格的插值货币。

源代码位于 beancount/parser/booking_full.py
def categorize_by_currency(entry, balances):
    """Group the postings by the currency they declare.

    This is used to prepare the postings for the next stages: Interpolation and
    booking will then be carried out separately on each currency group. At the
    outset of this routine, we should have distinct groups of currencies without
    any ambiguities regarding which currency they need to balance against.

    Here's how this works.

    - First we apply the constraint that cost-currency and price-currency must
      match, if there is both a cost and a price. This reduces the space of
      possibilities somewhat.

    - If the currency is explicitly specified, we put the posting in that
      currency's bucket.

    - If not, we have a few methods left to disambiguate the currency:

      1. We look at the remaining postings... if they are all of a single
         currency, the posting must be in that currency too.

      2. If we cannot do that, we inspect the contents of the inventory of the
         account for the posting. If all the contents are of a single currency,
         we use that one.

    Args:
      postings: A list of incomplete postings to categorize.
      balances: A dict of currency to inventory contents before the transaction is
        applied.
    Returns:
      A list of (currency string, list of tuples) items describing each postings
      and its interpolated currencies, and a list of generated errors for
      currency interpolation. The entry's original postings are left unmodified.
      Each tuple in the value-list contains:
        index: The posting index in the original entry.
        units_currency: The interpolated currency for units.
        cost_currency: The interpolated currency for cost.
        price_currency: The interpolated currency for price.
    """
    errors = []

    groups = collections.defaultdict(list)
    sortdict = {}
    auto_postings = []
    unknown = []
    for index, posting in enumerate(entry.postings):
        units = posting.units
        cost = posting.cost
        price = posting.price

        # Extract and override the currencies locally.
        units_currency = (units.currency
                          if units is not MISSING and units is not None
                          else None)
        cost_currency = (cost.currency
                         if cost is not MISSING and cost is not None
                         else None)
        price_currency = (price.currency
                          if price is not MISSING and price is not None
                          else None)

        # First we apply the constraint that cost-currency and price-currency
        # must match, if there is both a cost and a price. This reduces the
        # space of possibilities somewhat.
        if cost_currency is MISSING and isinstance(price_currency, str):
            cost_currency = price_currency
        if price_currency is MISSING and isinstance(cost_currency, str):
            price_currency = cost_currency

        refer = Refer(index, units_currency, cost_currency, price_currency)

        if units is MISSING and price_currency is None:
            # Bucket auto-postings separately from unknown.
            auto_postings.append(refer)
        else:
            # Bucket with what we know so far.
            currency = get_bucket_currency(refer)
            if currency is not None:
                sortdict.setdefault(currency, index)
                groups[currency].append(refer)
            else:
                # If we need to infer the currency, store in unknown.
                unknown.append(refer)

    # We look at the remaining postings... if they are all of a single currency,
    # the posting must be in that currency too.
    if unknown and len(unknown) == 1 and len(groups) == 1:
        (index, units_currency, cost_currency, price_currency) = unknown.pop()

        other_currency = next(iter(groups.keys()))
        if price_currency is None and cost_currency is None:
            # Infer to the units currency.
            units_currency = other_currency
        else:
            # Infer to the cost and price currencies.
            if price_currency is MISSING:
                price_currency = other_currency
            if cost_currency is MISSING:
                cost_currency = other_currency

        refer = Refer(index, units_currency, cost_currency, price_currency)
        currency = get_bucket_currency(refer)
        assert currency is not None
        sortdict.setdefault(currency, index)
        groups[currency].append(refer)

    # Finally, try to resolve all the unknown legs using the inventory contents
    # of each account.
    for refer in unknown:
        (index, units_currency, cost_currency, price_currency) = refer
        posting = entry.postings[index]
        balance = balances.get(posting.account, None)
        if balance is None:
            balance = inventory.Inventory()

        if units_currency is MISSING:
            balance_currencies = balance.currencies()
            if len(balance_currencies) == 1:
                units_currency = balance_currencies.pop()

        if cost_currency is MISSING or price_currency is MISSING:
            balance_cost_currencies = balance.cost_currencies()
            if len(balance_cost_currencies) == 1:
                balance_cost_currency = balance_cost_currencies.pop()
                if price_currency is MISSING:
                    price_currency = balance_cost_currency
                if cost_currency is MISSING:
                    cost_currency = balance_cost_currency

        refer = Refer(index, units_currency, cost_currency, price_currency)
        currency = get_bucket_currency(refer)
        if currency is not None:
            sortdict.setdefault(currency, index)
            groups[currency].append(refer)
        else:
            errors.append(
                CategorizationError(posting.meta,
                                    "Failed to categorize posting {}".format(index + 1),
                                    entry))

    # Fill in missing units currencies if some remain as missing. This may occur
    # if we used the cost or price to bucket the currency but the units currency
    # was missing.
    for currency, refers in groups.items():
        for rindex, refer in enumerate(refers):
            if refer.units_currency is MISSING:
                posting = entry.postings[refer.index]
                balance = balances.get(posting.account, None)
                if balance is None:
                    continue
                balance_currencies = balance.currencies()
                if len(balance_currencies) == 1:
                    refers[rindex] = refer._replace(units_currency=balance_currencies.pop())

    # Deal with auto-postings.
    if len(auto_postings) > 1:
        refer = auto_postings[-1]
        posting = entry.postings[refer.index]
        errors.append(
            CategorizationError(posting.meta,
                                "You may not have more than one auto-posting per currency",
                                entry))
        auto_postings = auto_postings[0:1]
    for refer in auto_postings:
        for currency, glist in groups.items():
            sortdict.setdefault(currency, refer.index)
            glist.append(Refer(refer.index, currency, None, None))

    # Issue error for all currencies which we could not resolve.
    for currency, refers in groups.items():
        for refer in refers:
            posting = entry.postings[refer.index]
            for currency, name in [(refer.units_currency, 'units'),
                                   (refer.cost_currency, 'cost'),
                                   (refer.price_currency, 'price')]:
                if currency is MISSING:
                    errors.append(CategorizationError(
                        posting.meta,
                        "Could not resolve {} currency".format(name),
                        entry))

    sorted_groups = sorted(groups.items(), key=lambda item: sortdict[item[0]])
    return sorted_groups, errors

beancount.parser.booking_full.compute_cost_number(costspec, units)

给定一个 CostSpec,若可能,则返回其成本数值。

参数:
  • costspec – 已解析的 CostSpec 实例。

  • units – 表示持仓单位的 Amount 实例。

返回:
  • 若无法计算成本,则返回 None;否则返回一个 Decimal 实例,表示单位成本。

源代码位于 beancount/parser/booking_full.py
def compute_cost_number(costspec, units):
    """Given a CostSpec, return the cost number, if possible to compute.

    Args:
      costspec: A parsed instance of CostSpec.
      units: An instance of Amount for the units of the position.
    Returns:
      If it is not possible to calculate the cost, return None.
      Otherwise, returns a Decimal instance, the per-unit cost.
    """
    number_per = costspec.number_per
    number_total = costspec.number_total
    if MISSING in (number_per, number_total):
        return None
    if number_total is not None:
        # Compute the per-unit cost if there is some total cost
        # component involved.
        cost_total = number_total
        units_number = abs(units.number)
        if number_per is not None:
            cost_total += number_per * units_number
        unit_cost = cost_total / units_number
    elif number_per is None:
        return None
    else:
        unit_cost = number_per
    return unit_cost

beancount.parser.booking_full.convert_costspec_to_cost(posting)

若记账项中存在 CostSpec,则将其转换为 Cost。

若记账项无成本,则直接返回其本身。

参数:
  • posting – Posting 实例。

返回:
  • 可能已替换 'cost' 属性的 Posting 实例。

源代码位于 beancount/parser/booking_full.py
def convert_costspec_to_cost(posting):
    """Convert an instance of CostSpec to Cost, if present on the posting.

    If the posting has no cost, it itself is just returned.

    Args:
      posting: An instance of Posting.
    Returns:
      An instance of Posting with a possibly replaced 'cost' attribute.
    """
    cost = posting.cost
    if isinstance(cost, position.CostSpec):
        if cost is not None:
            number_per = cost.number_per
            number_total = cost.number_total
            if number_total is not None:
                # Compute the per-unit cost if there is some total cost
                # component involved.
                units_number = abs(posting.units.number)
                cost_total = number_total
                if number_per is not MISSING:
                    cost_total += number_per * units_number
                unit_cost = cost_total / units_number
            else:
                unit_cost = number_per
            new_cost = Cost(unit_cost, cost.currency, cost.date, cost.label)
            posting = posting._replace(units=posting.units, cost=new_cost)
    return posting

beancount.parser.booking_full.get_bucket_currency(refer)

给定记账项的货币引用,返回其所属分组货币。

参数:
  • refer – Refer 实例。

返回:
  • 货币字符串。

源代码位于 beancount/parser/booking_full.py
def get_bucket_currency(refer):
    """Given currency references for a posting, return the bucket currency.

    Args:
      refer: An instance of Refer.
    Returns:
      A currency string.
    """
    currency = None
    if isinstance(refer.cost_currency, str):
        currency = refer.cost_currency
    elif isinstance(refer.price_currency, str):
        currency = refer.price_currency
    elif (refer.cost_currency is None and
          refer.price_currency is None and
          isinstance(refer.units_currency, str)):
        currency = refer.units_currency
    return currency

beancount.parser.booking_full.has_self_reduction(postings, methods)

若记账项可能在成本层面相互抵减,则返回 True。

参数:
  • postings – 包含未插值 CostSpec 成本的记账项列表。

  • methods – 账户名称到其对应记账方法的映射。

返回:
  • 布尔值,若存在自抵减可能性则为 True。

源代码位于 beancount/parser/booking_full.py
def has_self_reduction(postings, methods):
    """Return true if the postings potentially reduce each other at cost.

    Args:
      postings: A list of postings with uninterpolated CostSpec cost instances.
      methods: A mapping of account name to their corresponding booking
        method.
    Returns:
      A boolean, true if there's a potential for self-reduction.
    """
    # A mapping of (currency, cost-currency) and sign.
    cost_changes = {}
    for posting in postings:
        cost = posting.cost
        if cost is None:
            continue
        if methods[posting.account] is Booking.NONE:
            continue
        key = (posting.account, posting.units.currency)
        sign = 1 if posting.units.number > ZERO else -1
        if cost_changes.setdefault(key, sign) != sign:
            return True
    return False

beancount.parser.booking_full.interpolate_group(postings, balances, currency, tolerances)

插值补全一组记账项中缺失的数值。

参数:
  • postings – 一组 Posting 实例的列表。

  • balances – 一个字典,键为账户名,值为其前置库存。

  • currency – 本组的权重货币,用于报告错误。

  • tolerances – 一个字典,键为货币,值为容差值。

返回:
  • 记账项元组 – 一组新的 Posting 实例。errors: 插值过程中生成的错误列表。interpolated: 布尔值,若必须进行插值则为 True。

    若发生错误,此函数将返回原始的记账项列表,该列表仍不完整。若返回了错误,您应跳过整个交易,或根本不包含其中的记账项。(另一种行为是仅返回有效的记账项列表,但这可能导致交易不平衡。我们选择当前方式是有意为之。)

源代码位于 beancount/parser/booking_full.py
def interpolate_group(postings, balances, currency, tolerances):
    """Interpolate missing numbers in the set of postings.

    Args:
      postings: A list of Posting instances.
      balances: A dict of account to its ante-inventory.
      currency: The weight currency of this group, used for reporting errors.
      tolerances: A dict of currency to tolerance values.
    Returns:
      A tuple of
        postings: A list of new posting instances.
        errors: A list of errors generated during interpolation.
        interpolated: A boolean, true if we did have to interpolate.

      In the case of an error, this returns the original list of postings, which
      is still incomplete. If an error is returned, you should probably skip the
      transaction altogether, or just not include the postings in it. (An
      alternative behaviour would be to return only the list of valid postings,
      but that would likely result in an unbalanced transaction. We do it this
      way by choice.)
    """
    errors = []

    # Figure out which type of amount is missing, by creating a list of
    # incomplete postings and which type of units is missing.
    incomplete = []
    for index, posting in enumerate(postings):
        units = posting.units
        cost = posting.cost
        price = posting.price

        # Identify incomplete parts of the Posting components.
        if units.number is MISSING:
            incomplete.append((MissingType.UNITS, index))

        if isinstance(cost, CostSpec):
            if cost and cost.number_per is MISSING:
                incomplete.append((MissingType.COST_PER, index))
            if cost and cost.number_total is MISSING:
                incomplete.append((MissingType.COST_TOTAL, index))
        else:
            # Check that a resolved instance of Cost never needs interpolation.
            #
            # Note that in theory we could support the interpolation of regular
            # per-unit costs in these if we wanted to; but because they're all
            # reducing postings that have been booked earlier, those never need
            # to be interpolated.
            if cost is not None:
                assert isinstance(cost.number, Decimal), (
                    "Internal error: cost has no number: {}; on postings: {}".format(
                        cost, postings))

        if price and price.number is MISSING:
            incomplete.append((MissingType.PRICE, index))

    # The replacement posting for the incomplete posting of this group.
    new_posting = None

    if len(incomplete) == 0:
        # If there are no missing numbers, just convert the CostSpec to Cost and
        # return that.
        out_postings = [convert_costspec_to_cost(posting)
                        for posting in postings]

    elif len(incomplete) > 1:
        # If there is more than a single value to be interpolated, generate an
        # error and return no postings.
        _, posting_index = incomplete[0]
        errors.append(InterpolationError(
            postings[posting_index].meta,
            "Too many missing numbers for currency group '{}'".format(currency),
            None))
        out_postings = []

    else:
        # If there is a single missing number, calculate it and fill it in here.
        missing, index = incomplete[0]
        incomplete_posting = postings[index]

        # Convert augmenting postings' costs from CostSpec to corresponding Cost
        # instances, except for the incomplete posting.
        new_postings = [(posting
                         if posting is incomplete_posting
                         else convert_costspec_to_cost(posting))
                        for posting in postings]

        # Compute the balance of the other postings.
        residual = interpolate.compute_residual(posting
                                                for posting in new_postings
                                                if posting is not incomplete_posting)
        assert len(residual) < 2, "Internal error in grouping postings by currencies."
        if not residual.is_empty():
            respos = next(iter(residual))
            assert respos.cost is None, (
                "Internal error; cost appears in weight calculation.")
            assert respos.units.currency == currency, (
                "Internal error; residual different than currency group.")
            weight = -respos.units.number
            weight_currency = respos.units.currency
        else:
            weight = ZERO
            weight_currency = currency

        if missing == MissingType.UNITS:
            units = incomplete_posting.units
            cost = incomplete_posting.cost
            if cost:
                # Handle the special case where we only have total cost.
                if cost.number_per == ZERO:
                    errors.append(InterpolationError(
                        incomplete_posting.meta,
                        "Cannot infer per-unit cost only from total", None))
                    return postings, errors, True

                assert cost.currency == weight_currency, (
                    "Internal error; residual currency different than missing currency.")
                cost_total = cost.number_total or ZERO
                units_number = (weight - cost_total) / cost.number_per

            elif incomplete_posting.price:
                assert incomplete_posting.price.currency == weight_currency, (
                    "Internal error; residual currency different than missing currency.")
                units_number = weight / incomplete_posting.price.number

            else:
                assert units.currency == weight_currency, (
                    "Internal error; residual currency different than missing currency.")
                units_number = weight

            # Quantize the interpolated units if necessary.
            units_number = interpolate.quantize_with_tolerance(tolerances,
                                                               units.currency,
                                                               units_number)

            if weight != ZERO:
                new_pos = Position(Amount(units_number, units.currency), cost)
                new_posting = incomplete_posting._replace(units=new_pos.units,
                                                          cost=new_pos.cost)
            else:
                new_posting = None

        elif missing == MissingType.COST_PER:
            units = incomplete_posting.units
            cost = incomplete_posting.cost
            assert cost.currency == weight_currency, (
                "Internal error; residual currency different than missing currency.")
            if units.number != ZERO:
                number_per = (weight - (cost.number_total or ZERO)) / units.number
                new_cost = cost._replace(number_per=number_per)
                new_pos = Position(units, new_cost)
                new_posting = incomplete_posting._replace(units=new_pos.units,
                                                          cost=new_pos.cost)
            else:
                new_posting = None

        elif missing == MissingType.COST_TOTAL:
            units = incomplete_posting.units
            cost = incomplete_posting.cost
            assert cost.currency == weight_currency, (
                "Internal error; residual currency different than missing currency.")
            number_total = (weight - cost.number_per * units.number)
            new_cost = cost._replace(number_total=number_total)
            new_pos = Position(units, new_cost)
            new_posting = incomplete_posting._replace(units=new_pos.units,
                                                      cost=new_pos.cost)

        elif missing == MissingType.PRICE:
            units = incomplete_posting.units
            cost = incomplete_posting.cost
            if cost is not None:
                errors.append(InterpolationError(
                    incomplete_posting.meta,
                    "Cannot infer price for postings with units held at cost", None))
                return postings, errors, True
            else:
                price = incomplete_posting.price
                assert price.currency == weight_currency, (
                    "Internal error; residual currency different than missing currency.")
                new_price_number = abs(weight / units.number)
                new_posting = incomplete_posting._replace(price=Amount(new_price_number,
                                                                       price.currency))

        else:
            assert False, "Internal error; Invalid missing type."

        # Replace the number in the posting.
        if new_posting is not None:
            # Set meta-data on the new posting to indicate it was interpolated.
            if new_posting.meta is None:
                new_posting = new_posting._replace(meta={})
            new_posting.meta[interpolate.AUTOMATIC_META] = True

            # Convert augmenting posting costs from CostSpec to a corresponding
            # Cost instance.
            new_postings[index] = convert_costspec_to_cost(new_posting)
        else:
            del new_postings[index]
        out_postings = new_postings

    assert all(not isinstance(posting.cost, CostSpec)
               for posting in out_postings)

    # Check that units are non-zero and that no cost remains negative; issue an
    # error if this is the case.
    for posting in out_postings:
        if posting.cost is None:
            continue
        # If there is a cost, we don't allow either a cost value of zero,
        # nor a zero number of units. Note that we allow a price of zero as
        # the only special case allowed (for conversion entries), but never
        # for costs.
        if posting.units.number == ZERO:
            errors.append(InterpolationError(
                posting.meta,
                'Amount is zero: "{}"'.format(posting.units), None))
        if posting.cost.number is not None and posting.cost.number < ZERO:
            errors.append(InterpolationError(
                posting.meta,
                'Cost is negative: "{}"'.format(posting.cost), None))

    return out_postings, errors, (new_posting is not None)

beancount.parser.booking_full.replace_currencies(postings, refer_groups)

替换条目中记账项的已解析货币。

此函数本质上将 categorize_by_currency() 的结果应用于生成所有货币均已解析的新记账项。

参数:
  • postings – 需要替换的 Posting 实例列表。

  • refer_groups – 由 categorize_by_currency() 返回的 (货币, 记账项引用列表) 项的列表。

返回:
  • 一组新的 (货币, 记账项列表) 项,其中记账项的货币已被其插值后的货币值替换。

源代码位于 beancount/parser/booking_full.py
def replace_currencies(postings, refer_groups):
    """Replace resolved currencies in the entry's Postings.

    This essentially applies the findings of categorize_by_currency() to produce
    new postings with all currencies resolved.

    Args:
      postings: A list of Posting instances to replace.
      refer_groups: A list of (currency, list of posting references) items as
        returned by categorize_by_currency().
    Returns:
      A new list of items of (currency, list of Postings), postings for which the
      currencies have been replaced by their interpolated currency values.
    """
    new_groups = []
    for currency, refers in refer_groups:
        new_postings = []
        for refer in sorted(refers, key=lambda r: r.index):
            posting = postings[refer.index]
            units = posting.units
            if units is MISSING or units is None:
                posting = posting._replace(units=Amount(MISSING, refer.units_currency))
            else:
                replace = False
                cost = posting.cost
                price = posting.price
                if units.currency is MISSING:
                    units = Amount(units.number, refer.units_currency)
                    replace = True
                if cost and cost.currency is MISSING:
                    cost = cost._replace(currency=refer.cost_currency)
                    replace = True
                if price and price.currency is MISSING:
                    price = Amount(price.number, refer.price_currency)
                    replace = True
                if replace:
                    posting = posting._replace(units=units, cost=cost, price=price)
            new_postings.append(posting)
        new_groups.append((currency, new_postings))
    return new_groups

beancount.parser.booking_full.unique_label()

为成本条目返回一个全局唯一的标签。

源代码位于 beancount/parser/booking_full.py
def unique_label() -> str:
    "Return a globally unique label for cost entries."
    return str(uuid.uuid4())

beancount.parser.booking_method

所有特定记账方法的实现。此代码被完整记账算法使用。

beancount.parser.booking_method.AmbiguousMatchError (元组)

AmbiguousMatchError(source, message, entry)

beancount.parser.booking_method.AmbiguousMatchError.__getnewargs__(self) 特殊

将自身返回为一个普通元组。供 copy 和 pickle 使用。

源代码位于 beancount/parser/booking_method.py
def __getnewargs__(self):
    'Return self as a plain tuple.  Used by copy and pickle.'
    return _tuple(self)

beancount.parser.booking_method.AmbiguousMatchError.__new__(_cls, source, message, entry) 特殊 静态方法

创建 AmbiguousMatchError(source, message, entry) 的新实例

beancount.parser.booking_method.AmbiguousMatchError.__replace__(/, self, **kwds) 特殊

返回一个新 AmbiguousMatchError 对象,用指定的新值替换字段

源代码位于 beancount/parser/booking_method.py
def _replace(self, /, **kwds):
    result = self._make(_map(kwds.pop, field_names, self))
    if kwds:
        raise TypeError(f'Got unexpected field names: {list(kwds)!r}')
    return result

beancount.parser.booking_method.AmbiguousMatchError.__repr__(self) 特殊

返回一个格式良好的表示字符串

源代码位于 beancount/parser/booking_method.py
def __repr__(self):
    'Return a nicely formatted representation string'
    return self.__class__.__name__ + repr_fmt % self

beancount.parser.booking_method.booking_method_AVERAGE(entry, posting, matches)

AVERAGE 记账方法的实现。

源代码位于 beancount/parser/booking_method.py
def booking_method_AVERAGE(entry, posting, matches):
    """AVERAGE booking method implementation."""
    booked_reductions = []
    booked_matches = []
    errors = [AmbiguousMatchError(entry.meta, "AVERAGE method is not supported", entry)]
    return booked_reductions, booked_matches, errors, False

    # FIXME: Future implementation here.
    # pylint: disable=unreachable
    if False: # pylint: disable=using-constant-test
        # DISABLED - This is the code for AVERAGE, which is currently disabled.

        # If there is more than a single match we need to ultimately merge the
        # postings. Also, if the reducing posting provides a specific cost, we
        # need to update the cost basis as well. Both of these cases are carried
        # out by removing all the matches and readding them later on.
        if len(matches) == 1 and (
                not isinstance(posting.cost.number_per, Decimal) and
                not isinstance(posting.cost.number_total, Decimal)):
            # There is no cost. Just reduce the one leg. This should be the
            # normal case if we always merge augmentations and the user lets
            # Beancount deal with the cost.
            match = matches[0]
            sign = -1 if posting.units.number < ZERO else 1
            number = min(abs(match.units.number), abs(posting.units.number))
            match_units = Amount(number * sign, match.units.currency)
            booked_reductions.append(posting._replace(units=match_units, cost=match.cost))
            insufficient = (match_units.number != posting.units.number)
        else:
            # Merge the matching postings to a single one.
            merged_units = inventory.Inventory()
            merged_cost = inventory.Inventory()
            for match in matches:
                merged_units.add_amount(match.units)
                merged_cost.add_amount(convert.get_weight(match))
            if len(merged_units) != 1 or len(merged_cost) != 1:
                errors.append(
                    AmbiguousMatchError(
                        entry.meta,
                        'Cannot merge positions in multiple currencies: {}'.format(
                            ', '.join(position.to_string(match_posting)
                                      for match_posting in matches)), entry))
            else:
                if (isinstance(posting.cost.number_per, Decimal) or
                    isinstance(posting.cost.number_total, Decimal)):
                    errors.append(
                        AmbiguousMatchError(
                            entry.meta,
                            "Explicit cost reductions aren't supported yet: {}".format(
                                position.to_string(posting)), entry))
                else:
                    # Insert postings to remove all the matches.
                    booked_reductions.extend(
                        posting._replace(units=-match.units, cost=match.cost,
                                         flag=flags.FLAG_MERGING)
                        for match in matches)
                    units = merged_units[0].units
                    date = matches[0].cost.date  ## FIXME: Select which one,
                                                 ## oldest or latest.
                    cost_units = merged_cost[0].units
                    cost = Cost(cost_units.number/units.number, cost_units.currency,
                                date, None)

                    # Insert a posting to refill those with a replacement match.
                    booked_reductions.append(
                        posting._replace(units=units, cost=cost, flag=flags.FLAG_MERGING))

                    # Now, match the reducing request against this lot.
                    booked_reductions.append(
                        posting._replace(units=posting.units, cost=cost))
                    insufficient = abs(posting.units.number) > abs(units.number)

beancount.parser.booking_method.booking_method_FIFO(entry, posting, matches)

FIFO 记账方法实现。

源代码位于 beancount/parser/booking_method.py
def booking_method_FIFO(entry, posting, matches):
    """FIFO booking method implementation."""
    return _booking_method_xifo(entry, posting, matches, "date", False)

beancount.parser.booking_method.booking_method_HIFO(entry, posting, matches)

HIFO 记账方法实现。

源代码位于 beancount/parser/booking_method.py
def booking_method_HIFO(entry, posting, matches):
    """HIFO booking method implementation."""
    return _booking_method_xifo(entry, posting, matches, "number", True)

beancount.parser.booking_method.booking_method_LIFO(entry, posting, matches)

LIFO 记账方法实现。

源代码位于 beancount/parser/booking_method.py
def booking_method_LIFO(entry, posting, matches):
    """LIFO booking method implementation."""
    return _booking_method_xifo(entry, posting, matches, "date", True)

beancount.parser.booking_method.booking_method_NONE(entry, posting, matches)

NONE 记账方法实现。

源代码位于 beancount/parser/booking_method.py
def booking_method_NONE(entry, posting, matches):
    """NONE booking method implementation."""

    # This never needs to match against any existing positions... we
    # disregard the matches, there's never any error. Note that this never
    # gets called in practice, we want to treat NONE postings as
    # augmentations. Default behaviour is to return them with their original
    # CostSpec, and the augmentation code will handle signaling an error if
    # there is insufficient detail to carry out the conversion to an
    # instance of Cost.

    # Note that it's an interesting question whether a reduction on an
    # account with NONE method which happens to match a single position
    # ought to be matched against it. We don't allow it for now.

    return [posting], [], False

beancount.parser.booking_method.booking_method_STRICT(entry, posting, matches)

严格记账方法。如果存在模糊匹配,此方法将失败。

源代码位于 beancount/parser/booking_method.py
def booking_method_STRICT(entry, posting, matches):
    """Strict booking method. This method fails if there are ambiguous matches.
    """
    booked_reductions = []
    booked_matches = []
    errors = []
    insufficient = False

    # In strict mode, we require at most a single matching posting.
    if len(matches) > 1:
        # If the total requested to reduce matches the sum of all the
        # ambiguous postings, match against all of them.
        sum_matches = sum(p.units.number for p in matches)
        if sum_matches == -posting.units.number:
            booked_reductions.extend(
                posting._replace(units=-match.units, cost=match.cost)
                for match in matches)
        else:
            errors.append(
                AmbiguousMatchError(entry.meta,
                                    'Ambiguous matches for "{}": {}'.format(
                                        position.to_string(posting),
                                        ', '.join(position.to_string(match_posting)
                                                  for match_posting in matches)),
                                    entry))
    else:
        # Replace the posting's units and cost values.
        match = matches[0]
        sign = -1 if posting.units.number < ZERO else 1
        number = min(abs(match.units.number), abs(posting.units.number))
        match_units = Amount(number * sign, match.units.currency)
        booked_reductions.append(posting._replace(units=match_units, cost=match.cost))
        booked_matches.append(match)
        insufficient = (match_units.number != posting.units.number)

    return booked_reductions, booked_matches, errors, insufficient

beancount.parser.booking_method.booking_method_STRICT_WITH_SIZE(entry, posting, matches)

严格记账方法,但进一步通过数量消除歧义。

此记账方法应用与 STRICT 方法相同的算法,但如果仅有一个模糊批次匹配所需数量,则自动选择该批次。

源代码位于 beancount/parser/booking_method.py
def booking_method_STRICT_WITH_SIZE(entry, posting, matches):
    """Strict booking method, but disambiguate further with sizes.

    This booking method applies the same algorithm as the STRICT method, but if
    only one of the ambiguous lots matches the desired size, select that one
    automatically.
    """
    (booked_reductions, booked_matches, errors,
     insufficient) = booking_method_STRICT(entry, posting, matches)

    # If we couldn't match strictly, attempt to find a match with the same
    # number of units. If there is one or more of these, accept the oldest lot.
    if errors and len(matches) > 1:
        number = -posting.units.number
        matching_units = [match
                          for match in matches
                          if number == match.units.number]
        if matching_units:
            matching_units.sort(key=lambda match: match.cost.date)

            # Replace the posting's units and cost values.
            match = matching_units[0]
            booked_reductions.append(posting._replace(units=-match.units, cost=match.cost))
            booked_matches.append(match)
            insufficient = False
            errors = []

    return booked_reductions, booked_matches, errors, insufficient

beancount.parser.booking_method.handle_ambiguous_matches(entry, posting, matches, method)

通过分派到特定方法来处理模糊匹配。

参数:
  • entry – 父级 Transaction 实例。

  • posting – Posting 实例,即我们试图匹配的减少项。

  • matches – 来自前置库存的匹配 Position 实例列表。这些位置已知与 'posting' 规范匹配。

  • methods – 账户名称到其对应记账方法的映射。

返回:
  • A triple of booked_reductions – 匹配的 Posting 实例列表,其 'cost' 属性确保为 Cost 类型。errors:要生成的错误列表。insufficient:布尔值,若无法找到足够匹配项以覆盖整个位置则为真。

源代码位于 beancount/parser/booking_method.py
def handle_ambiguous_matches(entry, posting, matches, method):
    """Handle ambiguous matches by dispatching to a particular method.

    Args:
      entry: The parent Transaction instance.
      posting: An instance of Posting, the reducing posting which we're
        attempting to match.
      matches: A list of matching Position instances from the ante-inventory.
        Those positions are known to already match the 'posting' spec.
      methods: A mapping of account name to their corresponding booking
        method.
    Returns:
      A triple of
        booked_reductions: A list of matched Posting instances, whose 'cost'
          attributes are ensured to be of type Cost.
        errors: A list of errors to be generated.
        insufficient: A boolean, true if we could not find enough matches to
        cover the entire position.
    """
    assert isinstance(method, Booking), (
        "Invalid type: {}".format(method))
    assert matches, "Internal error: Invalid call with no matches"

    #method = globals()['booking_method_{}'.format(method.name)]
    method = _BOOKING_METHODS[method]
    (booked_reductions,
     booked_matches, errors, insufficient) = method(entry, posting, matches)
    if insufficient:
        errors.append(
            AmbiguousMatchError(entry.meta,
                                'Not enough lots to reduce "{}": {}'.format(
                                    position.to_string(posting),
                                    ', '.join(position.to_string(match_posting)
                                              for match_posting in matches)),
                                entry))

    return booked_reductions, booked_matches, errors

beancount.parser.cmptest

用于测试脚本的辅助工具。

beancount.parser.cmptest.TestCase (TestCase)

beancount.parser.cmptest.TestCase.assertEqualEntries(self, expected_entries, actual_entries, allow_incomplete=False)

检查两个条目列表是否相等。

条目可以作为指令列表或字符串提供。在后一种情况下,字符串将使用 beancount.parser.parse_string() 解析,并使用生成的指令列表。如果 allow_incomplete 为 True,则在比较指令列表前执行轻量级记账,以允许比较具有不完整记账项的交易。

参数:
  • expected_entries – 期望的条目。

  • actual_entries – 实际的条目。

  • allow_incomplete – 在比较前执行记账。

异常:
  • AssertionError – 如果断言失败。

源代码位于 beancount/parser/cmptest.py
def assertEqualEntries(self, expected_entries, actual_entries, allow_incomplete=False):
    """Check that two lists of entries are equal.

    Entries can be provided either as a list of directives or as a
    string.  In the latter case, the string is parsed with
    beancount.parser.parse_string() and the resulting directives
    list is used. If allow_incomplete is True, light-weight
    booking is performed before comparing the directive lists,
    allowing to compare transactions with incomplete postings.

    Args:
      expected_entries: Expected entries.
      actual_entries: Actual entries.
      allow_incomplete: Perform booking before comparison.

    Raises:
      AssertionError: If the exception fails.

    """
    expected_entries = read_string_or_entries(expected_entries, allow_incomplete)
    actual_entries = read_string_or_entries(actual_entries, allow_incomplete)

    same, expected_missing, actual_missing = \
        compare.compare_entries(expected_entries, actual_entries)
    if not same:
        assert expected_missing or actual_missing, \
            "Missing is missing: {}, {}".format(expected_missing, actual_missing)
        oss = io.StringIO()
        if expected_missing:
            oss.write("Present in expected set and not in actual set:\n\n")
            for entry in expected_missing:
                oss.write(printer.format_entry(entry))
                oss.write('\n')
        if actual_missing:
            oss.write("Present in actual set and not in expected set:\n\n")
            for entry in actual_missing:
                oss.write(printer.format_entry(entry))
                oss.write('\n')
        self.fail(oss.getvalue())

beancount.parser.cmptest.TestCase.assertExcludesEntries(self, subset_entries, entries, allow_incomplete=False)

检查 subset_entries 是否未包含在 entries 中。

条目可以作为指令列表或字符串提供。在后一种情况下,字符串将使用 beancount.parser.parse_string() 解析,并使用生成的指令列表。如果 allow_incomplete 为 True,则在比较指令列表前执行轻量级记账,以允许比较具有不完整记账项的交易。

参数:
  • subset_entries – 子集条目。

  • entries – 条目。

  • allow_incomplete – 在比较前执行记账。

异常:
  • AssertionError – 如果断言失败。

源代码位于 beancount/parser/cmptest.py
def assertExcludesEntries(self, subset_entries, entries, allow_incomplete=False):
    """Check that subset_entries is not included in entries.

    Entries can be provided either as a list of directives or as a
    string. In the latter case, the string is parsed with
    beancount.parser.parse_string() and the resulting directives
    list is used. If allow_incomplete is True, light-weight
    booking is performed before comparing the directive lists,
    allowing to compare transactions with incomplete postings.

    Args:
      subset_entries: Subset entries.
      entries: Entries.
      allow_incomplete: Perform booking before comparison.

    Raises:
      AssertionError: If the exception fails.

    """
    subset_entries = read_string_or_entries(subset_entries, allow_incomplete)
    entries = read_string_or_entries(entries)

    excludes, extra = compare.excludes_entries(subset_entries, entries)
    if not excludes:
        assert extra, "Extra is empty: {}".format(extra)
        oss = io.StringIO()
        if extra:
            oss.write("Extra from from first/excluded set:\n\n")
            for entry in extra:
                oss.write(printer.format_entry(entry))
                oss.write('\n')
        self.fail(oss.getvalue())

beancount.parser.cmptest.TestCase.assertIncludesEntries(self, subset_entries, entries, allow_incomplete=False)

检查 subset_entries 是否包含在 entries 中。

条目可以作为指令列表或字符串提供。在后一种情况下,字符串将使用 beancount.parser.parse_string() 解析,并使用生成的指令列表。如果 allow_incomplete 为 True,则在比较指令列表前执行轻量级记账,以允许比较具有不完整记账项的交易。

参数:
  • subset_entries – 子集条目。

  • entries – 条目。

  • allow_incomplete – 在比较前执行记账。

异常:
  • AssertionError – 如果断言失败。

源代码位于 beancount/parser/cmptest.py
def assertIncludesEntries(self, subset_entries, entries, allow_incomplete=False):
    """Check that subset_entries is included in entries.

    Entries can be provided either as a list of directives or as a
    string.  In the latter case, the string is parsed with
    beancount.parser.parse_string() and the resulting directives
    list is used. If allow_incomplete is True, light-weight
    booking is performed before comparing the directive lists,
    allowing to compare transactions with incomplete postings.

    Args:
      subset_entries: Subset entries.
      entries: Entries.
      allow_incomplete: Perform booking before comparison.

    Raises:
      AssertionError: If the exception fails.

    """
    subset_entries = read_string_or_entries(subset_entries, allow_incomplete)
    entries = read_string_or_entries(entries)

    includes, missing = compare.includes_entries(subset_entries, entries)
    if not includes:
        assert missing, "Missing is empty: {}".format(missing)
        oss = io.StringIO()
        if missing:
            oss.write("Missing from from expected set:\n\n")
            for entry in missing:
                oss.write(printer.format_entry(entry))
                oss.write('\n')
        self.fail(oss.getvalue())

beancount.parser.cmptest.TestError (异常)

测试实现内部的错误。这些错误不应发生。

beancount.parser.cmptest.read_string_or_entries(entries_or_str, allow_incomplete=False)

读取条目字符串或直接读取条目。

参数:
  • entries_or_str – 要么是指令列表,要么是包含指令的字符串。

  • allow_incomplete – 布尔值,若为 True,则允许不完整的输入并执行轻量级记账。

返回:
  • 指令列表。

源代码位于 beancount/parser/cmptest.py
def read_string_or_entries(entries_or_str, allow_incomplete=False):
    """Read a string of entries or just entries.

    Args:
      entries_or_str: Either a list of directives, or a string containing directives.
      allow_incomplete: A boolean, true if we allow incomplete inputs and perform
        light-weight booking.
    Returns:
      A list of directives.
    """
    if isinstance(entries_or_str, str):
        entries, errors, options_map = parser.parse_string(
            textwrap.dedent(entries_or_str))

        if allow_incomplete:
            # Do a simplistic local conversion in order to call the comparison.
            entries = [_local_booking(entry) for entry in entries]
        else:
            # Don't accept incomplete entries either.
            if any(parser.is_entry_incomplete(entry) for entry in entries):
                raise TestError("Entries in assertions may not use interpolation.")

            entries, booking_errors = booking.book(entries, options_map)
            errors = errors + booking_errors

        # Don't tolerate errors.
        if errors:
            oss = io.StringIO()
            printer.print_errors(errors, file=oss)
            raise TestError("Unexpected errors in expected: {}".format(oss.getvalue()))

    else:
        assert isinstance(entries_or_str, list), "Expecting list: {}".format(entries_or_str)
        entries = entries_or_str

    return entries

beancount.parser.context

生成在应用特定条目前后账户余额的渲染结果。

beancount.parser.context.render_entry_context(entries, options_map, entry, parsed_entry=None)

渲染特定交易应用前后的上下文。

参数:
  • entries – 一组指令列表。

  • options_map – 由解析器生成的选项字典。

  • entry – 应被渲染的条目实例。(注意:此对象应位于 entries 集合中,而不仅仅是结构上相等。)

  • parsed_entry – 可选的不完整条目,已解析但未记账或插值。若提供此参数,则用于检查先前账户列表,并也会被渲染。

返回:
  • 一个多行文本字符串,包含交易应用前的上下文、交易本身以及应用后的上下文。可直接打印,其格式专为用户阅读设计。

beancount/parser/context.py 中的源代码
def render_entry_context(entries, options_map, entry, parsed_entry=None):
    """Render the context before and after a particular transaction is applied.

    Args:
      entries: A list of directives.
      options_map: A dict of options, as produced by the parser.
      entry: The entry instance which should be rendered. (Note that this object is
        expected to be in the set of entries, not just structurally equal.)
      parsed_entry: An optional incomplete, parsed but not booked nor interpolated
        entry. If this is provided, this is used for inspecting the list of prior
        accounts and it is also rendered.
    Returns:
      A multiline string of text, which consists of the context before the
      transaction is applied, the transaction itself, and the context after it
      is applied. You can just print that, it is in form that is intended to be
      consumed by the user.
    """
    oss = io.StringIO()
    pr = functools.partial(print, file=oss)
    header = "** {} --------------------------------"

    meta = entry.meta
    pr(header.format("Transaction Id"))
    pr()
    pr("Hash:{}".format(compare.hash_entry(entry)))
    pr("Location: {}:{}".format(meta["filename"], meta["lineno"]))
    pr()
    pr()

    # Get the list of accounts sorted by the order in which they appear in the
    # closest entry.
    order = {}
    if parsed_entry is None:
        parsed_entry = entry
    if isinstance(parsed_entry, data.Transaction):
        order = {posting.account: index
                 for index, posting in enumerate(parsed_entry.postings)}
    accounts = sorted(getters.get_entry_accounts(parsed_entry),
                      key=lambda account: order.get(account, 10000))

    # Accumulate the balances of these accounts up to the entry.
    balance_before, balance_after = interpolate.compute_entry_context(
        entries, entry, additional_accounts=accounts)

    # Create a format line for printing the contents of account balances.
    max_account_width = max(map(len, accounts)) if accounts else 1
    position_line = '{{:1}} {{:{width}}}  {{:>49}}'.format(width=max_account_width)

    # Print the context before.
    pr(header.format("Balances before transaction"))
    pr()
    before_hashes = set()
    average_costs = {}
    for account in accounts:
        balance = balance_before[account]

        pc_balances = balance.split()
        for currency, pc_balance in pc_balances.items():
            if len(pc_balance) > 1:
                average_costs[account] = pc_balance.average()

        positions = balance.get_positions()
        for position in positions:
            before_hashes.add((account, hash(position)))
            pr(position_line.format('', account, str(position)))
        if not positions:
            pr(position_line.format('', account, ''))
        pr()
    pr()

    # Print average cost per account, if relevant.
    if average_costs:
        pr(header.format("Average Costs"))
        pr()
        for account, average_cost in sorted(average_costs.items()):
            for position in average_cost:
                pr(position_line.format('', account, str(position)))
        pr()
        pr()

    # Print the entry itself.
    dcontext = options_map['dcontext']
    pr(header.format("Unbooked Transaction"))
    pr()
    if parsed_entry:
        printer.print_entry(parsed_entry, dcontext, render_weights=True, file=oss)
    pr()

    pr(header.format("Transaction"))
    pr()
    printer.print_entry(entry, dcontext, render_weights=True, file=oss)
    pr()

    if isinstance(entry, data.Transaction):
        pr(header.format("Residual and Tolerances"))
        pr()

        # Print residuals.
        residual = interpolate.compute_residual(entry.postings)
        if not residual.is_empty():
            # Note: We render the residual at maximum precision, for debugging.
            pr('Residual: {}'.format(residual))

        # Dump the tolerances used.
        tolerances = interpolate.infer_tolerances(entry.postings, options_map)
        if tolerances:
            pr('Tolerances: {}'.format(
                ', '.join('{}={}'.format(key, value)
                          for key, value in sorted(tolerances.items()))))

        # Compute the total cost basis.
        cost_basis = inventory.Inventory(
            pos for pos in entry.postings if pos.cost is not None
        ).reduce(convert.get_cost)
        if not cost_basis.is_empty():
            pr('Basis: {}'.format(cost_basis))
        pr()
        pr()

    # Print the context after.
    pr(header.format("Balances after transaction"))
    pr()
    for account in accounts:
        positions = balance_after[account].get_positions()
        for position in positions:
            changed = (account, hash(position)) not in before_hashes
            print(position_line.format('*' if changed else '', account, str(position)),
                  file=oss)
        if not positions:
            pr(position_line.format('', account, ''))
        pr()

    return oss.getvalue()

beancount.parser.context.render_file_context(entries, options_map, filename, lineno)

渲染特定交易应用前后的上下文。

参数:
  • entries – 一组指令列表。

  • options_map – 由解析器生成的选项字典。

  • filename – 字符串,表示交易被解析的文件名。

  • lineno – 整数,表示交易在文件中的行号。

返回:
  • 一个多行文本字符串,包含交易应用前的上下文、交易本身以及应用后的上下文。可直接打印,其格式专为用户阅读设计。

beancount/parser/context.py 中的源代码
def render_file_context(entries, options_map, filename, lineno):
    """Render the context before and after a particular transaction is applied.

    Args:
      entries: A list of directives.
      options_map: A dict of options, as produced by the parser.
      filename: A string, the name of the file from which the transaction was parsed.
      lineno: An integer, the line number in the file the transaction was parsed from.
    Returns:
      A multiline string of text, which consists of the context before the
      transaction is applied, the transaction itself, and the context after it
      is applied. You can just print that, it is in form that is intended to be
      consumed by the user.
    """
    # Find the closest entry.
    closest_entry = data.find_closest(entries, filename, lineno)
    if closest_entry is None:
        raise SystemExit("No entry could be found before {}:{}".format(filename, lineno))

    # Run just the parser stage (no booking nor interpolation, which would
    # remove the postings) on the input file to produced the corresponding
    # unbooked transaction, so that we can get the list of accounts.
    if path.exists(filename):
        parsed_entries, _, __= parser.parse_file(filename)

        # Note: We cannot bisect as we cannot rely on sorting behavior from the parser.
        lineno = closest_entry.meta['lineno']
        closest_parsed_entries = [parsed_entry
                                  for parsed_entry in parsed_entries
                                  if parsed_entry.meta['lineno'] == lineno]
        if len(closest_parsed_entries) != 1:
            # This is an internal error, this should never occur.
            raise RuntimeError(
                "Parsed entry corresponding to real entry not found in original filename.")
        closest_parsed_entry = next(iter(closest_parsed_entries))
    else:
        closest_parsed_entry = None

    return render_entry_context(entries, options_map, closest_entry, closest_parsed_entry)

beancount.parser.grammar

Beancount 语法构建器。

beancount.parser.grammar.Builder (LexBuilder)

一种由词法分析器和语法解析器使用的构建器,作为回调函数,用于根据输入文件中解析出的规则创建对应的数据对象。

beancount.parser.grammar.Builder.account(self, filename, lineno, account)

检查账户名称的有效性。

参数:
  • account – 一个字符串,表示账户名称。

返回:
  • 一个字符串,表示账户名称。

源代码位于 beancount/parser/grammar.py
def account(self, filename, lineno, account):
    """Check account name validity.

    Args:
      account: a str, the account name.
    Returns:
      A string, the account name.
    """
    if not self.account_regexp.match(account):
        meta = new_metadata(filename, lineno)
        self.errors.append(
            ParserError(meta, "Invalid account name: {}".format(account), None))
    # Intern account names. This should reduces memory usage a
    # fair bit because these strings are repeated liberally.
    return self.accounts.setdefault(account, account)

beancount.parser.grammar.Builder.amount(self, filename, lineno, number, currency)

处理金额语法规则。

参数:
  • number – 一个 Decimal 实例,表示金额的数量。

  • currency – 一个货币对象(实际上是一个字符串,请参阅上方的 CURRENCY)

返回:
  • 一个 Amount 实例。

源代码位于 beancount/parser/grammar.py
def amount(self, filename, lineno, number, currency):
    """Process an amount grammar rule.

    Args:
      number: a Decimal instance, the number of the amount.
      currency: a currency object (a str, really, see CURRENCY above)
    Returns:
      An instance of Amount.
    """
    # Update the mapping that stores the parsed precisions.
    # Note: This is relatively slow, adds about 70ms because of number.as_tuple().
    self._dcupdate(number, currency)
    return Amount(number, currency)

beancount.parser.grammar.Builder.balance(self, filename, lineno, date, account, amount, tolerance, kvlist)

处理平衡声明。

默认情况下,我们在此处不产生任何错误。我们将在后续的验证例程中替换那些失败的断言,以确认它们是否成功或失败。

参数:
  • filename – 当前文件名。

  • lineno – 当前行号。

  • date – 一个 datetime 对象。

  • account – 一个字符串,表示要平衡的账户。

  • amount – 预期的金额,用于校验。

  • tolerance – 容差数值。

  • kvlist – 一组 KeyValue 实例的列表。

返回:
  • 一个新的 Balance 对象。

源代码位于 beancount/parser/grammar.py
def balance(self, filename, lineno, date, account, amount, tolerance, kvlist):
    """Process an assertion directive.

    We produce no errors here by default. We replace the failing ones in the
    routine that does the verification later one, that these have succeeded
    or failed.

    Args:
      filename: The current filename.
      lineno: The current line number.
      date: A datetime object.
      account: A string, the account to balance.
      amount: The expected amount, to be checked.
      tolerance: The tolerance number.
      kvlist: a list of KeyValue instances.
    Returns:
      A new Balance object.
    """
    diff_amount = None
    meta = new_metadata(filename, lineno, kvlist)
    return Balance(meta, date, account, amount, tolerance, diff_amount)

beancount.parser.grammar.Builder.build_grammar_error(self, filename, lineno, exc_value, exc_type=None, exc_traceback=None)

构建一个语法错误,并将其添加到待处理错误列表中。

参数:
  • filename – 当前文件名

  • lineno – 当前行号

  • excvalue – 异常值,或一个字符串,表示错误信息。

  • exc_type – 异常类型,如果发生了异常。

  • exc_traceback – 一个回溯对象。

源代码位于 beancount/parser/grammar.py
def build_grammar_error(self, filename, lineno, exc_value,
                        exc_type=None, exc_traceback=None):
    """Build a grammar error and appends it to the list of pending errors.

    Args:
      filename: The current filename
      lineno: The current line number
      excvalue: The exception value, or a str, the message of the error.
      exc_type: An exception type, if an exception occurred.
      exc_traceback: A traceback object.
    """
    if exc_type is not None:
        assert not isinstance(exc_value, str)
        strings = traceback.format_exception_only(exc_type, exc_value)
        tblist = traceback.extract_tb(exc_traceback)
        filename, lineno, _, __ = tblist[0]
        message = '{} ({}:{})'.format(strings[0], filename, lineno)
    else:
        message = str(exc_value)
    meta = new_metadata(filename, lineno)
    self.errors.append(
        ParserSyntaxError(meta, message, None))

beancount.parser.grammar.Builder.close(self, filename, lineno, date, account, kvlist)

处理关闭声明。

参数:
  • filename – 当前文件名。

  • lineno – 当前行号。

  • date – 一个 datetime 对象。

  • account – 一个字符串,表示账户名称。

  • kvlist – 一组 KeyValue 实例的列表。

返回:
  • 一个新的 Close 对象。

源代码位于 beancount/parser/grammar.py
def close(self, filename, lineno, date, account, kvlist):
    """Process a close directive.

    Args:
      filename: The current filename.
      lineno: The current line number.
      date: A datetime object.
      account: A string, the name of the account.
      kvlist: a list of KeyValue instances.
    Returns:
      A new Close object.
    """
    meta = new_metadata(filename, lineno, kvlist)
    return Close(meta, date, account)

beancount.parser.grammar.Builder.commodity(self, filename, lineno, date, currency, kvlist)

处理关闭声明。

参数:
  • filename – 当前文件名。

  • lineno – 当前行号。

  • date – 一个 datetime 对象。

  • currency – 一个字符串,表示要声明的货币单位。

  • kvlist – 一组 KeyValue 实例的列表。

返回:
  • 一个新的 Close 对象。

源代码位于 beancount/parser/grammar.py
def commodity(self, filename, lineno, date, currency, kvlist):
    """Process a close directive.

    Args:
      filename: The current filename.
      lineno: The current line number.
      date: A datetime object.
      currency: A string, the commodity being declared.
      kvlist: a list of KeyValue instances.
    Returns:
      A new Close object.
    """
    meta = new_metadata(filename, lineno, kvlist)
    return Commodity(meta, date, currency)

beancount.parser.grammar.Builder.compound_amount(self, filename, lineno, number_per, number_total, currency)

处理金额语法规则。

参数:
  • number_per – 一个 Decimal 实例,表示每股的成本。

  • number_total – 一个 Decimal 实例,表示所有股份的总成本。

  • currency – 一个货币对象(实际上是一个字符串,请参阅上方的 CURRENCY)

返回:
  • 一个三元组 (Decimal, Decimal, 货币字符串),用于在计算最终单位成本时进一步处理。

源代码位于 beancount/parser/grammar.py
def compound_amount(self, filename, lineno, number_per, number_total, currency):
    """Process an amount grammar rule.

    Args:
      number_per: a Decimal instance, the number of the cost per share.
      number_total: a Decimal instance, the number of the cost over all shares.
      currency: a currency object (a str, really, see CURRENCY above)
    Returns:
      A triple of (Decimal, Decimal, currency string) to be processed further when
      creating the final per-unit cost number.
    """
    # Update the mapping that stores the parsed precisions.
    # Note: This is relatively slow, adds about 70ms because of number.as_tuple().
    self._dcupdate(number_per, currency)
    self._dcupdate(number_total, currency)

    # Note that we are not able to reduce the value to a number per-share
    # here because we only get the number of units in the full lot spec.
    return CompoundAmount(number_per, number_total, currency)

beancount.parser.grammar.Builder.cost_merge(self, filename, lineno, _)

创建一个“合并成本”标记。

源代码位于 beancount/parser/grammar.py
def cost_merge(self, filename, lineno, _):
    """Create a 'merge cost' token."""
    return MERGE_COST

beancount.parser.grammar.Builder.cost_spec(self, filename, lineno, cost_comp_list, is_total)

处理 cost_spec 语法规则。

参数:
  • cost_comp_list – 一个包含 CompoundAmount、datetime.date 或标签 ID 字符串的列表。

  • is_total – 假设仅指定了总成本;拒绝使用 <number> # <number> 语法,即不允许指定复合数量。此参数用于支持 {{...}} 语法。

返回:
  • 一个成本信息元组,包含 CompoundAmount、仓位日期和标签字符串。这些项中的任意一项均可设为表示“未设置”的哨兵值。

源代码位于 beancount/parser/grammar.py
def cost_spec(self, filename, lineno, cost_comp_list, is_total):
    """Process a cost_spec grammar rule.

    Args:
      cost_comp_list: A list of CompoundAmount, a datetime.date, or
        label ID strings.
      is_total: Assume only the total cost is specified; reject the <number> # <number>
          syntax, that is, no compound amounts may be specified. This is used to support
          the {{...}} syntax.
    Returns:
      A cost-info tuple of CompoundAmount, lot date and label string. Any of these
      may be set to a sentinel indicating "unset".
    """
    if not cost_comp_list:
        return CostSpec(MISSING, None, MISSING, None, None, False)
    assert isinstance(cost_comp_list, list), (
        "Internal error in parser: {}".format(cost_comp_list))

    compound_cost = None
    date_ = None
    label = None
    merge = None
    for comp in cost_comp_list:
        if isinstance(comp, CompoundAmount):
            if compound_cost is None:
                compound_cost = comp
            else:
                self.errors.append(
                    ParserError(new_metadata(filename, lineno),
                                "Duplicate cost: '{}'.".format(comp), None))

        elif isinstance(comp, date):
            if date_ is None:
                date_ = comp
            else:
                self.errors.append(
                    ParserError(new_metadata(filename, lineno),
                                "Duplicate date: '{}'.".format(comp), None))

        elif comp is MERGE_COST:
            if merge is None:
                merge = True
                self.errors.append(
                    ParserError(new_metadata(filename, lineno),
                                "Cost merging is not supported yet", None))
            else:
                self.errors.append(
                    ParserError(new_metadata(filename, lineno),
                                "Duplicate merge-cost spec", None))

        else:
            assert isinstance(comp, str), (
                "Currency component is not string: '{}'".format(comp))
            if label is None:
                label = comp
            else:
                self.errors.append(
                    ParserError(new_metadata(filename, lineno),
                                "Duplicate label: '{}'.".format(comp), None))

    # If there was a cost_comp_list, thus a "{...}" cost basis spec, you must
    # indicate that by creating a CompoundAmount(), always.

    if compound_cost is None:
        number_per, number_total, currency = MISSING, None, MISSING
    else:
        number_per, number_total, currency = compound_cost
        if is_total:
            if number_total is not None:
                self.errors.append(
                    ParserError(
                        new_metadata(filename, lineno),
                        ("Per-unit cost may not be specified using total cost "
                         "syntax: '{}'; ignoring per-unit cost").format(compound_cost),
                        None))
                # Ignore per-unit number.
                number_per = ZERO
            else:
                # There's a single number specified; interpret it as a total cost.
                number_total = number_per
                number_per = ZERO

    if merge is None:
        merge = False

    return CostSpec(number_per, number_total, currency, date_, label, merge)

beancount.parser.grammar.Builder.custom(self, filename, lineno, date, dir_type, custom_values, kvlist)

处理自定义指令。

参数:
  • filename – 当前文件名。

  • lineno – 当前行号。

  • date – 一个 datetime 对象。

  • dir_type – 一个字符串,表示正在解析的自定义指令类型。

  • custom_values – 同一行上看到的各种标记的列表。

  • kvlist – 一组 KeyValue 实例的列表。

返回:
  • 一个新的 Custom 对象。

源代码位于 beancount/parser/grammar.py
def custom(self, filename, lineno, date, dir_type, custom_values, kvlist):
    """Process a custom directive.

    Args:
      filename: the current filename.
      lineno: the current line number.
      date: a datetime object.
      dir_type: A string, a type for the custom directive being parsed.
      custom_values: A list of the various tokens seen on the same line.
      kvlist: a list of KeyValue instances.
    Returns:
      A new Custom object.
    """
    meta = new_metadata(filename, lineno, kvlist)
    return Custom(meta, date, dir_type, custom_values)

beancount.parser.grammar.Builder.custom_value(self, filename, lineno, value, dtype=None)

创建一个自定义值对象及其类型。

参数:
  • value – 一个被接受的自定义值。

返回:
  • 一个二元组 (value, dtype),其中 'dtype' 是该值的数据类型。

源代码位于 beancount/parser/grammar.py
def custom_value(self, filename, lineno, value, dtype=None):
    """Create a custom value object, along with its type.

    Args:
      value: One of the accepted custom values.
    Returns:
      A pair of (value, dtype) where 'dtype' is the datatype is that of the
      value.
    """
    if dtype is None:
        dtype = type(value)
    return ValueType(value, dtype)

beancount.parser.grammar.Builder.document(self, filename, lineno, date, account, document_filename, tags_links, kvlist)

处理文档指令。

参数:
  • filename – 当前文件名。

  • lineno – 当前行号。

  • date – 一个 datetime 对象。

  • account – 一个 Account 实例。

  • document_filename – 一个字符串,表示文档文件的名称。

  • tags_links – 当前的 TagsLinks 累积器。

  • kvlist – 一组 KeyValue 实例的列表。

返回:
  • 一个新的 Document 对象。

源代码位于 beancount/parser/grammar.py
def document(self, filename, lineno, date, account, document_filename, tags_links,
             kvlist):
    """Process a document directive.

    Args:
      filename: the current filename.
      lineno: the current line number.
      date: a datetime object.
      account: an Account instance.
      document_filename: a str, the name of the document file.
      tags_links: The current TagsLinks accumulator.
      kvlist: a list of KeyValue instances.
    Returns:
      A new Document object.
    """
    meta = new_metadata(filename, lineno, kvlist)
    if not path.isabs(document_filename):
        document_filename = path.abspath(path.join(path.dirname(filename),
                                                   document_filename))
    tags, links = self._finalize_tags_links(tags_links.tags, tags_links.links)
    return Document(meta, date, account, document_filename, tags, links)

beancount.parser.grammar.Builder.event(self, filename, lineno, date, event_type, description, kvlist)

处理一个事件指令。

参数:
  • filename – 当前文件名。

  • lineno – 当前行号。

  • date – 一个 datetime 对象。

  • event_type – 一个字符串,事件类型的名称。

  • description – 一个字符串,事件的值或内容。

  • kvlist – 一组 KeyValue 实例的列表。

返回:
  • 一个新的 Event 对象。

源代码位于 beancount/parser/grammar.py
def event(self, filename, lineno, date, event_type, description, kvlist):
    """Process an event directive.

    Args:
      filename: the current filename.
      lineno: the current line number.
      date: a datetime object.
      event_type: a str, the name of the event type.
      description: a str, the event value, the contents.
      kvlist: a list of KeyValue instances.
    Returns:
      A new Event object.
    """
    meta = new_metadata(filename, lineno, kvlist)
    return Event(meta, date, event_type, description)

beancount.parser.grammar.Builder.finalize(self)

完成解析器,检查最终错误并返回三元组。

返回:
  • 条目三元组 – 解析后的指令列表,可能需要补全;errors:错误列表,应为空;options_map:选项字典。

源代码位于 beancount/parser/grammar.py
def finalize(self):
    """Finalize the parser, check for final errors and return the triple.

    Returns:
      A triple of
        entries: A list of parsed directives, which may need completion.
        errors: A list of errors, hopefully empty.
        options_map: A dict of options.
    """
    # If the user left some tags unbalanced, issue an error.
    for tag in self.tags:
        meta = new_metadata(self.options['filename'], 0)
        self.errors.append(
            ParserError(meta, "Unbalanced pushed tag: '{}'".format(tag), None))

    # If the user left some metadata unpopped, issue an error.
    for key, value_list in self.meta.items():
        meta = new_metadata(self.options['filename'], 0)
        self.errors.append(
            ParserError(meta, (
                "Unbalanced metadata key '{}'; leftover metadata '{}'").format(
                    key, ', '.join(value_list)), None))

    # Weave the commas option in the DisplayContext itself, so it propagates
    # everywhere it is used automatically.
    self.dcontext.set_commas(self.options['render_commas'])

    return (self.get_entries(), self.errors, self.get_options())

beancount.parser.grammar.Builder.get_entries(self)

返回累积的条目。

返回:
  • 已排序的指令列表。

源代码位于 beancount/parser/grammar.py
def get_entries(self):
    """Return the accumulated entries.

    Returns:
      A list of sorted directives.
    """
    return sorted(self.entries, key=data.entry_sortkey)

beancount.parser.grammar.Builder.get_long_string_maxlines(self)

参见基类。

源代码位于 beancount/parser/grammar.py
def get_long_string_maxlines(self):
    """See base class."""
    return self.options['long_string_maxlines']

beancount.parser.grammar.Builder.get_options(self)

返回最终的选项映射。

返回:
  • 选项名称到选项的字典。

源代码位于 beancount/parser/grammar.py
def get_options(self):
    """Return the final options map.

    Returns:
      A dict of option names to options.
    """
    # Build and store the inferred DisplayContext instance.
    self.options['dcontext'] = self.dcontext

    return self.options

beancount.parser.grammar.Builder.handle_list(self, filename, lineno, object_list, new_object)

通用处理递归列表语法规则。

参数:
  • object_list – 当前的对象列表。

  • new_object – 要添加的新对象。

返回:
  • 更新后的新对象列表。

源代码位于 beancount/parser/grammar.py
def handle_list(self, filename, lineno, object_list, new_object):
    """Handle a recursive list grammar rule, generically.

    Args:
      object_list: the current list of objects.
      new_object: the new object to be added.
    Returns:
      The new, updated list of objects.
    """
    if object_list is None:
        object_list = []
    if new_object is not None:
        object_list.append(new_object)
    return object_list

beancount.parser.grammar.Builder.include(self, filename, lineno, include_filename)

处理 include 指令。

参数:
  • filename – 当前文件名。

  • lineno – 当前行号。

  • include_name – 一个字符串,要包含的文件名。

源代码位于 beancount/parser/grammar.py
def include(self, filename, lineno, include_filename):
    """Process an include directive.

    Args:
      filename: current filename.
      lineno: current line number.
      include_name: A string, the name of the file to include.
    """
    self.options['include'].append(include_filename)

beancount.parser.grammar.Builder.key_value(self, filename, lineno, key, value)

处理文档指令。

参数:
  • filename – 当前文件名。

  • lineno – 当前行号。

  • date – 一个 datetime 对象。

  • account – 一个字符串,该文档关联的账户。

  • document_filename – 一个字符串,文档文件的名称。

返回:
  • 一个新的 KeyValue 对象。

源代码位于 beancount/parser/grammar.py
def key_value(self, filename, lineno, key, value):
    """Process a document directive.

    Args:
      filename: The current filename.
      lineno: The current line number.
      date: A datetime object.
      account: A string, the account the document relates to.
      document_filename: A str, the name of the document file.
    Returns:
      A new KeyValue object.
    """
    return KeyValue(key, value)

beancount.parser.grammar.Builder.note(self, filename, lineno, date, account, comment, tags_links, kvlist)

处理注释指令。

参数:
  • filename – 当前文件名。

  • lineno – 当前行号。

  • date – 一个 datetime 对象。

  • account – 一个字符串,指定要附加注释的账户。

  • comment – 一个字符串,表示注释的内容。

  • kvlist – 一组 KeyValue 实例的列表。

返回:
  • 一个新的 Note 对象。

源代码位于 beancount/parser/grammar.py
def note(self, filename, lineno, date, account, comment, tags_links, kvlist):
    """Process a note directive.

    Args:
      filename: The current filename.
      lineno: The current line number.
      date: A datetime object.
      account: A string, the account to attach the note to.
      comment: A str, the note's comments contents.
      kvlist: a list of KeyValue instances.
    Returns:
      A new Note object.
    """
    meta = new_metadata(filename, lineno, kvlist)
    tags, links = self._finalize_tags_links(tags_links.tags, tags_links.links)
    return Note(meta, date, account, comment, tags, links)

beancount.parser.grammar.Builder.open(self, filename, lineno, date, account, currencies, booking_str, kvlist)

处理开户指令。

参数:
  • filename – 当前文件名。

  • lineno – 当前行号。

  • date – 一个 datetime 对象。

  • account – 一个字符串,表示账户名称。

  • currencies – 一个约束货币的列表。

  • booking_str – 一个字符串,表示记账方式;若未指定则为 None。

  • kvlist – 一组 KeyValue 实例的列表。

返回:
  • 一个新的 Open 对象。

源代码位于 beancount/parser/grammar.py
def open(self, filename, lineno, date, account, currencies, booking_str, kvlist):
    """Process an open directive.

    Args:
      filename: The current filename.
      lineno: The current line number.
      date: A datetime object.
      account: A string, the name of the account.
      currencies: A list of constraint currencies.
      booking_str: A string, the booking method, or None if none was specified.
      kvlist: a list of KeyValue instances.
    Returns:
      A new Open object.
    """
    meta = new_metadata(filename, lineno, kvlist)
    error = False
    if booking_str:
        try:
            # Note: Somehow the 'in' membership operator is not defined on Enum.
            booking = Booking[booking_str]
        except KeyError:
            # If the per-account method is invalid, set it to the global
            # default method and continue.
            booking = self.options['booking_method']
            error = True
    else:
        booking = None

    entry = Open(meta, date, account, currencies, booking)
    if error:
        self.errors.append(ParserError(meta,
                                       "Invalid booking method: {}".format(booking_str),
                                       entry))
    return entry

beancount.parser.grammar.Builder.option(self, filename, lineno, key, value)

处理选项指令。

参数:
  • filename – 当前文件名。

  • lineno – 当前行号。

  • key – 选项的键(str)

  • value – 选项的值

源代码位于 beancount/parser/grammar.py
def option(self, filename, lineno, key, value):
    """Process an option directive.

    Args:
      filename: current filename.
      lineno: current line number.
      key: option's key (str)
      value: option's value
    """
    if key not in self.options:
        meta = new_metadata(filename, lineno)
        self.errors.append(
            ParserError(meta, "Invalid option: '{}'".format(key), None))

    elif key in options.READ_ONLY_OPTIONS:
        meta = new_metadata(filename, lineno)
        self.errors.append(
            ParserError(meta, "Option '{}' may not be set".format(key), None))

    else:
        option_descriptor = options.OPTIONS[key]

        # Issue a warning if the option is deprecated.
        if option_descriptor.deprecated:
            assert isinstance(option_descriptor.deprecated, str), "Internal error."
            meta = new_metadata(filename, lineno)
            self.errors.append(
                DeprecatedError(meta, option_descriptor.deprecated, None))

        # Rename the option if it has an alias.
        if option_descriptor.alias:
            key = option_descriptor.alias
            option_descriptor = options.OPTIONS[key]

        # Convert the value, if necessary.
        if option_descriptor.converter:
            try:
                value = option_descriptor.converter(value)
            except ValueError as exc:
                meta = new_metadata(filename, lineno)
                self.errors.append(
                    ParserError(meta,
                                "Error for option '{}': {}".format(key, exc),
                                None))
                return

        option = self.options[key]
        if isinstance(option, list):
            # Append to a list of values.
            option.append(value)

        elif isinstance(option, dict):
            # Set to a dict of values.
            if not (isinstance(value, tuple) and len(value) == 2):
                self.errors.append(
                    ParserError(
                        meta, "Error for option '{}': {}".format(key, value), None))
                return
            dict_key, dict_value = value
            option[dict_key] = dict_value

        elif isinstance(option, bool):
            # Convert to a boolean.
            if not isinstance(value, bool):
                value = (value.lower() in {'true', 'on'}) or (value == '1')
            self.options[key] = value

        else:
            # Set the value.
            self.options[key] = value

        # Refresh the list of valid account regexps as we go along.
        if key.startswith('name_'):
            # Update the set of valid account types.
            self.account_regexp = valid_account_regexp(self.options)

beancount.parser.grammar.Builder.pad(self, filename, lineno, date, account, source_account, kvlist)

处理补足指令。

参数:
  • filename – 当前文件名。

  • lineno – 当前行号。

  • date – 一个 datetime 对象。

  • account – 一个字符串,指定要被补足的账户。

  • source_account – 一个字符串,指定从中补足的账户。

  • kvlist – 一组 KeyValue 实例的列表。

返回:
  • 一个新的 Pad 对象。

源代码位于 beancount/parser/grammar.py
def pad(self, filename, lineno, date, account, source_account, kvlist):
    """Process a pad directive.

    Args:
      filename: The current filename.
      lineno: The current line number.
      date: A datetime object.
      account: A string, the account to be padded.
      source_account: A string, the account to pad from.
      kvlist: a list of KeyValue instances.
    Returns:
      A new Pad object.
    """
    meta = new_metadata(filename, lineno, kvlist)
    return Pad(meta, date, account, source_account)

beancount.parser.grammar.Builder.pipe_deprecated_error(self, filename, lineno)

发出“管道已弃用”错误。

参数:
  • filename – 当前文件名

  • lineno – 当前行号

源代码位于 beancount/parser/grammar.py
def pipe_deprecated_error(self, filename, lineno):
    """Issue a 'Pipe deprecated' error.

    Args:
      filename: The current filename
      lineno: The current line number
    """
    if self.options['allow_pipe_separator']:
        return
    meta = new_metadata(filename, lineno)
    self.errors.append(
        ParserSyntaxError(meta, "Pipe symbol is deprecated.", None))

beancount.parser.grammar.Builder.plugin(self, filename, lineno, plugin_name, plugin_config)

处理插件指令。

参数:
  • filename – 当前文件名。

  • lineno – 当前行号。

  • plugin_name – 一个字符串,指定要导入的插件模块名称。

  • plugin_config – 一个字符串或 None,可选的配置字符串,用于传递给插件模块。

源代码位于 beancount/parser/grammar.py
def plugin(self, filename, lineno, plugin_name, plugin_config):
    """Process a plugin directive.

    Args:
      filename: current filename.
      lineno: current line number.
      plugin_name: A string, the name of the plugin module to import.
      plugin_config: A string or None, an optional configuration string to
        pass in to the plugin module.
    """
    self.options['plugin'].append((plugin_name, plugin_config))

beancount.parser.grammar.Builder.popmeta(self, filename, lineno, key)

从当前元数据栈中移除一个键。

参数:
  • key – 一个字符串,要从元数据字典中移除的键。

源代码位于 beancount/parser/grammar.py
def popmeta(self, filename, lineno, key):
    """Removed a key off the current set of stacks.

    Args:
      key: A string, a key to be removed from the meta dict.
    """
    try:
        if key not in self.meta:
            raise IndexError
        value_list = self.meta[key]
        value_list.pop(-1)
        if not value_list:
            self.meta.pop(key)
    except IndexError:
        meta = new_metadata(filename, lineno)
        self.errors.append(
            ParserError(meta,
                        "Attempting to pop absent metadata key: '{}'".format(key),
                        None))

beancount.parser.grammar.Builder.poptag(self, filename, lineno, tag)

从当前的堆栈集合中弹出一个标签。

参数:
  • tag – 一个字符串,表示要从当前标签集合中移除的标签。

源代码位于 beancount/parser/grammar.py
def poptag(self, filename, lineno, tag):
    """Pop a tag off the current set of stacks.

    Args:
      tag: A string, a tag to be removed from the current set of tags.
    """
    try:
        self.tags.remove(tag)
    except ValueError:
        meta = new_metadata(filename, lineno)
        self.errors.append(
            ParserError(meta, "Attempting to pop absent tag: '{}'".format(tag), None))

beancount.parser.grammar.Builder.posting(self, filename, lineno, account, units, cost, price, istotal, flag)

处理一个分录语法规则。

参数:
  • filename – 当前文件名。

  • lineno – 当前行号。

  • account – 一个字符串,表示分录的账户。

  • units – 一个 Amount 实例,表示数量。

  • cost – 一个 CostSpec 实例,表示成本。

  • price – 为 None,或一个 Amount 实例,表示持仓的价格。

  • istotal – 一个布尔值,若为 True 表示价格是针对整个解析金额的,若为 False 则表示价格是针对每个持仓批次的。

  • flag – 一个单字符字符串,表示与此分录关联的标记。

返回:
  • 一个新的 Posting 对象,无父级条目。

源代码位于 beancount/parser/grammar.py
def posting(self, filename, lineno, account, units, cost, price, istotal, flag):
    """Process a posting grammar rule.

    Args:
      filename: the current filename.
      lineno: the current line number.
      account: A string, the account of the posting.
      units: An instance of Amount for the units.
      cost: An instance of CostSpec for the cost.
      price: Either None, or an instance of Amount that is the cost of the position.
      istotal: A bool, True if the price is for the total amount being parsed, or
               False if the price is for each lot of the position.
      flag: A string, one-character, the flag associated with this posting.
    Returns:
      A new Posting object, with no parent entry.
    """
    meta = new_metadata(filename, lineno)

    # Prices may not be negative.
    if price and isinstance(price.number, Decimal) and price.number < ZERO:
        self.errors.append(
            ParserError(meta, (
                "Negative prices are not allowed: {} "
                "(see http://furius.ca/beancount/doc/bug-negative-prices "
                "for workaround)"
            ).format(price), None))
        # Fix it and continue.
        price = Amount(abs(price.number), price.currency)

    # If the price is specified for the entire amount, compute the effective
    # price here and forget about that detail of the input syntax.
    if istotal:
        if units.number is MISSING:
            # Note: we could potentially do a better job and attempt to fix
            # this up after interpolation, but this syntax is pretty rare
            # anyway.
            self.errors.append(ParserError(
                meta, ("Total price on a posting without units: {}.").format(price),
                None))
            price = None
        else:
            price_number = price.number
            if price_number is not MISSING:
                price_number = (ZERO
                                if units.number == ZERO
                                else price_number/abs(units.number))
                price = Amount(price_number, price.currency)

    # Note: Allow zero prices because we need them for round-trips for
    # conversion entries.
    #
    # if price is not None and price.number == ZERO:
    #     self.errors.append(
    #         ParserError(meta, "Price is zero: {}".format(price), None))

    # If both cost and price are specified, the currencies must match, or
    # that is an error.
    if (cost is not None and
        price is not None and
        isinstance(cost.currency, str) and
        isinstance(price.currency, str) and
        cost.currency != price.currency):
        self.errors.append(
            ParserError(meta,
                        "Cost and price currencies must match: {} != {}".format(
                            cost.currency, price.currency), None))

    return Posting(account, units, cost, price, chr(flag) if flag else None, meta)

beancount.parser.grammar.Builder.price(self, filename, lineno, date, currency, amount, kvlist)

处理一个价格指令。

参数:
  • filename – 当前文件名。

  • lineno – 当前行号。

  • date – 一个 datetime 对象。

  • currency – 要定价的货币。

  • amount – 一个 Amount 实例,表示该货币的价格。

  • kvlist – 一组 KeyValue 实例的列表。

返回:
  • 一个新的 Price 对象。

源代码位于 beancount/parser/grammar.py
def price(self, filename, lineno, date, currency, amount, kvlist):
    """Process a price directive.

    Args:
      filename: the current filename.
      lineno: the current line number.
      date: a datetime object.
      currency: the currency to be priced.
      amount: an instance of Amount, that is the price of the currency.
      kvlist: a list of KeyValue instances.
    Returns:
      A new Price object.
    """
    meta = new_metadata(filename, lineno, kvlist)
    return Price(meta, date, currency, amount)

beancount.parser.grammar.Builder.pushmeta(self, filename, lineno, key_value)

将元数据字段设置到当前的键值对集合中,以添加到交易中。

参数:
  • key_value – 一个 KeyValue 实例,将被添加到元数据字典中。

源代码位于 beancount/parser/grammar.py
def pushmeta(self, filename, lineno, key_value):
    """Set a metadata field on the current key-value pairs to be added to transactions.

    Args:
      key_value: A KeyValue instance, to be added to the dict of metadata.
    """
    key, value = key_value
    self.meta[key].append(value)

beancount.parser.grammar.Builder.pushtag(self, filename, lineno, tag)

将一个标签压入当前的标签集合。

注意,此操作不需要保持堆栈顺序。

参数:
  • tag – 一个字符串,表示要添加的标签。

源代码位于 beancount/parser/grammar.py
def pushtag(self, filename, lineno, tag):
    """Push a tag on the current set of tags.

    Note that this does not need to be stack ordered.

    Args:
      tag: A string, a tag to be added.
    """
    self.tags.append(tag)

beancount.parser.grammar.Builder.query(self, filename, lineno, date, query_name, query_string, kvlist)

处理文档指令。

参数:
  • filename – 当前文件名。

  • lineno – 当前行号。

  • date – 一个 datetime 对象。

  • query_name – 一个字符串,表示查询的名称。

  • query_string – 一个字符串,表示实际的 SQL 查询语句。

  • kvlist – 一组 KeyValue 实例的列表。

返回:
  • 一个新的 Query 对象。

源代码位于 beancount/parser/grammar.py
def query(self, filename, lineno, date, query_name, query_string, kvlist):
    """Process a document directive.

    Args:
      filename: the current filename.
      lineno: the current line number.
      date: a datetime object.
      query_name: a str, the name of the query.
      query_string: a str, the SQL query itself.
      kvlist: a list of KeyValue instances.
    Returns:
      A new Query object.
    """
    meta = new_metadata(filename, lineno, kvlist)
    return Query(meta, date, query_name, query_string)

beancount.parser.grammar.Builder.store_result(self, filename, lineno, entries)

开始规则在此处存储最终结果。

参数:
  • entries – 要存储的条目列表。

源代码位于 beancount/parser/grammar.py
def store_result(self, filename, lineno, entries):
    """Start rule stores the final result here.

    Args:
      entries: A list of entries to store.
    """
    if entries:
        self.entries = entries
    # Also record the name of the processed file.
    self.options['filename'] = filename

将链接添加到 TagsLinks 累加器中。

参数:
  • tags_links – 当前的 TagsLinks 累积器。

  • link – 一个字符串,表示要插入的新链接。

返回:
  • 更新后的 TagsLinks 实例。

源代码位于 beancount/parser/grammar.py
def tag_link_LINK(self, filename, lineno, tags_links, link):
    """Add a link to the TagsLinks accumulator.

    Args:
      tags_links: The current TagsLinks accumulator.
      link: A string, the new link to insert.
    Returns:
      An updated TagsLinks instance.
    """
    tags_links.links.add(link)
    return tags_links

将标签添加到 TagsLinks 累加器中。

参数:
  • tags_links – 当前的 TagsLinks 累积器。

  • tag – 一个字符串,表示要插入的新标签。

返回:
  • 更新后的 TagsLinks 实例。

源代码位于 beancount/parser/grammar.py
def tag_link_TAG(self, filename, lineno, tags_links, tag):
    """Add a tag to the TagsLinks accumulator.

    Args:
      tags_links: The current TagsLinks accumulator.
      tag: A string, the new tag to insert.
    Returns:
      An updated TagsLinks instance.
    """
    tags_links.tags.add(tag)
    return tags_links

创建一个新的 TagsLinks 实例。

返回:
  • 一个初始化了预期属性的 TagsLinks 实例。

源代码位于 beancount/parser/grammar.py
def tag_link_new(self, filename, lineno):
    """Create a new TagsLinks instance.

    Returns:
      An instance of TagsLinks, initialized with expected attributes.
    """
    return TagsLinks(set(), set())

beancount.parser.grammar.Builder.transaction(self, filename, lineno, date, flag, txn_strings, tags_links, posting_or_kv_list)

处理交易指令。

此时所有交易的分录均已可用,因此交易在此处达到平衡,不完整的分录将通过适当的位置补全,错误将累积在构建器中,以便后续报告。

这是占用大部分解析时间的主要例程;请谨慎修改此处代码,因为它们会对性能产生影响。

参数:
  • filename – 当前文件名。

  • lineno – 当前行号。

  • date – 一个 datetime 对象。

  • flag – 一个字符串,包含与该交易关联的单字符标志。

  • txn_strings – 一个字符串列表,可能为空,也可能包含多个元素。

  • tags_links – 一个包含标签和/或链接的 TagsLinks 命名元组。

  • posting_or_kv_list – 一个 Posting 或 KeyValue 实例的列表,用于插入到此交易中;若未声明任何分录,则为 None。

返回:
  • 一个新的 Transaction 对象。

源代码位于 beancount/parser/grammar.py
def transaction(self, filename, lineno, date, flag, txn_strings, tags_links,
                posting_or_kv_list):
    """Process a transaction directive.

    All the postings of the transaction are available at this point, and so the
    the transaction is balanced here, incomplete postings are completed with the
    appropriate position, and errors are being accumulated on the builder to be
    reported later on.

    This is the main routine that takes up most of the parsing time; be very
    careful with modifications here, they have an impact on performance.

    Args:
      filename: the current filename.
      lineno: the current line number.
      date: a datetime object.
      flag: a str, one-character, the flag associated with this transaction.
      txn_strings: A list of strings, possibly empty, possibly longer.
      tags_links: A TagsLinks namedtuple of tags, and/or links.
      posting_or_kv_list: a list of Posting or KeyValue instances, to be inserted in
        this transaction, or None, if no postings have been declared.
    Returns:
      A new Transaction object.
    """
    meta = new_metadata(filename, lineno)

    # Separate postings and key-values.
    explicit_meta = {}
    postings = []
    tags, links = tags_links.tags, tags_links.links
    if posting_or_kv_list:
        last_posting = None
        for posting_or_kv in posting_or_kv_list:
            if isinstance(posting_or_kv, Posting):
                postings.append(posting_or_kv)
                last_posting = posting_or_kv
            elif isinstance(posting_or_kv, TagsLinks):
                if postings:
                    self.errors.append(ParserError(
                        meta,
                        "Tags or links not allowed after first " +
                        "Posting: {}".format(posting_or_kv), None))
                else:
                    tags.update(posting_or_kv.tags)
                    links.update(posting_or_kv.links)
            else:
                if last_posting is None:
                    value = explicit_meta.setdefault(posting_or_kv.key,
                                                     posting_or_kv.value)
                    if value is not posting_or_kv.value:
                        self.errors.append(ParserError(
                            meta, "Duplicate metadata field on entry: {}".format(
                                posting_or_kv), None))
                else:
                    if last_posting.meta is None:
                        last_posting = last_posting._replace(meta={})
                        postings.pop(-1)
                        postings.append(last_posting)

                    value = last_posting.meta.setdefault(posting_or_kv.key,
                                                         posting_or_kv.value)
                    if value is not posting_or_kv.value:
                        self.errors.append(ParserError(
                            meta, "Duplicate posting metadata field: {}".format(
                                posting_or_kv), None))

    # Freeze the tags & links or set to default empty values.
    tags, links = self._finalize_tags_links(tags, links)

    # Initialize the metadata fields from the set of active values.
    if self.meta:
        for key, value_list in self.meta.items():
            meta[key] = value_list[-1]

    # Add on explicitly defined values.
    if explicit_meta:
        meta.update(explicit_meta)

    # Unpack the transaction fields.
    payee_narration = self._unpack_txn_strings(txn_strings, meta)
    if payee_narration is None:
        return None
    payee, narration = payee_narration

    # We now allow a single posting when its balance is zero, so we
    # commented out the check below. If a transaction has a single posting
    # with a non-zero balance, it'll get caught below in the booking code.
    #
    # # Detect when a transaction does not have at least two legs.
    # if postings is None or len(postings) < 2:
    #     self.errors.append(
    #         ParserError(meta,
    #                     "Transaction with only one posting: {}".format(postings),
    #                     None))
    #     return None

    # If there are no postings, make sure we insert a list object.
    if postings is None:
        postings = []

    # Create the transaction.
    return Transaction(meta, date, chr(flag),
                       payee, narration, tags, links, postings)

beancount.parser.grammar.CompoundAmount (tuple)

CompoundAmount(number_per, number_total, currency)

beancount.parser.grammar.CompoundAmount.__getnewargs__(self) 特殊

将自身返回为一个普通元组。供 copy 和 pickle 使用。

源代码位于 beancount/parser/grammar.py
def __getnewargs__(self):
    'Return self as a plain tuple.  Used by copy and pickle.'
    return _tuple(self)

beancount.parser.grammar.CompoundAmount.__new__(_cls, number_per, number_total, currency) 特殊 静态方法

创建一个新的 CompoundAmount(number_per, number_total, currency) 实例。

beancount.parser.grammar.CompoundAmount.__replace__(/, self, **kwds) 特殊

返回一个新的 CompoundAmount 对象,用指定的新值替换相应字段。

源代码位于 beancount/parser/grammar.py
def _replace(self, /, **kwds):
    result = self._make(_map(kwds.pop, field_names, self))
    if kwds:
        raise TypeError(f'Got unexpected field names: {list(kwds)!r}')
    return result

beancount.parser.grammar.CompoundAmount.__repr__(self) 特殊

返回一个格式良好的表示字符串

源代码位于 beancount/parser/grammar.py
def __repr__(self):
    'Return a nicely formatted representation string'
    return self.__class__.__name__ + repr_fmt % self

beancount.parser.grammar.DeprecatedError (tuple)

DeprecatedError(source, message, entry)

beancount.parser.grammar.DeprecatedError.__getnewargs__(self) 特殊

将自身返回为一个普通元组。供 copy 和 pickle 使用。

源代码位于 beancount/parser/grammar.py
def __getnewargs__(self):
    'Return self as a plain tuple.  Used by copy and pickle.'
    return _tuple(self)

beancount.parser.grammar.DeprecatedError.__new__(_cls, source, message, entry) 特殊 静态方法

创建 DeprecatedError(source, message, entry) 的新实例

beancount.parser.grammar.DeprecatedError.__replace__(/, self, **kwds) 特殊

返回一个新的 DeprecatedError 对象,用指定的新值替换字段

源代码位于 beancount/parser/grammar.py
def _replace(self, /, **kwds):
    result = self._make(_map(kwds.pop, field_names, self))
    if kwds:
        raise TypeError(f'Got unexpected field names: {list(kwds)!r}')
    return result

beancount.parser.grammar.DeprecatedError.__repr__(self) 特殊

返回一个格式良好的表示字符串

源代码位于 beancount/parser/grammar.py
def __repr__(self):
    'Return a nicely formatted representation string'
    return self.__class__.__name__ + repr_fmt % self

beancount.parser.grammar.KeyValue (tuple)

KeyValue(key, value)

beancount.parser.grammar.KeyValue.__getnewargs__(self) 特殊

将自身返回为一个普通元组。供 copy 和 pickle 使用。

源代码位于 beancount/parser/grammar.py
def __getnewargs__(self):
    'Return self as a plain tuple.  Used by copy and pickle.'
    return _tuple(self)

beancount.parser.grammar.KeyValue.__new__(_cls, key, value) 特殊 静态方法

创建 KeyValue(key, value) 的新实例

beancount.parser.grammar.KeyValue.__replace__(/, self, **kwds) 特殊

返回一个新的 KeyValue 对象,用指定的新值替换字段

源代码位于 beancount/parser/grammar.py
def _replace(self, /, **kwds):
    result = self._make(_map(kwds.pop, field_names, self))
    if kwds:
        raise TypeError(f'Got unexpected field names: {list(kwds)!r}')
    return result

beancount.parser.grammar.KeyValue.__repr__(self) 特殊

返回一个格式良好的表示字符串

源代码位于 beancount/parser/grammar.py
def __repr__(self):
    'Return a nicely formatted representation string'
    return self.__class__.__name__ + repr_fmt % self

beancount.parser.grammar.ParserError (tuple)

ParserError(source, message, entry)

beancount.parser.grammar.ParserError.__getnewargs__(self) 特殊

将自身返回为一个普通元组。供 copy 和 pickle 使用。

源代码位于 beancount/parser/grammar.py
def __getnewargs__(self):
    'Return self as a plain tuple.  Used by copy and pickle.'
    return _tuple(self)

beancount.parser.grammar.ParserError.__new__(_cls, source, message, entry) 特殊 静态方法

创建 ParserError(source, message, entry) 的新实例

beancount.parser.grammar.ParserError.__replace__(/, self, **kwds) 特殊

返回一个新的 ParserError 对象,用指定的新值替换字段

源代码位于 beancount/parser/grammar.py
def _replace(self, /, **kwds):
    result = self._make(_map(kwds.pop, field_names, self))
    if kwds:
        raise TypeError(f'Got unexpected field names: {list(kwds)!r}')
    return result

beancount.parser.grammar.ParserError.__repr__(self) 特殊

返回一个格式良好的表示字符串

源代码位于 beancount/parser/grammar.py
def __repr__(self):
    'Return a nicely formatted representation string'
    return self.__class__.__name__ + repr_fmt % self

beancount.parser.grammar.ParserSyntaxError (tuple)

ParserSyntaxError(source, message, entry)

beancount.parser.grammar.ParserSyntaxError.__getnewargs__(self) 特殊

将自身返回为一个普通元组。供 copy 和 pickle 使用。

源代码位于 beancount/parser/grammar.py
def __getnewargs__(self):
    'Return self as a plain tuple.  Used by copy and pickle.'
    return _tuple(self)

beancount.parser.grammar.ParserSyntaxError.__new__(_cls, source, message, entry) 特殊 静态方法

创建 ParserSyntaxError(source, message, entry) 的新实例

beancount.parser.grammar.ParserSyntaxError.__replace__(/, self, **kwds) 特殊

返回一个新的 ParserSyntaxError 对象,用指定的新值替换字段

源代码位于 beancount/parser/grammar.py
def _replace(self, /, **kwds):
    result = self._make(_map(kwds.pop, field_names, self))
    if kwds:
        raise TypeError(f'Got unexpected field names: {list(kwds)!r}')
    return result

beancount.parser.grammar.ParserSyntaxError.__repr__(self) 特殊

返回一个格式良好的表示字符串

源代码位于 beancount/parser/grammar.py
def __repr__(self):
    'Return a nicely formatted representation string'
    return self.__class__.__name__ + repr_fmt % self

TagsLinks(tags, links)

beancount.parser.grammar.TagsLinks.__getnewargs__(self) 特殊

将自身返回为一个普通元组。供 copy 和 pickle 使用。

源代码位于 beancount/parser/grammar.py
def __getnewargs__(self):
    'Return self as a plain tuple.  Used by copy and pickle.'
    return _tuple(self)

beancount.parser.grammar.TagsLinks.__new__(_cls, tags, links) 特殊 静态方法

创建一个新的 TagsLinks 实例 (tags, links)

beancount.parser.grammar.TagsLinks.__replace__(/, self, **kwds) 特殊

返回一个新的 TagsLinks 对象,用指定的新值替换字段

源代码位于 beancount/parser/grammar.py
def _replace(self, /, **kwds):
    result = self._make(_map(kwds.pop, field_names, self))
    if kwds:
        raise TypeError(f'Got unexpected field names: {list(kwds)!r}')
    return result

beancount.parser.grammar.TagsLinks.__repr__(self) 特殊

返回一个格式良好的表示字符串

源代码位于 beancount/parser/grammar.py
def __repr__(self):
    'Return a nicely formatted representation string'
    return self.__class__.__name__ + repr_fmt % self

ValueType(value, dtype) (元组)

ValueType(value, dtype)

beancount.parser.grammar.ValueType.__getnewargs__(self) 特殊

将自身返回为一个普通元组。供 copy 和 pickle 使用。

源代码位于 beancount/parser/grammar.py
def __getnewargs__(self):
    'Return self as a plain tuple.  Used by copy and pickle.'
    return _tuple(self)

beancount.parser.grammar.ValueType.__new__(_cls, value, dtype) 特殊 静态方法

创建一个新的 ValueType 实例 (value, dtype)

beancount.parser.grammar.ValueType.__replace__(/, self, **kwds) 特殊

返回一个新的 ValueType 对象,用指定的新值替换字段

源代码位于 beancount/parser/grammar.py
def _replace(self, /, **kwds):
    result = self._make(_map(kwds.pop, field_names, self))
    if kwds:
        raise TypeError(f'Got unexpected field names: {list(kwds)!r}')
    return result

beancount.parser.grammar.ValueType.__repr__(self) 特殊

返回一个格式良好的表示字符串

源代码位于 beancount/parser/grammar.py
def __repr__(self):
    'Return a nicely formatted representation string'
    return self.__class__.__name__ + repr_fmt % self

beancount.parser.grammar.valid_account_regexp(options)

根据选项构建用于验证账户名称的正则表达式。

参数:
  • options – 一个选项字典,格式参见 beancount.parser.options。

返回:
  • 一个字符串,即匹配所有账户名称的正则表达式。

源代码位于 beancount/parser/grammar.py
def valid_account_regexp(options):
    """Build a regexp to validate account names from the options.

    Args:
      options: A dict of options, as per beancount.parser.options.
    Returns:
      A string, a regular expression that will match all account names.
    """
    names = map(options.__getitem__, ('name_assets',
                                      'name_liabilities',
                                      'name_equity',
                                      'name_income',
                                      'name_expenses'))

    # Replace the first term of the account regular expression with the specific
    # names allowed under the options configuration. This code is kept in sync
    # with {5672c7270e1e}.
    return regex.compile("(?:{})(?:{}{})+".format('|'.join(names),
                                                  account.sep,
                                                  account.ACC_COMP_NAME_RE))

beancount.parser.hashsrc

计算源文件的哈希值,以便在源文件过期时发出警告。

beancount.parser.hashsrc.check_parser_source_files(parser_module)

检查扩展模块的源文件哈希值,若当前源文件与模块的源文件不一致,则发出警告。

如果源文件未位于 Python 源代码目录中,则忽略警告,因为我们可能正在从已安装的版本运行,此时无需检查(此检查仅对直接从源码运行的用户有用)。

源代码位于 beancount/parser/hashsrc.py
def check_parser_source_files(parser_module: types.ModuleType):
    """Check the extension module's source hash and issue a warning if the
    current source differs from that of the module.

    If the source files aren't located in the Python source directory, ignore
    the warning, we're probably running this from an installed based, in which
    case we don't need to check anything (this check is useful only for people
    running directly from source).
    """
    parser_source_hash = hash_parser_source_files()
    if parser_source_hash is None:
        return
    if parser_module.SOURCE_HASH and parser_module.SOURCE_HASH != parser_source_hash:
        warnings.warn(
            ("The Beancount parser C extension module is out-of-date ('{}' != '{}'). "
             "You need to rebuild.").format(parser_module.SOURCE_HASH, parser_source_hash))

beancount.parser.hashsrc.gen_include()

为解析器源哈希生成一个包含文件。

源代码位于 beancount/parser/hashsrc.py
def gen_include():
    """Generate an include file for the parser source hash."""
    return textwrap.dedent("""\
      #ifndef __BEANCOUNT_PARSER_PARSE_SOURCE_HASH_H__
      #define __BEANCOUNT_PARSER_PARSE_SOURCE_HASH_H__

      #define PARSER_SOURCE_HASH {source_hash}

      #endif // __BEANCOUNT_PARSER_PARSE_SOURCE_HASH_H__
    """.format(source_hash=hash_parser_source_files()))

beancount.parser.hashsrc.hash_parser_source_files()

计算解析器 Python 代码的唯一哈希值,以便将其嵌入扩展模块中。该哈希值在加载时用于验证扩展模块与对应的 Python 代码是否匹配;若不匹配,则发出警告,提示您应重新编译扩展模块。

返回:
  • 一个字符串,即相关源代码的十六进制唯一哈希值,用于触发重新编译。

源代码位于 beancount/parser/hashsrc.py
def hash_parser_source_files():
    """Compute a unique hash of the parser's Python code in order to bake that into
    the extension module. This is used at load-time to verify that the extension
    module and the corresponding Python codes match each other. If not, it
    issues a warning that you should rebuild your extension module.

    Returns:
      A string, the hexadecimal unique hash of relevant source code that should
      trigger a recompilation.
    """
    md5 = hashlib.md5()
    for filename in PARSER_SOURCE_FILES:
        fullname = path.join(path.dirname(__file__), filename)
        if not path.exists(fullname):
            return None
        with open(fullname, 'rb') as file:
            md5.update(file.read())
    # Note: Prepend a character in front of the hash because under Windows MSDEV
    # removes escapes, and if the hash starts with a number it fails to
    # recognize this is a string. A small compromise for portability.
    return md5.hexdigest()

beancount.parser.lexer

Beancount 语法词法分析器。

beancount.parser.lexer.LexBuilder

仅用于构建词法分析器对象的构建器。

beancount.parser.lexer.LexBuilder.build_lexer_error(self, filename, lineno, message)

构建一个词法分析错误,并将其添加到待处理错误列表中。

参数:
  • message – 错误消息。

源代码位于 beancount/parser/lexer.py
def build_lexer_error(self, filename, lineno, message): # {0e31aeca3363}
    """Build a lexer error and appends it to the list of pending errors.

    Args:
      message: The message of the error.
    """
    self.errors.append(
        LexerError(new_metadata(filename, lineno), str(message), None))

beancount.parser.lexer.LexerError (tuple)

LexerError(source, message, entry)

beancount.parser.lexer.LexerError.__getnewargs__(self) 特殊

将自身返回为一个普通元组。供 copy 和 pickle 使用。

源代码位于 beancount/parser/lexer.py
def __getnewargs__(self):
    'Return self as a plain tuple.  Used by copy and pickle.'
    return _tuple(self)

beancount.parser.lexer.LexerError.__new__(_cls, source, message, entry) 特殊 静态方法

创建 LexerError(source, message, entry) 的新实例

beancount.parser.lexer.LexerError.__replace__(/, self, **kwds) 特殊

返回一个新的 LexerError 对象,用指定的新值替换字段

源代码位于 beancount/parser/lexer.py
def _replace(self, /, **kwds):
    result = self._make(_map(kwds.pop, field_names, self))
    if kwds:
        raise TypeError(f'Got unexpected field names: {list(kwds)!r}')
    return result

beancount.parser.lexer.LexerError.__repr__(self) 特殊

返回一个格式良好的表示字符串

源代码位于 beancount/parser/lexer.py
def __repr__(self):
    'Return a nicely formatted representation string'
    return self.__class__.__name__ + repr_fmt % self

beancount.parser.lexer.lex_iter(file, builder=None)

一个迭代器,生成给定文件中的所有标记。

参数:
  • file – 一个字符串,表示要运行词法分析器的文件名,或一个文件对象。

  • builder – 可选的构建器。若未指定,则使用 LexBuilder 并在使用后丢弃(连同其错误信息)。

生成:输入文件中的所有标记,格式为 (token, lineno, text, value) 元组,其中 token 是表示标记类型的字符串,lineno 是匹配到该标记的输入文件行号,text 是包含精确匹配文本的字节对象,value 是标记的语义值,或为 None。

源代码位于 beancount/parser/lexer.py
def lex_iter(file, builder=None):
    """An iterator that yields all the tokens in the given file.

    Args:
      file: A string, the filename to run the lexer on, or a file object.
      builder: A builder of your choice. If not specified, a LexBuilder is
        used and discarded (along with its errors).
    Yields:
      All the tokens in the input file as ``(token, lineno, text,
      value)`` tuples where ``token`` is a string representing the
      token kind, ``lineno`` is the line number in the input file
      where the token was matched, ``mathed`` is a bytes object
      containing the exact text matched, and ``value`` is the semantic
      value of the token or None.
    """
    with contextlib.ExitStack() as ctx:
        # It would be more appropriate here to check for io.RawIOBase but
        # that does not work for io.BytesIO despite it implementing the
        # readinto() method.
        if not isinstance(file, io.IOBase):
            file = ctx.enter_context(open(file, 'rb'))
        if builder is None:
            builder = LexBuilder()
        parser = _parser.Parser(builder)
        yield from parser.lex(file)

beancount.parser.lexer.lex_iter_string(string, builder=None, **kwargs)

一个迭代器,生成给定字符串中的所有标记。

参数:
  • string – 一个 str 或 bytes,表示要解析的账本内容。

返回:
  • 一个迭代器,详情参见 lex_iter()

源代码位于 beancount/parser/lexer.py
def lex_iter_string(string, builder=None, **kwargs):
    """An iterator that yields all the tokens in the given string.

    Args:
      string: a str or bytes, the contents of the ledger to be parsed.
    Returns:
      An iterator, see ``lex_iter()`` for details.
    """
    if not isinstance(string, bytes):
        string = string.encode('utf8')
    file = io.BytesIO(string)
    yield from lex_iter(file, builder=builder, **kwargs)

beancount.parser.options

选项及其默认值的声明。

beancount.parser.options.OptDesc (tuple)

OptDesc(name, default_value, example_value, converter, deprecated, alias)

beancount.parser.options.OptDesc.__getnewargs__(self) 特殊

将自身返回为一个普通元组。供 copy 和 pickle 使用。

源代码位于 beancount/parser/options.py
def __getnewargs__(self):
    'Return self as a plain tuple.  Used by copy and pickle.'
    return _tuple(self)

beancount.parser.options.OptDesc.__new__(_cls, name, default_value, example_value, converter, deprecated, alias) 特殊 静态方法

创建 OptDesc(name, default_value, example_value, converter, deprecated, alias) 的新实例

beancount.parser.options.OptDesc.__replace__(/, self, **kwds) 特殊

返回一个新的 OptDesc 对象,用指定的新值替换字段

源代码位于 beancount/parser/options.py
def _replace(self, /, **kwds):
    result = self._make(_map(kwds.pop, field_names, self))
    if kwds:
        raise TypeError(f'Got unexpected field names: {list(kwds)!r}')
    return result

beancount.parser.options.OptDesc.__repr__(self) 特殊

返回一个格式良好的表示字符串

源代码位于 beancount/parser/options.py
def __repr__(self):
    'Return a nicely formatted representation string'
    return self.__class__.__name__ + repr_fmt % self

beancount.parser.options.OptGroup (元组)

OptGroup(description, options)

beancount.parser.options.OptGroup.__getnewargs__(self) 特殊

将自身返回为一个普通元组。供 copy 和 pickle 使用。

源代码位于 beancount/parser/options.py
def __getnewargs__(self):
    'Return self as a plain tuple.  Used by copy and pickle.'
    return _tuple(self)

beancount.parser.options.OptGroup.__new__(_cls, description, options) 特殊 静态方法

创建 OptGroup(description, options) 的新实例

beancount.parser.options.OptGroup.__replace__(/, self, **kwds) 特殊

返回一个新的 OptGroup 对象,用指定的新值替换字段

源代码位于 beancount/parser/options.py
def _replace(self, /, **kwds):
    result = self._make(_map(kwds.pop, field_names, self))
    if kwds:
        raise TypeError(f'Got unexpected field names: {list(kwds)!r}')
    return result

beancount.parser.options.OptGroup.__repr__(self) 特殊

返回一个格式良好的表示字符串

源代码位于 beancount/parser/options.py
def __repr__(self):
    'Return a nicely formatted representation string'
    return self.__class__.__name__ + repr_fmt % self

beancount.parser.options.Opt(name, default_value, example_value=<object object at 0x7f6b937809d0>, converter=None, deprecated=False, alias=None)

OptDesc 的替代构造函数,带有默认值。

参数:
  • name – 请参阅 OptDesc。

  • default_value – 请参阅 OptDesc。

  • example_value – 请参阅 OptDesc。

  • converter – 请参阅 OptDesc。

  • deprecated – 请参阅 OptDesc。

  • alias – 请参阅 OptDesc。

返回:
  • OptDesc 的一个实例。

源代码位于 beancount/parser/options.py
def Opt(name, default_value,
        example_value=UNSET,
        converter=None,
        deprecated=False,
        alias=None):
    """Alternative constructor for OptDesc, with default values.

    Args:
      name: See OptDesc.
      default_value: See OptDesc.
      example_value: See OptDesc.
      converter: See OptDesc.
      deprecated: See OptDesc.
      alias: See OptDesc.
    Returns:
      An instance of OptDesc.
    """
    if example_value is UNSET:
        example_value = default_value
    return OptDesc(name, default_value, example_value, converter, deprecated, alias)

beancount.parser.options.get_account_types(options)

从解析器的选项中提取账户类型名称。

参数:
  • options – 一个包含账本选项的字典。

返回:
  • AccountTypes 的一个实例,包含所有前缀。

源代码位于 beancount/parser/options.py
def get_account_types(options):
    """Extract the account type names from the parser's options.

    Args:
      options: a dict of ledger options.
    Returns:
      An instance of AccountTypes, that contains all the prefixes.
    """
    return account_types.AccountTypes(
        *[options[key]
          for key in ("name_assets",
                      "name_liabilities",
                      "name_equity",
                      "name_income",
                      "name_expenses")])

beancount.parser.options.get_current_accounts(options)

返回当前收益和折算账户的名称。

参数:
  • options – 一个包含账本选项的字典。

返回:
  • 一个包含两个账户对象的元组,分别用于记录当前收益和当前折算。

源代码位于 beancount/parser/options.py
def get_current_accounts(options):
    """Return account names for the current earnings and conversion accounts.

    Args:
      options: a dict of ledger options.
    Returns:
      A tuple of 2 account objects, one for booking current earnings, and one
      for current conversions.
    """
    equity = options['name_equity']
    account_current_earnings = account.join(equity,
                                            options['account_current_earnings'])
    account_current_conversions = account.join(equity,
                                               options['account_current_conversions'])
    return (account_current_earnings,
            account_current_conversions)

beancount.parser.options.get_previous_accounts(options)

返回上期收益、余额和折算账户的账户名称。

参数:
  • options – 一个包含账本选项的字典。

返回:
  • 一个包含三个账户对象的元组,分别用于记录上期收益、上期余额和上期折算。

源代码位于 beancount/parser/options.py
def get_previous_accounts(options):
    """Return account names for the previous earnings, balances and conversion accounts.

    Args:
      options: a dict of ledger options.
    Returns:
      A tuple of 3 account objects, for booking previous earnings,
      previous balances, and previous conversions.
    """
    equity = options['name_equity']
    account_previous_earnings = account.join(equity,
                                             options['account_previous_earnings'])
    account_previous_balances = account.join(equity,
                                             options['account_previous_balances'])
    account_previous_conversions = account.join(equity,
                                                options['account_previous_conversions'])
    return (account_previous_earnings,
            account_previous_balances,
            account_previous_conversions)

beancount.parser.options.get_unrealized_account(options)

返回未实现损益账户的完整账户名称。

参数:
  • options – 一个包含账本选项的字典。

返回:
  • 一个包含两个账户对象的元组,分别用于记录当前收益和当前折算。

源代码位于 beancount/parser/options.py
def get_unrealized_account(options):
    """Return the full account name for the unrealized account.

    Args:
      options: a dict of ledger options.
    Returns:
      A tuple of 2 account objects, one for booking current earnings, and one
      for current conversions.
    """
    income = options['name_income']
    return  account.join(income, options['account_unrealized_gains'])

beancount.parser.options.list_options()

生成可用选项及其描述的格式化文本。

返回:
  • 一个字符串,格式化为适合在 80 列宽度下打印。

源代码位于 beancount/parser/options.py
def list_options():
    """Produce a formatted text of the available options and their description.

    Returns:
      A string, formatted nicely to be printed in 80 columns.
    """
    oss = io.StringIO()
    for group in PUBLIC_OPTION_GROUPS:
        for desc in group.options:
            oss.write('option "{}" "{}"\n'.format(desc.name, desc.example_value))
            if desc.deprecated:
                oss.write(textwrap.fill(
                    "THIS OPTION IS DEPRECATED: {}".format(desc.deprecated),
                    initial_indent="  ",
                    subsequent_indent="  "))
                oss.write('\n\n')
        description = ' '.join(line.strip()
                               for line in group.description.strip().splitlines())
        oss.write(textwrap.fill(description,
                                initial_indent='  ',
                                subsequent_indent='  '))
        oss.write('\n')

        if isinstance(desc.default_value, (list, dict, set)):
            oss.write('\n')
            oss.write('  (This option may be supplied multiple times.)\n')

        oss.write('\n\n')

    return oss.getvalue()

beancount.parser.options.options_validate_booking_method(value)

验证记账方法名称。

参数:
  • value – 一个字符串,作为选项提供的值。

返回:
  • 转换成功后的新值。

异常:
  • ValueError – 如果值无效。

源代码位于 beancount/parser/options.py
def options_validate_booking_method(value):
    """Validate a booking method name.

    Args:
      value: A string, the value provided as option.
    Returns:
      The new value, converted, if the conversion is successful.
    Raises:
      ValueError: If the value is invalid.
    """
    try:
        return data.Booking[value]
    except KeyError as exc:
        raise ValueError(str(exc)) from exc

beancount.parser.options.options_validate_boolean(value)

验证布尔选项。

参数:
  • value – 一个字符串,作为选项提供的值。

返回:
  • 转换成功后的新值。

异常:
  • ValueError – 如果值无效。

源代码位于 beancount/parser/options.py
def options_validate_boolean(value):
    """Validate a boolean option.

    Args:
      value: A string, the value provided as option.
    Returns:
      The new value, converted, if the conversion is successful.
    Raises:
      ValueError: If the value is invalid.
    """
    return value.lower() in ('1', 'true', 'yes')

beancount.parser.options.options_validate_plugin(value)

验证插件选项。

参数:
  • value – 一个字符串,作为选项提供的值。

返回:
  • 转换成功后的新值。

异常:
  • ValueError – 如果值无效。

源代码位于 beancount/parser/options.py
def options_validate_plugin(value):
    """Validate the plugin option.

    Args:
      value: A string, the value provided as option.
    Returns:
      The new value, converted, if the conversion is successful.
    Raises:
      ValueError: If the value is invalid.
    """
    # Process the 'plugin' option specially: accept an optional
    # argument from it. NOTE: We will eventually phase this out and
    # replace it by a dedicated 'plugin' directive.
    match = re.match('(.*):(.*)', value)
    if match:
        plugin_name, plugin_config = match.groups()
    else:
        plugin_name, plugin_config = value, None
    return (plugin_name, plugin_config)

beancount.parser.options.options_validate_processing_mode(value)

验证处理模式选项。

参数:
  • value – 一个字符串,作为选项提供的值。

返回:
  • 转换成功后的新值。

异常:
  • ValueError – 如果值无效。

源代码位于 beancount/parser/options.py
def options_validate_processing_mode(value):
    """Validate the options processing mode.

    Args:
      value: A string, the value provided as option.
    Returns:
      The new value, converted, if the conversion is successful.
    Raises:
      ValueError: If the value is invalid.
    """
    if value not in ('raw', 'default'):
        raise ValueError("Invalid value '{}'".format(value))
    return value

beancount.parser.options.options_validate_tolerance(value)

验证容差选项。

参数:
  • value – 一个字符串,作为选项提供的值。

返回:
  • 转换成功后的新值。

异常:
  • ValueError – 如果值无效。

源代码位于 beancount/parser/options.py
def options_validate_tolerance(value):
    """Validate the tolerance option.

    Args:
      value: A string, the value provided as option.
    Returns:
      The new value, converted, if the conversion is successful.
    Raises:
      ValueError: If the value is invalid.
    """
    return D(value)

beancount.parser.options.options_validate_tolerance_map(value)

验证一个包含货币/容差对映射的字符串选项。

参数:
  • value – 一个字符串,作为选项提供的值。

返回:
  • 转换成功后的新值。

异常:
  • ValueError – 如果值无效。

源代码位于 beancount/parser/options.py
def options_validate_tolerance_map(value):
    """Validate an option with a map of currency/tolerance pairs in a string.

    Args:
      value: A string, the value provided as option.
    Returns:
      The new value, converted, if the conversion is successful.
    Raises:
      ValueError: If the value is invalid.
    """
    # Process the setting of a key-value, whereby the value is a Decimal
    # representation.
    match = re.match('(.*):(.*)', value)
    if not match:
        raise ValueError("Invalid value '{}'".format(value))
    currency, tolerance_str = match.groups()
    return (currency, D(tolerance_str))

beancount.parser.parser

Beancount 语法解析器。

重要:解析器(及其语法构建器)生成的是“不完整”的交易对象。这意味着解析器输出中某些数据可能缺失,部分数据类型也略有不同。缺失的组件不会被替换为 None,而是使用一个特殊常量 'NA',以便在用户意外处理不完整过账时诊断问题。这些不完整条目随后会经过“记账”例程处理,同时完成以下两件事:

  1. 查找匹配的批次以减少库存头寸,以及
  2. 插值缺失的数值。

在此过程中,它们将条目标准化为“完整”条目,将头寸/批次的“成本”属性从 CostSpec 转换为 Cost。Cost 与 Amount 类似,具有“数量”和“货币”属性,但还包含标签和批次日期。CostSpec 与 Cost 类似,但所有数据均为可选;它用于指定与特定库存批次匹配的规范。

过账的其他部分也可能缺失,而不仅仅是成本部分。省略输入中的部分内容是为了触发插值,告知 Beancount 自动计算缺失的数值(如果可能)。

缺失的组件将被设置为特殊值 "beancount.core.number.MISSING",直到库存登记和数值插值完成为止。"MISSING" 值在已完成并加载的交易分录中绝不应出现。

例如,所有单位都可能缺失:

INPUT: Assets:Account posting.units = MISSING

或仅单位的数量缺失:

INPUT: Assets:Account USD posting.units = Amount(MISSING, "USD")

您必须始终指定货币。

如果价格注释完全缺失,则显示为 None:

INPUT: Assets:Account 2 MXN posting.price = None

然而,您可以表明存在价格,但让 Beancount 自动计算它:

INPUT: Assets:Account 2 MXN @ posting.price = Amount(MISSING, MISSING)

也可以指定转换货币(推荐):

INPUT: Assets:Account 2 MXN @ USD posting.price = Amount(MISSING, "USD")

如果提供了成本说明,则会设置一个 "cost" 属性,但它不指向 Cost 实例(如完整条目中那样),而是指向 CostSpec 实例。CostSpec 的某些字段若在输入中未指定,则可能为 MISSING。例如:

INPUT: Assets:Account 1 HOOL {100 # 5 USD} posting.cost = CostSpec(Decimal("100"), Decimal("5"), "USD", None, None, False))

请注意,我们从不将标签或日期覆盖视为 MISSING;这是因为这些输入是可选的:缺失的标签在计算出的成本中保持未设置,而缺失的日期覆盖则使用包含该分录的交易日期。

您可以这样表明需要填充一个总数:

INPUT: Assets:Account 1 HOOL {100 # USD} posting.cost = CostSpec(Decimal("100"), MISSING, "USD", None, None, False))

这与完全不使用总数的情况不同:

INPUT: Assets:Account 1 HOOL {100 USD} posting.cost = CostSpec(Decimal("100"), None, "USD", None, None, False))

单位数量和总数均可省略,此时仅 CostSpec 的单位数量部分显示为 MISSING:

INPUT: Assets:Account 1 HOOL {USD} posting.cost = CostSpec(MISSING, None, "USD", None, None, False))

此外,所有成本基础都可能缺失:

INPUT: Assets:Account 1 HOOL {} posting.cost = CostSpec(MISSING, None, MISSING, None, None, False))

如果您要求合并批次,则会得到以下结果:

INPUT: Assets:Account 1 HOOL {*} posting.cost = CostSpec(MISSING, None, MISSING, None, None, True))

这些数值必须由 Beancount 计算,因此我们以 MISSING 值输出。

当然,您也可以仅提供非基础信息,例如仅指定日期或标签:

INPUT: Assets:Account 1 HOOL {2015-09-21} posting.cost = CostSpec(MISSING, None, MISSING, date(2015, 9, 21), None, True)

有关示例及对应的预期值,请参见 test beancount.parser.grammar_test.TestIncompleteInputs。

beancount.parser.parser.is_entry_incomplete(entry)

检测交易中被省略的金额。

参数:
  • entries – 一个指令。

返回:
  • 布尔值,如果任何记账项中存在缺失部分,则为 True。

源代码位于 beancount/parser/parser.py
def is_entry_incomplete(entry):
    """Detect the presence of elided amounts in Transactions.

    Args:
      entries: A directive.
    Returns:
      A boolean, true if there are some missing portions of any postings found.
    """
    if isinstance(entry, data.Transaction):
        if any(is_posting_incomplete(posting) for posting in entry.postings):
            return True
    return False

beancount.parser.parser.is_posting_incomplete(posting)

检测记账项中是否存在任何被省略的金额。

如果任何可能的金额缺失,则返回 True。

参数:
  • entries – 一个指令。

返回:
  • 布尔值,如果任何记账项中存在缺失部分,则为 True。

源代码位于 beancount/parser/parser.py
def is_posting_incomplete(posting):
    """Detect the presence of any elided amounts in a Posting.

    If any of the possible amounts are missing, this returns True.

    Args:
      entries: A directive.
    Returns:
      A boolean, true if there are some missing portions of any postings found.
    """
    units = posting.units
    if (units is MISSING or
        units.number is MISSING or
        units.currency is MISSING):
        return True
    price = posting.price
    if (price is MISSING or
        price is not None and (price.number is MISSING or
                               price.currency is MISSING)):
        return True
    cost = posting.cost
    if cost is not None and (cost.number_per is MISSING or
                             cost.number_total is MISSING or
                             cost.currency is MISSING):
        return True
    return False

beancount.parser.parser.parse_doc(expect_errors=False, allow_incomplete=False)

生成用于解析函数文档字符串作为参数的装饰器的工厂。

请注意,由此生成的装饰器仅在测试中运行解析器,而不运行加载器,因此不会对解析的文本进行验证、余额检查或应用插件。

参数:
  • expect_errors – 布尔值或 None,其语义如下:True:预期存在错误,若无错误则失败;False:预期无错误,若存在错误则失败;None:不执行任何检查。

  • allow_incomplete – 布尔值,若为 True,则允许不完整的输入;否则,若输入需要插值则报错。默认值设为不允许,因为我们希望尽量减少测试对功能的依赖。

返回:
  • 用于测试函数的装饰器。

源代码位于 beancount/parser/parser.py
def parse_doc(expect_errors=False, allow_incomplete=False):
    """Factory of decorators that parse the function's docstring as an argument.

    Note that the decorators thus generated only run the parser on the tests,
    not the loader, so is no validation, balance checks, nor plugins applied to
    the parsed text.

    Args:
      expect_errors: A boolean or None, with the following semantics,
        True: Expect errors and fail if there are none.
        False: Expect no errors and fail if there are some.
        None: Do nothing, no check.
      allow_incomplete: A boolean, if true, allow incomplete input. Otherwise
        barf if the input would require interpolation. The default value is set
        not to allow it because we want to minimize the features tests depend on.
    Returns:
      A decorator for test functions.
    """
    def decorator(fun):
        """A decorator that parses the function's docstring as an argument.

        Args:
          fun: the function object to be decorated.
        Returns:
          A decorated test function.
        """
        filename = inspect.getfile(fun)
        lines, lineno = inspect.getsourcelines(fun)

        # Skip over decorator invocation and function definition. This
        # is imperfect as it assumes that each consumes exactly one
        # line, but this is by far the most common case, and this is
        # mainly used in test, thus it is good enough.
        lineno += 2

        @functools.wraps(fun)
        def wrapper(self):
            assert fun.__doc__ is not None, (
                "You need to insert a docstring on {}".format(fun.__name__))
            entries, errors, options_map = parse_string(fun.__doc__,
                                                        report_filename=filename,
                                                        report_firstline=lineno,
                                                        dedent=True)

            if not allow_incomplete and any(is_entry_incomplete(entry)
                                            for entry in entries):
                self.fail("parse_doc() may not use interpolation.")

            if expect_errors is not None:
                if expect_errors is False and errors:
                    oss = io.StringIO()
                    printer.print_errors(errors, file=oss)
                    self.fail("Unexpected errors found:\n{}".format(oss.getvalue()))
                elif expect_errors is True and not errors:
                    self.fail("Expected errors, none found:")

            return fun(self, entries, errors, options_map)

        return wrapper

    return decorator

beancount.parser.parser.parse_file(file, report_filename=None, report_firstline=1, encoding=None, debug=False, **kw)

解析 Beancount 输入文件,并返回包含交易列表和账户树的 Ledger 对象。

参数:
  • file – 文件对象或要解析的文件路径。

  • kw – 传递给 C 解析器的关键字参数字典。

返回:
  • 一个元组,包含(解析文件得到的指令列表、解析过程中遇到的错误列表、以及从文件中解析出的选项值字典)。

源代码位于 beancount/parser/parser.py
def parse_file(file, report_filename=None, report_firstline=1,
               encoding=None, debug=False, **kw):
    """Parse a beancount input file and return Ledger with the list of
    transactions and tree of accounts.

    Args:
      file: file object or path to the file to be parsed.
      kw: a dict of keywords to be applied to the C parser.
    Returns:
      A tuple of (
        list of entries parsed in the file,
        list of errors that were encountered during parsing, and
        a dict of the option values that were parsed from the file.)
    """
    if encoding is not None and codecs.lookup(encoding).name != 'utf-8':
        raise ValueError('Only UTF-8 encoded files are supported.')
    with contextlib.ExitStack() as ctx:
        if file == '-':
            file = sys.stdin.buffer
        # It would be more appropriate here to check for io.RawIOBase but
        # that does not work for io.BytesIO despite it implementing the
        # readinto() method.
        elif not isinstance(file, io.IOBase):
            file = ctx.enter_context(open(file, 'rb'))
        builder = grammar.Builder()
        parser = _parser.Parser(builder, debug=debug)
        parser.parse(file, filename=report_filename, lineno=report_firstline, **kw)
    return builder.finalize()

beancount.parser.parser.parse_many(string, level=0)

解析包含 Beancount 输入片段的字符串,并替换调用者作用域中的变量。

参数:
  • string – 包含部分 Beancount 输入的字符串。

  • level – 需要忽略的额外调用栈层数。

返回:
  • 指令列表。

异常:
  • AssertionError – 如果存在任何错误。

源代码位于 beancount/parser/parser.py
def parse_many(string, level=0):
    """Parse a string with a snippet of Beancount input and replace vars from caller.

    Args:
      string: A string with some Beancount input.
      level: The number of extra stacks to ignore.
    Returns:
      A list of entries.
    Raises:
      AssertionError: If there are any errors.
    """
    # Get the locals in the stack for the callers and produce the final text.
    frame = inspect.stack()[level+1]
    varkwds = frame[0].f_locals
    input_string = textwrap.dedent(string.format(**varkwds))

    # Parse entries and check there are no errors.
    entries, errors, __ = parse_string(input_string)
    assert not errors

    return entries

beancount.parser.parser.parse_one(string)

解析包含单个 Beancount 指令的字符串,并替换调用者作用域中的变量。

参数:
  • string – 包含部分 Beancount 输入的字符串。

  • level – 需要忽略的额外调用栈层数。

返回:
  • 指令列表。

异常:
  • AssertionError – 如果存在任何错误。

源代码位于 beancount/parser/parser.py
def parse_one(string):
    """Parse a string with single Beancount directive and replace vars from caller.

    Args:
      string: A string with some Beancount input.
      level: The number of extra stacks to ignore.
    Returns:
      A list of entries.
    Raises:
      AssertionError: If there are any errors.
    """
    entries = parse_many(string, level=1)
    assert len(entries) == 1
    return entries[0]

beancount.parser.parser.parse_string(string, report_filename=None, dedent=False, **kw)

解析 Beancount 输入文件,并返回包含交易列表和账户树的 Ledger 对象。

参数:
  • string – 字符串,作为要解析的内容,而非文件内容。

  • report_filename – 字符串,表示此字符串提取自的源文件名(如有)。该名称将存储在解析后指令的元数据中。

  • dedent – 是否在解析前对字符串运行 textwrap.dedent()。

  • **kw – 请参阅 parse.c。

返回:
  • 与 parse_file() 的输出相同。

源代码位于 beancount/parser/parser.py
def parse_string(string, report_filename=None, dedent=False, **kw):
    """Parse a beancount input file and return Ledger with the list of
    transactions and tree of accounts.

    Args:
      string: A string, the contents to be parsed instead of a file's.
      report_filename: A string, the source filename from which this string
        has been extracted, if any. This is stored in the metadata of the
        parsed entries.
      dedent: Whether to run textwrap.dedent() on the string before parsing.
      **kw: See parse.c.
    Return:
      Same as the output of parse_file().
    """
    if dedent:
        string = textwrap.dedent(string)
    if isinstance(string, str):
        string = string.encode('utf8')
    if report_filename is None:
        report_filename = '<string>'
    file = io.BytesIO(string)
    return parse_file(file, report_filename=report_filename, **kw)

beancount.parser.printer

从内部数据结构转换为文本。

beancount.parser.printer.EntryPrinter

用于打印所有指令类型的多方法接口。

属性:

名称 类型 描述
dcontext

一个 DisplayContext 实例,用于渲染所有数字。

render_weight

布尔值,若为真,则将持仓的重量作为注释渲染,用于调试。

min_width_account

整数,为账户名称预留的最小宽度。

prefix

用户自定义的前缀,用于自定义缩进(适用于 Fava)。

stringify_invalid_types

如果元数据值无效,则强制转换为字符串以进行输出。

beancount.parser.printer.EntryPrinter.__call__(self, obj) 特殊

渲染一条指令。

参数:
  • obj – 要渲染的指令。

返回:
  • 字符串,渲染后的指令。

源代码位于 beancount/parser/printer.py
def __call__(self, obj):
    """Render a directive.

    Args:
      obj: The directive to be rendered.
    Returns:
      A string, the rendered directive.
    """
    oss = io.StringIO()

    # We write optional entry source for every entry type, hence writing it here
    self.write_entry_source(obj.meta, oss, prefix="")

    method = getattr(self, obj.__class__.__name__)
    method(obj, oss)
    return oss.getvalue()

beancount.parser.printer.EntryPrinter.render_posting_strings(self, posting)

渲染持仓的三个组成部分:账户及其可选的持仓标志、头寸,以及头寸的重量。目的是让调用者对齐这些部分。

参数:
  • posting – Posting 实例,要渲染的持仓。

返回:
  • A tuple of flag_account – 字符串,包含标志的账户名称。position_str:字符串,渲染后的头寸字符串。weight_str:字符串,渲染后的持仓重量。

源代码位于 beancount/parser/printer.py
def render_posting_strings(self, posting):
    """This renders the three components of a posting: the account and its optional
    posting flag, the position, and finally, the weight of the position. The
    purpose is to align these in the caller.

    Args:
      posting: An instance of Posting, the posting to render.
    Returns:
      A tuple of
        flag_account: A string, the account name including the flag.
        position_str: A string, the rendered position string.
        weight_str: A string, the rendered weight of the posting.
    """
    # Render a string of the flag and the account.
    flag = '{} '.format(render_flag(posting.flag)) if posting.flag else ''
    flag_account = flag + posting.account

    # Render a string with the amount and cost and optional price, if
    # present. Also render a string with the weight.
    weight_str = ''
    if isinstance(posting.units, amount.Amount):
        position_str = position.to_string(posting, self.dformat)
        # Note: we render weights at maximum precision, for debugging.
        if posting.cost is None or (isinstance(posting.cost, position.Cost) and
                                    isinstance(posting.cost.number, Decimal)):
            weight_str = str(convert.get_weight(posting))
    else:
        position_str = ''

    if posting.price is not None:
        position_str += ' @ {}'.format(posting.price.to_string(self.dformat_max))

    return flag_account, position_str, weight_str

beancount.parser.printer.EntryPrinter.write_entry_source(self, meta, oss, prefix=None)

以 Emacs、VSCode 或其他编辑器可识别的格式写入源文件和行号。由于这是用于“调试”目的,此信息将通过分号注释掉。

参数:
  • meta – 一个包含此指令元数据的字典。

  • oss – 用于写入的文件对象。

  • prefix – 用于自定义缩进的用户特定前缀

源代码位于 beancount/parser/printer.py
def write_entry_source(self, meta, oss, prefix=None):
    """Write source file and line number in a format interpretable as a message
    location for Emacs, VSCode or other editors. As this is for
    "debugging" purposes, this information will be commented out by a
    semicolon.

    Args:
      meta: A dict that contains the metadata for this directive.
      oss: A file object to write to.
      prefix: User-specific prefix for custom indentation
    """
    if not self.write_source:
        return

    if prefix is None:
        prefix = self.prefix

    oss.write('{}; source: {}\n'.format(prefix, render_source(meta)))

beancount.parser.printer.EntryPrinter.write_metadata(self, meta, oss, prefix=None)

将元数据写入文件对象,不包括文件名和行号。

参数:
  • meta – 一个包含此指令元数据的字典。

  • oss – 用于写入的文件对象。

源代码位于 beancount/parser/printer.py
def write_metadata(self, meta, oss, prefix=None):
    """Write metadata to the file object, excluding filename and line number.

    Args:
      meta: A dict that contains the metadata for this directive.
      oss: A file object to write to.
    """
    if meta is None:
        return
    if prefix is None:
        prefix = self.prefix

    # Note: meta.items() is assumed stable from 3.7 onwards; we're not sorting
    # on purpose in order to keep the original insertion order in print.
    for key, value in meta.items():
        if key not in self.META_IGNORE and not key.startswith('__'):
            value_str = None
            if isinstance(value, str):
                value_str = '"{}"'.format(misc_utils.escape_string(value))
            elif isinstance(value, (Decimal, datetime.date, amount.Amount, enum.Enum)):
                value_str = str(value)
            elif isinstance(value, bool):
                value_str = 'TRUE' if value else 'FALSE'
            elif isinstance(value, (dict, inventory.Inventory)):
                pass # Ignore dicts, don't print them out.
            elif value is None:
                value_str = ''  # Render null metadata as empty, on purpose.
            else:
                if self.stringify_invalid_types:
                    # This is only intended to be used during development,
                    # when debugging for custom values of data types
                    # attached directly and not coming from the parser.
                    value_str = str(value)
                else:
                    raise ValueError("Unexpected value: '{!r}'".format(value))
            if value_str is not None:
                oss.write("{}{}: {}\n".format(prefix, key, value_str))

beancount.parser.printer.align_position_strings(strings)

一个辅助函数,用于将渲染后的金额位置对其到第一个货币字符(大写字母)。该函数接受一个渲染后的位置列表,并计算在列中堆叠显示它们所需的宽度,以使第一个货币单词对齐。它不会进一步对齐其他货币(例如价格或成本中的货币)。

这或许用一个例子解释最好。以下位置将围绕标记为 '^' 的列对齐:

      45 HOOL {504.30 USD}
       4 HOOL {504.30 USD, 2014-11-11}
    9.95 USD

-22473.32 CAD @ 1.10 USD ^

不含货币字符的字符串将左对齐显示。

参数:
  • strings – 一个包含渲染后位置或金额字符串的列表。

返回:
  • 一个包含对齐后字符串列表及其对齐宽度的元组。

源代码位于 beancount/parser/printer.py
def align_position_strings(strings):
    """A helper used to align rendered amounts positions to their first currency
    character (an uppercase letter). This class accepts a list of rendered
    positions and calculates the necessary width to render them stacked in a
    column so that the first currency word aligns. It does not go beyond that
    (further currencies, e.g. for the price or cost, are not aligned).

    This is perhaps best explained with an example. The following positions will
    be aligned around the column marked with '^':

              45 HOOL {504.30 USD}
               4 HOOL {504.30 USD, 2014-11-11}
            9.95 USD
       -22473.32 CAD @ 1.10 USD
                 ^

    Strings without a currency character will be rendered flush left.

    Args:
      strings: A list of rendered position or amount strings.
    Returns:
      A pair of a list of aligned strings and the width of the aligned strings.
    """
    # Maximum length before the alignment character.
    max_before = 0
    # Maximum length after the alignment character.
    max_after = 0
    # Maximum length of unknown strings.
    max_unknown = 0

    string_items = []
    search = re.compile('[A-Z]').search
    for string in strings:
        match = search(string)
        if match:
            index = match.start()
            if index != 0:
                max_before = max(index, max_before)
                max_after = max(len(string) - index, max_after)
                string_items.append((index, string))
                continue
        # else
        max_unknown = max(len(string), max_unknown)
        string_items.append((None, string))

    # Compute formatting string.
    max_total = max(max_before + max_after, max_unknown)
    max_after_prime = max_total - max_before
    fmt = "{{:>{0}}}{{:{1}}}".format(max_before, max_after_prime).format
    fmt_unknown = "{{:<{0}}}".format(max_total).format

    # Align the strings and return them.
    # pylint: disable=format-string-without-interpolation
    aligned_strings = []
    for index, string in string_items:
        # pylint: disable=format-string-without-interpolation
        if index is not None:
            string = fmt(string[:index], string[index:])
        else:
            string = fmt_unknown(string)
        aligned_strings.append(string)

    return aligned_strings, max_total

beancount.parser.printer.format_entry(entry, dcontext=None, render_weights=False, prefix=None, write_source=False)

将条目格式化为与解析器接受的输入语法相同的字符串。

参数:
  • entry – 一个条目实例。

  • dcontext – 用于格式化数字的 DisplayContext 实例。

  • render_weights – 布尔值,为 True 时将渲染权重以供调试。

  • write_source – 如果为 True,则会为每个条目写入源文件和行号,格式可被 Emacs、VSCode 或其他编辑器识别为消息位置。由于这是用于“调试”目的,这些信息将通过分号注释掉。

返回:
  • 一个字符串,即格式化后的条目。

源代码位于 beancount/parser/printer.py
def format_entry(entry, dcontext=None, render_weights=False, prefix=None,
                 write_source=False):
    """Format an entry into a string in the same input syntax the parser accepts.

    Args:
      entry: An entry instance.
      dcontext: An instance of DisplayContext used to format the numbers.
      render_weights: A boolean, true to render the weights for debugging.
      write_source: If true a source file and line number will be written for
        each entry in a format interpretable as a message location for Emacs,
        VSCode or other editors. As this is for
        "debugging" purposes, this information will be commented out by a
        semicolon.
    Returns:
      A string, the formatted entry.
    """
    return EntryPrinter(dcontext, render_weights, prefix=prefix,
                        write_source=write_source)(entry)

beancount.parser.printer.format_error(error)

给定一个错误对象,返回其格式化后的字符串。

参数:
  • error – 一个表示错误的命名元组对象,必须包含一个 'entry' 属性,该属性可以是单个指令对象或指令对象列表。

返回:
  • 一个字符串,即渲染后的错误信息。

源代码位于 beancount/parser/printer.py
def format_error(error):
    """Given an error objects, return a formatted string for it.

    Args:
      error: a namedtuple objects representing an error. It has to have an
        'entry' attribute that may be either a single directive object or a
        list of directive objects.
    Returns:
      A string, the errors rendered.
    """
    oss = io.StringIO()
    oss.write('{} {}\n'.format(render_source(error.source), error.message))
    if error.entry is not None:
        entries = error.entry if isinstance(error.entry, list) else [error.entry]
        error_string = '\n'.join(format_entry(entry) for entry in entries)
        oss.write('\n')
        oss.write(textwrap.indent(error_string, '   '))
        oss.write('\n')
    return oss.getvalue()

beancount.parser.printer.print_entries(entries, dcontext=None, render_weights=False, file=None, prefix=None, write_source=False)

一个便捷函数,用于将条目列表打印到文件。

参数:
  • entries – 一组指令列表。

  • dcontext – 用于格式化数字的 DisplayContext 实例。

  • render_weights – 布尔值,为 True 时将渲染权重以供调试。

  • file – 可选的文件对象,用于写入条目。

  • prefix – 用于自定义缩进的用户特定前缀(适用于 Fava)。

  • write_source – 如果为 True,则会为每个条目写入源文件和行号,格式可被 Emacs、VSCode 或其他编辑器识别为消息位置。这对于“调试”非常有用,尤其是在多文件环境中。

源代码位于 beancount/parser/printer.py
def print_entries(entries, dcontext=None, render_weights=False, file=None, prefix=None,
                  write_source=False):
    """A convenience function that prints a list of entries to a file.

    Args:
      entries: A list of directives.
      dcontext: An instance of DisplayContext used to format the numbers.
      render_weights: A boolean, true to render the weights for debugging.
      file: An optional file object to write the entries to.
      prefix: User-specific prefix for custom indentation (for Fava).
      write_source: If true a source file and line number will be written for
        each entry in a format interpretable as a message location for Emacs,
        VSCode or other editors. This is usefull for "debugging" peurposes,
        especially in a multi-file setup
    """
    assert isinstance(entries, list), "Entries is not a list: {}".format(entries)
    output = file or (codecs.getwriter("utf-8")(sys.stdout.buffer)
                      if hasattr(sys.stdout, 'buffer') else
                      sys.stdout)

    if prefix:
        output.write(prefix)
    previous_type = type(entries[0]) if entries else None
    eprinter = EntryPrinter(dcontext, render_weights, write_source=write_source)
    for entry in entries:
        # Insert a newline between transactions and between blocks of directives
        # of the same type.
        entry_type = type(entry)
        if (entry_type in (data.Transaction, data.Commodity) or
            entry_type is not previous_type or write_source):
            output.write('\n')
            previous_type = entry_type

        string = eprinter(entry)
        output.write(string)

beancount.parser.printer.print_entry(entry, dcontext=None, render_weights=False, file=None, write_source=False)

一个便捷函数,用于将单个条目打印到文件。

参数:
  • entry – 一个指令条目。

  • dcontext – 用于格式化数字的 DisplayContext 实例。

  • render_weights – 布尔值,为 True 时将渲染权重以供调试。

  • file – 可选的文件对象,用于写入条目。

  • write_source – 如果为真,则每个条目都会写入源文件和行号,格式可被 Emacs、VSCode 或其他编辑器识别为消息位置。这在“调试”时非常有用,尤其是在多文件环境中。

源代码位于 beancount/parser/printer.py
def print_entry(entry, dcontext=None, render_weights=False, file=None,
                write_source=False):
    """A convenience function that prints a single entry to a file.

    Args:
      entry: A directive entry.
      dcontext: An instance of DisplayContext used to format the numbers.
      render_weights: A boolean, true to render the weights for debugging.
      file: An optional file object to write the entries to.
      write_source: If true a source file and line number will be written for
        each entry in a format interpretable as a message location for Emacs,
        VSCode or other editors. This is usefull for "debugging" purposes,
        especially in a multi-file setup
    """
    # TODO(blais): DO remove this now, it's a huge annoyance not to be able to
    # print in-between other statements.
    output = file or (codecs.getwriter("utf-8")(sys.stdout.buffer)
                      if hasattr(sys.stdout, 'buffer') else
                      sys.stdout)
    output.write(format_entry(entry, dcontext, render_weights,
                              write_source=write_source))
    output.write('\n')

beancount.parser.printer.print_error(error, file=None)

一个便捷函数,用于将单个错误输出到文件。

参数:
  • error – 一个错误对象。

  • file – 可选的文件对象,用于写入错误信息。

源代码位于 beancount/parser/printer.py
def print_error(error, file=None):
    """A convenience function that prints a single error to a file.

    Args:
      error: An error object.
      file: An optional file object to write the errors to.
    """
    output = file or sys.stdout
    output.write(format_error(error))
    output.write('\n')

beancount.parser.printer.print_errors(errors, file=None, prefix=None)

一个便捷函数,用于将错误列表输出到文件。

参数:
  • errors – 错误列表。

  • file – 可选的文件对象,用于写入错误信息。

源代码位于 beancount/parser/printer.py
def print_errors(errors, file=None, prefix=None):
    """A convenience function that prints a list of errors to a file.

    Args:
      errors: A list of errors.
      file: An optional file object to write the errors to.
    """
    output = file or sys.stdout
    if prefix:
        output.write(prefix)
    for error in errors:
        output.write(format_error(error))
        output.write('\n')

beancount.parser.printer.render_flag(inflag)

将标志渲染为字符串,标志可以是 None 或一个字符符号。

源代码位于 beancount/parser/printer.py
def render_flag(inflag: Optional[str]) -> str:
    """Render a flag, which can be None, a symbol of a character to a string."""
    if not inflag:
        return ''
    return inflag

beancount.parser.printer.render_source(meta)

以一种既能被 Emacs 检测到、又能对齐并良好显示的方式渲染错误来源。

参数:
  • meta – 包含元数据的字典。

返回:
  • 一个字符串,可被 Emacs 或其他编辑器解释为消息位置。

源代码位于 beancount/parser/printer.py
def render_source(meta):
    """Render the source for errors in a way that it will be both detected by
    Emacs and align and rendered nicely.

    Args:
      meta: A dict with the metadata.
    Returns:
      A string, rendered to be interpretable as a message location for Emacs or
      other editors.
    """
    return '{}:{:8}'.format(meta['filename'], '{}:'.format(meta['lineno']))

beancount.parser.version

在所有程序中实现通用选项。

beancount.parser.version.compute_version_string(version, changeset, timestamp)

根据嵌入在解析器中的变更集和时间戳计算版本字符串。

参数:
  • version – 字符串,版本号。

  • changeset – 字符串,标识版本提交的版本控制系统字符串。

  • timestamp – 整数,变更集的 UNIX 时间戳。

返回:
  • 版本的可读字符串。

源代码位于 beancount/parser/version.py
def compute_version_string(version, changeset, timestamp):
    """Compute a version string from the changeset and timestamp baked in the parser.

    Args:
      version: A string, the version number.
      changeset: A string, a version control string identifying the commit of the version.
      timestamp: An integer, the UNIX epoch timestamp of the changeset.
    Returns:
      A human-readable string for the version.
    """
    # Shorten changeset.
    if changeset:
        if re.match('hg:', changeset):
            changeset = changeset[:15]
        elif re.match('git:', changeset):
            changeset = changeset[:12]

    # Convert timestamp to a date.
    date = None
    if timestamp > 0:
        date = datetime.datetime.fromtimestamp(timestamp, datetime.UTC).date()

    version = 'Beancount {}'.format(version)
    if changeset or date:
        version = '{} ({})'.format(
            version, '; '.join(map(str, filter(None, [changeset, date]))))

    return version