跳到主要内容

针对存储在 CSV 中的数据运行 SQL 查询

鱼雪

读取 CSV 文件当做,然后以 SQL 方式对进行操作。

安装依赖

cargo add tokio --features rt-multi-thread
cargo add datafusion

DataFusion 中运行 SQL 查询

  1. 引入库
use datafusion::prelude::*;
use datafusion::error::Result;
  1. 注册表
let ctx = SessionContext::new();
ctx.register_csv("example", "assets/example.csv", CsvReadOption::new()).await?;

在 SQL 语句中执行时,将 CSV 文件注册成一个表。

  • 源码
pub async fn register_csv(
&self,
name: &str,
table_path: &str,
options: CsvReadOptions<'_>,
) -> Result<()> {
let listing_options = options
.to_listing_options(&self.copied_config(), self.copied_table_options());

self.register_listing_table(
name,
table_path,
listing_options,
options.schema.map(|s| Arc::new(s.to_owned())),
None,
)
.await?;

Ok(())
}
  • register_csv函数参数:

    • name: &str: 注册成的表名,表名example在后面查询中使用

    • table_path: &str: CSV 文件路径

    • options: CsvReadOptions: 读取 CSV 文件的可选项设置

      • has_header: bool: 如果对没有标题的文件进行模式推断,将创建默认的列名。
      • delimiter: u8: 可选的列分隔符。默认为 b','
      • quote: u8: 可选的引号字符。默认为 b'"'
      • escape: Option<u8>: 可选的转义字符。默认为 None
      • comment: Option<u8>: 如果启用,以这个字节开头的行将被忽略。。
      • schema: Option<&'a Schema>: 可选的表示 CSV 文件的模式。如果为 None,CSV 读取器将尝试根据文件中的数据推断模式。
      • schema_infer_max_records: usize: 如果需要进行模式推断,从 CSV 文件中读取的最大行数。默认为 DEFAULT_SCHEMA_INFER_MAX_RECORD
      • file_extension: &'a str: 文件扩展名;只有具有此扩展名的文件才会被选择用于数据输入。默认为 FileType::CSV.get_ext().as_str()
      • table_partition_cols: Vec<(String, DataType)>: 部分列
      • file_compression_type: FileCompressionType: 文件压缩类型
      • file_sort_order: Vec<Vec<Expr>>: 指示文件的排序方式
  1. 创建一个计划运行 SQL 查询
let df = ctx.sql("SELECT a, MIN(b), FROM example WHERE a <= b GROUP BY a LIMIT 100").await?;
pub async fn sql(&self, sql: &str) -> Result<DataFrame> {
self.sql_with_options(sql, SQLOptions::new()).await
}

sql函数参数:

  • sql: 待执行的 SQL 语句
  1. 打印结果
df.show().await?;
  • 源码
pub async fn show(self) -> Result<()> {
let results = self.collect().await?;
Ok(pretty::print_batches(&results)?)
}
  • 返回值:

    • results的类型为Vec<RecordBatch>

执行结果

DataFusion