Cloudflare Workerで降水通知システムを作った


定期的に降水情報から、もうすぐ雨が降る・止むを Slack に通知する機能を Cloudflare Worker に作った。これからの季節、ゲリラ豪雨など突発的に雨が降ることも多いので、気付けるようにしておきたい。名称はsqually。

この記事ではロジック、構成などをメモしておく。特に Cloudflare の Durable Objects (DO) を使ってみたかったのが主な目的。

使用技術・構成

  • cloudflare
    • worker
    • durable objects (地点ごとの降雨状態を保持)
    • cron trigger
  • yahoo weather API (降水情報)

構成図

前提として、yahoo weather api の appId は取得済み、cloudflare のアカウントも設定済みとする。

状態管理

目的としては雨が降り始める/止むタイミングを前もって知らせる。任意の地点のゲリラ豪雨を検知しておけるようにしたかった。

現在の観測と予報から「先 30 分以内に雨があるか」を判定し、「降っていない」と「降っている」が切り替わったタイミングで通知する。降り始めなら「あと約 N 分で雨」、止みなら「約 N 分後に止みそう」と、どちらもなるべく予報を先読みして知らせる。

つまり、地点ごとに次のような 2 状態で推移する。通知が出るのは状態が切り替わる遷移の 1 回だけで、降り続き・晴れ続き(自己ループ)では通知しない。

降水通知の状態遷移

この状態は「前回は降っていたか?」を覚えていないと遷移を検出できず、通知もできない。そこで、状態を tick を跨いで保持・管理する仕組みとして DO を使う。

実装

一部のロジックについて抜き出しておく。

降雨判定ロジック(関数 detect

  • expectRain—観測を含む window [0, horizon] のどこかに雨があるか。false → true で「あと約 N 分で雨」を表現。
  • forecastRain—予報(予測値、未来の時刻)に降水量があるか。true → false で「約 N 分後に止みそう」を表現。予報だけを見るので降っている最中でも「もうすぐ止む」と通知したい。
type DetectEvent = "rain_start" | "rain_stop";
 
interface Sample {
  minutesAhead: number; // 0: 観測値、>0: N 分先の予報
  rainfallMmPerH: number;
}
 
interface DetectorState {
  // DO に格納される
  expectRain: boolean; // 雨があるか
  forecastRain: boolean; // 予報のみに雨があるか
  lastNotifiedAt: number | null; // cooldown 判定用
}
 
interface DetectResult {
  state: DetectorState;
  event: DetectEvent | null;
  peakMmPerH: number;
  peakMinutes: number | null;
  leadMinutes: number | null;
}
 
const THRESHOLD_MM_PER_H = 1; // rain threshold (mm/h)
const HORIZON_MINUTES = 30; // window length
const COOLDOWN_MS = 30 * 60_000; // flapping guard window
 
export function detect(prev: DetectorState, samples: Sample[], nowMs: number): DetectResult {
  // Restrict to the [0, horizon] window, sorted ascending so lead time is deterministic.
  const relevant = samples
    .filter((s) => s.minutesAhead >= 0 && s.minutesAhead <= HORIZON_MINUTES)
    .sort((a, b) => a.minutesAhead - b.minutesAhead);
 
  // 雨っぽい
  const expectRain = relevant.some((s) => s.rainfallMmPerH >= THRESHOLD_MM_PER_H);
  // 予報に雨を含むか
  const forecastRain = relevant.some((s) => s.minutesAhead > 0 && s.rainfallMmPerH >= THRESHOLD_MM_PER_H);
 
  // rain_start wins on the rare tick both flip.
  let event: DetectEvent | null = null;
  // 降り始め
  if (expectRain && !prev.expectRain) {
    event = "rain_start";
  // 予報側が雨でないなら止み
  } else if (!forecastRain && (prev.forecastRain ?? false)) {
    event = "rain_stop";
  }
 
  // Flapping guard: suppress the notification inside the cooldown, but still advance state.
  let lastNotifiedAt = prev.lastNotifiedAt;
  if (event !== null) {
    const cooledDown = nowMs - (prev.lastNotifiedAt ?? 0) < COOLDOWN_MS;
    if (cooledDown) event = null;
    else lastNotifiedAt = nowMs;
  }
 
  return { state: { expectRain, forecastRain, lastNotifiedAt }, event };
}

cooldown はフラッピングガードとして設定している。閾値付近で往復しても cooldown 時間内は通知を抑制する。

Durable Objects

降水状態の切り替えにおいて状態を持つステートフルな処理のため用いた。状態を持つだけなら KV でも足りるが、DO を選んだのは通知が成功したときだけ状態変化させる処理を素直に書けるため。

DO は atomic な処理が得意、かつ読み取りが強整合(常に最新の状態が取れる)ため、逐次前回の状態を参照する処理には使い勝手がよかった。storage には SQLite-backed DO storage を使った(無料プランにおける制約でもある)。

今回は地点ごとに保持する。idFromName(location.id) を使えば、同じ地点 ID では常に同じインスタンスに解決される。以下のような書き方で取得できる。

// index.ts — 地点ごとに DO を確保して評価
const id = env.RAIN_DETECTOR.idFromName(location.id);
const stub = env.RAIN_DETECTOR.get(id);
const { event, peakMmPerH, peakMinutes, leadMinutes } = await stub.evaluate(samples);
wrangler でのバインディングと migrations

DO クラスは wrangler.jsonc でバインドし、SQLite backed として migrations で登録する。

"durable_objects": {
  "bindings": [
    { "name": "RAIN_DETECTOR", "class_name": "RainDetectorDurableObject" },
  ],
},
"migrations": [
  { "tag": "v1", "new_sqlite_classes": ["RainDetectorDurableObject"] },
],

DO クラスは cloudflare:workersDurableObject ベースクラスを継承して定義する。

// rain-detector-durable-object.ts
import { DurableObject } from "cloudflare:workers";
 
const STATE_KEY = "state";
const PENDING_KEY = "pending";
 
const INITIAL_STATE: DetectorState = {
  expectRain: false,
  forecastRain: false,
  lastNotifiedAt: null,
};
 
export class RainDetectorDurableObject extends DurableObject<Env> {
  private storage: DurableObjectStorage;
 
  constructor(state: DurableObjectState, env: Env) {
    super(state, env);
    this.storage = state.storage;
  }
 
  // evaluate() confirm() などの定義
}

加えて、wrangler が class_name で解決できるよう src/index.ts などから再 export しておく。

export { RainDetectorDurableObject } from "./rain-detector-durable-object";

Worker からは stub.evaluate() などを RPC で直接呼びだせる。compatibility date 2026-05-01 でサポートされている。

通知

状態の更新と通知をセットにしている。通知が届かなかったのに状態だけ更新する、という片落ちを起こさないようにする。

  • state—現状の保持内容。次回の判定はこれを基準にする。
  • pending—通知が成功したら state にする次の状態。送信成功までは確定させない。

DO への書き込み

// RainDetectorDurableObject 内のメソッドにすることで stub 経由で呼び出せる
// rain-detector-durable-object.ts
import { DurableObject } from "cloudflare:workers";
 
const STATE_KEY = "state";
const PENDING_KEY = "pending";
 
const INITIAL_STATE: DetectorState = {
  expectRain: false,
  forecastRain: false,
  lastNotifiedAt: null,
};
 
export class RainDetectorDurableObject extends DurableObject<Env> {
  private storage: DurableObjectStorage;
 
  constructor(state: DurableObjectState, env: Env) {
    super(state, env);
    this.storage = state.storage;
  }
 
  /** 検出と状態の設定。 **/
  async evaluate(samples: Sample[]) {
    const prev = (await this.storage.get<DetectorState>(STATE_KEY)) ?? INITIAL_STATE;
    const result = detect(prev, samples, Date.now());
 
    if (result.event === null) {
      // 通知なし
      await this.storage.put(STATE_KEY, result.state);
      await this.storage.delete(PENDING_KEY);
    } else {
      // 通知あり
      await this.storage.put(PENDING_KEY, result.state);
    }
 
    return { event: result.event, peakMmPerH: result.peakMmPerH };
  }
 
  /** 送信成功後に実行。pending を committed へ昇格させる。 **/
  async confirm(): Promise<void> {
    const pending = await this.storage.get<DetectorState>(PENDING_KEY);
    if (pending) {
      await this.storage.put(STATE_KEY, pending);
      await this.storage.delete(PENDING_KEY);
    }
  }
}

状態の確定と通知はこんな感じ。slack 向け。

// index.ts
const message =
  event === "rain_start"
    ? `🌧️ ${location.name}: あと約${leadMinutes ?? "?"}分で雨。ピークは約${peakMinutes ?? "?"}分後 ${peakMmPerH}mm/h`
    : event === "rain_stop"
      ? `☀️ ${location.name}: 約${leadMinutes ?? "?"}分後に雨が止みそうです`
      : null;
 
const notify = notifyEnabled.has(location.id);
if (message) {
  const sent = notify ? await notifyWebhook(env, message) : true;
  if (sent) {
    await stub.confirm();
  } else {
    log("warn", "detect.notify_unconfirmed", { locationId: location.id, event });
  }
}
// notify.ts
export async function notifyWebhook(env: Env, message: string): Promise<boolean> {
  const url = env.SLACK_WEBHOOK_URL?.trim();
  if (!url) return true; // nothing configured — not a failure
 
  const text = message.slice(0, MAX_WEBHOOK_CHARS);
  const body = JSON.stringify({ content: text, text });
  return await postOne(url, body);
}
 
async function postOne(url: string, body: string): Promise<boolean> {
  try {
    const res = await fetch(url, {
      method: "POST",
      headers: { "content-type": "application/json" },
      body,
      signal: AbortSignal.timeout(3000),
    });
    if (!res.ok) {
      log("warn", "notify.failed", { host: safeHost(url), httpStatus: res.status });
      return false;
    }
    return true;
  } catch (err) {
    log("warn", "notify.failed", {
      host: safeHost(url),
      message: err instanceof Error ? err.message : String(err),
    });
    return false;
  }
}

通知に失敗している間は confirm() が呼ばれず、state は古いまま据え置かれる。次で同じ条件が再検出されれば、再送される。送信が通った瞬間にだけ state が更新される at-least-once な通知になる。

通知はこんな感じ。

squally の通知例

データ確認

現時点で直接確認する簡単なコマンド、GUI はなかったので、独自にエンドポイントを作ってそちら経由で見てみる。

DO は SQLite backed、保存は KV スタイル API(storage.get/put/list)。cloudflare ダッシュボードの data studio は SQL API で書いたデータしか見れなかった。特に CLI などからもアクセスできないらしい。__debug エンドポイント経由で確認できるようにしてみる。

async fetch(req: Request, env: Env): Promise<Response> {
  const url = new URL(req.url);
  if (url.pathname !== "/__debug") {
    return new Response("ok");
  }
  const ns = url.searchParams.get("ns");
 
  const name = url.searchParams.get("name") ?? (ns?.endsWith("buffer") ? "global" : null);
  if (!name) {
    return Response.json({ error: `name (location id) required for ns=${ns ?? ""}` }, { status: 400 });
  }
 
  let storage: string;
  switch (ns) {
    case "rain-detector": {
      const stub = env.RAIN_DETECTOR.get(env.RAIN_DETECTOR.idFromName(name));
      storage = await stub._debugDump();
      break;
    }
    default: {
      const stub = env.WARNING_BUFFER.get(env.WARNING_BUFFER.idFromName(name));
      storage = await stub._debugDump();
      break;
    }
  }
  // _debugDump が返す JSON 文字列をそのまま埋め込む
  return new Response(
    `{"ns":${JSON.stringify(ns)},"name":${JSON.stringify(name)},"storage":${storage}}`,
    { headers: { "content-type": "application/json" } },
  );
}

_debugDump() は例によって DO のオブジェクト(stub)のメソッド。

// _debugDump
async _debugDump(): Promise<string> {
  return JSON.stringify(Object.fromEntries(await this.storage.list()));
}

適当に namespace(DO オブジェクト)、location 名(内部で lot/lat で変換している)を指定すると storage 以下に値が格納されていた。

curl 'http://localhost:8787/__debug?ns=rain-detector&name=tokyo-shinjuku'
...
{
  "ns": "rain-detector",
  "name": "tokyo-shinjuku",
  "storage": { "state": { "expectRain": false, "forecastRain": false, "lastNotifiedAt": null } }
}

ちなみにファイルは以下のように worker スクリプト直下の .wrangler 以下に保存されていた。

.wrangler/state/v3/do/
└── squally-RainDetectorDurableObject/<hash>.sqlite

デプロイ設定

wrangler.jsonc について。

{
  "name": "squally",
  "main": "src/index.ts",
  "compatibility_date": "2026-05-01",
  "triggers": { "crons": ["*/5 * * * *"] },
  "vars": {
    "DETECT_THRESHOLD_MM_PER_H": "1", // 雨と判定する閾値 (mm/h)
    "DETECT_HORIZON_MINUTES": "30", // 何分先まで見るか
    "DETECT_COOLDOWN_MINUTES": "30", // フラッピング抑制
  },
  "durable_objects": {
    "bindings": [
      { "name": "RAIN_DETECTOR", "class_name": "RainDetectorDurableObject" },
    ],
  },
  "migrations": [
    { "tag": "v1", "new_sqlite_classes": ["RainDetectorDurableObject"] },
  ],
}

秘匿値は wrangler secret put や console 画面から設定する。

secret用途
YAHOO_APP_IDYahoo API 認証
SLACK_WEBHOOK_URL通知

デプロイと運用コマンド:

# deploy
wrangler deploy
# 実行log
wrangler tail --format=pretty

感想

cloudflare worker で意外と簡単に cron 実行ができた。認証などの設定もせずに storage などと連携できるバインディングも便利。対応言語は typescript のみ、無料プランはコンピュートリミットもあるなど制限もあるため用途は限られるが、今回のような状態を持つ軽微な処理に向いている。DO はクラスなどで構造を管理でき、KV よりも手軽に設定できた。今どき、同じようなシステム構成で agent sdk を用いることで、簡単な LLM を用いた処理も可能になっている。

今回は全体構成の説明を省いたが、実際は cloudlare logs を設定したり、healthchecks.io などの外部サービスを用いて、cron の死活監視を行っている。