@@ -3,13 +3,18 @@ use crate::config;
3
3
use crate :: embedding;
4
4
use crate :: llm;
5
5
use crate :: utils;
6
- use std:: time:: UNIX_EPOCH ;
6
+ use crate :: state:: { IndexState , IndexedChunk } ;
7
+ use futures:: stream:: { FuturesUnordered , StreamExt } ;
8
+ use std:: time:: { Duration , UNIX_EPOCH } ;
9
+ use tokio:: time:: sleep;
7
10
use indicatif:: { ProgressBar , ProgressStyle } ;
8
11
use reqwest:: Client ;
9
12
use std:: fs;
10
13
use std:: path:: Path ;
11
14
use uuid:: Uuid ;
12
15
16
+ const BATCH_SIZE : usize = 8 ;
17
+
13
18
pub fn handle_config ( set_api_key : Option < String > , show : bool ) -> anyhow:: Result < ( ) > {
14
19
let config_path = dirs:: config_dir ( )
15
20
. ok_or_else ( || anyhow:: anyhow!( "Unable to determine config directory" ) ) ?
@@ -57,7 +62,7 @@ pub fn handle_config(set_api_key: Option<String>, show: bool) -> anyhow::Result<
57
62
}
58
63
59
64
pub async fn handle_index ( client : & Client , path : & Path ) -> anyhow:: Result < ( ) > {
60
- let paths = crate :: utils:: collect_files ( path) ?;
65
+ let paths = utils:: collect_files ( path) ?;
61
66
let total_files = paths. len ( ) as u64 ;
62
67
let pb = ProgressBar :: new ( total_files) ;
63
68
pb. set_style (
@@ -66,16 +71,16 @@ pub async fn handle_index(client: &Client, path: &Path) -> anyhow::Result<()> {
66
71
. tick_chars ( "⠋⠙⠹⠸⠼⠴⠦⠧⠇⠏ " ) ,
67
72
) ;
68
73
69
- let config_dir = crate :: config:: get_config_dir ( ) ?;
70
- let mut state = crate :: state :: IndexState :: load ( & config_dir) ?;
74
+ let config_dir = config:: get_config_dir ( ) ?;
75
+ let mut state = IndexState :: load ( & config_dir) ?;
71
76
72
77
for path in paths {
73
78
pb. set_message ( format ! ( "Indexing {}" , path. display( ) ) ) ;
74
79
let metadata = fs:: metadata ( & path) ?;
75
80
let modified = metadata. modified ( ) ?. duration_since ( UNIX_EPOCH ) ?. as_secs ( ) ;
76
81
let file_str = path. to_string_lossy ( ) . to_string ( ) ;
77
82
78
- // Skip if the file hasn't changed
83
+ // Skip if file unchanged
79
84
if let Some ( prev) = state. get_last_modified ( & file_str) {
80
85
if prev == modified {
81
86
pb. inc ( 1 ) ;
@@ -84,30 +89,50 @@ pub async fn handle_index(client: &Client, path: &Path) -> anyhow::Result<()> {
84
89
}
85
90
86
91
let content = fs:: read_to_string ( & path) ?;
87
- let chunks = crate :: utils:: chunk_text ( & content) ;
92
+ let chunks = utils:: chunk_text ( & content) ;
88
93
let prev_chunks = state. get_file_chunks ( & file_str) . cloned ( ) . unwrap_or_default ( ) ;
89
- let mut new_chunks = vec ! [ ] ;
94
+ let mut new_chunks = Vec :: new ( ) ;
95
+ let mut chunk_info = Vec :: new ( ) ;
90
96
91
97
for chunk in & chunks {
92
98
if chunk. trim ( ) . is_empty ( ) || chunk. len ( ) > 100_000 {
93
99
continue ;
94
100
}
95
101
96
- let hash = crate :: state :: IndexState :: hash_chunk ( chunk) ;
97
- if crate :: state :: IndexState :: has_chunk ( & prev_chunks, & hash) {
102
+ let hash = IndexState :: hash_chunk ( chunk) ;
103
+ if IndexState :: has_chunk ( & prev_chunks, & hash) {
98
104
continue ;
99
105
}
100
106
101
- let id = Uuid :: new_v4 ( ) . to_string ( ) ;
102
- let embedding = crate :: embedding:: get_embedding ( client, chunk) . await ?;
103
- crate :: chroma:: send_to_chroma ( client, & id, chunk, & embedding, & path, & pb) . await ?;
107
+ chunk_info. push ( ( chunk. clone ( ) , hash) ) ;
108
+ }
109
+
110
+ for batch in chunk_info. chunks ( BATCH_SIZE ) {
111
+ let mut tasks = FuturesUnordered :: new ( ) ;
112
+
113
+ for ( chunk, hash) in batch. iter ( ) . cloned ( ) {
114
+ let client = client. clone ( ) ;
115
+ let path = path. to_path_buf ( ) ;
116
+ let pb = pb. clone ( ) ;
117
+ tasks. push ( async move {
118
+ sleep ( Duration :: from_millis ( 100 ) ) . await ;
119
+ let embedding = embedding:: get_embedding ( & client, & chunk) . await ?;
120
+ let id = Uuid :: new_v4 ( ) . to_string ( ) ;
121
+ chroma:: send_to_chroma ( & client, & id, & chunk, & embedding, & path, & pb) . await ?;
122
+ Ok :: < _ , anyhow:: Error > ( IndexedChunk { id, hash } )
123
+ } ) ;
124
+ }
104
125
105
- new_chunks. push ( crate :: state:: IndexedChunk { id, hash } ) ;
126
+ while let Some ( result) = tasks. next ( ) . await {
127
+ if let Ok ( chunk) = result {
128
+ new_chunks. push ( chunk) ;
129
+ }
130
+ }
106
131
}
107
132
108
133
if !new_chunks. is_empty ( ) {
109
134
let mut updated_chunks = prev_chunks. clone ( ) ;
110
- let mut removed_chunks = vec ! [ ] ;
135
+ let mut removed_chunks = Vec :: new ( ) ;
111
136
112
137
updated_chunks. retain ( |c| {
113
138
let keep = new_chunks. iter ( ) . all ( |n| n. hash != c. hash ) ;
@@ -121,7 +146,7 @@ pub async fn handle_index(client: &Client, path: &Path) -> anyhow::Result<()> {
121
146
state. update_file_chunks ( & file_str, updated_chunks, modified) ;
122
147
123
148
for chunk in removed_chunks {
124
- crate :: chroma:: delete_chunk ( client, & chunk. id ) . await ?;
149
+ chroma:: delete_chunk ( client, & chunk. id ) . await ?;
125
150
}
126
151
}
127
152
0 commit comments