# NAS 文件分类系统

## 项目结构

```
nas-file-organizer/
├── config.yaml          # 配置文件
├── nas_client.py        # NAS 客户端（支持多种协议）
├── classifier.py        # 文件分类逻辑
├── organizer.py         # 主程序
├── requirements.txt     # 依赖
└── logs/                # 日志目录
```

## 1. 依赖安装 (requirements.txt)

```
pyyaml>=6.0
smbprotocol>=1.9.0
paramiko>=3.4.0
watchdog>=3.0.0
tqdm>=4.66.0
python-dateutil>=2.8.2
```

## 2. 配置文件 (config.yaml)

```yaml
# NAS 连接配置
nas:
  # 方式 1: SMB/CIFS (群晖、威联通等通用)
  protocol: smb
  host: 192.168.1.100
  port: 445
  username: admin
  password: your_password
  share: media
  
  # 方式 2: SFTP/SSH
  # protocol: sftp
  # host: 192.168.1.100
  # port: 22
  # username: admin
  # password: your_password
  
  # 方式 3: WebDAV（注意：下文 nas_client.py 目前仅实现 smb 与 sftp，选择 webdav 会抛出“不支持的协议”错误）
  # protocol: webdav
  # host: 192.168.1.100
  # port: 5005
  # username: admin
  # password: your_password

# 文件分类规则
classification:
  # 按文件扩展名分类
  by_extension:
    enabled: true
    categories:
      documents: [pdf, doc, docx, xls, xlsx, ppt, pptx, txt, md]
      images: [jpg, jpeg, png, gif, bmp, raw, heic]
      videos: [mp4, mkv, avi, mov, wmv, flv]
      audio: [mp3, flac, wav, aac, ogg]
      archives: [zip, rar, 7z, tar, gz]
      code: [py, js, ts, java, cpp, c, h, go, rs]
      other: []
  
  # 按日期分类（年/月）
  by_date:
    enabled: false
    format: "%Y/%m"  # 按年/月 文件夹
  
  # 按文件大小分类
  by_size:
    enabled: false
    categories:
      small: { max: 1048576 }      # < 1MB
      medium: { min: 1048576, max: 104857600 }  # 1MB - 100MB
      large: { min: 104857600 }    # > 100MB

# 源文件夹和目标文件夹
paths:
  source: /unsorted        # NAS 上待整理的文件夹
  destination: /organized  # NAS 上整理后的文件夹
  
# 运行模式
mode:
  # once: 运行一次后退出
  # watch: 持续监控新文件
  type: once
  
  # 如果是 watch 模式
  watch:
    interval: 60  # 检查间隔（秒）

# 日志配置
logging:
  level: INFO
  file: logs/nas-organizer.log
```

## 3. NAS 客户端 (nas_client.py)

```python
#!/usr/bin/env python3
"""NAS 客户端 - 支持 SMB、SFTP、WebDAV 协议"""

import os
from pathlib import Path
from typing import List, Dict, Any, Optional
from abc import ABC, abstractmethod

class NASClient(ABC):
    """Abstract interface shared by all NAS protocol clients.

    Concrete clients must override every method below; the base class
    itself cannot be instantiated.
    """

    @abstractmethod
    def connect(self) -> bool:
        """Open the connection to the NAS and report whether it worked."""

    @abstractmethod
    def list_files(self, path: str) -> List[Dict[str, Any]]:
        """Return one dict per entry found under *path*."""

    @abstractmethod
    def download_file(self, remote_path: str, local_path: str) -> bool:
        """Copy *remote_path* from the NAS to *local_path*."""

    @abstractmethod
    def upload_file(self, local_path: str, remote_path: str) -> bool:
        """Copy *local_path* to *remote_path* on the NAS."""

    @abstractmethod
    def move_file(self, src: str, dst: str) -> bool:
        """Move *src* to *dst* on the NAS."""

    @abstractmethod
    def create_dir(self, path: str) -> bool:
        """Create the directory *path* on the NAS."""


class SMBClient(NASClient):
    """SMB/CIFS client (e.g. for Synology or QNAP shares).

    Remote paths are interpreted relative to the configured share; a leading
    slash is accepted and ignored. All network calls go through the
    ``smbclient`` module (shipped by the ``smbprotocol`` package), which is
    imported lazily inside each method.
    """

    def __init__(self, host: str, username: str, password: str,
                 share: str, port: int = 445):
        self.host = host
        self.username = username
        self.password = password
        self.share = share
        self.port = port
        self.session = None

    def _unc(self, path: str = '') -> str:
        """Build the ``//host/share/...`` path for *path* inside the share."""
        # Strip surrounding slashes so "/unsorted" does not become
        # "//host/share//unsorted".
        relative = path.replace('\\', '/').strip('/')
        root = f"//{self.host}/{self.share}"
        return f"{root}/{relative}" if relative else root

    def connect(self) -> bool:
        """Register an SMB session and verify the share can be listed.

        Returns:
            True when the share root could be listed, False otherwise.
        """
        try:
            from smbclient import register_session, listdir
            self.session = register_session(
                self.host,
                username=self.username,
                password=self.password,
                port=self.port
            )
            # Probe the share so bad credentials or share names fail here.
            listdir(self._unc(), port=self.port)
            return True
        except Exception as e:
            print(f"SMB 连接失败：{e}")
            return False

    def list_files(self, path: str) -> List[Dict[str, Any]]:
        """List the entries directly under *path*.

        Returns:
            A list of dicts with the keys ``name``, ``path``, ``size``,
            ``is_dir`` and ``modified`` (a ``datetime``). Entries whose
            metadata cannot be read are skipped.

        Raises:
            Whatever ``smbclient.listdir`` raises when *path* itself cannot
            be listed.
        """
        from datetime import datetime
        from stat import S_ISDIR
        from smbclient import listdir, stat as smb_stat

        files = []
        full_path = self._unc(path)
        parent = path.rstrip('/')

        for name in listdir(full_path, port=self.port):
            try:
                file_stat = smb_stat(f"{full_path}/{name}", port=self.port)
            except Exception as e:
                # Best effort: one unreadable entry must not abort the scan.
                print(f"跳过无法访问的文件 {name}: {e}")
                continue
            files.append({
                'name': name,
                'path': f"{parent}/{name}",
                'size': file_stat.st_size,
                # Derive the flag from the mode bits so callers can really
                # filter directories out.
                'is_dir': S_ISDIR(file_stat.st_mode),
                'modified': datetime.fromtimestamp(file_stat.st_mtime)
            })

        return files

    def download_file(self, remote_path: str, local_path: str) -> bool:
        """Copy *remote_path* from the share to the local *local_path*."""
        import shutil
        from smbclient import open_file

        try:
            with open_file(self._unc(remote_path), mode='rb',
                           port=self.port) as remote, \
                    open(local_path, 'wb') as local:
                shutil.copyfileobj(remote, local)
            return True
        except Exception as e:
            print(f"下载文件失败 {remote_path} -> {local_path}: {e}")
            return False

    def upload_file(self, local_path: str, remote_path: str) -> bool:
        """Copy the local *local_path* to *remote_path* on the share."""
        import shutil
        from smbclient import open_file

        try:
            with open(local_path, 'rb') as local, \
                    open_file(self._unc(remote_path), mode='wb',
                              port=self.port) as remote:
                shutil.copyfileobj(local, remote)
            return True
        except Exception as e:
            print(f"上传文件失败 {local_path} -> {remote_path}: {e}")
            return False

    def move_file(self, src: str, dst: str) -> bool:
        """Move *src* to *dst* within the share, creating *dst*'s directory.

        Returns:
            True on success, False when the directory could not be created
            or the rename failed.
        """
        import posixpath
        from smbclient import rename

        try:
            # Remote paths always use forward slashes, whatever the local OS.
            dst_dir = posixpath.dirname(dst.replace('\\', '/'))
            if dst_dir.strip('/') and not self.create_dir(dst_dir):
                return False

            # The stdlib shutil.move cannot talk to an SMB server; the rename
            # has to be issued through smbclient.
            rename(self._unc(src), self._unc(dst), port=self.port)
            return True
        except Exception as e:
            print(f"移动文件失败 {src} -> {dst}: {e}")
            return False

    def create_dir(self, path: str) -> bool:
        """Create *path* (and missing parents) on the share.

        Returns:
            True when the directory exists afterwards, False on any error.
        """
        from smbclient import makedirs
        try:
            # exist_ok already covers "directory exists"; any exception that
            # still escapes is a genuine failure.
            makedirs(self._unc(path), exist_ok=True, port=self.port)
            return True
        except Exception as e:
            print(f"创建目录失败 {path}: {e}")
            return False


class SFTPClient(NASClient):
    """SFTP client built on paramiko.

    ``connect()`` must succeed before any other method is used; until then
    ``self.sftp`` is ``None`` and the other methods report failure.
    """

    def __init__(self, host: str, username: str, password: str,
                 port: int = 22):
        self.host = host
        self.username = username
        self.password = password
        self.port = port
        self.client = None
        self.sftp = None

    def close(self) -> None:
        """Close the SFTP channel and the SSH connection, if open."""
        for handle in (self.sftp, self.client):
            if handle is not None:
                try:
                    handle.close()
                except Exception:
                    pass  # nothing useful to do if closing itself fails
        self.sftp = None
        self.client = None

    def connect(self) -> bool:
        """Open the SSH connection and an SFTP channel on top of it.

        Returns:
            True on success, False otherwise.
        """
        # Drop any earlier connection so reconnecting does not leak it.
        self.close()
        try:
            import paramiko
            self.client = paramiko.SSHClient()
            # NOTE(security): AutoAddPolicy accepts unknown host keys without
            # verification; consider loading known_hosts instead.
            self.client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
            self.client.connect(
                hostname=self.host,
                port=self.port,
                username=self.username,
                password=self.password
            )
            self.sftp = self.client.open_sftp()
            return True
        except Exception as e:
            print(f"SFTP 连接失败：{e}")
            self.close()
            return False

    def list_files(self, path: str) -> List[Dict[str, Any]]:
        """List the entries directly under *path*.

        Returns:
            A list of dicts with the keys ``name``, ``path``, ``size``,
            ``is_dir`` and ``modified``; empty when the listing fails.
        """
        import stat
        from datetime import datetime

        files = []
        parent = path.rstrip('/')
        try:
            for entry in self.sftp.listdir_attr(path):
                mode = entry.st_mode
                mtime = entry.st_mtime
                files.append({
                    'name': entry.filename,
                    'path': f"{parent}/{entry.filename}",
                    'size': entry.st_size,
                    # S_ISDIR checks the full file-type field; a bare
                    # 0o40000 bit test also matches block devices/sockets.
                    'is_dir': mode is not None and stat.S_ISDIR(mode),
                    # Convert the epoch value to a datetime, matching what
                    # the SMB client returns for this key.
                    'modified': (datetime.fromtimestamp(mtime)
                                 if mtime is not None else None)
                })
        except Exception as e:
            print(f"列出目录失败 {path}: {e}")
        return files

    def download_file(self, remote_path: str, local_path: str) -> bool:
        """Copy *remote_path* from the server to the local *local_path*."""
        try:
            self.sftp.get(remote_path, local_path)
            return True
        except Exception as e:
            print(f"下载文件失败 {remote_path} -> {local_path}: {e}")
            return False

    def upload_file(self, local_path: str, remote_path: str) -> bool:
        """Copy the local *local_path* to *remote_path* on the server."""
        try:
            self.sftp.put(local_path, remote_path)
            return True
        except Exception as e:
            print(f"上传文件失败 {local_path} -> {remote_path}: {e}")
            return False

    def move_file(self, src: str, dst: str) -> bool:
        """Move *src* to *dst*, creating *dst*'s parent directory first.

        Returns:
            True on success, False when the directory could not be created
            or the rename failed.
        """
        import posixpath

        try:
            dst_dir = posixpath.dirname(dst)
            if dst_dir.strip('/') and not self.create_dir(dst_dir):
                return False
            self.sftp.rename(src, dst)
            return True
        except Exception as e:
            print(f"移动文件失败 {src} -> {dst}: {e}")
            return False

    def create_dir(self, path: str) -> bool:
        """Create *path* and any missing parent directories.

        Returns:
            True when every component exists afterwards, False on error.
        """
        import posixpath

        try:
            # paramiko's SFTPClient has mkdir() but no makedirs(), so walk
            # the path and create each missing component in turn.
            current = '/' if path.startswith('/') else ''
            for part in (p for p in path.split('/') if p):
                current = posixpath.join(current, part)
                try:
                    self.sftp.stat(current)
                except OSError:
                    self.sftp.mkdir(current)
            return True
        except Exception as e:
            print(f"创建目录失败 {path}: {e}")
            return False


def create_nas_client(config: Dict) -> Optional[NASClient]:
    """Build the client that matches ``config['protocol']``.

    The protocol name is case-insensitive and defaults to ``smb``.

    Raises:
        ValueError: if the protocol is neither ``smb`` nor ``sftp``.
        KeyError: if a required connection setting is missing.
    """
    protocol = config.get('protocol', 'smb').lower()

    # Reject unknown protocols before touching any connection settings.
    if protocol not in ('smb', 'sftp'):
        raise ValueError(f"不支持的协议：{protocol}")

    # Settings both clients need, read in the same order for either one.
    credentials = {
        'host': config['host'],
        'username': config['username'],
        'password': config['password'],
    }

    if protocol == 'sftp':
        return SFTPClient(port=config.get('port', 22), **credentials)

    share_name = config['share']
    return SMBClient(share=share_name, port=config.get('port', 445),
                     **credentials)
```

## 4. 文件分类器 (classifier.py)

```python
#!/usr/bin/env python3
"""文件分类器 - 根据规则对文件进行分类"""

from pathlib import Path
from datetime import datetime
from typing import Dict, List, Any, Optional

class FileClassifier:
    """Map a file's metadata to a destination sub-folder.

    Up to three rule sets (extension, date, size) can be enabled in the
    configuration; the folder names they produce are joined with ``/`` in
    that order.
    """

    def __init__(self, config: Dict):
        """Store the rule sets found in *config*.

        Args:
            config: the ``classification`` mapping, with the optional keys
                ``by_extension``, ``by_date`` and ``by_size``.
        """
        self.config = config
        # A key that is present but left blank in YAML loads as None.
        self.ext_rules = config.get('by_extension') or {}
        self.date_rules = config.get('by_date') or {}
        self.size_rules = config.get('by_size') or {}
        self._ext_index = self._build_extension_index()

    def _build_extension_index(self) -> Dict[str, str]:
        """Return an ``extension -> category`` lookup table.

        Extensions are normalised (lower case, no leading dot). When an
        extension is listed under several categories the first one wins.
        """
        index: Dict[str, str] = {}
        categories = self.ext_rules.get('categories') or {}
        for category, extensions in categories.items():
            for ext in extensions or ():
                index.setdefault(str(ext).lower().lstrip('.'), category)
        return index

    def classify(self, file_info: Dict[str, Any]) -> str:
        """Return the relative destination folder for *file_info*.

        Args:
            file_info: dict with a ``name`` key and optional ``size`` and
                ``modified`` keys.

        Returns:
            The enabled rules' folder names joined with ``/``, or ``other``
            when no rule produced a name.
        """
        name = file_info['name']
        size = file_info.get('size') or 0
        modified = file_info.get('modified')

        categories = []

        # 1. By extension
        if self.ext_rules.get('enabled', False):
            ext_category = self._classify_by_extension(name)
            if ext_category:
                categories.append(ext_category)

        # 2. By date
        if self.date_rules.get('enabled', False):
            date_category = self._classify_by_date(modified)
            if date_category:
                categories.append(date_category)

        # 3. By size
        if self.size_rules.get('enabled', False):
            size_category = self._classify_by_size(size)
            if size_category:
                categories.append(size_category)

        return '/'.join(categories) if categories else 'other'

    def _classify_by_extension(self, filename: str) -> Optional[str]:
        """Return the category for *filename*'s extension.

        Files without an extension, or with an unlisted one, map to
        ``other`` so every file gets an extension segment and lands at the
        same directory depth.
        """
        ext = Path(filename).suffix.lower().lstrip('.')
        if not ext:
            return 'other'
        return self._ext_index.get(ext, 'other')

    def _classify_by_date(self, modified: Any) -> str:
        """Format *modified* with the configured ``strftime`` pattern.

        Args:
            modified: a ``datetime``, an epoch timestamp (int/float), or
                ``None`` (the current time is used).
        """
        if modified is None:
            modified = datetime.now()
        elif isinstance(modified, (int, float)):
            # Some clients report the raw epoch value instead of a datetime.
            modified = datetime.fromtimestamp(modified)

        date_format = self.date_rules.get('format', '%Y/%m')
        return modified.strftime(date_format)

    def _classify_by_size(self, size: int) -> Optional[str]:
        """Return the first size category whose range contains *size*.

        Ranges are half-open, ``min <= size < max``, so a size sitting on a
        shared boundary belongs to the larger category only. A missing
        ``min`` means 0 and a missing ``max`` means unbounded.

        Returns:
            The category name, or ``None`` when no range matches.
        """
        categories = self.size_rules.get('categories') or {}

        for category, rules in categories.items():
            rules = rules or {}
            min_size = rules.get('min', 0)
            max_size = rules.get('max', float('inf'))

            if min_size <= size < max_size:
                return category

        return None
```

## 5. 主程序 (organizer.py)

```python
#!/usr/bin/env python3
"""NAS 文件整理主程序"""

import os
import sys
import yaml
import logging
from pathlib import Path
from datetime import datetime
from tqdm import tqdm

from nas_client import create_nas_client
from classifier import FileClassifier

# Configure logging.
# FileHandler does not create missing parent directories, so make sure logs/
# exists first; otherwise importing this module fails with FileNotFoundError.
Path('logs').mkdir(parents=True, exist_ok=True)
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    handlers=[
        # Log messages contain non-ASCII text, so pin the file encoding
        # instead of relying on the platform default.
        logging.FileHandler('logs/nas-organizer.log', encoding='utf-8'),
        logging.StreamHandler()
    ]
)
logger = logging.getLogger(__name__)


class NASOrganizer:
    """Move files from the NAS source folder into classified sub-folders."""

    def __init__(self, config_path: str = 'config.yaml'):
        """Load the YAML configuration and build the client and classifier.

        Args:
            config_path: path of the YAML configuration file.

        Raises:
            ValueError: if the file does not contain a YAML mapping.
            KeyError: if a required section or setting is missing.
        """
        with open(config_path, 'r', encoding='utf-8') as f:
            self.config = yaml.safe_load(f)

        # An empty file loads as None; fail with a clear message instead of
        # an opaque "NoneType is not subscriptable".
        if not isinstance(self.config, dict):
            raise ValueError(f"配置文件内容无效：{config_path}")

        self.nas = create_nas_client(self.config['nas'])
        self.classifier = FileClassifier(self.config['classification'])
        self.source = self.config['paths']['source']
        self.destination = self.config['paths']['destination']

    def run(self):
        """Connect, scan the source folder and move every file found.

        Directories in the source folder are left untouched.

        Returns:
            True when every file was moved; False when the connection or
            the scan failed, or at least one file could not be moved.
        """
        logger.info("开始连接 NAS...")

        if not self.nas.connect():
            logger.error("NAS 连接失败")
            return False

        logger.info("NAS 连接成功")
        logger.info(f"扫描目录：{self.source}")

        # Listing can raise (e.g. missing source folder); report it and
        # fail cleanly rather than crash with a traceback.
        try:
            entries = self.nas.list_files(self.source)
        except Exception as e:
            logger.error(f"扫描目录失败 {self.source}: {e}")
            return False

        files = [f for f in entries if not f.get('is_dir')]

        logger.info(f"发现 {len(files)} 个文件")

        success = 0
        failed = 0
        # Drop a trailing slash so the joined path has no "//" in it.
        dest_root = self.destination.rstrip('/')

        for file_info in tqdm(files, desc="整理文件"):
            try:
                category = self.classifier.classify(file_info)

                dst_path = f"{dest_root}/{category}/{file_info['name']}"
                src_path = file_info['path']

                if self.nas.move_file(src_path, dst_path):
                    logger.debug(f"✓ {file_info['name']} -> {category}/")
                    success += 1
                else:
                    logger.warning(f"✗ 移动失败：{file_info['name']}")
                    failed += 1

            except Exception as e:
                # One bad file must not stop the rest of the batch.
                logger.error(f"处理文件 {file_info.get('name')} 失败：{e}")
                failed += 1

        logger.info(f"整理完成：成功 {success} 个，失败 {failed} 个")
        return failed == 0


def main():
    """Command-line entry point.

    Uses the first command-line argument as the configuration path
    (default ``config.yaml``), runs the organizer once and exits with
    status 0 on success or 1 on failure.
    """
    cli_args = sys.argv[1:]
    config_file = cli_args[0] if cli_args else 'config.yaml'

    # Happy path first: the configuration exists, so run and report.
    if os.path.exists(config_file):
        all_moved = NASOrganizer(config_file).run()
        sys.exit(1 if not all_moved else 0)

    print(f"配置文件不存在：{config_file}")
    print("请先创建 config.yaml 配置文件")
    sys.exit(1)


if __name__ == '__main__':
    main()
```

## 6. 快速开始

```bash
# 1. 安装依赖
pip install -r requirements.txt

# 2. 编辑配置文件
vim config.yaml

# 3. 运行整理
python organizer.py

# 4. 后台持续监控（watch 模式）
# 修改 config.yaml 中 mode.type 为 watch（注意：上文 organizer.py 尚未读取 mode 配置，需先自行实现循环监控逻辑，否则仍只运行一次）
python organizer.py &
```

## 7. 进阶功能

### 7.1 定时任务 (cron)

```bash
# 每天凌晨 2 点运行
0 2 * * * cd /path/to/nas-file-organizer && python organizer.py >> logs/cron.log 2>&1
```

### 7.2 Docker 部署

```dockerfile
FROM python:3.11-slim

WORKDIR /app
COPY requirements.txt .
RUN pip install -r requirements.txt

COPY . .
CMD ["python", "organizer.py"]
```

### 7.3 添加 AI 内容分类

可以集成视觉模型对图片/视频内容进行智能分类：
- 人物、风景、文档截图
- 视频类型（电影、教程、监控）

---

## 下一步

告诉我你的 NAS 具体信息，我可以：
1. **生成完整的可运行代码**
2. **针对你的 NAS 品牌优化连接方式**
3. **定制分类规则**（按你的文件类型偏好）
4. **添加额外功能**（去重、压缩、备份等）

你的 NAS 是什么品牌和型号？
