Ollamaを使った「家庭用セキュリティAI」を自分で作る

スマホ・PCの守り方

「Amazonを装った不審なメールが届いた」「銀行からの緊急連絡と書いてあるが本物か分からない」——フィッシングメールは年々巧妙化しており、もはや一般のユーザーが目視で見分けることが難しくなっています。

警察庁の発表によると、フィッシング報告件数は2023年だけで約119万件を超えており、被害額も拡大し続けています。セキュリティソフトのフィルターでは見抜けないケースも増えている中、Ollamaを使った「家庭用セキュリティAI」を自分で作るという選択肢が現実的になってきました。

本記事では、プログラミングの知識が多少あれば誰でも構築できる、ローカル動作のフィッシングメール判定AIを段階的に解説します。メールの本文はすべて自分のPCの中で処理されるため、プライバシーを守りながら高精度な判定が可能です。

この記事で作れるもの

  • メールのテキストを貼り付けるだけで危険度を判定するPythonツール
  • GmailやThunderbirdと連携する自動チェックシステム
  • URLの安全性を自動評価する機能
  • 家族でも使えるシンプルなWebインターフェース
  • 判定結果を通知するSlack/LINE連携ボット

なぜローカルAIがメール判定に最適なのか

既存のフィッシング対策の限界

現在広く使われているフィッシング対策には、それぞれ無視できない弱点があります。

対策手段仕組み弱点
メールフィルター(スパムフィルター)既知のパターンやドメインをブロック新しいフィッシング手口に対応できない
セキュリティソフト既知の悪意あるURLをブロック作られたばかりのフィッシングURLは未登録
クラウドAI(ChatGPT等)へのコピペLLMによる文脈理解メール本文に個人情報が含まれると漏洩リスク
自分の目視確認経験則と知識による判断巧妙な手口には対応できない・疲労する

特にクラウドAIへのコピペは、一見便利ですが大きな問題があります。メールには送信者の名前、メールアドレス、取引情報など個人情報が含まれていることが多く、それをChatGPTなどに貼り付けることは、個人情報を外部サーバーに送信することを意味します。

Ollamaを使ったローカルAIなら、メール本文が自分のPC外に出ることは一切ありません。高精度な言語理解能力と完全なプライバシー保護を同時に実現できます。

フィッシングメールの現代的な手口

AIを使った判定が特に有効なのは、従来のルールベースでは捕捉しにくい「文脈的な怪しさ」を見抜けるからです。最近のフィッシングメールが使う手口を理解しておきましょう。

  • ブランドなりすまし:Amazon、楽天、三菱UFJ銀行、ゆうちょ銀行、マイナポータルなど信頼性の高いブランドを偽装。ロゴや文体まで精巧にコピー
  • 緊急性の演出:「24時間以内に確認しないとアカウントを停止します」など焦りを誘う表現
  • 生成AIを使ったメール文:誤字脱字がなく、自然な日本語で書かれたフィッシングメールが増加中
  • QRコードの悪用:URLの代わりにQRコードを使い、セキュリティツールの検出を回避
  • 正規ドメインの悪用:Google FormsやMicrosoft SharePointなど、正規サービス上にフィッシングページを作成
  • 多段階攻撃:最初のメールは無害に見え、クリック後にフィッシングページへ誘導

環境構築:必要なものと準備

必要なハードウェアスペック

家庭での利用を想定し、比較的手に入りやすいスペックで動作するように設計します。

スペック最小構成推奨構成
RAM8GB(モデル用に4〜5GB使用)16GB以上
ストレージ10GB空き(モデルファイル用)30GB以上の空き
GPU不要(CPU動作可)NVIDIA RTX 3060以上(高速化)
OSWindows 10/11、macOS 12以上、Ubuntu 20.04以上同左
Python3.9以上3.11以上

Apple Siliconユーザーへ

M1/M2/M3/M4チップを搭載したMacは、Unified Memory(統合メモリ)を活用するためGPUなしでも高速に動作します。MacBook Air M2(8GBメモリ)でも、7〜8Bパラメータのモデルなら十分実用的な速度で動作します。

Ollamaのインストール

# macOS
# https://ollama.com/download/mac からインストーラーをダウンロード
# またはHomebrew経由
brew install ollama

# Linux
curl -fsSL https://ollama.com/install.sh | sh
# ※インストール前にスクリプト内容を確認することを推奨

# Windows
# https://ollama.com/download/windows からインストーラーをダウンロード

# インストール確認
ollama --version

判定AIに使うモデルの選択

フィッシング判定には、日本語の文脈理解と構造化出力(JSON)が得意なモデルが適しています。

# 推奨モデル①:Qwen2.5(日本語対応が優秀)
ollama pull qwen2.5:7b       # 約5GB、8GB RAM以上推奨
# または
ollama pull qwen2.5:3b       # 約2GB、最小スペック向け

# 推奨モデル②:Llama 3.1(バランス型)
ollama pull llama3.1:8b      # 約5GB

# 推奨モデル③:Gemma 3(Google製、多言語対応)
ollama pull gemma3:4b        # 約3GB、低スペック向け

# ダウンロードの確認
ollama list

Pythonライブラリのインストール

# 仮想環境の作成(推奨)
python -m venv phishing-ai
source phishing-ai/bin/activate  # macOS/Linux
# phishing-ai\Scripts\activate   # Windows

# 必要ライブラリのインストール
pip install requests flask gradio python-dotenv
pip install imaplib2 email-validator
pip install validators tldextract

OSINT専用Modelfile:フィッシング判定AIの設計

Ollamaの Modelfile 機能を使って、フィッシング判定に特化したAIをカスタム設計します。適切なシステムプロンプトを設計することが、判定精度を左右する最も重要なポイントです。

フィッシング判定専用Modelfileの作成

# ファイル名:PhishingDetectorModelfile
# 保存場所:~/phishing-ai/PhishingDetectorModelfile

FROM qwen2.5:7b

SYSTEM """
あなたはサイバーセキュリティの専門家として、フィッシングメール・詐欺メールの検出を行うAIです。

【あなたの役割】
ユーザーが提出したメールのテキストを分析し、フィッシング詐欺・なりすまし・マルウェア配布等の
不正なメールかどうかを判定します。

【判定基準】
以下の要素を必ず確認してください:

1. 送信者の正当性
   - 送信元メールアドレスが公式ドメインと一致しているか
   - 表示名と実際のアドレスに乖離がないか
   - 類似ドメイン(例:amaz0n.com、rakuten-jp.net)を使っていないか

2. 緊急性・恐怖の煽り
   - 「アカウント停止」「緊急確認」「24時間以内」などの表現
   - 罰則・損害の脅迫
   - 特典・懸賞当選の誇大表現

3. URLと誘導先の怪しさ
   - 本文中のURLが公式ドメインと異なる
   - URLを短縮サービスや文字列で偽装
   - ログイン・個人情報の入力を要求

4. 文体・日本語の不自然さ
   - 不自然な敬語・翻訳調の文体
   - 公式企業の文書として不適切な表現
   - 逆に:AIで生成した疑いのある不自然に完璧な文章

5. 要求内容の妥当性
   - 正規のサービスが要求しない情報(パスワード、クレジットカード番号等)
   - 通常の業務フローとかけ離れた手続き要求

【出力形式】
必ず以下のJSON形式のみで回答してください。説明文は一切不要です:

{
  "verdict": "DANGEROUS" または "SUSPICIOUS" または "SAFE",
  "confidence": 0から100の整数(判定の確信度),
  "risk_score": 0から10の数値(危険度スコア、小数点1桁まで),
  "phishing_type": "フィッシング種別(brand_impersonation/urgency_scam/malware/prize_fraud/romance_scam/safe/unknown)",
  "impersonated_brand": "なりすましているブランド名(不明の場合はnull)",
  "red_flags": ["発見した危険なポイントのリスト(日本語)"],
  "safe_indicators": ["安全を示す要素のリスト(日本語)"],
  "suspicious_urls": ["メール内の怪しいURL(発見した場合)"],
  "recommended_action": "具体的な推奨アクション(日本語・1文)",
  "explanation": "判定理由の詳しい説明(日本語・3〜5文)"
}
"""

PARAMETER temperature 0.1
# temperatureを低くして一貫性のある判定を実現
PARAMETER top_p 0.9
PARAMETER num_ctx 4096
# メール全文を処理できるコンテキスト長
# Modelfileからカスタムモデルをビルド
ollama create phishing-detector -f PhishingDetectorModelfile

# ビルド確認
ollama list

# 簡単なテスト
ollama run phishing-detector "件名:【重要】Amazonアカウントの確認が必要です\n本文:お客様のアカウントに不審なアクセスが確認されました。24時間以内に以下のリンクから確認してください。http://amazon-security-verify.net/login"

コア機能の実装:フィッシング判定エンジン

判定エンジンの本体コード

#!/usr/bin/env python3
"""
phishing_detector.py
Ollamaを使ったフィッシングメール判定エンジン
"""

import requests
import json
import re
import validators
import tldextract
from dataclasses import dataclass
from typing import Optional
from urllib.parse import urlparse
import time

# ── 設定 ──────────────────────────────────────
OLLAMA_URL   = "http://localhost:11434/api/generate"
MODEL_NAME   = "phishing-detector"   # Modelfileで作成したカスタムモデル
TIMEOUT_SEC  = 90                    # AI応答のタイムアウト(秒)

# 既知フィッシングドメインの簡易リスト(実際はより大規模なデータベースを使用)
KNOWN_PHISHING_TLDS = {
    ".tk", ".ml", ".ga", ".cf", ".gq"   # 無料ドメインはフィッシングで多用
}

# 正規ブランドの公式ドメイン(代表例)
OFFICIAL_DOMAINS = {
    "amazon":         ["amazon.co.jp", "amazon.com", "amazon-oss.com"],
    "rakuten":        ["rakuten.co.jp", "rakuten.com"],
    "yahoo":          ["yahoo.co.jp", "yahoo.com"],
    "google":         ["google.co.jp", "google.com", "gmail.com"],
    "microsoft":      ["microsoft.com", "outlook.com", "live.com", "hotmail.com"],
    "apple":          ["apple.com", "icloud.com"],
    "mufg":           ["mufg.jp", "bk.mufg.jp"],
    "smbc":           ["smbc.co.jp", "smbcgroup.com"],
    "mizuho":         ["mizuho-fg.co.jp", "mizuhobank.co.jp"],
    "yucho":          ["japanpost.jp", "yu-cho.japanpost.jp"],
    "mynumber":       ["myna.go.jp", "kojinbango-card.go.jp"],
    "ntt-docomo":     ["docomo.ne.jp", "nttdocomo.co.jp"],
    "softbank":       ["softbank.jp", "softbank.ne.jp"],
    "au":             ["au.com", "au.kddi.com"],
}


@dataclass
class PhishingAnalysis:
    """フィッシング判定の結果を格納するデータクラス"""
    verdict: str                    # DANGEROUS / SUSPICIOUS / SAFE
    confidence: int                 # 0-100
    risk_score: float               # 0-10
    phishing_type: str
    impersonated_brand: Optional[str]
    red_flags: list[str]
    safe_indicators: list[str]
    suspicious_urls: list[str]
    recommended_action: str
    explanation: str
    # 追加メタデータ
    urls_found: list[str]
    pre_analysis_warnings: list[str]
    analysis_time_sec: float


class PhishingDetector:
    """フィッシングメール判定の主クラス"""

    def __init__(self, model: str = MODEL_NAME):
        self.model = model

    # ── 前処理:URL抽出と簡易チェック ──────────────

    def extract_urls(self, text: str) -> list[str]:
        """メールテキストからURLを抽出する"""
        url_pattern = re.compile(
            r'https?://[^\s<>"{}|\\^`\[\]\'() ]+'
        )
        urls = url_pattern.findall(text)
        return list(set(urls))  # 重複除去

    def check_url_safety(self, url: str) -> list[str]:
        """URLの静的チェック(API呼び出しなし)"""
        warnings = []
        try:
            parsed   = urlparse(url)
            extracted = tldextract.extract(url)
            domain   = f"{extracted.domain}.{extracted.suffix}"
            fqdn     = parsed.netloc.lower()

            # ① 無料ドメインTLDのチェック
            for suspicious_tld in KNOWN_PHISHING_TLDS:
                if fqdn.endswith(suspicious_tld):
                    warnings.append(f"⚠ 無料TLD使用(フィッシングで多用): {suspicious_tld}")

            # ② サブドメインに有名ブランド名が含まれるが実際は別ドメイン
            #    例:amazon.security-check.net
            for brand, official_domains in OFFICIAL_DOMAINS.items():
                if brand in fqdn:
                    is_official = any(fqdn == od or fqdn.endswith("." + od)
                                      for od in official_domains)
                    if not is_official:
                        warnings.append(
                            f"⚠ ブランド名のなりすまし疑い:「{brand}」を含むが"
                            f"公式ドメインではない({domain})"
                        )

            # ③ IPアドレス直打ちURL
            ip_pattern = re.compile(r'https?://\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}')
            if ip_pattern.match(url):
                warnings.append(f"⚠ IPアドレス直打ちURL(フィッシングで多用): {url}")

            # ④ 極端に長いURLまたはランダムな文字列
            if len(url) > 200:
                warnings.append(f"⚠ 異常に長いURL({len(url)}文字)")

            # ⑤ URLにログイン・確認・検証などの単語が含まれる
            suspicious_words = ["login", "verify", "confirm", "secure",
                                 "account", "update", "signin", "password"]
            url_lower = url.lower()
            found_words = [w for w in suspicious_words if w in url_lower]
            if found_words:
                warnings.append(
                    f"⚠ URLに不審なキーワードを含む: {', '.join(found_words)}"
                )

        except Exception as e:
            warnings.append(f"URL解析エラー: {e}")

        return warnings

    def pre_analyze(self, email_text: str) -> tuple[list[str], list[str]]:
        """
        AI呼び出し前の静的解析
        戻り値:(抽出URL一覧, 警告メッセージ一覧)
        """
        urls = self.extract_urls(email_text)
        warnings = []

        # 全URLをチェック
        for url in urls:
            url_warnings = self.check_url_safety(url)
            warnings.extend(url_warnings)

        # メール本文の簡易パターンマッチ
        text_lower = email_text.lower()

        urgency_keywords = [
            "24時間以内", "48時間以内", "本日中", "至急", "緊急",
            "アカウント停止", "利用停止", "凍結", "不審なアクセス",
            "本人確認", "身分確認", "情報更新", "期限切れ"
        ]
        found_urgency = [k for k in urgency_keywords if k in email_text]
        if len(found_urgency) >= 2:
            warnings.append(
                f"⚠ 複数の緊急性を煽るキーワード検出: {', '.join(found_urgency)}"
            )

        return urls, warnings

    # ── AI判定 ──────────────────────────────────

    def analyze(self, email_text: str) -> PhishingAnalysis:
        """メールテキストを分析してPhishingAnalysisを返す"""
        start_time = time.time()

        # 前処理
        urls_found, pre_warnings = self.pre_analyze(email_text)

        # プロンプトの構築
        prompt = f"""以下のメールを分析してください。

=== メール本文(開始) ===
{email_text[:3000]}
=== メール本文(終了) ===

発見されたURL一覧:
{json.dumps(urls_found, ensure_ascii=False)}

前処理での警告:
{json.dumps(pre_warnings, ensure_ascii=False)}

JSON形式で判定結果のみを返してください。"""

        # Ollamaへのリクエスト
        try:
            response = requests.post(
                OLLAMA_URL,
                json={
                    "model": self.model,
                    "prompt": prompt,
                    "stream": False,
                    "options": {
                        "temperature": 0.1,
                        "num_ctx": 4096
                    }
                },
                timeout=TIMEOUT_SEC
            )
            raw_response = response.json()["response"]

            # JSONの抽出(余分なテキストが含まれる場合に対応)
            json_match = re.search(r'\{.*\}', raw_response, re.DOTALL)
            if json_match:
                result_dict = json.loads(json_match.group())
            else:
                raise ValueError("AIがJSON形式で回答しませんでした")

        except requests.exceptions.Timeout:
            return self._error_result(
                "AI判定がタイムアウトしました",
                urls_found, pre_warnings,
                time.time() - start_time
            )
        except json.JSONDecodeError as e:
            return self._error_result(
                f"AI応答のJSON解析に失敗: {e}",
                urls_found, pre_warnings,
                time.time() - start_time
            )

        elapsed = time.time() - start_time

        return PhishingAnalysis(
            verdict             = result_dict.get("verdict", "SUSPICIOUS"),
            confidence          = int(result_dict.get("confidence", 50)),
            risk_score          = float(result_dict.get("risk_score", 5.0)),
            phishing_type       = result_dict.get("phishing_type", "unknown"),
            impersonated_brand  = result_dict.get("impersonated_brand"),
            red_flags           = result_dict.get("red_flags", []),
            safe_indicators     = result_dict.get("safe_indicators", []),
            suspicious_urls     = result_dict.get("suspicious_urls", []),
            recommended_action  = result_dict.get("recommended_action", ""),
            explanation         = result_dict.get("explanation", ""),
            urls_found          = urls_found,
            pre_analysis_warnings = pre_warnings,
            analysis_time_sec   = elapsed
        )

    def _error_result(self, msg, urls, warnings, elapsed) -> PhishingAnalysis:
        """エラー時のデフォルト結果"""
        return PhishingAnalysis(
            verdict="SUSPICIOUS", confidence=0, risk_score=5.0,
            phishing_type="unknown", impersonated_brand=None,
            red_flags=[f"判定エラー: {msg}"],
            safe_indicators=[], suspicious_urls=[],
            recommended_action="手動で確認してください",
            explanation=msg,
            urls_found=urls, pre_analysis_warnings=warnings,
            analysis_time_sec=elapsed
        )

    # ── 結果の表示 ────────────────────────────────

    def print_result(self, result: PhishingAnalysis) -> None:
        """判定結果をターミナルに表示する"""

        # 危険度に応じたアイコンと色
        icons = {
            "DANGEROUS":  "🔴 【危険】",
            "SUSPICIOUS": "🟡 【要注意】",
            "SAFE":       "🟢 【安全】"
        }
        icon = icons.get(result.verdict, "⚪ 【不明】")

        print("\n" + "=" * 60)
        print(f"  判定結果:{icon}")
        print(f"  危険度スコア:{result.risk_score}/10 確信度:{result.confidence}%")
        if result.impersonated_brand:
            print(f"  なりすましブランド:{result.impersonated_brand}")
        print(f"  フィッシング種別:{result.phishing_type}")
        print("=" * 60)

        if result.red_flags:
            print("\n🚩 危険なポイント:")
            for flag in result.red_flags:
                print(f"   ✗ {flag}")

        if result.safe_indicators:
            print("\n✅ 安全な要素:")
            for indicator in result.safe_indicators:
                print(f"   ✓ {indicator}")

        if result.pre_analysis_warnings:
            print("\n⚠️  静的解析の警告:")
            for warning in result.pre_analysis_warnings:
                print(f"   {warning}")

        if result.suspicious_urls:
            print("\n🔗 不審なURL:")
            for url in result.suspicious_urls:
                print(f"   → {url}")

        print(f"\n📋 判定根拠:")
        print(f"   {result.explanation}")
        print(f"\n🛡️  推奨アクション:")
        print(f"   {result.recommended_action}")
        print(f"\n⏱  分析時間:{result.analysis_time_sec:.1f}秒")
        print("=" * 60 + "\n")


# ── CLIインターフェース ─────────────────────────

def main():
    detector = PhishingDetector()

    print("=" * 60)
    print("  🔍 フィッシングメール判定AI(Ollama版)")
    print("  ローカル動作・プライバシー完全保護")
    print("=" * 60)
    print("\nメールの本文を貼り付けてください(入力終了は空行2回):\n")

    lines = []
    empty_count = 0
    while empty_count < 2:
        line = input()
        if line == "":
            empty_count += 1
        else:
            empty_count = 0
            lines.append(line)

    email_text = "\n".join(lines)
    if not email_text.strip():
        print("メールが入力されていません。")
        return

    print("\n⏳ 判定中... しばらくお待ちください")
    result = detector.analyze(email_text)
    detector.print_result(result)


if __name__ == "__main__":
    main()

Webインターフェースの構築:家族でも使えるUI

Pythonのコマンドラインに慣れていない家族でも使えるよう、GradioというライブラリでシンプルなWebアプリを作ります。Gradioは数十行のコードでAIのWebインターフェースを作れる強力なライブラリです。

#!/usr/bin/env python3
"""
web_app.py
Gradioを使ったフィッシング判定Webアプリ
起動後、ブラウザで http://localhost:7860 にアクセス
"""

import gradio as gr
import json
from phishing_detector import PhishingDetector, PhishingAnalysis

detector = PhishingDetector()

def format_verdict_html(result: PhishingAnalysis) -> str:
    """判定結果をHTMLで整形する"""

    colors = {
        "DANGEROUS":  ("#ff4444", "#ffe6e6", "🔴 危険:フィッシングメールの疑いが強い"),
        "SUSPICIOUS": ("#ff8800", "#fff8e6", "🟡 要注意:フィッシングメールの可能性あり"),
        "SAFE":       ("#22aa44", "#e6ffe9", "🟢 安全:問題は見つかりませんでした"),
    }
    color, bg, label = colors.get(result.verdict, ("#888", "#eee", "⚪ 不明"))

    red_flags_html = "".join(
        f"✗ {flag}" for flag in result.red_flags
    ) if result.red_flags else "特になし"

    safe_html = "".join(
        f"✓ {ind}" for ind in result.safe_indicators
    ) if result.safe_indicators else "特になし"

    suspicious_urls_html = "".join(
        f"{url}" for url in result.suspicious_urls
    ) if result.suspicious_urls else "なし"

    warnings_html = "".join(
        f"{w}" for w in result.pre_analysis_warnings
    ) if result.pre_analysis_warnings else "なし"

    brand_html = f"なりすましブランド:{result.impersonated_brand}" \
                 if result.impersonated_brand else ""

    return f"""
{label}{result.risk_score}/10危険度スコア{result.confidence}%判定の確信度{result.analysis_time_sec:.1f}秒分析時間

  {brand_html}

  🚩 危険なポイント{red_flags_html}✅ 安全な要素{safe_html}⚠️ 不審なURL{suspicious_urls_html}🔍 静的解析の警告{warnings_html}📋 判定の詳しい説明{result.explanation}🛡️ 推奨アクション: {result.recommended_action}
  
"""


def analyze_email(email_text: str) -> str:
    """Gradioのコールバック関数"""
    if not email_text or not email_text.strip():
        return "メールのテキストを入力してください。"

    if len(email_text) > 10000:
        email_text = email_text[:10000]

    try:
        result = detector.analyze(email_text)
        return format_verdict_html(result)
    except Exception as e:
        return f"エラーが発生しました:{e}"


# サンプルメール(デモ用)
SAMPLE_SUSPICIOUS = """件名:【重要】Amazonアカウントの確認をお願いします

Amazon.co.jpカスタマーサービス

お客様のアカウントに不審なアクセスが検出されました。
セキュリティ保護のため、24時間以内に下記リンクより本人確認をお願いします。

確認はこちら → http://amazon-security-verify.net/login?user=confirm

確認が完了しない場合、アカウントを一時停止させていただきます。

Amazon カスタマーサービス"""

SAMPLE_SAFE = """件名:ご購入いただきありがとうございます

株式会社〇〇ショッピング よりご連絡です。

このたびはご購入いただきありがとうございます。
ご注文番号:ORD-20250315-12345
商品名:ワイヤレスイヤホン(ブラック)
お届け予定:2025年3月17日〜19日

ご不明点はinfo@oo-shopping.co.jp またはフリーダイヤル 0120-XXX-XXXまでお問い合わせください。"""


# Gradioアプリの定義
with gr.Blocks(
    title="フィッシングメール判定AI",
    theme=gr.themes.Soft()
) as app:

    gr.Markdown("""
# 🔍 フィッシングメール判定AI
**完全ローカル動作・メール内容は外部に送信されません**

怪しいと思ったメールの本文をそのまま貼り付けると、AIが安全かどうか判定します。
""")

    with gr.Row():
        with gr.Column(scale=1):
            email_input = gr.Textbox(
                label="メールの本文を貼り付けてください",
                placeholder="件名や本文をそのままコピー&ペーストしてください...",
                lines=15,
                max_lines=30
            )
            with gr.Row():
                analyze_btn  = gr.Button("🔍 判定する", variant="primary", scale=2)
                clear_btn    = gr.Button("🗑 クリア", scale=1)

            gr.Examples(
                examples=[
                    [SAMPLE_SUSPICIOUS],
                    [SAMPLE_SAFE],
                ],
                inputs=email_input,
                label="サンプルメール(クリックで入力)"
            )

        with gr.Column(scale=1):
            result_output = gr.HTML(
                label="判定結果",
                value=""
                      "メールを入力して「判定する」を押してください"
            )

    analyze_btn.click(
        fn=analyze_email,
        inputs=email_input,
        outputs=result_output
    )
    clear_btn.click(
        fn=lambda: ("", ""
                        "メールを入力して「判定する」を押してください"),
        outputs=[email_input, result_output]
    )

    gr.Markdown("""
---
⚠️ **注意事項**
- 本ツールはあくまで参考情報です。最終的な判断はご自身でお願いします
- 判定結果が「安全」でも、重要な操作(送金・ログイン等)は慎重に
- 怪しいリンクは絶対にクリックしないでください
""")


if __name__ == "__main__":
    app.launch(
        server_name="127.0.0.1",  # ローカルのみ(外部公開しない)
        server_port=7860,
        share=False                # インターネット共有を無効化
    )
# Webアプリの起動
python web_app.py

# ブラウザで以下にアクセス
# http://localhost:7860

Gmailとの自動連携:受信したら即チェック

毎回手動でコピペするのは面倒です。GmailのIMAPを使って未読メールを自動取得し、バックグラウンドで判定するシステムを構築します。

Gmailのアプリパスワードについて

GmailのIMAP連携には「アプリパスワード」が必要です。Googleアカウントの2段階認証を有効にした上で、Googleアカウント設定の「セキュリティ」→「アプリパスワード」から16桁のパスワードを生成してください。通常のGoogleパスワードはコードに記述しないでください。

#!/usr/bin/env python3
"""
gmail_monitor.py
Gmail未読メールの自動フィッシング判定
"""

import imaplib
import email
from email.header import decode_header
import time
import os
import json
from pathlib import Path
from phishing_detector import PhishingDetector, PhishingAnalysis
from dotenv import load_dotenv

load_dotenv()  # .envファイルから設定を読み込む

# ── 設定(.envファイルに記述すること) ──────────
GMAIL_ADDRESS  = os.getenv("GMAIL_ADDRESS")   # Gmailアドレス
GMAIL_APP_PASS = os.getenv("GMAIL_APP_PASS")  # アプリパスワード(16桁)
CHECK_INTERVAL = 300                           # チェック間隔(秒)

# 判定済みメールIDの保存ファイル
CHECKED_IDS_FILE = Path("checked_email_ids.json")


class GmailPhishingMonitor:
    """Gmailの未読メールを自動チェックするモニター"""

    def __init__(self):
        self.detector    = PhishingDetector()
        self.checked_ids = self._load_checked_ids()

    def _load_checked_ids(self) -> set:
        """判定済みメールIDをファイルから読み込む"""
        if CHECKED_IDS_FILE.exists():
            with open(CHECKED_IDS_FILE) as f:
                return set(json.load(f))
        return set()

    def _save_checked_ids(self) -> None:
        """判定済みメールIDをファイルに保存する"""
        # 最新1000件のみ保持
        ids_list = list(self.checked_ids)[-1000:]
        with open(CHECKED_IDS_FILE, "w") as f:
            json.dump(ids_list, f)

    def decode_mime_str(self, s: str) -> str:
        """MIMEエンコードされた文字列をデコードする"""
        parts = decode_header(s)
        decoded = []
        for part, charset in parts:
            if isinstance(part, bytes):
                charset = charset or "utf-8"
                decoded.append(part.decode(charset, errors="replace"))
            else:
                decoded.append(part)
        return "".join(decoded)

    def get_email_body(self, msg) -> str:
        """メールオブジェクトから本文テキストを抽出する"""
        body_parts = []
        if msg.is_multipart():
            for part in msg.walk():
                if part.get_content_type() == "text/plain":
                    charset = part.get_content_charset() or "utf-8"
                    body_parts.append(
                        part.get_payload(decode=True).decode(charset, errors="replace")
                    )
        else:
            charset = msg.get_content_charset() or "utf-8"
            body_parts.append(
                msg.get_payload(decode=True).decode(charset, errors="replace")
            )
        return "\n".join(body_parts)

    def check_inbox(self) -> list[dict]:
        """受信ボックスの未読メールをチェックする"""
        results = []

        try:
            mail = imaplib.IMAP4_SSL("imap.gmail.com")
            mail.login(GMAIL_ADDRESS, GMAIL_APP_PASS)
            mail.select("INBOX")

            # 未読メールを検索
            _, message_ids = mail.search(None, "UNSEEN")
            ids = message_ids[0].split()

            for msg_id in ids[-20:]:   # 最新20件まで
                msg_id_str = msg_id.decode()

                # 判定済みはスキップ
                if msg_id_str in self.checked_ids:
                    continue

                # メールを取得
                _, msg_data = mail.fetch(msg_id, "(RFC822)")
                raw_email = msg_data[0][1]
                msg = email.message_from_bytes(raw_email)

                subject = self.decode_mime_str(msg.get("Subject", "(件名なし)"))
                sender  = msg.get("From", "不明")
                body    = self.get_email_body(msg)

                # 全テキストを結合して判定
                full_text = f"件名:{subject}\n差出人:{sender}\n\n{body}"

                print(f"[*] 分析中:{subject[:50]}...")
                analysis = self.detector.analyze(full_text)

                result = {
                    "msg_id":     msg_id_str,
                    "subject":    subject,
                    "sender":     sender,
                    "verdict":    analysis.verdict,
                    "risk_score": analysis.risk_score,
                    "explanation": analysis.explanation,
                    "recommended_action": analysis.recommended_action
                }
                results.append(result)
                self.checked_ids.add(msg_id_str)

                # 危険なメールは即座に通知
                if analysis.verdict == "DANGEROUS":
                    self._alert(subject, sender, analysis)
                    print(f"  🔴 危険! スコア:{analysis.risk_score}/10")
                elif analysis.verdict == "SUSPICIOUS":
                    print(f"  🟡 要注意。スコア:{analysis.risk_score}/10")
                else:
                    print(f"  🟢 安全。スコア:{analysis.risk_score}/10")

            mail.logout()
            self._save_checked_ids()

        except imaplib.IMAP4.error as e:
            print(f"[!] IMAP接続エラー: {e}")
        except Exception as e:
            print(f"[!] エラー: {e}")

        return results

    def _alert(self, subject: str, sender: str, analysis: PhishingAnalysis) -> None:
        """危険なメールが発見された場合のアラート処理"""
        print("\n" + "🚨" * 20)
        print(f"  危険なメールを検出しました!")
        print(f"  件名:{subject}")
        print(f"  差出人:{sender}")
        print(f"  危険度:{analysis.risk_score}/10")
        print(f"  推奨:{analysis.recommended_action}")
        print("🚨" * 20 + "\n")
        # ← ここにSlack/LINE/デスクトップ通知を追加(後述)

    def run_loop(self) -> None:
        """定期的に受信ボックスをチェックするループ"""
        print("📬 Gmailフィッシング監視を開始しました")
        print(f"   チェック間隔:{CHECK_INTERVAL}秒")
        print("   停止するには Ctrl+C を押してください\n")

        while True:
            print(f"[{time.strftime('%H:%M:%S')}] メールチェック開始...")
            results = self.check_inbox()

            dangerous = [r for r in results if r["verdict"] == "DANGEROUS"]
            suspicious = [r for r in results if r["verdict"] == "SUSPICIOUS"]

            print(f"   チェック完了:{len(results)}件"
                  f"(危険:{len(dangerous)} 注意:{len(suspicious)})\n")

            time.sleep(CHECK_INTERVAL)


if __name__ == "__main__":
    monitor = GmailPhishingMonitor()
    monitor.run_loop()
# .env ファイルの作成(プロジェクトディレクトリに保存)
# .gitignoreに必ず追加すること!

# .env
GMAIL_ADDRESS=your-email@gmail.com
GMAIL_APP_PASS=xxxx-xxxx-xxxx-xxxx

通知システムの構築:LINE・Slackへアラートを送る

危険なメールを発見したとき、スマートフォンに即座に通知が届けば安心です。LINE NotifyまたはSlackと連携する通知機能を追加します。

LINE Notifyの終了について

LINE Notifyは2025年3月31日にサービスを終了しました。現在はLINEのMessaging APIまたはSlack・Discord等の代替サービスの利用を推奨します。以下ではSlack Webhookとシステム通知(macOS/Linux)を使った方法を紹介します。

#!/usr/bin/env python3
"""
notifier.py
フィッシング検出時の通知モジュール
"""

import requests
import json
import os
import subprocess
import platform
from phishing_detector import PhishingAnalysis

# ── Slack通知 ──────────────────────────────────
# Slack Appを作成し、Incoming Webhooks URLを取得する
# https://api.slack.com/messaging/webhooks

SLACK_WEBHOOK_URL = os.getenv("SLACK_WEBHOOK_URL", "")

def notify_slack(subject: str, sender: str, analysis: PhishingAnalysis) -> bool:
    """Slackに危険メール通知を送る"""
    if not SLACK_WEBHOOK_URL:
        return False

    color_map = {
        "DANGEROUS":  "#ff0000",
        "SUSPICIOUS": "#ff8800",
        "SAFE":       "#22aa44"
    }
    color = color_map.get(analysis.verdict, "#888888")

    emoji_map = {
        "DANGEROUS":  "🔴",
        "SUSPICIOUS": "🟡",
        "SAFE":       "🟢"
    }
    emoji = emoji_map.get(analysis.verdict, "⚪")

    red_flags_text = "\n".join(
        f"• {flag}" for flag in analysis.red_flags[:5]
    ) or "なし"

    payload = {
        "text": f"{emoji} *フィッシングメール判定結果*",
        "attachments": [
            {
                "color": color,
                "fields": [
                    {"title": "件名",       "value": subject,            "short": False},
                    {"title": "差出人",     "value": sender,             "short": True},
                    {"title": "判定",       "value": analysis.verdict,   "short": True},
                    {"title": "危険度",     "value": f"{analysis.risk_score}/10",  "short": True},
                    {"title": "確信度",     "value": f"{analysis.confidence}%",    "short": True},
                    {"title": "危険なポイント", "value": red_flags_text,  "short": False},
                    {"title": "推奨アクション", "value": analysis.recommended_action, "short": False},
                ],
                "footer": "家庭用フィッシング判定AI(Ollama)",
            }
        ]
    }

    try:
        response = requests.post(
            SLACK_WEBHOOK_URL,
            json=payload,
            timeout=10
        )
        return response.status_code == 200
    except Exception as e:
        print(f"Slack通知エラー: {e}")
        return False


# ── デスクトップ通知 ────────────────────────────

def notify_desktop(subject: str, analysis: PhishingAnalysis) -> None:
    """OS標準のデスクトップ通知を送る"""
    verdict_labels = {
        "DANGEROUS":  "⚠️ 危険なメールを検出",
        "SUSPICIOUS": "⚠️ 不審なメールを検出",
        "SAFE":       "✅ 安全なメールです",
    }
    title   = verdict_labels.get(analysis.verdict, "メール判定結果")
    message = f"{subject[:50]}\n危険度: {analysis.risk_score}/10"

    system = platform.system()
    try:
        if system == "Darwin":          # macOS
            subprocess.run([
                "osascript", "-e",
                f'display notification "{message}" with title "{title}"'
            ], check=False)
        elif system == "Linux":         # Linux (notify-send)
            subprocess.run(
                ["notify-send", title, message],
                check=False
            )
        elif system == "Windows":
            # Windows Toast通知(wintoastnotifライブラリ使用)
            try:
                from wintoastnotif import ToastNotifier
                toaster = ToastNotifier()
                toaster.show_toast(title, message, duration=10)
            except ImportError:
                pass
    except Exception as e:
        print(f"デスクトップ通知エラー: {e}")


# ── Discord通知 ─────────────────────────────────

DISCORD_WEBHOOK_URL = os.getenv("DISCORD_WEBHOOK_URL", "")

def notify_discord(subject: str, sender: str, analysis: PhishingAnalysis) -> bool:
    """Discordに危険メール通知を送る"""
    if not DISCORD_WEBHOOK_URL:
        return False

    color_map = {"DANGEROUS": 16711680, "SUSPICIOUS": 16744192, "SAFE": 2240068}
    color = color_map.get(analysis.verdict, 8421504)

    embed = {
        "title": f"{'🔴' if analysis.verdict=='DANGEROUS' else '🟡'} フィッシングメール判定",
        "color": color,
        "fields": [
            {"name": "件名",     "value": subject[:100],      "inline": False},
            {"name": "差出人",   "value": sender[:100],        "inline": True},
            {"name": "判定",     "value": analysis.verdict,    "inline": True},
            {"name": "危険度",   "value": f"{analysis.risk_score}/10", "inline": True},
            {"name": "説明",     "value": analysis.explanation[:500], "inline": False},
            {"name": "推奨",     "value": analysis.recommended_action, "inline": False},
        ],
        "footer": {"text": "家庭用フィッシング判定AI(Ollama)"}
    }

    try:
        response = requests.post(
            DISCORD_WEBHOOK_URL,
            json={"embeds": },
            timeout=10
        )
        return response.status_code in (200, 204)
    except Exception as e:
        print(f"Discord通知エラー: {e}")
        return False


def send_all_notifications(
    subject: str, sender: str, analysis: PhishingAnalysis
) -> None:
    """設定されているすべての通知チャンネルに送信する"""
    # 危険または要注意のみ通知(SAFEは通知しない)
    if analysis.verdict == "SAFE":
        return

    notify_desktop(subject, analysis)

    if SLACK_WEBHOOK_URL:
        ok = notify_slack(subject, sender, analysis)
        print(f"  Slack通知: {'✓' if ok else '✗'}")

    if DISCORD_WEBHOOK_URL:
        ok = notify_discord(subject, sender, analysis)
        print(f"  Discord通知: {'✓' if ok else '✗'}")

判定精度を上げる:プロンプトチューニングのコツ

ローカルLLMの判定精度は、システムプロンプトの設計によって大きく変わります。実際にフィッシングメールを判定してみて精度が低いと感じた場合、以下の観点でModelfileを調整します。

偽陰性(フィッシングを見逃す)が多い場合

# Modelfileの調整例(厳格モード)
# PhishingDetectorModelfile を編集

# temperatureをさらに低く設定
PARAMETER temperature 0.05

# システムプロンプトに以下を追加:
# 「判断に迷う場合はSUSPICIOUSとすること」
# 「正規のメールでも過剰検知(偽陽性)よりフィッシングの見逃し(偽陰性)を避けること」

# 更新後はモデルを再ビルド
# ollama create phishing-detector -f PhishingDetectorModelfile

偽陽性(正規メールを誤検知)が多い場合

# システムプロンプトに以下の知識を追加:

SYSTEM """
(既存のプロンプトに追加)

【正規メールとフィッシングを区別する追加知識】

正規の企業メールの特徴:
- 差出人ドメインが会社の公式ドメインと一致する
- 件名・本文に過剰な緊急性や恐怖の表現がない
- URLが公式ドメインのサブドメインまたはパスに誘導
- 個人情報やパスワードの直接入力を求めない
- 会社の住所・電話番号・登録番号などが明記されている

日本の正規企業メールでよく使う表現:
「いつもお世話になっております」「ご確認をお願いいたします」
「ご不明な点はお問い合わせください」など

これらの特徴が揃っている場合は過剰にSUSPICIOUSと判定しないこと。
"""

Few-shot例を使った精度向上

# プロンプトにフィッシング例と正規例を数件追加することで
# AIがパターンを学習して精度が向上する(Few-shot学習)

def analyze_with_examples(self, email_text: str) -> PhishingAnalysis:
    """
    Few-shot例を含めた高精度分析
    """
    few_shot_examples = """
【例1:危険なフィッシングメール】
件名:【緊急】三菱UFJ銀行 本人確認のお願い
差出人:security@mufg-bank-verify.com
本文:お客様のアカウントに不正アクセスが検出されました。
      48時間以内に http://mufg-secure-login.net/verify にアクセスし確認してください。
→ 判定:DANGEROUS(理由:ドメインが公式と異なる、緊急性の煽り、偽ドメインへの誘導)

【例2:安全な正規メール】
件名:ご注文の発送のお知らせ
差出人:no-reply@amazon.co.jp
本文:ご注文のXXXXが発送されました。
      お届け状況はhttps://www.amazon.co.jp/gp/your-account/order-historyでご確認ください。
→ 判定:SAFE(理由:公式ドメイン、公式URLへの誘導、緊急性なし)

【例3:要注意メール】
件名:アカウント情報の更新のお願い
差出人:info@rakuten.co.jp
本文:サービス改善のためアカウント情報の更新をお願いします。
      https://member.rakuten.co.jp/app/index.gsp?act=logInFromBukkenDetail
→ 判定:SUSPICIOUS(理由:差出人は正規だがURLが複雑すぎる・要追加確認)
"""

    prompt = f"""
{few_shot_examples}

上記の判定基準と例を参考に、以下のメールを判定してください:

=== 判定対象メール ===
{email_text[:3000]}
========================

JSON形式で判定結果を返してください。
"""
    # 以降は通常の analyze() と同様の処理
    ...

実際の判定テスト:精度の確認方法

構築したシステムの判定精度を確認するためのテストコードを作成します。

#!/usr/bin/env python3
"""
test_detector.py
フィッシング判定エンジンの精度テスト
"""

from phishing_detector import PhishingDetector

detector = PhishingDetector()

# テストケース:(メール本文, 期待される判定, 説明)
TEST_CASES = [
    # ─── 危険なメールのテストケース ───
    (
        """件名:【重要】Amazon.co.jpアカウント確認のお願い
差出人:security@amazon-jp-verify.com
本文:お客様のアカウントに不正アクセスが確認されました。
24時間以内に以下よりパスワードをリセットしてください。

http://amazon-security-update.xyz/reset
対応されない場合アカウントを停止いたします。""", "DANGEROUS", "Amazonなりすまし・偽ドメイン・緊急性煽り" ), ( """件名:楽天ポイントが失効します 差出人:point-info@rakuten-point.net 本文:5,000ポイントが明日失効します。 本日中にhttps://r.r-coins.jp/pointでご確認ください。 楽天市場""", "DANGEROUS", "楽天なりすまし・偽ドメイン" ), # ─── 安全なメールのテストケース ─── ( """件名:ご注文確認 #123-4567890-1234567 差出人:shipment-tracking@amazon.co.jp 本文:お客様のご注文が発送されました。 追跡番号:1234-5678-9012 配送状況:https://www.amazon.co.jp/gp/your-account/order-history ご不明な点はカスタマーサービス(0120-XXX-XXX)までお問い合わせください。 Amazon.co.jp カスタマーサービス""", "SAFE", "正規のAmazon発送通知" ), ( """件名:定期メンテナンスのお知らせ(3月25日 2:00-4:00) 差出人:maintenance@example-corp.co.jp 本文:いつもご利用いただきありがとうございます。 下記の日程でシステムメンテナンスを実施いたします。 日時:2025年3月25日(火)午前2:00〜4:00 この間、サービスをご利用いただけません。 ご不便をおかけしますが、ご理解のほどよろしくお願いいたします。 株式会社○○ システム管理部""", "SAFE", "正規のメンテナンス通知" ), ] def run_tests(): print("=" * 60) print(" フィッシング判定AIの精度テスト") print("=" * 60) correct = 0 total = len(TEST_CASES) for i, (email_text, expected, description) in enumerate(TEST_CASES, 1): print(f"\n[テスト {i}/{total}] {description}") print(f" 期待値:{expected}") result = detector.analyze(email_text) actual = result.verdict # SUSPICIOUSはDANGEROUSまたはSUSPICIOUSのどちらの期待値にも対応 is_correct = (actual == expected) or \ (expected == "DANGEROUS" and actual == "SUSPICIOUS") or \ (expected == "SUSPICIOUS" and actual in ["DANGEROUS", "SUSPICIOUS"]) status = "✓ 正解" if is_correct else "✗ 不正解" if is_correct: correct += 1 print(f" 実際値:{actual} スコア:{result.risk_score}/10") print(f" 結果:{status}") if not is_correct: print(f" 説明:{result.explanation[:100]}...") print(f"\n{'='*60}") accuracy = correct / total * 100 print(f" 正解率:{correct}/{total}件 ({accuracy:.1f}%)") if accuracy >= 80: print(" 🟢 良好な精度です") elif accuracy >= 60: print(" 🟡 精度改善の余地があります(プロンプトチューニングを推奨)") else: print(" 🔴 精度が低いです(モデルまたはプロンプトの見直しが必要)") print("=" * 60) if __name__ == "__main__": run_tests()

運用のヒントとトラブルシューティング

よくある問題と解決策

問題原因解決策
判定が遅い(30秒以上かかる)CPU推論 / モデルが大きすぎるより小さいモデルに変更(qwen2.5:3b等)
JSON形式で返ってこないモデルがプロンプトを誤解しているtemperatureを0.1以下に設定 / モデルを変更
フィッシングを見逃す(偽陰性)システムプロンプトが緩い判定基準を厳しくするようプロンプトを修正
正規メールを誤検知(偽陽性)システムプロンプトが過剰正規メールの特徴説明をプロンプトに追加
IMAP接続エラーアプリパスワードが間違っているGoogleアカウントでアプリパスワードを再生成
メモリ不足でクラッシュRAMが足りないより小さいモデルを使用 / スワップを増やす

セキュリティ上の注意点

本システムを運用する際の注意事項

  • 判定結果を過信しない:AIは完璧ではありません。特にリンクのクリックや個人情報の入力は、AI判定が「安全」でも慎重に行ってください
  • Webアプリはローカルのみで起動server_name="127.0.0.1" の設定を変更して外部公開しないこと。メール内容が第三者に漏洩するリスクがあります
  • 認証情報の管理:GmailのアプリパスワードはGit等でバージョン管理しないこと。.envファイルを使い、必ず.gitignoreに追加してください
  • 定期的なモデルの更新:新しいフィッシング手法が登場した場合、プロンプトを更新することで対応力を維持できます

プライバシーについての確認

# 本システムがどのようにデータを扱うかの確認

# Ollamaが外部に送信する通信を確認(macOS/Linux)
# モデル推論中(メール判定中)に以下を実行

# macOS
sudo tcpdump -i any -n 'host not 127.0.0.1' and port 11434

# Linux
sudo tcpdump -i any -n 'dst port 11434 and not src host 127.0.0.1'

# 期待される結果:
# → ポート11434への外部通信は一切ない(ローカルのみ)
# モデルのダウンロード時のみ ollama.com への通信が発生

まとめ:自分だけの「セキュリティAI」を育てる

本記事では、Ollamaを使ってフィッシングメールを判定するローカルAIを一から構築しました。完成したシステムの価値を整理すると以下のとおりです。

  • プライバシー保護:メール内容が自分のPC外に出ることは一切なく、ChatGPTへのコピペよりはるかに安全
  • コスト無料:一度構築すれば月額費用はゼロ。API料金も不要
  • カスタマイズ可能:自社・自家庭のよく受け取るメールパターンに合わせてチューニングできる
  • 学習の機会:なぜそのメールが危険なのかをAIが説明してくれるため、フィッシングへの理解が深まる

今後の発展として、メールだけでなくSMSやSNSのDMに対応させることURLをサンドボックスで自動解析する機能家族の端末とWebUIを共有する機能なども実現可能です。

フィッシング詐欺は今後もAIを使ってさらに巧妙化します。その対抗手段として、私たちも自分のAIを活用するという発想が、これからのサイバーセキュリティの常識になっていくかもしれません。

最終チェックリスト:システム稼働前の確認

  • ☐ Ollamaがインストールされ、ollama list でモデルが確認できる
  • ☐ PhishingDetectorModelfile からカスタムモデルをビルドした
  • ☐ python phishing_detector.py でCLI判定が動作する
  • ☐ python test_detector.py で60%以上の正解率を確認した
  • ☐ Webアプリの server_name が 127.0.0.1(ローカルのみ)になっている
  • ☐ Gmail連携する場合、.env ファイルを .gitignore に追加した
  • ☐ 通知先(Slack/Discord)のWebhook URLを設定した
  • ☐ 実際のフィッシングメールで最終テストを実施した

※ 本記事のコードは教育・家庭内利用を目的としたサンプルです。本番環境での利用にあたっては追加のセキュリティ対策をご検討ください。フィッシング判定AIの精度はモデルやプロンプト設計によって変わります。判定結果はあくまで参考情報としてご活用ください。情報は2025年3月時点のものです。

コメント