注意
本文档适用于 Ceph 开发版本。
Ceph s3 select
概述
The S3 Select engine creates an efficient pipe between clients and Ceph back end nodes. The S3 Select engine works best when implemented as closely as possible to back end storage.
The S3 Select engine makes it possible to use an SQL-like syntax to select a restricted subset of data stored in an S3 object. The S3 Select engine facilitates the use of higher level, analytic applications (for example: SPARK-SQL). The ability of the S3 Select engine to target a proper subset of structed data within an S3 object decreases latency and increases throughput.
For example: assume that a user needs to extract a single column that is
filtered by another column, and that these colums are stored in a CSV file in
an S3 object that is several GB in size. The following query performs this
extraction: select customer-id from s3Object where age>30 and age<65;
Without the use of S3 Select, the whole S3 object must be retrieved from an OSD via RGW before the data is filtered and extracted. Significant network and CPU overhead are saved by “pushing down” the query into radosgw.
The bigger the object and the more accurate the query, the better the performance of s3select.
基本工作流程
S3 Select queries are sent to RGW via AWS-CLI
S3 Select passes the authentication and permission parameters as an incoming
message (POST). RGWSelectObj_ObjStore_S3::send_response_data
is the entry
point and handles each fetched chunk according to the object key that was
input. send_response_data
is the first to handle the input query: it
extracts the query and other CLI parameters.
RGW executes an S3 Select query on each new fetched chunk (up to 4 MB). The current implementation supports CSV objects. CSV rows are sometimes “cut” in the middle by the limits of the chunks, and those broken-lines (the first or last per chunk) are skipped while processing the query. Such broken lines are stored and later merged with the next broken line (which belongs to the next chunk), and only then processed.
For each processed chunk, an output message is formatted according to aws
specification
and sent back to the client. RGW supports the following response:
{:event-type,records} {:content-type,application/octet-stream}
{:message-type,event}
. For aggregation queries, the last chunk should be
identified as the end of input.
基本功能
S3select具有一套符合 AWS 的明确功能。
实现的软件架构支持基本算术表达式、逻辑和比较表达式,包括嵌套函数调用和类型转换操作符,这使用户具有很大的灵活性。
错误处理
检测到错误时,RGW 返回 400-Bad-Request 并向客户端发送一个特定的错误消息。
语法错误: s3select 解析器拒绝与解析器语法定义不一致的用户请求,如本文档所述。
处理时间错误: 运行时引擎可能会检测到仅在处理时发生的错误,对于此类错误,将使用不同的错误消息来描述。
功能支持
目前仅实现了 AWS select 命令的一部分,下表描述了当前支持的内容。AWS select command is implemented, table below describes what is currently supported.
下表描述了 s3-select 功能的当前实现:
功能 |
详细 |
示例 / 描述 |
---|---|---|
算术操作符 |
^ * % / + - ( ) |
select (int(_1)+int(_2))*int(_9) from s3object; |
|
select count(*) from s3object where cast(_1 as int)%2 = 0; |
|
|
select cast(2^10 as int) from s3object; |
|
比较操作符 |
> < >= <= = != |
select _1,_2 from s3object where (int(_1)+int(_3))>int(_5); |
逻辑操作符 |
AND OR NOT |
select count(*) from s3object where not (int(_1)>123 and int(_5)<200); |
逻辑操作符 |
is null |
在表达式中返回 true/false 以指示空值 |
逻辑操作符 |
is not null |
在表达式中返回 true/false 以指示空值 |
逻辑操作符和 NULL |
未知状态 |
审阅null-handle观察逻辑操作符与 NULL 的结果。0. select count(*) from s3object where null and (3>2); |
算术操作符与 NULL |
未知状态 |
审阅null-handle观察与 NULL 的二元运算结果0. select count(*) from s3object where (null+1) and (3>2); |
与 NULL 比较 |
未知状态 |
审阅null-handle观察与 NULL 的比较操作结果0. select count(*) from s3object where (null*1.5) != 3; |
缺失列 |
未知状态 |
select count(*) from s3object where _1 is null; |
查询正在过滤返回非空结果的行。 |
select count(*) from s3object where (_1 > 12 and _2 = 0) is not null; |
|
投影列 |
类似于 |
select case |
投影列 |
类似于 |
select case |
逻辑操作符 |
select coalesce(nullif(5,5),nullif(1,1.0),age+12) from s3object; |
|
逻辑操作符 |
select nullif(cast(_1 as int),cast(_2 as int)) from s3object; |
|
逻辑操作符 |
select count(*) from s3object |
|
逻辑操作符 |
select count(*) from s3object |
|
逻辑操作符 |
select count(*) from s3object where first_name like ‘%de_’; select count(*) from s3object where _1 like "%a[r-s]; |
|
逻辑操作符 |
select count(*) from s3object where “jok_ai” like “%#_ai” escape “#”; |
|
true / false |
select (cast(_1 as int)>123 = true) from s3object |
|
别名作为谓词投影 |
select (_1 like “_3_”) aslikealias,_1 from s3objectlikealias= true and cast(_1 as int) between 800 and 900; |
|
类型转换操作符 |
select cast(123 as int)%2 from s3object; |
|
类型转换操作符 |
select cast(123.456 as float)%2 from s3object; |
|
类型转换操作符 |
select cast(‘ABC0-9’ as string),cast(substr(‘ab12cd’,3,2) as int)*4 from s3object; |
|
类型转换操作符 |
select cast(5 as bool) from s3object; |
|
类型转换操作符 |
select cast(substring(‘publish on 2007-01-01’,12,10) as timestamp) from s3object; |
|
非 AWS 类型转换操作符 |
select int(_1),int( 1.2 + 3.4) from s3object; |
|
非 AWS 类型转换操作符 |
select float(1.2) from s3object; |
|
非 AWS 类型转换操作符 |
select to_timestamp(‘1999-10-10T12:23:44Z’) from s3object; |
|
聚合函数 |
每个OSD的PG副本之和 |
select sum(int(_1)) from s3object; |
聚合函数 |
平均值 |
select avg(cast(_1 a float) + cast(_2 as int)) from s3object; |
聚合函数 |
min |
select min( int(_1) * int(_5) ) from s3object; |
聚合函数 |
max |
select max(float(_1)),min(int(_5)) from s3object; |
聚合函数 |
count |
select count(*) from s3object where (int(_1)+int(_3))>int(_5); |
时间戳函数 |
提取 |
select count(*) from s3object where |
时间戳函数 |
date_add |
select count(0) from s3object where |
时间戳函数 |
date_diff |
select count(0) from s3object where |
时间戳函数 |
utcnow |
select count(0) from s3object where |
时间戳函数 |
to_string |
select to_string(
|
字符串函数 |
substring |
select count(0) from s3object where |
substring with |
select substring(“123456789” from -4) from s3object; |
|
substring with |
select substring(“123456789” from 0 for 100) from s3object; |
|
字符串函数 |
trim |
select trim(’ foobar ‘) from s3object; |
字符串函数 |
trim |
select trim(trailing from ‘ foobar ‘) from s3object; |
字符串函数 |
trim |
select trim(leading from ‘ foobar ‘) from s3object; |
字符串函数 |
trim |
select trim(both ‘12’ from ‘1112211foobar22211122’) from s3objects; |
字符串函数 |
lower/upper |
select lower(‘ABcD12#$e’) from s3object; |
字符串函数 |
char_length |
select count(*) from s3object where char_length(_3)=3; |
复杂查询 |
select sum(cast(_1 as int)), |
|
别名支持 |
select int(_1) as a1, int(_2) as a2 , (a1+a2) as a3 |
NULL
NULL 是 ceph-s3select 系统中的一个合法值,类似于其他 DB 系统,即系统需要处理值缺失的情况。
在我们的上下文中,NULL 的定义是缺失/未知,因此NULL 在任何算术运算中都不能产生值( a + NULL 将产生 NULL 值)。
情况与算术比较相同,任何与 NULL 的比较都是 NULL,即未知。
A 是 NULL |
结果 (NULL=未知) |
---|---|
NOT A |
NULL |
A OR False |
NULL |
A OR True |
True |
A OR A |
NULL |
A AND False |
False |
A AND True |
NULL |
A and A |
NULL |
S3-select 函数接口
时间戳函数
AWS-specs 中描述的时间戳功能已完全实现。AWS-specs is fully implemented.
to_timestamp( string )
: 类型转换操作符将字符串转换为时间戳基本类型。 to_timestamp 操作符能够将以下字符串格式转换为时间戳。在字符串格式中,如果时间(或其一部分)缺失,则用零替换缺失的部分。对于缺失的月份和日期,1 是它们的默认值。时区部分采用格式YYYY-MM-DDTHH:mm:ss.SSSSSS+/-HH:mm
,YYYY-MM-DDTHH:mm:ss.SSSSSSZ
,YYYY-MM-DDTHH:mm:ss+/-HH:mm
,YYYY-MM-DDTHH:mm:ssZ
,YYYY-MM-DDTHH:mm+/-HH:mm
,YYYY-MM-DDTHH:mmZ
,YYYY-MM-DDT
或YYYYT
string formats into timestamp. Where time (or part of it) is missing in the string format, zero’s are replacing the missing parts. And for missing month and day, 1 is default value for them. Timezone part is in format+/-HH:mm
或Z
,其中字母“Z”表示协调世界时(UTC)。时区值范围在 -12:00 到 +14:00 之间。
extract(date-part from timestamp)
: 该函数从输入时间戳中提取日期部分并作为整数返回。支持的日期部分:年、月、周、日、小时、分钟、秒、时区小时、时区分钟。
date_add(date-part, quantity, timestamp)
: 该函数将数量(整数)添加到时间戳的日期部分并返回结果作为时间戳。它还包括时区计算。支持的日期部分:年、月、日、小时、分钟、秒。
date_diff(date-part, timestamp, timestamp)
: 该函数返回一个整数,它是根据日期部分计算出的两个时间戳之间的差值。它包括时区计算。支持的日期部分:年、月、日、小时、分钟、秒。
utcnow()
: 返回当前时间的 时间戳。
to_string(timestamp, format_pattern)
: 返回输入时间戳的字符串表示,格式为给定的输入字符串格式。
to_string 参数
格式 |
示例 |
描述 |
---|---|---|
yy |
69 |
两位数的年份 |
y |
1969 |
四位数的年份 |
yyyy |
1969 |
零填充的四位数年份 |
M |
1 |
年份中的月份 |
MM |
01 |
零填充的月份 |
MMM |
Jan |
缩写月份年份名称 |
MMMM |
January |
全月份名称 |
MMMMM |
J |
年份中的月份首字母(注意:不适用于与 to_timestamp 函数一起使用) |
d |
2 |
月份中的日(1-31) |
dd |
02 |
AM |
a |
AM |
AM 或 PM |
h |
3 |
小时的分钟(0-59) |
hh |
03 |
零填充的小时(01-12) |
H |
3 |
小时的分钟(0-23) |
HH |
03 |
零填充的小时(00-23) |
m |
4 |
分钟的秒(0-59) |
mm |
04 |
零填充的分钟(00-59) |
s |
5 |
秒的秒(0-59) |
ss |
05 |
零填充的秒(00-59) |
S |
0 |
秒的小数部分(精度:0.1,范围:0.0-0.9) |
SS |
6 |
秒的小数部分(精度:0.01,范围:0.0-0.99) |
SSS |
60 |
秒的小数部分(最大精度:1 纳秒,范围:0.0-0999999999) |
SSSSSS |
60000000 |
秒的小数部分(最大精度:1 纳秒,范围:0.0-0999999999) |
n |
60000000 |
纳秒 |
X |
+07 或 Z |
小时偏移或“Z”(如果偏移为 0) |
XX 或 XXXX |
+0700 或 Z |
小时和分钟的偏移或“Z”(如果偏移为 22:00 |
XXX or XXXXX |
+07:00 or Z |
小时和分钟的偏移或“Z”(如果偏移为 22:00 |
X |
7 |
Offset in hours |
xx or xxxx |
700 |
小时和分钟的偏移 |
xxx 或 xxxxx |
+07:00 |
小时和分钟的偏移 |
聚合函数
count()
: 根据符合条件(如果存在)的行数返回整数。
sum(expression)
: 返回所有符合条件(如果存在)的行的表达式摘要。
avg(expression)
: 返回所有符合条件(如果存在)的表达式的平均值。
max(expression)
: 返回所有符合条件(如果存在)的表达式的最大结果。
min(expression)
: 返回所有符合条件(如果存在)的表达式的最小结果。
字符串函数
substring(string,from,to)
: substring( stringfrom
start [for
length ] )substring(string from )
substring(string from for)
char_length
: 返回字符串中的字符数(2723b3: 执行相同的操作)。character_length
does the same).
trim
: trim ( [[leading
| trailing
| both
移除字符]from
] 字符串 )
upper\lower
: 将字符转换为小写/大写。
SQL 限制操作符
SQL 限制操作符用于限制查询处理的行数。
别名
别名编程结构是 s3-select 语言的重要组成部分,它在使用包含许多列的对象或复杂查询的情况下,使用户能够更好地编程。
在解析包含别名结构的语句时,它将别名替换为正确的投影列的引用,在查询执行时间,该引用被评估为任何其他表达式。
存在自我(或循环)引用可能性的风险,这可能导致堆栈溢出(无限循环),为此,在评估别名时,会验证循环引用。
别名还维护一个结果缓存,这意味着给定别名的连续使用不会再次评估表达式。相反,结果从缓存中返回。
每行新数据都会使缓存失效,因为结果可能不同。
测试
s3select
包含多个测试框架,为它的功能提供了广泛的覆盖。
(1) 与可信引擎进行比较,这意味着 C/C++ 编译器是一个可信的表达式评估器,因为算术和逻辑表达式的语法是相同的(s3select 与 C 比较),框架运行等价表达式并验证它们的结果。
(2) 比较语法不同但语义等价的查询的结果。
例如,在一个包含随机数字(1-1000)的数据集中,以下查询将产生相同的结果。select count(*) from s3object where char_length(_3)=3;
select count(*) from s3object where cast(_3 as int)>99 and cast(_3 as int)<1000;
常量数据集,传统的测试方法。查询正在处理一个常量数据集,其结果与常量结果进行验证。
附加语法支持
S3select 语法支持表别名select s._1 from s3object s where s._2 = ‘4’;
S3select 语法支持不区分大小写Select SUM(Cast(_1 as int)) FROM S3Object;
S3select 语法支持没有关闭分号的语句select count(*) from s3object
向 RGW 发送查询
任何 HTTP 客户端都可以向 RGW 发送s3-select
请求,该请求必须符合AWS 请求语法.
使用 AWS CLI 向 RGW 发送请求时,客户端必须遵循s3-select
request to RGW using AWS CLI, clients must follow AWS 命令参考。
aws --endpoint-url http://localhost:8000 s3api select-object-content
--bucket {BUCKET-NAME}
--expression-type 'SQL'
--scan-range '{"Start" : 1000, "End" : 1000000}'
--input-serialization
'{"CSV": {"FieldDelimiter": "," , "QuoteCharacter": "\"" , "RecordDelimiter" : "\n" , "QuoteEscapeCharacter" : "\\" , "FileHeaderInfo": "USE" }, "CompressionType": "NONE"}'
--output-serialization '{"CSV": {"FieldDelimiter": ":", "RecordDelimiter":"\t", "QuoteFields": "ALWAYS"}}'
--key {OBJECT-NAME}
--request-progress '{"Enabled": True}'
--expression "select count(0) from s3object where int(_1)<10;" output.csv
输入序列化
FileHeaderInfo-> (string)
NONE: 第一行不是标题。IGNORE: 第一行是标题,但你不能用标题值来指示表达式中的列。SELECT s._1 FROM S3OBJECT s
).
USE: 第一行是标题,并且你可以使用标题值来标识表达式中的列 (SELECT column_name FROM S3OBJECT
).
QuoteEscapeCharacter-> (string)
RecordDelimiter-> (string)
FieldDelimiter-> (string)
输出序列化
AWS CLI 示例
aws s3api select-object-content --bucket “mybucket” --key keyfile1 --expression “SELECT * FROM s3object s” --expression-type ‘SQL’ --request-progress ‘{“Enabled”: false}’ --input-serialization ‘{“CSV”: {“FieldDelimiter”: “,”}, “CompressionType”: “NONE”}’ --output-serialization ‘{“CSV”: {“Delimiter”: “:”, “RecordDelimiter”:”\t”, “QuoteFields”: “ALWAYS”}}’ /dev/stdout
QuoteFields-> (string)ALWAYS: 始终使用引号输出字段。ASNEEDED(未实现):当需要时使用引号输出字段。
RecordDelimiter-> (string)
FieldDelimiter-> (string)
扫描范围选项
AWS-CLI 的扫描范围选项使客户端能够扫描和处理对象的部分。
CSV 解析行为
The
s3-select
引擎包含一个 CSV 解析器,它按照以下方式解析 s3-objects。row-delimiter
结尾。field-separator
分隔相邻列,连续实例field separator
定义空列。quote-character
覆盖field separator
,这意味着field separator
被视为引号内的任何字符。escape character
禁用特殊字符的解释,除了row delimiter
.以下是一些 CSV 解析规则的示例。
功能 |
描述 |
输入 ==> tokens |
---|---|---|
NULL |
连续字段分隔符 |
,,1,,2, ==> {null}{null}{1}{null}{2}{null} |
QUOTE |
引号字符 |
11,22,”a,b,c,d”,last ==> {11}{22}{“a,b,c,d”}{last} |
Escape |
转义字符 |
11,22,str=\”abcd\”\,str2=\”123\”,last |
行分隔符 |
没有关闭引号, |
11,22,a=”str,44,55,66 |
csv 头信息 |
FileHeaderInfo |
“USE” 值表示第一行上的每个标记都是列名,IGNORE” 值表示跳过第一行 |
JSON
已集成了 JSON 读取器,允许客户端使用 SQL 语句扫描和提取 JSON 文档中的信息。s3select-engine
, which allows the client to use SQL statements to scan and extract information from JSON documents.
It should be noted that the data readers and parsers for CSV, Parquet, and JSON documents are separated from the SQL engine itself, so all of these readers use the same SQL engine.
应该注意的是,JSON 文档中的值可以以各种方式嵌套,例如在对象或数组中。
SQL 引擎以基于行的方式处理 SELECT 语句。
使用 SQL 查询 JSON 文档时,SELECT 语句中的 FROM 子句定义了行边界。
注意:查询 JSON 文档的语义可能会改变,可能与当前描述的方法不同。
TODO:对象和数组值的示例。
一个 JSON 查询示例
{
"firstName": "Joe",
"lastName": "Jackson",
"gender": "male",
"age": "twenty",
"address": {
"streetAddress": "101",
"city": "San Diego",
"state": "CA"
},
"firstName": "Joe_2",
"lastName": "Jackson_2",
"gender": "male",
"age": 21,
"address": {
"streetAddress": "101",
"city": "San Diego",
"state": "CA"
},
"phoneNumbers": [
{ "type": "home1", "number": "734928_1","addr": 11 },
{ "type": "home2", "number": "734928_2","addr": 22 },
{ "type": "home3", "number": "734928_3","addr": 33 },
{ "type": "home4", "number": "734928_4","addr": 44 },
{ "type": "home5", "number": "734928_5","addr": 55 },
{ "type": "home6", "number": "734928_6","addr": 66 },
{ "type": "home7", "number": "734928_7","addr": 77 },
{ "type": "home8", "number": "734928_8","addr": 88 },
{ "type": "home9", "number": "734928_9","addr": 99 },
{ "type": "home10", "number": "734928_10","addr": 100 }
],
"key_after_array": "XXX",
"description" : {
"main_desc" : "value_1",
"second_desc" : "value_2"
}
}
# the from-clause define a single row.
# _1 points to root object level.
# _1.age appears twice in Documnet-row, the last value is used for the operation.
query = "select _1.firstname,_1.key_after_array,_1.age+4,_1.description.main_desc,_1.description.second_desc from s3object[*];";
expected_result = Joe_2,XXX,25,value_1,value_2
# the from-clause points the phonenumbers array (it defines the _1)
# each element in phoneNumbers array define a row.
# in this case each element is an object contains 3 keys/values.
# the query "can not access" values outside phonenumbers array, the query can access only values appears on _1.phonenumbers path.
query = "select cast(substring(_1.number,1,6) as int) *10 from s3object[*].phonenumbers where _1.type='home2';";
expected_result = 7349280
BOTO3
由于 AWS-cli 的支持,使用 BOTO3 是“自然”且简单的。
import pprint
def run_s3select(bucket,key,query,column_delim=",",row_delim="\n",quot_char='"',esc_char='\\',csv_header_info="NONE"):
s3 = boto3.client('s3',
endpoint_url=endpoint,
aws_access_key_id=access_key,
region_name=region_name,
aws_secret_access_key=secret_key)
result = ""
try:
r = s3.select_object_content(
Bucket=bucket,
Key=key,
ExpressionType='SQL',
InputSerialization = {"CSV": {"RecordDelimiter" : row_delim, "FieldDelimiter" : column_delim,"QuoteEscapeCharacter": esc_char, "QuoteCharacter": quot_char, "FileHeaderInfo": csv_header_info}, "CompressionType": "NONE"},
OutputSerialization = {"CSV": {}},
Expression=query,
RequestProgress = {"Enabled": progress})
except ClientError as c:
result += str(c)
return result
for event in r['Payload']:
if 'Records' in event:
result = ""
records = event['Records']['Payload'].decode('utf-8')
result += records
if 'Progress' in event:
print("progress")
pprint.pprint(event['Progress'],width=1)
if 'Stats' in event:
print("Stats")
pprint.pprint(event['Stats'],width=1)
if 'End' in event:
print("End")
pprint.pprint(event['End'],width=1)
return result
run_s3select(
"my_bucket",
"my_csv_object",
"select int(_1) as a1, int(_2) as a2 , (a1+a2) as a3 from s3object where a3>100 and a3<300;")
S3 SELECT 响应
错误响应
<?xml version="1.0" encoding="UTF-8"?>
<Error>
<Code>NoSuchKey</Code>
<Message>The resource you requested does not exist</Message>
<Resource>/mybucket/myfoto.jpg</Resource>
<RequestId>4442587FB7D0A2F9</RequestId>
</Error>
报告响应
HTTP/1.1 200
<?xml version="1.0" encoding="UTF-8"?>
<Payload>
<Records>
<Payload>blob</Payload>
</Records>
<Stats>
<Details>
<BytesProcessed>long</BytesProcessed>
<BytesReturned>long</BytesReturned>
<BytesScanned>long</BytesScanned>
</Details>
</Stats>
<Progress>
<Details>
<BytesProcessed>long</BytesProcessed>
<BytesReturned>long</BytesReturned>
<BytesScanned>long</BytesScanned>
</Details>
</Progress>
<Cont>
</Cont>
<End>
</End>
</Payload>
响应描述
对于 CEPH S3 Select,响应可以是以下类型的消息:
记录消息: 可以包含单个记录、部分记录或多个记录。根据结果的大小,响应可以包含一个或多个这些消息。
错误消息: 检测到错误时,RGW 返回 400 Bad Request,并返回一个特定的错误消息给客户端,根据其类型。
连续消息: Ceph S3 定期发送此消息以保持 TCP 连接打开。
进度消息: 如果请求,CEPH S3 定期发送此消息。它包含有关已启动但尚未完成的查询的进度信息。
统计消息: Ceph S3 在请求结束时发送此消息。它包含有关查询的统计数据。
结束消息: 指示请求已完成,不再发送更多消息。你不应该假设请求已完成,直到客户端收到结束消息。
由 Ceph 基金会带给您
Ceph 文档是一个社区资源,由非盈利的 Ceph 基金会资助和托管Ceph Foundation. 如果您想支持这一点和我们的其他工作,请考虑加入现在加入.