-
Notifications
You must be signed in to change notification settings - Fork 2.7k
/
key_rotation_async.py
111 lines (96 loc) · 5.68 KB
/
key_rotation_async.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
# ------------------------------------
# Copyright (c) Microsoft Corporation.
# Licensed under the MIT License.
# ------------------------------------
import asyncio
import os
from azure.identity.aio import DefaultAzureCredential
from azure.keyvault.keys import KeyRotationLifetimeAction, KeyRotationPolicy, KeyRotationPolicyAction
from azure.keyvault.keys.aio import KeyClient
# ----------------------------------------------------------------------------------------------------------
# Prerequisites:
# 1. An Azure Key Vault (https://docs.microsoft.com/azure/key-vault/quick-create-cli)
#
# 2. azure-keyvault-keys and azure-identity libraries (pip install these)
#
# 3. Set environment variable VAULT_URL with the URL of your key vault
#
# 4. Set up your environment to use azure-identity's DefaultAzureCredential. For more information about how to configure
# the DefaultAzureCredential, refer to https://aka.ms/azsdk/python/identity/docs#azure.identity.DefaultAzureCredential
#
# 5. Key rotation permissions for your service principal in your vault
#
# ----------------------------------------------------------------------------------------------------------
# Sample - creates and updates a key's automated rotation policy, and rotates a key on-demand
#
# 1. Create a new key rotation policy (update_key_rotation_policy)
#
# 2. Get a key's current rotation policy (get_key_rotation_policy)
#
# 3. Update a key's rotation policy (update_key_rotation_policy)
#
# 4. Rotate a key on-demand (rotate_key)
#
# 5. Delete a key (delete_key)
# ----------------------------------------------------------------------------------------------------------
def _last_action_of_type(policy, action_type):
    """Return the last lifetime action in ``policy`` whose ``action`` equals ``action_type``.

    Returns ``None`` when the policy has no lifetime action of that type.
    """
    match = None
    for lifetime_action in policy.lifetime_actions:
        if lifetime_action.action == action_type:
            match = lifetime_action
    return match


async def run_sample():
    """Create a key, set, read and update its rotation policy, rotate it on demand, then delete it.

    The vault URL is read from the ``VAULT_URL`` environment variable; a missing
    variable raises ``KeyError``. The credential and the client are used as async
    context managers so they are closed even if a service call or an assertion fails.
    """
    # Instantiate a key client that will be used to call the service.
    # Here we use the DefaultAzureCredential, but any azure-identity credential can be used.
    vault_url = os.environ["VAULT_URL"]
    credential = DefaultAzureCredential()
    client = KeyClient(vault_url=vault_url, credential=credential)

    # Entering both objects here guarantees they are closed on every exit path,
    # not only when the whole sample succeeds.
    async with credential, client:
        # First, create a key
        key_name = "rotation-sample-key-async"
        key = await client.create_rsa_key(key_name)
        print(f"\nCreated a key; new version is {key.properties.version}")

        # Set the key's automated rotation policy to rotate the key two months after the key was created.
        # If you pass an empty KeyRotationPolicy() as the `policy` parameter, the rotation policy will be set to the
        # default policy. Any keyword arguments will update specified properties of the policy.
        actions = [KeyRotationLifetimeAction(KeyRotationPolicyAction.rotate, time_after_create="P2M")]
        updated_policy = await client.update_key_rotation_policy(
            key_name, KeyRotationPolicy(), expires_in="P90D", lifetime_actions=actions
        )
        assert updated_policy.expires_in == "P90D"

        # The updated policy should have the specified lifetime action
        policy_action = _last_action_of_type(updated_policy, KeyRotationPolicyAction.rotate)
        assert policy_action, "The specified action should exist in the key rotation policy"
        assert policy_action.time_after_create == "P2M", "The action should have the specified time_after_create"
        assert policy_action.time_before_expiry is None, "The action shouldn't have a time_before_expiry"
        print(f"\nCreated a new key rotation policy: {policy_action.action} after {policy_action.time_after_create}")

        # Get the key's current rotation policy
        current_policy = await client.get_key_rotation_policy(key_name)
        policy_action = _last_action_of_type(current_policy, KeyRotationPolicyAction.rotate)
        assert policy_action
        print(f"\nCurrent rotation policy: {policy_action.action} after {policy_action.time_after_create}")

        # Update the key's automated rotation policy to notify 10 days before the key expires
        new_actions = [KeyRotationLifetimeAction(KeyRotationPolicyAction.notify, time_before_expiry="P10D")]
        # To preserve an existing rotation policy, pass in the existing policy as the `policy` parameter.
        # Any property specified as a keyword argument will be overridden completely by the provided value.
        # In this case, the rotate action we created earlier will be removed from the policy.
        new_policy = await client.update_key_rotation_policy(key_name, current_policy, lifetime_actions=new_actions)
        assert new_policy.expires_in == "P90D", "The key's expiry time should have been preserved"

        # The updated policy should include the new notify action
        notify_action = _last_action_of_type(new_policy, KeyRotationPolicyAction.notify)
        assert notify_action, "The specified action should exist in the key rotation policy"
        assert notify_action.time_after_create is None, "The action shouldn't have a time_after_create"
        assert notify_action.time_before_expiry == "P10D", "The action should have the specified time_before_expiry"
        print(f"\nNew policy action: {notify_action.action} {notify_action.time_before_expiry} before expiry")

        # Finally, you can rotate a key on-demand by creating a new version of the key
        rotated_key = await client.rotate_key(key_name)
        print(f"\nRotated the key on-demand; new version is {rotated_key.properties.version}")

        # To clean up, delete the key
        await client.delete_key(key_name)
        print("\nDeleted the key")
if __name__ == "__main__":
    # Drive the sample only when this file is executed as a script, not on import.
    sample = run_sample()
    asyncio.run(sample)