@@ -849,31 +849,31 @@ class NodesMonitor extends EventEmitter {
        });
    }

-    _run_node(item) {
-        if (!this._started) return P.reject(new Error('monitor has not started'));
+    async _run_node(item) {
+        if (!this._started) throw new Error('monitor has not started');

        item._run_node_serial = item._run_node_serial || new semaphore.Semaphore(1);
-        if (item.node.deleted) return P.reject(new Error(`node ${item.node.name} is deleted`));
-        return item._run_node_serial.surround(() =>
-            P.resolve()
-            .then(() => dbg.log1('_run_node:', item.node.name))
-            .then(() => this._get_agent_info(item))
-            .then(() => { //If internal or cloud resource, cut down initializing time (in update_rpc_config)
-                if (!item.node_from_store && (item.node.is_mongo_node || item.node.is_cloud_node)) {
-                    return this._update_nodes_store('force');
-                }
-            })
-            .then(() => this._uninstall_deleting_node(item))
-            .then(() => this._remove_hideable_nodes(item))
-            .then(() => this._update_node_service(item))
-            .then(() => this._update_create_node_token(item))
-            .then(() => this._update_rpc_config(item))
-            .then(() => this._test_nodes_validity(item))
-            .then(() => this._update_status(item))
-            .then(() => this._handle_issues(item))
-            .then(() => this._update_nodes_store())
-            .catch(err => {
+        if (item.node.deleted) throw new Error(`node ${item.node.name} is deleted`);
+        return item._run_node_serial.surround(async () => {
+            dbg.log1('_run_node:', item.node.name);
+            await this._get_agent_info(item);
+            //If internal or cloud resource, cut down initializing time (in update_rpc_config)
+            if (!item.node_from_store && (item.node.is_mongo_node || item.node.is_cloud_node)) {
+                return this._update_nodes_store('force');
+            }
+            try {
+                await this._uninstall_deleting_node(item);
+                this._remove_hideable_nodes(item);
+                await this._update_node_service(item);
+                await this._update_create_node_token(item);
+                await this._update_rpc_config(item);
+                await this._test_nodes_validity(item);
+                this._update_status(item);
+                this._handle_issues(item);
+                await this._update_nodes_store();
+            } catch (err) {
                dbg.warn('_run_node: ERROR', err.stack || err, 'node', item.node);
-            }));
+            }
+        });
    }

    _handle_issues(item) {
@@ -1681,71 +1681,71 @@ class NodesMonitor extends EventEmitter {
    // This is why we are required to use a new variable by the name ready_to_be_deleted
    // In order to mark the nodes that wait for their processes to be removed (cloud/mongo resource)
    // If the node is not relevant to a cloud/mongo resouce it will be just marked as deleted
-    _update_deleted_nodes(deleted_nodes) {
+    async _update_deleted_nodes(deleted_nodes) {
        if (!deleted_nodes.length) return;
        const items_to_update = [];
-        return P.map_with_concurrency(10, deleted_nodes, item => {
-            dbg.log0('_update_nodes_store deleted_node:', item);

-            if (item.node.deleted) {
-                if (!item.node_from_store.deleted) {
-                    items_to_update.push(item);
-                }
-                return;
-            }
+        await P.map_with_concurrency(10, deleted_nodes, async item => {
+            dbg.log0('_update_nodes_store deleted_node:', item);

-            // TODO handle deletion of normal nodes (uninstall?)
-            // Just mark the node as deleted and we will not scan it anymore
-            // This is done once the node's proccess is deleted (relevant to cloud/mongo resource)
-            // Or in a normal node it is done immediately
-            if (!item.node.is_cloud_node &&
-                !item.node.is_mongo_node &&
-                !item.node.is_internal_node) {
-                item.node.deleted = Date.now();
+            if (item.node.deleted) {
+                if (!item.node_from_store.deleted) {
                    items_to_update.push(item);
-                return;
                }
+                return;
+            }

-            return P.resolve()
-                .then(() => {
-                    if (item.node.is_internal_node) {
-                        return P.reject('Do not support internal_node deletion yet');
-                    }
-                    // Removing the internal node from the processes
-                    return server_rpc.client.hosted_agents.remove_pool_agent({
-                        node_name: item.node.name
-                    });
-                })
-                .then(() => {
-                    // Marking the node as deleted since we've removed it completely
-                    // If we did not succeed at removing the process we don't mark the deletion
-                    // This is done in order to cycle the node once again and attempt until
-                    // We succeed
-                    item.node.deleted = Date.now();
-                    items_to_update.push(item);
-                })
-                .catch(err => {
-                    // We will just wait another cycle and attempt to delete it fully again
-                    dbg.warn('delete_cloud_or_mongo_pool_node ERROR node', item.node, err);
-                });
-        })
-            .then(() => NodesStore.instance().bulk_update(items_to_update))
-            .then(res => {
-                // mark failed updates to retry
-                if (res.failed) {
-                    for (const item of res.failed) {
-                        this._set_need_update.add(item);
-                    }
+            // TODO handle deletion of normal nodes (uninstall?)
+            // Just mark the node as deleted and we will not scan it anymore
+            // This is done once the node's proccess is deleted (relevant to cloud/mongo resource)
+            // Or in a normal node it is done immediately
+            if (!item.node.is_cloud_node &&
+                !item.node.is_mongo_node &&
+                !item.node.is_internal_node) {
+                item.node.deleted = Date.now();
+                items_to_update.push(item);
+                return;
+            }
+
+            try {
+                if (item.node.is_internal_node) {
+                    throw new Error('Do not support internal_node deletion yet');
                }
-                if (res.updated) {
-                    for (const item of res.updated) {
-                        this._remove_node_from_maps(item);
-                    }
+                // Removing the internal node from the processes
+                await server_rpc.client.hosted_agents.remove_pool_agent({
+                    node_name: item.node.name
+                });
+
+                // Marking the node as deleted since we've removed it completely
+                // If we did not succeed at removing the process we don't mark the deletion
+                // This is done in order to cycle the node once again and attempt until
+                // We succeed
+                item.node.deleted = Date.now();
+                items_to_update.push(item);
+
+            } catch (err) {
+                // We will just wait another cycle and attempt to delete it fully again
+                dbg.warn('delete_cloud_or_mongo_pool_node ERROR node', item.node, err);
+            }
+        });
+
+        try {
+            const res = await NodesStore.instance().bulk_update(items_to_update);
+
+            // mark failed updates to retry
+            if (res.failed) {
+                for (const item of res.failed) {
+                    this._set_need_update.add(item);
                }
-            })
-            .catch(err => {
-                dbg.warn('_update_deleted_nodes: ERROR', err.stack || err);
-            });
+            }
+            if (res.updated) {
+                for (const item of res.updated) {
+                    this._remove_node_from_maps(item);
+                }
+            }
+        } catch (err) {
+            dbg.warn('_update_deleted_nodes: ERROR', err.stack || err);
+        }
    }

    _should_enable_agent(info, agent_config) {