118 changes: 65 additions & 53 deletions README.md
@@ -25,78 +25,90 @@ pip install bloombee
```bash
git clone https://github.com/ai-decentralized/BloomBee.git
cd BloomBee
```

Create and activate an environment (either one):

```bash
# Using venv
python3 -m venv bloombee-venv && source bloombee-venv/bin/activate

# OR using conda (recommended)
conda create -n bloombee python=3.10.16 && conda activate bloombee
```

Then install:

```bash
pip install -e .
```
## How to use BloomBee (<a href="https://colab.research.google.com/drive/1pENMOEoEV01DqBImZzuX_4jTV3fNwNga#scrollTo=oyCFDemCZsRs">Try now in Colab</a>)
#### 1. Start the main server

Start the DHT main node:

```bash
python -m bloombee.cli.run_dht --host_maddrs /ip4/0.0.0.0/tcp/31340 --identity_path bootstrapp1.id
```

After running, you will see output similar to:

```
Mon 00 01:23:45.678 [INFO] Running a DHT instance. To connect other peers to this one, use --initial_peers /ip4/YOUR_IP_ADDRESS/tcp/31340/p2p/QmefxzDL1DaJ7TcrZjLuz7Xs9sUVKpufyg7f5276ZHFjbQ
```

Copy **your own** full address (including the `/p2p/...` part).
Each DHT node generates a unique Peer ID, so do **not** copy the example above.

You can provide this address as `--initial_peers` to connect workers or other backbone servers.

> 💡 **Tip:**
> If you want your swarm to be accessible outside of your local network,
> ensure you have a **public IP address** or set up **port forwarding** correctly.
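
To quickly verify that the DHT port is reachable from outside, you can run a check like the following (a sketch, assuming `netcat` is installed and run from a machine outside your network):

```bash
# 31340 is the port from the run_dht example above; replace YOUR_PUBLIC_IP with your address
nc -zv YOUR_PUBLIC_IP 31340
```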
---

#### 2. Connect the workers to the main BloomBee server

Set your main server address (replace with your actual output from step 1):

```bash
export BBSERVER=/ip4/10.0.4.215/tcp/31340/p2p/QmZtZJwF8G2qspQxEVxXfipV4fR7EgpfnkXdbbzaEooaVf
```

Activate the BloomBee environment on each worker
(you can reuse the environment created in **From Source**).

Each worker should be started **in a separate terminal** (or on a separate node)
after activating its environment.
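
For example (either of the environments created earlier):

```bash
conda activate bloombee   # or: source bloombee-venv/bin/activate
```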

Start the first worker to hold 16 blocks (e.g., 16 transformer layers):

```bash
python -m bloombee.cli.run_server huggyllama/llama-7b \
--initial_peers $BBSERVER --num_blocks 16 --identity_path bootstrap_1.id
```
Start the second worker in another activated terminal:

```bash
python -m bloombee.cli.run_server huggyllama/llama-7b \
--initial_peers $BBSERVER --num_blocks 16 --identity_path bootstrap_2.id
```

If you encounter network issues (e.g., connection resets),
please verify your worker IP configurations in the relevant config files.

**Optional:** If `bitsandbytes` causes a CUDA version error:

```bash
cd ~
git clone https://github.com/TimDettmers/bitsandbytes.git
cd bitsandbytes && python setup.py install
```

Ensure your CUDA library path matches your environment.
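
For example (a sketch; the path below is a common default and may differ on your system):

```bash
# Point the loader at your actual CUDA installation if it lives elsewhere
export LD_LIBRARY_PATH=/usr/local/cuda/lib64:$LD_LIBRARY_PATH
```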



#### 3. Run inference or finetune jobs
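
A minimal client-side sketch of an inference job (illustrative only: the `bloombee` import path is assumed, and `AutoDistributedModelForCausalLM` is used the same way as in `benchmarks/benchmark_inference.py`; replace the peer address with your own from step 1):

```python
from transformers import AutoTokenizer
from bloombee import AutoDistributedModelForCausalLM  # import path assumed

# Example address from step 2; replace with your own $BBSERVER value
INITIAL_PEERS = ["/ip4/10.0.4.215/tcp/31340/p2p/QmZtZJwF8G2qspQxEVxXfipV4fR7EgpfnkXdbbzaEooaVf"]

tokenizer = AutoTokenizer.from_pretrained("huggyllama/llama-7b", use_fast=False)
model = AutoDistributedModelForCausalLM.from_pretrained(
    "huggyllama/llama-7b", initial_peers=INITIAL_PEERS
)

inputs = tokenizer("A cat sat on", return_tensors="pt")["input_ids"]
outputs = model.generate(inputs, max_new_tokens=5)
print(tokenizer.decode(outputs[0]))
```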

Empty file modified benchmarks/benchmark_forward.py
100755 → 100644
Empty file.
135 changes: 123 additions & 12 deletions benchmarks/benchmark_inference.py
@@ -3,6 +3,7 @@
import argparse
import multiprocessing as mp
from time import perf_counter
import logging

import numpy as np
import torch
@@ -14,15 +15,24 @@

logger = get_logger()

# Set logging level to INFO to see all debug messages
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s.%(msecs)03d - %(name)s - %(levelname)s - %(message)s',
datefmt='%Y-%m-%d %H:%M:%S'
)


def main():
parser = argparse.ArgumentParser(formatter_class=argparse.ArgumentDefaultsHelpFormatter)
parser.add_argument("--model", type=str, required=True, help="Model")
parser.add_argument("--initial_peers", type=str, nargs="+", default=PUBLIC_INITIAL_PEERS, help="Initial peers")
parser.add_argument("--torch_dtype", type=str, default="float32", help="Torch dtype")
parser.add_argument("--n_processes", type=str, default=1, help="Number of concurrent processes")
parser.add_argument("--seq_len", type=int, default=2048, help="Sequence length")
parser.add_argument("--seq_len", type=int, default=2048, help="Number of tokens to generate (generation length)")
parser.add_argument("--prompt_len", type=int, default=None, help="Desired prompt/prefill length in tokens (optional)")
parser.add_argument("--warmup_steps", type=int, default=1, help="Number of warmup steps")
parser.add_argument("--batch_size", type=int, default=1, help="Client batch size (number of sequences to generate in parallel)")
args = parser.parse_args()

if args.n_processes == "n_gpus":
Expand All @@ -45,34 +55,135 @@ def main():
def benchmark_inference(process_idx, args, result_pipe):
tokenizer = AutoTokenizer.from_pretrained(args.model, use_fast=False)
# Using use_fast=False since LlamaTokenizerFast takes a long time to start, and we decode 1 token at a time anyway

# Set pad_token for LLaMA tokenizer (required for batch padding)
if tokenizer.pad_token is None:
tokenizer.pad_token = tokenizer.eos_token
logger.info(f"[DEBUG] Set pad_token to eos_token: {tokenizer.pad_token}")

model = AutoDistributedModelForCausalLM.from_pretrained(
args.model, initial_peers=args.initial_peers, torch_dtype=DTYPE_MAP[args.torch_dtype],
use_server_to_server=True # Explicitly enable server-to-server communication
)
logger.info(f"Created model: {process_idx=} {model.device=}")

test_prompt = ""
input_ids = tokenizer.encode(test_prompt, return_tensors="pt", add_special_tokens=True)
# Prepare batch of prompts for benchmarking
batch_size = getattr(args, 'batch_size', 1)

# Create different prompts for each batch to verify independent generation
if batch_size == 1:
prompts = [""]
elif batch_size == 2:
prompts = ["Once upon a time", "In a galaxy far away"]
elif batch_size == 3:
prompts = ["Once upon a time", "In a galaxy far away", "The quick brown fox"]
else:
base_prompt = (
"Quantum mechanics explains the behavior of particles at very small scales. "
"Neural networks learn patterns by adjusting weights through backpropagation. "
"Distributed systems require robust consensus mechanisms to maintain state. "
"Optimization algorithms like gradient descent are fundamental to machine learning. "
"Transformer architectures rely on attention mechanisms to capture dependencies. "
"Reinforcement learning optimizes actions by maximizing cumulative rewards. "
"Bayesian inference updates beliefs based on observed evidence and prior knowledge. "
"Convex optimization problems guarantee global minima under certain conditions. "
"Signal processing extracts meaningful information from noisy measurements. "
)
prompts = [
f"{base_prompt} Example {i + 1} discusses large-scale AI systems and scientific discovery."
for i in range(batch_size)
]

if args.prompt_len is None:
encodings = tokenizer(prompts, return_tensors="pt", padding=True, add_special_tokens=True)
input_ids = encodings["input_ids"]
else:
target_prompt_length = args.prompt_len
bos_token_id = tokenizer.bos_token_id
filler_sentence = (
" Advanced research explores interdisciplinary insights, collaborative innovation, "
"scientific computation, trustworthy deployment, and sustainable engineering practices."
)
filler_tokens = tokenizer(filler_sentence, add_special_tokens=False)["input_ids"]
if not filler_tokens:
filler_tokens = [tokenizer.eos_token_id or tokenizer.pad_token_id or 0]
processed = []
for prompt in prompts:
prompt_tokens = tokenizer(prompt, add_special_tokens=False)["input_ids"]
if bos_token_id is not None:
full_tokens = [bos_token_id] + prompt_tokens
else:
full_tokens = prompt_tokens[:]
if len(full_tokens) >= target_prompt_length:
full_tokens = full_tokens[:target_prompt_length]
else:
while len(full_tokens) < target_prompt_length:
need = target_prompt_length - len(full_tokens)
full_tokens.extend(filler_tokens[:need])
processed.append(full_tokens)
input_ids = torch.tensor(processed, dtype=torch.long)

logger.info(f"[DEBUG] {process_idx=} Client batch_size={batch_size}, input_ids.shape={input_ids.shape}")
for i, prompt in enumerate(prompts):
logger.info(f"[DEBUG] {process_idx=} batch[{i}] prompt: '{prompt}' (token_ids: {input_ids[i].tolist()})")
temp_result_tokens = input_ids

# Calculate max_length: prompt_length + number of tokens to generate
prompt_length = input_ids.shape[1]
if args.prompt_len is not None:
target_prompt_length = args.prompt_len
pad_token_id = tokenizer.pad_token_id if tokenizer.pad_token_id is not None else tokenizer.eos_token_id
if target_prompt_length < prompt_length:
input_ids = input_ids[:, :target_prompt_length]
elif target_prompt_length > prompt_length:
extra = target_prompt_length - prompt_length
pad_block = torch.full((batch_size, extra), pad_token_id, dtype=input_ids.dtype)
input_ids = torch.cat([input_ids, pad_block], dim=1)
prompt_length = target_prompt_length
temp_result_tokens = input_ids
logger.info(f"[DEBUG] {process_idx=} adjusted prompt_length to {prompt_length} tokens")

total_max_length = prompt_length + args.seq_len
logger.info(f"[DEBUG] {process_idx=} prompt_length={prompt_length}, generating {args.seq_len} tokens, total_max_length={total_max_length}")

step_times = []

with model.transformer.h.inference_session(max_length=total_max_length) as sess:
logger.info(f"[DEBUG] {process_idx=} Created inference session with max_length={total_max_length}")
logger.info(f"[BENCHMARK_START] Process={process_idx} | BatchSize={batch_size} | SeqLen={args.seq_len}")

for step in range(args.seq_len):
step_start_time = perf_counter()

# For the first step, pass input_ids; for subsequent steps, generate() will use session state
if step == 0:
logger.info(f"[DEBUG] {process_idx=} {step=} First step, passing input_ids.shape={input_ids.shape}")
outputs = model.generate(input_ids, max_new_tokens=1, session=sess)
else:
outputs = model.generate(max_new_tokens=1, session=sess)

# Log generated tokens for all sequences in the batch
for batch_idx in range(outputs.shape[0]):
new_token_id = outputs[batch_idx][-1].item()
new_token_text = tokenizer.decode([new_token_id])
logger.info(f"[DEBUG] {process_idx=} {step=} batch[{batch_idx}] Generated token: '{new_token_text}' (id={new_token_id})")

temp_result_tokens = torch.cat([temp_result_tokens, outputs[:, -1:]], dim=1)

if step >= args.warmup_steps:
step_times.append(perf_counter() - step_start_time)
speed = 1 / np.mean(step_times)
logger.info(f"{process_idx=} {step=} {speed=:.2f}")
# Report speed per sequence (total tokens / time)
effective_speed = speed * batch_size
logger.info(f"{process_idx=} {step=} {speed=:.2f} tokens/sec/sequence, effective={effective_speed:.2f} tokens/sec")

# Show final generated text for each batch
for batch_idx in range(temp_result_tokens.shape[0]):
full_text = tokenizer.decode(temp_result_tokens[batch_idx], skip_special_tokens=True)
logger.info(f"\nbatch[{batch_idx}] Full generated text:\n{full_text}\n")

result_pipe.send(speed)


if __name__ == "__main__":
main()
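
With these changes, a hypothetical invocation of the benchmark exercising the new `--prompt_len` and `--batch_size` flags (peer address is a placeholder) could look like:

```bash
python benchmarks/benchmark_inference.py \
  --model huggyllama/llama-7b \
  --initial_peers /ip4/YOUR_IP_ADDRESS/tcp/31340/p2p/YOUR_PEER_ID \
  --torch_dtype float32 --seq_len 128 --prompt_len 64 --batch_size 4
```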
Empty file modified benchmarks/benchmark_training.py
100755 → 100644
Empty file.
Binary file removed bootstrapP100_7.id
Binary file not shown.
Binary file removed bootstrap_1.id
Binary file not shown.
Binary file removed bootstrapp1.id
Binary file not shown.
Binary file removed bootstrapp100.id
Binary file not shown.
2 changes: 1 addition & 1 deletion setup.cfg
@@ -33,7 +33,7 @@ packages = find:
python_requires = >=3.8
install_requires =
torch>=1.12
bitsandbytes==0.41.0
accelerate>=0.27.2
huggingface-hub>=0.11.1,<1.0.0
tokenizers>=0.13.3
8 changes: 5 additions & 3 deletions setup.py
@@ -54,7 +54,7 @@ def get_version():
"tokenizers>=0.13.3",
"transformers==4.43.1",
"speedtest-cli==2.1.3",
"hivemind",
"hivemind @ git+https://github.com/learning-at-home/hivemind.git@213bff98a62accb91f254e2afdccbf1d69ebdea9",
"tensor_parallel==1.0.23",
"humanfriendly",
"async-timeout>=4.0.2",
@@ -63,8 +63,10 @@ def get_version():
"sentencepiece>=0.1.99",
"peft==0.8.2",
"safetensors>=0.3.1",
"Dijkstar>=2.6.0",
"numpy<2",
"Dijkstar>=2.6.0",
"numpy<2",
"attrs",
"nvidia-ml-py"
],
extras_require={
"dev": [
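
For reference, the pinned hivemind dependency can also be installed directly (same URL and commit as in `setup.py`):

```bash
pip install "hivemind @ git+https://github.com/learning-at-home/hivemind.git@213bff98a62accb91f254e2afdccbf1d69ebdea9"
```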
Empty file modified src/bloombee/cli/run_prod_server.sh
100755 → 100644
Empty file.
3 changes: 3 additions & 0 deletions src/bloombee/cli/run_server.py
@@ -71,6 +71,9 @@ def main():
parser.add_argument('--inference_max_length', type=int, default=None,
help='Maximum total sequence length permitted per inference, defaults to 16384 tokens. '
'Default: 8192 for models with multi-query attention (based on Llama 2, Falcon), 2048 for others')
parser.add_argument('--batch_size', type=int, default=1,
help='Number of sequences to process in parallel (GPU batch size). '
'Default: 1 (no batching). Higher values improve throughput but use more memory.')
parser.add_argument('--min_batch_size', type=int, default=1,
help='Minimum required batch size for all operations (in total tokens)')
parser.add_argument('--max_batch_size', type=int, default=None,
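
For illustration, the new flag combines with the existing worker command from the README (a sketch; the values are examples):

```bash
python -m bloombee.cli.run_server huggyllama/llama-7b \
  --initial_peers $BBSERVER --num_blocks 16 --batch_size 4
```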
4 changes: 2 additions & 2 deletions src/bloombee/client/inference_session.py
@@ -136,7 +136,8 @@ def step(
request_metadata.update(self.session_metadata)
if self._position is not None:
request_metadata["start_from_position"] = self._position
# Enable server-to-server communication to trigger CROSS_GPU_TRANSFER
if self.config.use_server_to_server:
next_servers = self._collect_next_servers()
if next_servers:
request_metadata["next_servers"] = next_servers
@@ -372,7 +373,6 @@ def step(  # perform one inference step, handling the input data and the corresponding prompts
self._position += n_input_tokens
# print(f"lient inference session outputs, inputs: {inputs}")
outputs = inputs

outputs = outputs.to(device=inputs_device, dtype=inputs_dtype)
# print('client inference session outputs ', outputs.shape)
return outputs
8 changes: 6 additions & 2 deletions src/bloombee/flexgen_utils/ExecutionEnv.py
@@ -20,8 +20,12 @@ class ExecutionEnv:
mixed: Any = None

@classmethod
def create(cls, offload_dir, device_type="cuda"):
# For CPU-only mode, use CPU for all devices including 'gpu' slot
if device_type == "cpu":
gpu = TorchDevice("cpu") # Use CPU for the 'gpu' slot
else:
gpu = TorchDevice("cuda:0")
print('ExecutionEnv: gpu ', gpu)
cpu = TorchDevice("cpu")
disk = TorchDisk(offload_dir)
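
A hedged usage sketch of the new `device_type` parameter (the import path is inferred from the file location and the offload directory is illustrative):

```python
from bloombee.flexgen_utils.ExecutionEnv import ExecutionEnv  # import path assumed

# CPU-only mode: the 'gpu' slot is backed by a CPU device
env = ExecutionEnv.create("./offload_cache", device_type="cpu")

# Default behavior still targets cuda:0
# env = ExecutionEnv.create("./offload_cache")
```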