Một thứ Hai đầu quý: HR gửi danh sách 47 dev mới onboard, mỗi người cần access Kibana với role tuỳ team. Cách “manual”: click chuột 47 lần trong Stack Management, dễ sai role, mất 2 giờ và cảm giác trống rỗng. Cách “API”: chạy script 30 giây, log lại ai được tạo, ai fail, có audit trail.
Bài này không nói về API auth (đã có ở bài 1) mà tập trung automation patterns: pagination, retry, idempotency, bulk operations. Những thứ phân biệt một script đẹp chạy đúng một lần với một tool dùng được hàng tuần.
Mục tiêu bài:
- Pattern auth + retry + idempotency cho mọi script
- Tạo bulk user và role qua API
- Mass update dashboard (sửa cùng một field cho 20 dashboard)
- Wrap API thành CLI Python nội bộ
- Tránh rate limit và xử lý lỗi partial failure
Phần 1: Framework chung
Mọi script automation Kibana nên có chung khung này:
import os
import time
import requests
from typing import Any
class KibanaClient:
def __init__(self):
self.url = os.environ['KIBANA_URL'].rstrip('/')
self.api_key = os.environ['KIBANA_API_KEY']
self.session = requests.Session()
self.session.headers.update({
'Authorization': f'ApiKey {self.api_key}',
'kbn-xsrf': 'true',
'Content-Type': 'application/json',
})
def request(self, method: str, path: str, retries: int = 3, **kwargs) -> Any:
url = f'{self.url}{path}'
for attempt in range(retries):
try:
resp = self.session.request(method, url, timeout=30, **kwargs)
if resp.status_code == 429:
wait = int(resp.headers.get('Retry-After', 2 ** attempt))
time.sleep(wait)
continue
if resp.status_code >= 500:
time.sleep(2 ** attempt)
continue
resp.raise_for_status()
return resp.json() if resp.text else {}
except requests.RequestException as e:
if attempt == retries - 1:
raise
time.sleep(2 ** attempt)
raise RuntimeError(f'Max retries exceeded: {method} {path}')
Ba điểm cốt lõi:
- Single session: reuse TCP connection, nhanh hơn nhiều cho bulk request
- Retry với exponential backoff: handle 5xx và 429 tự động
- Timeout rõ ràng: không treo session
Phần 2: Bulk user creation
Kibana 8.x dùng ES native user management. Tạo user qua ES API thay vì Kibana API:
def create_user(client: KibanaClient, username: str, email: str,
full_name: str, roles: list[str], password: str) -> dict:
return client.request(
'POST',
f'/api/security/user/{username}',
json={
'username': username,
'email': email,
'full_name': full_name,
'roles': roles,
'password': password,
}
)
Bulk wrapper với idempotency:
def bulk_create_users(client: KibanaClient, users: list[dict]) -> dict:
results = {'created': [], 'updated': [], 'failed': []}
for user in users:
try:
existing = check_user_exists(client, user['username'])
if existing:
client.request(
'POST',
f'/api/security/user/{user["username"]}',
json={**user, 'password': None}
)
results['updated'].append(user['username'])
else:
create_user(client, **user)
results['created'].append(user['username'])
except Exception as e:
results['failed'].append({
'username': user['username'],
'error': str(e),
})
return results
def check_user_exists(client: KibanaClient, username: str) -> bool:
try:
client.request('GET', f'/api/security/user/{username}')
return True
except requests.HTTPError as e:
if e.response.status_code == 404:
return False
raise
Idempotency là then chốt: chạy lại script 5 lần không tạo duplicate user, không corrupt data, chỉ update field cần update.
Pattern: read user từ CSV
import csv
def load_users_csv(path: str) -> list[dict]:
with open(path) as f:
return list(csv.DictReader(f))
users = load_users_csv('onboarding-q2.csv')
client = KibanaClient()
result = bulk_create_users(client, users)
print(f"Created: {len(result['created'])}")
print(f"Updated: {len(result['updated'])}")
print(f"Failed: {len(result['failed'])}")
for fail in result['failed']:
print(f" - {fail['username']}: {fail['error']}")
CSV format:
username,email,full_name,roles,password
alice.nguyen,alice@example.com,Alice Nguyen,"kibana_user,logs_reader",ChangeMe!2026
bob.tran,bob@example.com,Bob Tran,"kibana_user,metrics_reader",ChangeMe!2026
Lưu ý: password ban đầu phải được force-change ngay khi user login lần đầu. Pattern an toàn: gen password ngẫu nhiên, gửi qua kênh secure (Vault, 1Password), không log ra console.
Phần 3: Mass dashboard update
Use case thực tế: 25 dashboard reference data view app-logs-* đang dùng pattern cũ application-logs-*. Đổi từng cái qua UI = 2 giờ. Mass update qua API = 5 phút.
Approach: load tất cả dashboard, parse JSON, replace, save lại.
def find_objects(client: KibanaClient, object_type: str) -> list[dict]:
"""List tất cả saved objects của 1 type, có pagination."""
objects = []
page = 1
per_page = 100
while True:
result = client.request(
'GET',
f'/api/saved_objects/_find?type={object_type}&per_page={per_page}&page={page}'
)
objects.extend(result['saved_objects'])
if len(result['saved_objects']) < per_page:
break
page += 1
return objects
def update_dashboard(client: KibanaClient, dashboard_id: str, attributes: dict) -> dict:
return client.request(
'PUT',
f'/api/saved_objects/dashboard/{dashboard_id}',
json={'attributes': attributes}
)
Bulk update với reference rewrite:
def rewrite_data_view_references(client: KibanaClient,
old_pattern: str,
new_pattern: str) -> dict:
"""Rewrite data view reference trong tất cả dashboard."""
results = {'updated': [], 'skipped': [], 'failed': []}
dashboards = find_objects(client, 'dashboard')
for dash in dashboards:
refs = dash.get('references', [])
modified = False
for ref in refs:
if ref['type'] == 'index-pattern' and ref['name'] == old_pattern:
ref['name'] = new_pattern
modified = True
if not modified:
results['skipped'].append(dash['id'])
continue
try:
client.request(
'PUT',
f'/api/saved_objects/dashboard/{dash["id"]}',
json={
'attributes': dash['attributes'],
'references': refs,
}
)
results['updated'].append(dash['id'])
except Exception as e:
results['failed'].append({
'id': dash['id'],
'error': str(e),
})
return results
Pre-flight check trước khi chạy: export tất cả dashboard ra NDJSON làm backup. Một command:
curl -sS -H "Authorization: ApiKey ${KEY}" \
-H "kbn-xsrf: true" \
-X POST "${URL}/api/saved_objects/_export" \
-d '{"type":"dashboard","includeReferencesDeep":true}' \
> backup-$(date +%Y%m%d-%H%M%S).ndjson
Rollback nếu script bị bug: re-import file backup này.
Phần 4: Wrap thành CLI
Khi script được dùng đi dùng lại, wrap thành CLI nội bộ với Click hoặc Typer:
import typer
app = typer.Typer()
@app.command()
def create_users(csv_path: str = typer.Option(...)):
"""Bulk create user từ CSV."""
users = load_users_csv(csv_path)
client = KibanaClient()
result = bulk_create_users(client, users)
typer.echo(f"Created {len(result['created'])}, "
f"updated {len(result['updated'])}, "
f"failed {len(result['failed'])}")
@app.command()
def rewrite_refs(old: str = typer.Option(...),
new: str = typer.Option(...),
dry_run: bool = False):
"""Rewrite data view references in dashboards."""
client = KibanaClient()
if dry_run:
dashboards = find_objects(client, 'dashboard')
affected = [d['id'] for d in dashboards
for r in d.get('references', [])
if r['type'] == 'index-pattern' and r['name'] == old]
typer.echo(f"Would update {len(affected)} dashboards: {affected}")
return
result = rewrite_data_view_references(client, old, new)
typer.echo(f"Updated: {len(result['updated'])}")
@app.command()
def export_all(output_dir: str = typer.Option(...)):
"""Export tất cả saved objects ra NDJSON theo type."""
client = KibanaClient()
for obj_type in ['dashboard', 'lens', 'visualization', 'search']:
result = client.request(
'POST',
'/api/saved_objects/_export',
json={'type': obj_type, 'includeReferencesDeep': False},
)
path = f'{output_dir}/{obj_type}.ndjson'
with open(path, 'wb') as f:
f.write(result if isinstance(result, bytes) else str(result).encode())
typer.echo(f'Exported {obj_type} -> {path}')
if __name__ == '__main__':
app()
Setup pyproject.toml để package:
[project]
name = "kbn-cli"
version = "0.1.0"
dependencies = ["requests", "typer"]
[project.scripts]
kbn = "kbn_cli:app"
Install với pip install -e .. Từ đó dùng kbn create-users --csv-path users.csv.
Phần 5: Pattern advanced
Pagination cho list lớn
API _find có giới hạn per_page (default 20, max 10000). Nhưng đừng dùng max ngay, dễ timeout. Pattern an toàn:
def paginate(client: KibanaClient, endpoint: str, per_page: int = 100):
page = 1
while True:
result = client.request(
'GET',
f'{endpoint}&per_page={per_page}&page={page}',
)
items = result.get('saved_objects', [])
if not items:
break
yield from items
if len(items) < per_page:
break
page += 1
Throttling tránh rate limit
Kibana không có rate limit explicit, nhưng Elasticsearch dưới gầm có. Pattern an toàn:
import time
class ThrottledClient(KibanaClient):
def __init__(self, rps: float = 10):
super().__init__()
self.min_interval = 1.0 / rps
self.last_request = 0.0
def request(self, *args, **kwargs):
elapsed = time.time() - self.last_request
if elapsed < self.min_interval:
time.sleep(self.min_interval - elapsed)
self.last_request = time.time()
return super().request(*args, **kwargs)
10 RPS là default an toàn. Tăng khi đã đo thấy ES không stress.
Partial failure handling
Bulk operation có thể success một phần. Pattern:
def bulk_with_recovery(items, operation, max_failures=5):
results = []
failures = []
for item in items:
try:
r = operation(item)
results.append(r)
except Exception as e:
failures.append((item, e))
if len(failures) > max_failures:
raise RuntimeError(
f'Too many failures ({len(failures)}), aborting. '
f'Last error: {e}'
)
return results, failures
Abort sớm nếu fail rate cao. Không cố chạy đến hết khi rõ ràng có bug.
Phần 6: Pitfall thực tế
Pitfall 1: API key không có permission tạo user
API key tạo qua Kibana UI mặc định kế thừa quyền của user tạo. Nếu user đó không phải superuser, key không tạo được user.
Fix: tạo key qua ES API với role descriptor đủ:
curl -X POST -u elastic:password "${ES_URL}/_security/api_key" \
-d '{
"name": "user-management-script",
"expiration": "30d",
"role_descriptors": {
"user_admin": {
"cluster": ["manage_security"]
}
}
}'
Pitfall 2: Saved object update mất reference
Tôi từng PUT lại dashboard với attributes mới nhưng quên send references. Kết quả: dashboard mất hết lens, hiện trắng tinh.
Fix: PUT phải gửi full object bao gồm references hiện có. Pattern an toàn:
current = client.request('GET', f'/api/saved_objects/dashboard/{id}')
current['attributes']['title'] = 'New Title'
client.request(
'PUT',
f'/api/saved_objects/dashboard/{id}',
json={
'attributes': current['attributes'],
'references': current['references'],
}
)
Pitfall 3: Concurrent update conflict
Hai script cùng update một dashboard. Một fail với error version_conflict. Kibana dùng optimistic locking qua field version.
Fix: catch lỗi và retry với fresh version:
def update_with_retry(client, dashboard_id, mutate_fn, retries=3):
for attempt in range(retries):
current = client.request('GET', f'/api/saved_objects/dashboard/{dashboard_id}')
new_attrs = mutate_fn(current['attributes'])
try:
return client.request(
'PUT',
f'/api/saved_objects/dashboard/{dashboard_id}',
json={
'attributes': new_attrs,
'references': current['references'],
'version': current['version'],
}
)
except requests.HTTPError as e:
if e.response.status_code == 409 and attempt < retries - 1:
time.sleep(2 ** attempt)
continue
raise
Cheatsheet
| Việc | Endpoint | Method |
|---|---|---|
| List user | /api/security/user | GET |
| Get user | /api/security/user/{name} | GET |
| Create/update user | /api/security/user/{name} | POST |
| Delete user | /api/security/user/{name} | DELETE |
| List role | /api/security/role | GET |
| Create role | /api/security/role/{name} | PUT |
| Find saved objects | /api/saved_objects/_find?type={t} | GET |
| Bulk get | /api/saved_objects/_bulk_get | POST |
| Bulk delete | /api/saved_objects/_bulk_delete | POST |
| Export | /api/saved_objects/_export | POST |
| Import | /api/saved_objects/_import | POST (multipart) |
Lời kết
Mỗi tác vụ Kibana lặp lại quá 3 lần là ứng viên cho automation. Script không cần đẹp ngay, cần đúng và idempotent. Sau vài tuần dùng nó sẽ tiến hoá thành CLI nội bộ cho cả team.
Bài tiếp theo trong series Kibana từ A đến Z sẽ đi xa hơn: dùng Terraform để quản lý saved objects, alert rules và connectors như infrastructure. Khi đã quen với REST API, Terraform provider Kibana là bước tự nhiên kế tiếp, cho phép declarative config thay vì imperative script.