 import torch.distributed as dist

 from internlm.accelerator import get_accelerator
-from internlm.core.context.process_group_initializer_simplified import Initializer, ParallelMeta
 from internlm.utils.common import SingletonMeta
 from internlm.utils.logger import get_logger
 from internlm.utils.timeout import LLM_NCCL_TIMEOUT

 from . import process_group_initializer as pgroup_initializer
-from .process_group_initializer_simplified import ParallelMode
+from .process_group_initializer import ParallelMode
 from .random import add_seed, get_seeds, set_mode
 from internlm.utils.common import get_args

@@ -422,20 +421,6 @@ def init_global_dist(
         use_cpu (bool): whether to set up cpu process group.
         """

-        # find cluster info
-        if "clusters" not in self.config:
-            nv_info = {
-                "rank_range": [0, 8],
-                "peak_tflops": 320,
-                "capacity": 80 * 1024**3,
-                "intra_bw": 150,
-                "inter_bw": 100,
-            }
-            self.set_cluster_info("nv_cluster", nv_info)
-        else:
-            for cluster in self.config.clusters:
-                self.clusters.append(ClusterInfo(**cluster))
-
         # initialize the default process group
         if not fake_mode:
             init_method = f"tcp://[{host}]:{port}"
@@ -667,7 +652,7 @@ def _init_pg(self, rank, world_size, parallel_config):
             initializers.append(pgroup_initializer.Initializer_ISP_Data(*initializer_args))
         if (
             isinstance(parallel_config["tensor"], dict)
-            and parallel_config["tensor"]["mode"] == TensorParallelMode.isp.name
+            and parallel_config["tensor"]["mode"] == "isp"
         ):
             initializers.append(pgroup_initializer.Initializer_Zero1_ISP(*initializer_args))
         else:
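
Note on the hunk above: the branch now compares the configured tensor mode against the literal string "isp" rather than TensorParallelMode.isp.name. Below is a minimal sketch of the config shape this comparison assumes; the field values are illustrative, not taken from the repository.

# Sketch only: assumed shape of the tensor-parallel config consumed by _init_pg.
parallel_config = {"tensor": {"size": 4, "mode": "isp"}}  # illustrative values

tensor_cfg = parallel_config["tensor"]
if isinstance(tensor_cfg, dict) and tensor_cfg["mode"] == "isp":
    # ISP mode: the diff appends Initializer_Zero1_ISP in this case.
    print("use Initializer_Zero1_ISP")
else:
    # A non-dict or non-ISP tensor config takes the original (unshown) fallback branch.
    print("use the default zero1 initializer")
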
@@ -688,6 +673,8 @@ def _init_pg(self, rank, world_size, parallel_config):
             self._register_dist(*parallel_setting)

     def _init_use_simplified_pg(self, rank, world_size, parallel_config):
+        from internlm.core.context.process_group_initializer_simplified import InitializerParallelMeta
+
         try:
             self.tensor_mode = parallel_config["tensor"]["mode"]
         except AttributeError:
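
The two lines added in the hunk above move the InitializerParallelMeta import inside _init_use_simplified_pg. A function-local import like this is commonly used to break an import cycle or to defer the cost of loading a module until the code path is actually taken; the diff itself does not state the motivation, so treat that reading as an assumption. A self-contained sketch of the pattern, using a standard-library module as a stand-in:

# Sketch of a deferred (function-local) import; json stands in for
# process_group_initializer_simplified here.
def parse_parallel_setting(raw_text):
    # The import runs only on the first call, so module import time stays low
    # and top-level import cycles with this module are avoided.
    import json

    return json.loads(raw_text)

print(parse_parallel_setting('{"tensor": {"mode": "isp"}}'))
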
@@ -861,14 +848,14 @@ def check_pg_is_intra(self, parallel_mode: ParallelMode):
         return (max_rank - min_rank) <= 7

     def same_group_in_one_node(self, parallel_mode: ParallelMode):
-        """获得一个节点内有多少个相同类型的PG, 在跨节点通信时会存在带宽竞争
-        这里返回的相同PG的数量会乘上每个rank的通信数据量大小
+        """Get the number of process groups of the same type within a node; they compete for bandwidth
+        during cross-node communication. The count returned is multiplied by each rank's communication data size.

         Args:
             parallel_mode (ParallelMode):

         Returns:
-            int: 一个节点内相同类型的PG的数量
+            int: The number of process groups of the same type within a node.
         """
         pg_group_ranks = self.get_ranks_in_group(parallel_mode)
         pg_group_ranks = sorted(pg_group_ranks)
@@ -881,68 +868,7 @@ def same_group_in_one_node(self, parallel_mode: ParallelMode):
         else:
             return stride

-    # def set_cluster_info(self, name: str, info: dict):
-    #     self.clusters[name] = ClusterInfo(**info)
-
-    def get_cluster_info(self, name: str):
-        return self.clusters[name]
-
-    def get_cluster_name_from_ip(self):
-        """
-        node_ip_list = [
-            'metax-c500-1',
-            'metax-c500-2',
-            'nvidia-node-1',
-            'nvidia-node-2',
-        ]
-        """
-        hostname = socket.gethostname()
-        cluster_name = hostname.split("-")[0]
-        return cluster_name
-
-    def sort_rank_based_on_ip_and_capacity(self):
-        Capacity = []
-
-        def sort_rank(x, y):
-            x_name = self.get_cluster_name_from_ip(x)
-            y_name = self.get_cluster_name_from_ip(y)
-            if x_name == y_name:
-                return x_name > y_name
-            else:
-                x_c = self.clusters[x_name]["capacity"]
-                y_c = self.clusters[y_name]["capacity"]
-                return x_c > y_c

-        for cluster_name, cluster_info in self.clusters.items():
-            peak_tflops.append(cluster_info["peak_tflops"])
-            # Alpha.append(cluster_info.rank_range[-1] - cluster_info.rank_range[-1] + 1)
-            Capacity.append(cluster_info["capacity"])
-
-        def switch_topology_aware_rank_scheduling():
-            """
-            Switch topology-aware rank scheduling can optimize the performance of small-scale
-            collective communications. Currently only supported in Alibaba Cloud.
-            """
-
-            local_rank = int(os.environ["LOCAL_RANK"])
-            cluster_name = get_cluster_name_from_ip()
-
-            try:
-                if cluster_name == "Ali":
-                    pass
-                else:
-                    rank = int(os.environ["MLP_WORKER_RACK_RANK_INDEX"]) * 8 + local_rank
-            except Exception as e:
-                logger.error(
-                    f"The switch topology awareness error is reported, the reason is: {e}",
-                    "but don’t worry, this error will not affect normal training.",
-                    "If you train on Alibaba or Volcano Cloud, please contact wangguoteng or lijiaxing",
-                )
-            else:
-                # If there is no any error, hack torch rank.
-                os.environ["RANK"] = str(rank)
-                if local_rank == 0:
-                    logger.info("Successfully bound node switch affinity!")


 global_context = ParallelContext()
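
For readers following the same_group_in_one_node hunks above: check_pg_is_intra treats eight consecutive ranks as one node, and the translated docstring says the method counts how many process groups of the same type land on a single node. The standalone sketch below illustrates that stride-based counting under the assumption of 8 GPUs per node; it illustrates the idea only and is not the repository's exact implementation, since the diff shows just the return stride branch.

# Sketch only: estimate how many process groups of the same type share one
# 8-GPU node, given the sorted global ranks of a single group.
RANKS_PER_NODE = 8  # matches check_pg_is_intra's (max_rank - min_rank) <= 7 test

def same_type_groups_per_node(group_ranks):
    ranks = sorted(group_ranks)
    if len(ranks) <= 1:
        # Degenerate one-rank group: every rank on the node forms its own group.
        return RANKS_PER_NODE
    stride = ranks[1] - ranks[0]
    if stride == 1:
        # Contiguous group, e.g. [0, 1, 2, 3]: the node holds 8 // group_size such groups.
        return max(1, RANKS_PER_NODE // len(ranks))
    # Interleaved group, e.g. [0, 2, 4, 6] with stride 2: `stride` groups of the
    # same type are interleaved on the node (cf. the return stride branch above).
    return stride

print(same_type_groups_per_node([0, 2, 4, 6]))  # -> 2 groups compete for inter-node bandwidth
print(same_type_groups_per_node([0, 1, 2, 3]))  # -> 2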