beancount.loader

加载器代码。这是加载文件的主要入口点。

beancount.loader.LoadError (元组)

LoadError(source, message, entry)

beancount.loader.LoadError.__getnewargs__(self) 特殊

将自身返回为一个普通元组。供 copy 和 pickle 使用。

源代码位于 beancount/loader.py
def __getnewargs__(self):
    'Return self as a plain tuple.  Used by copy and pickle.'
    # NOTE(review): generated namedtuple method; `_tuple` is presumably the
    # builtin `tuple` bound inside the namedtuple class template -- confirm
    # against the CPython collections source.
    return _tuple(self)

beancount.loader.LoadError.__new__(_cls, source, message, entry) 特殊 静态方法

创建 LoadError(source, message, entry) 的新实例

beancount.loader.LoadError.__replace__(/, self, **kwds) 特殊

返回一个新的 LoadError 对象,用指定的新值替换字段

源代码位于 beancount/loader.py
def _replace(self, /, **kwds):
    # Generated namedtuple method: build a new instance with the named fields
    # replaced by the given values. `_make`, `_map` (presumably the builtin
    # `map`) and `field_names` come from the namedtuple class template.
    result = self._make(_map(kwds.pop, field_names, self))
    # Any keyword left over after popping did not match a field name.
    if kwds:
        raise TypeError(f'Got unexpected field names: {list(kwds)!r}')
    return result

beancount.loader.LoadError.__repr__(self) 特殊

返回一个格式良好的表示字符串

源代码位于 beancount/loader.py
def __repr__(self):
    'Return a nicely formatted representation string'
    # NOTE(review): generated namedtuple method; `repr_fmt` is the
    # '(field=%r, ...)' format string from the namedtuple class template.
    return self.__class__.__name__ + repr_fmt % self

beancount.loader.aggregate_options_map(options_map, other_options_map)

聚合选项映射中的某些属性。

参数:
  • options_map – 目标映射,用于聚合属性。注意:函数会先对该映射做浅拷贝,聚合结果通过返回值给出,顶层字典本身不会被修改;但由于是浅拷贝,其中共享的 dcontext 对象仍会被(经 update_from)就地更新。

  • other_options_map – 其他选项映射的列表,其中一些值将被聚合。

源代码位于 beancount/loader.py
def aggregate_options_map(options_map, other_options_map):
    """Aggregate some of the attributes of options map.

    Args:
      options_map: The target map in which we want to aggregate attributes.
        Note: A shallow copy of this map is made and returned; the top-level
        dict itself is NOT mutated. However, because the copy is shallow, the
        shared "dcontext" object is updated in place (via update_from).
      other_options_map: A list of other options maps, some of whose values
        we'd like to see aggregated.
    Returns:
      A new options map (a shallow copy of `options_map`) with the
      "operating_currency", "dcontext" and "pythonpath" attributes aggregated
      over all the input maps.
    """
    # Work on a shallow copy so the caller's top-level dict is left untouched.
    options_map = copy.copy(options_map)

    # Aggregate operating currencies, preserving first-seen order.
    currencies = list(options_map["operating_currency"])
    for omap in other_options_map:
        currencies.extend(omap["operating_currency"])
        options_map["dcontext"].update_from(omap["dcontext"])
    options_map["operating_currency"] = list(misc_utils.uniquify(currencies))

    # Produce a 'pythonpath' value for transformers.
    pythonpath = set()
    for omap in itertools.chain((options_map,), other_options_map):
        if omap.get("insert_pythonpath", False):
            pythonpath.add(path.dirname(omap["filename"]))
    options_map["pythonpath"] = sorted(pythonpath)

    return options_map

beancount.loader.combine_plugins(*plugin_modules)

合并指定插件模块中的插件。

此功能用于创建插件的插件。

参数:
  • *plugin_modules – 一组模块对象。

返回:
  • 一个可赋值给新模块的 __plugins__ 属性的列表。

源代码位于 beancount/loader.py
def combine_plugins(*plugin_modules):
    """Combine the plugins from the given plugin modules.

    This is used to create plugins of plugins.

    Args:
      *plugin_modules: A sequence of module objects, each carrying a
        __plugins__ attribute that lists the names of its plugin functions.
    Returns:
      A list that can be assigned to the new module's __plugins__ attribute.
    """
    # Flatten the plugin callables of all modules, in module order.
    return [getattr(module, name)
            for module in plugin_modules
            for name in module.__plugins__]

beancount.loader.compute_input_hash(filenames)

计算输入数据的哈希值。

参数:
  • filenames – 输入文件列表。顺序无关。

源代码位于 beancount/loader.py
def compute_input_hash(filenames):
    """Compute a hash of the input data.

    The hash covers each filename and, for files that exist, their
    modification time (in nanoseconds) and size, so that touching or resizing
    any input invalidates the hash.

    Args:
      filenames: A list of input files. Order is not relevant.
    Returns:
      A string, the hexadecimal MD5 digest of the aggregated input state.
    """
    md5 = hashlib.md5()
    for filename in sorted(filenames):
        md5.update(filename.encode('utf8'))
        if not path.exists(filename):
            continue
        stat = os.stat(filename)
        # Pack as signed 64-bit integers ('qq'). The previous 'dd' (double)
        # format silently converted st_mtime_ns to a float, losing nanosecond
        # precision (doubles carry only 53 mantissa bits), which could miss
        # rapid successive modifications of a same-sized file.
        md5.update(struct.pack('qq', stat.st_mtime_ns, stat.st_size))
    return md5.hexdigest()

beancount.loader.delete_cache_function(cache_getter, function)

一个用于移除缓存文件名的包装器。

参数:
  • cache_getter – 一个接受单个参数(顶级文件名)的函数,返回对应的缓存文件名。

  • function – 需要被装饰以支持缓存的函数对象。

返回:
  • 一个装饰后的函数,若缓存文件存在,则会删除它。

源代码位于 beancount/loader.py
def delete_cache_function(cache_getter, function):
    """A wrapper that removes the cached filename.

    Args:
      cache_getter: A function of one argument, the top-level filename, which
        will return the name of the corresponding cache file.
      function: A function object to decorate for caching.
    Returns:
      A decorated function which will delete the cached filename, if it exists.
    """
    @functools.wraps(function)
    def uncaching_wrapper(toplevel_filename, *args, **kw):
        # Remove any stale cache file first, then delegate to the real loader.
        stale_cache = cache_getter(toplevel_filename)
        if path.exists(stale_cache):
            os.remove(stale_cache)
        return function(toplevel_filename, *args, **kw)
    return uncaching_wrapper

beancount.loader.get_cache_filename(pattern, filename)

根据给定的模式和顶级文件名计算缓存文件名。

参数:
  • pattern (str) – 缓存文件名或模式。若模式中包含 '{filename}',则会被顶级文件名的基本名(basename)替换。此路径可以是绝对路径或相对路径。

  • filename (str) – 顶级文件名。

返回:
  • str – 解析后的缓存文件名。

源代码位于 beancount/loader.py
def get_cache_filename(pattern: str, filename: str) -> str:
    """Compute the cache filename from a given pattern and the top-level filename.

    Args:
      pattern: A cache filename or pattern. If the pattern contains '{filename}'
        this will get replaced by the basename of the top-level filename. This
        may be absolute or relative.
      filename: The top-level filename.
    Returns:
      The resolved cache filename.
    """
    abs_filename = path.abspath(filename)
    # Relative patterns resolve next to the top-level input file.
    if path.isabs(pattern):
        abs_pattern = pattern
    else:
        abs_pattern = path.join(path.dirname(abs_filename), pattern)
    return abs_pattern.format(filename=path.basename(filename))

beancount.loader.initialize(use_cache, cache_filename=None)

初始化加载器。

源代码位于 beancount/loader.py
def initialize(use_cache: bool, cache_filename: Optional[str] = None):
    """Initialize the loader.

    Rebinds the module-level `_load_file` to either a pickle-cached or a
    cache-deleting variant of `_uncached_load_file`, depending on `use_cache`.
    """
    # The pickle load cache is shared by every Python program running the
    # loader (unless disabled via environment variable), which is why the
    # switch lives here at module level.
    # pylint: disable=invalid-name
    global _load_file

    # Resolve the cache filename pattern, in decreasing order of precedence:
    # explicit argument, environment variable, then the built-in default.
    pattern = (cache_filename
               or os.getenv('BEANCOUNT_LOAD_CACHE_FILENAME')
               or PICKLE_CACHE_FILENAME)
    cache_getter = functools.partial(get_cache_filename, pattern)

    if not use_cache:
        if cache_filename is not None:
            logging.warning("Cache disabled; "
                            "Explicitly overridden cache filename %s will be ignored.",
                            cache_filename)
        _load_file = delete_cache_function(cache_getter, _uncached_load_file)
    else:
        _load_file = pickle_cache_function(cache_getter, PICKLE_CACHE_THRESHOLD,
                                           _uncached_load_file)

beancount.loader.load_doc(expect_errors=False)

一个装饰器工厂,用于加载文档字符串并使用条目调用函数。

这是一个编写大量测试的极方便工具。使用标准的 TestCase 类编写单元测试,并将输入条目写入函数的文档字符串中。

参数:
  • expect_errors – 布尔值或 None,其语义如下:True:预期存在错误,若无错误则失败;False:预期无错误,若存在错误则失败;None:不执行任何检查。

返回:
  • 一个接受单个 'self' 参数的包装方法。

源代码位于 beancount/loader.py
def load_doc(expect_errors=False):
    """A factory of decorators that loads the docstring and calls the function with entries.

    This is an incredibly convenient tool to write lots of tests. Write a
    unittest using the standard TestCase class and put the input entries in the
    function's docstring.

    Args:
      expect_errors: A boolean or None, with the following semantics,
        True: Expect errors and fail if there are none.
        False: Expect no errors and fail if there are some.
        None: Do nothing, no check.
    Returns:
      A wrapped method that accepts a single 'self' argument.
    """
    def decorator(fun):
        """A decorator that parses the function's docstring as an argument.

        Args:
          fun: A callable method, that accepts the three return arguments that
              load() returns.
        Returns:
          A decorated test function.
        """
        @functools.wraps(fun)
        def wrapper(self):
            entries, errors, options_map = load_string(fun.__doc__, dedent=True)

            # When expect_errors is None both identity checks below are false,
            # so no check is performed at all.
            if expect_errors is False and errors:
                oss = io.StringIO()
                printer.print_errors(errors, file=oss)
                self.fail("Unexpected errors found:\n{}".format(oss.getvalue()))
            elif expect_errors is True and not errors:
                self.fail("Expected errors, none found:")

            # Note: Even if we expected no errors, we call this function with an
            # empty 'errors' list. This is so that the interface does not change
            # based on the arguments to the decorator, which would be somewhat
            # ugly and which would require explanation.
            return fun(self, entries, errors, options_map)

        # Preserve the ledger input for tools, and clear the docstring
        # (presumably so the test runner does not print the whole ledger as
        # the test description).
        wrapper.__input__ = wrapper.__doc__
        wrapper.__doc__ = None
        return wrapper

    return decorator

beancount.loader.load_encrypted_file(filename, log_timings=None, log_errors=None, extra_validations=None, dedent=False, encoding=None)

加载一个加密的 Beancount 输入文件。

参数:
  • filename – 要解析的加密文件的名称。

  • log_timings – 参见 load_string()。

  • log_errors – 参见 load_string()。

  • extra_validations – 参见 load_string()。

  • dedent – 请参见 load_string()。

  • encoding – 请参见 load_string()。

返回:
  • 一个三元组 (entries, errors, option_map),其中 "entries" 是来自文件的按日期排序的条目列表,"errors" 是在解析和验证文件时生成的错误对象列表,"options_map" 是从文件中解析出的选项字典。

源代码位于 beancount/loader.py
def load_encrypted_file(filename, log_timings=None, log_errors=None, extra_validations=None,
                        dedent=False, encoding=None):
    """Load an encrypted Beancount input file.

    Args:
      filename: The name of an encrypted file to be parsed.
      log_timings: See load_string().
      log_errors: See load_string().
      extra_validations: See load_string().
      dedent: See load_string().
      encoding: See load_string().
    Returns:
      A triple of (entries, errors, option_map) where "entries" is a date-sorted
      list of entries from the file, "errors" a list of error objects generated
      while parsing and validating the file, and "options_map", a dict of the
      options parsed from the file.
    """
    contents = encryption.read_encrypted_file(filename)
    # Fix: 'dedent' was previously accepted and documented but silently
    # dropped; forward it so callers actually get the dedent behavior.
    return load_string(contents,
                       log_timings=log_timings,
                       log_errors=log_errors,
                       extra_validations=extra_validations,
                       dedent=dedent,
                       encoding=encoding)

beancount.loader.load_file(filename, log_timings=None, log_errors=None, extra_validations=None, encoding=None)

打开一个 Beancount 输入文件,解析它,运行转换并进行验证。

参数:
  • filename – 要解析的文件名。

  • log_timings – 用于写入计时信息的文件对象或函数,或为 None(表示不输出)。注意:此参数旨在使用日志方法,且不会自动添加换行符。

  • log_errors – 用于写入错误信息的文件对象或函数,或为 None(表示不输出)。

  • extra_validations – 在加载条目列表后要运行的额外验证函数列表。

  • encoding – 字符串或 None,用于解码输入文件的编码。

返回:
  • 一个三元组 (entries, errors, option_map),其中 "entries" 是来自文件的按日期排序的条目列表,"errors" 是在解析和验证文件时生成的错误对象列表,"options_map" 是从文件中解析出的选项字典。

源代码位于 beancount/loader.py
def load_file(filename, log_timings=None, log_errors=None, extra_validations=None,
              encoding=None):
    """Open a Beancount input file, parse it, run transformations and validate.

    Args:
      filename: The name of the file to be parsed.
      log_timings: A file object or function to write timings to,
        or None, if it should remain quiet. (Note that this is intended to use
        the logging methods and does not insert a newline.)
      log_errors: A file object or function to write errors to,
        or None, if it should remain quiet.
      extra_validations: A list of extra validation functions to run after loading
        this list of entries.
      encoding: A string or None, the encoding to decode the input filename with.
    Returns:
      A triple of (entries, errors, option_map) where "entries" is a date-sorted
      list of entries from the file, "errors" a list of error objects generated
      while parsing and validating the file, and "options_map", a dict of the
      options parsed from the file.
    """
    # Normalize to an absolute path, expanding '~' and environment variables.
    filename = path.expandvars(path.expanduser(filename))
    if not path.isabs(filename):
        filename = path.normpath(path.join(os.getcwd(), filename))

    if encryption.is_encrypted_file(filename):
        # Note: Caching is not supported for encrypted files; this path also
        # handles its own error logging via load_string().
        return load_encrypted_file(filename,
                                   log_timings, log_errors,
                                   extra_validations, False, encoding)

    entries, errors, options_map = _load_file(filename, log_timings,
                                              extra_validations, encoding)
    _log_errors(errors, log_errors)
    return entries, errors, options_map

beancount.loader.load_string(string, log_timings=None, log_errors=None, extra_validations=None, dedent=False, encoding=None)

打开一个 Beancount 输入字符串,解析它,运行转换并进行验证。

参数:
  • string – 一个 Beancount 输入字符串。

  • log_timings – 用于写入计时信息的文件对象或函数,或为 None(表示不输出)。

  • log_errors – 用于写入错误信息的文件对象或函数,或为 None(表示不输出)。

  • extra_validations – 在加载条目列表后要运行的额外验证函数列表。

  • dedent – 布尔值,若设置为 True,则移除各行前面的空白字符。

  • encoding – 字符串或 None,用于解码输入字符串的编码。

返回:
  • 一个三元组 (entries, errors, option_map),其中 "entries" 是来自字符串的按日期排序的条目列表,"errors" 是在解析和验证字符串时生成的错误对象列表,"options_map" 是从字符串中解析出的选项字典。

源代码位于 beancount/loader.py
def load_string(string, log_timings=None, log_errors=None, extra_validations=None,
                dedent=False, encoding=None):
    """Open a Beancount input string, parse it, run transformations and validate.

    Args:
      string: A Beancount input string.
      log_timings: A file object or function to write timings to,
        or None, if it should remain quiet.
      log_errors: A file object or function to write errors to,
        or None, if it should remain quiet.
      extra_validations: A list of extra validation functions to run after loading
        this list of entries.
      dedent: A boolean, if set, remove the whitespace in front of the lines.
      encoding: A string or None, the encoding to decode the input string with.
    Returns:
      A triple of (entries, errors, option_map) where "entries" is a date-sorted
      list of entries from the string, "errors" a list of error objects
      generated while parsing and validating the string, and "options_map", a
      dict of the options parsed from the string.
    """
    source = textwrap.dedent(string) if dedent else string
    entries, errors, options_map = _load([(source, False)], log_timings,
                                         extra_validations, encoding)
    _log_errors(errors, log_errors)
    return entries, errors, options_map

beancount.loader.needs_refresh(options_map)

一个谓词,当至少一个输入文件可能已更改时返回 True。

参数:
  • options_map – 如解析器所定义的选项字典。

  • mtime – (文档遗留项:当前函数签名中并无此参数,函数仅接受 options_map;此条目有待与源码同步确认。)

返回:
  • 布尔值,若输入因输入文件的更改而过期则返回 True。

源代码位于 beancount/loader.py
def needs_refresh(options_map):
    """Predicate that returns true if at least one of the input files may have changed.

    Args:
      options_map: An options dict as per the parser, or None.
    Returns:
      A boolean, true if the input is obsoleted by changes in the input files.
    """
    if options_map is None:
        return True
    # Recompute the hash over all included files and compare with the hash
    # recorded at load time; a missing record also forces a refresh.
    input_hash = compute_input_hash(options_map['include'])
    return 'input_hash' not in options_map or input_hash != options_map['input_hash']

beancount.loader.pickle_cache_function(cache_getter, time_threshold, function)

装饰一个加载函数,使其从 pickle 缓存中加载结果。

此函数将第一个参数视为顶级文件名,并假设被缓存的函数返回一个 (entries, errors, options_map) 三元组。我们使用 'include' 选项值来检查任何包含文件是否已更改。这本质上是针对磁盘缓存的特例。如果任何包含文件的修改时间比缓存更新,则重新计算函数并刷新缓存。

参数:
  • cache_getter – 一个接受单个参数(顶级文件名)的函数,返回对应的缓存文件名。

  • time_threshold – 浮点数,低于此秒数时我们不进行缓存。

  • function – 需要被装饰以支持缓存的函数对象。

返回:
  • 一个装饰后的函数,若缓存文件存在则从中获取结果。

源代码位于 beancount/loader.py
def pickle_cache_function(cache_getter, time_threshold, function):
    """Decorate a loader function to make it loads its result from a pickle cache.

    This considers the first argument as a top-level filename and assumes the
    function to be cached returns an (entries, errors, options_map) triple. We
    use the 'include' option value in order to check whether any of the included
    files has changed. It's essentially a special case for an on-disk memoizer.
    If any of the included files are more recent than the cache, the function is
    recomputed and the cache refreshed.

    Args:
      cache_getter: A function of one argument, the top-level filename, which
        will return the name of the corresponding cache file.
      time_threshold: A float, the number of seconds below which we don't bother
        caching.
      function: A function object to decorate for caching.
    Returns:
      A decorated function which will pull its result from a cache file if
      it is available.
    """
    @functools.wraps(function)
    def wrapped(toplevel_filename, *args, **kw):
        cache_filename = cache_getter(toplevel_filename)

        # Read the cache if it exists in order to get the list of files whose
        # timestamps to check.
        exists = path.exists(cache_filename)
        if exists:
            with open(cache_filename, 'rb') as file:
                try:
                    result = pickle.load(file)
                except Exception as exc:
                    # Note: Not a big fan of doing this, but here we handle all
                    # possible exceptions because unpickling of an old or
                    # corrupted pickle file manifests as a variety of different
                    # exception types.

                    # The cache file is corrupted; ignore it and recompute.
                    logging.error("Cache file is corrupted: %s; recomputing.", exc)
                    result = None

                else:
                    # Check that the latest timestamp has not been written after the
                    # cache file.
                    entries, errors, options_map = result
                    if not needs_refresh(options_map):
                        # All timestamps are legit; cache hit.
                        return result

        # We failed; recompute the value.
        # Either the cache was absent, corrupted, or stale: best-effort removal
        # of the cache file before recomputing below.
        if exists:
            try:
                os.remove(cache_filename)
            except OSError as exc:
                # Warn for errors on read-only filesystems.
                logging.warning("Could not remove picklecache file %s: %s",
                                cache_filename, exc)

        # Time the recomputation, so we only cache results that were expensive
        # enough to be worth re-reading from disk.
        time_before = time.time()
        result = function(toplevel_filename, *args, **kw)
        time_after = time.time()

        # Overwrite the cache file if the time it takes to compute it
        # justifies it.
        if time_after - time_before > time_threshold:
            try:
                with open(cache_filename, 'wb') as file:
                    pickle.dump(result, file)
            except Exception as exc:
                # Best-effort write: failing (e.g. on a read-only filesystem)
                # only costs us the cache, not the computed result.
                logging.warning("Could not write to picklecache file %s: %s",
                                cache_filename, exc)

        return result
    return wrapped

beancount.loader.run_transformations(entries, parse_errors, options_map, log_timings)

对条目运行各种转换。

在此处合成条目、进行检查、运行插件等。

参数:
  • entries – 从解析器读取的指令列表。

  • parse_errors – 到目前为止的错误列表。

  • options_map – 从解析器读取的选项字典。

  • log_timings – 用于写入计时日志条目的函数,或为 None(表示静默模式)。

返回:
  • 修改后的条目列表和错误列表,也可能已被修改。

源代码位于 beancount/loader.py
def run_transformations(entries, parse_errors, options_map, log_timings):
    """Run the various transformations on the entries.

    This is where entries are being synthesized, checked, plugins are run, etc.

    Args:
      entries: A list of directives as read from the parser.
      parse_errors: A list of errors so far.
      options_map: An options dict as read from the parser.
      log_timings: A function to write timing log entries to, or None, if it
        should be quiet.
    Returns:
      A list of modified entries, and a list of errors, also possibly modified.
    Raises:
      ValueError: If options_map['plugin_processing_mode'] is neither 'raw'
        nor 'default'.
    """
    # A list of errors to extend (make a copy to avoid modifying the input).
    errors = list(parse_errors)

    # Select the sequence of plugins to run, depending on the processing mode.
    processing_mode = options_map['plugin_processing_mode']
    if processing_mode == 'raw':
        plugins_iter = options_map["plugin"]
    elif processing_mode == 'default':
        plugins_iter = itertools.chain(PLUGINS_PRE,
                                       options_map["plugin"],
                                       PLUGINS_AUTO,
                                       PLUGINS_POST)
    else:
        # Fix: this used to be `assert "<message>"`, an assert on a truthy
        # string that could never fire, which then surfaced as a confusing
        # NameError on 'plugins_iter' below. Raise explicitly instead.
        raise ValueError(
            "Invalid value for plugin_processing_mode: {}".format(processing_mode))

    for plugin_name, plugin_config in plugins_iter:

        # Issue a warning on a renamed module.
        renamed_name = RENAMED_MODULES.get(plugin_name, None)
        if renamed_name:
            warnings.warn("Deprecation notice: Module '{}' has been renamed to '{}'; "
                          "please adjust your plugin directive.".format(
                              plugin_name, renamed_name))
            plugin_name = renamed_name

        # Try to import the module.
        #
        # Note: We intercept import errors and continue but let other plugin
        # import time exceptions fail a run, by choice.
        try:
            module = importlib.import_module(plugin_name)
            if not hasattr(module, '__plugins__'):
                continue
        except ImportError:
            # Upon failure, just issue an error.
            formatted_traceback = traceback.format_exc().replace("\n", "\n  ")
            errors.append(LoadError(data.new_metadata("<load>", 0),
                                    'Error importing "{}": {}'.format(
                                        plugin_name, formatted_traceback), None))
            continue

        # Apply it.
        with misc_utils.log_time(plugin_name, log_timings, indent=2):
            # Run each transformer function in the plugin.
            for function_name in module.__plugins__:
                if isinstance(function_name, str):
                    # Support plugin functions provided by name.
                    callback = getattr(module, function_name)
                else:
                    # Support function types directly, not just names.
                    callback = function_name

                # Provide arguments if config is provided.
                # TODO(blais): Make this consistent in v3, not conditional.
                args = () if plugin_config is None else (plugin_config,)

                # Catch all exceptions raised in running the plugin, except exits.
                try:
                    entries, plugin_errors = callback(entries, options_map, *args)
                    errors.extend(plugin_errors)
                except Exception as exc:
                    # Allow the user to exit in a plugin.
                    # NOTE(review): SystemExit derives from BaseException, not
                    # Exception, so it already propagates past this handler;
                    # this isinstance check is effectively dead code, kept for
                    # documentation of intent.
                    if isinstance(exc, SystemExit):
                        raise

                    # Upon failure, just issue an error.
                    formatted_traceback = traceback.format_exc().replace("\n", "\n  ")
                    errors.append(LoadError(data.new_metadata("<load>", 0),
                                            'Error applying plugin "{}": {}'.format(
                                                plugin_name, formatted_traceback), None))
                    continue

            # Ensure that the entries are sorted. Don't trust the plugins
            # themselves.
            entries.sort(key=data.entry_sortkey)

    return entries, errors