그저 일상들
[Azure] Console 계정 관리 자동화 본문
API 계정 관리 (Console)
- 개요 : DS Hybrid Cloud 환경의 Console 계정 관리 자동화 시스템 적용 전 Ezcom Azure 환경에서 추가 테스트를 진행하여 수정 및 보완할 점을 파악하여 DS 환경에 즉시 적용할 수 있도록 함
Test Process
VS Code 설정
확장 설치
- Azure Tools
- Azure Account
- Python 관련 확장
New Project 생성
- Command + Shift + p → Azure Functions: Create New Project 생성 → 코드 입력
import azure.functions as func
import logging
import requests
import json
import os
from datetime import datetime, timedelta, timezone
from azure.identity import ClientSecretCredential
from azure.core.exceptions import AzureError
from azure.communication.email import EmailClient
from azure.storage.blob import BlobServiceClient
# Function app object that the @app.route handler below is registered on;
# the default HTTP auth level for its routes is set to FUNCTION.
app = func.FunctionApp(http_auth_level=func.AuthLevel.FUNCTION)
# Days since the last sign-in at which a "pending disable" reminder goes out
# ("start" .. "last") and the day on which the account is disabled.
notification_disable_days = dict(start=75, second=85, third=87, last=89, disabled=90)

# Day of the "pending delete" reminder and the day the account is deleted.
notification_delete_days = dict(delete=104, deleted=105)

# Reminder-only days, i.e. the tables above without the action day itself.
disable_thresholds = [
    notification_disable_days[stage] for stage in ("start", "second", "third", "last")
]
delete_thresholds = [notification_delete_days["delete"]]

# Notification kinds; each value is the lower-cased form of its key.
message_type = {
    kind: kind.lower()
    for kind in ("PENDING_DISABLE", "DISABLED", "PENDING_DELETE", "DELETED")
}
@app.route(route="console_test")
def console_test(req: func.HttpRequest) -> func.HttpResponse:
    """HTTP entry point: evaluate console sign-in inactivity and notify users.

    Fetches a Graph token, lists directory users, derives each user's last
    Azure Portal sign-in from the exported sign-in logs, applies the
    disable/delete policy and e-mails every affected user.

    Returns a 200 response on success and a 500 response when the token
    cannot be obtained or any step raises.
    """
    try:
        token = get_token()
        # Stop before any Graph call when there is no token. The token itself
        # is a bearer credential and is deliberately never logged.
        if not token:
            return func.HttpResponse(
                "This HTTP triggered function executed successfully. Could not retrieve token.",
                status_code=500,
            )

        users = get_eid_users(token)
        users_sign_in_log = get_eid_users_last_login(users)
        # For a dry run, replace the line above with:
        #   users_sign_in_log = load_dummy_data("dummy_data.json")
        need_notify_users = get_notification_users(token, users_sign_in_log)

        if need_notify_users:
            email_client = EmailClient.from_connection_string(os.getenv("ACS_CONNECTION_STRING"))
            for user in need_notify_users:
                logging.info(f"Notify User: {user['displayName']}")
                send_email(user, email_client)

        return func.HttpResponse(
            "Hello, Users. This HTTP triggered function executed successfully."
        )
    except Exception as e:
        # Top-level boundary: record the traceback and hide details from the caller.
        logging.exception(f"An error occurred: {e}")
        return func.HttpResponse("An internal server error occurred.", status_code=500)
def get_token() -> "str | None":
    """Acquire a Microsoft Graph access token with the client-credentials flow.

    Reads TENANT_ID, CLIENT_ID and CLIENT_SECRET from the environment.

    Returns:
        The access token string, or None when a variable is missing or the
        credential request fails (the reason is logged).
    """
    tenant_id = os.getenv("TENANT_ID")
    client_id = os.getenv("CLIENT_ID")
    client_secret = os.getenv("CLIENT_SECRET")
    logging.info(f"Tenant ID: {tenant_id}")

    # Validate configuration up front instead of raising and catching locally.
    if not (tenant_id and client_id and client_secret):
        logging.error(
            "Error obtaining token: Tenant ID, Client ID, and Client Secret "
            "must be set in environment variables."
        )
        return None

    try:
        credential = ClientSecretCredential(
            tenant_id=tenant_id, client_id=client_id, client_secret=client_secret
        )
        return credential.get_token("https://graph.microsoft.com/.default").token
    except (ValueError, AzureError) as e:
        logging.error(f"Error obtaining token: {e}")
        return None
def get_eid_users(token):
    """List directory users from Microsoft Graph, following pagination.

    Args:
        token: Bearer access token for Microsoft Graph.

    Returns:
        A list of dicts with "userId", "displayName" and "mail" (taken from
        userPrincipalName) for every user that has both a display name and a
        user principal name.

    Raises:
        requests.exceptions.RequestException: on HTTP errors or timeouts.
    """
    graph_api_url = "https://graph.microsoft.com/v1.0/users"
    graph_api_headers = {
        "Authorization": "Bearer " + token,
        "Content-Type": "application/json",
    }

    users = []
    while graph_api_url:
        # A timeout keeps a stalled connection from hanging the function.
        response = requests.get(graph_api_url, headers=graph_api_headers, timeout=30)
        response.raise_for_status()
        page = response.json()
        users.extend(page.get("value", []))
        # Graph returns @odata.nextLink until the last page.
        graph_api_url = page.get("@odata.nextLink")

    # Only the count is logged; the records contain personal data.
    logging.info(f"Total users retrieved: {len(users)}")

    return [
        {
            "userId": user.get("id", "No User ID"),
            "displayName": user["displayName"],
            "mail": user["userPrincipalName"],
        }
        for user in users
        if user.get("displayName") and user.get("userPrincipalName")
    ]
def get_eid_users_last_login(filtered_users):
    """Find each user's most recent Azure Portal sign-in in the exported logs.

    Reads every blob in the container named by BLOB_CONTAINER_NAME_STRING
    (connection string in BLOB_CONNECTION_STRING), parses it as
    newline-delimited JSON and keeps the newest "Azure Portal" record per user.

    Args:
        filtered_users: dicts with "userId", "displayName" and "mail".

    Returns:
        A list of dicts with "userId", "displayName", "mail" and "lastSignIn"
        for users that appear in the logs. Users without a matching log entry
        are not included.

    Raises:
        ValueError: when either environment variable is missing.
    """
    blob_connection_string = os.getenv('BLOB_CONNECTION_STRING')
    container_name = os.getenv('BLOB_CONTAINER_NAME_STRING')
    if not blob_connection_string or not container_name:
        raise ValueError("Blob connection string and container name must be set in environment variables.")

    blob_service_client = BlobServiceClient.from_connection_string(blob_connection_string)
    container_client = blob_service_client.get_container_client(container_name)

    all_sign_in_logs = []
    for blob in container_client.list_blobs():
        logging.info(f"Blob Name: {blob.name}")
        # A failed download is left to propagate: silently dropping a blob
        # would make users look more inactive than they really are.
        blob_data = container_client.get_blob_client(blob).download_blob().readall()
        try:
            for line in blob_data.decode('utf-8').splitlines():
                if not line.strip():
                    continue  # blank separator line, not a malformed record
                try:
                    all_sign_in_logs.append(json.loads(line))
                except json.JSONDecodeError as e:
                    logging.error(f"Error decoding JSON from blob {blob.name}, line: {line[:100]}: {e}")
        except Exception as e:
            logging.error(f"Unexpected error processing blob {blob.name}: {e}")
    logging.info(f"Total Sign-In Logs Retrieved: {len(all_sign_in_logs)}")

    # Keep Azure Portal records that carry the timestamp used below.
    azure_portal_logs = [
        log for log in all_sign_in_logs
        if isinstance(log, dict)
        and isinstance(log.get("properties"), dict)
        and log["properties"].get("appDisplayName") == "Azure Portal"
        and log["properties"].get("createdDateTime")
    ]
    logging.info(f"Total Azure Portal Logs Retrieved: {len(azure_portal_logs)}")
    # Newest first, so the first record seen per user is the latest sign-in.
    azure_portal_logs.sort(key=lambda x: x["properties"]["createdDateTime"], reverse=True)

    # NOTE(review): users are matched by display name, which may not be
    # unique; matching on a user id would be safer if the logs carry one.
    users_by_name = {}
    for user in filtered_users:
        users_by_name.setdefault(user["displayName"], user)  # first match wins

    user_signed_list = []
    seen_names = set()
    for log in azure_portal_logs:
        if len(filtered_users) == len(user_signed_list):
            break
        properties = log["properties"]
        user_display_name = properties.get("userDisplayName")
        if user_display_name in seen_names:
            continue
        matched_user = users_by_name.get(user_display_name)
        if matched_user is None:
            continue
        seen_names.add(user_display_name)
        user_signed_list.append(
            {
                "userId": matched_user["userId"],
                "displayName": user_display_name,
                "mail": matched_user["mail"],
                "lastSignIn": properties["createdDateTime"],
            }
        )

    logging.info(f"Users with a recorded Azure Portal sign-in: {len(user_signed_list)}")
    return user_signed_list
def get_notification_users(token, signed_users):
    """Apply the inactivity policy and return the users that need an e-mail.

    For each user the whole days elapsed since "lastSignIn" are computed:
      * below the first reminder day           -> ignored
      * equal to the disable day               -> account disabled, DISABLED
      * equal to the delete day                -> account deleted, DELETED
      * equal to a pending-delete reminder day -> PENDING_DELETE
      * equal to a pending-disable reminder day -> PENDING_DISABLE
      * anything else                          -> ignored

    Matching is by exact day, so the function is expected to run once a day.

    Args:
        token: Bearer access token for Microsoft Graph.
        signed_users: dicts with "userId", "displayName", "mail" and an
            ISO-8601 "lastSignIn"; each selected dict is updated in place
            with "notificationType" and "daysFromLastLogin".

    Returns:
        The list of selected user dicts.
    """
    need_notify_users = []
    # Elapsed time between two aware datetimes does not depend on the display
    # time zone, so no KST offset is added here (it would inflate the
    # difference by nine hours).
    now_utc = datetime.now(timezone.utc)

    for user in signed_users:
        user_email = user["mail"]
        user_display_name = user["displayName"]
        last_signed_date = datetime.fromisoformat(
            user["lastSignIn"].replace("Z", "+00:00")
        )
        if last_signed_date.tzinfo is None:
            # Treat an offset-less timestamp as UTC rather than failing on
            # aware/naive subtraction.
            last_signed_date = last_signed_date.replace(tzinfo=timezone.utc)
        days_from_last_login = (now_utc - last_signed_date).days
        logging.info(
            f"Last sign-in: {user_email}, {user_display_name}, "
            f"{last_signed_date}, {days_from_last_login} days ago"
        )

        if days_from_last_login < notification_disable_days["start"]:
            logging.info(f"Not Filtered Days: {user_email}, {user_display_name}, {days_from_last_login}")
            continue

        if days_from_last_login == notification_disable_days["disabled"]:
            logging.info(f"Step 1: Disabled Days: {user_email}, {user_display_name}, {days_from_last_login}")
            disable_eid_account(token, user["userId"])
            notification_type = message_type["DISABLED"]
        elif days_from_last_login == notification_delete_days["deleted"]:
            logging.info(f"Step 2: Deleted Days: {user_email}, {user_display_name}, {days_from_last_login}")
            delete_eid_account(token, user["userId"])
            notification_type = message_type["DELETED"]
        elif days_from_last_login in delete_thresholds:
            notification_type = message_type["PENDING_DELETE"]
        elif days_from_last_login in disable_thresholds:
            notification_type = message_type["PENDING_DISABLE"]
        else:
            continue  # between reminder days: nothing to send today

        user["notificationType"] = notification_type
        user["daysFromLastLogin"] = days_from_last_login
        need_notify_users.append(user)

    logging.info(f"Notify Users:{need_notify_users}")
    return need_notify_users
def disable_eid_account(token, user_id):
    """Disable a directory account by setting accountEnabled to false.

    Args:
        token: Bearer access token for Microsoft Graph.
        user_id: Id of the user to disable.

    Returns:
        The decoded JSON body when the response has one, otherwise None.
        None is also returned (after logging) on an HTTP error status.
    """
    graph_api_url = f"https://graph.microsoft.com/v1.0/users/{user_id}"
    headers = {"Authorization": "Bearer " + token, "Content-Type": "application/json"}
    body = {"accountEnabled": False}

    # A timeout keeps a stalled connection from hanging the function.
    response = requests.patch(graph_api_url, headers=headers, json=body, timeout=30)
    try:
        response.raise_for_status()
    except requests.exceptions.HTTPError as error:
        logging.error(
            f"Failed to disable account for user ID {user_id}. Error: {error} - Response: {response.text}"
        )
        return None

    logging.info(f"Successfully disabled account for user ID {user_id}.")
    return response.json() if response.content else None
def delete_eid_account(token, user_id):
    """Delete a directory account through Microsoft Graph.

    Args:
        token: Bearer access token for Microsoft Graph.
        user_id: Id of the user to delete.

    Returns:
        The decoded JSON body when the response has one, otherwise None.
        None is also returned (after logging) on an HTTP error status.
    """
    graph_api_url = f"https://graph.microsoft.com/v1.0/users/{user_id}"
    headers = {"Authorization": "Bearer " + token, "Content-Type": "application/json"}

    # A timeout keeps a stalled connection from hanging the function.
    response = requests.delete(graph_api_url, headers=headers, timeout=30)
    try:
        response.raise_for_status()
    except requests.exceptions.HTTPError as error:
        logging.error(f"Failed to delete account for user ID {user_id}. Error: {error} - Response: {response.text}")
        return None

    logging.info(f"Successfully deleted account for user ID {user_id}.")
    return response.json() if response.content else None
def load_dummy_data(filepath: str):
    """Read a JSON fixture from `filepath`.

    Any failure (missing file, invalid JSON, ...) is logged and an empty list
    is returned instead of raising.
    """
    try:
        with open(filepath, mode='r', encoding='utf-8') as handle:
            payload = json.loads(handle.read())
    except Exception as e:
        logging.error(f"Error loading dummy data from {filepath}: {e}")
        return []
    logging.info(f"Dummy data loaded successfully from {filepath}")
    return payload
def send_email(user, email_client):
    """Send the policy notification for one user, CC'ing the MSP mailbox.

    Args:
        user: dict with "displayName", "userId", "mail", "notificationType"
            and "daysFromLastLogin".
        email_client: client object exposing begin_send(message).

    The sender comes from EMAIL_COMMUNICATION_SERVICE_DOMAIN and the CC
    address from EMAIL_MSP. Send failures are logged, not raised.
    """
    subject = f"Notification for {user['displayName']}"
    content = get_notification_message(user["notificationType"], user["displayName"], user["userId"], user["daysFromLastLogin"])
    sender = os.getenv("EMAIL_COMMUNICATION_SERVICE_DOMAIN")
    recipient_email = user["mail"]
    msp_email = os.getenv("EMAIL_MSP")

    recipients = {
        "to": [{"address": recipient_email, "displayName": user["displayName"]}],
    }
    if msp_email:
        # The CC entry is the operator mailbox, so it gets its own label
        # rather than the end user's display name.
        recipients["cc"] = [{"address": msp_email, "displayName": "MSP Administrator"}]
    else:
        # An entry without an address would be invalid; send without CC.
        logging.warning("EMAIL_MSP is not set; sending notification without CC.")

    messages = {
        "content": {"subject": subject, "plainText": content["value"]},
        "recipients": recipients,
        "senderAddress": sender,
    }
    try:
        email_client.begin_send(messages)
        logging.info(f"Email sent to {recipient_email} successfully.")
    except Exception as e:
        logging.error(f"Failed to send email to {recipient_email}: {e}")
def get_notification_message(noti_type, display_name, user_id, days_last_login):
    """Build the plain-text notification body for one user.

    Returns a dict whose single "value" key holds the message text. Any
    noti_type other than pending-disable, disabled or pending-delete gets the
    "deleted" wording.
    """
    logging.info(f"NOTI_TYPE:{noti_type}")

    pending_disable = message_type["PENDING_DISABLE"]
    if noti_type == pending_disable:
        # Days left until the disable day.
        remain_days = notification_disable_days["disabled"] - days_last_login
        text = f"""안녕하세요, {display_name} 앱 사용자님.
하기 고객님 Azure EID 계정이 90일 미사용 정책에 따라 {remain_days}일 뒤에 비활성화 될 예정입니다.
- {user_id}
또한, 비활성화 된 계정은 15일 뒤 삭제될 예정이니 업무에 참고 부탁드리겠습니다.
관련하여 계정 업데이트가 필요하시다면 편하신 방법으로 회신 주시면 감사하겠습니다.
감사합니다.
"""
    elif noti_type == message_type["DISABLED"]:
        text = f"""안녕하세요, {display_name} 앱 사용자님.
하기 고객님 Azure EID 계정이 90일 미사용 정책에 따라 금일 비활성화 되었습니다.
- {user_id}
비활성화된 계정은 15일 뒤 삭제될 예정이니 업무에 참고 부탁드리겠습니다.
계정 활성화가 필요하시다면 편하신 방법으로 회신 주시면 감사하겠습니다.
감사합니다.
"""
    elif noti_type == message_type["PENDING_DELETE"]:
        # Days left until the delete day.
        remain_days = notification_delete_days["deleted"] - days_last_login
        text = f"""안녕하세요, {display_name} 앱 사용자님.
하기 고객님 Azure EID 계정이 90일 미사용 정책에 따라 {remain_days}일 뒤에 삭제될 예정입니다.
- {user_id}
관련하여 계정 업데이트가 필요하시다면 편하신 방법으로 회신 주시면 감사하겠습니다.
감사합니다.
"""
    else:
        text = f"""안녕하세요, {display_name} 앱 사용자님.
하기 고객님 Azure EID 계정이 90일 미사용 정책에 따라 삭제되었습니다.
- {user_id}
신규 계정 발급이 필요하시다면 결제 통해 요청 주시면 감사하겠습니다.
감사합니다.
"""

    message = {"value": text}
    logging.info(f"MESSAGES:{message}")
    return message
{
"IsEncrypted": false,
"Values": {
"AzureWebJobsStorage": "",
"FUNCTIONS_WORKER_RUNTIME": "python",
"ACS_CONNECTION_STRING": "endpoint=https://iam-poc-acs.korea.communication.azure.com/;accesskey=Cbexm2iTmEPryyyOyvuVQL3cofH2os7fV5gPWlXdNGcVKkeSlAqpJQQJ99AKACULyCpreWJhAAAAAZCSclp6",
"AZURE_SUBS_ID": "dab2f3c5-0d79-4cf2-bda3-adf24cb2f663",
"EMAIL_COMMUNICATION_SERVICE_DOMAIN": "DoNotReply@52b542a7-f379-4277-a006-2ac322d72b64.azurecomm.net",
"EMAIL_MSP": "jhkwon@ezcom.co.kr",
"MANAGED_IDENTITY_CLIENT_ID": "76c4a1db-6764-45b3-b1d0-ed0a0a91e28a",
"BLOB_CONNECTION_STRING": "DefaultEndpointsProtocol=https;AccountName=iampocsa;AccountKey=pctMBuJZFhYZur0Z83aHlvp6kupORaf3tBaxD2qKlRTsSuGbi1LsKM0/v+QGOx7U+eUDmRr2XCCg+AStiPxWIQ==;EndpointSuffix=core.windows.net", # Blob Storage 액세스키 연결 문자열
"BLOB_CONTAINER_NAME_STRING": "insights-logs-signinlogs", # Blob Storage Container Name
"TENANT_ID": "aedcfab6-4e2d-4686-94fb-6013f3616adc", # Tenant Id
"CLIENT_ID": "fef9a93e-ae74-4e55-a660-f1a7f22a076e", # Application(client) Id
"CLIENT_SECRET": "~Nf8Q~ZLlmsArC_D-5gt3ew_WmSMscNdlbghcdeQ" # Client secret value
}
}
# DO NOT include azure-functions-worker in this file
# The Python Worker is managed by Azure Functions platform
# Manually managing azure-functions-worker may cause unexpected issues
azure-functions
azure-identity
azure-communication-email
azure-storage-blob
requests
Azure Function App 생성
- Command + Shift + p → Azure Functions: Create Function App In Azure
Azure Function 으로 Deploy
- Visual Studio Code → Azure → Local Project Deploy → 생성했던 Function App 선택 → Deploy
Azure Console Service 생성하기
Managed ID (관리 ID) 생성
- Console → 관리 ID → 만들기 → 자동 생성된 리소스 그룹 선택 → Region : Korea Central → 검토+만들기
E-mail Communication Service 생성
- Console → E-mail Communication Service → 만들기 → 자동 생성된 리소스 그룹 선택 → 이름/Data Location 선택 → 검토+만들기
- Console → E-mail Communication Service → 설정 → 도메인 프로비전 → Azure 도메인 → 도메인 배포
Communication Service에 도메인 연결
- Console → Communication Service → 전자 메일 → 도메인 → 도메인 연결
- 자동 생성된 리소스 그룹 선택 / 이전에 생성했던 전자 메일 서비스 선택 / 도메인 확인
Function APP (함수 앱) ID 사용자 할당 항목 추가
- Console → Function APP (함수 앱) → 설정 → ID → 사용자 할당 항목 → 이전 생성한 'MID' 추가
애플리케이션 관리자에 할당 추가
- Console → Microsoft Entra ID → 관리 → 역할 및 관리자 → 애플리케이션 관리자 → 할당 추가 → 이전 생성한 ‘MID’ 추가
SignInLogs 스토리지 컨테이너에 보관
- Console → Microsoft Entra ID → 모니터링 → 진단 설정 → 진단 설정 추가
API 사용권한 추가
- Console → Microsoft Entra ID → 관리 → 앱 등록 → 새 등록 → API 사용 권한 추가 → 권한 추가 → Microsoft Graph → 애플리케이션 사용 권한 → 권한 추가 후 관리자 동의 허용
- API 사용 권한 List (애플리케이션 유형만 추가)
- Application.ReadWrite.All - 애플리케이션
- AuditLog.Read.All - 애플리케이션
- Directory.ReadWrite.All - 애플리케이션
- User.Read - 위임됨
- User.ReadWrite.All - 애플리케이션
- API 사용 권한 List (애플리케이션 유형만 추가)
Function APP (함수 앱) 환경 변수 설정
‘클라이언트 ID’ 환경 변수로 추가
- Console → 관리 ID → '클라이언트 ID' 복사 → 함수 앱 → 설정 → 환경 변수 → 추가 → 이름/값 입력 후 적용 ‘MANAGED_IDENTITY_CLIENT_ID’
‘Domain Name’ 환경 변수로 추가
- 도메인 배포 후 → ‘Domain Name’ 복사 → Console → 함수 앱 → 설정 → 환경 변수 → 추가 → 이름/값 입력 후 적용 'EMAIL_COMMUNICATION_SERVICE_DOMAIN'
‘연결 문자열’ 환경 변수로 추가
- Console → Communication Services → 설정 → 키 → 기본 키 → 연결 문자열 복사 → 함수 앱 → 설정 → 환경 변수 → 추가 → 이름/값 입력 후 적용 ‘ACS_CONNECTION_STRING’
‘구독 ID’ 환경 변수로 추가
- Console → 구독 → 개요 → ‘구독 ID’ 복사 → 함수 앱 → 설정 → 환경 변수 → 추가 → 이름/값 입력 후 적용 ‘AZURE_SUBS_ID’
‘E-mail’ 환경 변수로 추가
- Console → 함수 앱 → 설정 → 환경 변수 → 추가 → 이름 ‘EMAIL_MSP’, 값 [알림 받을 메일] 입력 후 적용
‘Storage Account’ 액세스키 환경 변수로 추가
- Console → Storage Account → 보안 + 네트워킹 → 액세스 키 → 두 개의 키 중 하나의 연결 문자열을 복사 → 'BLOB_CONNECTION_STRING' 환경 변수로 추가
‘BLOB_CONTAINER_NAME_STRING’ 환경 변수로 추가
- Console → Storage Account → 데이터 스토리지 → 컨테이너 → [Sign-in-logs] 가 적재되는 컨테이너 이름 확인 → ‘BLOB_CONTAINER_NAME_STRING’의 환경 변수로 추가
‘TENANT_ID’ 환경 변수로 추가
- Console → Entra ID → 개요 → Tenant ID 복사 → ‘TENANT_ID’ 환경 변수로 추가 (코드에서 대문자 이름으로 읽으므로 대소문자를 맞출 것)
‘CLIENT_ID’ 환경 변수로 추가
- Console → Entra ID → 관리 → 앱 등록 → API 권한 부여한 APP 선택 → 개요 → 애플리케이션(클라이언트) ID 복사 → ‘CLIENT_ID’ 환경 변수로 추가
‘CLIENT_SECRET’ 환경 변수로 추가
- Console → Entra ID → 관리 → 앱 등록 → API 권한 부여한 APP 선택 → 관리 → 인증서 및 암호 → 클라이언트 비밀의 값 복사 (클라이언트 비밀 생성 시에만 일시적으로 표시되니 미리 저장) → ‘CLIENT_SECRET’ 환경 변수로 추가
계정 상태 확인
계정 비활성화 확인
- Console → Microsoft Entra ID → 관리 → 사용자 → 비활성화된 계정
- 비활성화 통보 메일
계정 삭제 확인
- Console → Microsoft Entra ID → 관리 → 사용자 → 사용자 목록 확인
- isin 계정이 105일 경과되어 제거된 것을 확인할 수 있었음
- 삭제 통보 메일
Local Test
Code
import azure.functions as func
import logging
import requests
import json
import os
from datetime import datetime, timedelta, timezone
from azure.identity import ClientSecretCredential
from azure.core.exceptions import AzureError
from azure.communication.email import EmailClient
from azure.storage.blob import BlobServiceClient
# Function app object that the @app.route handler below is registered on;
# the default HTTP auth level for its routes is set to FUNCTION.
app = func.FunctionApp(http_auth_level=func.AuthLevel.FUNCTION)
# Days since the last sign-in at which a "pending disable" reminder goes out
# ("start" .. "last") and the day on which the account is disabled.
notification_disable_days = dict(start=75, second=85, third=87, last=89, disabled=90)

# Day of the "pending delete" reminder and the day the account is deleted.
notification_delete_days = dict(delete=104, deleted=105)

# Reminder-only days, i.e. the tables above without the action day itself.
disable_thresholds = [
    notification_disable_days[stage] for stage in ("start", "second", "third", "last")
]
delete_thresholds = [notification_delete_days["delete"]]

# Notification kinds; each value is the lower-cased form of its key.
message_type = {
    kind: kind.lower()
    for kind in ("PENDING_DISABLE", "DISABLED", "PENDING_DELETE", "DELETED")
}
@app.route(route="console_test")
def console_test(req: func.HttpRequest) -> func.HttpResponse:
    """HTTP entry point for the local test run, driven by dummy sign-in data.

    Loads the fixture, back-dates each entry with adjust_end_dates, applies
    the disable/delete policy, sends one summary mail to the MSP mailbox and
    one mail per affected user.

    Returns a 200 response on success and a 500 response when the token
    cannot be obtained or any step raises.
    """
    try:
        token = get_token()
        # Stop before any Graph call when there is no token. The token itself
        # is a bearer credential and is deliberately never logged.
        if not token:
            return func.HttpResponse(
                "This HTTP triggered function executed successfully. Could not retrieve token.",
                status_code=500,
            )

        # Real data path, disabled for the local test:
        #   users = get_eid_users(token)
        #   users_sign_in_log = get_eid_users_last_login(users)
        # The fixture location can be overridden with DUMMY_DATA_PATH; the
        # default keeps the original hard-coded path.
        dummy_path = os.getenv("DUMMY_DATA_PATH", "/Users/jaehun/azure-test/dummy_data.json")
        dummy_data = load_dummy_data(dummy_path)
        users_sign_in_log = adjust_end_dates(dummy_data)
        need_notify_users = get_notification_users(token, users_sign_in_log)

        if need_notify_users:
            email_client = EmailClient.from_connection_string(os.getenv("ACS_CONNECTION_STRING"))
            send_email_to_msp(need_notify_users, email_client)
            for user in need_notify_users:
                logging.info(f"Notify User: {user['displayName']}")
                send_email_to_user(user, email_client)

        return func.HttpResponse(
            "Hello, Users. This HTTP triggered function executed successfully."
        )
    except Exception as e:
        # Top-level boundary: record the traceback and hide details from the caller.
        logging.exception(f"An error occurred: {e}")
        return func.HttpResponse("An internal server error occurred.", status_code=500)
def get_token() -> "str | None":
    """Acquire a Microsoft Graph access token with the client-credentials flow.

    Reads TENANT_ID, CLIENT_ID and CLIENT_SECRET from the environment.

    Returns:
        The access token string, or None when a variable is missing or the
        credential request fails (the reason is logged).
    """
    tenant_id = os.getenv("TENANT_ID")
    client_id = os.getenv("CLIENT_ID")
    client_secret = os.getenv("CLIENT_SECRET")
    logging.info(f"Tenant ID: {tenant_id}")

    # Validate configuration up front instead of raising and catching locally.
    if not (tenant_id and client_id and client_secret):
        logging.error(
            "Error obtaining token: Tenant ID, Client ID, and Client Secret "
            "must be set in environment variables."
        )
        return None

    try:
        credential = ClientSecretCredential(
            tenant_id=tenant_id, client_id=client_id, client_secret=client_secret
        )
        return credential.get_token("https://graph.microsoft.com/.default").token
    except (ValueError, AzureError) as e:
        logging.error(f"Error obtaining token: {e}")
        return None
def get_eid_users(token):
    """List directory users from Microsoft Graph, following pagination.

    Args:
        token: Bearer access token for Microsoft Graph.

    Returns:
        A list of dicts with "userId", "displayName" and "mail" (taken from
        userPrincipalName) for every user that has both a display name and a
        user principal name.

    Raises:
        requests.exceptions.RequestException: on HTTP errors or timeouts.
    """
    graph_api_url = "https://graph.microsoft.com/v1.0/users"
    graph_api_headers = {
        "Authorization": "Bearer " + token,
        "Content-Type": "application/json",
    }

    users = []
    while graph_api_url:
        # A timeout keeps a stalled connection from hanging the function.
        response = requests.get(graph_api_url, headers=graph_api_headers, timeout=30)
        response.raise_for_status()
        page = response.json()
        users.extend(page.get("value", []))
        # Graph returns @odata.nextLink until the last page.
        graph_api_url = page.get("@odata.nextLink")

    # Only the count is logged; the records contain personal data.
    logging.info(f"Total users retrieved: {len(users)}")

    return [
        {
            "userId": user.get("id", "No User ID"),
            "displayName": user["displayName"],
            "mail": user["userPrincipalName"],
        }
        for user in users
        if user.get("displayName") and user.get("userPrincipalName")
    ]
def get_eid_users_last_login(filtered_users):
    """Find each user's most recent Azure Portal sign-in in the exported logs.

    Reads every blob in the container named by BLOB_CONTAINER_NAME_STRING
    (connection string in BLOB_CONNECTION_STRING), parses it as
    newline-delimited JSON and keeps the newest "Azure Portal" record per user.

    Args:
        filtered_users: dicts with "userId", "displayName" and "mail".

    Returns:
        A list of dicts with "userId", "displayName", "mail" and "lastSignIn"
        for users that appear in the logs. Users without a matching log entry
        are not included.

    Raises:
        ValueError: when either environment variable is missing.
    """
    blob_connection_string = os.getenv('BLOB_CONNECTION_STRING')
    container_name = os.getenv('BLOB_CONTAINER_NAME_STRING')
    if not blob_connection_string or not container_name:
        raise ValueError("Blob connection string and container name must be set in environment variables.")

    blob_service_client = BlobServiceClient.from_connection_string(blob_connection_string)
    container_client = blob_service_client.get_container_client(container_name)

    all_sign_in_logs = []
    for blob in container_client.list_blobs():
        logging.info(f"Blob Name: {blob.name}")
        # A failed download is left to propagate: silently dropping a blob
        # would make users look more inactive than they really are.
        blob_data = container_client.get_blob_client(blob).download_blob().readall()
        try:
            for line in blob_data.decode('utf-8').splitlines():
                if not line.strip():
                    continue  # blank separator line, not a malformed record
                try:
                    all_sign_in_logs.append(json.loads(line))
                except json.JSONDecodeError as e:
                    logging.error(f"Error decoding JSON from blob {blob.name}, line: {line[:100]}: {e}")
        except Exception as e:
            logging.error(f"Unexpected error processing blob {blob.name}: {e}")
    logging.info(f"Total Sign-In Logs Retrieved: {len(all_sign_in_logs)}")

    # Records without a "properties" object would otherwise raise here; keep
    # only Azure Portal records that carry the timestamp used below.
    azure_portal_logs = [
        log for log in all_sign_in_logs
        if isinstance(log, dict)
        and isinstance(log.get("properties"), dict)
        and log["properties"].get("appDisplayName") == "Azure Portal"
        and log["properties"].get("createdDateTime")
    ]
    logging.info(f"Total Azure Portal Logs Retrieved: {len(azure_portal_logs)}")
    # Newest first, so the first record seen per user is the latest sign-in.
    azure_portal_logs.sort(key=lambda x: x["properties"]["createdDateTime"], reverse=True)

    # NOTE(review): users are matched by display name, which may not be
    # unique; matching on a user id would be safer if the logs carry one.
    users_by_name = {}
    for user in filtered_users:
        users_by_name.setdefault(user["displayName"], user)  # first match wins

    user_signed_list = []
    seen_names = set()
    for log in azure_portal_logs:
        if len(filtered_users) == len(user_signed_list):
            break
        properties = log["properties"]
        user_display_name = properties.get("userDisplayName")
        if user_display_name in seen_names:
            continue
        matched_user = users_by_name.get(user_display_name)
        if matched_user is None:
            continue
        seen_names.add(user_display_name)
        user_signed_list.append(
            {
                "userId": matched_user["userId"],
                "displayName": user_display_name,
                "mail": matched_user["mail"],
                "lastSignIn": properties["createdDateTime"],
            }
        )

    logging.info(f"Users with a recorded Azure Portal sign-in: {len(user_signed_list)}")
    return user_signed_list
def get_notification_users(token, signed_users):
    """Apply the inactivity policy and return the users that need an e-mail.

    For each user the whole days elapsed since "lastSignIn" are computed:
      * below the first reminder day           -> ignored
      * equal to the disable day               -> account disabled, DISABLED
      * equal to the delete day                -> account deleted, DELETED
      * equal to a pending-delete reminder day -> PENDING_DELETE
      * equal to a pending-disable reminder day -> PENDING_DISABLE
      * anything else                          -> ignored

    Matching is by exact day, so the function is expected to run once a day.

    Args:
        token: Bearer access token for Microsoft Graph.
        signed_users: dicts with "userId", "displayName", "mail" and an
            ISO-8601 "lastSignIn"; each selected dict is updated in place
            with "notificationType" and "daysFromLastLogin".

    Returns:
        The list of selected user dicts.
    """
    need_notify_users = []
    # Elapsed time between two aware datetimes does not depend on the display
    # time zone, so no KST offset is added here (it would inflate the
    # difference by nine hours).
    now_utc = datetime.now(timezone.utc)

    for user in signed_users:
        user_email = user["mail"]
        user_display_name = user["displayName"]
        last_signed_date = datetime.fromisoformat(
            user["lastSignIn"].replace("Z", "+00:00")
        )
        if last_signed_date.tzinfo is None:
            # Treat an offset-less timestamp as UTC rather than failing on
            # aware/naive subtraction.
            last_signed_date = last_signed_date.replace(tzinfo=timezone.utc)
        days_from_last_login = (now_utc - last_signed_date).days
        logging.info(
            f"Last sign-in: {user_email}, {user_display_name}, "
            f"{last_signed_date}, {days_from_last_login} days ago"
        )

        if days_from_last_login < notification_disable_days["start"]:
            logging.info(f"Not Filtered Days: {user_email}, {user_display_name}, {days_from_last_login}")
            continue

        if days_from_last_login == notification_disable_days["disabled"]:
            logging.info(f"Step 1: Disabled Days: {user_email}, {user_display_name}, {days_from_last_login}")
            disable_eid_account(token, user["userId"])
            notification_type = message_type["DISABLED"]
        elif days_from_last_login == notification_delete_days["deleted"]:
            logging.info(f"Step 2: Deleted Days: {user_email}, {user_display_name}, {days_from_last_login}")
            delete_eid_account(token, user["userId"])
            notification_type = message_type["DELETED"]
        elif days_from_last_login in delete_thresholds:
            notification_type = message_type["PENDING_DELETE"]
        elif days_from_last_login in disable_thresholds:
            notification_type = message_type["PENDING_DISABLE"]
        else:
            continue  # between reminder days: nothing to send today

        user["notificationType"] = notification_type
        user["daysFromLastLogin"] = days_from_last_login
        need_notify_users.append(user)

    logging.info(f"Notify Users:{need_notify_users}")
    return need_notify_users
def disable_eid_account(token, user_id):
    """Disable a directory account by setting accountEnabled to false.

    Args:
        token: Bearer access token for Microsoft Graph.
        user_id: Id of the user to disable.

    Returns:
        The decoded JSON body when the response has one, otherwise None.
        None is also returned (after logging) on an HTTP error status.
    """
    graph_api_url = f"https://graph.microsoft.com/v1.0/users/{user_id}"
    headers = {"Authorization": "Bearer " + token, "Content-Type": "application/json"}
    body = {"accountEnabled": False}

    # A timeout keeps a stalled connection from hanging the function.
    response = requests.patch(graph_api_url, headers=headers, json=body, timeout=30)
    try:
        response.raise_for_status()
    except requests.exceptions.HTTPError as error:
        logging.error(
            f"Failed to disable account for user ID {user_id}. Error: {error} - Response: {response.text}"
        )
        return None

    logging.info(f"Successfully disabled account for user ID {user_id}.")
    return response.json() if response.content else None
def delete_eid_account(token, user_id):
    """Delete a directory account through Microsoft Graph.

    Args:
        token: Bearer access token for Microsoft Graph.
        user_id: Id of the user to delete.

    Returns:
        The decoded JSON body when the response has one, otherwise None.
        None is also returned (after logging) on an HTTP error status.
    """
    graph_api_url = f"https://graph.microsoft.com/v1.0/users/{user_id}"
    headers = {"Authorization": "Bearer " + token, "Content-Type": "application/json"}

    # A timeout keeps a stalled connection from hanging the function.
    response = requests.delete(graph_api_url, headers=headers, timeout=30)
    try:
        response.raise_for_status()
    except requests.exceptions.HTTPError as error:
        # Include the response body: it carries the Graph error detail.
        logging.error(f"Failed to delete account for user ID {user_id}. Error: {error} - Response: {response.text}")
        return None

    logging.info(f"Successfully deleted account for user ID {user_id}.")
    return response.json() if response.content else None
def load_dummy_data(filepath: str):
    """Read a JSON fixture from `filepath`.

    Any failure (missing file, invalid JSON, ...) is logged and an empty list
    is returned instead of raising.
    """
    try:
        with open(filepath, mode='r', encoding='utf-8') as handle:
            payload = json.loads(handle.read())
    except Exception as e:
        logging.error(f"Error loading dummy data from {filepath}: {e}")
        return []
    logging.info(f"Dummy data loaded successfully from {filepath}")
    return payload
def send_email_to_user(user, email_client):
    """Send the policy notification for one user, CC'ing the MSP mailbox.

    Args:
        user: dict with "displayName", "userId", "mail", "notificationType"
            and "daysFromLastLogin".
        email_client: client object exposing begin_send(message).

    The sender comes from EMAIL_COMMUNICATION_SERVICE_DOMAIN and the CC
    address from EMAIL_MSP. Send failures are logged, not raised.
    """
    subject = f"Notification for {user['displayName']}"
    content = get_notification_message(user["notificationType"], user["displayName"], user["userId"], user["daysFromLastLogin"])
    sender = os.getenv("EMAIL_COMMUNICATION_SERVICE_DOMAIN")
    recipient_email = user["mail"]
    msp_email = os.getenv("EMAIL_MSP")

    recipients = {
        "to": [{"address": recipient_email, "displayName": user["displayName"]}],
    }
    if msp_email:
        # Same label as send_email_to_msp uses for the operator mailbox,
        # rather than the end user's display name.
        recipients["cc"] = [{"address": msp_email, "displayName": "MSP Administrator"}]
    else:
        # An entry without an address would be invalid; send without CC.
        logging.warning("EMAIL_MSP is not set; sending notification without CC.")

    messages = {
        "content": {"subject": subject, "plainText": content["value"]},
        "recipients": recipients,
        "senderAddress": sender,
    }
    try:
        email_client.begin_send(messages)
        logging.info(f"Email sent to {recipient_email} successfully.")
    except Exception as e:
        logging.error(f"Failed to send email to {recipient_email}: {e}")
def send_email_to_msp(users, email_client):
    """Send one summary mail about all affected users to the MSP mailbox.

    The recipient is read from EMAIL_MSP and the sender from
    EMAIL_COMMUNICATION_SERVICE_DOMAIN. Send failures are logged, not raised.
    """
    summary_text = get_notification_message_to_msp(users)
    sender_address = os.getenv("EMAIL_COMMUNICATION_SERVICE_DOMAIN")
    msp_address = os.getenv("EMAIL_MSP")

    to_entry = {"address": msp_address, "displayName": "MSP Administrator"}
    payload = {
        "content": {"subject": "Azure EID 계정 상태 알림", "plainText": summary_text},
        "recipients": {"to": [to_entry]},
        "senderAddress": sender_address,
    }

    try:
        email_client.begin_send(payload)
    except Exception as e:
        logging.error(f"Failed to send email to MSP: {e}")
    else:
        logging.info("Email sent to MSP successfully.")
def get_notification_message(noti_type, display_name, user_id, days_last_login):
    """Build the plain-text notification body for one user.

    Returns a dict whose single "value" key holds the message text. Any
    noti_type other than pending-disable, disabled or pending-delete gets the
    "deleted" wording.
    """
    logging.info(f"NOTI_TYPE:{noti_type}")

    pending_disable = message_type["PENDING_DISABLE"]
    if noti_type == pending_disable:
        # Days left until the disable day.
        remain_days = notification_disable_days["disabled"] - days_last_login
        text = f"""안녕하세요, {display_name} 앱 사용자님.
하기 고객님 Azure EID 계정이 90일 미사용 정책에 따라 {remain_days}일 뒤에 비활성화 될 예정입니다.
- {user_id}
또한, 비활성화 된 계정은 15일 뒤 삭제될 예정이니 업무에 참고 부탁드리겠습니다.
관련하여 계정 업데이트가 필요하시다면 편하신 방법으로 회신 주시면 감사하겠습니다.
감사합니다.
"""
    elif noti_type == message_type["DISABLED"]:
        text = f"""안녕하세요, {display_name} 앱 사용자님.
하기 고객님 Azure EID 계정이 90일 미사용 정책에 따라 금일 비활성화 되었습니다.
- {user_id}
비활성화된 계정은 15일 뒤 삭제될 예정이니 업무에 참고 부탁드리겠습니다.
계정 활성화가 필요하시다면 편하신 방법으로 회신 주시면 감사하겠습니다.
감사합니다.
"""
    elif noti_type == message_type["PENDING_DELETE"]:
        # Days left until the delete day.
        remain_days = notification_delete_days["deleted"] - days_last_login
        text = f"""안녕하세요, {display_name} 앱 사용자님.
하기 고객님 Azure EID 계정이 90일 미사용 정책에 따라 {remain_days}일 뒤에 삭제될 예정입니다.
- {user_id}
관련하여 계정 업데이트가 필요하시다면 편하신 방법으로 회신 주시면 감사하겠습니다.
감사합니다.
"""
    else:
        text = f"""안녕하세요, {display_name} 앱 사용자님.
하기 고객님 Azure EID 계정이 90일 미사용 정책에 따라 삭제되었습니다.
- {user_id}
신규 계정 발급이 필요하시다면 결제 통해 요청 주시면 감사하겠습니다.
감사합니다.
"""

    message = {"value": text}
    logging.info(f"MESSAGES:{message}")
    return message
def get_notification_message_to_msp(users):
    """Build the operator summary: a greeting, one status line per user, a footer.

    Each user dict supplies "displayName", "userId" and "notificationType";
    "daysFromLastLogin" defaults to 0 when absent. Any notification type
    other than pending-disable, disabled or pending-delete is reported as
    deleted.
    """
    parts = ["안녕하세요, Hybrid Cloud 운영자님.\n\n다음 사용자들의 Azure EID 계정 상태를 확인해 주세요:\n\n"]

    for entry in users:
        name = entry["displayName"]
        account_id = entry["userId"]
        kind = entry["notificationType"]
        elapsed = entry.get("daysFromLastLogin", 0)

        if kind == message_type["PENDING_DISABLE"]:
            left = notification_disable_days["disabled"] - elapsed
            status = f"비활성화 예정: {left}일 남음"
        elif kind == message_type["DISABLED"]:
            status = "비활성화됨"
        elif kind == message_type["PENDING_DELETE"]:
            left = notification_delete_days["deleted"] - elapsed
            status = f"삭제 예정: {left}일 남음"
        else:  # message_type["DELETED"]
            status = "삭제됨"

        parts.append(f"{name} - {account_id} ({status})\n")

    parts.append("\n관련하여 사용자들에게도 알람이 갔으니 업무에 참고 부탁드립니다.\n\n감사합니다.")
    return "".join(parts)
def adjust_end_dates(data):
    """Back-date "lastSignIn" of the first entries so each hits a policy day.

    The i-th entry is set to "now minus N days" (UTC, second precision, "Z"
    suffix) using a fixed list of day offsets. Entries beyond the list are
    left untouched and a warning is logged. `data` is modified in place and
    returned.
    """
    now_utc = datetime.now(timezone.utc)
    day_offsets = (75, 85, 87, 89, 90, 104, 105)

    for position, entry in enumerate(data):
        if position >= len(day_offsets):
            logging.warning(f"Not enough adjustment days provided for user {entry['displayName']}.")
            continue
        offset = day_offsets[position]
        backdated = now_utc - timedelta(days=offset)
        logging.info(f"Adjusting lastSignIn for user {entry['displayName']} by {offset} days to {backdated}")
        entry['lastSignIn'] = backdated.strftime("%Y-%m-%dT%H:%M:%SZ")

    return data
{
"IsEncrypted": false,
"Values": {
"AzureWebJobsStorage": "",
"FUNCTIONS_WORKER_RUNTIME": "python",
"ACS_CONNECTION_STRING": "endpoint=https://console-azure-acs.korea.communication.azure.com/;accesskey=BbiPVK2gcsLj8RMMkPQVbhM8Ce443cjoaTRb0MefQBQS1jmCtet5JQQJ99AJACULyCpW5m4HAAAAAZCSbyq3",
"AZURE_SUBS_ID": "5e22e437-d178-4e96-b21e-2a21eaac3e1f",
"EMAIL_COMMUNICATION_SERVICE_DOMAIN": "DoNotReply@3d2efe57-4dbc-4139-b349-b7e97bc811d3.azurecomm.net",
"EMAIL_MSP": "jhkwon@ezcom.co.kr",
"MANAGED_IDENTITY_CLIENT_ID": "5c27dcad-d807-4610-8212-4805596b6665",
"BLOB_CONNECTION_STRING": "DefaultEndpointsProtocol=https;AccountName=consoletest1023;AccountKey=8U+6S6OEHiMI8qIY7fqElyRrwfVaeQ96oYVGKBg6q7xwklVdd5/7ef8PlyV7ImRxln7dlm1n+l54+AStTtB3Rw==;EndpointSuffix=core.windows.net",
"BLOB_CONTAINER_NAME_STRING": "insights-logs-signinlogs",
"TENANT_ID": "1b071189-0ecb-4f7a-b453-9457c489fdde",
"CLIENT_ID": "337c374f-b542-4a38-b230-6fe51d65b743",
"CLIENT_SECRET": "Szk8Q~aQZtrb0FaWsS9oFsnQOowK2zvc86sRRb0M"
}
}
[
{
"userId": "1a2b3c4d-5e6f-7g8h-9i0j-k1l2m3n4o5p6",
"displayName": "User 1",
"mail": "user1@example.com",
"lastSignIn": "2024-09-20T01:38:27.629969+00:00"
},
{
"userId": "2b3c4d5e-6f7g8h-9i0j-k1l2m3n4o5p7",
"displayName": "User 2",
"mail": "user2@example.com",
"lastSignIn": "2024-09-05T01:38:27.629969+00:00"
},
{
"userId": "3c4d5e6f-7g8h9i-0jk1l-2m3n4o5p8",
"displayName": "User 3",
"mail": "user3@example.com",
"lastSignIn": "2024-08-25T01:38:27.629969+00:00"
},
{
"userId": "4d5e6f7g-8h9i0j-k1l2m3-n4o5p9",
"displayName": "User 4",
"mail": "user4@example.com",
"lastSignIn": "2024-08-23T01:38:27.629969+00:00"
},
{
"userId": "5e6f7g8h-9i0jk1-l2m3n4-o5p0a",
"displayName": "User 5",
"mail": "user5@example.com",
"lastSignIn": "2024-08-21T01:38:27.629969+00:00"
},
{
"userId": "6f7g8h9i-0jk1l2-m3n4o5-p0a1b",
"displayName": "User 6",
"mail": "user6@example.com",
"lastSignIn": "2024-08-20T01:38:27.629969+00:00"
},
{
"userId": "7g8h9i0j-k1l2m3-n4o5p0-a1b2c",
"displayName": "User 7",
"mail": "user7@example.com",
"lastSignIn": "2024-08-06T01:38:27.629969+00:00"
},
{
"userId": "8h9i0jk1-l2m3n4-o5p0a1-b2c3d",
"displayName": "User 8",
"mail": "user8@example.com",
"lastSignIn": "2024-08-05T01:38:27.629969+00:00"
}
]
# DO NOT include azure-functions-worker in this file
# The Python Worker is managed by Azure Functions platform
# Manually managing azure-functions-worker may cause unexpected issues
azure-functions
azure-identity
azure-communication-email
azure-storage-blob
requests
Result
- 운영자 수신 메일
- 사용자 수신 메일
Trouble Shooting
VSC Azure Extension Subscription Setting
- VSC(Visual Studio Code) → Azure Extension → 왼쪽 하단의 계정 → 기존 계정 로그아웃
- 상기 과정을 수행하지 않을 경우 다른 Subscriptions에 접근 불가
- 참고 Link - https://code.visualstudio.com/docs/remote/tunnels#_using-the-vs-code-ui
- Enable Tenant
- `az login --tenant "<Tenant ID>"`, `az login` (Select Tenant)
- Subscription Sign
- Shift + Cmd + p → Azure: Sign out → Azure: Sign in
- Account
- `az account clear` (Clear Account), `az account show` (Look Up Account), `az account set --subscription "<subscription>"` (Subscription Set)