OSINTとAIの融合:なぜローカルLLMが最適なのか

解析

OSINTの基本概念

OSINTは「公開されている情報源から得られる情報」をもとにした諜報・調査活動です。その情報源は多岐にわたります。

  • ウェブサイト・ニュース記事:企業の公式サイト、プレスリリース、技術ブログ
  • SNS・フォーラム:Twitter/X、LinkedIn、Reddit、GitHub
  • 公的記録・データベース:WHOIS情報、法人登記、特許データベース
  • 技術情報:Shodan(IoTデバイス検索)、Censys、Certificate Transparency
  • 漏洩データベース:Have I Been Pwned、パスワードリスト
  • 地理情報:衛星写真、Googleマップ、地番情報

AIなしのOSINTの限界

従来のOSINT作業は、膨大なデータを人間が手作業で確認・分類・関連付けるという時間のかかるプロセスでした。例えば、1つのドメインのOSINT調査を手動で行う場合、WHOIS情報の確認、DNSレコードの列挙、SSL証明書の確認、関連IPアドレスの調査、Shodanでのポートスキャン結果確認など、数十のステップが必要になります。収集した生データをレポートにまとめるだけでも数時間かかることがあります。

ここにLLMを組み合わせることで、データ収集の自動化、収集情報の自然言語での解析・要約、パターン認識・異常検出、レポートの自動生成が可能になります。

なぜクラウドAIではなくローカルLLMか

OSINTはその性質上、調査対象に関する機密情報を扱います。調査内容がクラウドサービスのサーバーに送信されることは、以下の観点から問題があります。

リスククラウドAI(ChatGPT等)Ollama(ローカル)
調査内容の漏洩❌ サーバーに送信される✅ 一切外部送信なし
調査対象への察知リスク❌ ログが残る可能性✅ オフライン動作可能
機密クライアント情報の保護❌ 利用規約に注意が必要✅ 完全自社管理
カスタムシステムプロンプト△ 制限あり✅ 自由にカスタマイズ
AIの安全フィルター回避❌ セキュリティ研究でも制限✅ 研究目的の設定が可能
コスト❌ API料金が発生✅ 無料(ハードウェアのみ)

ローカルLLMのOSINTにおける決定的優位性

セキュリティ研究者にとって最も重要なのは、調査内容がいかなる第三者にも渡らないという保証です。Ollamaはモデルのダウンロード後にオフライン動作可能であり、プロンプトとレスポンスは完全にローカルで処理されます。これはOSINT業務の機密性を守る上で不可欠な要件です。

OSINT専用AIの設計思想:Modelfileの構築

Ollamaの「Modelfile」機能を使うことで、OSINT調査に特化したカスタムAIを構築できます。これはDockerfileに似た設定ファイルで、ベースモデル、システムプロンプト、推論パラメータなどを定義します。

OSINTアシスタント向けModelfileの設計原則

効果的なOSINT AIを設計する際に考慮すべき要素は以下のとおりです。

  • 構造化出力の強制:JSON形式での出力を指示することで、後段処理が容易になる
  • 調査目的の明確化:法的・倫理的な調査のみを支援することを明示
  • 推論の透明性:根拠・情報源を示す形式での回答を求める
  • スコープの限定:不要な機能(一般雑談など)を無効化

基本的なOSINTアシスタントのModelfile

# Modelfile: osint-assistant
# 使用するベースモデル(日本語対応の場合はQwen2.5推奨)
FROM qwen2.5:14b

# システムプロンプト
SYSTEM """
あなたはプロのサイバーセキュリティ研究者を支援するOSINTアシスタントです。
以下の原則に従って動作してください。

【ミッション】
正当な目的(セキュリティ研究、脅威インテリジェンス、企業デューデリジェンス)
のためのオープンソース情報の収集・分析を支援する。

【出力形式】
- 分析結果は必ず「情報源の種類」「信頼度(高/中/低)」「調査推奨アクション」を含める
- 構造化データはJSON形式で提供する
- 不確かな情報には必ず「要確認」フラグを付ける

【制約】
- 違法な手段(不正アクセス、プライバシー侵害など)による情報収集を支援しない
- 個人の私生活への不当な侵害につながる可能性がある調査は拒否する
- 収集する情報は公開情報(OSINT)に限定する

【対応言語】
日本語・英語での質問に対応する。
"""

# 推論パラメータ
PARAMETER temperature 0.3
# temperatureを低く設定することで、事実に基づいた一貫した回答を促進
PARAMETER top_p 0.9
PARAMETER num_ctx 8192
# コンテキストウィンドウを大きくすることで、長いログや収集データの分析が可能

# モデルのビルドコマンド:
# ollama create osint-assistant -f Modelfile

専門化されたModelfile:ドメイン調査特化版

# Modelfile: domain-investigator
FROM llama3.1:8b

SYSTEM """
あなたはドメイン・インフラ調査の専門家AIです。
提供されたドメイン名・IPアドレス・ハッシュ値に対して、以下の分析を行います。

分析項目:
1. WHOIS情報の解釈(登録者、レジストラ、有効期限)
2. DNSレコードの分析(A, AAAA, MX, TXT, NS, CNAME)
3. SSL/TLS証明書の評価(発行機関、有効期限、SAN)
4. ASN・ネットワーク帰属の特定
5. 地理的位置情報の分析
6. 過去のIPアドレス変更履歴の重要性評価
7. 関連インフラの推定

出力は必ず以下の構造で返してください:
{
  "target": "",
  "analysis_type": "",
  "findings": [],
  "risk_indicators": [],
  "recommended_next_steps": [],
  "confidence": "HIGH|MEDIUM|LOW",
  "notes": ""
}
"""

PARAMETER temperature 0.2
PARAMETER num_ctx 4096

OSINT調査パイプラインの構築

実際のOSINT業務では、複数のツールをパイプラインで連携させます。Ollamaを中心に、各OSINTツールの出力をAIが分析・統合するアーキテクチャを構築しましょう。

全体アーキテクチャ

【OSINTパイプライン全体図】

┌─────────────────────────────────────────┐
│           入力(調査対象)                │
│  ドメイン / IP / ハッシュ / メールアドレス │
└──────────────┬──────────────────────────┘
               │
┌──────────────▼──────────────────────────┐
│         データ収集ツール群                │
│  whois / dig / nmap / shodan-cli         │
│  subfinder / amass / theHarvester        │
│  curl (Certificate Transparency)         │
└──────────────┬──────────────────────────┘
               │
┌──────────────▼──────────────────────────┐
│      Ollama(ローカルLLM)               │
│  収集データの分析・解釈・統合             │
│  パターン認識・リスク評価                │
│  レポート自動生成                        │
└──────────────┬──────────────────────────┘
               │
┌──────────────▼──────────────────────────┐
│           出力                           │
│  JSONレポート / Markdown / HTML          │
│  グラフ(Maltego / NetworkX)            │
└─────────────────────────────────────────┘

Pythonによる統合パイプライン実装

#!/usr/bin/env python3
"""
OSINT Pipeline with Ollama
目的:ドメインのOSINT情報を自動収集し、AIで分析する
"""

import subprocess
import json
import requests
import socket
import ssl
import datetime
from typing import Dict, List, Any

OLLAMA_URL = "http://localhost:11434/api/chat"
OSINT_MODEL = "osint-assistant"  # カスタムModelfileで作成したモデル

class OSINTPipeline:
    def __init__(self, target: str):
        self.target = target
        self.collected_data: Dict[str, Any] = {}
        self.analysis_results = []

    # ─── データ収集フェーズ ───

    def collect_whois(self) -> str:
        """WHOIS情報を収集する"""
        try:
            result = subprocess.run(
                ["whois", self.target],
                capture_output=True, text=True, timeout=15
            )
            return result.stdout[:3000]  # 最初の3000文字を取得
        except Exception as e:
            return f"WHOIS取得エラー: {e}"

    def collect_dns(self) -> Dict[str, str]:
        """DNSレコードを収集する"""
        records = {}
        record_types = ["A", "AAAA", "MX", "TXT", "NS", "CNAME"]
        for rtype in record_types:
            try:
                result = subprocess.run(
                    ["dig", "+short", self.target, rtype],
                    capture_output=True, text=True, timeout=10
                )
                records[rtype] = result.stdout.strip()
            except Exception:
                records[rtype] = "取得失敗"
        return records

    def collect_ssl_cert(self) -> Dict[str, Any]:
        """SSL証明書情報を収集する"""
        try:
            ctx = ssl.create_default_context()
            with ctx.wrap_socket(
                socket.socket(), server_hostname=self.target
            ) as s:
                s.settimeout(5)
                s.connect((self.target, 443))
                cert = s.getpeercert()

            # 有効期限の計算
            not_after = datetime.datetime.strptime(
                cert.get('notAfter', ''), '%b %d %H:%M:%S %Y %Z'
            )
            days_remaining = (not_after - datetime.datetime.utcnow()).days

            return {
                "subject": dict(x[0] for x in cert.get('subject', [])),
                "issuer": dict(x[0] for x in cert.get('issuer', [])),
                "notAfter": cert.get('notAfter'),
                "days_remaining": days_remaining,
                "subjectAltName": [v for k, v in cert.get('subjectAltName', [])
                                   if k == 'DNS']
            }
        except Exception as e:
            return {"error": str(e)}

    def collect_ip_info(self) -> Dict[str, Any]:
        """IPアドレスのASN・地理情報を収集する(ip-api.comを使用)"""
        try:
            ip = socket.gethostbyname(self.target)
            # ip-api.com は商用利用以外は無料で利用可能
            response = requests.get(
                f"http://ip-api.com/json/{ip}",
                timeout=5
            )
            data = response.json()
            data['resolved_ip'] = ip
            return data
        except Exception as e:
            return {"error": str(e)}

    # ─── AI分析フェーズ ───

    def analyze_with_ollama(self, data_type: str, data: Any) -> str:
        """収集データをOllamaで分析する"""
        prompt = f"""
以下の{data_type}データを分析してください。

調査対象: {self.target}
データ:
{json.dumps(data, ensure_ascii=False, indent=2) if isinstance(data, dict) else data}

以下の観点から分析してください:
1. 注目すべき発見事項(セキュリティ上のリスクや異常を含む)
2. このデータから推測できる組織・インフラの特徴
3. 追加調査が推奨される項目
4. 信頼度評価(HIGH/MEDIUM/LOW)

分析結果をJSON形式で返してください。
"""
        payload = {
            "model": OSINT_MODEL,
            "messages": [{"role": "user", "content": prompt}],
            "stream": False
        }
        try:
            response = requests.post(OLLAMA_URL, json=payload, timeout=120)
            result = response.json()
            return result["message"]["content"]
        except Exception as e:
            return f"AI分析エラー: {e}"

    def run(self) -> Dict[str, Any]:
        """パイプライン全体を実行する"""
        print(f"[*] OSINT調査開始: {self.target}")

        # データ収集
        print("[*] WHOIS情報収集中...")
        self.collected_data["whois"] = self.collect_whois()

        print("[*] DNS情報収集中...")
        self.collected_data["dns"] = self.collect_dns()

        print("[*] SSL証明書情報収集中...")
        self.collected_data["ssl"] = self.collect_ssl_cert()

        print("[*] IPアドレス・ASN情報収集中...")
        self.collected_data["ip_info"] = self.collect_ip_info()

        # AI分析
        print("[*] AIによるWHOIS分析中...")
        whois_analysis = self.analyze_with_ollama("WHOIS", self.collected_data["whois"])

        print("[*] AIによるDNS分析中...")
        dns_analysis = self.analyze_with_ollama("DNS", self.collected_data["dns"])

        print("[*] AIによるSSL証明書分析中...")
        ssl_analysis = self.analyze_with_ollama("SSL証明書", self.collected_data["ssl"])

        # 統合レポート生成
        print("[*] 統合レポート生成中...")
        final_report = self.generate_integrated_report({
            "whois": whois_analysis,
            "dns": dns_analysis,
            "ssl": ssl_analysis,
            "ip_info": self.collected_data["ip_info"]
        })

        return {
            "target": self.target,
            "timestamp": datetime.datetime.utcnow().isoformat(),
            "raw_data": self.collected_data,
            "ai_analysis": {
                "whois": whois_analysis,
                "dns": dns_analysis,
                "ssl": ssl_analysis
            },
            "integrated_report": final_report
        }

    def generate_integrated_report(self, analyses: Dict) -> str:
        """全分析結果を統合してレポートを生成する"""
        prompt = f"""
以下の複数の分析結果を統合して、{self.target}に関する総合OSINTレポートを作成してください。

分析結果:
{json.dumps(analyses, ensure_ascii=False, indent=2)}

レポートには以下を含めてください:
1. エグゼクティブサマリー(3文以内)
2. 主要な発見事項(箇条書き)
3. 特定されたリスク(リスクレベル:CRITICAL/HIGH/MEDIUM/LOW)
4. 組織・インフラの特徴まとめ
5. 推奨される追加調査項目
6. 総合信頼度スコア(1-10)

Markdown形式で出力してください。
"""
        payload = {
            "model": OSINT_MODEL,
            "messages": [{"role": "user", "content": prompt}],
            "stream": False
        }
        try:
            response = requests.post(OLLAMA_URL, json=payload, timeout=180)
            return response.json()["message"]["content"]
        except Exception as e:
            return f"レポート生成エラー: {e}"


# 実行例
if __name__ == "__main__":
    # 注意:調査対象は自身が管理するドメインまたは許可を得たドメインのみ
    pipeline = OSINTPipeline("example.com")
    report = pipeline.run()

    # レポートの保存
    with open("osint_report.json", "w", encoding="utf-8") as f:
        json.dump(report, f, ensure_ascii=False, indent=2)

    print("\n[+] 調査完了。osint_report.json に保存されました。")
    print("\n=== 統合レポート ===")
    print(report["integrated_report"])

脅威インテリジェンス分析:Ollamaによるログ解析

OSINT活動の重要なユースケースの1つが、大量のログデータから脅威を発見する作業です。Ollamaを使うことで、手動ではとても確認しきれない量のログを効率的に分析できます。

セキュリティログ分析ツールの実装

#!/usr/bin/env python3
"""
Threat Intelligence Log Analyzer
Ollamaを使ったセキュリティログの脅威分析
"""

import requests
import json
import re
from collections import Counter
from pathlib import Path

OLLAMA_URL = "http://localhost:11434/api/generate"
ANALYSIS_MODEL = "llama3.1:8b"

class ThreatLogAnalyzer:

    # 簡易的な不審パターン(実際はより複雑なルールが必要)
    SUSPICIOUS_PATTERNS = {
        "sql_injection": r"(union\s+select|drop\s+table|insert\s+into|'\s*or\s*'1'='1)",
        "path_traversal": r"(\.\./|\.\.\\|%2e%2e%2f)",
        "xss_attempt": r"( None:
        """ログファイルを読み込む"""
        with open(self.log_file, 'r', encoding='utf-8', errors='ignore') as f:
            self.log_lines = f.readlines()[:max_lines]
        print(f"[*] {len(self.log_lines)}行のログを読み込みました")

    def extract_ips(self) -> Counter:
        """アクセス元IPアドレスを抽出・集計する"""
        ip_pattern = re.compile(r'\b(?:\d{1,3}\.){3}\d{1,3}\b')
        ips = []
        for line in self.log_lines:
            matches = ip_pattern.findall(line)
            ips.extend(matches)
        return Counter(ips)

    def detect_suspicious(self) -> List[Dict]:
        """不審なパターンを検出する"""
        suspicious = []
        for i, line in enumerate(self.log_lines):
            line_lower = line.lower()
            detected_patterns = []
            for pattern_name, pattern in self.SUSPICIOUS_PATTERNS.items():
                if re.search(pattern, line_lower, re.IGNORECASE):
                    detected_patterns.append(pattern_name)
            if detected_patterns:
                suspicious.append({
                    "line_number": i + 1,
                    "log_entry": line.strip()[:200],  # 200文字に制限
                    "detected_patterns": detected_patterns
                })
        self.suspicious_entries = suspicious
        return suspicious

    def analyze_with_ai(self, sample_size: int = 20) -> str:
        """不審なログエントリをAIで詳細分析する"""
        # サンプルを絞って送信(コンテキスト長の制限を考慮)
        sample = self.suspicious_entries[:sample_size]

        # IPの集計情報
        ip_counter = self.extract_ips()
        top_ips = ip_counter.most_common(10)

        prompt = f"""
以下のセキュリティログ分析結果を評価してください。

【アクセス上位IPアドレス(Top 10)】
{json.dumps(top_ips, ensure_ascii=False)}

【不審なログエントリ(サンプル {len(sample)}/{len(self.suspicious_entries)}件)】
{json.dumps(sample, ensure_ascii=False, indent=2)}

以下の観点から分析してください:
1. 攻撃の種類と手法の特定
2. 攻撃者の意図と目標の推測
3. 最も深刻な脅威の特定(理由を含む)
4. 即時対応が必要なIPアドレスまたはパターン
5. 推奨するブロックルールやセキュリティ対策
6. 類似攻撃への対策として今後実装すべき検出ルール

分析結果は構造化された形式で日本語で回答してください。
"""
        payload = {
            "model": ANALYSIS_MODEL,
            "prompt": prompt,
            "stream": False
        }
        response = requests.post(OLLAMA_URL, json=payload, timeout=180)
        return response.json()["response"]

    def run_analysis(self) -> None:
        """分析を実行する"""
        self.load_logs()

        print("[*] 不審パターンの検出中...")
        suspicious = self.detect_suspicious()
        print(f"[+] {len(suspicious)}件の不審なエントリを検出")

        if not suspicious:
            print("[+] 不審なエントリは検出されませんでした")
            return

        print("[*] AIによる詳細分析中...")
        ai_analysis = self.analyze_with_ai()

        print("\n" + "="*60)
        print("【AI脅威分析レポート】")
        print("="*60)
        print(ai_analysis)

        # 結果を保存
        output = {
            "total_logs": len(self.log_lines),
            "suspicious_count": len(suspicious),
            "suspicious_entries": suspicious,
            "ai_analysis": ai_analysis,
            "top_ips": dict(self.extract_ips().most_common(20))
        }
        with open("threat_analysis.json", "w", encoding="utf-8") as f:
            json.dump(output, f, ensure_ascii=False, indent=2)
        print("\n[+] 詳細レポートを threat_analysis.json に保存しました")


# 使用例
# analyzer = ThreatLogAnalyzer("/var/log/nginx/access.log")
# analyzer.run_analysis()

企業・組織調査への応用:デジタルフットプリント分析

企業のセキュリティチームや調査担当者が、自社のデジタルフットプリントを把握するためのツールを構築します。これは攻撃者視点で自社のリスクを評価する、いわゆる「攻撃表面の管理(Attack Surface Management)」に不可欠な作業です。

このセクションの前提条件

以下に示すツールと技術は、自社が管理するドメイン・IPアドレス・資産に対して使用することを前提とします。他社・他者の資産に対して無許可で実行することは不正アクセス禁止法等に抵触する可能性があります。ペネトレーションテストとして第三者の資産を調査する場合は、必ず書面による許可を取得してください。

サブドメイン列挙とAI分析の連携

#!/usr/bin/env python3
"""
サブドメイン列挙とAI分析の統合ツール
前提:自社管理ドメインへの使用のみ許可
"""

import subprocess
import requests
import json
import socket

def enumerate_subdomains_passive(domain: str) -> List[str]:
    """
    パッシブな方法でサブドメインを列挙する
    (実際のツールはsubfinder、amass等を使用)
    
    パッシブ手法の主なデータソース:
    - Certificate Transparency Logs (crt.sh)
    - DNS Dumpster
    - Shodan等のパブリックAPIを使用するツール
    """
    subdomains = set()

    # crt.sh(Certificate Transparency)を使ったサブドメイン収集
    try:
        response = requests.get(
            f"https://crt.sh/?q=%.{domain}&output=json",
            timeout=30
        )
        for cert in response.json():
            name = cert.get("name_value", "")
            for sub in name.split("\n"):
                sub = sub.strip().lstrip("*.")
                if sub.endswith(domain) and sub != domain:
                    subdomains.add(sub)
    except Exception as e:
        print(f"crt.sh取得エラー: {e}")

    return list(subdomains)


def analyze_attack_surface(domain: str, subdomains: List[str]) -> str:
    """発見されたサブドメインの攻撃表面をAIで分析する"""

    # 各サブドメインのDNS解決を試みる
    resolved = {}
    for sub in subdomains[:30]:  # 最大30件に制限
        try:
            ip = socket.gethostbyname(sub)
            resolved[sub] = ip
        except socket.gaierror:
            resolved[sub] = "解決不可"

    prompt = f"""
以下は {domain} のサブドメイン調査結果です。

【発見されたサブドメイン(Certificate Transparencyより)】
総数: {len(subdomains)}件

【DNS解決結果(サンプル30件)】
{json.dumps(resolved, ensure_ascii=False, indent=2)}

この情報を攻撃表面管理(ASM)の観点から分析してください:

1. 注目すべきサブドメインとその理由
   (開発環境、ステージング、管理系、APIエンドポイント等)
2. 同一IPに複数サブドメインが向いている場合の考察
3. 一般的に脆弱性が発見されやすいサブドメインのパターン
4. この組織のインフラ構成から推測される特徴
5. 優先的にセキュリティ評価を実施すべきサブドメイン Top5とその理由

結果はセキュリティレポート形式で日本語で出力してください。
"""

    response = requests.post(
        "http://localhost:11434/api/generate",
        json={
            "model": "osint-assistant",
            "prompt": prompt,
            "stream": False
        },
        timeout=120
    )
    return response.json()["response"]


# 使用例(自社ドメインのみ)
# domain = "your-company.com"  # 必ず自社ドメインを指定
# subs = enumerate_subdomains_passive(domain)
# print(f"発見されたサブドメイン: {len(subs)}件")
# analysis = analyze_attack_surface(domain, subs)
# print(analysis)

漏洩情報の自動チェックシステム

企業のセキュリティ担当者にとって重要な業務の1つが、自社の認証情報やデータが漏洩していないかを定期的に確認することです。Ollamaと公開APIを組み合わせた自動モニタリングシステムを構築できます。

Have I Been Pwned API + Ollama 連携

#!/usr/bin/env python3
"""
認証情報漏洩チェックシステム
Have I Been Pwned API + Ollama
注意:HaveIBeenPwned APIの利用には登録・APIキーが必要な場合があります
"""

import requests
import hashlib
import json
from typing import List, Dict

HIBP_API_BASE = "https://haveibeenpwned.com/api/v3"
OLLAMA_URL = "http://localhost:11434/api/generate"

def check_password_pwned(password: str) -> int:
    """
    k-Anonymityモデルを使ってパスワード漏洩チェック
    パスワードの最初の5文字のSHA1ハッシュのみをAPIに送信
    実際のパスワードは外部に送信されない
    """
    sha1 = hashlib.sha1(password.encode('utf-8')).hexdigest().upper()
    prefix, suffix = sha1[:5], sha1[5:]

    response = requests.get(
        f"https://api.pwnedpasswords.com/range/{prefix}",
        headers={"Add-Padding": "true"}
    )

    hashes = (line.split(':') for line in response.text.splitlines())
    for h, count in hashes:
        if h == suffix:
            return int(count)
    return 0

def check_email_breach(email: str, api_key: str) -> List[Dict]:
    """
    メールアドレスの漏洩チェック
    HaveIBeenPwned APIキーが必要
    """
    headers = {
        "hibp-api-key": api_key,
        "User-Agent": "SecurityAuditTool"
    }
    response = requests.get(
        f"{HIBP_API_BASE}/breachedaccount/{email}",
        headers=headers
    )
    if response.status_code == 200:
        return response.json()
    elif response.status_code == 404:
        return []
    else:
        return [{"error": f"APIエラー: {response.status_code}"}]

def analyze_breach_results(email: str, breaches: List[Dict]) -> str:
    """漏洩情報をAIで分析してリスク評価を生成する"""
    if not breaches:
        prompt = f"{email} は既知の漏洩データベースに含まれていませんでした。ただし、これは完全な安全性を保証するものではないことを説明してください。"
    else:
        breach_summary = []
        for b in breaches:
            breach_summary.append({
                "name": b.get("Name", "不明"),
                "breach_date": b.get("BreachDate", "不明"),
                "data_classes": b.get("DataClasses", []),
                "is_sensitive": b.get("IsSensitive", False)
            })

        prompt = f"""
{email} のメールアドレスが以下の{len(breaches)}件のデータ漏洩事案に含まれています。

漏洩情報:
{json.dumps(breach_summary, ensure_ascii=False, indent=2)}

セキュリティの観点から以下を評価してください:
1. 最も深刻な漏洩事案とその理由
2. 漏洩データの種類に基づくリスク評価(パスワード、メール、電話番号、クレジットカード等)
3. 即時実施すべきセキュリティ対策(優先順位付き)
4. 長期的なリスク軽減策の提案
5. 総合リスクスコア(1-10)と評価根拠

結果は実用的なアドバイスとして、当該ユーザーが理解できるような日本語で出力してください。
"""

    response = requests.post(
        OLLAMA_URL,
        json={"model": "llama3.1:8b", "prompt": prompt, "stream": False},
        timeout=60
    )
    return response.json()["response"]


# 使用例
# pwn_count = check_password_pwned("mypassword123")
# if pwn_count > 0:
#     print(f"⚠️ このパスワードは {pwn_count:,} 回漏洩データに登場しています")
# else:
#     print("✅ このパスワードは既知の漏洩データに含まれていません")

人物・組織調査の倫理的実施方法

OSINT調査の中でも特に慎重な取り扱いが求められるのが、人物・組織に関する調査です。本セクションでは、正当な目的(デューデリジェンス、セキュリティ研究、ジャーナリズム)において倫理的に実施する方法を解説します。

⚠️ 人物調査における法的・倫理的境界線

  • 個人情報保護法:特定の個人に関する情報を収集・利用・提供する際には同法の規制対象となりえます
  • ストーカー規制法:特定の個人の行動・所在を継続的に調査・監視する行為は規制対象となりえます
  • 名誉毀損:根拠のない情報を公開・共有することは名誉毀損に該当する場合があります
  • 正当な調査目的:採用審査、デューデリジェンス、ジャーナリズム、法執行のサポートなど

公開情報のみを使った正当なデューデリジェンス

企業やビジネスパートナーに対するデューデリジェンス(適正評価)は、OSINT技術の正当な活用例です。Ollamaを使って公開情報を効率的に整理・分析できます。

#!/usr/bin/env python3
"""
企業デューデリジェンス支援ツール
公開情報のみを使用した企業評価
"""

import requests
import json

OLLAMA_URL = "http://localhost:11434/api/chat"

DILIGENCE_SYSTEM_PROMPT = """
あなたは企業デューデリジェンスの専門家AIです。
提供された公開情報のみを使用して、客観的な企業評価を行います。

評価基準:
- 財務健全性の指標(公開情報から読み取れる範囲)
- 経営陣の経歴と実績
- 法的・コンプライアンスリスク
- 評判・メディア露出の分析
- サイバーセキュリティ態勢(公開情報より)

注意:
- 推測と事実を明確に区別する
- 確認できていない情報には「要確認」を付ける
- プライバシーを侵害する調査は行わない
"""

class DueDiligenceAnalyzer:
    def __init__(self):
        self.conversation_history = [
            {"role": "system", "content": DILIGENCE_SYSTEM_PROMPT}
        ]

    def analyze(self, query: str) -> str:
        self.conversation_history.append(
            {"role": "user", "content": query}
        )
        response = requests.post(
            OLLAMA_URL,
            json={
                "model": "qwen2.5:14b",
                "messages": self.conversation_history,
                "stream": False
            },
            timeout=120
        )
        assistant_msg = response.json()["message"]["content"]
        self.conversation_history.append(
            {"role": "assistant", "content": assistant_msg}
        )
        return assistant_msg

    def generate_dd_report(self, company_name: str,
                            public_info: str) -> str:
        """公開情報からデューデリジェンスレポートを生成する"""
        query = f"""
以下の公開情報をもとに、{company_name} のデューデリジェンスレポートを作成してください。

【提供された公開情報】
{public_info}

レポートには以下のセクションを含めてください:
1. 会社概要サマリー
2. ポジティブな指標
3. 注意すべきリスク要因
4. 追加確認が推奨される項目
5. 総合評価(A/B/C/D)と根拠

注意:すべての評価は提供された公開情報のみに基づくものであることを明記してください。
"""
        return self.analyze(query)

# 使用例
# analyzer = DueDiligenceAnalyzer()
# 
# public_info = """
# - 設立年:2010年
# - 従業員数:公開情報では約500名
# - 主要サービス:クラウドセキュリティ
# - 資金調達:シリーズC 30億円(2023年)
# - 主要顧客:金融機関、官公庁
# - 代表者のLinkedIn:エンジニア出身、前職は大手IT企業CTO
# - 直近ニュース:ISO 27001取得(2024年)
# - サイバーインシデント履歴:公開情報では確認されず
# """
# 
# report = analyzer.generate_dd_report("株式会社サンプル", public_info)
# print(report)

Shodan・Censysとの連携:インターネット露出分析

Shodan や Censys は、インターネット上に公開されているデバイスやサービスを検索するためのサーチエンジンです。自社インフラの外部露出を把握するために重要なツールです。

#!/usr/bin/env python3
"""
Shodan API + Ollama による露出分析
前提:shodan ライブラリのインストールが必要
pip install shodan
"""

import shodan
import requests
import json

SHODAN_API_KEY = "YOUR_SHODAN_API_KEY"  # Shodan APIキー
OLLAMA_URL = "http://localhost:11434/api/generate"

def analyze_shodan_results(query: str) -> None:
    """
    Shodanの検索結果をOllamaで分析する
    ※自社のIPアドレス範囲やドメインのみを対象にすること
    """
    api = shodan.Shodan(SHODAN_API_KEY)

    try:
        results = api.search(query, limit=20)
    except shodan.APIError as e:
        print(f"Shodan APIエラー: {e}")
        return

    # Shodanの生データを整形
    findings = []
    for match in results.get("matches", []):
        finding = {
            "ip": match.get("ip_str"),
            "port": match.get("port"),
            "transport": match.get("transport", "tcp"),
            "product": match.get("product", "不明"),
            "version": match.get("version", "不明"),
            "os": match.get("os", "不明"),
            "org": match.get("org", "不明"),
            "vulns": list(match.get("vulns", {}).keys()),
            "tags": match.get("tags", []),
            "last_update": match.get("timestamp", "")
        }
        findings.append(finding)

    # Ollamaによる分析
    prompt = f"""
以下はShodanによるインターネット公開サービスの調査結果です。

検索クエリ: {query}
ヒット件数: {results.get("total", 0)}件

調査結果(サンプル{len(findings)}件):
{json.dumps(findings, ensure_ascii=False, indent=2)}

セキュリティの観点から以下を評価してください:

1. 最も危険な公開サービス・ポートの特定と理由
2. 発見されたCVE/脆弱性の重大度評価
3. 設定ミスや不必要なサービス公開の指摘
4. 即時対応が必要なリスク(対応優先順位付き)
5. 長期的なセキュリティ改善の推奨事項
6. Attack Surface の概要評価

実用的なセキュリティアドバイスとして、技術者が理解できる形式で回答してください。
"""

    response = requests.post(
        OLLAMA_URL,
        json={"model": "llama3.1:8b", "prompt": prompt, "stream": False},
        timeout=120
    )
    print(response.json()["response"])

# 使用例(必ず自社のIPレンジを指定)
# analyze_shodan_results("org:Your-Company-Name")

ダークウェブ・漏洩情報モニタリング

企業のセキュリティチームにとって、ダークウェブに自社の情報が流れていないかを監視することは重要な業務です。ここではAIを使った効率的なモニタリング手法を紹介します。

Torネットワーク・ダークウェブアクセスの注意事項

Tor経由でのダークウェブアクセスは、日本国内の法律に基づく制限・規制の対象となる場合があります。また、マルウェアや詐欺コンテンツへの露出リスクも高いです。セキュリティ調査目的であっても、専用の隔離された環境(VM、専用端末)での実施と、必要に応じた顧問弁護士への確認を推奨します。

公開されている漏洩データベースの監視(合法的手段)

#!/usr/bin/env python3
"""
公開APIを使ったデータ漏洩モニタリング
合法・公開のデータソースのみを使用
"""

import requests
import json
from datetime import datetime

OLLAMA_URL = "http://localhost:11434/api/generate"

def check_github_exposure(org_or_user: str, keyword: str) -> List[Dict]:
    """
    GitHub上での機密情報漏洩をチェックする
    GitHub Code Search APIを使用(要認証)
    トークン、APIキー、接続文字列などの誤コミットを検出
    """
    headers = {
        "Authorization": "Bearer YOUR_GITHUB_TOKEN",
        "Accept": "application/vnd.github.v3+json"
    }

    # 検索クエリ:組織/ユーザーのリポジトリ内のキーワード検索
    query = f"org:{org_or_user} {keyword}"

    response = requests.get(
        "https://api.github.com/search/code",
        params={"q": query, "per_page": 10},
        headers=headers
    )

    if response.status_code == 200:
        items = response.json().get("items", [])
        findings = []
        for item in items:
            findings.append({
                "file": item["name"],
                "repository": item["repository"]["full_name"],
                "path": item["path"],
                "url": item["html_url"],
                "last_modified": item.get("last_modified", "不明")
            })
        return findings
    return []


def analyze_github_exposure(org: str) -> None:
    """GitHub上の自社機密情報露出をOllamaで分析する"""
    # 検索対象キーワード(自社の機密情報に関連するもの)
    sensitive_keywords = [
        "password",
        "api_key",
        "secret_key",
        "private_key",
        "connection_string",
        "database_password"
    ]

    all_findings = {}
    for keyword in sensitive_keywords:
        findings = check_github_exposure(org, keyword)
        if findings:
            all_findings[keyword] = findings

    if not all_findings:
        print(f"[+] {org} のGitHubリポジトリで潜在的な機密情報漏洩は検出されませんでした")
        return

    # AIによるリスク評価
    prompt = f"""
GitHubコード検索による {org} 組織の機密情報露出チェック結果です。

【検出された潜在的な問題】
{json.dumps(all_findings, ensure_ascii=False, indent=2)}

以下のセキュリティ評価を行ってください:

1. 最も深刻なリスクとその影響範囲
2. 各検出項目の対応優先度(CRITICAL/HIGH/MEDIUM/LOW)
3. 即時実施すべき対応手順(ステップバイステップ)
4. 再発防止のためのコーディングガイドライン提案
5. .gitignoreやpre-commitフックでの自動防止策

レポートを技術的に正確に、かつ経営層にも説明できる形式で出力してください。
"""

    response = requests.post(
        OLLAMA_URL,
        json={"model": "llama3.1:8b", "prompt": prompt, "stream": False},
        timeout=120
    )
    print(response.json()["response"])

# 使用例(自社のGitHub組織名を指定)
# analyze_github_exposure("your-company-org")

自動レポート生成とワークフロー自動化

OSINTの最終成果物はレポートです。Ollamaを使って収集した情報を自動的に整理し、プロフェッショナルなレポートとして出力するシステムを構築できます。

n8nとの連携によるワークフロー自動化

n8n(ノーコード/ローコードワークフローツール)とOllamaを組み合わせることで、定期的なOSINTモニタリングを完全自動化できます。

# n8n + Ollama 自動監視ワークフロー設定例(概念)
#
# 定期実行(毎日午前2時)
#   ↓
# 監視対象ドメインリストを読み込み
#   ↓
# 各ドメインに対してOSINTチェック実行
#   ↓
# Ollama APIで結果を分析・要約
#   ↓
# 新規リスクが発見された場合:
#   - Slackに通知
#   - メールでレポート送信
#   - インシデント管理システムに登録
#
# n8n の Ollama ノード設定例:
# {
#   "host": "http://localhost:11434",
#   "model": "osint-assistant",
#   "prompt": "{{ $json.osint_data }}を分析してください",
#   "stream": false
# }

Markdown/HTMLレポートの自動生成

#!/usr/bin/env python3
"""
OSINT調査レポートの自動生成
Ollamaを使ってプロフェッショナルなMarkdownレポートを作成
"""

import requests
import json
from datetime import datetime
from pathlib import Path

OLLAMA_URL = "http://localhost:11434/api/generate"

def generate_osint_report(
    target: str,
    raw_findings: Dict,
    report_type: str = "executive"
) -> str:
    """
    OSINTの発見事項から調査レポートを生成する
    
    report_type:
        "executive" - 経営層向け(技術用語を避ける)
        "technical" - 技術担当者向け(詳細な技術情報を含む)
        "legal"     - 法務・コンプライアンス向け(法的観点を重視)
    """

    report_instructions = {
        "executive": "非技術系の経営層が理解できる形式で、ビジネスリスクを中心に",
        "technical": "セキュリティエンジニアが詳細な対応を行えるよう、技術的詳細を含めて",
        "legal": "法務・コンプライアンス担当者向けに、法的リスクと規制への影響を中心に"
    }

    instruction = report_instructions.get(report_type, report_instructions["executive"])

    prompt = f"""
以下のOSINT調査結果をもとに、{instruction}形式のセキュリティ調査レポートを作成してください。

調査対象: {target}
調査日時: {datetime.utcnow().strftime('%Y年%m月%d日 %H:%M UTC')}

【調査結果データ】
{json.dumps(raw_findings, ensure_ascii=False, indent=2)}

以下の構成でMarkdown形式のレポートを作成してください:

# セキュリティOSINT調査レポート

## 1. エグゼクティブサマリー
(3-5行で全体を要約)

## 2. 調査概要
- 調査対象
- 調査手法
- 調査範囲

## 3. 主要な発見事項
### 3.1 重大なリスク(CRITICAL/HIGH)
### 3.2 中程度のリスク(MEDIUM)
### 3.3 低リスク・情報事項(LOW/INFO)

## 4. リスク評価マトリクス
(表形式で各リスクの影響度と発生可能性を評価)

## 5. 推奨対応策
### 即時対応(24時間以内)
### 短期対応(1週間以内)
### 長期対応(1ヶ月以内)

## 6. 制約・免責事項

---
本レポートは公開情報のみに基づいています。
"""

    response = requests.post(
        OLLAMA_URL,
        json={"model": "qwen2.5:14b", "prompt": prompt, "stream": False},
        timeout=300
    )
    return response.json()["response"]


def save_report(target: str, report_content: str) -> str:
    """レポートをファイルに保存する"""
    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    safe_target = target.replace(".", "_").replace("/", "_")
    filename = f"osint_report_{safe_target}_{timestamp}.md"

    output_dir = Path("reports")
    output_dir.mkdir(exist_ok=True)
    output_path = output_dir / filename

    with open(output_path, "w", encoding="utf-8") as f:
        f.write(report_content)

    print(f"[+] レポートを保存しました: {output_path}")
    return str(output_path)

# 使用例
# findings = {
#     "whois": {"registrar": "example", "creation_date": "2020-01-01"},
#     "ssl": {"issuer": "Let's Encrypt", "days_remaining": 45},
#     "open_ports": [80, 443, 22],
#     "detected_technologies": ["nginx/1.18", "PHP/8.1"]
# }
# 
# report = generate_osint_report("example.com", findings, "technical")
# save_report("example.com", report)

推奨モデルとパフォーマンス比較

OSINT用途での利用において、モデルの選択は重要です。以下に各モデルのOSINT適性評価をまとめます。

モデルサイズ推論精度JSON出力日本語対応速度OSINT適性
qwen2.5:14b14B★★★★★★★★★★★★★★★★★★☆☆◎ 最推奨
llama3.1:8b8B★★★★☆★★★★☆★★★☆☆★★★★☆○ 推奨
mistral:7b7B★★★★☆★★★★☆★★☆☆☆★★★★★○ 英語向け
deepseek-r1:8b8B★★★★★★★★☆☆★★★★☆★★★☆☆△ 推論特化
gemma3:12b12B★★★★☆★★★★☆★★★★☆★★★☆☆○ バランス型
phi4:14b14B★★★★★★★★★★★★★☆☆★★★☆☆○ 高精度

OSINT用途でのモデル選択指針

  • 日本語での調査・レポート生成が主目的qwen2.5:14b が最も推奨。日本語の質が高く、構造化出力も安定している。
  • 速度重視・リアルタイム分析llama3.1:8b または mistral:7b。8GBのGPUでも快適に動作する。
  • 複雑な推論・パターン分析deepseek-r1:14b。思考プロセスを段階的に追う「Chain of Thought」が得意。
  • 低スペックマシンqwen2.5:3b または llama3.2:3b。精度は落ちるが4GB RAMでも動作可能。

OSINT活動における法的・倫理的フレームワーク

技術的な実装と同様に重要なのが、OSINT活動の法的・倫理的な枠組みです。特に日本では以下の法律が関連します。

関連法令

法令概要OSINT との関係
個人情報保護法個人情報の適切な取り扱いを規定個人に関する情報収集・利用には同法の規制が適用
不正アクセス禁止法不正なコンピュータアクセスを禁止認証回避・無許可アクセスは厳禁
ストーカー規制法つきまとい・監視行為を規制特定個人の継続的な追跡調査は該当リスクあり
不正競争防止法営業秘密の不正取得等を禁止競合他社の機密情報収集は規制対象
電気通信事業法通信の秘密を保護ネットワークトラフィックの傍受等は禁止

倫理的OSINT実施のための自己チェックリスト

✅ OSINT実施前の自己確認チェックリスト

  • ☐ 調査目的が明確かつ正当である(業務上の必要性がある)
  • ☐ 対象が自社管理資産、または書面による調査許可を得た資産である
  • ☐ 収集する情報は公開情報(OSINT)の範囲に収まっている
  • ☐ 不正アクセス・スキャン・クローリングなどの手法を使わない
  • ☐ 収集した情報の保管・管理方法が適切である
  • ☐ 調査結果の共有先・利用目的が限定されている
  • ☐ 個人情報を含む場合、個人情報保護法の要件を確認した
  • ☐ 必要に応じて法務部門・顧問弁護士に確認した
  • ☐ 調査記録(誰が・何を・いつ・なぜ調査したか)を残している

まとめ:Ollama OSINTアシスタントの展望

本記事では、OllamaをベースとしたOSINT専用AIの構築方法を体系的に解説しました。ローカルLLMがOSINT活動にもたらす最大の価値は、機密性の高い調査内容をクラウドに送信することなく、強力なAI分析を活用できる点にあります。

今後の発展として特に注目されるのが以下の方向性です。

  • マルチモーダル対応:Llama 3.2 Visionなどのビジョンモデルを組み合わせることで、画像・スクリーンショットのOSINT分析が可能になる
  • エージェント型OSINT:LangChain / LlamaIndexのエージェント機能を使って、AIが自律的に複数のツールを呼び出す完全自動化OSINT
  • グラフデータベースとの統合:Neo4jなどのグラフDBと連携し、組織・人物・インフラの関係性を可視化する
  • 継続的モニタリング:n8nやAirflowを使ったスケジュール実行による24時間365日の自動監視体制の構築

AIとOSINTの融合は急速に進んでいます。しかし技術が進化するほど、倫理的・法的な境界線を守ることの重要性も高まります。本記事で紹介した技術は、正当な目的のために、適切な権限のもとで使用することを常に心がけてください。

Ollamaでできる・できないこと(OSINT観点のまとめ)

できること:収集した公開情報の高速・高精度な分析、大量ログの自動分類・要約、調査レポートの自動生成、チームでの情報共有ワークフローの構築、完全オフライン・機密保持での調査

Ollamaだけではできないこと:情報の自動収集(別途ツールが必要)、Webブラウジング(ツール連携が必要)、Shodanなどの外部サービスへのアクセス(各APIが必要)

絶対にしてはいけないこと:無許可のシステムへのスキャン・アクセス、プライバシー侵害目的の個人追跡、違法な手段での情報収集


※ 本記事の情報は2025年3月時点のものです。記載されているツール・APIの仕様は変更される場合があります。最新情報は各公式ドキュメントをご確認ください。本記事のコードは教育・研究目的のサンプルです。実際の利用にあたっては、適用される法律・社内規定を遵守した上でご使用ください。

コメント