Webhook再試行ロジック
Omise Webhookの自動再試行スケジュール、べき等パターンの実装、失敗処理、監視についての完全なガイド。
概要
Omiseは自動的にWebhookを再試行します。エンドポイントがタイムアウトまたは5xxエラーを返す場合、Webhookは指数バックオフで再試行されます。
再試行スケジュール
Omiseは以下の指数バックオフスケジュールを使用してWebhookを再試行します:
| 試行 | 待機時間 | 説明 |
|---|---|---|
| 1 | 1時間 | 最初の再試行 |
| 2 | 2時間 | |
| 3 | 4時間 | |
| 4 | 8時間 | |
| 5 | 12時間 | |
| 6 | 24時間 | |
| 7 | 24時間 | |
| 8 | 24時間 | |
| 9 | 24時間 | |
| 10 | 24時間 | 最終試行 (最大3日) |
べき等パターン
重複イベントを安全に処理するため、べき等パターンを実装する必要があります。
パターン1: イベントIDトラッキング
Redisまたはデータベースでイベントをトラッキング:
// Node.js - Redisを使用したべき等処理
const redis = require('redis');
const client = redis.createClient();
async function handleWebhook(event) {
const eventId = event.id;
const key = `webhook:${eventId}`;
// すでに処理済みかどうかを確認
const exists = await client.exists(key);
if (exists) {
console.log(`Duplicate event: ${eventId}`);
return;
}
// イベントを処理
try {
await processEvent(event);
// Redisにセット (24時間後に期限切れ)
await client.setex(key, 86400, '1');
} catch (error) {
console.error(`Error processing event: ${error.message}`);
// 処理済みとしてマークしない - 再試行を許可
throw error;
}
}
パターン2: データベース制約
データベース内の一意の制約を使用:
# Python - SQLAlchemyを使用したべき等処理
from sqlalchemy import Column, String, create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker
Base = declarative_base()
class ProcessedWebhook(Base):
__tablename__ = 'processed_webhooks'
event_id = Column(String, primary_key=True, unique=True)
created_at = Column(DateTime, default=datetime.utcnow)
def handle_webhook(event):
session = Session()
try:
# すでに処理済みかどうかを確認
existing = session.query(ProcessedWebhook).filter_by(
event_id=event['id']
).first()
if existing:
print(f"Duplicate event: {event['id']}")
return
# イベントを処理
process_event(event)
# 記録を追加
webhook_record = ProcessedWebhook(event_id=event['id'])
session.add(webhook_record)
session.commit()
except IntegrityError:
# 別のプロセスが既に処理済み
session.rollback()
print(f"Event already processed: {event['id']}")
finally:
session.close()
パターン3: ビジネスロジックべき等性
ビジネスロジックのべき等性を確保:
# Ruby - Railsでのべき等処理
def handle_charge_complete(charge)
charge_id = charge['id']
# すでに処理済みかどうかを確認 (データから)
existing_order = Order.find_by(omise_charge_id: charge_id)
if existing_order
puts "Charge already processed: #{charge_id}"
return
end
# チャージを処理
Order.create!(
omise_charge_id: charge_id,
amount: charge['amount'],
status: 'completed'
)
# 確認メールを送信
send_confirmation_email(existing_order.customer)
end
メッセージキューでの実装
本番環境での信頼性の高い処理:
Bull (Node.js)
// Node.js - Bullを使用したWebhook処理
const Queue = require('bull');
const webhookQueue = new Queue('webhooks', {
redis: { host: 'localhost', port: 6379 }
});
// Webhookエンドポイント
app.post('/webhooks/omise', async (req, res) => {
// シグネチャを検証
if (!verifySignature(req)) {
return res.status(401).json({ error: 'Invalid signature' });
}
// キューに追加
await webhookQueue.add(req.body, {
jobId: req.body.id, // べき等キーとして使用
attempts: 3,
backoff: {
type: 'exponential',
delay: 2000
},
removeOnComplete: true
});
// 迅速にレスポンス
res.json({ received: true });
});
// キュープロセッサー
webhookQueue.process(async (job) => {
const event = job.data;
console.log(`Processing webhook: ${event.key}`);
// べき等をチェック
const processed = await isEventProcessed(event.id);
if (processed) {
console.log(`Event already processed: ${event.id}`);
return;
}
// イベントを処理
await processEvent(event);
// 記録を追加
await markEventAsProcessed(event.id);
});
Celery (Python)
# Python - Celeryを使用したWebhook処理
from celery import Celery
from flask import Flask, request, jsonify
app = Flask(__name__)
celery = Celery(app.name, broker='redis://localhost:6379')
@app.route('/webhooks/omise', methods=['POST'])
def handle_webhook():
# シグネチャを検証
if not verify_signature(request):
return jsonify({'error': 'Invalid signature'}), 401
# タスクをキューに追加
event = request.json
process_webhook.delay(event, event_id=event['id'])
# 迅速にレスポンス
return jsonify({'received': True}), 200
@celery.task(bind=True, max_retries=3)
def process_webhook(self, event):
try:
# べき等をチェック
if is_event_processed(event['id']):
print(f"Event already processed: {event['id']}")
return
# イベントを処理
handle_event(event)
# 記録を追加
mark_event_as_processed(event['id'])
except Exception as e:
# 指数バックオフで再試行
raise self.retry(exc=e, countdown=2 ** self.request.retries)
Sidekiq (Ruby)
# Ruby - Sidekiqを使用したWebhook処理
class WebhookJob
include Sidekiq::Worker
sidekiq_options retries: 3
def perform(event_json)
event = JSON.parse(event_json)
# べき等をチェック
if event_processed?(event['id'])
puts "Event already processed: #{event['id']}"
return
end
# イベントを処理
handle_event(event)
# 記録を追加
mark_event_as_processed(event['id'])
end
private
def event_processed?(event_id)
ProcessedWebhook.exists?(event_id: event_id)
end
def mark_event_as_processed(event_id)
ProcessedWebhook.create!(event_id: event_id)
end
end
# Railsコントローラー
class WebhooksController < ApplicationController
skip_before_action :verify_authenticity_token
def omise
# シグネチャを検証
unless verify_signature(request)
return render json: { error: 'Invalid signature' }, status: 401
end
# ジョブをキューに追加
WebhookJob.perform_async(request.body.read)
# 迅速にレスポンス
render json: { received: true }
end
end
失敗処理
デッドレターキュー
処理に失敗したイベントを処理:
// Node.js - デッドレターキューを使用した処理
const webhookQueue = new Queue('webhooks');
const dlq = new Queue('webhooks-dlq');
// 失敗イベントをデッドレターキューに移動
webhookQueue.on('failed', (job, err) => {
console.error(`Job ${job.id} failed:`, err.message);
dlq.add(job.data, {
originalJobId: job.id,
failureReason: err.message,
failureTime: new Date()
});
});
// デッドレターキューを手動で処理
dlq.process(async (job) => {
console.log(`Processing failed event: ${job.data.originalJobId}`);
// 管理者に通知を送信、ログを記録など
});
ログと監視
Webhookメトリクス
// Prometheusメトリクス
const prometheus = require('prom-client');
const webhookCounter = new prometheus.Counter({
name: 'webhook_events_total',
help: 'Total webhook events received',
labelNames: ['event_type', 'status']
});
const webhookDuration = new prometheus.Histogram({
name: 'webhook_processing_seconds',
help: 'Time to process webhook',
labelNames: ['event_type']
});
app.post('/webhooks/omise', async (req, res) => {
const startTime = Date.now();
const event = req.body;
try {
await processEvent(event);
webhookCounter.inc({
event_type: event.key,
status: 'success'
});
} catch (error) {
webhookCounter.inc({
event_type: event.key,
status: 'error'
});
} finally {
webhookDuration.observe(
{ event_type: event.key },
(Date.now() - startTime) / 1000
);
}
res.json({ received: true });
});
手動再試行
ダッシュボードか ら
- ダッシュボードに移動
- 設定 > Webhookを開く
- エンドポイントを選択
- 最近の配信をビュー
- 失敗したイベントを見つけて再送信をクリック
API経由
# Webhookを再トリガー
curl https://api.omise.co/webhooks/events/{event_id}/resend \
-u skey_test_YOUR_KEY: \
-X POST
ベストプラクティス
1. 必ずべき等を実装
すべてのWebhookハンドラーでべき等を確保:
async function handleWebhook(event) {
// イベントが既に処理済みかどうかをチェック
const eventId = event.id;
const exists = await checkIfProcessed(eventId);
if (exists) return;
// イベントを処理
await processEvent(event);
// 処理済みとしてマーク
await markAsProcessed(eventId);
}
2. 迅速にレスポンス
Webhookエンドポイントは10秒以内にレスポンスする必要があります:
@app.route('/webhooks/omise', methods=['POST'])
def handle_webhook():
# 迅速にストレージにキューを作成
event = request.json
webhook_queue.put(event)
# 迅速にレスポンス
return jsonify({'received': True}), 200
# バックグラウンドで処理
def process_webhooks():
while True:
event = webhook_queue.get()
handle_event(event)
3. デッドレターキューを使用
処理に失敗したイベントをトラッキング:
webhookQueue.on('failed', async (job, err) => {
// デッドレターキューに移動
await deadLetterQueue.add({
originalEvent: job.data,
error: err.message,
failedAt: new Date()
});
// 管理者に通知
notifyAdmins(`Webhook processing failed: ${err.message}`);
});
トラブルシューティング
イベントが重複して処理される
原因: べき等が実装されていない
解決策: すべてのWebhookハンドラーでべき等を実装