读取 CSV 文件当做表,然后以 SQL 方式对表进行操作。
安装依赖
cargo add tokio --features rt-multi-thread
cargo add datafusion
DataFusion 中运行 SQL 查询
- 引入库
use datafusion::prelude::*;
use datafusion::error::Result;
- 注册表
let ctx = SessionContext::new();
ctx.register_csv("example", "assets/example.csv", CsvReadOption::new()).await?;
在 SQL 语句中执行时,将 CSV 文件注册成一个表。
- 源码
pub async fn register_csv(
&self,
name: &str,
table_path: &str,
options: CsvReadOptions<'_>,
) -> Result<()> {
let listing_options = options
.to_listing_options(&self.copied_config(), self.copied_table_options());
self.register_listing_table(
name,
table_path,
listing_options,
options.schema.map(|s| Arc::new(s.to_owned())),
None,
)
.await?;
Ok(())
}
-
register_csv
函数参数:-
name: &str
: 注册成的表名,表名example
在后面查询中使用 -
table_path: &str
: CSV 文件路径 -
options: CsvReadOptions
: 读取 CSV 文件的可选项设置has_header: bool
: 如果对没有标题的文件进行模式推断,将创建默认的列名。delimiter: u8
: 可选的列分隔符。默认为b','
quote: u8
: 可选的引号字符。默认为b'"'
escape: Option<u8>
: 可选的转义字符。默认为None
。comment: Option<u8>
: 如果启用,以这个字节开头的行将被忽略。。schema: Option<&'a Schema>
: 可选的表示 CSV 文件的模式。如果为None
,CSV 读取器将尝试根据文件中的数据推断模式。schema_infer_max_records: usize
: 如果需要进行模式推断,从 CSV 文件中读取的最大行数。默认为DEFAULT_SCHEMA_INFER_MAX_RECORD
file_extension: &'a str
: 文件扩展名;只有具有此扩展名的文件才会被选择用于数据输入。默认为FileType::CSV.get_ext().as_str()
table_partition_cols: Vec<(String, DataType)>
: 部分列file_compression_type: FileCompressionType
: 文件压缩类型file_sort_order: Vec<Vec<Expr>>
: 指示文件的排序方式
-
- 创建一个计划运行 SQL 查询
let df = ctx.sql("SELECT a, MIN(b), FROM example WHERE a <= b GROUP BY a LIMIT 100").await?;
pub async fn sql(&self, sql: &str) -> Result<DataFrame> {
self.sql_with_options(sql, SQLOptions::new()).await
}
sql
函数参数:
sql
: 待执行的 SQL 语句
- 打印结果
df.show().await?;
- 源码
pub async fn show(self) -> Result<()> {
let results = self.collect().await?;
Ok(pretty::print_batches(&results)?)
}
-
返回值:
results
的类型为Vec<RecordBatch>