Skip to main content

78 posts tagged with "Rust"

View All Tags

掌握 Cargo.toml 的格式规则,避免挫败感

在 JavaScript 和其他语言中,我们称令人惊讶或不一致的行为为“Wat!”(即“什么!?”)。 例如,在 JavaScript 中,空数组加空数组会产生一个空字符串,[] + [] === ""。Wat!

在另一个极端,某种语言有时会表现出令人惊讶的一致性。我称之为“Wat Not”。

Rust 通常比 JavaScript 更加一致。然而,一些与 Rust 相关的格式会带来惊喜。 具体来说,本文将介绍 Cargo.toml 中的九个 wats 和 wat nots。

回想一下,Cargo.toml 是定义 Rust 项目配置和依赖项的清单文件。 其格式 TOML(Tom's Obvious, Minimal Language)表示嵌套的键/值对和/或数组。 JSON 和 YAML 是类似的格式。与 JSON 不同的是,TOML 被设计为易于人类阅读和编写。

这九个 wats 和 wat nots 的旅程不会像 JavaScript 的怪癖那样有趣(谢天谢地)。 然而,如果你曾经对 Cargo.toml 的格式感到困惑,我希望本文能让你感觉更好。 最重要的是,当你了解了这九个 wats 和 wat nots 后,希望你能更轻松有效地编写 Cargo.toml

本文不是关于“修复” Cargo.toml。该文件格式在其主要用途上非常出色:指定 Rust 项目的配置和依赖项。 相反,本文旨在理解其格式及其怪癖。

Wat 1:依赖项 vs. 配置文件部分名称

你可能知道如何在 Cargo.toml 中添加 [dependencies] 部分。这样的部分指定了发布依赖项,例如:

[dependencies]
serde = "1.0"

同样,你可以使用 [dev-dependencies] 部分指定开发依赖项,使用 [build-dependencies] 部分指定构建依赖项

你可能还需要设置编译器选项,例如优化级别和是否包含调试信息。你可以通过发布开发构建的配置文件部分来设置这些选项。

你能猜出这三个部分的名称吗?是 [profile][dev-profile][build-profile] 吗?

不!它们是 [profile.release][profile.dev][profile.build]。Wat!

[dev-profile] 会比 [profile.dev] 更好吗?[dependencies.dev] 会比 [dev-dependencies] 更好吗?

我个人更喜欢带点的名称。(在“Wat Not 9”中,我们将看到点的强大之处。)然而,我愿意记住依赖项和配置文件的工作方式不同。

Wat 2:依赖项继承

你可能会认为点适用于配置文件,而连字符更适用于依赖项,因为 [dev-dependencies] 继承自 [dependencies]。换句话说,[dependencies] 中的依赖项在 [dev-dependencies] 中也可用。那么,这是否意味着 [build-dependencies] 也继承自 [dependencies]

不![build-dependencies] 不继承自 [dependencies]。Wat!

我发现这种 Cargo.toml 的行为既方便又令人困惑。

Wat 3:默认键

你可能知道,可以这样写:

[dependencies]
serde = { version = "1.0" }

也可以这样写:

[dependencies]
serde = "1.0"

这里的原则是什么?一般的 TOML 中如何指定一个键为默认键?

你不能!一般的 TOML 没有默认键。Wat!

Cargo TOML 对 [dependencies] 部分中的 version 键进行了特殊处理。这是 Cargo 特有的功能,而不是一般的 TOML 功能。据我所知,Cargo TOML 没有其他默认键。

Wat 4:子功能

使用 Cargo.toml[features],你可以创建依赖项不同的项目版本。这些依赖项本身的功能也可能不同,我们称之为子功能

在这里,我们创建了两个项目版本。默认版本依赖于带有默认功能的 getrandomwasm 版本依赖于带有 js 子功能的 getrandom

[features]
default = []
wasm = ["getrandom-js"]

[dependencies]
rand = { version = "0.8" }
getrandom = { version = "0.2", optional = true }

[dependencies.getrandom-js]
package = "getrandom"
version = "0.2"
optional = true
features = ["js"]

在这个例子中,wasm 是我们项目的一个功能,依赖于依赖项别名 getrandom-rs,它代表带有 js 子功能的 getrandom crate 版本。

那么,如何在避免冗长的 [dependencies.getrandom-js] 部分的情况下给出相同的规范?

[features] 中,将 getrandom-js 替换为 "getrandom/js"。我们可以这样写:

[features]
default = []
wasm = ["getrandom/js"]

[dependencies]
rand = { version = "0.8" }
getrandom = { version = "0.2", optional = true }

一般来说,在 Cargo.toml 中,功能规范(如 wasm = ["getrandom/js"])可以列出:

  • 其他功能
  • 依赖项别名
  • 依赖项
  • 一个或多个依赖项“斜杠”一个子功能

这不是标准的 TOML,而是 Cargo.toml 特有的简写。

附加:你如何用简写表示你的 wasm 功能应包括带有两个子功能的 getrandom:js 和 test-in-browser

答案:列出依赖项两次。

wasm = ["getrandom/js","getrandom/test-in-browser"]

Wat 5:目标的依赖项

我们已经看到如何指定发布调试构建的依赖项。

[dependencies]
#...
[dev-dependencies]
#...
[build-dependencies]
#...

我们已经看到如何指定各种功能的依赖项:

[features]
default = []
wasm = ["getrandom/js"]

你会怎么猜测我们如何为各种目标(例如某个版本的 Linux、Windows 等)指定依赖项?

我们在 [dependencies] 前加上 target.TARGET_EXPRESSION 前缀,例如:

[target.x86_64-pc-windows-msvc.dependencies]
winapi = { version = "0.3.9", features = ["winuser"] }

按照一般 TOML 的规则,我们也可以这样说:

[target]
x86_64-pc-windows-msvc.dependencies={winapi = { version = "0.3.9", features = ["winuser"] }}

我觉得这种前缀语法很奇怪,但我无法提出更好的替代方案。不过,我确实想知道为什么功能不能以相同的方式处理:

# 不允许
[feature.wasm.dependencies]
getrandom = { version = "0.2", features=["js"]}

Wat Not 6:目标 cfg 表达式

这是我们的第一个“Wat Not”,即它是一种让我感到惊讶的一致性。

除了具体目标(如 x86_64-pc-windows-msvc),你还可以在单引号中使用 cfg 表达式。例如:

[target.'cfg(all(windows, target_arch = "x86_64"))'.dependencies]

我不认为这是一个“wat!”。我认为这很棒。

回想一下,cfg 是“配置”的缩写,是 Rust 通常用于条件编译代码的机制。例如,在我们的 main.rs 中,我们可以这样说:

if cfg!(target_os = "linux") {
println!("This is Linux!");
}

Cargo.toml 中,在目标表达式中,几乎支持整个 cfg 迷你语言。

  • all()any()not()
  • target_arch
  • target_feature
  • target_os
  • target_family
  • target_env
  • target_abi
  • target_endian
  • target_pointer_width
  • target_vendor
  • target_has_atomic
  • unix
  • windows

cfg 迷你语言唯一不支持的部分(我认为)是你不能使用 --cfg 命令行参数设置值。此外,一些 cfg 值(如 test)没有意义。

Wat 7:目标的配置文件

回想一下 Wat 1 中,你可以通过 [profile.release][profile.dev][profile.build] 设置编译器选项。例如:

[profile.dev]
opt-level = 0

你如何为特定目标(如 Windows)设置编译器选项?是这样吗?

[target.'cfg(windows)'.profile.dev]
opt-level = 0

不。相反,你需要创建一个名为 .cargo/config.toml 的新文件,并添加以下内容:

[target.'cfg(windows)']
rustflags = ["-C", "opt-level=0"]

Wat!

一般来说,Cargo.toml 只支持 target.TARGET_EXPRESSION 作为依赖项部分的前缀。你不能为配置文件部分加前缀。然而,在 .cargo/config.toml 中,你可以有 [target.TARGET_EXPRESSION] 部分。在这些部分中,你可以设置环境变量来设置编译器选项。

Wat Not 8:TOML 列表

Cargo.toml 支持两种列表语法:

  • 内联数组
  • 表数组

这个例子使用了两者:

[package]
name = "cargo-wat"
version = "0.1.0"
edition = "2021"

[dependencies]
rand = { version = "0.8" }
# 内联数组 'features'
getrandom = { version = "0.2", features = ["std", "test-in-browser"] }

# 表数组 'bin'
[[bin]]
name = "example"
path = "src/bin/example.rs"

[[bin]]
name = "another"
path = "src/bin/another.rs"

我们可以将表数组更改为内联数组吗?可以!

# 内联数组 'bin'
bins = [
{ name = "example", path = "src/bin/example.rs" },
{ name = "another", path = "src/bin/another.rs" },
]

[package]
name = "cargo-wat"
version = "0.1.0"
edition = "2021"

[dependencies]
rand = { version = "0.8" }
# 内联数组 'features'
getrandom = { version = "0.2", features = ["std", "test-in-browser"] }

我们可以将功能的内联数组更改为表数组吗?

不可以。简单值(此处为字符串)的内联数组不能表示为表数组。然而,我认为这是一个“wat not”,而不是“wat!”,因为这是一般 TOML 的限制,而不仅仅是 Cargo.toml 的限制。

附带说明:YAML 格式与 TOML 格式一样,提供两种列表语法。然而,YAML 的两种语法都适用于简单值。

Wat Not 9:TOML 内联、部分和点

这是一个典型的 Cargo.toml。它混合了部分语法(如 [dependencies])和内联语法(如 getrandom = {version = "0.2", features = ["std", "test-in-browser"]})。

[package]
name = "cargo-wat"
version = "0.1.0"
edition = "2021"

[dependencies]
rand = "0.8"
getrandom = { version = "0.2", features = ["std", "test-in-browser"] }

[target.x86_64-pc-windows-msvc.dependencies]
winapi = { version = "0.3.9", features = ["winuser"] }

[[bin]]
name = "example"
path = "src/bin/example.rs"

[[bin]]
name = "another"
path = "src/bin/another.rs"

我们可以将其完全重写为 100% 内联吗?可以。

package = { name = "cargo-wat", version = "0.1.0", edition = "2021" }

dependencies = { rand = "0.8", getrandom = { version = "0.2", features = [
"std",
"test-in-browser",
] } }

target = { 'cfg(target_os = "windows")'.dependencies = { winapi = { version = "0.3.9", features = [
"winuser",
] } } }

bins = [
{ name = "example", path = "src/bin/example.rs" },
{ name = "another", path = "src/bin/another.rs" },
]

我们也可以将其重写为最大部分:

[package]
name = "cargo-wat"
version = "0.1.0"
edition = "2021"

[dependencies.rand]
version = "0.8"

[dependencies.getrandom]
version = "0.2"
features = ["std", "test-in-browser"]

[target.x86_64-pc-windows-msvc.dependencies.winapi]
version = "0.3.9"
features = ["winuser"]

[[bin]]
name = "example"
path = "src/bin/example.rs"

[[bin]]
name = "another"
path = "src/bin/another.rs"

最后,让我们谈谈点。

在 TOML 中,点用于分隔嵌套表中的键。例如,a.b.c 是表 a 中表 b 中的键 c。我们可以用“很多点”重写我们的例子吗?可以:

package.name = "cargo-wat"
package.version = "0.1.0"
package.edition = "2021"
dependencies.rand = "0.8"
dependencies.getrandom.version = "0.2"
dependencies.getrandom.features = ["std", "test-in-browser"]
target.x86_64-pc-windows-msvc.dependencies.winapi.version = "0.3.9"
target.x86_64-pc-windows-msvc.dependencies.winapi.features = ["winuser"]
bins = [
{ name = "example", path = "src/bin/example.rs" },
{ name = "another", path = "src/bin/another.rs" },
]

我欣赏 TOML 在部分、内联和点方面的灵活性。我认为这种灵活性是一个“wat not”。你可能会发现所有这些选择令人困惑。然而,我喜欢 Cargo.toml 让我们使用 TOML 的全部功能。

结论

Cargo.toml 是 Rust 生态系统中的一个重要工具,提供了简单性和灵活性的平衡,既适合初学者也适合经验丰富的开发人员。通过我们探讨的九个 wats 和 wat nots,我们看到了这个配置文件有时会因其特性而令人惊讶,但同时也因其一致性和强大而令人印象深刻。

理解这些怪癖可以让你避免潜在的挫败感,并使你能够充分利用 Cargo.toml。从管理依赖项和配置文件到处理特定目标的配置和功能,这些见解将帮助你编写更高效和有效的 Cargo.toml 文件。

总之,虽然 Cargo.toml 可能有其独特之处,但这些特性往往源于实用的设计选择,优先考虑功能性和可读性。

接受这些怪癖,你会发现 Cargo.toml 不仅能满足你的项目需求,还能提升你的 Rust 开发体验

鱼雪

本文将介绍如何使用 Axum 框架实现 JWT 授权,包括从生成密钥对、创建和验证 Token 到在 Axum 中实现授权中间件,全面讲解构建安全 API 的流程。

Axum 是一个基于 Hyper 的 Rust Web 框架,它提供了高效、灵活的方式来构建现代 Web 应用。在大多数实际应用中,我们需要对 API 端点进行授权,以确保只有授权用户才能访问受保护的资源。**JWT(JSON Web Token)**是一种流行的授权机制,非常适合用于这种场景,能够实现无状态、跨平台的身份验证。

本文大体分为三个部分:

  1. 生成一个新的 Ed25519 公私钥对
  2. 生成和验证 JWT Token
  3. 在 Axum 中集成授权中间件

1. 生成 Ed25519 的公私钥

在我们的授权系统中,我们使用 Ed25519 算法来生成公私钥对。这是一种现代的、安全的数字签名算法,提供了高安全性和高性能,非常适合用在 JWT 授权中。

生成公私钥的代码示例

use anyhow::Result;
use jwt_simple::prelude::*;
use std::fs::File;

fn main() -> Result<()> {
generate_and_save_keys()?;
Ok(())
}

fn generate_and_save_keys() -> Result<()> {
let key_pair = Ed25519KeyPair::generate();

// 保存私钥
let private_key_pem = key_pair.to_pem();
let mut private_key_file = File::create("private_key.pem")?;
private_key_file.write_all(private_key_pem.as_bytes())?;

// 保存公钥
let public_key_pem = key_pair.public_key().to_pem();
let mut public_key_file = File::create("public_key.pem")?;
public_key_file.write_all(public_key_pem.as_bytes())?;

Ok(())
}

代码说明:

  • 使用 Ed25519KeyPair::generate() 生成密钥对,并分别保存公钥和私钥。
  • 私钥用于生成 Token,必须严格保密,而公钥用于验证 Token,可以公开发布。
  • PEM 格式是一种常用的密钥存储格式,易于管理。
tip

将公钥和私钥分开存储可以保证系统安全性。私钥通常只在生成 Token 时使用,而公钥则用于对 Token 的验证,可以公开提供给需要验证身份的服务。

2. 生成 JWT Token 和验证 Token

在这一步,我们会学习如何生成和验证 JWT Token,帮助我们在 Web 应用中实现授权控制。

生成 Token 的实现

fn sign(user: impl Into<User>) -> Result<String> {
let private_key_pem = read_to_string("private_key.pem")?;
let key_pair = Ed25519KeyPair::from_pem(&private_key_pem)?;

let user = user.into();
let claims = Claims::with_custom_claims(user, Duration::from_secs(JWT_DURATION));
let claims = claims.with_issuer(JWT_ISS).with_audience(JWT_AUD);

let token = key_pair.sign(claims)?;
Ok(token)
}

代码说明:

  1. 从文件中读取私钥,并创建 Ed25519KeyPair
  2. 创建一个包含用户信息的 claims 对象,并设置 Token 的有效期、发行者和受众。
  3. 使用私钥对 claims 进行签名,生成 Token。

验证 Token 的实现

fn verify(token: &str) -> Result<User, Box<dyn std::error::Error>> {
let public_key_pem = read_to_string("public_key.pem")?;
let public_key = Ed25519PublicKey::from_pem(&public_key_pem)?;

let options = VerificationOptions {
allowed_issuers: Some(HashSet::from_strings(&[JWT_ISS])),
allowed_audiences: Some(HashSet::from_strings(&[JWT_AUD])),
..Default::default()
};

let claims = public_key.verify_token::<User>(token, Some(options))?;
Ok(claims.custom)
}

验证步骤:

  • 使用公钥验证 Token 的签名,并检查是否符合预期的发行者受众
  • 通过验证后,返回自定义的 User 数据。

要点:

  • 生成 Token 时使用私钥,验证 Token 时使用公钥。
  • 设置有效期发行者受众,以提高安全性。

3. 在 Axum 中集成 JWT 授权中间件

为了确保只有经过身份验证的用户才能访问受保护的 API,我们需要将 JWT 的生成和验证集成到 Axum 框架中。

定义 AuthUser 结构体并实现 FromRequestParts Trait

struct AuthUser(User);

#[async_trait]
impl<S> FromRequestParts<S> for AuthUser
where
S: Send + Sync,
{
type Rejection = StatusCode;

async fn from_request_parts<'life0, 'life1>(
parts: &'life0 mut Parts,
_state: &'life1 S,
) -> Result<Self, Self::Rejection>
where
'life0: 'async_trait,
'life1: 'async_trait,
Self: 'async_trait,
{
let token = parts
.headers
.get("Authorization")
.and_then(|value| value.to_str().ok())
.and_then(|value| value.strip_prefix("Bearer "))
.ok_or(StatusCode::UNAUTHORIZED)?;
let user = verify(token).map_err(|_| StatusCode::UNAUTHORIZED)?;
Ok(AuthUser(user))
}
}

该实现可以让我们方便地从请求中提取出认证信息。

创建授权中间件

async fn auth_middleware(
AuthUser(user): AuthUser,
req: Request<Body>,
next: Next,
) -> impl IntoResponse {
info!("Authenticated user: {}", user.username);
next.run(req).await
}

这个中间件会记录用户信息,然后继续处理请求,确保只有认证过的用户可以访问受保护的路由。

在 Axum 中使用中间件

let app = Router::new()
.route("/login", post(get_token))
.route("/protected", get(protected_route).layer(from_fn(auth_middleware)));

我们定义了两个路由:

  • /login:用于登录并获取 Token,不需要认证。
  • /protected:受保护的路由,需要通过授权中间件的认证。

登录并获取 Token 的实现

async fn get_token(
Json(payload): Json<TokenRequest>
) -> Result<Json<TokenResponse>, StatusCode> {
let user = User {
username: payload.username,
created_at: Utc::now(),
scope: vec!["read".to_string(), "write".to_string()],
};
let token = sign(user).map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
Ok(Json(TokenResponse { token }))
}

该函数用于处理登录请求,生成并返回 JWT Token。

受保护路由的实现

async fn protected_route(AuthUser(user): AuthUser) -> impl IntoResponse {
format!("Hello, {}! Your scopes are: {:?}", user.username, user.scope)
}

该路由只有在用户通过认证后才能访问,返回用户的相关信息。

要点:

  • 使用 FromRequestParts Trait 从请求中提取认证信息,便于集成 JWT 认证。
  • 中间件统一处理认证逻辑,提高了代码的模块化和可维护性。

测试 JWT 授权

使用 REST 客户端(例如 VSCode REST Client 插件)可以测试整个授权流程:

  1. 获取 Token:首先发送 POST 请求到 /login
  2. 访问受保护的路由:使用获得的 Token,设置 Authorization 请求头访问 /protected
### Get Token
POST http://localhost:3000/login
Content-Type: application/json

{
"username": "admin"
}

@token = {{signin.response.body.token}}

### Auth user
GET http://localhost:3000/protected
Authorization: Bearer {{token}}
note

在这里,我们使用了 REST Client 插件来进行测试,方便地将获取的 Token 用于后续请求,避免手动复制粘贴。

完整代码示例

Cargo.toml 依赖库

[package]
name = "axum-jwt-auth"
version = "0.1.0"
edition = "2021"

[dependencies]
anyhow = "1.0.86"
axum = "0.7.5"
chrono = { version = "0.4.38", features = ["serde"] }
jwt-simple = "0.12.9"
serde = { version = "1.0.204", features = ["derive"] }
tokio = { version = "1.39.1", features = ["net", "rt", "rt-multi-thread"] }
tracing = "0.1.40"
tracing-subscriber = { version = "0.3.18", features = ["env-filter"] }

主要 Rust 代码示例

use anyhow::Result;
use chrono::{DateTime, Utc};
use jwt_simple::prelude::*;
use serde::{Deserialize, Serialize};
use tokio::net::TcpListener;

use tracing::{info, level_filters::LevelFilter};
use tracing_subscriber::{
fmt::Layer, layer::SubscriberExt,
util::SubscriberInitExt,
Layer as _
};

use std::{
collections::HashSet,
fs::{read_to_string, File},
io::Write,
net::SocketAddr,
};

use axum::{
async_trait, body::Body,
extract::FromRequestParts,
http::{request::Parts, Request,StatusCode},
middleware::{from_fn, Next},
response::IntoResponse,
routing::{get, post},
Json, Router
};

// 省略部分代码,可参考上文详细内容

结论

通过这篇文章,我们了解了如何使用 Axum 框架在 Rust 中实现基于 JWT 的授权系统。具体内容包括:

  • 使用 Ed25519 算法生成密钥对,确保私钥的安全性。
  • 生成和验证 JWT Token,以实现无状态的授权。
  • 将 Token 签发和验证集成到 Axum 中,确保对 API 端点进行保护。

这种方法为我们提供了一个灵活、安全的 API 保护机制。在实际应用中,你还可以引入更多的安全措施,如 Token 刷新权限管理密钥轮换等,以进一步提升安全性。

参考链接

鱼雪

本文介绍使用datafusion读取parquet文件的相关代码和说明。

Cargo.toml依赖库

[package]
name = "datafusion_read_parquet"
version = "0.1.0"
edition = "2021"

[dependencies]
anyhow = "1.0.86"
datafusion = { version = "40.0.0", features = ["serde"] }
serde = { version = "1.0.204", features = ["derive"] }
tokio = { version = "1.38.1", features = ["rt", "rt-multi-thread"] }

代码说明

use anyhow::Result;
use datafusion::{arrow::array::AsArray, execution::context::SessionContext};

const PQ_FILE: &str = "../assets/sample.parquet";

#[tokio::main]
async fn main() -> Result<()> {
read_with_datafusion(PQ_FILE).await?;
Ok(())
}

async fn read_with_datafusion(file: &str) -> Result<()> {
let ctx = SessionContext::new();
ctx.register_parquet("stats", file, Default::default()).await?;

let ret = ctx
.sql("SELECT name::text name, email::text email FROM stats limit 3")
.await?
.collect()
.await?;

for batch in ret {
let names = batch.column(0).as_string::<i32>();
let emails = batch.column(1).as_string::<i32>();

for (name, email) in names.iter().zip(emails.iter()) {
let (name, email) = (name.unwrap(), email.unwrap());
println!("{}: {}", name, email);
}
}
Ok(())
}
  • 首先使用SessionContext::new()创建一个上下文会话(Session), 将数据转换成表以 及执行表查询都需要这个上下文对象ctx
  • SQL语句SELECT name::text name, email::text email FROM stats limit 3,这里获取 3条数据,每条数据包含nameemail,这些需要为这两个字段加上类型说明,否则执 行会报类型转换的错误"thread 'main' panicked at /path/to/.cargo/registry/src/index.crates.io-6f17d22bba15001f/arrow-array-52.1.0/src/cast.rs:769:29"
  • 另外需要注意batch.column(column_index)这里是根据列索引取数据,需要跟SQL语句 SELECT取的字段相对应,否则会出现信息对应错误的问题
  • Rust中迭代器以及可以同时迭代多个集合的zip方法

运行方式

# cd arrow-examples/datafusion_read_parquet
cargo run
  • 运行结果 Rust DataFusion库读取parquet文件运行结果

链接

鱼雪

Cargo.toml依赖库

[package]
name = "polars_read_parquet"
version = "0.1.0"
edition = "2021"

[dependencies]
anyhow = "1.0.86"
polars = { version = "0.41.3", features = ["parquet", "timezones", "sql", "lazy"] }

其中:

  • parquet features: 是用来加载parquet文件的特性
  • timizones features: 是用来处理数据中的时区
  • sql features: 是以SQL的方式操作数据
  • lazy features: 是加载数据是执行懒加载,不是从一开始就把数据都加载到内存,而 是需要数据做运算或者查询时才会加载数据,以LazyFrame替换DataFrame

代码

use anyhow::Result;
use polars::{prelude::*, sql::SQLContext};

const PQ_FILE: &str = "../assets/sample.parquet";

fn main() -> Result<()> {
read_with_polars(PQ_FILE)?;
Ok(())
}

fn read_with_polars(file: &str) -> Result<()> {
let df = LazyFrame::scan_parquet(file, Default::default())?;
let mut ctx = SQLContext::new();
ctx.register("stats", df);
let df = ctx
.execute("SELECT name::text name, email::text email FROM stats")?
.collect()?;
println!("{:?}", df);

Ok(())
}
  • 首先是使用lazy方式扫描加载parquet文件,返回一个LazyFrame的对象
  • 使用SQLContext创建可以使用SQL操作DataFrame数据的上下文对象
  • ctx.register()方法可以将读取到的DataFrame与执行名称的"表名"关联起来,可以 理解为读取到DataFrame数据取一个名称,表格数据也可以直接当做表来操作,在后文中 的SQL语句中使用这个名字
  • 使用ctx执行SQL查询语句

运行方式

# cd arrow-examples/polars_read_parquet
cargo run
  • 运行结果如下 Polars读取parquet文件

链接

鱼雪

本文介绍Arrow生态持久化存储文件的读取方式之一,使用Rust parquet库读取parquet文件。

Cargo.toml中依赖库如下

[package]
name = "parquet_rs_read_parquet"
version = "0.1.0"
edition = "2021"

[dependencies]
anyhow = "1.0.86"
arrow = { version = "52.1.0", features = ["prettyprint"] }
parquet = "52.1.0"

代码如下

use anyhow::Result;
use arrow::array::AsArray;
use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
use std::fs::File;

const PQ_FILE: &str = "../assets/sample.parquet";

fn main() -> Result<()> {
read_with_parquet(PQ_FILE)?;
Ok(())
}

fn read_with_parquet(file: &str) -> Result<()> {
let file = File::open(file)?;
let reader = ParquetRecordBatchReaderBuilder::try_new(file)?
.with_batch_size(8192)
.with_limit(3)
.build()?;

for record_batch in reader {
let record_batch = record_batch?;
let emails = record_batch.column(0).as_binary::<i32>();

for email in emails {
let email = email.unwrap();
println!("{:?}", String::from_utf8_lossy(email));
}
}
Ok(())
}
  • PQ_FILE: 表示parquet文件的路径,当前我们使用的相对目录,这个目录是运行时的相 对目录,即cargo run命令运行时的目录即为当前目录
  • record_batch.column(0).as_binary::<i32>(): 这里需要用到 arrow::array::AsArray Trait
  • ParquetRecordBatchReaderBuilder: build()函数返回一个ParquetRecordBatchReader
  • String::from_utf8_lossy(): 将二进制转换为字符串

运行

# `cd parquet_rs_read_parquet`, otherwise, the `sample.parquet` file cannot be found.
cargo run
  • 执行结果

Rust parquet库读取parquet文件运行结果

链接

鱼雪

在这篇文章中,我们将探讨如何使用DataFusion在Rust中读取CSV文件并运行SQL查询。DataFusion是一个高性能的开源数据处理引擎,支持SQL查询。

安装依赖

首先,确保在项目中添加以下依赖:

cargo add tokio --features rt-multi-thread
cargo add datafusion

在DataFusion中运行SQL查询

1. 引入库

在Rust代码中引入DataFusion库:

use datafusion::prelude::*;
use datafusion::error::Result;

2. 注册CSV文件为表

使用以下代码将CSV文件注册为表:

let ctx = SessionContext::new();
ctx.register_csv("example", "assets/example.csv", CsvReadOption::new()).await?;

register_csv函数

register_csv函数用于将CSV文件注册为DataFusion中的表。其参数包括:

  • name: &str: 表名
  • table_path: &str: CSV文件路径
  • options: CsvReadOptions: 读取CSV文件的选项

3. 创建SQL查询计划

使用以下代码创建SQL查询计划:

let df = ctx.sql("SELECT a, MIN(b) FROM example WHERE a <= b GROUP BY a LIMIT 100").await?;

sql函数

sql函数用于执行SQL查询,其参数为待执行的SQL语句。

4. 打印查询结果

使用以下代码打印查询结果:

df.show().await?;

show函数

show函数用于显示查询结果,其返回值为Vec<RecordBatch>

执行结果

以下是执行结果的示例图:

DataFusion执行结果

通过这些步骤,您可以在Rust中使用DataFusion读取CSV文件并运行SQL查询。希望这篇指南能帮助您快速上手DataFusion。

相关链接

鱼雪

LanceDB是一种使用持久存储构建的用于矢量搜索的开源数据库,极大简化了检索、过滤和嵌入管理。 LanceDB的主要功能包括:

  1. 生产规模的矢量搜索,无需管理服务器。
  2. 存储、查询和过滤矢量、元数据和多模态数据(文本、图像、视频、点云等)。
  3. 支持矢量相似性搜索、全文搜索和SQL。
  4. 本地支持Python和Javascript/Typescript。
  5. 零拷贝、自动版本管理,管理数据版本而无需额外基础设施。
  6. GPU支持构建矢量索引(*)。
  7. 生态系统集成
  • LangChain🦜️🔗
  • LlamaIndex🦙
  • Apache-Arrow
  • Pandas
  • Polars
  • DuckDB

关于LanceDB中Table介绍

Table方法

  • name(): 获取表的名称
  • schema(): 获取表的模式
  • count_rows(): 获取表中的行数
  • add(): 添加记录添加到表中,但传入的参数是需要实现IntoArrow的类型
  • query(): 查询表中的记录
  • update(): 更新表中的记录
  • delete(): 删除表中的记录
  • create_index(): 创建索引
  • merge_insert(): 合并插入
  • vector_search(): 矢量搜索
  • optimize(): 优化表
  • add_columns(): 添加(多)列
  • alter_columns(): 修改(多)列
  • drop_columns(): 删除(多)列
  • version(): 获取表的版本,由于LanceDB使用版本控制变化
  • checkpoint(): 根据指定版本获取检查点
  • checkpoint_latest(): 获取最新检查点
  • restore(): 恢复到指定版本
  • list_indices(): 列出表的索引

Schema模式定义Table定义方式

LanceDB Schema关系图

LanceDB Table依赖关系图

创建空表

1 完整代码

use arrow_schema::{DataType, Field, Schema};
use lancedb::{connect, Result};
use std::sync::Arc;

#[tokio::main]
async fn main() -> Result<()> {
let created_empty_talbe = create_empty_table().await?;
println!(
"Created empty table: {}, Table name: {}",
created_empty_talbe,
created_empty_talbe.name()
);
Ok(())
}

#[allow(unused)]
async fn create_empty_table() -> Result<LanceDbTable> {
// 创建模式定义
let schema = Arc::new(Schema::new(vec![
Field::new("id", DataType::Int32, false),
Field::new("name", DataType::Utf8, false),
]));
// 创建数据库URI目录
let uri = "data/sample-lancedb";
// 连接数据库
let db = connect(uri).execute().await?;
// 创建一个空表
let table = db
.create_empty_table("empty_talbe", schema)
.execute()
.await?;
Ok(table)
}
  1. 包依赖文件 Cargo.toml文件内容如下:
lancedb = "0.7.0"
tokio = {version = "1.38.0", features = ["rt-multi-thread"]}
arrow-schema = "51.0"
  1. 运行结果如下: LanceDB创建空表

创建带初始化数据的表

  1. 完整代码如下:
use arrow_schema::{DataType, Field, Schema};
use arrow_array::{Int32Array, RecordBatch, RecordBatchIterator, StringArray};
use lancedb::{connect, Result, Table as LanceDbTable};
use std::sync::Arc;

#[tokio::main]
async fn main() -> Result<()> {
let created_table_with_data = create_table_with_data().await?;
println!(
"Created table with data: {}, Table name: {}",
created_table_with_data,
created_table_with_data.name()
);
Ok(())
}

#[allow(unused)]
async fn create_table_with_data() -> Result<LanceDbTable> {
// 创建本地数据库URI目录
let uri = "data/sample-lancedb";
// 连接数据库
let db = connect(uri).execute().await?;

// 创建模式定义
let schema = Arc::new(Schema::new(vec![
Field::new("id", DataType::Int32, false),
Field::new("name", DataType::Utf8, false),
]));

// 初始化`ids`列的数据
let ids = Int32Array::from(vec![1, 2, 3]);
// 初始化`name`列的数据
let names = StringArray::from(vec!["Alice", "Bob", "Lily"]);
// 使用`Schema`以及列数据创建`RecordBatch`
let batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(ids), Arc::new(names)])?;
// 使用`RecordBatch`创建`RecordBatchIterator`
let batchs = RecordBatchIterator::new(vec![batch].into_iter().map(Ok), schema);
// 创建表,并插入初始化数据
let table = db
.create_table("table_with_person", batchs)
.execute()
.await?;
Ok(table)
}
  1. 包依赖文件 Cargo.toml文件内容如下:
lancedb = "0.7.0"
tokio = {version = "1.38.0", features = ["rt-multi-thread"]}
arrow-schema = "51.0"
  1. 运行结果如下: LanceDB创建带初始化数据的表

初始化记录并创建表

  1. 完整代码
use arrow_array::types::Float32Type;
use arrow_array::{FixedSizeListArray, Int32Array, RecordBatch, RecordBatchIterator, StringArray};
use arrow_schema::{DataType, Field, Schema};
use lancedb::arrow::IntoArrow;
use lancedb::{connect, Result, Table as LanceDbTable};
use std::sync::Arc;

#[tokio::main]
async fn main() -> Result<()> {
let created_table_with_records = create_table_with_records().await?;
println!(
"Created table with records: {}, Table name: {}",
created_table_with_records,
created_table_with_records.name()
);
Ok(())
}

#[allow(unused)]
async fn create_table_with_records() -> Result<LanceDbTable> {
let uri = "data/sample-lancedb";
let db = connect(uri).execute().await?;

let initial_data = create_some_records()?;
let tbl = db.create_table("my_table", initial_data).execute().await?;

let new_data = create_some_records()?;
// NOTICE: 只有实现了 IntoArrow 的类型才能使用`add`方法,即`create_some_records`返回的类型
tbl.add(new_data).execute().await?;
Ok(tbl)
}

#[allow(unused)]
fn create_some_records() -> Result<impl IntoArrow> {
const TOTAL: usize = 1000;
const DIM: usize = 128;

let schema = Arc::new(Schema::new(vec![
Field::new("id", DataType::Int32, false),
Field::new(
"vector",
DataType::FixedSizeList(
Arc::new(Field::new("item", DataType::Float32, true)),
DIM as i32,
),
true,
),
]));

let batch = RecordBatch::try_new(
schema.clone(),
vec![
Arc::new(Int32Array::from_iter_values(0..TOTAL as i32)),
Arc::new(
FixedSizeListArray::from_iter_primitive::<Float32Type, _, _>(
(0..TOTAL).map(|_| Some(vec![Some(1.0); DIM])),
DIM as i32,
),
),
],
)
.unwrap();
let batches = RecordBatchIterator::new(vec![batch].into_iter().map(Ok), schema.clone());
Ok(Box::new(batches))
}
  1. 包依赖文件 Cargo.toml文件内容如下:
lancedb = "0.7.0"
tokio = {version = "1.38.0", features = ["rt-multi-thread"]}
arrow-schema = "51.0"
arrow-array = "51.0"
  1. 运行结果如下: LanceDB初始化记录并创建表

打开已存在的表

  1. 完整代码
use lancedb::{connect, Result, Table as LanceDbTable};

async fn main() -> Result<()> {
let opened_table = open_with_existing_table().await?;
println!(
"Opened table: {}, Table name: {}",
opened_table,
opened_table.name()
);
}

#[allow(unused)]
async fn open_with_existing_table() -> Result<LanceDbTable> {
let uri = "data/sample-lancedb";
let db = connect(uri).execute().await?;
let table = db.open_table("my_table").execute().await?;
Ok(table)
}
  1. 包依赖文件
lancedb = "0.7.0"
tokio = {version = "1.38.0", features = ["rt-multi-thread"]}
  1. 运行结果如下: LanceDB打开已存在的表

删除表记录

  1. 完整代码
use lancedb::{connect, Result};

#[tokio::main]
async fn main() -> Result<()> {
let queried_result = query_table().await?;
println!("Query result: {:?}", queried_result);

delete_table_records().await?; // 根据条件删除表中的记录
Ok(())
}

#[allow(unused)]
async fn delete_table_records() -> Result<()> {
let uri = "data/sample-lancedb";
let db = connect(uri).execute().await?;
let table = db.open_table("my_table").execute().await?;
table.delete("id > 24").await?;
Ok(())
}
  1. 包依赖文件
lancedb = "0.7.0"
tokio = {version = "1.38.0", features = ["rt-multi-thread"]}

删除表

  1. 完整代码
use lancedb::{connect, Result};

#[tokio::main]
async fn main() -> Result<()> {
let queried_result = query_table().await?;
println!("Query result: {:?}", queried_result);

drop_table().await?; // 删除 data/sample-lancedb/my_table
Ok(())
}

#[allow(unused)]
async fn drop_table() -> Result<()> {
let uri = "data/sample-lancedb";
let db = connect(uri).execute().await?;
db.drop_table("my_table").await?;
Ok(())
}
  1. 包依赖文件
lancedb = "0.7.0"
tokio = {version = "1.38.0", features = ["rt-multi-thread"]}

删除数据库

  1. 完整代码
use lancedb::{connect, Result};

#[tokio::main]
async fn main() -> Result<()> {
let queried_result = query_table().await?;
println!("Query result: {:?}", queried_result);

drop_database().await?; // 删除 data/sample-lancedb
Ok(())
}

#[allow(unused)]
async fn drop_database() -> Result<()> {
let uri = "data/sample-lancedb";
let db = connect(uri).execute().await?;
db.drop_db().await?;
Ok(())
}
  1. 包依赖文件
lancedb = "0.7.0"
tokio = {version = "1.38.0", features = ["rt-multi-thread"]}

查询表记录

  1. 完整代码
use lancedb::{connect, Result};

#[tokio::main]
async fn main() -> Result<()> {

let queried_result = query_table().await?;
println!("Query result: {:?}", queried_result);
Ok(())
}

#[allow(unused)]
async fn query_table() -> Result<VectorQuery> {
let uri = "data/sample-lancedb";
let db = connect(uri).execute().await?;
let table = db.open_table("my_table").execute().await?;
let result = table.query().nearest_to(&[1.0; 128])?;
Ok(result)
}
  1. 包依赖文件
lancedb = "0.7.0"
tokio = {version = "1.38.0", features = ["rt-multi-thread"]}
  1. 运行结果如下: LanceDB查询表记录

更新表记录

  1. 完整代码
use lancedb::{connect, Result};

#[tokio::main]
async fn main() -> Result<()> {

update_table().await?;
Ok(())
}

#[allow(unused)]
async fn update_table() -> Result<()> {
let uri = "data/sample-lancedb";
let db = connect(uri).execute().await?;
let table = db.open_table("table_with_person").execute().await?;
println!("Before update: {:?}", table.query());
table
.update()
.only_if("id=0")
.column("name", "Bob")
.execute()
.await?; // Bob -> Halzzz

Ok(())
}
  1. 包依赖文件
lancedb = "0.7.0"
tokio = {version = "1.38.0", features = ["rt-multi-thread"]}

总结

本文详细介绍了如何使用Rust编程语言与LanceDB进行交互, 包括表的创建、插入数据、查询、更新和删除操作。 通过这些示例,展示了LanceDB在处理矢量数据和支持多模态数据方面的强大功能, 以及如何通过Rust代码实现这些操作。 LanceDB提供了丰富的API接口,简化了数据库操作, 使得开发者能够高效地管理和查询数据。

希望通过本文的讲解,您能够更好地理解并应用LanceDB来解决实际问题。

链接

鱼雪

本文将详细介绍如何从头开始使用LanceDB。每个步骤都附有详细的说明和图示,帮助您快速上手LanceDB。

LanceDB创建数据库基本调用关系图示

环境搭建

在开始之前,请确保您的开发环境中安装了以下工具:

  • Rust: 用于编写和编译代码。
  • Tokio: 异步运行时。
  • LanceDB: 数据库库。

Rust安装

使用以下命令安装Rust:

curl --proto '=https' --tlsv1.2 -sSf https://sh.rustup.rs | sh

创建Rust项目

创建一个新的Rust项目:

cargo new lancedb-example
cd lancedb-example

安装Tokio

在项目中添加Tokio依赖:

cargo add tokio --features rt-multi-thread

安装LanceDB

在项目中添加LanceDB依赖:

cargo add lancedb

代码解析

以下是一个简单的LanceDB数据库连接示例:

use lancedb::{connect, Result};

#[tokio::main]
async fn main() -> Result<()> {
let uri = "data/example-lancedb";
let db_builder = connect(uri);
let db_connect = connect(uri).execute().await?;
println!("LanceDB builder: {:?}", db_builder);
println!("LanceDB connect: {}", db_connect);
Ok(())
}

代码说明

  • URI: 数据库的URI。
  • connect: 接收URI参数,返回ConnectBuilder
  • execute: 返回Connection,在执行目录创建数据库。

connect函数

connect函数用于创建一个ConnectBuilder实例:

pub fn connect(uri: &str) -> ConnectBuilder {
ConnectBuilder::new(uri)
}

ConnectBuilder结构体

ConnectBuilder用于配置和建立与LanceDB数据库的连接。

主要字段和方法包括:

  • uri: String: 数据库的URI。
  • execute(self) -> Result<Connection>: 执行连接建立。

Database结构体

Database封装了与实际数据库交互的逻辑。

Connection结构体

Connection表示与LanceDB数据库的连接实例。

总结

通过本文,您可以了解如何在Rust中使用LanceDB进行数据库操作。希望这篇指南能帮助您快速上手LanceDB。

相关链接

鱼雪