9
9
import logging
10
10
import os
11
11
import pathlib
12
+ import typing as t
12
13
from typing import Literal , Optional , Union
13
14
14
15
import kaptan
15
16
16
- from libvcs .projects .git import GitRemote
17
+ from libvcs ._internal .types import StrPath
18
+ from libvcs .sync .git import GitRemote
17
19
18
20
from . import exc
21
+ from .types import ConfigDict , RawConfigDict
19
22
from .util import get_config_dir , update_dict
20
23
21
24
log = logging .getLogger (__name__ )
22
25
26
+ if t .TYPE_CHECKING :
27
+ from typing_extensions import TypeGuard
28
+
23
29
24
30
def expand_dir (
25
31
_dir : pathlib .Path , cwd : pathlib .Path = pathlib .Path .cwd ()
@@ -45,7 +51,7 @@ def expand_dir(
45
51
return _dir
46
52
47
53
48
- def extract_repos (config : dict , cwd = pathlib .Path .cwd ()) -> list [dict ]:
54
+ def extract_repos (config : RawConfigDict , cwd = pathlib .Path .cwd ()) -> list [ConfigDict ]:
49
55
"""Return expanded configuration.
50
56
51
57
end-user configuration permit inline configuration shortcuts, expand to
@@ -62,11 +68,11 @@ def extract_repos(config: dict, cwd=pathlib.Path.cwd()) -> list[dict]:
62
68
-------
63
69
list : List of normalized repository information
64
70
"""
65
- configs = []
71
+ configs : list [ ConfigDict ] = []
66
72
for directory , repos in config .items ():
73
+ assert isinstance (repos , dict )
67
74
for repo , repo_data in repos .items ():
68
-
69
- conf = {}
75
+ conf : dict = {}
70
76
71
77
"""
72
78
repo_name: http://myrepo.com/repo.git
@@ -91,21 +97,36 @@ def extract_repos(config: dict, cwd=pathlib.Path.cwd()) -> list[dict]:
91
97
92
98
if "name" not in conf :
93
99
conf ["name" ] = repo
94
- if "parent_dir" not in conf :
95
- conf ["parent_dir" ] = expand_dir (directory , cwd = cwd )
96
-
97
- # repo_dir -> dir in libvcs 0.12.0b25
98
- if "repo_dir" in conf and "dir" not in conf :
99
- conf ["dir" ] = conf .pop ("repo_dir" )
100
100
101
101
if "dir" not in conf :
102
- conf ["dir" ] = expand_dir (conf ["parent_dir" ] / conf ["name" ], cwd )
102
+ conf ["dir" ] = expand_dir (
103
+ pathlib .Path (expand_dir (pathlib .Path (directory ), cwd = cwd ))
104
+ / conf ["name" ],
105
+ cwd ,
106
+ )
103
107
104
108
if "remotes" in conf :
109
+ assert isinstance (conf ["remotes" ], dict )
105
110
for remote_name , url in conf ["remotes" ].items ():
106
- conf ["remotes" ][remote_name ] = GitRemote (
107
- name = remote_name , fetch_url = url , push_url = url
108
- )
111
+ if isinstance (url , GitRemote ):
112
+ continue
113
+ if isinstance (url , str ):
114
+ conf ["remotes" ][remote_name ] = GitRemote (
115
+ name = remote_name , fetch_url = url , push_url = url
116
+ )
117
+ elif isinstance (url , dict ):
118
+ assert "push_url" in url
119
+ assert "fetch_url" in url
120
+ conf ["remotes" ][remote_name ] = GitRemote (
121
+ name = remote_name , ** url
122
+ )
123
+
124
+ def is_valid_config_dict (val : t .Any ) -> "TypeGuard[ConfigDict]" :
125
+ assert isinstance (val , dict )
126
+ return True
127
+
128
+ assert is_valid_config_dict (conf )
129
+
109
130
configs .append (conf )
110
131
111
132
return configs
@@ -192,12 +213,12 @@ def find_config_files(
192
213
configs .extend (find_config_files (path , match , f ))
193
214
else :
194
215
match = f"{ match } .{ filetype } "
195
- configs = path .glob (match )
216
+ configs = list ( path .glob (match ) )
196
217
197
218
return configs
198
219
199
220
200
- def load_configs (files : list [Union [ str , pathlib . Path ] ], cwd = pathlib .Path .cwd ()):
221
+ def load_configs (files : list [StrPath ], cwd = pathlib .Path .cwd ()):
201
222
"""Return repos from a list of files.
202
223
203
224
Parameters
@@ -216,10 +237,11 @@ def load_configs(files: list[Union[str, pathlib.Path]], cwd=pathlib.Path.cwd()):
216
237
----
217
238
Validate scheme, check for duplicate destinations, VCS urls
218
239
"""
219
- repos = []
240
+ repos : list [ ConfigDict ] = []
220
241
for file in files :
221
242
if isinstance (file , str ):
222
243
file = pathlib .Path (file )
244
+ assert isinstance (file , pathlib .Path )
223
245
ext = file .suffix .lstrip ("." )
224
246
conf = kaptan .Kaptan (handler = ext ).import_config (str (file ))
225
247
newrepos = extract_repos (conf .export ("dict" ), cwd = cwd )
@@ -230,51 +252,49 @@ def load_configs(files: list[Union[str, pathlib.Path]], cwd=pathlib.Path.cwd()):
230
252
231
253
dupes = detect_duplicate_repos (repos , newrepos )
232
254
233
- if dupes :
255
+ if len ( dupes ) > 0 :
234
256
msg = ("repos with same path + different VCS detected!" , dupes )
235
257
raise exc .VCSPullException (msg )
236
258
repos .extend (newrepos )
237
259
238
260
return repos
239
261
240
262
241
- def detect_duplicate_repos (repos1 : list [dict ], repos2 : list [dict ]):
263
+ ConfigDictTuple = tuple [ConfigDict , ConfigDict ]
264
+
265
+
266
+ def detect_duplicate_repos (
267
+ config1 : list [ConfigDict ], config2 : list [ConfigDict ]
268
+ ) -> list [ConfigDictTuple ]:
242
269
"""Return duplicate repos dict if repo_dir same and vcs different.
243
270
244
271
Parameters
245
272
----------
246
- repos1 : dict
247
- list of repo expanded dicts
273
+ config1 : list[ConfigDict]
248
274
249
- repos2 : dict
250
- list of repo expanded dicts
275
+ config2 : list[ConfigDict]
251
276
252
277
Returns
253
278
-------
254
- list of dict, or None
255
- Duplicate repos
279
+ list[ConfigDictTuple]
280
+ List of duplicate tuples
256
281
"""
257
- dupes = []
258
- path_dupe_repos = []
282
+ if not config1 :
283
+ return []
259
284
260
- curpaths = [r ["dir" ] for r in repos1 ]
261
- newpaths = [r ["dir" ] for r in repos2 ]
262
- path_duplicates = list (set (curpaths ).intersection (newpaths ))
285
+ dupes : list [ConfigDictTuple ] = []
263
286
264
- if not path_duplicates :
265
- return None
287
+ repo_dirs = {
288
+ pathlib .Path (repo ["dir" ]).parent / repo ["name" ]: repo for repo in config1
289
+ }
290
+ repo_dirs_2 = {
291
+ pathlib .Path (repo ["dir" ]).parent / repo ["name" ]: repo for repo in config2
292
+ }
266
293
267
- path_dupe_repos . extend (
268
- [ r for r in repos2 if any ( r [ "dir" ] == p for p in path_duplicates )]
269
- )
294
+ for repo_dir , repo in repo_dirs . items ():
295
+ if repo_dir in repo_dirs_2 :
296
+ dupes . append (( repo , repo_dirs_2 [ repo_dir ]) )
270
297
271
- if not path_dupe_repos :
272
- return None
273
-
274
- for n in path_dupe_repos :
275
- currepo = next ((r for r in repos1 if r ["dir" ] == n ["dir" ]), None )
276
- if n ["url" ] != currepo ["url" ]:
277
- dupes += (n , currepo )
278
298
return dupes
279
299
280
300
@@ -304,11 +324,11 @@ def in_dir(config_dir=None, extensions: list[str] = [".yml", ".yaml", ".json"]):
304
324
305
325
306
326
def filter_repos (
307
- config : dict ,
308
- dir : Union [pathlib .Path , None ] = None ,
327
+ config : list [ ConfigDict ] ,
328
+ dir : Union [pathlib .Path , Literal [ "*" ], None ] = None ,
309
329
vcs_url : Union [str , None ] = None ,
310
330
name : Union [str , None ] = None ,
311
- ):
331
+ ) -> list [ ConfigDict ] :
312
332
"""Return a :py:obj:`list` list of repos from (expanded) config file.
313
333
314
334
dir, vcs_url and name all support fnmatch.
@@ -329,23 +349,35 @@ def filter_repos(
329
349
list :
330
350
Repos
331
351
"""
332
- repo_list = []
352
+ repo_list : list [ ConfigDict ] = []
333
353
334
354
if dir :
335
- repo_list .extend ([r for r in config if fnmatch .fnmatch (r ["parent_dir" ], dir )])
355
+ repo_list .extend (
356
+ [
357
+ r
358
+ for r in config
359
+ if fnmatch .fnmatch (str (pathlib .Path (r ["dir" ]).parent ), str (dir ))
360
+ ]
361
+ )
336
362
337
363
if vcs_url :
338
364
repo_list .extend (
339
- r for r in config if fnmatch .fnmatch (r .get ("url" , r .get ("repo" )), vcs_url )
365
+ r
366
+ for r in config
367
+ if fnmatch .fnmatch (str (r .get ("url" , r .get ("repo" ))), vcs_url )
340
368
)
341
369
342
370
if name :
343
- repo_list .extend ([r for r in config if fnmatch .fnmatch (r .get ("name" ), name )])
371
+ repo_list .extend (
372
+ [r for r in config if fnmatch .fnmatch (str (r .get ("name" )), name )]
373
+ )
344
374
345
375
return repo_list
346
376
347
377
348
- def is_config_file (filename : str , extensions : list [str ] = [".yml" , ".yaml" , ".json" ]):
378
+ def is_config_file (
379
+ filename : str , extensions : Union [list [str ], str ] = [".yml" , ".yaml" , ".json" ]
380
+ ):
349
381
"""Return True if file has a valid config file type.
350
382
351
383
Parameters
0 commit comments