Skip to main content

34 posts tagged with "Rust"

View All Tags

本文介绍Arrow生态持久化存储文件的读取方式之一,使用Rust parquet库读取parquet文件。

Cargo.toml中依赖库如下

[package]
name = "parquet_rs_read_parquet"
version = "0.1.0"
edition = "2021"

[dependencies]
anyhow = "1.0.86"
arrow = { version = "52.1.0", features = ["prettyprint"] }
parquet = "52.1.0"

代码如下

use anyhow::Result;
use arrow::array::AsArray;
use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
use std::fs::File;

const PQ_FILE: &str = "../assets/sample.parquet";

fn main() -> Result<()> {
read_with_parquet(PQ_FILE)?;
Ok(())
}

fn read_with_parquet(file: &str) -> Result<()> {
let file = File::open(file)?;
let reader = ParquetRecordBatchReaderBuilder::try_new(file)?
.with_batch_size(8192)
.with_limit(3)
.build()?;

for record_batch in reader {
let record_batch = record_batch?;
let emails = record_batch.column(0).as_binary::<i32>();

for email in emails {
let email = email.unwrap();
println!("{:?}", String::from_utf8_lossy(email));
}
}
Ok(())
}
  • PQ_FILE: 表示parquet文件的路径,当前我们使用的相对目录,这个目录是运行时的相 对目录,即cargo run命令运行时的目录即为当前目录
  • record_batch.column(0).as_binary::<i32>(): 这里需要用到 arrow::array::AsArray Trait
  • ParquetRecordBatchReaderBuilder: build()函数返回一个ParquetRecordBatchReader
  • String::from_utf8_lossy(): 将二进制转换为字符串

运行

# `cd parquet_rs_read_parquet`, otherwise, the `sample.parquet` file cannot be found.
cargo run
  • 执行结果

Rust parquet库读取parquet文件运行结果

链接

鱼雪

读取 CSV 文件当做,然后以 SQL 方式对进行操作。

安装依赖

cargo add tokio --features rt-multi-thread
cargo add datafusion

DataFusion 中运行 SQL 查询

  1. 引入库
use datafusion::prelude::*;
use datafusion::error::Result;
  1. 注册表
let ctx = SessionContext::new();
ctx.register_csv("example", "assets/example.csv", CsvReadOption::new()).await?;

在 SQL 语句中执行时,将 CSV 文件注册成一个表。

  • 源码
pub async fn register_csv(
&self,
name: &str,
table_path: &str,
options: CsvReadOptions<'_>,
) -> Result<()> {
let listing_options = options
.to_listing_options(&self.copied_config(), self.copied_table_options());

self.register_listing_table(
name,
table_path,
listing_options,
options.schema.map(|s| Arc::new(s.to_owned())),
None,
)
.await?;

Ok(())
}
  • register_csv函数参数:

    • name: &str: 注册成的表名,表名example在后面查询中使用

    • table_path: &str: CSV 文件路径

    • options: CsvReadOptions: 读取 CSV 文件的可选项设置

      • has_header: bool: 如果对没有标题的文件进行模式推断,将创建默认的列名。
      • delimiter: u8: 可选的列分隔符。默认为 b','
      • quote: u8: 可选的引号字符。默认为 b'"'
      • escape: Option<u8>: 可选的转义字符。默认为 None
      • comment: Option<u8>: 如果启用,以这个字节开头的行将被忽略。。
      • schema: Option<&'a Schema>: 可选的表示 CSV 文件的模式。如果为 None,CSV 读取器将尝试根据文件中的数据推断模式。
      • schema_infer_max_records: usize: 如果需要进行模式推断,从 CSV 文件中读取的最大行数。默认为 DEFAULT_SCHEMA_INFER_MAX_RECORD
      • file_extension: &'a str: 文件扩展名;只有具有此扩展名的文件才会被选择用于数据输入。默认为 FileType::CSV.get_ext().as_str()
      • table_partition_cols: Vec<(String, DataType)>: 部分列
      • file_compression_type: FileCompressionType: 文件压缩类型
      • file_sort_order: Vec<Vec<Expr>>: 指示文件的排序方式
  1. 创建一个计划运行 SQL 查询
let df = ctx.sql("SELECT a, MIN(b), FROM example WHERE a <= b GROUP BY a LIMIT 100").await?;
pub async fn sql(&self, sql: &str) -> Result<DataFrame> {
self.sql_with_options(sql, SQLOptions::new()).await
}

sql函数参数:

  • sql: 待执行的 SQL 语句
  1. 打印结果
df.show().await?;
  • 源码
pub async fn show(self) -> Result<()> {
let results = self.collect().await?;
Ok(pretty::print_batches(&results)?)
}
  • 返回值:

    • results的类型为Vec<RecordBatch>

执行结果

DataFusion

鱼雪

LanceDB是一种使用持久存储构建的用于矢量搜索的开源数据库,极大简化了检索、过滤和嵌入管理。 LanceDB的主要功能包括:

  1. 生产规模的矢量搜索,无需管理服务器。
  2. 存储、查询和过滤矢量、元数据和多模态数据(文本、图像、视频、点云等)。
  3. 支持矢量相似性搜索、全文搜索和SQL。
  4. 本地支持Python和Javascript/Typescript。
  5. 零拷贝、自动版本管理,管理数据版本而无需额外基础设施。
  6. GPU支持构建矢量索引(*)。
  7. 生态系统集成
  • LangChain🦜️🔗
  • LlamaIndex🦙
  • Apache-Arrow
  • Pandas
  • Polars
  • DuckDB

关于LanceDB中Table介绍

Table方法

  • name(): 获取表的名称
  • schema(): 获取表的模式
  • count_rows(): 获取表中的行数
  • add(): 添加记录添加到表中,但传入的参数是需要实现IntoArrow的类型
  • query(): 查询表中的记录
  • update(): 更新表中的记录
  • delete(): 删除表中的记录
  • create_index(): 创建索引
  • merge_insert(): 合并插入
  • vector_search(): 矢量搜索
  • optimize(): 优化表
  • add_columns(): 添加(多)列
  • alter_columns(): 修改(多)列
  • drop_columns(): 删除(多)列
  • version(): 获取表的版本,由于LanceDB使用版本控制变化
  • checkpoint(): 根据指定版本获取检查点
  • checkpoint_latest(): 获取最新检查点
  • restore(): 恢复到指定版本
  • list_indices(): 列出表的索引

Schema模式定义Table定义方式

LanceDB Schema关系图

LanceDB Table依赖关系图

创建空表

1 完整代码

use arrow_schema::{DataType, Field, Schema};
use lancedb::{connect, Result};
use std::sync::Arc;

#[tokio::main]
async fn main() -> Result<()> {
let created_empty_talbe = create_empty_table().await?;
println!(
"Created empty table: {}, Table name: {}",
created_empty_talbe,
created_empty_talbe.name()
);
Ok(())
}

#[allow(unused)]
async fn create_empty_table() -> Result<LanceDbTable> {
// 创建模式定义
let schema = Arc::new(Schema::new(vec![
Field::new("id", DataType::Int32, false),
Field::new("name", DataType::Utf8, false),
]));
// 创建数据库URI目录
let uri = "data/sample-lancedb";
// 连接数据库
let db = connect(uri).execute().await?;
// 创建一个空表
let table = db
.create_empty_table("empty_talbe", schema)
.execute()
.await?;
Ok(table)
}
  1. 包依赖文件 Cargo.toml文件内容如下:
lancedb = "0.7.0"
tokio = {version = "1.38.0", features = ["rt-multi-thread"]}
arrow-schema = "51.0"
  1. 运行结果如下: LanceDB创建空表

创建带初始化数据的表

  1. 完整代码如下:
use arrow_schema::{DataType, Field, Schema};
use arrow_array::{Int32Array, RecordBatch, RecordBatchIterator, StringArray};
use lancedb::{connect, Result, Table as LanceDbTable};
use std::sync::Arc;

#[tokio::main]
async fn main() -> Result<()> {
let created_table_with_data = create_table_with_data().await?;
println!(
"Created table with data: {}, Table name: {}",
created_table_with_data,
created_table_with_data.name()
);
Ok(())
}

#[allow(unused)]
async fn create_table_with_data() -> Result<LanceDbTable> {
// 创建本地数据库URI目录
let uri = "data/sample-lancedb";
// 连接数据库
let db = connect(uri).execute().await?;

// 创建模式定义
let schema = Arc::new(Schema::new(vec![
Field::new("id", DataType::Int32, false),
Field::new("name", DataType::Utf8, false),
]));

// 初始化`ids`列的数据
let ids = Int32Array::from(vec![1, 2, 3]);
// 初始化`name`列的数据
let names = StringArray::from(vec!["Alice", "Bob", "Lily"]);
// 使用`Schema`以及列数据创建`RecordBatch`
let batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(ids), Arc::new(names)])?;
// 使用`RecordBatch`创建`RecordBatchIterator`
let batchs = RecordBatchIterator::new(vec![batch].into_iter().map(Ok), schema);
// 创建表,并插入初始化数据
let table = db
.create_table("table_with_person", batchs)
.execute()
.await?;
Ok(table)
}
  1. 包依赖文件 Cargo.toml文件内容如下:
lancedb = "0.7.0"
tokio = {version = "1.38.0", features = ["rt-multi-thread"]}
arrow-schema = "51.0"
  1. 运行结果如下: LanceDB创建带初始化数据的表

初始化记录并创建表

  1. 完整代码
use arrow_array::types::Float32Type;
use arrow_array::{FixedSizeListArray, Int32Array, RecordBatch, RecordBatchIterator, StringArray};
use arrow_schema::{DataType, Field, Schema};
use lancedb::arrow::IntoArrow;
use lancedb::{connect, Result, Table as LanceDbTable};
use std::sync::Arc;

#[tokio::main]
async fn main() -> Result<()> {
let created_table_with_records = create_table_with_records().await?;
println!(
"Created table with records: {}, Table name: {}",
created_table_with_records,
created_table_with_records.name()
);
Ok(())
}

#[allow(unused)]
async fn create_table_with_records() -> Result<LanceDbTable> {
let uri = "data/sample-lancedb";
let db = connect(uri).execute().await?;

let initial_data = create_some_records()?;
let tbl = db.create_table("my_table", initial_data).execute().await?;

let new_data = create_some_records()?;
// NOTICE: 只有实现了 IntoArrow 的类型才能使用`add`方法,即`create_some_records`返回的类型
tbl.add(new_data).execute().await?;
Ok(tbl)
}

#[allow(unused)]
fn create_some_records() -> Result<impl IntoArrow> {
const TOTAL: usize = 1000;
const DIM: usize = 128;

let schema = Arc::new(Schema::new(vec![
Field::new("id", DataType::Int32, false),
Field::new(
"vector",
DataType::FixedSizeList(
Arc::new(Field::new("item", DataType::Float32, true)),
DIM as i32,
),
true,
),
]));

let batch = RecordBatch::try_new(
schema.clone(),
vec![
Arc::new(Int32Array::from_iter_values(0..TOTAL as i32)),
Arc::new(
FixedSizeListArray::from_iter_primitive::<Float32Type, _, _>(
(0..TOTAL).map(|_| Some(vec![Some(1.0); DIM])),
DIM as i32,
),
),
],
)
.unwrap();
let batches = RecordBatchIterator::new(vec![batch].into_iter().map(Ok), schema.clone());
Ok(Box::new(batches))
}
  1. 包依赖文件 Cargo.toml文件内容如下:
lancedb = "0.7.0"
tokio = {version = "1.38.0", features = ["rt-multi-thread"]}
arrow-schema = "51.0"
arrow-array = "51.0"
  1. 运行结果如下: LanceDB初始化记录并创建表

打开已存在的表

  1. 完整代码
use lancedb::{connect, Result, Table as LanceDbTable};

async fn main() -> Result<()> {
let opened_table = open_with_existing_table().await?;
println!(
"Opened table: {}, Table name: {}",
opened_table,
opened_table.name()
);
}

#[allow(unused)]
async fn open_with_existing_table() -> Result<LanceDbTable> {
let uri = "data/sample-lancedb";
let db = connect(uri).execute().await?;
let table = db.open_table("my_table").execute().await?;
Ok(table)
}
  1. 包依赖文件
lancedb = "0.7.0"
tokio = {version = "1.38.0", features = ["rt-multi-thread"]}
  1. 运行结果如下: LanceDB打开已存在的表

删除表记录

  1. 完整代码
use lancedb::{connect, Result};

#[tokio::main]
async fn main() -> Result<()> {
let queried_result = query_table().await?;
println!("Query result: {:?}", queried_result);

delete_table_records().await?; // 根据条件删除表中的记录
Ok(())
}

#[allow(unused)]
async fn delete_table_records() -> Result<()> {
let uri = "data/sample-lancedb";
let db = connect(uri).execute().await?;
let table = db.open_table("my_table").execute().await?;
table.delete("id > 24").await?;
Ok(())
}
  1. 包依赖文件
lancedb = "0.7.0"
tokio = {version = "1.38.0", features = ["rt-multi-thread"]}

删除表

  1. 完整代码
use lancedb::{connect, Result};

#[tokio::main]
async fn main() -> Result<()> {
let queried_result = query_table().await?;
println!("Query result: {:?}", queried_result);

drop_table().await?; // 删除 data/sample-lancedb/my_table
Ok(())
}

#[allow(unused)]
async fn drop_table() -> Result<()> {
let uri = "data/sample-lancedb";
let db = connect(uri).execute().await?;
db.drop_table("my_table").await?;
Ok(())
}
  1. 包依赖文件
lancedb = "0.7.0"
tokio = {version = "1.38.0", features = ["rt-multi-thread"]}

删除数据库

  1. 完整代码
use lancedb::{connect, Result};

#[tokio::main]
async fn main() -> Result<()> {
let queried_result = query_table().await?;
println!("Query result: {:?}", queried_result);

drop_database().await?; // 删除 data/sample-lancedb
Ok(())
}

#[allow(unused)]
async fn drop_database() -> Result<()> {
let uri = "data/sample-lancedb";
let db = connect(uri).execute().await?;
db.drop_db().await?;
Ok(())
}
  1. 包依赖文件
lancedb = "0.7.0"
tokio = {version = "1.38.0", features = ["rt-multi-thread"]}

查询表记录

  1. 完整代码
use lancedb::{connect, Result};

#[tokio::main]
async fn main() -> Result<()> {

let queried_result = query_table().await?;
println!("Query result: {:?}", queried_result);
Ok(())
}

#[allow(unused)]
async fn query_table() -> Result<VectorQuery> {
let uri = "data/sample-lancedb";
let db = connect(uri).execute().await?;
let table = db.open_table("my_table").execute().await?;
let result = table.query().nearest_to(&[1.0; 128])?;
Ok(result)
}
  1. 包依赖文件
lancedb = "0.7.0"
tokio = {version = "1.38.0", features = ["rt-multi-thread"]}
  1. 运行结果如下: LanceDB查询表记录

更新表记录

  1. 完整代码
use lancedb::{connect, Result};

#[tokio::main]
async fn main() -> Result<()> {

update_table().await?;
Ok(())
}

#[allow(unused)]
async fn update_table() -> Result<()> {
let uri = "data/sample-lancedb";
let db = connect(uri).execute().await?;
let table = db.open_table("table_with_person").execute().await?;
println!("Before update: {:?}", table.query());
table
.update()
.only_if("id=0")
.column("name", "Bob")
.execute()
.await?; // Bob -> Halzzz

Ok(())
}
  1. 包依赖文件
lancedb = "0.7.0"
tokio = {version = "1.38.0", features = ["rt-multi-thread"]}

总结

本文详细介绍了如何使用Rust编程语言与LanceDB进行交互, 包括表的创建、插入数据、查询、更新和删除操作。 通过这些示例,展示了LanceDB在处理矢量数据和支持多模态数据方面的强大功能, 以及如何通过Rust代码实现这些操作。 LanceDB提供了丰富的API接口,简化了数据库操作, 使得开发者能够高效地管理和查询数据。

希望通过本文的讲解,您能够更好地理解并应用LanceDB来解决实际问题。

链接

鱼雪

本文介绍从头开始使用LanceDB,每一个步骤会给出详细的说明和解释以及图示说明,帮助读者快速上手LanceDB。

LanceDB创建数据库基本调用关系图示

环境搭建

  • Rust
  • tokio
  • lancedb

Rust安装

curl --proto '=https' --tlsv1.2 -sSf https://sh.rustup.rs | sh

创建Rust项目

cargo new lancedb-example
cd lancedb-example

安装Tokio

cargo add tokio --features rt-multi-thread

安装LanceDB

cargo add lancedb

代码解析

use lancedb::{connect, Result};

#[tokio::main]
async fn main() -> Result<()> {
uri = "data/example-lancedb";
db_builder = connect(uri);
db_connect = connect(uri).execute().await?;
println!("LanceDB builder: {:}", db_builder);
println!("LanceDB connect: {}", db_connect);
Ok(())
}
note

上述代码创建了一个最简单的LanceDB数据库创建和连接,并且打印ConnectBuilderConnection:

  • uri: 表示数据库的URI
  • connect: 接收URI参数,返回ConnectBuilder, COnnectBuilder实现了Debug trait
  • execute: 返回Connection, 实现了Display trait,会在执行的目录创建 data/example-lancedb目录

connect函数的定义如下

/// Connect to a LanceDB database.
///
/// # Arguments
///
/// * `uri` - URI where the database is located, can be a local directory, supported remote cloud storage,
/// or a LanceDB Cloud database. See [ConnectOptions::uri] for a list of accepted formats
pub fn connect(uri: &str) -> ConnectBuilder {
ConnectBuilder::new(uri)
}
note

connect函数中创建了ConnectBuilder,调用connect即创建了一个ConnectBuilder

ConnectBuilder

pub struct ConnectBuilder {
/// Database URI
///
/// ### Accpeted URI formats
///
/// - `/path/to/database` - local database on file system.
/// - `s3://bucket/path/to/database` or `gs://bucket/path/to/database` - database on cloud object store
/// - `db://dbname` - LanceDB Cloud
uri: String,

/// LanceDB Cloud API key, required if using Lance Cloud
api_key: Option<String>,
/// LanceDB Cloud region, required if using Lance Cloud
region: Option<String>,
/// LanceDB Cloud host override, only required if using an on-premises Lance Cloud instance
host_override: Option<String>,

storage_options: HashMap<String, String>,

/// The interval at which to check for updates from other processes.
///
/// If None, then consistency is not checked. For performance
/// reasons, this is the default. For strong consistency, set this to
/// zero seconds. Then every read will check for updates from other
/// processes. As a compromise, you can set this to a non-zero timedelta
/// for eventual consistency. If more than that interval has passed since
/// the last check, then the table will be checked for updates. Note: this
/// consistency only applies to read operations. Write operations are
/// always consistent.
read_consistency_interval: Option<std::time::Duration>,
embedding_registry: Option<Arc<dyn EmbeddingRegistry>>,
}

impl ConnectBuilder {
/// Create a new [`ConnectOptions`] with the given database URI.
pub fn new(uri: &str) -> Self {
Self {
uri: uri.to_string(),
api_key: None,
region: None,
host_override: None,
read_consistency_interval: None,
storage_options: HashMap::new(),
embedding_registry: None,
}
}

// ......

/// Establishes a connection to the database
pub async fn execute(self) -> Result<Connection> {
if self.uri.starts_with("db") {
self.execute_remote()
} else {
let internal = Arc::new(Database::connect_with_options(&self).await?);
Ok(Connection {
internal,
uri: self.uri,
})
}
}
}
note

ConnectBuilder是一个结构体,用于配置和建立与LanceDB数据库的连接

作用:

ConnectBuilder通过存储连接参数(如URI、API秘钥、区域等)来构建数据库连接。

它提供了一些方法来设置这些参数,并通过execute方法建立实际的数据库连接。

主要字段和方法:

  • uri: String: 数据库的URI
  • api_key: Option<String>: LanceDB Cloud的API秘钥
  • region: Option<String>: LanceDB Cloud的区域
  • storage_options: HashMap<String, String>: 存储选项
  • read_consistency_interval: Option<std::time::Duration>: 读一致性检查间隔。
  • embedding_registry: Option<Arc<dyn EmbeddingRegistry>>: 嵌入注册表。

主要方法:

  • new(uri: &str) -> Self: 创建一个新的ConnectBuilder实例
  • execute(self) -> Result<Connection>: 执行连接建立,返回一个Connection实例

execute函数:

  • execute函数根据uri以及数据库连接选项,创建数据库连接
  • execute函数中首先判断uri是否是以db开头,如果是db开头,执行远程执行。
  • 否则,使用Database设置连接选项,传入Connection对象,使用选项和uri创建Connection

Database

#[derive(Debug)]
struct Database {
object_store: ObjectStore,
query_string: Option<String>,

pub(crate) uri: String,
pub(crate) base_path: object_store::path::Path,

// the object store wrapper to use on write path
pub(crate) store_wrapper: Option<Arc<dyn WrappingObjectStore>>,

read_consistency_interval: Option<std::time::Duration>,

// Storage options to be inherited by tables created from this connection
storage_options: HashMap<String, String>,
embedding_registry: Arc<dyn EmbeddingRegistry>,
}

impl Database {
async fn connect_with_options(options: &ConnectBuilder) -> Result<Self> {
let uri = &options.uri;
let parse_res = url::Url::parse(uri);

// TODO: pass params regardless of OS
match parse_res {
Ok(url) if url.scheme().len() == 1 && cfg!(windows) => {
Self::open_path(
uri,
options.read_consistency_interval,
options.embedding_registry.clone(),
)
.await
}
Ok(mut url) => {
// iter thru the query params and extract the commit store param
let mut engine = None;
let mut mirrored_store = None;
let mut filtered_querys = vec![];

// WARNING: specifying engine is NOT a publicly supported feature in lancedb yet
// THE API WILL CHANGE
for (key, value) in url.query_pairs() {
if key == ENGINE {
engine = Some(value.to_string());
} else if key == MIRRORED_STORE {
if cfg!(windows) {
return Err(Error::NotSupported {
message: "mirrored store is not supported on windows".into(),
});
}
mirrored_store = Some(value.to_string());
} else {
// to owned so we can modify the url
filtered_querys.push((key.to_string(), value.to_string()));
}
}

// Filter out the commit store query param -- it's a lancedb param
url.query_pairs_mut().clear();
url.query_pairs_mut().extend_pairs(filtered_querys);
// Take a copy of the query string so we can propagate it to lance
let query_string = url.query().map(|s| s.to_string());
// clear the query string so we can use the url as the base uri
// use .set_query(None) instead of .set_query("") because the latter
// will add a trailing '?' to the url
url.set_query(None);

let table_base_uri = if let Some(store) = engine {
static WARN_ONCE: std::sync::Once = std::sync::Once::new();
WARN_ONCE.call_once(|| {
log::warn!("Specifing engine is not a publicly supported feature in lancedb yet. THE API WILL CHANGE");
});
let old_scheme = url.scheme().to_string();
let new_scheme = format!("{}+{}", old_scheme, store);
url.to_string().replacen(&old_scheme, &new_scheme, 1)
} else {
url.to_string()
};

let plain_uri = url.to_string();

let storage_options = options.storage_options.clone();
let os_params = ObjectStoreParams {
storage_options: Some(storage_options.clone()),
..Default::default()
};
let (object_store, base_path) =
ObjectStore::from_uri_and_params(&plain_uri, &os_params).await?;
if object_store.is_local() {
Self::try_create_dir(&plain_uri).context(CreateDirSnafu { path: plain_uri })?;
}

let write_store_wrapper = match mirrored_store {
Some(path) => {
let mirrored_store = Arc::new(LocalFileSystem::new_with_prefix(path)?);
let wrapper = MirroringObjectStoreWrapper::new(mirrored_store);
Some(Arc::new(wrapper) as Arc<dyn WrappingObjectStore>)
}
None => None,
};

let embedding_registry = options
.embedding_registry
.clone()
.unwrap_or_else(|| Arc::new(MemoryRegistry::new()));
Ok(Self {
uri: table_base_uri,
query_string,
base_path,
object_store,
store_wrapper: write_store_wrapper,
read_consistency_interval: options.read_consistency_interval,
storage_options,
embedding_registry,
})
}
Err(_) => {
Self::open_path(
uri,
options.read_consistency_interval,
options.embedding_registry.clone(),
)
.await
}
}
}
}
note

Database是一个结构体,表示LanceDB数据库实例

作用:

Database结构体封装了与实际数据库交互的逻辑。 它负责管理数据库的基本路径、对象存储、嵌入注册表等信息, 并提供方法来连接和操作数据库。

主要字段:

  • object_store: ObjectStore: 数据库的对象存储。
  • query_string: Option<String>: 查询字符串。
  • uri: String: 数据库的URI。
  • base_path: object_store::path::Path: 基础路径。
  • store_wrapper: Option<Arc<dyn WrappingObjectStore>>: 存储包装器。
  • read_consistency_interval: Option<std::time::Duration>: 读一致性检查间隔。
  • storage_options: HashMap<String, String>: 存储选项。
  • embedding_registry: Arc<dyn EmbeddingRegistry>: 嵌入注册表。

主要方法:

  • connect_with_options(options: &ConnectBuilder) -> Result<Self>: 根据ConnectBuilder配置建立数据库连接。

Connection

/// A connection to LanceDB
#[derive(Clone)]
pub struct Connection {
uri: String,
internal: Arc<dyn ConnectionInternal>,
}
note

Connection是一个结构体,表示与LanceDB数据库的连接实例

作用:

Connection结构体持有数据库连接的相关信息,并提供与数据库交互的接口。 它是建立在ConnectBuilder配置基础上的实际连接对象。

主要字段:

  • uri: String: 数据库的URI
  • internal: Arc<dyn ConnectionInternal>: 内部连接实现

总结

  • connect: 一个函数,用于创建ConnectBuilder实例,并初始化数据库连接设置。
  • ConnectBuilder: 一个结构体,负责配置和建立数据库连接。它存储连接参数,并提供execute方法来建立实际的数据库连接。
  • Connection: 一个结构体,表示与LanceDB数据库的连接实例,持有连接信息并提供与数据库交互的接口。
  • Database: 一个结构体,表示LanceDB数据库实例,封装了与数据库交互的逻辑和相关信息。

链接

鱼雪

这两天想用待办事项的番茄钟,以前也用过很多其他的产品,包括免费的、付费的,在手机上的还有在电脑上的都有。

然后最近都一直在学Rust的内容,以及之前学过一点Tauri,所以就想着自己写一个番茄钟的应用,然后就有了这个项目。

项目初衷

  1. 用Rust写一个待办事项番茄钟的应用,练习Rust的开发
  2. 足够简单好用,只专注于今天的事情, 不要有太多的功能(以往用的番茄钟都有很多功能,比如统计、报表等等,但是我只想专注于今天的事情, 比如四象限等还要想怎么安排,然后就容易导致事情拖延,托着拖着就不做了)
  3. 既有待办事项,又有番茄钟,两者结合,不要分开,这样就不用来回切换了
  4. 只专注于电脑端,不要有手机端,因为手机端的话,就容易分心,不专注于今天的事情了
  5. 只专注于电脑端,只专注于办公

项目地址

Github Repo: https://github.com/yuxuetr/todo-pomodoro

安装依赖

  1. 安装Rust
curl --proto '=https' --tlsv1.2 https://sh.rustup.rs -sSf | sh
  1. 安装Node.js 安装链接和方法

  2. 安装pnpm

npm i -g pnpm

构建方式

  1. 克隆仓库
git clone https://github.com/yuxuetr/todo-pomodoro.git
  1. 安装依赖
cd todo-pomodoro
pnpm install
  1. 构建运行项目
pnpm run tauri dev
  1. 运行结果如图 待办事项番茄钟

设置应用的图标

图标文件存放在src-tauri/icons下,在这下面有针对不同平台的应用图标格式。

图标的设置在tauri.conf.json文件中,如下所示

"bundle": {
"active": true,
"targets": "all",
"identifier": "com.yuxuetr.dev",
"icon": [
"icons/32x32.png",
"icons/128x128.png",
"icons/128x128@2x.png",
"icons/icon.icns",
"icons/icon.ico"
]
}

在MacOS下,应用的图标是icon.icns,所以需要将图标文件放到icons目录下,然后在tauri.conf.json文件中设置图标的路径。

MacOS平台应用的图标大小是1024x1024,所以需要将图标文件转换成1024x1024的大小,然后修改成icon.icns放到icons目录下。

如果需要将图标由直角转换成圆角,可以使用在线工具。也可以试试我写的工具。

有三个不同的版本:

应用发布

第一次发布应用,需要先构建应用需要使用--target参数,指定目标平台

cargo tauri build --target aarch64-apple-darwin
# 或者
pnpm tauri build --target aarch64-apple-darwin

之后每次发布应用,只需要执行下面的命令即可

pnpm tauri build

应用构建与发布

执行命令后,会弹出下面的窗口

应用安装

然后按照Mac应用的方式安装应用,将应用拖到应用文件夹即可

项目后续

  1. 项目后期增加统计功能(如果有必要的话)
  2. 项目后期增加命令行功能(Tauri支持clip命令,可以在命令行中使用)
鱼雪

在Axum中,中间件是处理请求和响应的强大工具。

它们可以在请求到达实际处理程序之前或响应发回客户端之前对其进行处理

Axum提供了多种方式来实现中间件。

本文将介绍两种常用的方法:使用from_fntower::Layer

方式一:from_fn

from_fn是一种简便的方法,可以快速创建中间件。它适用于那些不需要复杂逻辑的中间件。

示例代码: request_id

以下是一个通过from_fn实现请求ID中间件的示例:

// request_id.rs
use super::REQUEST_ID_HEADER;
use axum::{extract::Request, http::HeaderValue, middleware::Next, response::Response};
use tracing::warn;

pub async fn set_request_id(mut req: Request, next: Next) -> Response {
let id = match req.headers().get(REQUEST_ID_HEADER) {
Some(v) => Some(v.clone()),
None => {
let request_id = uuid::Uuid::new_v4().to_string();
match HeaderValue::from_str(&request_id) {
Ok(v) => {
req.headers_mut().insert(REQUEST_ID_HEADER, v.clone());
Some(v)
}
Err(e) => {
warn!("parse generated request id failed: {}", e);
None
}
}
}
};

let mut res = next.run(req).await;

if let Some(id) = id {
res.headers_mut().insert(REQUEST_ID_HEADER, id);
}
res
}

在这个示例中,中间件会检查请求头中是否包含请求ID。 如果没有,则生成一个新的UUID并将其添加到请求头中。 然后,继续处理请求并在响应头中添加相同的请求ID。

Token验证中间件

这里再提供一个示例,展示如何实现一个验证Token的中间件:

// auth.rs
use crate::AppState;
use axum::{
extract::{FromRequestParts, Request, State},
http::StatusCode,
middleware::Next,
response::{IntoResponse, Response},
};
use axum_extra::{
headers::{authorization::Bearer, Authorization},
TypedHeader;
use tracing::warn;

pub async fn verify_token(State(state): State<AppState>, req: Request, next: Next) -> Response {
let (mut parts, body) = req.into_parts();
let req = match TypedHeader::<Authorization<Bearer>>::from_request_parts(&mut parts, &state).await {
Ok(TypedHeader(Authorization(bearer))) => {
let token = bearer.token();
match state.dk.verify(token) {
Ok(user) => {
let mut req = Request::from_parts(parts, body);
req.extensions_mut().insert(user);
req
}
Err(e) => {
let msg = format!("verify token failed: {}", e);
warn!(msg);
return (StatusCode::FORBIDDEN, msg).into_response();
}
}
}
Err(e) => {
let msg = format!("parse Authorization header failed: {}", e);
warn!(msg);
return (StatusCode::UNAUTHORIZED, msg).into_response();
}
};
next.run(req).await
}

在这个示例中,中间件会检查请求头中的Authorization Bearer token,并验证其有效性。 如果验证失败,将返回相应的错误响应。

方式二: tower::Layer

tower::Layer是创建复杂中间件的推荐方式。

它提供了更多的灵活性和控制,可以处理更复杂的逻辑。

示例代码

以下是一个通过tower::Layer实现服务器时间中间件的示例:

// server_time.rs
use super::{REQUEST_ID_HEADER, SERVER_TIME_HEADER};
use axum::{extract::Request, response::Response};
use std::{
future::Future,
pin::Pin,
task::{Context, Poll},
};
use tokio::time::Instant;
use tower::{Layer, Service};
use tracing::warn;

#[derive(Clone)]
pub struct ServerTimeLayer;

impl<S> Layer<S> for ServerTimeLayer {
type Service = ServerTimeMiddleware<S>;

fn layer(&self, inner: S) -> Self::Service {
ServerTimeMiddleware { inner }
}
}

#[derive(Clone)]
pub struct ServerTimeMiddleware<S> {
inner: S,
}

impl<S> Service<Request> for ServerTimeMiddleware<S>
where
S: Service<Request, Response = Response> + Send + 'static,
S::Future: Send + 'static,
{
type Response = S::Response;
type Error = S::Error;
type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send + 'static>>;

fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
self.inner.poll_ready(cx)
}

fn call(&mut self, req: Request) -> Self::Future {
let start = Instant::now();
let future = self.inner.call(req);
Box::pin(async move {
let mut res: Response = future.await?;
let elapsed = format!("{}us", start.elapsed().as_micros());
match elapsed.parse() {
Ok(v) => {
res.headers_mut().insert(SERVER_TIME_HEADER, v);
}
Err(e) => {
warn!(
"Parse elapsed time failed: {} for request {:?}",
e,
res.headers().get(REQUEST_ID_HEADER)
);
}
}
Ok(res)
})
}
}

在这个示例中,中间件会记录请求处理的时间,并将其添加到响应头中。 它使用tower::Layer来创建一个自定义的中间件结构体ServerTimeMiddleware

通过这些示例,可以看到在Axum中实现中间件的灵活性和强大功能。 根据需求选择合适的方法,可以帮助你更好地管理和处理HTTP请求和响应。

总结

  1. 中间件概述
  • 中间件是处理请求和响应的工具,可以在请求到达处理程序之前或响应发回客户端之前对其进行处理。
  • Axum提供了多种实现中间件的方法,主要有from_fntower::Layer两种方式。
  1. 方式一:from_fn

    • from_fn方法适用于快速创建简单的中间件。
  • 示例代码展示了如何通过from_fn实现请求ID中间件:

    • 检查请求头是否包含请求ID,如果没有则生成一个新的UUID并添加到请求头中。
    • 继续处理请求,并在响应头中添加相同的请求ID。
  1. 方式二:tower::Layer

    • tower::Layer方法适用于创建复杂的中间件,提供更多灵活性和控制。
  • 示例代码展示了如何通过tower::Layer实现服务器时间中间件:

    • 记录请求处理时间,并将其添加到响应头中。
    • 创建一个自定义的中间件结构体ServerTimeMiddleware
  1. Token验证中间件

    • 示例代码展示了如何实现一个验证Token的中间件:

      • 检查请求头中的Authorization Bearer token,并验证其有效性。
    • 如果验证失败,返回相应的错误响应。

鱼雪

在Rust中,struct StructName(DemoName); 这种用法定义的事一个元组结构体(tuple struct), 它是一种特殊的结构体它的字段没有名称,只有索引

它并不会直接继承DemoName中所有pub内容,而是将DemoName作为一个单独的字段封装在结构体中。

为什么要这样做呢?这样做的好处是可以在不改变DemoName的情况下,为其添加额外的功能。

元组结构体的定义和使用

struct DemoName {
pub index: i32,
pub value: String,
}

struct StructName(DemoName);

在上面的例子中,StructName是一个元组结构体,它包含一个DemoName类型的字段。 这个字段可以通过索引0来访问

fn main() {
let demo = DemoName {
index: 1,
value: "Hello".to_string(),
};
let struct_name = StructName(demo);

// 访问内部的`DemoName`结构体
println!("Index: {}", struct_name.0.index);
println!("Value: {}", struct_name.0.value);
}

在这个例子中,我们创建了一个DemoName实例, 并将其作为一个字段封装在StructName结构体中。 要访问DemoName中的字段,我们使用struct_name.0来访问。

自动解引用和Deref

通过实现Deref trait,可以使元组结构体在使用时表现的像它所包含的值一样,从而简化代码。

use std::ops::Deref;

struct DemoName {
pub index: i32,
pub value: String,
}

struct StructName(DemoName);

impl Deref for StructName {
type Target = DemoName;

fn deref(&self) -> &Self::Target {
&self.0
}
}

fn main() {
let demo = DemoName {
index: 1,
value: "Hello".to_string(),
};
let struct_name = StructName(demo);

// 访问内部的`DemoName`结构体
println!("Index: {}", struct_name.index);
println!("Value: {}", struct_name.value);
}

在这个例子中,通过实现Deref trait,我们可以直接访问StructName中的DemoName字段, 而不需要显式地使用索引0

结构体的创建和使用方式

普通结构体

普通结构体的字段有名字,可以通过名字来访问。

struct DemoName {
pub index: i32,
pub value: String,
}

fn main() {
let demo = DemoName {
index: 1,
value: "Hello".to_string(),
};

// 访问结构体的字段
println!("Index: {}", demo.index);
println!("Value: {}", demo.value);
}

元素结构体

元组结构体的字段没有名字,只有索引,可以通过索引来访问。

struct DemoName (i32, String);

fn main() {
let demo = DemoName(1, "Hello".to_string());

// 访问元组结构体的字段
println!("Index: {}", demo.0);
println!("Value: {}", demo.1);
}

单元结构体

单元结构体没有字段,通常用于标记或实现某些trait。

struct UnitStruct;

fn main() {
let unit = UnitStruct;
}

实际使用案例: JWT签名和验证

use crate::{AppError, User};
use jwt_simple::prelude::*;
use std::ops::Deref;

const JWT_DURATION: u64 = 60 * 60 * 24 * 7;
const JWT_ISS: &str = "chat_server";
const JWT_AUD: &str = "chat_web";

pub struct EncodingKey(Ed25519KeyPair);
pub struct DecodingKey(Ed25519PublicKey);

impl EncodingKey {
pub fn load(pem: &str) -> Result<Self, AppError> {
Ok(Self(Ed25519KeyPair::from_pem(pem)?))
}

pub fn sign(&self, user: impl Into<User>) -> Result<String, AppError> {
let claims = Claims::with_custom_claims(
user.into(), Duration::from_secs(JWT_DURATION));
let claims = claims.with_issuer(JWT_ISS).with_audience(JWT_AUD);
Ok(self.0.sign(claims)?)
}
}

impl DecodingKey {
pub fn load(pem: &str) -> Result<Self, AppError> {
Ok(Self(Ed25519PublicKey::from_pem(pem)?))
}

pub fn verify(&self, token: &str) -> Result<User, AppError> {
let mut options = VerificationOptions::default();
options.allowed_issuers = Some(HashSet::from_strings(&[JWT_ISS]));
options.allowed_audiences = Some(HashSet::from_strings(&[JWT_AUD]));
let claims = self.0.verify_token::<User>(token, Some(options))?;
Ok(claims.custom)
}
}

impl Deref for EncodingKey {
type Target = Ed25519KeyPair;

fn deref(&self) -> &Self::Target {
&self.0
}
}

impl Deref for DecodingKey {
type Target = Ed25519PublicKey;

fn deref(&self) -> &Self::Target {
&self.0
}
}

#[cfg(test)]
mod tests {
use anyhow::Result;
use super::*;
use crate::User;

#[tokio::test]
async fn test_jwt() -> Result<()> {
let encoding_pem = include_str!("../../fixtures/encoding.pem");
let decoding_pem = include_str!("../../fixtures/decoding.pem");
let ek = EncodingKey::load(encoding_pem)?;
let dk = DecodingKey::load(decoding_pem)?;

let user = User::new(1, "Hal", "halzzz@gmail.com");

let token = ek.sign(user.clone())?;
let user2 = dk.verify(&token)?;
assert_eq!(user, user2);
Ok(())
}
}

在这个例子中,我们定义了两个结构体EncodingKeyDecodingKey, 它们分别用于签名和验证JWT令牌。 这里使用EncodingKey封装了Ed25519KeyPair,使其方便加载密钥签名。 这里使用DecodingKey封装了Ed25519PublicKey,使其方便加载公钥验证

通过实现Deref trait,我们可以直接访问Ed25519KeyPairEd25519PublicKey中的方法, 而不需要显式地使用self.0来访问。

总结

本文介绍了元组结构体(tuple struct)的定义和使用,以及如何通过实现Deref trait来简化代码。 元组结构体是一种特殊的结构体,它的字段没有名称,只有索引。 通过实现Deref trait,可以使元组结构体在使用时表现的像它所包含的值一样,从而简化代码。

鱼雪

Deref trait 是 Rust 中的一个特性,它允许我们重载解引用运算符 *。 这个特性在 Rust 中非常有用, 因为它允许我们在自定义类型上使用解引用运算符,而不需要手动调用 *

在Rust中,Deref trait 是一个非常强大的工具, 它允许你通过解引用运算符*来访问底层数据

Deref trait 最常见的用途之一 是将自定义智能指针类型转换为其内部持有的数据类型。

通过实现 Deref trait,你可以使你的自定义类型与标准库中的类型(如引用和智能指针)具有相同的行为。

使用 Deref 的情形

  • 自定义智能指针:如果你创建了一个自定义智能指针类型,可以通过实现 Deref trait 来使其像标准指针一样工作。
  • 类型转换:当你希望你的类型在某些上下文中像另一个类型一样工作时,可以使用 Deref 进行隐式转换。
  • 函数调用简化:当你希望你的类型能够自动解引用以便调用底层类型的方法时,Deref 非常有用。

情形1: 自定义智能指针

情形说明

  • 如果你创建了一个自定义智能指针类型,可以通过实现 Deref trait 来使其像标准指针一样工作。
use std::ops::Deref;

struct MyBox<T> {
value: T,
}

impl<T> MyBox<T> {
fn new(value: T) -> MyBox<T> {
MyBox { value }
}
}

impl<T> Deref for MyBox<T> {
type Target = T;

fn deref(&self) -> &T {
&self.value
}
}

fn main() {
let x = 5;
let y = MyBox::new(x);

// 通过 Deref trait 自动解引用
println!("Value in MyBox: {}", *y);
}

情形2: 类型转换

情形说明

  • 当你希望你的类型在某些上下文中像另一个类型一样工作时,可以使用 Deref 进行隐式转换
use std::ops::Deref;

struct MyBox<T> {
value: T,
}

impl<T> MyBox<T> {
fn new(value: T) -> MyBox<T> {
MyBox { value }
}
}

impl<T> Deref for MyBox<T> {
type Target = T;

fn deref(&self) -> &T {
&self.value
}
}

fn print_value(value: &i32) {
println!("Value: {}", value);
}

fn main() {
let x = 5;
let y = MyBox::new(x);

// 通过 Deref trait 自动解引用
print_value(&y);
}

情形3: 函数调用简化

情形说明

  • 当你希望你的类型能够自动解引用以便调用底层类型的方法时,Deref 非常有用
use crate::{AppError, User};
use jwt_simple::prelude::*;
use std::ops::Deref;

const JWT_DURATION: u64 = 60 * 60 * 24 * 7;

pub struct EncodingKey(Ed25519KeyPair);
pub struct DecodingKey(Ed25519PublicKey);

impl EncodingKey {
pub fn load(pem: &str) -> Result<Self, AppError> {
Ok(Self(Ed25519KeyPair::from_pem(pem)?))
}

pub fn sign(user: User, key: &EncodingKey) -> Result<String, AppError> {
let claims = Claims::with_custom_claims(user, Duration::from_secs(JWT_DURATION));
Ok(key.sign(claims)?) // 这里key为引用,但是我们想要像操作结构体本身一样使用
}
}

impl DecodingKey {
pub fn load(pem: &str) -> Result<Self, AppError> {
Ok(Self(Ed25519PublicKey::from_pem(pem)?))
}

pub fn verify(token: &str, key: &DecodingKey) -> Result<User, AppError> {
let claims = key.verify_token::<User>(token, None)?; // 这里key为引用,但是我们想要像操作结构体本身一样使用
Ok(claims.custom)
}
}

impl Deref for EncodingKey {
type Target = Ed25519KeyPair;

fn deref(&self) -> &Self::Target {
&self.0
}
}

impl Deref for DecodingKey {
type Target = Ed25519PublicKey;

fn deref(&self) -> &Self::Target {
&self.0
}
}

如果参数传递的是结构体的引用,并且你想要在使用该引用时像操作结构体本身一样使用

那么实现 Deref trait 是一个很好的选择

通过实现 Deref trait,你可以让结构体的引用表现得像结构体本身一样,从而简化代码并提高可读性。

总结

Deref trait 主要用在智能指针类型和自定义类型上, 它可以帮助我们简化代码、减少重复、提高可读性。

Rust中传参,优先使用引用,而不是值, 实现 Deref trait 可以让你在使用引用时像操作结构体本身一样使用, 从而提高代码的可读性和简洁性。

鱼雪