Một thứ Hai đầu quý: HR gửi danh sách 47 dev mới onboard, mỗi người cần access Kibana với role tuỳ team. Cách “manual”: click chuột 47 lần trong Stack Management, dễ sai role, mất 2 giờ và cảm giác trống rỗng. Cách “API”: chạy script 30 giây, log lại ai được tạo, ai fail, có audit trail.

Bài này không nói về API auth (đã có ở bài 1) mà tập trung automation patterns: pagination, retry, idempotency, bulk operations. Những thứ phân biệt một script đẹp chạy đúng một lần với một tool dùng được hàng tuần.

Mục tiêu bài:

  • Pattern auth + retry + idempotency cho mọi script
  • Tạo bulk user và role qua API
  • Mass update dashboard (sửa cùng một field cho 20 dashboard)
  • Wrap API thành CLI Python nội bộ
  • Tránh rate limit và xử lý lỗi partial failure

Phần 1: Framework chung

Mọi script automation Kibana nên có chung khung này:

import os
import time
import requests
from typing import Any

class KibanaClient:
    def __init__(self):
        self.url = os.environ['KIBANA_URL'].rstrip('/')
        self.api_key = os.environ['KIBANA_API_KEY']
        self.session = requests.Session()
        self.session.headers.update({
            'Authorization': f'ApiKey {self.api_key}',
            'kbn-xsrf': 'true',
            'Content-Type': 'application/json',
        })

    def request(self, method: str, path: str, retries: int = 3, **kwargs) -> Any:
        url = f'{self.url}{path}'
        for attempt in range(retries):
            try:
                resp = self.session.request(method, url, timeout=30, **kwargs)
                if resp.status_code == 429:
                    wait = int(resp.headers.get('Retry-After', 2 ** attempt))
                    time.sleep(wait)
                    continue
                if resp.status_code >= 500:
                    time.sleep(2 ** attempt)
                    continue
                resp.raise_for_status()
                return resp.json() if resp.text else {}
            except requests.RequestException as e:
                if attempt == retries - 1:
                    raise
                time.sleep(2 ** attempt)
        raise RuntimeError(f'Max retries exceeded: {method} {path}')

Ba điểm cốt lõi:

  1. Single session: reuse TCP connection, nhanh hơn nhiều cho bulk request
  2. Retry với exponential backoff: handle 5xx và 429 tự động
  3. Timeout rõ ràng: không treo session

Phần 2: Bulk user creation

Kibana 8.x dùng ES native user management. Tạo user qua ES API thay vì Kibana API:

def create_user(client: KibanaClient, username: str, email: str,
                full_name: str, roles: list[str], password: str) -> dict:
    return client.request(
        'POST',
        f'/api/security/user/{username}',
        json={
            'username': username,
            'email': email,
            'full_name': full_name,
            'roles': roles,
            'password': password,
        }
    )

Bulk wrapper với idempotency:

def bulk_create_users(client: KibanaClient, users: list[dict]) -> dict:
    results = {'created': [], 'updated': [], 'failed': []}

    for user in users:
        try:
            existing = check_user_exists(client, user['username'])
            if existing:
                client.request(
                    'POST',
                    f'/api/security/user/{user["username"]}',
                    json={**user, 'password': None}
                )
                results['updated'].append(user['username'])
            else:
                create_user(client, **user)
                results['created'].append(user['username'])
        except Exception as e:
            results['failed'].append({
                'username': user['username'],
                'error': str(e),
            })

    return results

def check_user_exists(client: KibanaClient, username: str) -> bool:
    try:
        client.request('GET', f'/api/security/user/{username}')
        return True
    except requests.HTTPError as e:
        if e.response.status_code == 404:
            return False
        raise

Idempotency là then chốt: chạy lại script 5 lần không tạo duplicate user, không corrupt data, chỉ update field cần update.

Pattern: read user từ CSV

import csv

def load_users_csv(path: str) -> list[dict]:
    with open(path) as f:
        return list(csv.DictReader(f))

users = load_users_csv('onboarding-q2.csv')
client = KibanaClient()
result = bulk_create_users(client, users)

print(f"Created: {len(result['created'])}")
print(f"Updated: {len(result['updated'])}")
print(f"Failed: {len(result['failed'])}")
for fail in result['failed']:
    print(f"  - {fail['username']}: {fail['error']}")

CSV format:

username,email,full_name,roles,password
alice.nguyen,alice@example.com,Alice Nguyen,"kibana_user,logs_reader",ChangeMe!2026
bob.tran,bob@example.com,Bob Tran,"kibana_user,metrics_reader",ChangeMe!2026

Lưu ý: password ban đầu phải được force-change ngay khi user login lần đầu. Pattern an toàn: gen password ngẫu nhiên, gửi qua kênh secure (Vault, 1Password), không log ra console.

Phần 3: Mass dashboard update

Use case thực tế: 25 dashboard reference data view app-logs-* đang dùng pattern cũ application-logs-*. Đổi từng cái qua UI = 2 giờ. Mass update qua API = 5 phút.

Approach: load tất cả dashboard, parse JSON, replace, save lại.

def find_objects(client: KibanaClient, object_type: str) -> list[dict]:
    """List tất cả saved objects của 1 type, có pagination."""
    objects = []
    page = 1
    per_page = 100

    while True:
        result = client.request(
            'GET',
            f'/api/saved_objects/_find?type={object_type}&per_page={per_page}&page={page}'
        )
        objects.extend(result['saved_objects'])
        if len(result['saved_objects']) < per_page:
            break
        page += 1

    return objects

def update_dashboard(client: KibanaClient, dashboard_id: str, attributes: dict) -> dict:
    return client.request(
        'PUT',
        f'/api/saved_objects/dashboard/{dashboard_id}',
        json={'attributes': attributes}
    )

Bulk update với reference rewrite:

def rewrite_data_view_references(client: KibanaClient,
                                   old_pattern: str,
                                   new_pattern: str) -> dict:
    """Rewrite data view reference trong tất cả dashboard."""
    results = {'updated': [], 'skipped': [], 'failed': []}

    dashboards = find_objects(client, 'dashboard')

    for dash in dashboards:
        refs = dash.get('references', [])
        modified = False

        for ref in refs:
            if ref['type'] == 'index-pattern' and ref['name'] == old_pattern:
                ref['name'] = new_pattern
                modified = True

        if not modified:
            results['skipped'].append(dash['id'])
            continue

        try:
            client.request(
                'PUT',
                f'/api/saved_objects/dashboard/{dash["id"]}',
                json={
                    'attributes': dash['attributes'],
                    'references': refs,
                }
            )
            results['updated'].append(dash['id'])
        except Exception as e:
            results['failed'].append({
                'id': dash['id'],
                'error': str(e),
            })

    return results

Pre-flight check trước khi chạy: export tất cả dashboard ra NDJSON làm backup. Một command:

curl -sS -H "Authorization: ApiKey ${KEY}" \
  -H "kbn-xsrf: true" \
  -X POST "${URL}/api/saved_objects/_export" \
  -d '{"type":"dashboard","includeReferencesDeep":true}' \
  > backup-$(date +%Y%m%d-%H%M%S).ndjson

Rollback nếu script bị bug: re-import file backup này.

Phần 4: Wrap thành CLI

Khi script được dùng đi dùng lại, wrap thành CLI nội bộ với Click hoặc Typer:

import typer

app = typer.Typer()

@app.command()
def create_users(csv_path: str = typer.Option(...)):
    """Bulk create user từ CSV."""
    users = load_users_csv(csv_path)
    client = KibanaClient()
    result = bulk_create_users(client, users)
    typer.echo(f"Created {len(result['created'])}, "
               f"updated {len(result['updated'])}, "
               f"failed {len(result['failed'])}")

@app.command()
def rewrite_refs(old: str = typer.Option(...),
                 new: str = typer.Option(...),
                 dry_run: bool = False):
    """Rewrite data view references in dashboards."""
    client = KibanaClient()

    if dry_run:
        dashboards = find_objects(client, 'dashboard')
        affected = [d['id'] for d in dashboards
                    for r in d.get('references', [])
                    if r['type'] == 'index-pattern' and r['name'] == old]
        typer.echo(f"Would update {len(affected)} dashboards: {affected}")
        return

    result = rewrite_data_view_references(client, old, new)
    typer.echo(f"Updated: {len(result['updated'])}")

@app.command()
def export_all(output_dir: str = typer.Option(...)):
    """Export tất cả saved objects ra NDJSON theo type."""
    client = KibanaClient()
    for obj_type in ['dashboard', 'lens', 'visualization', 'search']:
        result = client.request(
            'POST',
            '/api/saved_objects/_export',
            json={'type': obj_type, 'includeReferencesDeep': False},
        )
        path = f'{output_dir}/{obj_type}.ndjson'
        with open(path, 'wb') as f:
            f.write(result if isinstance(result, bytes) else str(result).encode())
        typer.echo(f'Exported {obj_type} -> {path}')

if __name__ == '__main__':
    app()

Setup pyproject.toml để package:

[project]
name = "kbn-cli"
version = "0.1.0"
dependencies = ["requests", "typer"]

[project.scripts]
kbn = "kbn_cli:app"

Install với pip install -e .. Từ đó dùng kbn create-users --csv-path users.csv.

Phần 5: Pattern advanced

Pagination cho list lớn

API _find có giới hạn per_page (default 20, max 10000). Nhưng đừng dùng max ngay, dễ timeout. Pattern an toàn:

def paginate(client: KibanaClient, endpoint: str, per_page: int = 100):
    page = 1
    while True:
        result = client.request(
            'GET',
            f'{endpoint}&per_page={per_page}&page={page}',
        )
        items = result.get('saved_objects', [])
        if not items:
            break
        yield from items
        if len(items) < per_page:
            break
        page += 1

Throttling tránh rate limit

Kibana không có rate limit explicit, nhưng Elasticsearch dưới gầm có. Pattern an toàn:

import time

class ThrottledClient(KibanaClient):
    def __init__(self, rps: float = 10):
        super().__init__()
        self.min_interval = 1.0 / rps
        self.last_request = 0.0

    def request(self, *args, **kwargs):
        elapsed = time.time() - self.last_request
        if elapsed < self.min_interval:
            time.sleep(self.min_interval - elapsed)
        self.last_request = time.time()
        return super().request(*args, **kwargs)

10 RPS là default an toàn. Tăng khi đã đo thấy ES không stress.

Partial failure handling

Bulk operation có thể success một phần. Pattern:

def bulk_with_recovery(items, operation, max_failures=5):
    results = []
    failures = []

    for item in items:
        try:
            r = operation(item)
            results.append(r)
        except Exception as e:
            failures.append((item, e))
            if len(failures) > max_failures:
                raise RuntimeError(
                    f'Too many failures ({len(failures)}), aborting. '
                    f'Last error: {e}'
                )

    return results, failures

Abort sớm nếu fail rate cao. Không cố chạy đến hết khi rõ ràng có bug.

Phần 6: Pitfall thực tế

Pitfall 1: API key không có permission tạo user

API key tạo qua Kibana UI mặc định kế thừa quyền của user tạo. Nếu user đó không phải superuser, key không tạo được user.

Fix: tạo key qua ES API với role descriptor đủ:

curl -X POST -u elastic:password "${ES_URL}/_security/api_key" \
  -d '{
    "name": "user-management-script",
    "expiration": "30d",
    "role_descriptors": {
      "user_admin": {
        "cluster": ["manage_security"]
      }
    }
  }'

Pitfall 2: Saved object update mất reference

Tôi từng PUT lại dashboard với attributes mới nhưng quên send references. Kết quả: dashboard mất hết lens, hiện trắng tinh.

Fix: PUT phải gửi full object bao gồm references hiện có. Pattern an toàn:

current = client.request('GET', f'/api/saved_objects/dashboard/{id}')
current['attributes']['title'] = 'New Title'
client.request(
    'PUT',
    f'/api/saved_objects/dashboard/{id}',
    json={
        'attributes': current['attributes'],
        'references': current['references'],
    }
)

Pitfall 3: Concurrent update conflict

Hai script cùng update một dashboard. Một fail với error version_conflict. Kibana dùng optimistic locking qua field version.

Fix: catch lỗi và retry với fresh version:

def update_with_retry(client, dashboard_id, mutate_fn, retries=3):
    for attempt in range(retries):
        current = client.request('GET', f'/api/saved_objects/dashboard/{dashboard_id}')
        new_attrs = mutate_fn(current['attributes'])
        try:
            return client.request(
                'PUT',
                f'/api/saved_objects/dashboard/{dashboard_id}',
                json={
                    'attributes': new_attrs,
                    'references': current['references'],
                    'version': current['version'],
                }
            )
        except requests.HTTPError as e:
            if e.response.status_code == 409 and attempt < retries - 1:
                time.sleep(2 ** attempt)
                continue
            raise

Cheatsheet

ViệcEndpointMethod
List user/api/security/userGET
Get user/api/security/user/{name}GET
Create/update user/api/security/user/{name}POST
Delete user/api/security/user/{name}DELETE
List role/api/security/roleGET
Create role/api/security/role/{name}PUT
Find saved objects/api/saved_objects/_find?type={t}GET
Bulk get/api/saved_objects/_bulk_getPOST
Bulk delete/api/saved_objects/_bulk_deletePOST
Export/api/saved_objects/_exportPOST
Import/api/saved_objects/_importPOST (multipart)

Lời kết

Mỗi tác vụ Kibana lặp lại quá 3 lần là ứng viên cho automation. Script không cần đẹp ngay, cần đúng và idempotent. Sau vài tuần dùng nó sẽ tiến hoá thành CLI nội bộ cho cả team.

Bài tiếp theo trong series Kibana từ A đến Z sẽ đi xa hơn: dùng Terraform để quản lý saved objects, alert rules và connectors như infrastructure. Khi đã quen với REST API, Terraform provider Kibana là bước tự nhiên kế tiếp, cho phép declarative config thay vì imperative script.