Skip to main content

2 posts tagged with "DataFusion"

View All Tags

本文介绍使用datafusion读取parquet文件的相关代码和说明。

Cargo.toml依赖库

[package]
name = "datafusion_read_parquet"
version = "0.1.0"
edition = "2021"

[dependencies]
anyhow = "1.0.86"
datafusion = { version = "40.0.0", features = ["serde"] }
serde = { version = "1.0.204", features = ["derive"] }
tokio = { version = "1.38.1", features = ["rt", "rt-multi-thread"] }

代码说明

use anyhow::Result;
use datafusion::{arrow::array::AsArray, execution::context::SessionContext};

const PQ_FILE: &str = "../assets/sample.parquet";

#[tokio::main]
async fn main() -> Result<()> {
read_with_datafusion(PQ_FILE).await?;
Ok(())
}

async fn read_with_datafusion(file: &str) -> Result<()> {
let ctx = SessionContext::new();
ctx.register_parquet("stats", file, Default::default()).await?;

let ret = ctx
.sql("SELECT name::text name, email::text email FROM stats limit 3")
.await?
.collect()
.await?;

for batch in ret {
let names = batch.column(0).as_string::<i32>();
let emails = batch.column(1).as_string::<i32>();

for (name, email) in names.iter().zip(emails.iter()) {
let (name, email) = (name.unwrap(), email.unwrap());
println!("{}: {}", name, email);
}
}
Ok(())
}
  • 首先使用SessionContext::new()创建一个上下文会话(Session), 将数据转换成表以 及执行表查询都需要这个上下文对象ctx
  • SQL语句SELECT name::text name, email::text email FROM stats limit 3,这里获取 3条数据,每条数据包含nameemail,这些需要为这两个字段加上类型说明,否则执 行会报类型转换的错误"thread 'main' panicked at /path/to/.cargo/registry/src/index.crates.io-6f17d22bba15001f/arrow-array-52.1.0/src/cast.rs:769:29"
  • 另外需要注意batch.column(column_index)这里是根据列索引取数据,需要跟SQL语句 SELECT取的字段相对应,否则会出现信息对应错误的问题
  • Rust中迭代器以及可以同时迭代多个集合的zip方法

运行方式

# cd arrow-examples/datafusion_read_parquet
cargo run
  • 运行结果 Rust DataFusion库读取parquet文件运行结果

链接

鱼雪

读取 CSV 文件当做,然后以 SQL 方式对进行操作。

安装依赖

cargo add tokio --features rt-multi-thread
cargo add datafusion

DataFusion 中运行 SQL 查询

  1. 引入库
use datafusion::prelude::*;
use datafusion::error::Result;
  1. 注册表
let ctx = SessionContext::new();
ctx.register_csv("example", "assets/example.csv", CsvReadOption::new()).await?;

在 SQL 语句中执行时,将 CSV 文件注册成一个表。

  • 源码
pub async fn register_csv(
&self,
name: &str,
table_path: &str,
options: CsvReadOptions<'_>,
) -> Result<()> {
let listing_options = options
.to_listing_options(&self.copied_config(), self.copied_table_options());

self.register_listing_table(
name,
table_path,
listing_options,
options.schema.map(|s| Arc::new(s.to_owned())),
None,
)
.await?;

Ok(())
}
  • register_csv函数参数:

    • name: &str: 注册成的表名,表名example在后面查询中使用

    • table_path: &str: CSV 文件路径

    • options: CsvReadOptions: 读取 CSV 文件的可选项设置

      • has_header: bool: 如果对没有标题的文件进行模式推断,将创建默认的列名。
      • delimiter: u8: 可选的列分隔符。默认为 b','
      • quote: u8: 可选的引号字符。默认为 b'"'
      • escape: Option<u8>: 可选的转义字符。默认为 None
      • comment: Option<u8>: 如果启用,以这个字节开头的行将被忽略。。
      • schema: Option<&'a Schema>: 可选的表示 CSV 文件的模式。如果为 None,CSV 读取器将尝试根据文件中的数据推断模式。
      • schema_infer_max_records: usize: 如果需要进行模式推断,从 CSV 文件中读取的最大行数。默认为 DEFAULT_SCHEMA_INFER_MAX_RECORD
      • file_extension: &'a str: 文件扩展名;只有具有此扩展名的文件才会被选择用于数据输入。默认为 FileType::CSV.get_ext().as_str()
      • table_partition_cols: Vec<(String, DataType)>: 部分列
      • file_compression_type: FileCompressionType: 文件压缩类型
      • file_sort_order: Vec<Vec<Expr>>: 指示文件的排序方式
  1. 创建一个计划运行 SQL 查询
let df = ctx.sql("SELECT a, MIN(b), FROM example WHERE a <= b GROUP BY a LIMIT 100").await?;
pub async fn sql(&self, sql: &str) -> Result<DataFrame> {
self.sql_with_options(sql, SQLOptions::new()).await
}

sql函数参数:

  • sql: 待执行的 SQL 语句
  1. 打印结果
df.show().await?;
  • 源码
pub async fn show(self) -> Result<()> {
let results = self.collect().await?;
Ok(pretty::print_batches(&results)?)
}
  • 返回值:

    • results的类型为Vec<RecordBatch>

执行结果

DataFusion

鱼雪