Skip to content

Ec2 Utils

AwsEngine

AwsEngine uses boto3 for ec2 connections

Source code in app/utils/aws_ec2_utils.py
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
class AwsEngine:
    """Thin wrapper holding boto3 EC2 client and resource handles.

    Credentials are read from the ``AWS_ACCESS_KEY_ID`` /
    ``AWS_SECRET_ACCESS_KEY`` environment variables; the region comes from
    ``config["region"]`` (default ``"us-east-1"``).
    """
    def __init__(self, config):
        """
        Args:
            config (dict): Connection settings; only the optional
                ``region`` key is read.
        """
        # Build the shared kwargs once so the client and the resource
        # handle cannot drift apart (previously duplicated verbatim).
        session_kwargs = dict(
            aws_access_key_id=os.getenv('AWS_ACCESS_KEY_ID'),
            aws_secret_access_key=os.getenv('AWS_SECRET_ACCESS_KEY'),
            region_name=config.get("region", "us-east-1"),
        )
        self.ec2 = boto3.client('ec2', **session_kwargs)
        self.ec2_resource = boto3.resource('ec2', **session_kwargs)

check_ec2_server_running_status(aws_engine, InstanceId)

Wait until an ec2 instance is in 'running' state Args: aws_engine (AwsEngine): aws ec2 connection object. InstanceId (str): Instance ID.

Source code in app/utils/aws_ec2_utils.py
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
def check_ec2_server_running_status(
    aws_engine,
    InstanceId
):
    """Poll an EC2 instance until it reports the 'running' state.

    Polls every 10 seconds with a 5-minute cap. The last
    ``describe_instances`` response is returned whether or not the
    instance reached 'running' before the cap.

    Args:
        aws_engine (AwsEngine): aws ec2 connection object.
        InstanceId (str): Instance ID.

    Returns:
        dict: Raw ``describe_instances`` response for the instance.
    """
    started_at = time.time()
    max_wait_seconds = 60 * 5
    instance_state = None
    while True:
        instance_state = get_instance(aws_engine, InstanceId)
        elapsed = time.time() - started_at
        state_name = (
            instance_state['Reservations'][0]['Instances'][0]['State']['Name']
        )
        if state_name == "running":
            print("EC2 Instance Running.")
            break
        if elapsed > max_wait_seconds:
            print("TimeOutError : EC2 Instance didnt start!")
            break
        print(f"Current wait time {elapsed}.  Max wait time {max_wait_seconds}")
        time.sleep(10)

    return instance_state

check_vllm_server_running_status(ssh, tmux_session_name='vllm_server', wait=True, verbose=True, docs_url=None)

Check if the VLLM inference endpoint has started.

Parameters:

Name Type Description Default
ssh SSHClient
required
tmux_session_name str

Defaults to "vllm_server".

'vllm_server'
wait bool

Wait until the VLLM server is in running state. Defaults to True.

True
verbose bool
True
docs_url str

This URL can be used to check if the endpoint is callable.

None
Source code in app/utils/aws_ec2_utils.py
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
def check_vllm_server_running_status(
    ssh: paramiko.SSHClient,
    tmux_session_name: str = "vllm_server",
    wait=True,
    verbose=True,
    docs_url=None
):
    """Check whether the VLLM inference endpoint has started.

    Args:
        ssh (paramiko.SSHClient): Open SSH session to the instance.
        tmux_session_name (str, optional): Tmux session running the
            inference endpoint. Defaults to "vllm_server".
        wait (bool, optional): Block until the server is running or the
            timeout expires. Defaults to True.
        verbose (bool, optional): Forwarded to ``is_running_vllm_server``.
        docs_url (str, optional): If provided, the endpoint is probed with
            an HTTP GET on this URL; otherwise the corresponding tmux pane
            is inspected. Defaults to None.

    Returns:
        tuple | str: ``(is_running, server_status)`` when ``wait`` is
            False; otherwise the final ``server_status`` string.
    """
    start_time = time.time()
    # Poll every 10s for up to 5 minutes (matches the EC2 status waiter).
    # The previous 5-second cap was shorter than a single sleep interval,
    # so wait=True could never actually re-poll before timing out.
    time_out = 60 * 5
    while True:

        current_wait_time = (time.time() - start_time)
        if docs_url:
            try:
                res = requests.get(docs_url)
                if res.status_code == 200:
                    is_running, server_status = True, "running"
                else:
                    # Endpoint reachable but not ready yet; previously this
                    # branch left both variables unassigned -> NameError.
                    is_running, server_status = False, f"http {res.status_code}"
            except requests.exceptions.RequestException:
                # Narrowed from a bare `except:` so Ctrl-C still works.
                is_running, server_status = False, "connection error"
        else:
            is_running, server_status = is_running_vllm_server(ssh, tmux_session_name, verbose)

        if not wait:
            return is_running, server_status
        elif server_status == "running":
            print("VLLM SERVER RUNNING.")
            return server_status
        elif current_wait_time > time_out:
            print("TimeOutError : vllm server didnt started, Manual intervention needed!")
            return server_status
        else:
            print(f"Current wait time {current_wait_time}.  Max wait time {time_out}")
            time.sleep(10)

create_ec2_instance(aws_engine, expert_instance_name, expert_instance_type='g5.2xlarge', KeyName='connection-key', ami_id='ami-your-ec2-ami-with-vllm-installed')

Created an ec2 instance in the default region.

Parameters:

Name Type Description Default
aws_engine AwsEngine

aws ec2 connection object.

required
expert_instance_name str

This will be the instance name

required
expert_instance_type str

Defaults to 'g5.2xlarge'.

'g5.2xlarge'
KeyName str

The pem file key pair to be used. Defaults to 'connection-key'.

'connection-key'
ami_id str

The ami should have vllm installed. Defaults to "ami-your-ec2-ami-with-vllm-installed". It has all the required repos and packages installed; the conda environment to be used is pytorch.

'ami-your-ec2-ami-with-vllm-installed'
Source code in app/utils/aws_ec2_utils.py
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
def create_ec2_instance(
        aws_engine,
        expert_instance_name,
        expert_instance_type='g5.2xlarge',
        KeyName = 'connection-key',
        ami_id = "ami-your-ec2-ami-with-vllm-installed"
    ):
    """
    Create an ec2 instance in the default region.

    Args:
        aws_engine (AwsEngine): aws ec2 connection object.
        expert_instance_name (str): This will be the instance name (Name tag).
        expert_instance_type (str, optional): Defaults to 'g5.2xlarge'.
        KeyName (str, optional): The pem file key pair to be used. Defaults to 'connection-key'.
        ami_id (str, optional): The ami should have vllm installed.
                                Defaults to "ami-your-ec2-ami-with-vllm-installed".
                                It has all the required repos and packages installed;
                                the conda environment to be used is pytorch.

    Returns:
        list: The created ec2.Instance objects (boto3 resource API).

    Raises:
        ValueError: If ``ami_id`` is empty.
    """
    if not ami_id:
        # A bare `raise` here would itself fail with
        # "RuntimeError: No active exception to re-raise".
        raise ValueError("ami_id is required to create an EC2 instance")

    instance = aws_engine.ec2_resource.create_instances(
        ImageId=ami_id,
        InstanceType=expert_instance_type,

        MinCount=1,
        MaxCount=1,

        KeyName=KeyName,

        SecurityGroupIds=[
            # This security group has port 8000 open and allows ssh connections
            os.getenv('SECURITY_GROUP_ID'),
        ],

        TagSpecifications=[
            {   'ResourceType': 'instance',
                'Tags': [
                    {
                        'Key': 'Name',
                        'Value': expert_instance_name
                    },
                ]
            },
        ],
    )
    return instance

create_or_revive_expert_instance(aws_engine, instance_name, model_id, expert_instance_type='g5.2xlarge', KeyName='connection-key', ami_id='ami-your-ec2-ami-with-vllm-installed', tmux_session_name='vllm_server', wait_for_expert_to_start=False, key_path='app/connection-key.pem')

Check if the ec2 instance is already present. If already present and inactive, turn it on; else create a new instance. Connect to the instance and start the vllm server.

Parameters:

Name Type Description Default
aws_engine AwsEngine

aws ec2 connection object.

required
instance_name str

The tag assigned to instance.

required
model_id str

description

required
expert_instance_type str

description. Defaults to 'g5.2xlarge'.

'g5.2xlarge'
KeyName str

description. Defaults to 'connection-key'.

'connection-key'
ami_id str

description. Defaults to "ami-your-ec2-ami-with-vllm-installed".

'ami-your-ec2-ami-with-vllm-installed'
tmux_session_name str

Name assigned to tmux session running inference endpoints. Defaults to "vllm_server".

'vllm_server'
wait_for_expert_to_start bool

wait until the expert server is started successfully. It might be recommended to turn this off when threading is not used and experts are started in series.
This would eliminate the wait time, and hence the expert checking can be handled outside this function.

False
key_path str)

Path to the pem file required to connect to ec2 instance.

'app/connection-key.pem'
Source code in app/utils/aws_ec2_utils.py
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
def create_or_revive_expert_instance(
    aws_engine,
    instance_name: str,
    model_id: str,
    expert_instance_type: str='g5.2xlarge',
    KeyName: str = 'connection-key',
    ami_id: str ="ami-your-ec2-ami-with-vllm-installed",
    tmux_session_name: str = "vllm_server",
    wait_for_expert_to_start = False,
    key_path = "app/connection-key.pem"
):
    """
    Ensure an expert EC2 instance exists and is reachable.

    If the instance is already present and stopped, it is started; if it
    is already running it is reused; otherwise a new instance is created.
    Finally an SSH session is opened to confirm reachability.

    Args:
        aws_engine (AwsEngine): aws ec2 connection object.
        instance_name (str): The Name tag assigned to the instance.
        model_id (str): HuggingFace model id the expert will serve.
        expert_instance_type (str, optional): EC2 instance type. Defaults to 'g5.2xlarge'.
        KeyName (str, optional): Key-pair name. Defaults to 'connection-key'.
        ami_id (str, optional): AMI with vllm pre-installed. Defaults to
            "ami-your-ec2-ami-with-vllm-installed".
        tmux_session_name (str, optional): Name assigned to the tmux session running
            inference endpoints. Defaults to "vllm_server".
        wait_for_expert_to_start (bool): wait until the expert server is started
            successfully. It might be recommended to turn this off when threading is
            not used and experts are started in series, eliminating the wait time so
            expert checking can be handled outside this function.
        key_path (str): Path to the pem file required to connect to the ec2 instance.

    Returns:
        dict: ``{"ip_address": ..., "instance_name": ...}`` on success;
            empty dict while the instance is still initializing.
    """
    instance_meta = is_ec2_instance_present(aws_engine, instance_name )
    if instance_meta['is_present'] and \
        instance_meta['meta']['State']['Name']=='stopped':
        print("Instance is present and is in stopped state, reviving...")
        revive_ec2_instance(aws_engine,instance_meta['meta']['InstanceId'])
    elif instance_meta['is_present'] and \
        instance_meta['meta']['State']['Name'] == 'running':
        print("Instance is present and is in running state...")
    elif not instance_meta['is_present']:
        print("Creating new instance...")
        # Create a new ec2 instance (the return value is not needed; we
        # re-query below so instance_meta carries the fresh metadata).
        create_ec2_instance(
            aws_engine,
            instance_name,
            expert_instance_type,
            KeyName,
            ami_id
        )
        instance_meta = is_ec2_instance_present(aws_engine, instance_name )
        assert instance_meta['is_present']
    else:
        # Present but neither stopped nor running (e.g. pending/stopping).
        print("Instance in Initialization mode, please try after sometime.")
        return dict()

    # Connect to the instance to confirm it is reachable over SSH.
    try:
        ip_address = instance_meta['meta']['PublicIpAddress']
        ssh = get_ssh_session(ip_address, key_path=key_path)
    except Exception as e:
        # Best-effort: a fresh/reviving instance may not accept SSH yet.
        print("Instance in Initialization mode, please try after sometime.")
        print(str(e))
        return dict()

    return dict(
        ip_address=ip_address,
        instance_name=instance_name
    )

get_available_instances(aws_engine)

Get the details for all the available instances using aws_engine.

Parameters:

Name Type Description Default
aws_engine AwsEngine

aws ec2 connection object.

required
Source code in app/utils/aws_ec2_utils.py
34
35
36
37
38
39
40
41
42
43
44
45
46
47
def get_available_instances(aws_engine):
    """Return metadata for every instance visible through *aws_engine*.

    Args:
        aws_engine (AwsEngine): aws ec2 connection object.

    Returns:
        list[dict]: One metadata dict per instance, flattened across all
            reservations in the ``describe_instances`` response.
    """
    response = aws_engine.ec2.describe_instances()
    return [
        instance
        for reservation in response['Reservations']
        for instance in reservation['Instances']
    ]

get_instance(aws_engine, InstanceId)

Get Metadata for an Instance

Parameters:

Name Type Description Default
aws_engine AwsEngine

aws ec2 connection object.

required
InstanceId str

Instance ID.

required

Returns:

Name Type Description
response json

Instance metadata

Source code in app/utils/aws_ec2_utils.py
50
51
52
53
54
55
56
57
58
59
60
61
62
63
def get_instance(aws_engine, InstanceId):
    """Fetch the describe_instances payload for a single instance.

    Args:
        aws_engine (AwsEngine): aws ec2 connection object.
        InstanceId (str): Instance ID.

    Returns:
        response (json): Instance metadata as returned by
            ``describe_instances``.
    """
    return aws_engine.ec2.describe_instances(InstanceIds=[InstanceId])

is_ec2_instance_present(aws_engine, instance_name)

check if an ec2 instance is present

Parameters:

Name Type Description Default
aws_engine AwsEngine

aws ec2 connection object.

required
instance_name str

The tag assigned to instance.

required

Returns: status (dict): Instance Running Status

Source code in app/utils/aws_ec2_utils.py
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
def is_ec2_instance_present(aws_engine, instance_name):
    """
    Check whether a non-terminated ec2 instance with the given Name tag exists.

    Args:
        aws_engine (AwsEngine): aws ec2 connection object.
        instance_name (str): The tag assigned to instance.
    Returns:
        status (dict): ``{"is_present": bool, "meta": dict | None}`` where
            ``meta`` is the matching instance's metadata when found.
    """
    for candidate in get_available_instances(aws_engine):
        # Terminated instances keep their tags around; ignore them.
        if candidate['State']['Name'] == 'terminated':
            continue
        for tag in candidate.get('Tags', []):
            if tag['Key'] == "Name" and tag['Value'] == instance_name:
                return dict(is_present=True, meta=candidate)
    return dict(is_present=False, meta=None)

revive_ec2_instance(aws_engine, InstanceId)

Revive an ec2 instance which is in "Stopped" state.

Parameters:

Name Type Description Default
aws_engine AwsEngine

aws ec2 connection object.

required
InstanceId str

Instance ID.

required
Source code in app/utils/aws_ec2_utils.py
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
def revive_ec2_instance(
        aws_engine,
        InstanceId
):
    """Revive an ec2 instance which is in "Stopped" state.

    Args:
        aws_engine (AwsEngine): aws ec2 connection object.
        InstanceId (str): Instance ID.

    Returns:
        dict: Raw ``start_instances`` response (previously computed but
            never returned; now returned for consistency with
            ``stop_ec2_instance``).
    """
    response = aws_engine.ec2.start_instances(
        InstanceIds=[
            InstanceId,
        ],
        DryRun=False
    )
    return response

run_vllm_server(aws_engine, instance_name, model_id, tmux_session_name='vllm_server', max_wait_time=120, wait_for_expert_to_start=False, key_path='app/keys/connection-key.pem')

Start a VLLM server for the provided model_id. Note model_id should be from HuggingFace.

Parameters:

Name Type Description Default
aws_engine AwsEngine

aws ec2 connection object.

required
instance_name str

The tag assigned to instance.

required
model_id str

Huggingface model_id to be served.

required
tmux_session_name str

Name assigned to tmux session running inference endpoints. Defaults to "vllm_server".

'vllm_server'
max_wait_time int

Wait time in seconds till the server starts.

120
wait_for_expert_to_start bool

Pause the Execution until the inference endpoint starts. Defaults to False.

False
key_path str)

Path to the pem file required to connect to ec2 instance.

'app/keys/connection-key.pem'
Source code in app/utils/aws_ec2_utils.py
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
def run_vllm_server(
    aws_engine,
    instance_name: str,
    model_id: str,
    tmux_session_name: str = "vllm_server",
    max_wait_time = 120,
    wait_for_expert_to_start=False,
    key_path = "app/keys/connection-key.pem"
):
    """
    Start a VLLM server for the provided model_id. Note model_id should be from HuggingFace.

    Args:
        aws_engine (AwsEngine): aws ec2 connection object.
        instance_name (str): The tag assigned to instance.
        model_id (str): Huggingface model_id to be served.
        tmux_session_name (str, optional): Name assigned to tmux session running
            inference endpoints. Defaults to "vllm_server".
        max_wait_time (int, optional): Wait time in seconds till the server starts.
        wait_for_expert_to_start (bool, optional): Pause execution until the
            inference endpoint starts. Defaults to False.
        key_path (str): Path to the pem file required to connect to ec2 instance.

    Returns:
        dict | None: Instance metadata plus ``ip_address``/``instance_name``,
            or None when SSH could not be established within ``max_wait_time``.
    """

    # Retry SSH until the instance is reachable or we hit max_wait_time —
    # a freshly started instance takes a while to accept connections.
    start_time = time.time()
    while True:
        try:
            instance_meta = is_ec2_instance_present(aws_engine, instance_name)
            ip_address = instance_meta['meta']['PublicIpAddress']
            ssh = get_ssh_session(ip_address, key_path=key_path)
            break
        except Exception:
            # Narrowed from a bare `except:` so KeyboardInterrupt/SystemExit
            # can still abort the loop; connection failures are expected
            # while the instance boots, so we simply retry below.
            pass
        if time.time() - start_time > max_wait_time:
            print("Unable to ssh to instance, instace might be initializing, please try after some time or increase the max_wait_time")
            return
        else:
            time.sleep(15)

    start_vllm_server(
            ssh = ssh,
            model_id = model_id,
            ip_address = ip_address,
            port =8000,
            conda_env_name = "pytorch",
            tmux_session_name = tmux_session_name
        )

    # Optionally block until the inference endpoint reports running.
    if wait_for_expert_to_start:
        check_vllm_server_running_status( ssh, tmux_session_name )

    response = dict(ip_address=ip_address,instance_name=instance_name)
    response.update(instance_meta['meta'])
    return response

stop_ec2_instance(aws_engine, InstanceId)

Stop an Ec2 Instance. NOTE: This would change the status of ec2 instance to "stopped" and not "Terminate".

Parameters:

Name Type Description Default
aws_engine AwsEngine

aws ec2 connection object.

required
InstanceId str

Instance ID.

required
Source code in app/utils/aws_ec2_utils.py
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
def stop_ec2_instance(
        aws_engine,
        InstanceId
):
    """Stop an Ec2 Instance.
    NOTE:  This would change the status of ec2 instance to "stopped" and not "Terminate".

    Args:
        aws_engine (AwsEngine): aws ec2 connection object.
        InstanceId (str): Instance ID.

    Returns:
        dict: Raw ``stop_instances`` response.
    """
    return aws_engine.ec2.stop_instances(
        InstanceIds=[InstanceId],
        DryRun=False
    )