I have sourced a series of datasets based on applying additional controllers and converted them to flat zips + JSONs, which I then download from Hugging Face, extract, and use to create prompt files from the data within the JSONs (a rough sketch of that step is included after the dataset links below).
https://huggingface.co/datasets/AbstractPhil/unography-movie-scenes-resized-captioned-6k
https://huggingface.co/datasets/AbstractPhil/ldhnam-deepfashion_controlnet
https://huggingface.co/datasets/AbstractPhil/SargeZT-coco-stuff-captioned
https://huggingface.co/datasets/AbstractPhil/Flux_SD3_MJ_Dalle_Human_Alignment_Dataset
Along with those, I've sourced nearly 100,000 images from all walks of life: tables, chairs, walls, control nets, and everything in between, including a multitude of Flux and human-alignment datasets beyond this particular one.
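For reference, the download-and-extract side looks roughly like the sketch below. It is only a sketch under assumptions: it presumes the repo holds flat *.zip archives plus *.json files that map image filenames to caption strings, and the repo name and JSON layout here are placeholders, so adjust them to whatever your datasets actually contain.

# Rough sketch of the download -> extract -> prompt-file step described above.
# Assumption (placeholder, not pulled from any specific repo): each *.json maps
# image filenames to caption strings; adjust the JSON handling to your layout.
import json, zipfile
from pathlib import Path
from huggingface_hub import snapshot_download

repo_id = "AbstractPhil/SargeZT-coco-stuff-captioned"      # any of the repos listed above
local = Path(snapshot_download(repo_id=repo_id, repo_type="dataset"))
out_dir = Path("train_data")
out_dir.mkdir(parents=True, exist_ok=True)

for zp in local.glob("*.zip"):                             # unpack every archive
    with zipfile.ZipFile(zp) as zf:
        zf.extractall(out_dir)

for jp in local.glob("*.json"):                            # JSON captions -> per-image .txt prompts
    mapping = json.loads(jp.read_text(encoding="utf-8"))
    for image_name, caption in mapping.items():
        img = out_dir / image_name
        if img.exists():
            img.with_suffix(".txt").write_text(str(caption), encoding="utf-8")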
Each of the datasets is prepared with this Colab notebook, made specifically to auto-source parquet datasets and convert them into something more pliable for kohya to understand. Feel free to modify it; it does not grab internal images and it does not scrape image URLs. It could use a few tweaks and fixes, but it'll do for the immediate need.
Updated: the new Colab works better, but still isn't perfect.
# ==============================================================
# NOTEBOOK: PARQUET ➜ KOHYA ZIP (Jupyter ⧸ Colab)
# ==============================================================
# ──────────────────────────────
# CELL 1 - Runtime & HF login
# ──────────────────────────────
import os, logging
from pathlib import Path

try:                                    # Are we inside Colab?
    import google.colab                 # noqa: F401
    IN_COLAB = True
except ImportError:
    IN_COLAB = False

if IN_COLAB:
    !pip -q install datasets huggingface_hub pyarrow pillow
    from google.colab import auth, userdata
    auth.authenticate_user()
    try:
        HF_TOKEN = userdata.get("HF_TOKEN") or ""
    except Exception:                   # secret missing or notebook access not granted
        HF_TOKEN = ""
    if not HF_TOKEN:                    # fallback → interactive widget
        from huggingface_hub import notebook_login
        notebook_login()
else:
    HF_TOKEN = os.getenv("HF_TOKEN", "")
    if not HF_TOKEN:
        raise RuntimeError("Define HF_TOKEN env var for local/Jupyter use.")
    from huggingface_hub import login
    login(HF_TOKEN)

logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s %(levelname)5s | %(message)s",
)
# ──────────────────────────────
# CELL 2 - Configuration object
# ──────────────────────────────
from dataclasses import dataclass, field
from typing import Dict, List, Tuple

@dataclass
class Config:
    hf_token: str                 # required
    parquet_repo: str = "tomytjandra/h-and-m-fashion-caption-12k"
    output_repo: str = "AbstractPhil/tomytjandra_h-and-m-fashion-caption-12k-sd-scripts"
    # tuning knobs
    work_dir: Path = Path("packed_dataset")
    zip_image_cap: int = 1_000
    concatenate_captions: bool = True
    caption_priority: List[str] = field(
        default_factory=lambda: [
            "sentence",
            "long_caption", "long_prompt", "full_prompt",
            "prompt", "caption", "tags", "text", "description",
        ]
    )
    concatenated_delim: str = ".,"
    image_types: Tuple[str, ...] = (".jpg", ".png", ".gif", ".bmp", ".webp", ".tif", ".ico")
    magic_headers: Dict[bytes, str] = field(
        default_factory=lambda: {
            b"\xFF\xD8\xFF": ".jpg", b"\x89PNG\r\n\x1a\n": ".png",
            b"GIF87a": ".gif", b"GIF89a": ".gif", b"BM": ".bmp",
            b"\x00\x00\x01\x00": ".ico", b"II*\x00": ".tif", b"MM\x00*": ".tif",
        }
    )
# ──────────────────────────────
# CELL 3 - Utility functions
# ──────────────────────────────
import base64, binascii, json, zipfile
from io import BytesIO
from typing import Optional, List, Tuple

import datasets
from PIL import Image, UnidentifiedImageError
from huggingface_hub import HfApi, upload_file
# -------- dig_value: recursive extraction --------------------
def dig_value(obj) -> Optional[str]:
    """Return a usable string from nested structures, prioritising any 'value' key."""
    if obj is None:
        return None
    if isinstance(obj, (str, bytes, int, float, bool)):
        return str(obj).strip() or None
    if isinstance(obj, dict):
        for k, v in obj.items():          # explicit 'value' key wins
            if k.lower() == "value":
                return dig_value(v)
        parts = [dig_value(v) for v in obj.values()]
        return ", ".join(p for p in parts if p)
    if isinstance(obj, list):
        parts = [dig_value(v) for v in obj]
        return ", ".join(p for p in parts if p)
    return None
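# Quick examples:
#   dig_value({"caption": {"value": "a red chair"}})  -> "a red chair"
#   dig_value(["tag1", "tag2"])                        -> "tag1, tag2"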
# -------- image helpers --------------------------------------
def detect_ext(data: bytes, cfg: Config) -> str:
    for magic, ext in cfg.magic_headers.items():
        if data.startswith(magic):
            return ext
    return ".bin"
def try_decode(val, out: Path, cfg: Config) -> Optional[Path]:
    try:
        if isinstance(val, Image.Image):
            out = out.with_suffix(".jpg")
            val.convert("RGB").save(out)
            return out
        if isinstance(val, bytes):
            out = out.with_suffix(detect_ext(val, cfg))
            out.write_bytes(val)
            return out
        if isinstance(val, str):
            try:
                decoded = base64.b64decode(val, validate=True)
                out = out.with_suffix(detect_ext(decoded, cfg))
                out.write_bytes(decoded)
                return out
            except binascii.Error:
                pass
        # last resort: treat the value as raw/encoded bytes and let PIL identify it
        buf = val if isinstance(val, bytes) else base64.b64decode(val)
        img = Image.open(BytesIO(buf))
        out = out.with_suffix(f".{(img.format or 'jpg').lower()}")   # read format before convert() discards it
        img.convert("RGB").save(out)
        return out
    except Exception as e:                # UnidentifiedImageError, ValueError, TypeError, ...
        logging.warning("Decode failed %s: %s", out.name, e)
    return None
# -------- caption selection ----------------------------------
def pick_caption(row: dict, cfg: Config) -> Tuple[Optional[str], Optional[str]]:
    """Return (selected_key, text), or (None, None) if no caption is found."""
    for key in cfg.caption_priority:
        if key in row:
            txt = dig_value(row[key])
            if txt:
                return key, txt
    return None, None
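# Quick example: with the default priority list,
#   pick_caption({"prompt": "", "caption": "a blue dress"}, cfg) -> ("caption", "a blue dress")
# because the empty "prompt" field is skipped in favour of the next non-empty key.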
# -------- per-row writer -------------------------------------
def write_row(idx: int, row: dict, split_dir: Path, cfg: Config) -> Tuple[Optional[Path], List[Path]]:
    base = f"{idx + 1:06d}"
    written: List[Path] = []
    img_path: Optional[Path] = None

    # image detection by common column keywords
    for col, val in row.items():
        if any(t in col.lower() for t in ("image", "img", "picture", "frame")):
            img_path = try_decode(val, split_dir / base, cfg)
            if img_path:
                written.append(img_path)
                break

    # caption & extras
    cap_key, caption = pick_caption(row, cfg)
    if caption:
        extras = []
        if cfg.concatenate_captions:
            extras = [
                dig_value(v)
                for k, v in row.items()
                if k != cap_key and k in cfg.caption_priority and dig_value(v)
            ]
        txt = (split_dir / base).with_suffix(".txt")
        txt.write_text(
            caption + (cfg.concatenated_delim + cfg.concatenated_delim.join(extras) if extras else ""),
            encoding="utf-8",
        )
        written.append(txt)

    # OPTIONAL: dump other simple scalar fields (illustrative; uncomment if desired)
    # for col, val in row.items():
    #     if col in cfg.caption_priority or any(t in col.lower() for t in ("image", "img", "picture", "frame")):
    #         continue
    #     scalar = dig_value(val)
    #     if scalar:
    #         (split_dir / f"{base}_{col}.txt").write_text(scalar, encoding="utf-8")

    return img_path, written
# -------- batching helper ------------------------------------
def zip_upload_clean(files: List[Path], split: str, batch_idx: int, cfg: Config, api: HfApi):
    if not files:
        return
    zip_name = f"{split}-{batch_idx:06d}.zip"
    zp = cfg.work_dir / zip_name
    with zipfile.ZipFile(zp, "w", compression=zipfile.ZIP_DEFLATED) as zf:
        for f in files:
            zf.write(f, arcname=f.name)
    upload_file(
        path_or_fileobj=str(zp),
        path_in_repo=zip_name,
        repo_id=cfg.output_repo,
        repo_type="dataset",
        token=cfg.hf_token,
    )
    for f in files:
        f.unlink(missing_ok=True)
    zp.unlink(missing_ok=True)
# ──────────────────────────────
# CELL 4 - Main conversion driver
# ──────────────────────────────
def parquet_to_kohya(cfg: Config):
    cfg.work_dir.mkdir(parents=True, exist_ok=True)
    api = HfApi()
    try:
        api.create_repo(cfg.output_repo, token=cfg.hf_token, repo_type="dataset")
    except Exception:
        pass                              # repo already exists

    ds = datasets.load_dataset(cfg.parquet_repo, streaming=True)
    for split_name, split_stream in ds.items():
        split_dir = cfg.work_dir / split_name
        split_dir.mkdir(parents=True, exist_ok=True)
        logging.info("Processing split %s …", split_name)

        batch_files: List[Path] = []
        imgs_in_batch = 0
        batch_idx = 1
        for idx, row in enumerate(split_stream):
            img, files = write_row(idx, row, split_dir, cfg)
            batch_files.extend(files)
            imgs_in_batch += bool(img)
            if imgs_in_batch >= cfg.zip_image_cap:
                zip_upload_clean(batch_files, split_name, batch_idx, cfg, api)
                batch_files.clear()
                imgs_in_batch = 0
                batch_idx += 1
        zip_upload_clean(batch_files, split_name, batch_idx, cfg, api)   # flush the final partial batch
        logging.info("✅ Finished split %s", split_name)
# ──────────────────────────────
# CELL 5 - Execute
# ──────────────────────────────
CFG = Config(hf_token=HF_TOKEN)
parquet_to_kohya(CFG)
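To repack a different source, override the relevant Config fields when constructing it; the repo names below are placeholders, and output_repo must be a dataset repo your token can write to.

# Example: repack a different parquet dataset (repo names are placeholders).
CFG = Config(
    hf_token=HF_TOKEN,
    parquet_repo="some-user/some-parquet-dataset",      # source parquet repo on the Hub
    output_repo="your-username/your-packed-dataset",    # destination dataset repo you can write to
    zip_image_cap=500,                                  # smaller zips, more frequent uploads
)
parquet_to_kohya(CFG)

Each uploaded zip holds up to zip_image_cap images with same-named .txt captions, the flat image-plus-caption layout that kohya's sd-scripts reads once the archives are extracted.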