Skip to main content

2 posts tagged with "DataFusion"

View All Tags

本文介绍使用datafusion读取parquet文件的相关代码和说明。

Cargo.toml依赖库

[package]
name = "datafusion_read_parquet"
version = "0.1.0"
edition = "2021"

[dependencies]
anyhow = "1.0.86"
datafusion = { version = "40.0.0", features = ["serde"] }
serde = { version = "1.0.204", features = ["derive"] }
tokio = { version = "1.38.1", features = ["rt", "rt-multi-thread"] }

代码说明

use anyhow::Result;
use datafusion::{arrow::array::AsArray, execution::context::SessionContext};

const PQ_FILE: &str = "../assets/sample.parquet";

#[tokio::main]
async fn main() -> Result<()> {
read_with_datafusion(PQ_FILE).await?;
Ok(())
}

async fn read_with_datafusion(file: &str) -> Result<()> {
let ctx = SessionContext::new();
ctx.register_parquet("stats", file, Default::default()).await?;

let ret = ctx
.sql("SELECT name::text name, email::text email FROM stats limit 3")
.await?
.collect()
.await?;

for batch in ret {
let names = batch.column(0).as_string::<i32>();
let emails = batch.column(1).as_string::<i32>();

for (name, email) in names.iter().zip(emails.iter()) {
let (name, email) = (name.unwrap(), email.unwrap());
println!("{}: {}", name, email);
}
}
Ok(())
}
  • 首先使用SessionContext::new()创建一个上下文会话(Session), 将数据转换成表以 及执行表查询都需要这个上下文对象ctx
  • SQL语句SELECT name::text name, email::text email FROM stats limit 3,这里获取 3条数据,每条数据包含nameemail,这些需要为这两个字段加上类型说明,否则执 行会报类型转换的错误"thread 'main' panicked at /path/to/.cargo/registry/src/index.crates.io-6f17d22bba15001f/arrow-array-52.1.0/src/cast.rs:769:29"
  • 另外需要注意batch.column(column_index)这里是根据列索引取数据,需要跟SQL语句 SELECT取的字段相对应,否则会出现信息对应错误的问题
  • Rust中迭代器以及可以同时迭代多个集合的zip方法

运行方式

# cd arrow-examples/datafusion_read_parquet
cargo run
  • 运行结果 Rust DataFusion库读取parquet文件运行结果

链接

鱼雪

在这篇文章中,我们将探讨如何使用DataFusion在Rust中读取CSV文件并运行SQL查询。DataFusion是一个高性能的开源数据处理引擎,支持SQL查询。

安装依赖

首先,确保在项目中添加以下依赖:

cargo add tokio --features rt-multi-thread
cargo add datafusion

在DataFusion中运行SQL查询

1. 引入库

在Rust代码中引入DataFusion库:

use datafusion::prelude::*;
use datafusion::error::Result;

2. 注册CSV文件为表

使用以下代码将CSV文件注册为表:

let ctx = SessionContext::new();
ctx.register_csv("example", "assets/example.csv", CsvReadOption::new()).await?;

register_csv函数

register_csv函数用于将CSV文件注册为DataFusion中的表。其参数包括:

  • name: &str: 表名
  • table_path: &str: CSV文件路径
  • options: CsvReadOptions: 读取CSV文件的选项

3. 创建SQL查询计划

使用以下代码创建SQL查询计划:

let df = ctx.sql("SELECT a, MIN(b) FROM example WHERE a <= b GROUP BY a LIMIT 100").await?;

sql函数

sql函数用于执行SQL查询,其参数为待执行的SQL语句。

4. 打印查询结果

使用以下代码打印查询结果:

df.show().await?;

show函数

show函数用于显示查询结果,其返回值为Vec<RecordBatch>

执行结果

以下是执行结果的示例图:

DataFusion执行结果

通过这些步骤,您可以在Rust中使用DataFusion读取CSV文件并运行SQL查询。希望这篇指南能帮助您快速上手DataFusion。

相关链接

鱼雪