Skip to content

Orchestrator

The Orchestrator functions as the central wrapper object, facilitating the seamless deployment of underlying models. Each individual model can be hosted on a dedicated server for efficient and scalable service. Furthermore, multiple models can be hosted on a shared server. Refer to the configurations in the configs/demo* directory for illustrative examples.

Source code in app/orchestrator.py
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
class Orchestrator:
    """Central wrapper object that deploys and queries the underlying models.

    Each individual model can be hosted on a dedicated server for efficient and
    scalable service, and multiple models can also be hosted on a shared server.
    Refer to the configurations in the `configs/demo*` directory for
    illustrative examples.
    """

    def __init__(self, config, verbose=True):
        """Create the server handles for the orchestrator and every expert.

        Args:
            config (json): The configuration contains all the necessary
                information required to initialize a server. It is read at
                ``config['orchestrator']`` and ``config['experts']`` (each
                expert entry must carry an ``expert_id``). Refer to the
                examples in the `app/config` directory for guidance.
                Note that ``config['logs']`` is (re)set to an empty list on
                the object that was passed in.
            verbose (bool, optional): Defaults to True.
        """
        self.config = config
        self.verbose = verbose
        self.orchestrator = get_server(config['orchestrator'], "orchestrator")
        self.experts = {
            expert_config['expert_id']: get_server(expert_config)
            for expert_config in self.config['experts']
        }
        self.config['logs'] = []

    def load_orchestrator_server(self):
        """Loads the Orchestrator Server.

        Activates the dedicated machine using specifications from the
        configuration for the orchestrator server.

        Raises:
            NotImplementedError: If the orchestrator server configuration sets
                a truthy ``s3_download_path``.
        """
        if self.orchestrator.config.get('s3_download_path'):
            raise NotImplementedError
        self.orchestrator.start_server()
        self.config['logs'].append('starting orchestrator')

    def load_experts_server(self):
        """Load Servers for Each Expert.

        Activates dedicated machines using specifications from the
        configuration for each of the Experts' servers, and records one log
        entry per expert in ``self.config['logs']``.
        """
        for expert_id, expert in self.experts.items():
            expert.start_server()
            self.config['logs'].append(f'starting expert {expert_id}')

    def start_inference_endpoints(self, max_wait_time=120):
        """Start Inference Endpoint for Orchestrator and All Experts.

        The inference endpoints for each model can be served using VLLM or AWS
        Sagemaker.
        Note: For closed-source endpoints like GPT-4,
        `start_inference_endpoint` is non-functional.

        Args:
            max_wait_time (int, optional): Forwarded unchanged to every
                server's ``start_inference_endpoint``. Defaults to 120.
        """
        self.orchestrator.start_inference_endpoint(max_wait_time)
        for expert in self.experts.values():
            expert.start_inference_endpoint(max_wait_time)

    def check_servers_state(self):
        """Perform a ping test on each server to confirm their operational status.

        The state reported by each server is printed, and the first element of
        that state is collected as the server's status flag.

        Returns:
            status (bool): True if no server reported ``False`` as its status
                flag, False otherwise.
        """
        status = []
        state = self.orchestrator.check_servers_state()
        print(f"Model id {self.orchestrator.instance_name} : {state}")
        status.append(state[0])
        for expert in self.experts.values():
            state = expert.check_servers_state()
            print(f"Model id {expert.instance_name} : {state}")
            status.append(state[0])
        return False not in status

    def get_response(self, query):
        """Get the Response for input query.

        The input query undergoes orchestration to determine the most suitable
        expert model. Subsequently, the query is processed through the chosen
        expert model, and the resulting response is returned.

        Args:
            query (str): Input Query.

        Returns:
            response (str): Response.

        Raises:
            KeyError: If the orchestrator selects an expert id that is not
                configured.
        """
        expert_id = self.get_orchestrator_response(query)
        return self.get_expert_response(query, expert_id)

    def get_orchestrator_response(self, query):
        """Ask the orchestrator model which expert should handle the query.

        Args:
            query (str): Input Query.

        Returns:
            The expert id returned by the orchestrator server's
            ``get_response``.
        """
        expert_id = self.orchestrator.get_response(query)
        print(f"Selecting Expert Id {expert_id} for current query")
        return expert_id

    def get_expert_response(self, query, expert_id):
        """Send the query to the expert registered under ``expert_id``.

        Args:
            query (str): Input Query.
            expert_id: Key of the expert in ``self.experts``.

        Returns:
            The value returned by the selected expert's ``get_response``.

        Raises:
            KeyError: If ``expert_id`` does not match any configured expert.
        """
        expert = self.experts.get(expert_id)
        if expert is None:
            # Fail with an actionable message instead of the opaque
            # "'NoneType' object has no attribute 'get_response'".
            known_ids = sorted(str(known) for known in self.experts)
            raise KeyError(
                f"Unknown expert id {expert_id!r}; configured experts: {known_ids}"
            )
        return expert.get_response(query)

__init__(config, verbose=True)

init

Parameters:

Name Type Description Default
config json

The configuration file contains all the necessary information required to initialize a server. Refer to the examples in the app/config directory for guidance.

required
verbose bool

Defaults to True.

True
Source code in app/orchestrator.py
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
def __init__(self, config, verbose=True):
    """Keep the configuration and build one server handle per model.

    Args:
        config (json): The configuration file contains all the necessary information required to initialize a server.
            It is read at ``config['orchestrator']`` and ``config['experts']``; ``config['logs']`` is reset to an empty list.
            Refer to the examples in the `app/config` directory for guidance.
        verbose (bool, optional): Defaults to True.
    """
    self.config = config
    self.verbose = verbose
    self.orchestrator = get_server(config['orchestrator'], "orchestrator")
    # Map every expert id to the server handle created from its config entry.
    experts_by_id = {}
    for entry in self.config['experts']:
        key = entry['expert_id']
        experts_by_id[key] = get_server(entry)
    self.experts = experts_by_id
    self.config['logs'] = []

check_servers_state()

Perform a ping test on each server to confirm their operational status.

Returns:

Name Type Description
status bool

True if the inference endpoints are running, False otherwise.

Source code in app/orchestrator.py
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
def check_servers_state(self):
    """Ping the orchestrator and every expert server and report overall health.

    Each server's reported state is printed; the first element of that state
    is used as the server's status flag.

    Returns:
        status (bool): True if no server reported ``False``, False otherwise.
    """
    flags = []

    # The orchestrator is always checked (and printed) first.
    orchestrator_state = self.orchestrator.check_servers_state()
    print(f"Model id {self.orchestrator.instance_name} : {orchestrator_state}")
    flags.append(orchestrator_state[0])

    for server in self.experts.values():
        expert_state = server.check_servers_state()
        print(f"Model id {server.instance_name} : {expert_state}")
        flags.append(expert_state[0])

    all_running = False not in flags
    return all_running

get_response(query)

Get the Response for input query. The input query undergoes orchestration to determine the most suitable expert model. Subsequently, the query is processed through the chosen expert model, and the resulting response is returned.

Parameters:

Name Type Description Default
query str

Input Query.

required

Returns:

Name Type Description
response str

Response.

Source code in app/orchestrator.py
66
67
68
69
70
71
72
73
74
75
76
77
78
def get_response(self, query):
    """Route the query to an expert model and return that expert's answer.

    The orchestrator first picks the most suitable expert for the query; the
    query is then processed by the chosen expert and its response is returned.

    Args:
        query (str): Input Query.

    Returns:
        response (str): Response.
    """
    chosen_expert = self.get_orchestrator_response(query)
    return self.get_expert_response(query, chosen_expert)

load_experts_server()

Load servers for each expert. Activates dedicated machines using specifications from the configuration for each of the experts' servers.

Source code in app/orchestrator.py
29
30
31
32
33
34
35
36
def load_experts_server(self):
    """
    Load Servers for Each Expert.
    Starts the server of every configured expert and appends one entry per
    expert to ``self.config['logs']``.
    """
    for expert_id, server in self.experts.items():
        server.start_server()
        log_entry = f'starting expert {expert_id}'
        self.config['logs'].append(log_entry)

load_orchestrator_server()

Loads the Orchestrator Server. Activates the dedicated machine using specifications from the configuration for the orchestrator server.

Source code in app/orchestrator.py
21
22
23
24
25
26
27
def load_orchestrator_server(self):
    """Start the orchestrator's server and log that it was started.

    Raises:
        NotImplementedError: If the orchestrator server configuration has a
            truthy ``s3_download_path``.
    """
    s3_path = self.orchestrator.config.get('s3_download_path')
    if s3_path:
        raise NotImplementedError
    self.orchestrator.start_server()
    self.config['logs'].append('starting orchestrator')

start_inference_endpoints(max_wait_time=120)

Start Inference Endpoint for Orchestrator and All Experts. The inference endpoints for each model can be served using VLLM or AWS Sagemaker. Note: For closed-source endpoints like GPT-4, start_inference_endpoint is non-functional.

Parameters:

Name Type Description Default
max_wait_time int

Defaults to 120.

120
Source code in app/orchestrator.py
38
39
40
41
42
43
44
45
46
47
48
def start_inference_endpoints(self, max_wait_time=120):
    """Bring up the inference endpoint of the orchestrator, then of every expert.

    The inference endpoints for each model can be served using VLLM or AWS Sagemaker.
    Note: For closed-source endpoints like GPT-4, `start_inference_endpoint` is non-functional.

    Args:
        max_wait_time (int, optional): Passed through to each server. Defaults to 120.
    """
    # Orchestrator first, experts afterwards in their configured order.
    self.orchestrator.start_inference_endpoint(max_wait_time)
    expert_servers = self.experts.values()
    for server in expert_servers:
        server.start_inference_endpoint(max_wait_time)