Home
- First, follow the instructions provided for the client.
  - Important: Configure the SERVER_URL in the config.ini file as per your server's address. By default, it is set for local operation.
- Then, install the server (if you are configuring Laminar locally, e.g. on a laptop, do this in another terminal tab), following the server instructions.
  - Recommendation: Use the Dockerized option for installation.
  - Note: Ensure that the server is running continuously.
- Finally, install the execution engine (if you are configuring Laminar locally, e.g. on a laptop, do this in another terminal tab), following the execution engine instructions.
  - Recommendation: Use the Dockerized option for installation.
  - Note: Ensure that the engine is running continuously.
  - Important: Configure the multi and dynamic mappings (e.g. the number of processes, redis_ip, redis_port) in the config.ini file as per your requirements (see the sketch after this list).
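As a quick sanity check of this configuration, you can read the values back with Python's standard configparser. This is only a minimal sketch: apart from SERVER_URL, redis_ip, and redis_port, which are mentioned above, the section name and fallback values below are assumptions and may not match the actual layout of config.ini.
# Minimal sketch: print the values Laminar will pick up from config.ini.
# The "DEFAULT" section and the fallback values are illustrative assumptions;
# check your own config.ini for the exact layout.
import configparser

config = configparser.ConfigParser()
config.read("config.ini")

server_url = config.get("DEFAULT", "SERVER_URL", fallback="<not set>")
redis_ip = config.get("DEFAULT", "redis_ip", fallback="<not set>")
redis_port = config.get("DEFAULT", "redis_port", fallback="<not set>")

print("SERVER_URL:", server_url)
print("redis_ip:", redis_ip)
print("redis_port:", redis_port)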
To ensure smooth operation of the Laminar framework, it is essential to keep both the server and the execution engine running continuously.
We recommend running the server and execution engine in two separate terminal tabs. In each tab, navigate to the respective directories and start the services using the following command:
docker-compose up
If you need to rebuild the Docker containers (for instance, after making configuration changes), you can do so by following these steps: First, bring down the running containers:
docker-compose down
Then, rebuild and start the containers:
docker-compose up --build
By following these steps, you can ensure that the server and execution engine are properly configured and running efficiently.
To completely delete the registry data, ensure you are working with the server's docker-compose (not the one for the execution engine). Follow these steps:
docker-compose down -v
docker system prune -a
docker-compose up --build
This process will stop and remove all containers, delete the associated volumes, and clean up unused Docker objects such as images and networks. Afterward, the system will rebuild and start fresh, ensuring that all previous data is completely erased and a new, clean environment is initialized.
Laminar has been successfully tested in the following environments:
- Laptops & Operating Systems:
  - macOS
  - Ubuntu
- Virtual Machines (VMs):
  - University of St Andrews, School of Computer Science
  - EIDF EPCC (University of Edinburgh)
- Both with and without Dockerized options for the server and execution engine
This section provides a comprehensive overview of the Command Line Interface (CLI) options available in the laminar.py script. These commands allow users to interact with the Laminar framework to manage and execute workflows and Processing Elements (PEs).
First, you should register:
python register.py
Then, start the CLI:
python laminar.py
This will ask you for the username and password that you used in the registration step:
Username: rosa
Password: ****
Welcome to the Laminar CLI
(laminar)
Displays the available commands and their usage.
(laminar) help
Documented commands (type help <topic>):
========================================
code_recommendation quit remove_workflow
describe register_pe run
help register_workflow semantic_search
list remove_all update_pe_description
literal_search remove_pe update_workflow_description
Lists all the PEs and workflows in the registry.
(laminar) list
REGISTRY
Result 1: ID: 185
PE Name: IsPrime
Description: A pe to check if the PE has a sequence of words.
Result 2: ID: 186
PE Name: NumberProducer
Description: The number producer class.
Result 3: ID: 187
PE Name: PrintPrime
Description: Print the number of words in the sequence of unique tokens in the sequence of words in the
Result 4: ID: 189
PE Name: CountWords
Description: CountWords - Count the number of words in the sequence.
Result 5: ID: 190
PE Name: SplitLines
Description: SplitLines class.
Result 6: ID: 191
PE Name: SplitWords
Description: SplitWords class.
Result 7: ID: 193
PE Name: AggregateDataPE
Description: AggregateDataPE - Aggregate data from a sequence of readings.
Result 8: ID: 194
PE Name: AlertingPE
Description: AlertingPE class.
Result 9: ID: 195
PE Name: AnomalyDetectionPE
Description: Anomaly detection PE.
Result 10: ID: 196
PE Name: NormalizeDataPE
Description: This pe normalizes the temperature of a record in a PE object.
Result 11: ID: 197
PE Name: ReadSensorDataPE
Description: This pe reads the data from a JSON file and writes it to the output file.
Result 12: ID: 188
Workflow Name: isprime_wf
Description: check if the num is prime or not
Result 13: ID: 192
Workflow Name: wordcount_wf
Description: A workflow that counts the number of words received in the input file.
Result 14: ID: 198
Workflow Name: sensorWorkflow
Description: A workflow that reads the data from a file in the current directory and write the results.
<module_name_1723494432_0.IsPrime object at 0x331bb4e50>
<module_name_1723494432_0.NumberProducer object at 0x331abcb20>
<module_name_1723494432_0.PrintPrime object at 0x331bc1990>
<module_name_1723495120_0.CountWords object at 0x331c30490>
<module_name_1723495120_0.SplitLines object at 0x331c30e50>
<module_name_1723495120_0.SplitWords object at 0x331c30700>
<module_name_1723495129_1.AggregateDataPE object at 0x3309e5f90>
<module_name_1723495129_1.AlertingPE object at 0x3309e7eb0>
<module_name_1723495129_1.AnomalyDetectionPE object at 0x3309e7a30>
<module_name_1723495129_1.NormalizeDataPE object at 0x331aa5630>
<module_name_1723495129_1.ReadSensorDataPE object at 0x331aa7c70>
<dispel4py.workflow_graph.WorkflowGraph object at 0x3309b8b80>
<dispel4py.workflow_graph.WorkflowGraph object at 0x3309d6cb0>
<dispel4py.workflow_graph.WorkflowGraph object at 0x331c64a90>
Registers a new Processing Element (PE) using the specified PE file. If any of the PEs are already registered, the command won't register them again. Important: the file must include all the imports that its PE(s) need; a minimal sketch of such a file is shown after the example below.
(laminar) register_pe isprimePE.py
• IsPrime - IsPrime (ID 199)
• NumberProducer - NumberProducer (ID 200)
• PrintPrime - PrintPrime (ID 201)
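For reference, the PE file has to be self-contained, since the registry stores and later re-executes its source as-is. A minimal sketch of what an isprimePE.py along these lines could look like follows (based on the full example at the end of this page; the actual file shipped with Laminar may differ):
# isprimePE.py - illustrative sketch; the real file may differ.
# All imports the PEs rely on must appear in this file.
from dispel4py.base import ProducerPE, IterativePE, ConsumerPE
import random

class NumberProducer(ProducerPE):
    def __init__(self):
        ProducerPE.__init__(self)

    def _process(self, inputs):
        # produce a random number for the downstream PEs
        return random.randint(1, 1000)

class IsPrime(IterativePE):
    def __init__(self):
        IterativePE.__init__(self)

    def _process(self, num):
        # forward the number only if it is prime
        if num > 1 and all(num % i != 0 for i in range(2, num)):
            return num

class PrintPrime(ConsumerPE):
    def __init__(self):
        ConsumerPE.__init__(self)

    def _process(self, num):
        # report the prime number
        print("the num %s is prime" % num)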
Registers a new workflow from the specified file. If the workflow or any of its PEs are already registered, the command won't register them again. Important: the file must include all the imports that the workflow (and its PEs) need.
(laminar) register_workflow wordcount_wf.py
Found PEs
• CountWords - type (ID 189)
• SplitLines - type (ID 190)
• SplitWords - type (ID 191)
Found workflows
• wordcount_wf - WorkflowGraph (ID 192)
Provides a detailed description of a specific PE or workflow. If the --source_code (or -sc) flag is provided, the source code of the workflow or PE is also printed.
(laminar) help describe
It provides the information on PEs or workflow by its name
Usage: describe [identifier] [--source_code | -sc]
(laminar) describe 189 -sc
PE name: CountWords
inputconnections: {'input': {'name': 'input', 'grouping': [0]}}
outputconnections: {'output': {'name': 'output'}}
numprocesses: 1
count: defaultdict(<class 'int'>, {})
PE Source Code:
class CountWords(GenericPE):
    def __init__(self):
        from collections import defaultdict
        GenericPE.__init__(self)
        self._add_input("input", grouping=[0])
        self._add_output("output")
        self.count=defaultdict(int)

    def _process(self, inputs):
        import os
        word, count = inputs['input']
        self.count[word] += count

    def _postprocess(self):
        print(self.count)
        self.write('output', self.count)
Removes a PE from the registry.
(laminar) remove_pe 190
Removes a workflow from the registry.
(laminar) remove_workflow 188
Removes all PEs and workflows from the registry.
(laminar) remove_all
Performs a literal search in the registry.
(laminar) help literal_search
Searches the registry for workflows and processing elements matching the search term in the name or description.
Arguments:
search_type Type of items to search for. Choices are:
- 'workflow': Search only for workflows
- 'pe': Search only for processing elements (PEs)
- 'both': Search for both workflows and PEs (default)
search_term The term to search for in the registry.
Usage:
literal_search [workflow|pe|both] [string]
Examples:
literal_search workflow some_term
literal_search pe some_term
literal_search both some_term
(laminar) literal_search both "words"
REGISTRY
Result 1: ID: 189
PE Name: CountWords
Description: CountWords - Count the number of words in the sequence.
Result 2: ID: 191
PE Name: SplitWords
Description: SplitWords class.
Result 3: ID: 199
PE Name: IsPrime
Description: A pe to check if the PE has a sequence of words.
Result 4: ID: 192
Workflow Name: wordcount_wf
Description: A workflow that counts the number of words received in the input file.
[<module_name_1723495120_0.CountWords object at 0x331c16230>, <module_name_1723495120_0.SplitWords object at 0x331c16770>, <module_name_1723495326_3.IsPrime object at 0x331c15c30>, <dispel4py.workflow_graph.WorkflowGraph object at 0x331c14790>]
Performs a semantic search in the registry.
(laminar) help semantic_search
Searches the registry for workflows and processing elements matching semantically the search term.
Arguments:
search_type Type of items to search for. Choices are:
- 'workflow': Search only for workflows
- 'pe': Search only for processing elements (PEs)
search_term The term to search for in the registry.
Usage:
semantic_search [workflow|pe] [search_term]
Examples:
semantic_search workflow some_term
semantic_search pe some_term
(laminar) semantic_search pe "a pe that detects anomalies"
Performing similarity search on pe, with query type: text
Encoding query as text...
peId peName description cosine_similarity
5 195 AnomalyDetectionPE Anomaly detection PE. 0.755419
7 199 IsPrime A pe to check if the PE has a sequence of words. 0.419483
4 194 AlertingPE AlertingPE class. 0.388923
3 193 AggregateDataPE AggregateDataPE - Aggregate data from a sequen... 0.221354
6 196 NormalizeDataPE This pe normalizes the temperature of a record... 0.214477
[<module_name_1723495129_1.AnomalyDetectionPE object at 0x3309f41f0>, <module_name_1723495326_3.IsPrime object at 0x3309f5690>, <module_name_1723495129_1.AlertingPE object at 0x3309f7d60>, <module_name_1723495129_1.AggregateDataPE object at 0x3309f4520>, <module_name_1723495129_1.NormalizeDataPE object at 0x3309f4ca0>]
(laminar) describe 195 --source_code
PE name: AnomalyDetectionPE
inputconnections: {'input': {'name': 'input'}}
outputconnections: {'output': {'name': 'output'}}
numprocesses: 1
threshold: 0.1
temperatures: []
PE Source Code:
class AnomalyDetectionPE(IterativePE):
    def __init__(self, threshold=0.1):
        IterativePE.__init__(self)
        self.threshold = threshold
        self.temperatures = []

    def _process(self, data):
        temp = data['normalized_temperature']
        self.temperatures.append(temp)
        mean_temp = np.mean(self.temperatures)
        if abs(temp - mean_temp) > self.threshold:
            data['anomaly'] = True
        else:
            data['anomaly'] = False
        return data
Recommends code snippets from the registry based on the provided code snippet.
(laminar) help code_recommendation
Provides code recommendations from registered workflows and processing elements matching the code snippet.
Arguments:
search_type Type of items to search for. Choices are:
- 'workflow': Search only for workflows
- 'pe': Search only for processing elements (PEs)
code_snippet The code_snippet to get recommendations from the registry.
Options:
--embedding_type The type of embedding to use. Choices are:
- 'spt': Perform a search based on SPT features
- 'llm': Perform a search based on LLM-generated embeddings
Note: code recommendations for workflows are only possible with the 'spt' embedding_type
Usage:
code_recommendation [workflow|pe] [code_snippet] [--embedding_type llm|spt]
Examples:
code_recommendation pe code_snippet --embedding_type spt
code_recommendation workflow code_snippet --embedding_type spt
code_recommendation pe code_snippet --embedding_type llm
(laminar) code_recommendation pe "if abs(temp - mean_temp) > self.threshold:"
peId peName description score simlarFunc
0 195 AnomalyDetectionPE Anomaly detection PE. 9.0 def _process(self, data):
3 195 AnomalyDetectionPE Anomaly detection PE. 8.0 def __init__(self, threshold=0.1):
1 189 CountWords CountWords - Count the number of words in the ... 5.0 def __init__(self):
2 194 AlertingPE AlertingPE class. 5.0 def _process(self, data):
[<module_name_1723495129_1.AnomalyDetectionPE object at 0x331c718a0>, <module_name_1723495129_1.AnomalyDetectionPE object at 0x331c733d0>, <module_name_1723495120_0.CountWords object at 0x331c73580>, <module_name_1723495129_1.AlertingPE object at 0x331c717e0>]
Executes a workflow in the specified mode (sequential, multiprocess, or Redis).
(laminar) help run
Runs a workflow in the registry based on the provided name or ID.
Usage:
run identifier [options]
Options:
identifier Name or ID of the workflow to run
--rawinput Treat input as a raw string instead of evaluating it
-v, --verbose Enable verbose output
-i, --input <data> Input data for the workflow
-r, --resource <resource> Specify resources required by the workflow (can be used multiple times)
--multi Run the workflow in parallel using multiprocessing
--dynamic Run the workflow in parallel using Redis
Examples:
run my_workflow -i '[{"input" : "1,2,3"}]'
run my_workflow -i 100 --dynamic -v
run 123 --input '[{"input" : "1,2,3"}]' --multi --verbose
run my_workflow --dynamic --resource file1.txt --resource file2.txt
(laminar) run 192 -i '[{"input" : "hello hello hello this this is an example example of word word word word count count"}]'
{'CountWords5': {'output': [{'hello': 3, 'this': 2, 'is': 1, 'an': 1, 'example': 2, 'of': 1, 'word': 4, 'count': 2}]}}
(laminar) run 198 -i '[{"input" : "sensor_data_1000.json"}]' --resource sensor_data_1000.json --multi -v
Requested resources: ['sensor_data_1000.json']
File response: 200
{'read': [{'input': 'sensor_data_1000.json'}]}
Processes: {'read6': range(0, 1), 'NormalizeDataPE7': range(1, 3), 'AnomalyDetectionPE8': range(3, 5), 'AlertingPE9': range(5, 7), 'AggregateDataPE10': range(7, 9)}
read6 (rank 0): Processed 1 iteration.
Anomaly detected at 2024-03-22T10:39:00.000: Temperature=29.8743859458, Normalized Temp is 0.746859648645
Anomaly detected at 2024-03-22T12:47:00.000: Temperature=15.8899814834, Normalized Temp is 0.397249537085
Anomaly detected at 2024-03-22T13:03:00.000: Temperature=30.9257854706, Normalized Temp is 0.773144636765
Anomaly detected at 2024-03-22T14:23:00.000: Temperature=15.7627896467, Normalized Temp is 0.3940697411675
Anomaly detected at 2024-03-22T14:55:00.000: Temperature=16.0835392612, Normalized Temp is 0.40208848153
Anomaly detected at 2024-03-22T15:27:00.000: Temperature=13.8187058455, Normalized Temp is 0.34546764613750003
Anomaly detected at 2024-03-22T10:52:00.000: Temperature=25.388832432, Normalized Temp is 0.6347208108
NormalizeDataPE7 (rank 2): Processed 500 iterations.
......
AlertingPE9 (rank 6): Processed 500 iterations.
Average temperature of last 5 readings: 26.24122373222
Average temperature of last 5 readings: 25.25861689112
AggregateDataPE10 (rank 7): Processed 500 iterations.
AggregateDataPE10 (rank 8): Processed 500 iterations.
Updates the PE description. Its embeddings are also updated automatically.
(laminar) help update_pe_description
Updates the description of a PE by Id
Usage: update_pe_description [pe_id] [new_description]
(laminar) update_pe_description 189 "PE manual description"
Updates the workflow description. Its embeddings are also updated automatically.
(laminar) help update_workflow_description
Updates the description of a workflow by Id
Usage: update_workflow_description [workflow_id] [new_description]
(laminar) update_workflow_description 192 "Workflow manual description"
This section covers how to use the client functions provided by client.py directly in Python scripts or Jupyter notebooks. These functions provide more granular control over workflows and PEs. Here is the current list of client functions:
- register: Creates a new user account
- login: Authenticates an existing user
- register_PE: Adds a new Processing Element (PE) to the registry
- register_Workflow: Adds a new workflow to the registry
- get_PE: Fetches a PE from the registry by its name or ID
- get_Workflow: Fetches a workflow from the registry by its name or ID
- get_PEs_By_Workflow: Retrieves all PEs from the registry linked to a specific workflow
- get_Registry: Fetches all items stored in the registry
- describe: Provides details about a PE or workflow stored in the registry
- search_Registry_Literal: Conducts an exact match search
- search_Registry_Semantic: Conducts a semantic search
- code_Recommendation: Generates a code recommendation
- run: Runs a workflow sequentially
- run_multiprocess: Runs a workflow in parallel using multiprocessing
- run_dynamic: Runs a workflow dynamically using Redis
- update_PE_Description: Modifies a PE's description
- update_Workflow_Description: Modifies a workflow's description
- remove_PE: Deletes an existing PE
- remove_Workflow: Deletes an existing workflow
- remove_All: Deletes all PEs and workflows
from dispel4py.base import ProducerPE, IterativePE, ConsumerPE
from dispel4py.workflow_graph import WorkflowGraph
from client import d4pClient
....
producer = NumberProducer()
isprime = IsPrime()
printprime = PrintPrime()
graph = WorkflowGraph()
graph.connect(producer, 'output', isprime, 'input')
graph.connect(isprime, 'output', printprime, 'input')
client = d4pClient()
#Create User and Login
print("\n Create User and Login \n")
client.register("root","root")
client.login("root","root")
print("\n Register a PE \n")
#Without the user providing a manual description of the PE, Laminar will generate one automatically at registration time
client.register_pe(isprime,"isprime")
#Providing a manual description of the PE
client.register_pe(isprime,"isprime", description="A pe that detects if a number is prime")
...
client = d4pClient()
client.login("root","root")
print("\n Register Graph \n")
#Without the user providing a manual description of the Workflow, Laminar will generate one automatically at registration time
client.register_Workflow(graph,"graph_sample")
#Providing a manual description of the workflow
#client.register_Workflow(graph, "graph_sample", description="This is workflow that detects prime numbers")
from client import d4pClient
client = d4pClient()
client.login("root","root")
pe_details = client.get_PE('isprime')
print(pe_details)
from client import d4pClient
client = d4pClient()
client.login("root","root")
workflow_details = client.get_Workflow('graph_sample')
print(workflow_details)
from client import d4pClient
client = d4pClient()
client.login("root","root")
producer = client.get_PE("NumberProducer")[0]
isprime = client.get_PE("IsPrime")[0]
printprime = client.get_PE("PrintPrime")[0]
new_graph = WorkflowGraph()
new_graph.connect(producer, 'output', isprime, 'input')
new_graph.connect(isprime, 'output', printprime, 'input')
print("\n Running the Workflow Sequentially\n")
client.run(new_graph,input=100)
from client import d4pClient
client = d4pClient()
client.login("root","root")
graph_sample = client.get_Workflow('graph_sample')[0]
from client import d4pClient
client = d4pClient()
client.login("root","root")
pes = client.get_PEs_By_Workflow('graph_sample')
for pe in pes:
    print(pe)
from client import d4pClient
client = d4pClient()
client.login("root","root")
registry = client.get_Registry()
for elem in registry:
    print(elem)
from client import d4pClient
client = d4pClient()
client.login("root","root")
data = client.get_PE("IsPrime")
isprime = data[0]
sc = data[1]
client.describe(isprime, sc, include_source_code=True)
The first parameter is the string used for a literal search. The second parameter defines the search scope: use "pe" to search only within Processing Elements, "workflow" to search exclusively within Workflows, or "both" to search across both. In this example, we are using the "both" option.
client = d4pClient()
client.login("root","root")
results=client.search_Registry_Literal("prime","both")
for r in results:
    print(r)
The first parameter is the string used for a semantic search. The second parameter allows you to specify the search scope: use "pe" to search only within Processing Elements (PEs) or "workflow" to search exclusively within Workflows. In this example, we are using "pe" to search within PEs.
client = d4pClient()
client.login("root","root")
results = client.search_Registry_Semantic("anomaly detection", "pe")
for result in results:
    print(result)
The first parameter is the code snippet used for a code recommendation. The second parameter allows you to specify the search scope: use "pe" to search only within Processing Elements (PEs) or "workflow" to search exclusively within Workflows. In this example, we are using "pe" to search within PEs.
results=client.code_Recommendation("random.randint(1, 1000)","pe")
for r in results:
    print(r)
...
client = d4pClient()
client.login("root","root")
....
# running the workflow with simple mapping (sequential)
run_response = client.run('graph_sample', input=5)
print(run_response)
# running the workflow with multiprocess mapping
run_response = client.run_multiprocess('graph_sample', input=5)
print(run_response)
# running the workflow with dynamic mapping (Redis)
run_response = client.run_dynamic('graph_sample', input=5)
print(run_response)
It needs the ID of the PE:
from client import d4pClient
client = d4pClient()
client.login("root","root")
client.update_PE_Description(218, "new PE description")
It needs the ID of the Workflow:
from client import d4pClient
client = d4pClient()
client.login("root","root")
client.update_Workflow_Description(217, "new description")
It needs the ID of the PE:
from client import d4pClient
client = d4pClient()
client.login("root","root")
r=client.remove_PE(218)
It needs the ID of the Workflow:
from client import d4pClient
client = d4pClient()
client.login("root","root")
r=client.remove_Workflow(217)
from client import d4pClient
client = d4pClient()
client.login("root","root")
r=client.remove_All()
from dispel4py.base import ProducerPE, IterativePE, ConsumerPE
from dispel4py.workflow_graph import WorkflowGraph
import random
from client import d4pClient
class NumberProducer(ProducerPE):
    def __init__(self):
        ProducerPE.__init__(self)

    def _process(self, inputs):
        # this PE produces one output
        result = random.randint(1, 1000)
        return result

class IsPrime(IterativePE):
    def __init__(self):
        IterativePE.__init__(self)

    def _process(self, num):
        # this PE consumes one input and produces one output
        print("before checking data - %s - is prime or not\n" % num, end="")
        if num > 1 and all(num % i != 0 for i in range(2, num)):
            return num

class PrintPrime(ConsumerPE):
    def __init__(self):
        ConsumerPE.__init__(self)

    def _process(self, num):
        # this PE consumes one input
        print("the num %s is prime\n" % num, end="")
producer = NumberProducer()
isprime = IsPrime()
printprime = PrintPrime()
graph = WorkflowGraph()
graph.connect(producer, 'output', isprime, 'input')
graph.connect(isprime, 'output', printprime, 'input')
client = d4pClient()
#Create User and Login
print("\n Create User and Login \n")
client.register("root","root")
client.login("root","root")
print("\n Register Graph with automatic description generated \n")
client.register_Workflow(graph,"graph_sample")
#print("\n Register Graph given a description manually\n")
#client.register_Workflow(graph,"graph_sample", description="a workflow that produce random number and print only the prime ones.")
print("\n Literal to Search on PEs \n")
results=client.search_Registry_Literal("prime","pe")
for r in results:
    print(r)
print("\n Text to Code Search on PEs\n")
results=client.search_Registry_Semantic("checks prime numbers","pe")
for r in results:
    print(r)
print("\n Code to Text Search (Code Recommendation) on PEs \n")
results=client.code_Recommendation("random.randint(1, 1000)","pe")
for r in results:
    print(r)
#SIMPLE (Sequential)
print("\n Running the Workflow Sequentially\n")
r=client.run(graph,input=100)
print(r)
#MULTI
print("\n Running the Workflow in Parallel - Multi mapping\n")
client.run_multiprocess(graph,input=100)
#DYNAMIC (Redis)
print("\n Running the Workflow Dynamically - Redis mapping\n")
client.run_dynamic(graph,input=100)