Sign In

Updated; HF parquet converter to kohya_ss sd-scripts data format

1
Updated; HF parquet converter to kohya_ss sd-scripts data format

I have sourced a series of datasets based on applying additional controllers and converted them to flat zips + jsons, which I then download from huggingface, extract, and create prompt files using the data within the jsons.

https://huggingface.co/datasets/AbstractPhil/unography-movie-scenes-resized-captioned-6k

https://huggingface.co/datasets/AbstractPhil/ldhnam-deepfashion_controlnet

https://huggingface.co/datasets/AbstractPhil/SargeZT-coco-stuff-captioned

https://huggingface.co/datasets/AbstractPhil/Flux_SD3_MJ_Dalle_Human_Alignment_Dataset

Along with those I've sourced nearly 100,000 images from all walks of life; from tables, to chairs, to walls, control nets, and everything between - including a multitude of Flux and human alignment datasets beyond this particular one.

Each of the datasets are are prepared with this collab notebook - made specifically to auto-source parquet datasets and convert them to something more pliable for kohya to understand. Feel free to modify it - it does not grab internal images and it does not scrape image urls. It could use a few tweaks and fixes but it'll do for the immediate need.

Updated; new collab works better, but still isn't perfect.

# ==============================================================
# NOTEBOOK: PARQUET ➜ KOHYA ZIP  (Jupyter ⧸ Colab)
# ==============================================================


# ──────────────────────────────
# CELL 1 - Runtime & HF login
# ──────────────────────────────
import os, logging
from pathlib import Path

try:                                # Are we inside Colab?
    import google.colab             # noqa: F401
    IN_COLAB = True
except ImportError:
    IN_COLAB = False

if IN_COLAB:
    !pip -q install datasets huggingface_hub pyarrow pillow
    from google.colab import auth, userdata
    auth.authenticate_user()

    HF_TOKEN = userdata.get("HF_TOKEN") or ""
    if not HF_TOKEN:                # fallback → interactive widget
        from huggingface_hub import notebook_login
        notebook_login()
else:
    HF_TOKEN = os.getenv("HF_TOKEN", "")
    if not HF_TOKEN:
        raise RuntimeError("Define HF_TOKEN env var for local/Jupyter use.")

from huggingface_hub import login
login(HF_TOKEN)

logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s %(levelname)5s | %(message)s",
)



# ──────────────────────────────
# CELL 2 - Configuration object
# ──────────────────────────────
from dataclasses import dataclass, field
from typing import Dict, List, Tuple

@dataclass
class Config:
    hf_token: str                                   # required
    parquet_repo: str = "tomytjandra/h-and-m-fashion-caption-12k"
    output_repo: str  = "AbstractPhil/tomytjandra_h-and-m-fashion-caption-12k-sd-scripts"

    # tuning knobs ­­
    work_dir: Path = Path("packed_dataset")
    zip_image_cap: int = 1_000
    concatenate_captions: bool = True
    caption_priority: List[str] = field(
        default_factory=lambda: [
            "sentence",
            "long_caption", "long_prompt", "full_prompt",
            "prompt", "caption", "tags", "text", "description"
        ]
    )
    concatenated_delim: str = ".,"
    image_types: Tuple[str, ...] = (".jpg",".png",".gif",".bmp",".webp",".tif",".ico")
    magic_headers: Dict[bytes, str] = field(
        default_factory=lambda: {
            b"\xFF\xD8\xFF":".jpg", b"\x89PNG\r\n\x1a\n":".png",
            b"GIF87a":".gif", b"GIF89a":".gif", b"BM":".bmp",
            b"\x00\x00\x01\x00":".ico", b"II*\x00":".tif", b"MM\x00*":".tif"
        }
    )



# ──────────────────────────────
# CELL 3 - Utility functions
# ──────────────────────────────
import base64, json, zipfile
from io import BytesIO
from typing import Optional, List, Tuple
import datasets
from PIL import Image, UnidentifiedImageError
from huggingface_hub import HfApi, upload_file

# -------- dig_value: recursive extraction --------------------
def dig_value(obj) -> Optional[str]:
    """Return a usable string from nested structures, prioritising any 'value' key."""
    if obj is None:
        return None
    if isinstance(obj, (str, bytes, int, float, bool)):
        return str(obj).strip() or None
    if isinstance(obj, dict):
        for k, v in obj.items():               # explicit 'value'
            if k.lower() == "value":
                return dig_value(v)
        parts = [dig_value(v) for v in obj.values()]
        return ", ".join(p for p in parts if p)
    if isinstance(obj, list):
        parts = [dig_value(v) for v in obj]
        return ", ".join(p for p in parts if p)
    return None

# -------- image helpers --------------------------------------
def detect_ext(data: bytes, cfg: Config) -> str:
    for magic, ext in cfg.magic_headers.items():
        if data.startswith(magic):
            return ext
    return ".bin"

def try_decode(val, out: Path, cfg: Config) -> Optional[Path]:
    try:
        if isinstance(val, Image.Image):
            out = out.with_suffix(".jpg")
            val.convert("RGB").save(out)
            return out
        if isinstance(val, bytes):
            out = out.with_suffix(detect_ext(val, cfg))
            out.write_bytes(val)
            return out
        if isinstance(val, str):
            try:
                decoded = base64.b64decode(val, validate=True)
                out = out.with_suffix(detect_ext(decoded, cfg))
                out.write_bytes(decoded)
                return out
            except base64.binascii.Error:
                pass
        buf = val if isinstance(val, bytes) else base64.b64decode(val)
        img = Image.open(BytesIO(buf)).convert("RGB")
        out = out.with_suffix(f".{(img.format or 'jpg').lower()}")
        img.save(out)
        return out
    except (UnidentifiedImageError, ValueError, Exception) as e:
        logging.warning("Decode failed %s: %s", out.name, e)
        return None

# -------- caption selection ----------------------------------
def pick_caption(row: dict, cfg: Config) -> Tuple[Optional[str], Optional[str]]:
    """
    Returns (selected_key, text) or (None, None) if no caption found.
    """
    for key in cfg.caption_priority:
        if key in row:
            txt = dig_value(row[key])
            if txt:
                return key, txt
    return None, None

# -------- per-row writer -------------------------------------
def write_row(idx:int, row:dict, split_dir:Path, cfg:Config) -> Tuple[Optional[Path], List[Path]]:
    base = f"{idx+1:06d}"
    written: List[Path] = []
    img_path: Optional[Path] = None

    # image detection by common column keywords
    for col, val in row.items():
        if any(t in col.lower() for t in ("image", "img", "picture", "frame")):
            img_path = try_decode(val, split_dir / base, cfg)
            if img_path:
                written.append(img_path)
                break

    # caption & extras
    cap_key, caption = pick_caption(row, cfg)
    if caption:
        extras = []
        if cfg.concatenate_captions:
            extras = [
                dig_value(v)
                for k, v in row.items()
                if k != cap_key and k in cfg.caption_priority and dig_value(v)
            ]
        txt = (split_dir / base).with_suffix(".txt")
        txt.write_text(
            caption + (cfg.concatenated_delim + cfg.concatenated_delim.join(extras) if extras else ""),
            encoding="utf-8"
        )
        written.append(txt)

    # OPTIONAL: dump other simple scalar fields (illustrative—comment out if not desired)
    # for col, val in row.items():
    #     if col in cfg.caption_priority or any(t in col.lower() for t in ("image","img","picture","frame")):
    #         continue
    #     scalar = dig_value(val)
    #     if scalar:
    #         (split_dir / f"{base}_{col}.txt").write_text(scalar, encoding="utf-8")

    return img_path, written

# -------- batching helper ------------------------------------
def zip_upload_clean(files: List[Path], split: str, batch_idx:int, cfg:Config, api:HfApi):
    if not files:
        return
    zip_name = f"{split}-{batch_idx:06d}.zip"
    zp = cfg.work_dir / zip_name
    with zipfile.ZipFile(zp, "w", compression=zipfile.ZIP_DEFLATED) as zf:
        for f in files:
            zf.write(f, arcname=f.name)
    upload_file(
        path_or_fileobj=str(zp),
        path_in_repo=zip_name,
        repo_id=cfg.output_repo,
        repo_type="dataset",
        token=cfg.hf_token
    )
    for f in files:
        f.unlink(missing_ok=True)
    zp.unlink(missing_ok=True)



# ──────────────────────────────
# CELL 4 - Main conversion driver
# ──────────────────────────────
def parquet_to_kohya(cfg: Config):
    cfg.work_dir.mkdir(parents=True, exist_ok=True)
    api = HfApi()
    try:
        api.create_repo(cfg.output_repo, token=cfg.hf_token, repo_type="dataset")
    except Exception:
        pass

    ds = datasets.load_dataset(cfg.parquet_repo, streaming=True)

    for split_name, split_stream in ds.items():
        split_dir = cfg.work_dir / split_name
        split_dir.mkdir(parents=True, exist_ok=True)
        logging.info("Processing split %s …", split_name)

        batch_files: List[Path] = []
        imgs_in_batch = 0
        batch_idx = 1

        for idx, row in enumerate(split_stream):
            img, files = write_row(idx, row, split_dir, cfg)
            batch_files.extend(files)
            imgs_in_batch += bool(img)

            if imgs_in_batch >= cfg.zip_image_cap:
                zip_upload_clean(batch_files, split_name, batch_idx, cfg, api)
                batch_files.clear()
                imgs_in_batch = 0
                batch_idx += 1

        zip_upload_clean(batch_files, split_name, batch_idx, cfg, api)
        logging.info("✅ Finished split %s", split_name)



# ──────────────────────────────
# CELL 5 - Execute
# ──────────────────────────────
CFG = Config(hf_token=HF_TOKEN)
parquet_to_kohya(CFG)

1

Comments