Files
blog-writer/generator.py
2026-03-06 18:45:01 +01:00

378 lines
12 KiB
Python

#!/usr/bin/env python3
"""
Wiki.js → Gemini Blog Post Pipeline

Commands:
  fetch      Download a Wiki.js page as Markdown via GraphQL
  write      Generate a blog post from SOURCE.md using Gemini
  translate  Translate BLOGPOST.md using Gemini
  upload     Upload TRANSLATED_BLOGPOST.md to Wiki.js under /blog/{kebab-title}
  clean      Delete the generated .md and .txt files in the output directory

Required environment variables:
  WIKI_BASE_DOMAIN  e.g. https://wiki.example.com
  WIKI_TOKEN        Bearer token for Wiki.js API
  GEMINI_API_KEY    Google Gemini API key

Optional environment variables:
  ORIGINAL_LANG     Language for the blog post (default: Hungarian)
  TRANSLATE_LANG    Target language for translation (default: English)
"""
import argparse
import json
import os
import re
import sys
import unicodedata
import urllib.error
import urllib.request
# ---------------------------------------------------------------------------
# Config & Templates
# ---------------------------------------------------------------------------
# Directory that holds every intermediate artifact of the pipeline.
OUTPUT_DIR = "output"
# Raw Markdown of the fetched wiki page (written by `fetch`, read by `write`).
SOURCE_FILE = os.path.join(OUTPUT_DIR, "SOURCE.md")
# Title of the fetched wiki page (written by `fetch`, reused by `upload` as the
# page description).
SOURCE_TITLE_FILE = os.path.join(OUTPUT_DIR, "SOURCE_TITLE.txt")
# Blog post generated by `write` and consumed by `translate`.
BLOGPOST_FILE = os.path.join(OUTPUT_DIR, "BLOGPOST.md")
# Translated blog post generated by `translate` and consumed by `upload`.
TRANSLATED_FILE = os.path.join(OUTPUT_DIR, "TRANSLATED_BLOGPOST.md")
# Writing instructions injected into the `write` prompt; a relative path, so it
# is resolved against the current working directory.
INSTRUCTIONS_FILE = "INSTRUCTIONS.md"
# Default Gemini model name and the URL prefix the generateContent endpoint is
# built from.
GEMINI_MODEL = "gemini-flash-latest"
GEMINI_BASE_URL = "https://generativelanguage.googleapis.com/v1beta/models"
# Prompt used by `write`. Placeholders: {instructions}, {original_lang}, {source}.
WRITE_PROMPT_TEMPLATE = """Please read the following instructions carefully and follow them to write a blog post.
## INSTRUCTIONS
{instructions}
## TASK
Read the source content below and write a blog post from it in {original_lang} language. Output only the blog post in Markdown format, with no additional commentary.
## SOURCE CONTENT
{source}"""
# Prompt used by `translate`. Placeholders: {translate_lang}, {blogpost}.
TRANSLATE_PROMPT_TEMPLATE = """Translate the following Markdown blog post into {translate_lang}. Preserve all Markdown formatting, headings, links, and code blocks exactly. Output only the translated Markdown with no additional commentary.
{blogpost}"""
# ---------------------------------------------------------------------------
# GraphQL Queries
# ---------------------------------------------------------------------------
# Fetch id, title, description and content of the page at $path.
# The locale is fixed to "en".
QUERY_GET_PAGE = """
query ($path: String!) {
pages {
singleByPath(path: $path, locale: "en") {
id
title
description
content
}
}
}
"""
# Look up only the id of the page at $path (locale fixed to "en"); used to
# decide whether an upload should create a new page or update an existing one.
QUERY_FIND_PAGE = """
query ($path: String!) {
pages {
singleByPath(path: $path, locale: "en") {
id
}
}
}
"""
# Replace content and description of the page with the given id and set its
# tags to ["blog"]. The title is not part of this mutation.
MUTATION_UPDATE_PAGE = """
mutation ($id: Int!, $content: String!, $description: String!) {
pages {
update(id: $id, content: $content, description: $description, tags: ["blog"]) {
responseResult { succeeded message }
}
}
}
"""
# Create a published, non-private Markdown page in the "en" locale, tagged
# "blog", and return the operation result together with the new page id.
MUTATION_CREATE_PAGE = """
mutation ($path: String!, $title: String!, $content: String!, $description: String!) {
pages {
create(
path: $path
title: $title
content: $content
editor: "markdown"
locale: "en"
isPublished: true
isPrivate: false
tags: ["blog"]
description: $description
) {
responseResult { succeeded message }
page { id }
}
}
}
"""
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
def require_env(name: str, default: str = None) -> str:
    """Return environment variable *name*, falling back to *default*.

    Args:
        name: Name of the environment variable to read.
        default: Value used when the variable is unset or empty. When omitted,
            the variable is mandatory.

    Returns:
        The non-empty value taken from the environment, or *default*.

    Exits:
        Prints an error to stderr and exits with status 1 when neither the
        environment nor *default* supplies a non-empty value.
    """
    # `or` instead of os.environ.get(name, default): a variable that is
    # exported but empty (common with CI templating) must still fall back to
    # the default rather than abort the run.
    value = os.environ.get(name) or default
    if not value:
        print(f"ERROR: Environment variable '{name}' is required.", file=sys.stderr)
        sys.exit(1)
    return value
def http_post(url: str, payload: dict, headers: dict, timeout: float = 120.0) -> dict:
    """POST *payload* as JSON to *url* and return the decoded JSON response.

    Args:
        url: Endpoint to send the request to.
        payload: JSON-serialisable request body.
        headers: HTTP headers sent with the request.
        timeout: Seconds to wait on the connection before giving up, so a
            stalled server cannot hang the pipeline indefinitely.

    Returns:
        The response body parsed as JSON.

    Exits:
        Prints an error to stderr and exits with status 1 on an HTTP error
        status, a connection failure or timeout, or a response body that is
        not valid UTF-8 encoded JSON.
    """
    data = json.dumps(payload).encode("utf-8")
    req = urllib.request.Request(url, data=data, headers=headers, method="POST")
    try:
        with urllib.request.urlopen(req, timeout=timeout) as resp:
            raw = resp.read()
    except urllib.error.HTTPError as e:
        # Error bodies are for display only; never let a bad byte mask the
        # actual HTTP failure.
        body = e.read().decode("utf-8", errors="replace")
        print(f"ERROR: HTTP {e.code} from {url}\n{body}", file=sys.stderr)
        sys.exit(1)
    except urllib.error.URLError as e:
        print(f"ERROR: Could not reach {url}: {e.reason}", file=sys.stderr)
        sys.exit(1)
    except OSError as e:
        # Read timeouts and connection resets during resp.read() are raised
        # as plain OSError subclasses rather than URLError.
        print(f"ERROR: Could not reach {url}: {e}", file=sys.stderr)
        sys.exit(1)
    try:
        return json.loads(raw.decode("utf-8"))
    except ValueError as e:
        # Covers both json.JSONDecodeError and UnicodeDecodeError, e.g. an
        # HTML error page returned by a proxy with status 200.
        print(f"ERROR: Invalid JSON response from {url}: {e}", file=sys.stderr)
        sys.exit(1)
def to_kebab(text: str) -> str:
    """Convert *text* into a lowercase, hyphen-separated ASCII slug.

    Accented letters are reduced to their base letter ("é" -> "e") instead of
    being deleted, so titles in languages such as Hungarian keep readable
    slugs. Any remaining character that is not an ASCII letter, digit,
    whitespace or hyphen is removed; runs of whitespace/hyphens collapse into
    a single hyphen, and leading/trailing hyphens are stripped.

    Args:
        text: Arbitrary title text.

    Returns:
        The slug; an empty string when nothing usable remains.
    """
    # NFKD splits an accented letter into base letter + combining mark(s);
    # dropping the marks leaves the plain base letter behind.
    decomposed = unicodedata.normalize("NFKD", text)
    base_letters = "".join(ch for ch in decomposed if not unicodedata.combining(ch))
    slug = re.sub(r"[^a-z0-9\s-]", "", base_letters.lower())
    slug = re.sub(r"[\s-]+", "-", slug)
    return slug.strip("-")
def read_file(path: str) -> str:
    """Return the UTF-8 text content of *path*.

    Args:
        path: File to read.

    Returns:
        The complete file content as a string.

    Exits:
        Prints an error to stderr and exits with status 1 when the file does
        not exist or cannot be read (for example it is a directory or access
        is denied).
    """
    # Open first and react to the failure: a separate existence check can go
    # stale before open() runs and does not catch directories or permission
    # problems at all.
    try:
        with open(path, "r", encoding="utf-8") as f:
            return f.read()
    except FileNotFoundError:
        print(f"ERROR: File not found: {path}", file=sys.stderr)
        sys.exit(1)
    except OSError as e:
        print(f"ERROR: Could not read {path}: {e}", file=sys.stderr)
        sys.exit(1)
def write_file(path: str, content: str) -> None:
    """Write *content* to *path* as UTF-8, creating parent directories.

    Args:
        path: Destination file; an existing file is overwritten.
        content: Text to write.
    """
    parent = os.path.dirname(path)
    # A bare filename has no directory component, and os.makedirs("") raises
    # FileNotFoundError, so only create the parent when there is one.
    if parent:
        os.makedirs(parent, exist_ok=True)
    with open(path, "w", encoding="utf-8") as f:
        f.write(content)
    print(f"✓ Saved to {path}")
# ---------------------------------------------------------------------------
# Classes
# ---------------------------------------------------------------------------
class WikiJS:
    """Small client for the Wiki.js GraphQL API.

    Every request is sent to ``{base_domain}/graphql`` with a bearer token.
    """

    def __init__(self, base_domain: str, token: str):
        """Store connection settings.

        Args:
            base_domain: Base URL of the wiki; a trailing slash is ignored.
            token: Bearer token sent in the Authorization header.
        """
        self.base_domain = base_domain.rstrip("/")
        self.token = token
        self.api_url = f"{self.base_domain}/graphql"

    @staticmethod
    def _dig(resp, *keys):
        """Follow *keys* through nested dicts; return None if any level is absent.

        A GraphQL response may carry ``"data": null`` (or a null nested field)
        next to ``"errors"``. ``dict.get(key, {})`` does not help there because
        the key exists with the value None, so every level is checked.
        """
        node = resp
        for key in keys:
            if not isinstance(node, dict):
                return None
            node = node.get(key)
        return node

    def graphql(self, query: str, variables: dict = None) -> dict:
        """Execute a GraphQL document and return the decoded JSON response."""
        payload = {"query": query}
        if variables:
            payload["variables"] = variables
        headers = {
            "Authorization": f"Bearer {self.token}",
            "Content-Type": "application/json",
        }
        return http_post(self.api_url, payload, headers)

    def get_page(self, path: str):
        """Fetch the page at *path*.

        Returns:
            ``(page, response)`` where *page* is the ``singleByPath`` object
            (or None when it is missing) and *response* is the raw response,
            kept so callers can report its ``errors``.
        """
        resp = self.graphql(QUERY_GET_PAGE, {"path": path})
        return self._dig(resp, "data", "pages", "singleByPath"), resp

    def find_page_id(self, path: str):
        """Return the id of the page at *path*, or None when no page is found."""
        resp = self.graphql(QUERY_FIND_PAGE, {"path": path})
        page = self._dig(resp, "data", "pages", "singleByPath")
        return page.get("id") if isinstance(page, dict) else None

    def update_page(self, page_id: int, content: str, description: str):
        """Update content and description of an existing page.

        Returns:
            ``(response_result, response)``; *response_result* is an empty
            dict when the server returned no ``responseResult``.
        """
        variables = {"id": page_id, "content": content, "description": description}
        resp = self.graphql(MUTATION_UPDATE_PAGE, variables)
        return self._dig(resp, "data", "pages", "update", "responseResult") or {}, resp

    def create_page(self, path: str, title: str, content: str, description: str):
        """Create a new page.

        Returns:
            ``(response_result, response)``; *response_result* is an empty
            dict when the server returned no ``responseResult``.
        """
        variables = {"path": path, "title": title, "content": content, "description": description}
        resp = self.graphql(MUTATION_CREATE_PAGE, variables)
        return self._dig(resp, "data", "pages", "create", "responseResult") or {}, resp
class GoogleGemini:
    """Small client for the Gemini ``generateContent`` REST endpoint."""

    def __init__(self, api_key: str, model: str = GEMINI_MODEL):
        """Store the API key and build the endpoint URL for *model*."""
        self.api_key = api_key
        self.model = model
        self.url = f"{GEMINI_BASE_URL}/{self.model}:generateContent"

    def generate(self, prompt: str) -> str:
        """Send *prompt* to the model and return the generated text.

        The text of every text part of the first candidate is concatenated,
        because a reply may be split across several parts. Parts flagged as
        ``thought`` are skipped so that only the answer itself is returned.

        Exits:
            Prints the raw response to stderr and exits with status 1 when
            the response contains no usable text (for example a blocked or
            empty candidate).
        """
        payload = {"contents": [{"parts": [{"text": prompt}]}]}
        headers = {"Content-Type": "application/json", "X-goog-api-key": self.api_key}
        resp = http_post(self.url, payload, headers)
        try:
            parts = resp["candidates"][0]["content"]["parts"]
            texts = [
                part["text"]
                for part in parts
                if "text" in part and not part.get("thought")
            ]
            if not texts:
                raise KeyError("text")
        except (KeyError, IndexError, TypeError, AttributeError):
            # TypeError/AttributeError cover null or non-dict levels, which
            # would otherwise surface as a bare traceback.
            print(f"ERROR: Unexpected Gemini response structure: {resp}", file=sys.stderr)
            sys.exit(1)
        return "".join(texts)
class BlogWriter:
    """Implements the pipeline commands: fetch, write, translate, upload, clean.

    The API clients are created on first access, so each command only
    requires the environment variables it actually uses (``clean`` needs
    none, ``write``/``translate`` only the Gemini key, ``fetch`` only the
    Wiki.js settings).
    """

    # Backing fields of the lazily created clients.
    _wiki = None
    _gemini = None

    def __init__(self):
        self._wiki = None
        self._gemini = None

    @property
    def wiki(self):
        """Wiki.js client, built from WIKI_BASE_DOMAIN / WIKI_TOKEN on first use."""
        if self._wiki is None:
            self._wiki = WikiJS(
                require_env("WIKI_BASE_DOMAIN"),
                require_env("WIKI_TOKEN"),
            )
        return self._wiki

    @wiki.setter
    def wiki(self, client):
        self._wiki = client

    @property
    def gemini(self):
        """Gemini client, built from GEMINI_API_KEY on first use."""
        if self._gemini is None:
            self._gemini = GoogleGemini(require_env("GEMINI_API_KEY"))
        return self._gemini

    @gemini.setter
    def gemini(self, client):
        self._gemini = client

    @staticmethod
    def _split_h1(content: str):
        """Return ``(title, body)`` for the first H1 heading in *content*.

        *title* is None when there is no H1. *body* is *content* with that
        heading line removed and leading blank lines stripped.
        """
        # [ \t]+ rather than \s+: the gap after "#" must stay on the heading
        # line, otherwise an empty "#" line would adopt the following line as
        # its title.
        match = re.search(r"^#[ \t]+(.+)", content, re.MULTILINE)
        if not match:
            return None, content
        body = re.sub(r"^#[ \t]+.+\n?", "", content, count=1, flags=re.MULTILINE)
        return match.group(1).strip(), body.lstrip("\n")

    def fetch(self, url: str):
        """Download a wiki page and store its content and title in the output dir.

        Args:
            url: Page path or full page URL below the configured base domain.
        """
        base = self.wiki.base_domain
        # Strip the base domain only as a prefix; a blanket replace() would
        # also eat an occurrence further inside the path.
        if url.startswith(base):
            url = url[len(base):]
        page_path = url.lstrip("/")
        print(f"→ Fetching wiki page: /{page_path}")
        page, resp = self.wiki.get_page(page_path)
        if not page:
            errors = resp.get("errors", resp)
            print(f"ERROR: Page not found at '{page_path}': {errors}", file=sys.stderr)
            sys.exit(1)
        write_file(SOURCE_FILE, page["content"])
        write_file(SOURCE_TITLE_FILE, page["title"])

    def write(self):
        """Generate the blog post from the fetched source and the instructions file."""
        original_lang = require_env("ORIGINAL_LANG", "Hungarian")
        instructions = read_file(INSTRUCTIONS_FILE)
        source = read_file(SOURCE_FILE)
        print(f"→ Generating blog post in {original_lang} with Gemini...")
        prompt = WRITE_PROMPT_TEMPLATE.format(
            instructions=instructions,
            original_lang=original_lang,
            source=source,
        )
        result = self.gemini.generate(prompt)
        write_file(BLOGPOST_FILE, result)

    def translate(self):
        """Translate the generated blog post into TRANSLATE_LANG."""
        translate_lang = require_env("TRANSLATE_LANG", "English")
        blogpost = read_file(BLOGPOST_FILE)
        print(f"→ Translating blog post to {translate_lang} with Gemini...")
        prompt = TRANSLATE_PROMPT_TEMPLATE.format(
            translate_lang=translate_lang,
            blogpost=blogpost,
        )
        result = self.gemini.generate(prompt)
        write_file(TRANSLATED_FILE, result)

    def upload(self):
        """Publish the translated post under ``blog/<kebab-title>``.

        The first H1 heading becomes the page title and is removed from the
        body; the stored source title is used as the page description. An
        existing page at the target path is updated, otherwise a new page is
        created.
        """
        content = read_file(TRANSLATED_FILE)
        description = read_file(SOURCE_TITLE_FILE).strip()
        title, content = self._split_h1(content)
        if title is None:
            print(f"ERROR: No H1 heading found in {TRANSLATED_FILE}", file=sys.stderr)
            sys.exit(1)
        kebab = to_kebab(title)
        if not kebab:
            # Without this guard the page would be addressed as "blog/".
            print(f"ERROR: Title '{title}' does not produce a usable URL slug", file=sys.stderr)
            sys.exit(1)
        page_path = f"blog/{kebab}"
        print("→ Uploading to Wiki.js")
        print(f" Title : {title}")
        print(f" Path : /{page_path}")
        print(f" Description: {description}")
        existing_id = self.wiki.find_page_id(page_path)
        if existing_id:
            print(f" Found existing page id={existing_id}, updating...")
            result, resp = self.wiki.update_page(existing_id, content, description)
        else:
            print(" Page not found, creating new...")
            result, resp = self.wiki.create_page(page_path, title, content, description)
        errors = resp.get("errors")
        if errors:
            print(f"ERROR: {json.dumps(errors, indent=2)}", file=sys.stderr)
            sys.exit(1)
        if not result.get("succeeded"):
            print(f"ERROR: Operation failed: {result.get('message')}", file=sys.stderr)
            sys.exit(1)
        print(f"✓ Successfully uploaded to {self.wiki.base_domain}/{page_path}")

    def clean(self):
        """Delete all .md and .txt files in the output directory."""
        if not os.path.exists(OUTPUT_DIR):
            print(f"→ Output directory '{OUTPUT_DIR}' does not exist. Nothing to clean.")
            return
        print(f"→ Cleaning {OUTPUT_DIR}/...")
        count = 0
        for filename in os.listdir(OUTPUT_DIR):
            target = os.path.join(OUTPUT_DIR, filename)
            # Only regular files: os.remove() fails on a directory that
            # happens to carry a matching suffix.
            if filename.endswith((".md", ".txt")) and os.path.isfile(target):
                os.remove(target)
                count += 1
        print(f"✓ Removed {count} generated file(s).")
# ---------------------------------------------------------------------------
# Entry point
# ---------------------------------------------------------------------------
def main(argv=None):
    """Parse the command line and run the selected pipeline command.

    Args:
        argv: Argument list to parse instead of ``sys.argv[1:]``; None (the
            default) keeps the normal command-line behaviour.
    """
    parser = argparse.ArgumentParser(
        description="Wiki.js → Gemini Blog Post Pipeline",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog=__doc__,
    )
    subparsers = parser.add_subparsers(dest="command", required=True)
    # fetch
    p_fetch = subparsers.add_parser("fetch", help="Download a Wiki.js page as Markdown")
    p_fetch.add_argument("url", help="Page path or full URL, e.g. /my-page or https://wiki.example.com/my-page")
    # write
    subparsers.add_parser("write", help="Generate blog post using Gemini")
    # translate
    subparsers.add_parser("translate", help="Translate generated blog post using Gemini")
    # upload
    subparsers.add_parser("upload", help="Upload translated blog post to Wiki.js")
    # clean
    subparsers.add_parser("clean", help=f"Delete all .md and .txt files in the {OUTPUT_DIR} directory")
    args = parser.parse_args(argv)
    writer = BlogWriter()
    # One handler per sub-command; argparse has already rejected any name
    # that is not registered above.
    handlers = {
        "fetch": lambda: writer.fetch(args.url),
        "write": writer.write,
        "translate": writer.translate,
        "upload": writer.upload,
        "clean": writer.clean,
    }
    handlers[args.command]()
# Run the CLI only when this file is executed as a script, not when imported.
if __name__ == "__main__":
    main()