beancount.scripts
来自 bin 的各种脚本的实现。
这种结构是为了将所有重要代码集中在一个目录下,便于分析、搜索和单元测试。
beancount.scripts.deps
检查安装依赖项并报告每个依赖项的版本号。
此功能旨在用作错误诊断工具。
beancount.scripts.deps.check_cdecimal()
检查是否安装了快速的 C 语言 decimal 实现(内置的 decimal 模块或单独安装的 cdecimal)。
| 返回: |
|
|---|
源代码位于 beancount/scripts/deps.py
def check_cdecimal():
    """Check that a fast, C-implemented decimal module is available.

    The built-in ``decimal`` module is probed first; if it is not a fast
    implementation, an explicitly installed ``cdecimal`` package is tried.

    Returns:
      A triple of (package-name, version-number, sufficient) as per
      check_dependencies(). The package name is always 'cdecimal'.
    """
    # Note: this code mirrors and should be kept in-sync with that at the top of
    # beancount.core.number.

    # First choice: the decimal module shipped with the interpreter.
    import decimal
    if is_fast_decimal(decimal):
        builtin_version = '{} (built-in)'.format(decimal.__version__)
        return ('cdecimal', builtin_version, True)

    # Second choice: a separately installed cdecimal package. Start from the
    # "not found" answer and only upgrade it if the import succeeds and the
    # module is actually fast.
    outcome = ('cdecimal', None, False)
    try:
        import cdecimal
        if is_fast_decimal(cdecimal):
            outcome = ('cdecimal', getattr(cdecimal, '__version__', 'OKAY'), True)
    except ImportError:
        pass
    return outcome
beancount.scripts.deps.check_dependencies()
检查运行时依赖项并报告其版本号。
| 返回: |
|
|---|
源代码位于 beancount/scripts/deps.py
def check_dependencies():
    """Check the runtime dependencies and report their version numbers.

    Returns:
      A list of triples of (package-name, version-number, sufficient), in the
      order the checks are run. 'version-number' is whatever the individual
      check reports (None when the package could not be imported), and
      'sufficient' tells whether that check considers the installation good
      enough for this installation of Beancount.
    """
    report = []

    # Check for a complete installation of Python itself.
    report.append(check_python())
    report.append(check_cdecimal())

    # Modules we really do need installed.
    report.append(check_import('dateutil'))
    report.append(check_import('ply', module_name='ply.yacc', min_version='3.4'))

    # Optionally required to upload data to Google Drive.
    # TODO(blais, 2023-11-18): oauth2client is deprecated.
    report.append(check_import('googleapiclient'))
    report.append(check_import('oauth2client'))
    report.append(check_import('httplib2'))

    # Optionally required to support various price source fetchers.
    report.append(check_import('requests', min_version='2.0'))

    # Optionally required to support imports (identify, extract, file) code.
    report.append(check_python_magic())

    return report
beancount.scripts.deps.check_import(package_name, min_version=None, module_name=None)
检查特定模块是否已安装。
| 参数: |
|
|---|
| 返回: |
|
|---|
源代码位于 beancount/scripts/deps.py
def check_import(package_name, min_version=None, module_name=None):
    """Check that a particular module can be imported.

    Args:
      package_name: A string, the name reported for the package. It is also
        the module that gets imported unless 'module_name' is given.
      min_version: If not None, a string, the minimum version number we
        require. In that case the imported module must expose a string
        __version__ attribute.
      module_name: The name of the module to import if it differs from the
        package name.
    Returns:
      A triple of (package-name, version-number, sufficient) as per
      check_dependencies(). Note that the version is only looked up when
      'min_version' is given; otherwise it is reported as None.
    """
    target = package_name if module_name is None else module_name
    try:
        __import__(target)
        module = sys.modules[target]
        if min_version is None:
            # No requirement: being importable is enough.
            version, is_sufficient = None, True
        else:
            version = module.__version__
            assert isinstance(version, str)
            if min_version:
                is_sufficient = parse_version(version) >= parse_version(min_version)
            else:
                # An empty requirement string is treated as "any version".
                is_sufficient = True
    except ImportError:
        version, is_sufficient = None, False
    return (package_name, version, is_sufficient)
beancount.scripts.deps.check_python()
检查是否安装了 Python 3.3 或更高版本。
| 返回: |
|
|---|
源代码位于 beancount/scripts/deps.py
def check_python():
    """Check that Python 3.3 or above is installed.

    Returns:
      A triple of (package-name, version-number, sufficient) as per
      check_dependencies(). The version is the running interpreter's
      'major.minor.micro' string.
    """
    info = sys.version_info
    version = '.'.join(str(component) for component in info[:3])
    is_recent_enough = info[:2] >= (3, 3)
    return ('python3', version, is_recent_enough)
beancount.scripts.deps.check_python_magic()
检查是否安装了足够新版本的 python-magic。
python-magic 是 libmagic 的接口,libmagic 被 'file' 工具和 UNIX 用于识别文件类型。请注意,存在两个 Python 封装库提供 'magic' 导入:python-magic 和 filemagic。我们需要的是前者,因为它维护得更频繁。
| 返回: |
|
|---|
源代码位于 beancount/scripts/deps.py
def check_python_magic():
    """Check that python-magic (and not filemagic) is installed.

    Two different Python wrappers around libmagic provide a module named
    'magic': python-magic and filemagic. Only python-magic is accepted; it is
    recognized by the presence of a 'from_file' attribute on the module.

    Returns:
      A triple of (package-name, version-number, sufficient) as per
      check_dependencies(). The version is the fixed string 'OK' on success.
    """
    not_available = ('python-magic', None, False)
    try:
        import magic
    except (ImportError, OSError):
        # Either no 'magic' module at all, or importing it failed at the OS level.
        return not_available

    # 'filemagic' is installed instead of python-magic if from_file is absent.
    if not hasattr(magic, 'from_file'):
        return not_available
    return ('python-magic', 'OK', True)
beancount.scripts.deps.is_fast_decimal(decimal_module)
如果安装了快速的 C 语言 decimal 实现,则返回 True。
源代码位于 beancount/scripts/deps.py
def is_fast_decimal(decimal_module):
    """Return true if a fast C decimal implementation is installed.

    Args:
      decimal_module: A module object exposing a Decimal class.
    Returns:
      True if the 'sqrt' method of a default-constructed Decimal is a built-in
      (i.e. not Python-level) function.
    """
    sqrt_method = decimal_module.Decimal().sqrt
    return isinstance(sqrt_method, types.BuiltinFunctionType)
beancount.scripts.deps.list_dependencies(file=<_io.TextIOWrapper name='<stderr>' mode='w' encoding='utf-8'>)
检查依赖项,并在指定文件中输出列表。
| 参数: |
|
|---|
源代码位于 beancount/scripts/deps.py
def list_dependencies(file=sys.stderr):
    """Check the dependencies and produce a listing on the given file.

    Every line of the listing, including the "Dependencies:" header, is
    written to 'file'.

    Args:
      file: A file object to write the output to.
    """
    # The header used to be printed without file=, so it went to stdout while
    # the rest of the listing went to 'file'.
    print("Dependencies:", file=file)
    for package, version, sufficient in check_dependencies():
        # A missing version is shown as NOT INSTALLED; a present but too-old
        # version is flagged as INSUFFICIENT.
        print(" {:16}: {} {}".format(
            package,
            version or 'NOT INSTALLED',
            "(INSUFFICIENT)" if version and not sufficient else ""),
              file=file)
beancount.scripts.deps.parse_version(version_str)
将版本字符串解析为可比较的元组。
源代码位于 beancount/scripts/deps.py
def parse_version(version_str: str) -> list:
    """Parse a dotted version string into a comparable list of integers.

    Purely numeric versions such as '3.11' parse exactly as before. A component
    that is not a plain integer (for example the '0rc1' in '2.0.0rc1') no
    longer raises ValueError: its leading digits, if any, are kept and parsing
    stops there, so pre-release and development suffixes are ignored.

    Args:
      version_str: A string, the version number, e.g. '2.31.0'.
    Returns:
      A list of ints, e.g. [2, 31, 0], suitable for ordering comparisons
      against another parsed version.
    """
    import re

    components = []
    for part in version_str.split('.'):
        try:
            components.append(int(part))
            continue
        except ValueError:
            pass
        # Non-numeric component: salvage a numeric prefix and stop, since
        # whatever follows is a suffix rather than part of the release number.
        prefix = re.match(r'\d+', part)
        if prefix:
            components.append(int(prefix.group()))
        break
    return components
beancount.scripts.directories
检查文档目录是否正确地反映账户列表。
beancount.scripts.directories.ValidateDirectoryError (异常)
目录验证错误。
beancount.scripts.directories.validate_directories(entries, document_dirs)
将目录层次结构与账本的账户名称进行验证。
读取账本的账户名称列表,并检查给定根目录下所有大写的子目录名称是否与账户名称匹配。
| 参数: |
|
|---|
源代码位于 beancount/scripts/directories.py
def validate_directories(entries, document_dirs):
    """Validate directory hierarchies against a ledger's account names.

    The accounts are extracted from the entries once, then each root directory
    is validated in turn and every error found is printed on standard output,
    prefixed with "ERROR: ".

    Args:
      entries: A list of directives.
      document_dirs: A list of string, the directory roots to walk and validate.
    """
    # Get the list of accounts declared in the ledger.
    declared_accounts = getters.get_accounts(entries)

    # Validate each root and report what was found.
    for root in document_dirs:
        for problem in validate_directory(declared_accounts, root):
            print("ERROR: {}".format(problem))
beancount.scripts.directories.validate_directory(accounts, document_dir)
将目录层次结构与有效的账户列表进行验证。
遍历目录层次结构,对于所有名称与账户匹配(将 ":" 替换为 "/")的目录,检查它们是否引用了给定列表中声明的账户名称。
| 参数: |
|
|---|
| 返回: |
|
|---|
源代码位于 beancount/scripts/directories.py
def validate_directory(accounts, document_dir):
    """Check a directory hierarchy against a list of valid accounts.

    Walk the directory hierarchy with account.walk() and, for each directory it
    reports, check that the corresponding account name is either one of the
    given accounts or a parent of one of them.

    Args:
      accounts: A set or dict of account names.
      document_dir: A string, the root directory to walk and validate.
    Returns:
      A list of ValidateDirectoryError instances, one for each invalid
      directory name found.
    """
    # Add every ancestor of every account, so that parent directories with no
    # corresponding account of their own don't get reported.
    known_accounts = set(accounts)
    for name in accounts:
        parent = account.parent(name)
        # Stop climbing as soon as we run out of parents or reach an ancestor
        # that has already been recorded.
        while parent and parent not in known_accounts:
            known_accounts.add(parent)
            parent = account.parent(parent)

    return [
        ValidateDirectoryError(
            "Invalid directory '{}': no corresponding account '{}'".format(
                directory, account_name))
        for directory, account_name, _, _ in account.walk(document_dir)
        if account_name not in known_accounts]
beancount.scripts.doctor
用于调试 Beancount 中问题的工具。
该工具能够转储词法分析器/解析器的状态,并提供其他调试服务。
beancount.scripts.doctor.FileLocation (参数类型)
beancount.scripts.doctor.FileLocation.convert(self, value, param, ctx)
将值转换为正确的类型。如果值为 None(缺失值),则不会调用此方法。
此方法必须能接受命令行传入的字符串值,以及已为正确类型的值,也可能转换其他兼容类型。
在某些情况下(例如转换提示输入时),param 和 ctx 参数可能为 None。
如果无法转换该值,请使用描述性消息调用 :meth:fail。
:param value: 要转换的值。:param param: 使用此类型转换其值的参数。可能为 None。:param ctx: 到达此值的当前上下文。可能为 None。
源代码位于 beancount/scripts/doctor.py
def convert(self, value, param, ctx):
    """Convert a '[filename:]lineno' string to a (filename, lineno) pair.

    The filename part is optional; when present it is made absolute, otherwise
    it is returned as None. Values that don't match are reported through
    self.fail().
    """
    parsed = re.match(r"(?:(.+):)?(\d+)$", value)
    if parsed is None:
        self.fail("{!r} is not a valid location".format(value), param, ctx)
    name, line_text = parsed.groups()
    location_file = os.path.abspath(name) if name else name
    return location_file, int(line_text)
beancount.scripts.doctor.FileRegion (参数类型)
beancount.scripts.doctor.FileRegion.convert(self, value, param, ctx)
将值转换为正确的类型。如果值为 None(缺失值),则不会调用此方法。
此方法必须能接受命令行传入的字符串值,以及已为正确类型的值,也可能转换其他兼容类型。
在某些情况下(例如转换提示输入时),param 和 ctx 参数可能为 None。
如果无法转换该值,请使用描述性消息调用 :meth:fail。
:param value: 要转换的值。:param param: 使用此类型转换其值的参数。可能为 None。:param ctx: 到达此值的当前上下文。可能为 None。
源代码位于 beancount/scripts/doctor.py
def convert(self, value, param, ctx):
    """Convert a '[filename:]start:end' string to (filename, start, end).

    The filename part is optional; when present it is made absolute, otherwise
    it is returned as None. Both line numbers are returned as ints. Values that
    don't match are reported through self.fail().
    """
    parsed = re.match(r"(?:(.+):)?(\d+):(\d+)$", value)
    if parsed is None:
        self.fail("{!r} is not a valid region".format(value), param, ctx)
    name, first_text, last_text = parsed.groups()
    region_file = os.path.abspath(name) if name else name
    return region_file, int(first_text), int(last_text)
beancount.scripts.doctor.Group (组)
beancount.scripts.doctor.Group.command(self, *args, alias=None, **kwargs)
用于声明并附加命令到组的快捷装饰器。此装饰器接受与 :func:command 相同的参数,并立即通过调用 :meth:add_command 将创建的命令注册到该组中。
要自定义所使用的命令类,请设置 :attr:command_class 属性。
.. versionchanged:: 8.1 此装饰器可以不加括号使用。
.. versionchanged:: 8.0 添加了 :attr:command_class 属性。
源代码位于 beancount/scripts/doctor.py
def command(self, *args, alias=None, **kwargs):
    """Declare a command on this group, optionally registering an alias.

    Behaves like click.Group.command(), with one addition: when 'alias' is
    given, it is recorded in self.aliases as another name for the command.
    """
    register = click.Group.command(self, *args, **kwargs)

    def decorator(func):
        created = register(func)
        if alias:
            self.aliases[alias] = created.name
        return created

    return decorator
beancount.scripts.doctor.Group.get_command(self, ctx, cmd_name)
给定一个上下文和命令名称,如果存在则返回一个 :class:Command 对象,否则返回 None。
源代码位于 beancount/scripts/doctor.py
def get_command(self, ctx, cmd_name):
    """Look up a command by name, honoring aliases and '_'/'-' equivalence."""
    # Resolve an alias to the real command name, if there is one; then allow
    # '_' to be used in place of '-' in command names.
    resolved = self.aliases.get(cmd_name, cmd_name).replace('_', '-')
    return click.Group.get_command(self, ctx, resolved)
beancount.scripts.doctor.RenderError (元组)
RenderError(source, message, entry)
beancount.scripts.doctor.RenderError.__getnewargs__(self)
特殊
将自身返回为一个普通元组。供 copy 和 pickle 使用。
源代码位于 beancount/scripts/doctor.py
def __getnewargs__(self):
    'Return self as a plain tuple.  Used by copy and pickle.'
    # NOTE(review): '_tuple' is not defined in this listing; it looks like the
    # alias for the built-in tuple that collections.namedtuple injects into the
    # generated class namespace -- confirm before reusing this code elsewhere.
    return _tuple(self)
beancount.scripts.doctor.RenderError.__new__(_cls, source, message, entry)
特殊
静态方法
创建 RenderError(source, message, entry) 的新实例
beancount.scripts.doctor.RenderError.__replace__(self, /, **kwds)
特殊
返回一个新的 RenderError 对象,用指定的新值替换某些字段
源代码位于 beancount/scripts/doctor.py
def _replace(self, /, **kwds):
    # Return a new instance in which the fields named in 'kwds' take the given
    # values and all other fields keep the values they have on 'self'.
    # NOTE(review): '_map' and 'field_names' are not defined in this listing;
    # presumably they come from the enclosing scope of the code that generates
    # this class (collections.namedtuple) -- confirm.
    # Each field is popped from kwds, falling back to the current value.
    result = self._make(_map(kwds.pop, field_names, self))
    # Anything still left in kwds did not match a field name.
    if kwds:
        raise TypeError(f'Got unexpected field names: {list(kwds)!r}')
    return result
beancount.scripts.doctor.RenderError.__repr__(self)
特殊
返回一个格式良好的表示字符串
源代码位于 beancount/scripts/doctor.py
def __repr__(self):
    'Return a nicely formatted representation string'
    # NOTE(review): 'repr_fmt' is not defined in this listing; presumably it is
    # a %-format template with one slot per field, supplied by the code that
    # generates this class -- confirm.
    return self.__class__.__name__ + repr_fmt % self
beancount.scripts.doctor.find_linked_entries(entries, links, follow_links)
查找所有关联的条目。
请注意,这里有一个选项:您可以只查看最近条目上的链接,也可以包含已链接交易的链接。您选择哪种方式取决于您如何使用链接。最佳做法是在存在大量链接时(在 Emacs 中)向用户查询。
源代码位于 beancount/scripts/doctor.py
def find_linked_entries(entries, links, follow_links: bool):
    """Find all the transactions that share a link with the given links.

    Two modes are available. Without 'follow_links', only transactions which
    carry at least one of the given links are selected. With 'follow_links',
    the links found on the selected transactions are added to the set and the
    selection is repeated, until the number of selected transactions stops
    growing. Which one you want depends on how you use your links. Best would
    be to query the user (in Emacs) when there are many links present.

    Args:
      entries: A list of directives; only Transaction instances are considered.
      links: A set of link strings to search for.
      follow_links: A boolean, true to transitively follow links.
    Returns:
      A list of the matching transactions, in the order of 'entries'.
    """
    def select(link_set):
        # Transactions having at least one link in common with link_set.
        return [entry
                for entry in entries
                if (isinstance(entry, data.Transaction) and
                    entry.links and
                    entry.links & link_set)]

    if not follow_links:
        return select(links)

    # Work on a private copy, since the set grows as links are followed.
    links = set(links)
    linked_entries = []
    while True:
        previous_count = len(linked_entries)
        linked_entries = select(links)
        if len(linked_entries) == previous_count:
            return linked_entries
        for entry in linked_entries:
            if entry.links:
                links.update(entry.links)
beancount.scripts.doctor.find_tagged_entries(entries, tag)
查找所有带有指定标签的条目。
源代码位于 beancount/scripts/doctor.py
def find_tagged_entries(entries, tag):
    """Find all the transactions carrying the given tag.

    Args:
      entries: A list of directives; only Transaction instances are considered.
      tag: A string, the tag to look for.
    Returns:
      A list of the matching transactions, in the order of 'entries'.
    """
    tagged = []
    for entry in entries:
        if not isinstance(entry, data.Transaction):
            continue
        if entry.tags and tag in entry.tags:
            tagged.append(entry)
    return tagged
beancount.scripts.doctor.render_mini_balances(entries, options_map, conversion=None, price_map=None)
渲染给定交易的平衡树状列表。
| 参数: |
|
|---|
源代码位于 beancount/scripts/doctor.py
def render_mini_balances(entries, options_map, conversion=None, price_map=None):
    """Render a treeified list of the balances for the given transactions.

    The entries are first printed as errors (so that Emacs can jump to them),
    then their balances are realized, optionally converted, and dumped on
    standard output, followed by a "Net Income" line.

    Args:
      entries: A list of selected transactions to render.
      options_map: The parsed options.
      conversion: Conversion method string, None, 'value' or 'cost'.
      price_map: A price map from the original entries. It is required (and
        only used) when conversion is 'value', in which case the balances are
        converted to market value.
    """
    # Render linked entries (in date order) as errors (for Emacs).
    errors = [RenderError(entry.meta, '', entry)
              for entry in entries]
    printer.print_errors(errors)

    # Print out balances.
    real_root = realization.realize(entries)
    dformat = options_map['dcontext'].build(alignment=Align.DOT, reserved=2)

    # TODO(blais): I always want to be able to convert at cost. We need
    # arguments capability.
    #
    # TODO(blais): Ideally this conversion inserts a new transactions to
    # 'Unrealized' to account for the difference between cost and market value.
    # Insert one and update the realization. Add an update() method to the
    # realization, given a transaction.
    acctypes = options.get_account_types(options_map)
    if conversion == 'value':
        assert price_map is not None

        # Warning: Mutate the inventories in-place, converting them to market
        # value.
        # balance_diff accumulates (cost - market value) over all the accounts.
        balance_diff = inventory.Inventory()
        for real_account in realization.iter_children(real_root):
            balance_cost = real_account.balance.reduce(convert.get_cost)
            balance_value = real_account.balance.reduce(convert.get_value, price_map)
            real_account.balance = balance_value
            balance_diff.add_inventory(balance_cost)
            balance_diff.add_inventory(-balance_value)
        # Book any difference to the unrealized gains income account, creating
        # its node in the tree if needed.
        if not balance_diff.is_empty():
            account_unrealized = account.join(acctypes.income,
                                              options_map["account_unrealized_gains"])
            unrealized = realization.get_or_create(real_root, account_unrealized)
            unrealized.balance.add_inventory(balance_diff)

    elif conversion == 'cost':
        # Replace each balance in-place by its value at cost.
        for real_account in realization.iter_children(real_root):
            real_account.balance = real_account.balance.reduce(convert.get_cost)

    realization.dump_balances(real_root, dformat, file=sys.stdout)

    # Print out net income change.
    # The sum over the income statement accounts is negated for display.
    net_income = inventory.Inventory()
    for real_node in realization.iter_children(real_root):
        if account_types.is_income_statement_account(real_node.account, acctypes):
            net_income.add_inventory(real_node.balance)

    print()
    print('Net Income: {}'.format(-net_income))
beancount.scripts.doctor.resolve_region_to_entries(entries, filename, region)
将文件名和区域解析为条目列表。
源代码位于 beancount/scripts/doctor.py
def resolve_region_to_entries(
        entries: List[data.Entries],
        filename: str,
        region: Tuple[str, int, int]
) -> List[data.Entries]:
    """Resolve a filename and region to the list of transactions it contains.

    Args:
      entries: A list of directives; only the transactions are considered.
      filename: The filename to search when the region doesn't name one.
      region: A triple of (filename-or-None, first-lineno, last-lineno); the
        line bounds are inclusive.
    Returns:
      The transactions whose metadata places them in the file and line range.
    """
    search_filename, first_lineno, last_lineno = region
    if search_filename is None:
        search_filename = filename

    # Find all the entries in the region. (To be clear, this isn't like the
    # 'linked' command, none of the links are followed.)
    region_entries = []
    for entry in data.filter_txns(entries):
        if entry.meta['filename'] != search_filename:
            continue
        if first_lineno <= entry.meta['lineno'] <= last_lineno:
            region_entries.append(entry)
    return region_entries
beancount.scripts.example
beancount.scripts.example.LiberalDate (ParamType)
beancount.scripts.example.LiberalDate.convert(self, value, param, ctx)
将值转换为正确的类型。如果值为 None(缺失值),则不会调用此方法。
此方法必须能接受命令行传入的字符串值,以及已为正确类型的值,也可能转换其他兼容类型。
在某些情况下(例如转换提示输入时),param 和 ctx 参数可能为 None。
如果无法转换该值,请使用描述性消息调用 :meth:fail。
:param value: 要转换的值。:param param: 使用此类型转换其值的参数。可能为 None。:param ctx: 到达此值的当前上下文。可能为 None。
源代码位于 beancount/scripts/example.py
def convert(self, value, param, ctx):
    """Convert a liberally formatted date string to a date.

    Args:
      value: The value to convert: a string from the command line, or a value
        that is already a date.
      param: The parameter that is using this type. May be None.
      ctx: The current context. May be None.
    Returns:
      The result of date_utils.parse_date_liberally() on the string, or the
      value itself if it already is a date. Strings that can't be parsed are
      reported through self.fail().
    """
    # Already-converted values must be accepted as-is.
    if isinstance(value, datetime.date):
        return value
    try:
        # The parsed date has to be returned: the result of convert() is what
        # the command receives, and the original discarded it (yielding None).
        return date_utils.parse_date_liberally(value)
    except ValueError:
        self.fail("{!r} is not a valid date".format(value), param, ctx)
beancount.scripts.example.check_non_negative(entries, account, currency)
检查指定账户的余额是否从未变为负数。
| 参数: |
|
|---|
| 异常: |
|
|---|
源代码位于 beancount/scripts/example.py
def check_non_negative(entries, account, currency):
    """Check that the balance of the given account never goes negative.

    The balance is inspected at the first posting of each new date, and every
    position in it must have a non-negative number of units.

    Args:
      entries: A list of directives.
      account: An account string, the account to check the balance for.
      currency: A string, the currency to check minimums for. (Not consulted
        by the current implementation: all positions are checked.)
    Raises:
      AssertionError: if the balance goes negative.
    """
    last_checked = None
    for txn_posting, balances in postings_for(data.sorted(entries), [account], before=True):
        current = txn_posting.txn.date
        balance = balances[account]
        if current != last_checked:
            assert all(pos.units.number >= ZERO for pos in balance.get_positions()), (
                "Negative balance: {} at: {}".format(balance, txn_posting.txn.date))
        last_checked = current
beancount.scripts.example.compute_trip_dates(date_begin, date_end)
在给定时间段内生成合理的旅行日期间隔。
| 参数: |
|
|---|
生成:时间段内旅行的日期对。
源代码位于 beancount/scripts/example.py
def compute_trip_dates(date_begin, date_end):
    """Generate dates at reasonable intervals for trips during the given period.

    A buffer of 21 days is removed at both ends of the period. Trips then
    alternate with stays at home, both of random length, until a trip would
    end on or after the (buffered) end date.

    Args:
      date_begin: The start date.
      date_end: The end date.
    Yields:
      Pairs of (trip-begin, trip-end) dates for the trips within the period.
    """
    # Min and max number of days remaining at home.
    home_days_range = (4*30, 13*30)

    # Min and max length of a trip, in days.
    trip_days_range = (8, 22)

    # Ensure no trip at the very beginning and the very end.
    margin = datetime.timedelta(days=21)
    latest = date_end - margin

    cursor = date_begin + margin
    while True:
        # Note: the stay at home is drawn before the trip length.
        stay = datetime.timedelta(days=random.randint(*home_days_range))
        trip = datetime.timedelta(days=random.randint(*trip_days_range))
        trip_begin = cursor + stay
        trip_end = trip_begin + trip
        if trip_end >= latest:
            return
        yield (trip_begin, trip_end)
        cursor = trip_end
beancount.scripts.example.contextualize_file(contents, employer)
将生成文件中的通用字符串替换为更真实的字符串。
| 参数: |
|
|---|
| 返回: |
|
|---|
源代码位于 beancount/scripts/example.py
def contextualize_file(contents, employer):
    """Replace generic strings in the generated file with realistic strings.

    Args:
      contents: A string, the generic file contents.
      employer: A string, the name substituted for the 'Employer1' placeholder.
    Returns:
      A pair of (contextualized contents, replacements), where replacements is
      the dict of placeholder-to-value substitutions that was applied.
    """
    replacements = {
        'CC': 'US',
        'Bank1': 'BofA',
        'Bank1_Institution': 'Bank of America',
        'Bank1_Address': '123 America Street, LargeTown, USA',
        'Bank1_Phone': '+1.012.345.6789',
        'CreditCard1': 'Chase:Slate',
        'CreditCard2': 'Amex:BlueCash',
        'Employer1': employer,
        'Retirement': 'Vanguard',
        'Retirement_Institution': 'Vanguard Group',
        'Retirement_Address': "P.O. Box 1110, Valley Forge, PA 19482-1110",
        'Retirement_Phone': "+1.800.523.1188",
        'Investment': 'ETrade',

        # Commodities
        'CCY': 'USD',
        'VACHR': 'VACHR',
        'DEFCCY': 'IRAUSD',
        'MFUND1': 'VBMPX',
        'MFUND2': 'RGAGX',
        'STK1': 'ITOT',
        'STK2': 'VEA',
        'STK3': 'VHT',
        'STK4': 'GLD',
    }
    return replace(contents, replacements), replacements
beancount.scripts.example.date_iter(date_begin, date_end)
生成一系列日期。
| 参数: |
|
|---|
生成:datetime.date 实例。
源代码位于 beancount/scripts/example.py
def date_iter(date_begin, date_end):
    """Generate a sequence of consecutive dates.

    The first date produced is the day after 'date_begin' and the last one is
    'date_end' itself; nothing is produced when both dates are equal.

    Args:
      date_begin: The start date (excluded).
      date_end: The end date (included).
    Yields:
      Instances of datetime.date.
    """
    assert date_begin <= date_end
    step = datetime.timedelta(days=1)
    current = date_begin
    while current < date_end:
        current = current + step
        yield current
beancount.scripts.example.date_random_seq(date_begin, date_end, days_min, days_max)
生成一系列日期,每次迭代随机增加若干天。
| 参数: |
|
|---|
生成:datetime.date 实例。
源代码位于 beancount/scripts/example.py
def date_random_seq(date_begin, date_end, days_min, days_max):
    """Generate a sequence of dates with some random increase in days.

    Starting from 'date_begin' (which is not itself produced), the date is
    advanced by a random number of days and yielded, for as long as it remains
    strictly before 'date_end'.

    Args:
      date_begin: The start date.
      date_end: The end date.
      days_min: The minimum number of days to advance on each iteration.
      days_max: The maximum number of days to advance on each iteration.
    Yields:
      Instances of datetime.date.
    """
    assert days_min > 0
    assert days_min <= days_max
    current = date_begin
    while current < date_end:
        advance = datetime.timedelta(days=random.randint(days_min, days_max))
        current = current + advance
        if current >= date_end:
            return
        yield current
beancount.scripts.example.delay_dates(date_iter, delay_days_min, delay_days_max)
将给定迭代器中的日期延迟一个均匀随机抽取的天数。
| 参数: |
|
|---|
生成:datetime.date 实例。
源代码位于 beancount/scripts/example.py
def delay_dates(date_iter, delay_days_min, delay_days_max):
    """Delay the dates from the given iterator by some uniformly drawn number of days.

    The input is consumed entirely up front. Each date is then pushed forward
    by a random number of days, and generation stops at the first delayed date
    that reaches or passes the last input date. An empty input produces
    nothing (it used to fail with an IndexError).

    Args:
      date_iter: An iterator of datetime.date or datetime.datetime instances;
        datetimes are reduced to their date.
      delay_days_min: The minimum amount of advance days for the transaction.
      delay_days_max: The maximum amount of advance days for the transaction.
    Yields:
      datetime.date instances.
    """
    def as_date(value):
        # datetime is a subclass of date, so it has to be tested for explicitly.
        return value.date() if isinstance(value, datetime.datetime) else value

    dates = list(date_iter)
    if not dates:
        # Nothing to delay; without this guard dates[-1] raises IndexError.
        return

    last_date = as_date(dates[-1])
    for dtime in dates:
        delay = datetime.timedelta(days=random.randint(delay_days_min, delay_days_max))
        date = as_date(dtime) + delay
        if date >= last_date:
            break
        yield date
beancount.scripts.example.generate_balance_checks(entries, account, date_iter)
生成指定频率的余额检查条目。
| 参数: |
|
|---|
| 返回: |
|
|---|
源代码位于 beancount/scripts/example.py
def generate_balance_checks(entries, account, date_iter):
    """Generate balance check entries to the given frequency.

    The postings of the account are walked in order; each time a posting is
    reached whose date is on or after the next check date, a balance directive
    is emitted at that check date using the balance before the posting.
    Generation ends when either the postings or the dates run out. An empty
    date iterator yields no checks (it used to leak a StopIteration).

    Args:
      entries: A list of directives that contain all the transactions for the
        accounts.
      account: The name of the account for which to generate.
      date_iter: Iterator of dates. We generate balance checks at these dates.
    Returns:
      A list of balance check entries.
    """
    balance_checks = []
    date_iter = iter(date_iter)
    # Fetch the first date defensively: this call is outside of the
    # StopIteration guard below.
    next_date = next(date_iter, None)
    if next_date is None:
        return balance_checks

    # Running out of dates in the inner loop ends the generation.
    # NOTE(review): the template below is reproduced exactly as it appears in
    # this listing, which seems to have lost its indentation -- confirm against
    # beancount/scripts/example.py.
    with misc_utils.swallow(StopIteration):
        for txn_posting, balance in postings_for(entries, [account], before=True):
            while txn_posting.txn.date >= next_date:
                amount = balance[account].get_currency_units('CCY').number
                balance_checks.extend(parse(f"""
{next_date} balance {account} {amount} CCY
"""))
                next_date = next(date_iter)

    return balance_checks
beancount.scripts.example.generate_banking(entries, date_begin, date_end, amount_initial)
生成支票账户的开户条目。
| 参数: |
|
|---|
| 返回: |
|
|---|
源代码位于 beancount/scripts/example.py
def generate_banking(entries, date_begin, date_end, amount_initial):
    """Generate a checking account opening.

    Produces the open directives for the bank and its checking account, an
    opening balance transaction, and a balance assertion dated the day after
    'date_begin'.

    Args:
      entries: A list of entries which affect this account.
      date_begin: A date instance, the beginning date.
      date_end: A date instance, the end date. (Not used by this function.)
      amount_initial: A Decimal instance, the amount to initialize the checking
        account with.
    Returns:
      A list of directives.
    """
    amount_initial_neg = -amount_initial
    # NOTE(review): the templates in this function are reproduced exactly as
    # they appear in this listing, which seems to have lost its indentation --
    # confirm against beancount/scripts/example.py.
    new_entries = parse(f"""
{date_begin} open Assets:CC:Bank1
institution: "Bank1_Institution"
address: "Bank1_Address"
phone: "Bank1_Phone"
{date_begin} open Assets:CC:Bank1:Checking CCY
account: "00234-48574897"
;; {date_begin} open Assets:CC:Bank1:Savings CCY
{date_begin} * "Opening Balance for checking account"
Assets:CC:Bank1:Checking {amount_initial} CCY
Equity:Opening-Balances {amount_initial_neg} CCY
""")

    # The balance assertion is placed on the day after the opening.
    date_balance = date_begin + datetime.timedelta(days=1)
    account = 'Assets:CC:Bank1:Checking'
    # Walk the postings until the first one dated on or after the assertion
    # date. 'balances' is deliberately read after the loop: it holds the value
    # from the last iteration that ran.
    for txn_posting, balances in postings_for(data.sorted(entries + new_entries),
                                              [account], before=True):
        if txn_posting.txn.date >= date_balance:
            break
    amount_balance = balances[account].get_currency_units('CCY').number
    bal_entries = parse(f"""
{date_balance} balance Assets:CC:Bank1:Checking {amount_balance} CCY
""")

    return new_entries + bal_entries
beancount.scripts.example.generate_banking_expenses(date_begin, date_end, account, rent_amount)
从支票账户中生成支出,通常是生活费用。
| 参数: |
|
|---|
| 返回: |
|
|---|
源代码位于 beancount/scripts/example.py
def generate_banking_expenses(date_begin, date_end, account, rent_amount):
    """Generate expenses paid out of a checking account, typically living expenses.

    Five monthly series are generated over the period: bank fees, rent,
    electricity, internet and phone. All but the bank fees have their dates
    pushed back by a few random days.

    Args:
      date_begin: The start date.
      date_end: The end date.
      account: The checking account to generate expenses to.
      rent_amount: The amount of rent.
    Returns:
      A list of directives, sorted.
    """
    def monthly(**extra):
        # A monthly recurrence spanning the whole period.
        return rrule.rrule(rrule.MONTHLY, dtstart=date_begin, until=date_end, **extra)

    fees = generate_periodic_expenses(
        monthly(bymonthday=4),
        "BANK FEES", "Monthly bank fee",
        account, 'Expenses:Financial:Fees',
        lambda: D('4.00'))

    rent = generate_periodic_expenses(
        delay_dates(monthly(), 2, 5),
        "RiverBank Properties", "Paying the rent",
        account, 'Expenses:Home:Rent',
        lambda: random.normalvariate(float(rent_amount), 0))

    electricity = generate_periodic_expenses(
        delay_dates(monthly(), 7, 8),
        "EDISON POWER", "",
        account, 'Expenses:Home:Electricity',
        lambda: random.normalvariate(65, 0))

    internet = generate_periodic_expenses(
        delay_dates(monthly(), 20, 22),
        "Wine-Tarner Cable", "",
        account, 'Expenses:Home:Internet',
        lambda: random.normalvariate(80, 0.10))

    phone = generate_periodic_expenses(
        delay_dates(monthly(), 17, 19),
        "Verizon Wireless", "",
        account, 'Expenses:Home:Phone',
        lambda: random.normalvariate(60, 10))

    return data.sorted(fees + rent + electricity + internet + phone)
beancount.scripts.example.generate_clearing_entries(date_iter, payee, narration, entries, account_clear, account_from)
生成用于清空账户余额的条目。
| 参数: |
|
|---|
| 返回: |
|
|---|
源代码位于 beancount/scripts/example.py
def generate_clearing_entries(date_iter,
                              payee, narration,
                              entries, account_clear, account_from):
    """Generate entries to clear the value of an account.

    Each time a posting to 'account_clear' is reached whose date is on or
    after the next clearing date, a transaction dated at that clearing date is
    generated which moves the account's 'CCY' balance against 'account_from'.

    Args:
      date_iter: An iterator of datetime.date instances.
      payee: A string, the payee name to use on the transactions.
      narration: A string, the narration to use on the transactions.
      entries: A list of entries.
      account_clear: The account to clear.
      account_from: The source account to clear 'account_clear' from.
    Returns:
      A list of directives.
    """
    new_entries = []

    # The next date we're looking for.
    date_iter = iter(date_iter)
    next_date = next(date_iter, None)
    # No dates at all: nothing to generate.
    if not next_date:
        return new_entries

    # Iterate over all the postings of the account to clear.
    for txn_posting, balances in postings_for(entries, [account_clear]):
        balance_clear = balances[account_clear]

        # Check if we need to clear.
        if next_date <= txn_posting.txn.date:
            pos_amount = balance_clear.get_currency_units('CCY')
            neg_amount = -pos_amount
            # NOTE(review): the template below is reproduced exactly as it
            # appears in this listing, which seems to have lost its
            # indentation -- confirm against beancount/scripts/example.py.
            new_entries.extend(parse(f"""
{next_date} * "{payee}" "{narration}"
{account_clear} {neg_amount.number:.2f} CCY
{account_from} {pos_amount.number:.2f} CCY
"""))
            # Apply the clearing to the running balance object as well.
            balance_clear.add_amount(neg_amount)

            # Advance to the next date we're looking for.
            next_date = next(date_iter, None)
            if not next_date:
                break

    return new_entries
beancount.scripts.example.generate_commodity_entries(date_birth)
为所有使用的货币创建 Commodity 条目列表。
| 参数: |
|
|---|
| 返回: |
|
|---|
源代码位于 beancount/scripts/example.py
def generate_commodity_entries(date_birth):
    """Create a list of Commodity entries for all the currencies we're using.

    Most of the commodities are declared at fixed dates; only VACHR and IRAUSD
    are dated at 'date_birth'.

    Args:
      date_birth: A datetime.date instance, the date of birth of the user.
    Returns:
      A list of Commodity entries for all the commodities in use.
    """
    # NOTE(review): the template below is reproduced exactly as it appears in
    # this listing, which seems to have lost its indentation -- confirm against
    # beancount/scripts/example.py.
    return parse(f"""
1792-01-01 commodity USD
name: "US Dollar"
export: "CASH"
{date_birth} commodity VACHR
name: "Employer Vacation Hours"
export: "IGNORE"
{date_birth} commodity IRAUSD
name: "US 401k and IRA Contributions"
export: "IGNORE"
2009-05-01 commodity RGAGX
name: "American Funds The Growth Fund of America Class R-6"
export: "MUTF:RGAGX"
price: "USD:google/MUTF:RGAGX"
1995-09-18 commodity VBMPX
name: "Vanguard Total Bond Market Index Fund Institutional Plus Shares"
export: "MUTF:VBMPX"
price: "USD:google/MUTF:VBMPX"
2004-01-20 commodity ITOT
name: "iShares Core S&P Total U.S. Stock Market ETF"
export: "NYSEARCA:ITOT"
price: "USD:google/NYSEARCA:ITOT"
2007-07-20 commodity VEA
name: "Vanguard FTSE Developed Markets ETF"
export: "NYSEARCA:VEA"
price: "USD:google/NYSEARCA:VEA"
2004-01-26 commodity VHT
name: "Vanguard Health Care ETF"
export: "NYSEARCA:VHT"
price: "USD:google/NYSEARCA:VHT"
2004-11-01 commodity GLD
name: "SPDR Gold Trust (ETF)"
export: "NYSEARCA:GLD"
price: "USD:google/NYSEARCA:GLD"
1900-01-01 commodity VMMXX
export: "MUTF:VMMXX (MONEY:USD)"
""")
beancount.scripts.example.generate_employment_income(employer_name, employer_address, annual_salary, account_deposit, account_retirement, date_begin, date_end)
生成每两周一次的工资收入条目。
| 参数: |
|
|---|
| 返回: |
|
|---|
源代码位于 beancount/scripts/example.py
def generate_employment_income(employer_name,
                               employer_address,
                               annual_salary,
                               account_deposit,
                               account_retirement,
                               date_begin,
                               date_end):
    """Generate bi-weekly entries for payroll salary income.

    A payroll transaction is generated every second Thursday of the period.
    Retirement contributions and social security are capped per calendar year;
    the remaining allowances are reset on the first pay of each year.

    Args:
      employer_name: A string, the human-readable name of the employer.
      employer_address: A string, the address of the employer.
      annual_salary: A Decimal, the annual salary of the employee.
      account_deposit: An account string, the account to deposit the salary to.
      account_retirement: An account string, the account to deposit retirement
        contributions to.
      date_begin: The start date.
      date_end: The end date.
    Returns:
      A list of directives, including open directives for the account.
    """
    # NOTE(review): the templates in this function are reproduced exactly as
    # they appear in this listing, which seems to have lost its indentation --
    # confirm against beancount/scripts/example.py.
    preamble = parse(f"""
{date_begin} event "employer" "{employer_name}, {employer_address}"
{date_begin} open Income:CC:Employer1:Salary CCY
;{date_begin} open Income:CC:Employer1:AnnualBonus CCY
{date_begin} open Income:CC:Employer1:GroupTermLife CCY
{date_begin} open Income:CC:Employer1:Vacation VACHR
{date_begin} open Assets:CC:Employer1:Vacation VACHR
{date_begin} open Expenses:Vacation VACHR
{date_begin} open Expenses:Health:Life:GroupTermLife
{date_begin} open Expenses:Health:Medical:Insurance
{date_begin} open Expenses:Health:Dental:Insurance
{date_begin} open Expenses:Health:Vision:Insurance
;{date_begin} open Expenses:Vacation:Employer
""")

    date_prev = None

    # Remaining yearly allowances; they are (re)initialized in the loop below.
    contrib_retirement = ZERO
    contrib_socsec = ZERO

    # Per-pay amounts: percentages of the gross pay, and fixed deductions.
    biweekly_pay = annual_salary / 26
    gross = biweekly_pay

    medicare = gross * D('0.0231')
    federal = gross * D('0.2303')
    state = gross * D('0.0791')
    city = gross * D('0.0379')
    sdi = D('1.12')

    lifeinsurance = D('24.32')
    dental = D('2.90')
    medical = D('27.38')
    vision = D('42.30')

    # Deductions which are the same on every pay. The life insurance is not
    # part of it; it is posted as an income/expense pair in the transaction.
    fixed = (medicare + federal + state + city + sdi +
             dental + medical + vision)

    # Calculate vacation hours per-pay.
    with decimal.localcontext() as ctx:
        ctx.prec = 4
        vacation_hrs = (ANNUAL_VACATION_DAYS * D('8')) / D('26')

    transactions = []
    # Weekly Thursdays, of which skipiter(..., 2) is used to keep every second.
    for dtime in misc_utils.skipiter(
            rrule.rrule(rrule.WEEKLY, byweekday=rrule.TH,
                        dtstart=date_begin, until=date_end), 2):
        date = dtime.date()
        year = date.year

        # First pay of a new year: reset the yearly allowances.
        if not date_prev or date_prev.year != date.year:
            contrib_retirement = RETIREMENT_LIMITS.get(date.year, RETIREMENT_LIMITS[None])
            contrib_socsec = D('7000')
        date_prev = date

        # Contribute a quarter of the gross pay, rounded up to the next 100,
        # but never more than what remains of the yearly limit.
        retirement_uncapped = math.ceil((gross * D('0.25')) / 100) * 100
        retirement = min(contrib_retirement, retirement_uncapped)
        contrib_retirement -= retirement

        # Same capping logic for social security.
        socsec_uncapped = gross * D('0.0610')
        socsec = min(contrib_socsec, socsec_uncapped)
        contrib_socsec -= socsec

        # The net deposit is computed with a reduced decimal precision.
        with decimal.localcontext() as ctx:
            ctx.prec = 6
            deposit = (gross - retirement - fixed - socsec)

        # Negated amounts, precomputed for use in the template below.
        retirement_neg = -retirement
        gross_neg = -gross
        lifeinsurance_neg = -lifeinsurance
        vacation_hrs_neg = -vacation_hrs

        # The three retirement postings are left out once the yearly limit has
        # been exhausted (i.e. when the contribution is zero).
        transactions.extend(parse(f"""
{date} * "{employer_name}" "Payroll"
{account_deposit} {deposit:.2f} CCY
""" + ("" if retirement == ZERO else f"""\
{account_retirement} {retirement:.2f} CCY
Assets:CC:Federal:PreTax401k {retirement_neg:.2f} DEFCCY
Expenses:Taxes:Y{year}:CC:Federal:PreTax401k {retirement:.2f} DEFCCY
""") + f"""\
Income:CC:Employer1:Salary {gross_neg:.2f} CCY
Income:CC:Employer1:GroupTermLife {lifeinsurance_neg:.2f} CCY
Expenses:Health:Life:GroupTermLife {lifeinsurance:.2f} CCY
Expenses:Health:Dental:Insurance {dental} CCY
Expenses:Health:Medical:Insurance {medical} CCY
Expenses:Health:Vision:Insurance {vision} CCY
Expenses:Taxes:Y{year}:CC:Medicare {medicare:.2f} CCY
Expenses:Taxes:Y{year}:CC:Federal {federal:.2f} CCY
Expenses:Taxes:Y{year}:CC:State {state:.2f} CCY
Expenses:Taxes:Y{year}:CC:CityNYC {city:.2f} CCY
Expenses:Taxes:Y{year}:CC:SDI {sdi:.2f} CCY
Expenses:Taxes:Y{year}:CC:SocSec {socsec:.2f} CCY
Assets:CC:Employer1:Vacation {vacation_hrs:.2f} VACHR
Income:CC:Employer1:Vacation {vacation_hrs_neg:.2f} VACHR
"""))

    return preamble + transactions
beancount.scripts.example.generate_expense_accounts(date_birth)
生成支出账户的指令。
| 参数: |
|
|---|
| 返回: |
|
|---|
源代码位于 beancount/scripts/example.py
def generate_expense_accounts(date_birth):
    """Generate the Open directives for the fixed set of expense accounts.

    Args:
      date_birth: Birth date of the character; every account is opened on
        this date.
    Returns:
      A list of directives, as returned by parse().
    """
    account_names = (
        'Expenses:Food:Groceries',
        'Expenses:Food:Restaurant',
        'Expenses:Food:Coffee',
        'Expenses:Food:Alcohol',
        'Expenses:Transport:Tram',
        'Expenses:Home:Rent',
        'Expenses:Home:Electricity',
        'Expenses:Home:Internet',
        'Expenses:Home:Phone',
        'Expenses:Financial:Fees',
        'Expenses:Financial:Commissions',
    )
    directives = ''.join(f'{date_birth} open {name}\n' for name in account_names)
    # The leading newline keeps the text handed to parse() the same as the
    # literal template this replaces (one blank line, then one line per account).
    return parse('\n' + directives)
beancount.scripts.example.generate_open_entries(date, accounts, currency=None)
为给定账户生成一组 Open 指令:
| 参数: | `date`:Open 条目使用的 datetime.date 实例。<br>`accounts`:账户字符串列表。<br>`currency`:可选的货币约束。 |
|---|---|
| 返回: | Open 指令列表。 |
源代码位于 beancount/scripts/example.py
def generate_open_entries(date, accounts, currency=None):
    """Build Open directives for a list of accounts, all on the same date.

    Args:
      date: A datetime.date instance for the open entries.
      accounts: A list or tuple of account strings.
      currency: An optional currency constraint; when falsy, nothing is
        written in the currency slot of the directives.
    Returns:
      A list of Open directives, as returned by parse().
    """
    assert isinstance(accounts, (list, tuple))
    # Resolve the optional constraint once; it is the same for every account.
    constraint = currency or ''
    lines = []
    for account in accounts:
        lines.append('{date} open {account} {currency}\n'.format(
            date=date, account=account, currency=constraint))
    return parse(''.join(lines))
beancount.scripts.example.generate_outgoing_transfers(entries, account, account_out, transfer_minimum, transfer_threshold, transfer_increment)
生成从账户转出累积资金的转账指令。
监控账户余额,当余额超过阈值时,自动生成从该账户向另一账户的转账。
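| 参数: | `entries`:目前影响该账户的现有条目列表;生成的条目同样会影响该账户。<br>`account`:账户字符串,要监控的账户。<br>`account_out`:账户字符串,接收转账的储蓄账户。<br>`transfer_minimum`:每次转账后该账户中始终保留的最低金额。<br>`transfer_threshold`:在不突破最低金额的前提下允许转出的最小金额。<br>`transfer_increment`:Decimal,转账金额取整所用的增量。 |
|---|---|
| 返回: | 新指令列表,即需要添加到该账户的转账。 |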
源代码位于 beancount/scripts/example.py
def generate_outgoing_transfers(entries,
                                account,
                                account_out,
                                transfer_minimum,
                                transfer_threshold,
                                transfer_increment):
    """Generate transfers of accumulated funds out of an account.

    This monitors the balance of an account and, when it is beyond a
    threshold, generates outgoing transfers from that account to another one.

    Args:
      entries: A list of existing entries that affect this account so far.
        The generated entries will also affect this account.
      account: An account string, the account to monitor.
      account_out: An account string, the savings account to make transfers to.
      transfer_minimum: The minimum amount of funds to always leave in this
        account after a transfer.
      transfer_threshold: The minimum amount of funds to be able to transfer
        out without breaking the minimum.
      transfer_increment: A Decimal, the increment to round transfers to.
    Returns:
      A list of new directives, the transfers to add to the given account.
      The list is empty when there are no entries or no postings on 'account'.
    """
    if not entries:
        return []
    last_date = entries[-1].date

    # Balance of the account (in CCY) after each of its postings.
    amounts = [(balances[account].get_currency_units('CCY').number, txn_posting)
               for txn_posting, balances in postings_for(entries, [account])]
    if not amounts:
        return []

    # For each posting, compute the lowest balance seen from that point until
    # the end of the history (a running minimum taken from the back). Only
    # that much is safe to move out without a later posting going short.
    running_minimum = amounts[-1][0]
    capped_amounts = []
    for current_amount, _ in reversed(amounts):
        if current_amount < running_minimum:
            running_minimum = current_amount
        capped_amounts.append(running_minimum)
    capped_amounts.reverse()

    # Create transfers outward where the future allows it.
    new_entries = []
    offset_amount = ZERO  # Total already transferred out by earlier entries.
    for current_amount, (_, txn_posting) in zip(capped_amounts, amounts):
        if txn_posting.txn.date >= last_date:
            break

        adjusted_amount = current_amount - offset_amount
        if adjusted_amount > (transfer_minimum + transfer_threshold):
            amount_transfer = round_to(adjusted_amount - transfer_minimum,
                                       transfer_increment)

            # The transfer is dated the day after the posting that triggered it.
            date = txn_posting.txn.date + datetime.timedelta(days=1)
            amount_transfer_neg = -amount_transfer
            # '.2f' (two decimals) is intended here; the former '2f' was a
            # field width and rendered the default six decimals.
            new_entries.extend(parse(f"""
              {date} * "Transfering accumulated savings to other account"
                {account} {amount_transfer_neg:.2f} CCY
                {account_out} {amount_transfer:.2f} CCY
            """))

            offset_amount += amount_transfer

    return new_entries
beancount.scripts.example.generate_periodic_expenses(date_iter, payee, narration, account_from, account_to, amount_generator)
生成周期性支出交易。
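| 参数: | `date_iter`:日期或日期时间的迭代器。<br>`payee`:字符串,交易使用的收款人名称;或一组此类字符串,从中随机选取。<br>`narration`:字符串,交易使用的摘要。<br>`account_from`:账户字符串,被借记的账户。<br>`account_to`:账户字符串,被贷记的账户。<br>`amount_generator`:用于生成随机金额的可调用对象。 |
|---|---|
| 返回: | 指令列表。 |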
源代码位于 beancount/scripts/example.py
def generate_periodic_expenses(date_iter,
                               payee, narration,
                               account_from, account_to,
                               amount_generator):
    """Generate periodic expense transactions.

    One two-legged transaction is produced per date: 'account_from' receives
    the negated amount and 'account_to' receives the amount.

    Args:
      date_iter: An iterator for dates or datetimes.
      payee: A string, the payee name to use on the transactions, or a
        sequence of such strings to randomly choose from (it is passed to
        random.choice(), so it must be indexable; a set will not work).
      narration: A string, the narration to use on the transactions, or a
        sequence of such strings to randomly choose from.
      account_from: An account string, the account the amount is taken from.
      account_to: An account string, the account the amount is posted to.
      amount_generator: A callable object to generate variates; it is called
        once per date and its result is converted with D().
    Returns:
      A list of directives.
    """
    new_entries = []
    for dtime in date_iter:
        date = dtime.date() if isinstance(dtime, datetime.datetime) else dtime
        # Keep this draw order (amount, payee, narration): it determines the
        # output for a given random seed.
        amount = D(amount_generator())
        txn_payee = payee if isinstance(payee, str) else random.choice(payee)
        txn_narration = (narration
                         if isinstance(narration, str)
                         else random.choice(narration))
        amount_neg = -amount
        new_entries.extend(parse(f"""
          {date} * "{txn_payee}" "{txn_narration}"
            {account_from} {amount_neg:.2f} CCY
            {account_to} {amount:.2f} CCY
        """))
    return new_entries
beancount.scripts.example.generate_prices(date_begin, date_end, currencies, cost_currency)
为给定的货币生成每周或每月的价格条目。
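| 参数: | `date_begin`:开始日期。<br>`date_end`:结束日期。<br>`currencies`:要生成价格的货币字符串列表。<br>`cost_currency`:字符串,成本货币。 |
|---|---|
| 返回: | Price 指令列表。 |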
源代码位于 beancount/scripts/example.py
def generate_prices(date_begin, date_end, currencies, cost_currency):
    """Generate a random weekly price history for each given currency.

    For every currency, one Price directive is created for each date produced
    by a weekly Friday recurrence between the two dates, with values taken
    from price_series().

    Args:
      date_begin: The start date.
      date_end: The end date.
      currencies: A list of currency strings to generate prices for.
      cost_currency: A string, the cost currency.
    Returns:
      A list of Price directives.
    """
    quantum = D('0.01')
    sequence = itertools.count()  # Running index stored in each entry's metadata.
    price_entries = []
    for currency in currencies:
        # The order of these random draws is significant for reproducibility.
        start_price = random.uniform(30, 200)
        growth = random.uniform(0.02, 0.13)  # %/year
        mu = growth * (7 / 365)  # Scale the yearly growth to one weekly step.
        sigma = random.uniform(0.005, 0.02)  # Vol

        fridays = rrule.rrule(rrule.WEEKLY, byweekday=rrule.FR,
                              dtstart=date_begin, until=date_end)
        series = price_series(start_price, mu, sigma)
        # The dates come first in zip() so that the endless price series is
        # not advanced once the dates are exhausted.
        for dtime, price_float in zip(fridays, series):
            price = D(price_float).quantize(quantum)
            meta = data.new_metadata(generate_prices.__name__, next(sequence))
            price_entries.append(
                data.Price(meta, dtime.date(), currency,
                           amount.Amount(price, cost_currency)))
    return price_entries
beancount.scripts.example.generate_regular_credit_expenses(date_birth, date_begin, date_end, account_credit, account_checking)
生成从信用卡账户支付的支出,包括对信用卡的还款。
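| 参数: | `date_birth`:用户的出生日期。<br>`date_begin`:开始日期。<br>`date_end`:结束日期。<br>`account_credit`:用于产生支出的信用卡账户。<br>`account_checking`:用于还款的支票账户。 |
|---|---|
| 返回: | 指令列表。 |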
源代码位于 beancount/scripts/example.py
def generate_regular_credit_expenses(date_birth, date_begin, date_end,
                                     account_credit,
                                     account_checking):
    """Generate the regular expenses charged to a credit card account.

    Three streams are generated (restaurants, groceries and tram tickets),
    merged, and preceded by the directive opening the credit card account.

    Args:
      date_birth: The user's birth date; the credit account is opened then.
      date_begin: The start date.
      date_end: The end date.
      account_credit: The credit card account to generate expenses against.
      account_checking: The checking account to generate payments from. It is
        not used by this function.
    Returns:
      A sorted list of directives.
    """
    def restaurant_amount():
        return min(random.lognormvariate(math.log(30), math.log(1.5)),
                   random.randint(200, 220))

    def groceries_amount():
        return min(random.lognormvariate(math.log(80), math.log(1.3)),
                   random.randint(250, 300))

    def tram_amount():
        return D('120.00')

    # The streams are generated in this fixed order so the sequence of random
    # draws stays the same.
    restaurants = generate_periodic_expenses(
        date_random_seq(date_begin, date_end, 1, 5),
        RESTAURANT_NAMES, RESTAURANT_NARRATIONS,
        account_credit, 'Expenses:Food:Restaurant',
        restaurant_amount)

    groceries = generate_periodic_expenses(
        date_random_seq(date_begin, date_end, 5, 20),
        GROCERIES_NAMES, "Buying groceries",
        account_credit, 'Expenses:Food:Groceries',
        groceries_amount)

    tram = generate_periodic_expenses(
        date_random_seq(date_begin, date_end, 27, 33),
        "Metro Transport Authority", "Tram tickets",
        account_credit, 'Expenses:Transport:Tram',
        tram_amount)

    expenses = data.sorted(restaurants + groceries + tram)

    # Entries to open accounts.
    preamble = generate_open_entries(date_birth, [account_credit], 'CCY')

    return data.sorted(preamble + expenses)
beancount.scripts.example.generate_retirement_employer_match(entries, account_invest, account_income)
生成雇主对退休账户的匹配供款。
| 参数: | `entries`:涵盖退休账户的指令列表。<br>`account_invest`:退休现金账户的名称。<br>`account_income`:收入账户的名称。 |
|---|---|
| 返回: | 为雇主供款生成的新条目列表。 |
源代码位于 beancount/scripts/example.py
def generate_retirement_employer_match(entries, account_invest, account_income):
    """Generate employer matching contributions into a retirement account.

    For every posting found on 'account_invest', a transaction dated one day
    later adds half of the posted amount to 'account_invest', taken from
    'account_income'.

    Args:
      entries: A list of directives that cover the retirement account. It
        must not be empty; the date of the first one is used to open the
        income account.
      account_invest: The name of the retirement cash account.
      account_income: The name of the income account.
    Returns:
      A list of new entries generated for employer contributions, starting
      with the Open directive for the income account.
    """
    match_frac = D('0.50')

    new_entries = parse(f"""
      {entries[0].date} open {account_income} CCY
    """)

    # Only the posting itself is needed here, not the running balances.
    for txn_posting, _ in postings_for(entries, [account_invest]):
        amount = txn_posting.posting.units.number * match_frac
        amount_neg = -amount
        date = txn_posting.txn.date + ONE_DAY
        new_entries.extend(parse(f"""
          {date} * "Employer match for contribution"
            {account_invest} {amount:.2f} CCY
            {account_income} {amount_neg:.2f} CCY
        """))

    return new_entries
beancount.scripts.example.generate_retirement_investments(entries, account, commodities_items, price_map)
将存入指定退休账户的资金进行投资。
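| 参数: | `entries`:指令列表。<br>`account`:所有退休投资子账户的根账户。<br>`commodities_items`:由(商品,投资比例)组成的列表。<br>`price_map`:价格字典,参见 beancount.core.prices.build_price_map()。 |
|---|---|
| 返回: | 针对给定投资的新指令列表;其中也包含所需投资商品的账户开户指令。 |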
源代码位于 beancount/scripts/example.py
def generate_retirement_investments(entries, account, commodities_items, price_map):
    """Invest money deposited to the given retirement account.

    Args:
      entries: A list of directives. It must not be empty; the date of the
        first one is used for all the Open directives.
      account: The root account for all retirement investment sub-accounts.
      commodities_items: A list of (commodity, fraction to be invested in) items.
      price_map: A dict of prices, as per beancount.core.prices.build_price_map().
    Returns:
      A sorted list of new directives for the given investments. This also
      generates account opening directives for the desired investment
      commodities.
    """
    open_entries = []
    account_cash = join(account, 'Cash')
    date_origin = entries[0].date
    open_entries.extend(parse(f"""
      {date_origin} open {account} CCY
        institution: "Retirement_Institution"
        address: "Retirement_Address"
        phone: "Retirement_Phone"
      {date_origin} open {account_cash} CCY
        number: "882882"
    """))
    for currency, _ in commodities_items:
        open_entries.extend(parse(f"""
          {date_origin} open {account}:{currency} {currency}
            number: "882882"
        """))

    new_entries = []
    for txn_posting, balances in postings_for(entries, [account_cash]):
        balance = balances[account_cash]
        amount_to_invest = balance.get_currency_units('CCY').number

        # Invest on the first Monday on or after the date of the deposit.
        txn_date = txn_posting.txn.date
        while txn_date.weekday() != calendar.MONDAY:
            txn_date += ONE_DAY

        for commodity, fraction in commodities_items:
            amount_fraction = amount_to_invest * D(fraction)

            # Find the price at that date.
            # NOTE(review): assumes price_map holds a (commodity, 'CCY') price
            # on or before txn_date; otherwise the division below will fail.
            _, price = prices.get_price(price_map, (commodity, 'CCY'), txn_date)
            units = (amount_fraction / price).quantize(D('0.001'))
            amount_cash = (units * price).quantize(D('0.01'))
            amount_cash_neg = -amount_cash
            new_entries.extend(parse(f"""
              {txn_date} * "Investing {fraction:.0%} of cash in {commodity}"
                {account}:{commodity} {units:.3f} {commodity} {{{price:.2f} CCY}}
                {account}:Cash {amount_cash_neg:.2f} CCY
            """))

            # Feed the purchase back into the running balance yielded by
            # postings_for(), so later deposits do not see the spent cash.
            balance.add_amount(amount.Amount(-amount_cash, 'CCY'))

    return data.sorted(open_entries + new_entries)
beancount.scripts.example.generate_tax_accounts(year, date_max)
为特定税务年度生成账户和供款指令。
| 参数: | `year`:整数,要为其生成指令的年份。<br>`date_max`:允许生成条目的最大日期。 |
|---|---|
| 返回: | 指令列表。 |
源代码位于 beancount/scripts/example.py
def generate_tax_accounts(year, date_max):
    """Generate accounts and contribution directives for a particular tax year.

    Args:
      year: An integer, the year we're to generate this for.
      date_max: The maximum date to produce an entry for; entries dated on or
        after it are dropped from the result.
    Returns:
      A list of directives.
    """
    date_year = datetime.date(year, 1, 1)

    # Filing happens between March 20 and March 25 of the following year, and
    # each payment up to four days after that. The order of the random draws
    # below determines the output for a given seed; do not reorder them.
    date_filing = (datetime.date(year + 1, 3, 20) +
                   datetime.timedelta(days=random.randint(0, 5)))
    date_federal = date_filing + datetime.timedelta(days=random.randint(0, 4))
    date_state = date_filing + datetime.timedelta(days=random.randint(0, 4))

    quantum = D('0.01')
    amount_federal = D(max(random.normalvariate(500, 120), 12)).quantize(quantum)
    amount_federal_neg = -amount_federal
    amount_state = D(max(random.normalvariate(300, 100), 10)).quantize(quantum)
    amount_state_neg = -amount_state
    amount_payable = -(amount_federal + amount_state)

    # Yearly contribution limit, falling back to the entry stored under None.
    amount_limit = RETIREMENT_LIMITS.get(year, RETIREMENT_LIMITS[None])
    amount_limit_neg = -amount_limit

    entries = parse(f"""
      ;; Open tax accounts for that year.
      {date_year} open Expenses:Taxes:Y{year}:CC:Federal:PreTax401k DEFCCY
      {date_year} open Expenses:Taxes:Y{year}:CC:Medicare CCY
      {date_year} open Expenses:Taxes:Y{year}:CC:Federal CCY
      {date_year} open Expenses:Taxes:Y{year}:CC:CityNYC CCY
      {date_year} open Expenses:Taxes:Y{year}:CC:SDI CCY
      {date_year} open Expenses:Taxes:Y{year}:CC:State CCY
      {date_year} open Expenses:Taxes:Y{year}:CC:SocSec CCY
      ;; Check that the tax amounts have been fully used.
      {date_year} balance Assets:CC:Federal:PreTax401k 0 DEFCCY
      {date_year} * "Allowed contributions for one year"
        Income:CC:Federal:PreTax401k {amount_limit_neg} DEFCCY
        Assets:CC:Federal:PreTax401k {amount_limit} DEFCCY
      {date_filing} * "Filing taxes for {year}"
        Expenses:Taxes:Y{year}:CC:Federal {amount_federal:.2f} CCY
        Expenses:Taxes:Y{year}:CC:State {amount_state:.2f} CCY
        Liabilities:AccountsPayable {amount_payable:.2f} CCY
      {date_federal} * "FEDERAL TAXPYMT"
        Assets:CC:Bank1:Checking {amount_federal_neg:.2f} CCY
        Liabilities:AccountsPayable {amount_federal:.2f} CCY
      {date_state} * "STATE TAX & FINANC PYMT"
        Assets:CC:Bank1:Checking {amount_state_neg:.2f} CCY
        Liabilities:AccountsPayable {amount_state:.2f} CCY
    """)

    return [entry for entry in entries if entry.date < date_max]
beancount.scripts.example.generate_tax_preamble(date_birth)
生成与任何特定年度无关的税务声明。
| 参数: | `date_birth`:date 实例,角色的出生日期。 |
|---|---|
| 返回: | 指令列表。 |
源代码位于 beancount/scripts/example.py
def generate_tax_preamble(date_birth):
    """Generate the tax declarations shared by all years.

    Args:
      date_birth: A date instance, the birth date of the character; the two
        accounts are opened on this date.
    Returns:
      A list of directives, as returned by parse().
    """
    # parse() dedents the template, so the indentation here is cosmetic.
    template = f"""
    ;; Tax accounts not specific to a year.
    {date_birth} open Income:CC:Federal:PreTax401k DEFCCY
    {date_birth} open Assets:CC:Federal:PreTax401k DEFCCY
    """
    return parse(template)
beancount.scripts.example.generate_taxable_investment(date_begin, date_end, entries, price_map, stocks)
为投资账户生成期初指令和交易记录。
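| 参数: | `date_begin`:date 实例,开始日期。<br>`date_end`:date 实例,结束日期。<br>`entries`:条目列表,至少包含转入投资账户现金账户的转账。<br>`price_map`:价格字典,参见 beancount.core.prices.build_price_map()。<br>`stocks`:字符串列表,要投资的商品。 |
|---|---|
| 返回: | 指令列表。 |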
源代码位于 beancount/scripts/example.py
def generate_taxable_investment(date_begin, date_end, entries, price_map, stocks):
    """Generate opening directives and transactions for an investment account.

    The function walks over every date yielded by iter_dates_with_balance()
    and, on each day, may insert a dividend, buy shares with the available
    cash, or sell one existing lot. Buying and selling are decided by random
    draws, so the order of the statements below determines the output for a
    given random seed.

    Args:
      date_begin: A date instance, the beginning date.
      date_end: A date instance, the end date.
      entries: A list of entries that contains at least the transfers to the
        investment account's cash account.
      price_map: A dict of prices, as per beancount.core.prices.build_price_map().
      stocks: A list of strings, the list of commodities to invest in.
    Returns:
      A list of directives: the account openings followed by the generated
      transactions.
    """
    account = 'Assets:CC:Investment'
    income = 'Income:CC:Investment'
    account_cash = join(account, 'Cash')
    account_gains = '{income}:PnL'.format(income=income)
    dividends = 'Dividend'
    accounts_stocks = ['Assets:CC:Investment:{}'.format(commodity)
                       for commodity in stocks]

    open_entries = parse(f"""
      {date_begin} open {account}:Cash CCY
      {date_begin} open {account_gains} CCY
    """)
    for stock in stocks:
        open_entries.extend(parse(f"""
          {date_begin} open {account}:{stock} {stock}
          {date_begin} open {income}:{stock}:{dividends} CCY
        """))

    # Figure out dates at which dividends should be distributed, near the end
    # of a quarter.
    # NOTE(review): the offset is 3*90-10 = 260 days from the start of each
    # quarter, i.e. close to the end of the quarter two quarters later --
    # confirm this is intended rather than 90-10.
    days_to = datetime.timedelta(days=3*90-10)
    dividend_dates = []
    for quarter_begin in iter_quarters(date_begin, date_end):
        end_of_quarter = quarter_begin + days_to
        if not (date_begin < end_of_quarter < date_end):
            continue
        dividend_dates.append(end_of_quarter)

    # Iterate over all the dates, but merging in the postings for the cash
    # account.
    min_amount = D('1000.00')     # Minimum cash required before buying.
    round_amount = D('100.00')    # Lot amounts are rounded to this.
    commission = D('8.95')        # Charged on every buy and sell.
    round_units = D('1')          # Only whole units are bought.
    frac_invest = D('1.00')       # Fraction of the cash that gets invested.
    frac_dividend = D('0.004')    # Dividend as a fraction of portfolio cost.
    p_daily_buy = 1./15  # days
    p_daily_sell = 1./90  # days

    stocks_inventory = inventory.Inventory()
    new_entries = []
    dividend_date_iter = iter(dividend_dates)
    next_dividend_date = next(dividend_date_iter, None)
    for date, balances in iter_dates_with_balance(date_begin, date_end,
                                                  entries, [account_cash]):

        # Check if we should insert a dividend. Note that we could not factor
        # this out because we want to explicitly reinvest the cash dividends and
        # we also want the dividends to be proportional to the amount of
        # invested stock, so one feeds on the other and vice-versa.
        if next_dividend_date and date > next_dividend_date:
            # Compute the total balances for the stock accounts in order to
            # create a realistic dividend.
            total = inventory.Inventory()
            for account_stock in accounts_stocks:
                total.add_inventory(balances[account_stock])

            # Create an entry offering dividends of 'frac_dividend' (0.4%) of
            # the cost of the portfolio, attributed to one random stock.
            portfolio_cost = total.reduce(convert.get_cost).get_currency_units('CCY').number
            amount_cash = (frac_dividend * portfolio_cost).quantize(D('0.01'))
            amount_cash_neg = -amount_cash
            stock = random.choice(stocks)
            cash_dividend = parse(f"""
              {next_dividend_date} * "Dividends on portfolio"
                {account}:Cash {amount_cash:.2f} CCY
                {income}:{stock}:{dividends} {amount_cash_neg:.2f} CCY
            """)[0]
            new_entries.append(cash_dividend)

            # Advance the next dividend date.
            next_dividend_date = next(dividend_date_iter, None)

        # If the balance is high, buy with high probability.
        balance = balances[account_cash]
        total_cash = balance.get_currency_units('CCY').number
        assert total_cash >= ZERO, ('Cash balance is negative: {}'.format(total_cash))
        invest_cash = total_cash * frac_invest - commission
        if invest_cash > min_amount:
            if random.random() < p_daily_buy:
                # Split the cash evenly over a random subset of the stocks.
                commodities = random.sample(stocks, random.randint(1, len(stocks)))
                lot_amount = round_to(invest_cash / len(commodities), round_amount)

                for stock in commodities:
                    # Find the price at that date.
                    _, price = prices.get_price(price_map, (stock, 'CCY'), date)

                    units = round_to((lot_amount / price), round_units)
                    if units <= ZERO:
                        continue
                    amount_cash = -(units * price + commission)
                    # logging.info('Buying %s %s @ %s CCY = %s CCY',
                    #              units, stock, price, units * price)
                    buy = parse(f"""
                      {date} * "Buy shares of {stock}"
                        {account}:Cash {amount_cash:.2f} CCY
                        {account}:{stock} {units:.0f} {stock} {{{price:.2f} CCY}}
                        Expenses:Financial:Commissions {commission:.2f} CCY
                    """)[0]
                    new_entries.append(buy)

                    # Feed the purchase back into the running balances
                    # (postings[0] is the cash leg, postings[1] the stock leg).
                    account_stock = ':'.join([account, stock])
                    balances[account_cash].add_position(buy.postings[0])
                    balances[account_stock].add_position(buy.postings[1])
                    stocks_inventory.add_position(buy.postings[1])

                # Don't sell on days you buy.
                continue

        # Otherwise, sell with low probability.
        if not stocks_inventory.is_empty() and random.random() < p_daily_sell:
            # Choose the lot with the highest gain or highest loss.
            gains = []
            for position in stocks_inventory.get_positions():
                base_quote = (position.units.currency, position.cost.currency)
                _, price = prices.get_price(price_map, base_quote, date)
                if price == position.cost.number:
                    continue  # Skip lots without movement.
                market_value = position.units.number * price
                book_value = convert.get_cost(position).number
                gain = market_value - book_value
                gains.append((gain, market_value, price, position))
            if not gains:
                continue

            # Sell either biggest winner or biggest loser: after sorting by
            # gain, index 0 is the lowest gain and index -1 the highest.
            biggest = bool(random.random() < 0.5)
            lot_tuple = sorted(gains)[0 if biggest else -1]
            gain, market_value, price, sell_position = lot_tuple
            #logging.info('Selling {} for {}'.format(sell_position, market_value))
            sell_position = -sell_position
            stock = sell_position.units.currency
            amount_cash = market_value - commission
            amount_gain = -gain
            sell = parse(f"""
              {date} * "Sell shares of {stock}"
                {account}:{stock} {sell_position} @ {price:.2f} CCY
                {account}:Cash {amount_cash:.2f} CCY
                Expenses:Financial:Commissions {commission:.2f} CCY
                {account_gains} {amount_gain:.2f} CCY
            """)[0]
            new_entries.append(sell)

            # postings[1] is the cash leg, postings[0] the stock leg. Only the
            # cash balance and the local stock inventory are updated here.
            balances[account_cash].add_position(sell.postings[1])
            stocks_inventory.add_position(sell.postings[0])

    return open_entries + new_entries
beancount.scripts.example.generate_trip_entries(date_begin, date_end, tag, config, trip_city, home_city, account_credit)
为一次旅行生成更密集的支出记录。
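| 参数: | `date_begin`:datetime.date 实例,旅行开始日期。<br>`date_end`:datetime.date 实例,旅行结束日期。<br>`tag`:字符串,标签名称。<br>`config`:由(收款人名称,账户名称,(mu, 3sigma))组成的列表,其中 mu 是所生成价格的均值,3sigma 是标准差的 3 倍。<br>`trip_city`:字符串,目的地城市的首字母大写名称。<br>`home_city`:字符串,居住城市的名称。<br>`account_credit`:字符串,用于支付这些支出的信用卡账户名称。 |
|---|---|
| 返回: | 该次旅行的条目列表,全部带有给定标签。 |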
源代码位于 beancount/scripts/example.py
def generate_trip_entries(date_begin, date_end,
                          tag, config,
                          trip_city, home_city,
                          account_credit):
    """Generate more dense expenses for a trip.

    Args:
      date_begin: A datetime.date instance, the beginning of the trip.
      date_end: A datetime.date instance, the end of the trip.
      tag: A string, the name of the tag.
      config: A list of (payee name, account name, (mu, 3sigma)), where
        mu is the mean of the prices to generate and 3sigma is 3 times
        the standard deviation.
      trip_city: A string, the capitalized name of the destination city.
      home_city: A string, the name of the home city.
      account_credit: A string, the name of the credit card account to pay
        the expenses from.
    Returns:
      A list of entries for the trip: the tagged expenses, followed by the
      vacation-hours transaction and the two location events.
    """
    p_day_generate = 0.3  # Chance of each configured expense on each day.

    new_entries = []
    for date in date_iter(date_begin, date_end):
        for payee, account_expense, (mu, sigma3) in config:
            if random.random() < p_day_generate:
                amount = random.normalvariate(mu, sigma3 / 3.)
                amount_neg = -amount
                new_entries.extend(parse(f"""
                  {date} * "{payee}" "" #{tag}
                    {account_credit} {amount_neg:.2f} CCY
                    {account_expense} {amount:.2f} CCY
                """))

    # Consume the vacation days.
    vacation_hrs = (date_end - date_begin).days * 8  # hrs/day
    new_entries.extend(parse(f"""
      {date_end} * "Consume vacation days"
        Assets:CC:Employer1:Vacation -{vacation_hrs:.2f} VACHR
        Expenses:Vacation {vacation_hrs:.2f} VACHR
    """))

    # Generate events for the trip.
    new_entries.extend(parse(f"""
      {date_begin} event "location" "{trip_city}"
      {date_end} event "location" "{home_city}"
    """))

    return new_entries
beancount.scripts.example.get_minimum_balance(entries, account, currency)
根据条目历史计算指定账户的最低余额。
| 参数: | `entries`:指令列表。<br>`account`:账户字符串。<br>`currency`:货币字符串,要计算其最低余额的货币。 |
|---|---|
| 返回: | Decimal 数值,该账户整个历史中的最低金额。 |
源代码位于 beancount/scripts/example.py
def get_minimum_balance(entries, account, currency):
    """Find the lowest balance an account reaches over the entries' history.

    The search starts from ZERO, so the result is never greater than ZERO.

    Args:
      entries: A list of directives.
      account: An account string.
      currency: A currency string, for which we want to compute the minimum.
    Returns:
      A Decimal number, the minimum amount throughout the history of this
      account.
    """
    lowest = ZERO
    for _, balances in postings_for(entries, [account]):
        # min() keeps 'lowest' unless the new balance is strictly smaller.
        lowest = min(lowest,
                     balances[account].get_currency_units(currency).number)
    return lowest
beancount.scripts.example.iter_dates_with_balance(date_begin, date_end, entries, accounts)
迭代日期,并包含 postings 迭代器的余额。
| 参数: | `date_begin`:开始日期。<br>`date_end`:结束日期。<br>`entries`:指令列表。<br>`accounts`:账户字符串列表。 |
|---|---|
| 生成: | (date, balances) 对的序列,其中 date 是一个 datetime.date 实例;balances 是以账户名为键的 Inventory 字典,表示当前余额。您可以修改其中的 Inventory 对象以反馈余额的变更。 |
源代码位于 beancount/scripts/example.py
def iter_dates_with_balance(date_begin, date_end, entries, accounts):
    """Iterate over dates, yielding the running balances of some accounts.

    Args:
      date_begin: The start date.
      date_end: The end date.
      entries: A list of directives.
      accounts: A list of account strings to track the balances of.
    Yields:
      Pairs of (date, balances) objects, with
        date: A datetime.date instance
        balances: A dict of account name to Inventory, representing the
          current balances. The same dict is yielded every time; you can
          modify its inventories to feed back changes in the balance.
    """
    balances = collections.defaultdict(inventory.Inventory)
    merged_txn_postings = iter(merge_postings(entries, accounts))
    txn_posting = next(merged_txn_postings, None)
    for date in date_iter(date_begin, date_end):
        # Apply every posting dated on or before this day. Matching with '<='
        # rather than '==' keeps a posting dated before the first iterated
        # day from blocking the cursor, which would leave all later postings
        # unapplied.
        while txn_posting is not None and txn_posting.txn.date <= date:
            posting = txn_posting.posting
            balances[posting.account].add_position(posting)
            txn_posting = next(merged_txn_postings, None)
        yield date, balances
beancount.scripts.example.iter_quarters(date_begin, date_end)
迭代开始日期和结束日期之间的所有季度。
| 参数: | `date_begin`:开始日期。<br>`date_end`:结束日期。 |
|---|---|
| 生成: | 每个季度起始日的 datetime.date 实例,其中包含开始日期和结束日期所在的季度。 |
源代码位于 beancount/scripts/example.py
def iter_quarters(date_begin, date_end):
    """Iterate over the first day of every quarter spanned by two dates.

    Args:
      date_begin: The start date.
      date_end: The end date; it must not fall in an earlier quarter than
        'date_begin'.
    Yields:
      Instances of datetime.date at the beginning of the quarters. This will
      include the quarter of the beginning date and of the end date.
    """
    # A quarter is identified by (year, index), with index in 0..3.
    first = (date_begin.year, (date_begin.month - 1) // 3)
    last = (date_end.year, (date_end.month - 1) // 3)
    assert first <= last

    year, index = first
    while True:
        yield datetime.date(year, index * 3 + 1, 1)
        if (year, index) == last:
            return
        index += 1
        if index == 4:
            # Wrap around to the first quarter of the next year.
            year, index = year + 1, 0
beancount.scripts.example.merge_postings(entries, accounts)
合并指定账户名称的所有过账项。
| 参数: | `entries`:指令列表。<br>`accounts`:要获取其过账项的账户字符串列表。 |
|---|---|
| 返回: | 所有指定账户的 TxnPosting 列表,按日期排序。 |
源代码位于 beancount/scripts/example.py
def merge_postings(entries, accounts):
    """Collect the transaction postings of several accounts, ordered by date.

    Args:
      entries: A list of directives.
      accounts: A list of account strings to get the postings for. Accounts
        that are not found in the realization are skipped.
    Returns:
      A list of TxnPosting's for all the accounts, sorted by the date of
      their transaction.
    """
    real_root = realization.realize(entries)
    collected = []
    for account in accounts:
        real_account = realization.get(real_root, account)
        if real_account is not None:
            # Keep only the transaction postings of the account.
            collected += [txn_posting
                          for txn_posting in real_account.txn_postings
                          if isinstance(txn_posting, data.TxnPosting)]
    return sorted(collected, key=lambda txn_posting: txn_posting.txn.date)
beancount.scripts.example.parse(input_string)
解析某个输入字符串,并确保无错误。
此解析函数不仅创建对象,还会触发本地插值以填充缺失的金额。
| 参数: | `input_string`:Beancount 输入文本。 |
|---|---|
| 返回: | 指令对象列表。 |
源代码位于 beancount/scripts/example.py
def parse(input_string):
    """Parse a snippet of Beancount text, failing loudly on syntax errors.

    Beyond creating the directive objects, this also runs booking, which
    triggers local interpolation to fill in the missing amounts.

    Args:
      input_string: Beancount input text. Common leading indentation is
        removed before parsing.
    Returns:
      A sorted list of directive objects.
    Raises:
      ValueError: If the parser reports errors; they are printed to stderr.
    """
    text = textwrap.dedent(input_string)
    parsed_entries, errors, options_map = parser.parse_string(text)
    if errors:
        printer.print_errors(errors, file=sys.stderr)
        raise ValueError("Parsed text has errors")

    # Interpolation. Errors reported by booking are not checked.
    booked_entries, _ = booking.book(parsed_entries, options_map)
    return data.sorted(booked_entries)
beancount.scripts.example.postings_for(entries, accounts, before=False)
实现条目并获取指定账户的过账列表。
所有非过账指令已被过滤掉。
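| 参数: | `entries`:指令列表。<br>`accounts`:要获取余额的账户字符串列表。<br>`before`:布尔值;为真时生成应用过账之前的余额,默认生成应用过账之后的余额。 |
|---|---|
| 生成: | 元组,包含:posting:TxnPosting 实例;balances:指定账户在应用过账后的库存余额字典。这些库存对象可被修改,以调整因后续生成交易而需应用的余额。 |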
源代码位于 beancount/scripts/example.py
def postings_for(entries, accounts, before=False):
    """Walk the postings of some accounts along with their running balances.

    All the non-Posting directives are already filtered out.

    Args:
      entries: A list of directives.
      accounts: A list of account strings to get the balances for.
      before: A boolean, if true, yield the balance before the position is
        applied. The default is to yield the balance after applying it.
    Yields:
      Tuples of:
        posting: An instance of TxnPosting
        balances: A dict of Inventory balances for the given accounts. The
          same dict is yielded every time; its inventory objects can be
          mutated to adjust the balance due to generated transactions to be
          applied later.
    """
    assert isinstance(accounts, list)
    merged_txn_postings = merge_postings(entries, accounts)
    balances = collections.defaultdict(inventory.Inventory)
    for txn_posting in merged_txn_postings:
        posting = txn_posting.posting
        if before:
            # Hand out the balances first, then apply the posting.
            yield txn_posting, balances
            balances[posting.account].add_position(posting)
        else:
            balances[posting.account].add_position(posting)
            yield txn_posting, balances
beancount.scripts.example.price_series(start, mu, sigma)
基于简单的随机模型生成价格序列。
| 参数: | `start`:起始值。<br>`mu`:每一步的漂移。<br>`sigma`:变化的波动率。 |
|---|---|
| 生成: | 每一步的浮点数。 |
源代码位于 beancount/scripts/example.py
def price_series(start, mu, sigma):
    """Generate an endless price series from a simple stochastic model.

    At each step the value changes by a normally distributed fraction of
    itself.

    Args:
      start: The beginning value; it is the first value yielded.
      mu: The mean of the relative change applied at each step.
      sigma: The standard deviation of the relative change (volatility).
    Yields:
      Floats, at each step. The generator never terminates on its own.
    """
    value = start
    while True:
        yield value
        relative_change = random.normalvariate(mu, sigma)
        value += relative_change * value
beancount.scripts.example.replace(string, replacements, strip=False)
对缩进字符串应用带单词边界限制的正则表达式替换。
| 参数: | `string`:输入的模板字符串。<br>`replacements`:从正则表达式到替换值的字典。<br>`strip`:布尔值,为真时去除输入首尾的空白。 |
|---|---|
| 返回: | 应用替换并移除缩进后的输入字符串。 |
源代码位于 beancount/scripts/example.py
def replace(string, replacements, strip=False):
    """Dedent a template and apply whole-word regular expression replacements.

    Args:
      string: Some input template string.
      replacements: A dict of regexp to replacement value. Each regexp is
        wrapped in word boundaries. A value that is neither a string nor a
        callable is converted with str() first.
      strip: A boolean, true if we should strip the input.
    Returns:
      The input string with the replacements applied to it, with the
      indentation removed.
    """
    text = textwrap.dedent(string)
    if strip:
        text = text.strip()
    for pattern, substitute in replacements.items():
        # re.sub() accepts a string or a callable; anything else is rendered
        # as text.
        if not (isinstance(substitute, str) or callable(substitute)):
            substitute = str(substitute)
        text = re.sub(r'\b{}\b'.format(pattern), substitute, text)
    return text
beancount.scripts.example.validate_output(contents, positive_accounts, currency)
检查输出文件是否通过验证。
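| 参数: | `contents`:字符串,输出文件的内容。<br>`positive_accounts`:字符串列表,需要检查余额非负的账户名称。<br>`currency`:字符串,检查最低余额时使用的货币。 |
|---|---|
| 异常: | `AssertionError`:输出未通过验证时抛出。 |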
源代码位于 beancount/scripts/example.py
def validate_output(contents, positive_accounts, currency):
    """Load the generated text with extra validations and check balances.

    Args:
      contents: A string, the output file.
      positive_accounts: A list of strings, account names to check for
        non-negative balances.
      currency: A string, the currency to check minimums for.
    Raises:
      AssertionError: If the output does not validate (presumably raised by
        check_non_negative(); loading errors are only logged to stderr).
    """
    load_options = dict(log_errors=sys.stderr,
                        extra_validations=validation.HARDCORE_VALIDATIONS)
    loaded_entries, _errors, _options_map = loader.load_string(contents,
                                                               **load_options)

    # Sanity checks: Check that the checking balance never goes below zero.
    for account in positive_accounts:
        check_non_negative(loaded_entries, account, currency)
beancount.scripts.example.write_example_file(date_birth, date_begin, date_end, reformat, file)
生成示例文件。
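| 参数: | `date_birth`:datetime.date 实例,角色的出生日期。<br>`date_begin`:datetime.date 实例,开始生成交易的日期。<br>`date_end`:datetime.date 实例,停止生成交易的日期。<br>`reformat`:布尔值,为真时对该文件应用全局重新格式化。<br>`file`:文件对象,输出写入的位置。 |
|---|---|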
源代码位于 beancount/scripts/example.py
def write_example_file(date_birth, date_begin, date_end, reformat, file):
"""Generate the example file.
Args:
date_birth: A datetime.date instance, the birth date of our character.
date_begin: A datetime.date instance, the beginning date at which to generate
transactions.
date_end: A datetime.date instance, the end date at which to generate
transactions.
reformat: A boolean, true if we should apply global reformatting to this file.
file: A file object, where to write out the output.
"""
# The following code entirely writes out the output to generic names, such
# as "Employer1", "Bank1", and "CCY" (for principal currency). Those names
# are purposely chosen to be unique, and only near the very end do we make
# renamings to more specific and realistic names.
# Name of the checking account.
account_opening = 'Equity:Opening-Balances'
account_payable = 'Liabilities:AccountsPayable'
account_checking = 'Assets:CC:Bank1:Checking'
account_credit = 'Liabilities:CC:CreditCard1'
account_retirement = 'Assets:CC:Retirement'
account_investing = 'Assets:CC:Investment:Cash'
# Commodities.
commodity_entries = generate_commodity_entries(date_birth)
# Estimate the rent.
rent_amount = round_to(ANNUAL_SALARY / RENT_DIVISOR, RENT_INCREMENT)
# Get a random employer.
employer_name, employer_address = random.choice(EMPLOYERS)
logging.info("Generating Salary Employment Income")
income_entries = generate_employment_income(employer_name, employer_address,
ANNUAL_SALARY,
account_checking,
join(account_retirement, 'Cash'),
date_begin, date_end)
logging.info("Generating Expenses from Banking Accounts")
banking_expenses = generate_banking_expenses(date_begin, date_end,
account_checking, rent_amount)
logging.info("Generating Regular Expenses via Credit Card")
credit_regular_entries = generate_regular_credit_expenses(
date_birth, date_begin, date_end, account_credit, account_checking)
logging.info("Generating Credit Card Expenses for Trips")
trip_entries = []
destinations = sorted(TRIP_DESTINATIONS.items())
destinations.extend(destinations)
random.shuffle(destinations)
for (date_trip_begin, date_trip_end), (destination_name, config) in zip(
compute_trip_dates(date_begin, date_end), destinations):
# Compute a suitable tag.
tag = 'trip-{}-{}'.format(destination_name.lower().replace(' ', '-'),
date_trip_begin.year)
#logging.info("%s -- %s %s", tag, date_trip_begin, date_trip_end)
# Remove regular entries during this trip.
credit_regular_entries = [entry
for entry in credit_regular_entries
if not(date_trip_begin <= entry.date < date_trip_end)]
# Generate entries for the trip.
this_trip_entries = generate_trip_entries(
date_trip_begin, date_trip_end,
tag, config,
destination_name.replace('-', ' ').title(), HOME_NAME,
account_credit)
trip_entries.extend(this_trip_entries)
logging.info("Generating Credit Card Payment Entries")
credit_payments = generate_clearing_entries(
delay_dates(rrule.rrule(rrule.MONTHLY,
dtstart=date_begin, until=date_end, bymonthday=7), 0, 4),
"CreditCard1", "Paying off credit card",
credit_regular_entries,
account_credit, account_checking)
credit_entries = credit_regular_entries + trip_entries + credit_payments
logging.info("Generating Tax Filings and Payments")
tax_preamble = generate_tax_preamble(date_birth)
# Figure out all the years we need tax accounts for.
years = set()
for account_name in getters.get_accounts(income_entries):
match = re.match(r'Expenses:Taxes:Y(\d\d\d\d)', account_name)
if match:
years.add(int(match.group(1)))
taxes = [(year, generate_tax_accounts(year, date_end)) for year in sorted(years)]
tax_entries = tax_preamble + functools.reduce(operator.add,
(entries
for _, entries in taxes))
logging.info("Generating Opening of Banking Accounts")
# Open banking accounts and gift the checking account with a balance that
# will offset all the amounts to ensure a positive balance throughout its
# lifetime.
entries_for_banking = data.sorted(income_entries +
banking_expenses +
credit_entries +
tax_entries)
minimum = get_minimum_balance(entries_for_banking,
account_checking, 'CCY')
banking_entries = generate_banking(entries_for_banking,
date_begin, date_end,
max(-minimum, ZERO))
logging.info("Generating Transfers to Investment Account")
banking_transfers = generate_outgoing_transfers(
data.sorted(income_entries +
banking_entries +
banking_expenses +
credit_entries +
tax_entries),
account_checking,
account_investing,
transfer_minimum=D('200'),
transfer_threshold=D('3000'),
transfer_increment=D('500'))
logging.info("Generating Prices")
# Generate price entries for investment currencies and create a price map to
# use for later for generating investment transactions.
funds_allocation = {'MFUND1': 0.40, 'MFUND2': 0.60}
stocks = ['STK1', 'STK2', 'STK3', 'STK4']
price_entries = generate_prices(date_begin, date_end,
sorted(funds_allocation.keys()) + stocks, 'CCY')
price_map = prices.build_price_map(price_entries)
logging.info("Generating Employer Match Contribution")
account_match = 'Income:US:Employer1:Match401k'
retirement_match = generate_retirement_employer_match(income_entries,
join(account_retirement, 'Cash'),
account_match)
logging.info("Generating Retirement Investments")
retirement_entries = generate_retirement_investments(
income_entries + retirement_match, account_retirement,
sorted(funds_allocation.items()),
price_map)
logging.info("Generating Taxes Investments")
investment_entries = generate_taxable_investment(date_begin, date_end,
banking_transfers, price_map,
stocks)
logging.info("Generating Expense Accounts")
expense_accounts_entries = generate_expense_accounts(date_birth)
logging.info("Generating Equity Accounts")
equity_entries = generate_open_entries(date_birth, [account_opening,
account_payable])
logging.info("Generating Balance Checks")
credit_checks = generate_balance_checks(credit_entries, account_credit,
date_random_seq(date_begin, date_end, 20, 30))
banking_checks = generate_balance_checks(data.sorted(income_entries +
banking_entries +
banking_expenses +
banking_transfers +
credit_entries +
tax_entries),
account_checking,
date_random_seq(date_begin, date_end, 20, 30))
logging.info("Outputting and Formatting Entries")
dcontext = display_context.DisplayContext()
default_int_digits = 8
for currency, precision in {'USD': 2,
'CAD': 2,
'VACHR':0,
'IRAUSD': 2,
'VBMPX': 3,
'RGAGX': 3,
'ITOT': 0,
'VEA': 0,
'VHT': 0,
'GLD': 0}.items():
int_digits = default_int_digits
if precision > 0:
int_digits += 1 + precision
dcontext.update(D('{{:0{}.{}f}}'.format(int_digits, precision).format(0)), currency)
output = io.StringIO()
def output_section(title, entries):
output.write('\n\n\n{}\n\n'.format(title))
printer.print_entries(data.sorted(entries), dcontext, file=output)
output.write(FILE_PREAMBLE.format(**locals()))
output_section('* Commodities', commodity_entries)
output_section('* Equity Accounts', equity_entries)
output_section('* Banking', data.sorted(banking_entries +
banking_expenses +
banking_transfers +
banking_checks))
output_section('* Credit-Cards', data.sorted(credit_entries +
credit_checks))
output_section('* Taxable Investments', investment_entries)
output_section('* Retirement Investments', data.sorted(retirement_entries +
retirement_match))
output_section('* Sources of Income', income_entries)
output_section('* Taxes', tax_preamble)
for year, entries in taxes:
output_section('** Tax Year {}'.format(year), entries)
output_section('* Expenses', expense_accounts_entries)
output_section('* Prices', price_entries)
output_section('* Cash', [])
logging.info("Contextualizing to Realistic Names")
contents, replacements = contextualize_file(output.getvalue(), employer_name)
if reformat:
contents = format.align_beancount(contents)
logging.info("Writing contents")
file.write(contents)
logging.info("Validating Results")
validate_output(contents,
[replace(account, replacements)
for account in [account_checking]],
replace('CCY', replacements))
beancount.scripts.format
beancount.scripts.format.align_beancount(contents, prefix_width=None, num_width=None, currency_column=None)
重新格式化 Beancount 输入,使所有数字对齐到同一列。
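| 参数: | `contents`:字符串,待重新格式化的 Beancount 输入文本。<br>`prefix_width`:整数,账户名称渲染的字符宽度;若未指定,则根据文件内容自动选择合适的值。<br>`num_width`:整数,每个数字渲染的宽度;若未指定,则根据文件内容自动选择合适的值。<br>`currency_column`:整数,货币对齐所在的列;若提供,则覆盖其他选项。 |
|---|---|
| 返回: | 字符串,即所有数字均已对齐的 Beancount 输入;与输入内容相比,除空白字符外不应有任何其他变化。 |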
源代码位于 beancount/scripts/format.py
def align_beancount(contents, prefix_width=None, num_width=None, currency_column=None):
    """Re-align the numbers of Beancount input text onto a common column.

    Args:
      contents: A string of Beancount input syntax to be reformatted.
      prefix_width: An optional integer, the width in characters to which the
        text preceding each number is padded. When not given, the widest
        prefix found in the contents is used.
      num_width: An optional integer, the width to which each number is
        right-aligned. When not given, the widest number found in the
        contents is used.
      currency_column: An optional integer, the column at which currencies
        should line up. When given, the two width options are ignored.
    Returns:
      A string with the reformatted input. Apart from whitespace, it is meant
      to be identical to 'contents'; this is asserted before returning on the
      width-based code path.
    """
    # A line takes part in the alignment when it splits into three pieces: a
    # prefix (text starting with a digit, or an indented account name), a
    # number or parenthesized expression, and a remainder starting with a
    # currency.
    posting_pattern = (
        rf'(^\d[^";]*?|\s+{account.ACCOUNT_RE})\s+'
        rf'({PARENTHESIZED_BINARY_OP_RE}|{NUMBER_RE})\s+'
        rf'((?:{amount.CURRENCY_RE})\b.*)'
    )

    def split_line(line):
        """Return (prefix, number, rest), or (line, None, None) if no match."""
        found = regex.match(posting_pattern, line)
        if not found:
            return (line, None, None)
        prefix, number, rest = found.groups()
        return (prefix, number, rest)

    match_pairs = [split_line(line) for line in contents.splitlines()]

    # Give all indented account lines the same leading whitespace.
    norm_match_pairs = normalize_indent_whitespace(match_pairs)

    # Mode 1: position everything relative to a fixed currency column. This
    # path returns directly, without the whitespace-only sanity check below.
    if currency_column:
        rendered = []
        for prefix, number, rest in norm_match_pairs:
            if number is None:
                rendered.append(prefix)
                continue
            padding = ' ' * (currency_column - len(prefix) - len(number) - 4)
            rendered.append(prefix + padding + ' ' + number + ' ' + rest)
        return ''.join(text + '\n' for text in rendered)

    # Mode 2: pad to the widest prefix and the widest number. Note that the
    # widths are measured on the lines as matched, before normalization of
    # the indentation.
    max_prefix_width = max(
        (len(prefix) for prefix, number, _ in match_pairs if number is not None),
        default=0)
    max_num_width = max(
        (len(number) for _, number, _ in match_pairs if number is not None),
        default=0)

    # Caller-provided widths take precedence over the computed ones.
    max_prefix_width = prefix_width or max_prefix_width
    max_num_width = num_width or max_num_width

    # One template for every aligned line: left-justified prefix,
    # right-justified number, then the untouched remainder.
    line_format = '{{:<{prefix_width}}} {{:>{num_width}}} {{}}'.format(
        prefix_width=max_prefix_width,
        num_width=max_num_width)

    rendered = [
        prefix if number is None else line_format.format(prefix.rstrip(), number, rest)
        for prefix, number, rest in norm_match_pairs
    ]
    formatted_contents = ''.join(text + '\n' for text in rendered)

    # Safety net: collapse all runs of whitespace on both sides and insist
    # that nothing else differs between the input and the output.
    old_stripped = regex.sub(r'[ \t\n]+', ' ', contents.rstrip())
    new_stripped = regex.sub(r'[ \t\n]+', ' ', formatted_contents.rstrip())
    assert (old_stripped == new_stripped), (old_stripped, new_stripped)

    return formatted_contents
beancount.scripts.format.compute_most_frequent(iterable)
计算给定元素的频率,并返回出现最频繁的元素。
| 参数: | `iterable`:由可哈希元素组成的集合。 |
|---|---|
| 返回: | 出现最频繁的元素;如果可迭代对象中没有任何元素,则返回 None。 |
源代码位于 beancount/scripts/format.py
def compute_most_frequent(iterable):
    """Compute the frequencies of the given elements and return the most frequent.

    Args:
      iterable: A collection of hashable elements.
    Returns:
      The most frequent element. If there are no elements in the iterable,
      return None. When several elements share the highest count, the largest
      of them is returned (for widths, this chooses the longest width); if the
      tied elements cannot be ordered against each other, the one that was
      encountered first is returned instead.
    """
    frequencies = collections.Counter(iterable)
    if not frequencies:
        return None

    # Select on the count alone first, so that elements are only ever compared
    # with each other when they are genuinely tied. Sorting (count, element)
    # tuples would compare unrelated elements whenever any two counts are
    # equal, and fail on hashable values that are not mutually orderable.
    highest_count = max(frequencies.values())
    candidates = [element
                  for element, count in frequencies.items()
                  if count == highest_count]

    # Note: In case of a tie, this chooses the longest width.
    # We could eventually make this an option.
    try:
        return max(candidates)
    except TypeError:
        # The tied elements have no ordering; Counter preserves insertion
        # order, so fall back to the first one seen.
        return candidates[0]
beancount.scripts.format.normalize_indent_whitespace(match_pairs)
规范化具有缩进和账户名称的行前的空白字符。
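| 参数: | `match_pairs`:由 (prefix, number, rest) 元组组成的列表。 |
|---|---|
| 返回: | 另一个由 (prefix, number, rest) 元组组成的列表,其中 prefix 的前导空白可能已被调整。 |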
源代码位于 beancount/scripts/format.py
def normalize_indent_whitespace(match_pairs):
    """Give every indented account line the same amount of leading whitespace.

    Args:
      match_pairs: A list of (prefix, number, rest) tuples.
    Returns:
      A new list of (prefix, number, rest) tuples. Wherever a prefix consists
      of leading whitespace followed by an account name, that whitespace is
      replaced by spaces of the most frequently used indentation width; all
      other tuples are passed through untouched.
    """
    # Recognize "<spaces or tabs><account name><anything>" prefixes.
    posting_re = regex.compile(r'([ \t]+)({}.*)'.format(account.ACCOUNT_RE))
    posting_matches = [posting_re.match(prefix) for prefix, _, _ in match_pairs]

    # The most common indentation width among those lines becomes the
    # indentation for all of them (no indentation if there are none).
    width = compute_most_frequent(
        len(found.group(1))
        for found in posting_matches
        if found is not None)
    indent = ' ' * (width or 0)

    # Rebuild the list, re-indenting only the prefixes that matched.
    adjusted_pairs = []
    for entry, found in zip(match_pairs, posting_matches):
        if found is None:
            adjusted_pairs.append(entry)
        else:
            _, number, rest = entry
            adjusted_pairs.append((indent + found.group(2), number, rest))
    return adjusted_pairs