RenderJobService.java
package org.fresnel.backend.jobs;
import jakarta.annotation.PreDestroy;
import org.fresnel.backend.persistence.RenderJobEntity;
import org.fresnel.backend.persistence.RenderJobRepository;
import org.fresnel.optics.PngExporter;
import org.fresnel.optics.RenderResult;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.security.core.Authentication;
import org.springframework.security.core.context.SecurityContextHolder;
import org.springframework.stereotype.Service;
import java.io.IOException;
import java.time.Instant;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;
/**
* Render-job registry with progress reporting via {@link RenderJob}.
*
* <p>In-flight jobs are kept in an in-memory map so SSE listeners can react to
* progress in microseconds; on terminal state changes ({@code COMPLETED} /
* {@code FAILED}) the job is persisted via {@link RenderJobRepository} so the
* result survives JVM restarts and is visible to other instances. Lookups
* ({@link #get(String)}) consult both the live map and the repository.
*
* <p>Old in-memory jobs are reaped automatically after {@link #JOB_TTL_MS};
* persisted records remain in the database until explicitly purged.
*/
@Service
public class RenderJobService {
public static final long JOB_TTL_MS = 30 * 60 * 1000L; // 30 min
public static final int WORKERS = Math.max(2, Runtime.getRuntime().availableProcessors() / 2);
private final ExecutorService executor = Executors.newFixedThreadPool(WORKERS, r -> {
Thread t = new Thread(r, "render-job");
t.setDaemon(true);
return t;
});
private final ConcurrentMap<String, RenderJob> jobs = new ConcurrentHashMap<>();
private final AtomicLong seq = new AtomicLong();
private final RenderJobRepository repository;
@Autowired
public RenderJobService(RenderJobRepository repository) {
this.repository = repository;
}
/** Submit a new render job. The supplier is given a progress callback (0..1). */
public RenderJob submit(String label, Function<RenderJob, RenderResult> work) {
String id = "j-" + System.currentTimeMillis() + "-" + seq.incrementAndGet();
RenderJob job = new RenderJob(id, label);
jobs.put(id, job);
String ownerId = currentOwnerOrNull();
// Persist a QUEUED row up-front so the job is visible across instances even
// before it starts executing.
try {
repository.save(new RenderJobEntity(id, label, ownerId));
} catch (RuntimeException ignored) {
// Persistence is best-effort; the in-memory entry is the source of truth
// until the job reaches a terminal state.
}
executor.submit(() -> {
try {
RenderResult result = work.apply(job);
job.complete(result);
persistTerminal(job, ownerId);
} catch (Throwable t) {
job.fail(t);
persistTerminal(job, ownerId);
} finally {
reapOldJobs();
}
});
return job;
}
/**
* Look up a job. First consults the live in-memory map; if absent (e.g. after a
* JVM restart or because TTL has elapsed), falls back to the repository and
* rehydrates a read-only {@link RenderJob} snapshot.
*/
public RenderJob get(String id) {
RenderJob live = jobs.get(id);
if (live != null) return live;
Optional<RenderJobEntity> persisted = repository.findById(id);
return persisted.map(RenderJobService::rehydrate).orElse(null);
}
/** Returns the persisted PNG for a completed job, if any. */
public Optional<byte[]> resultPng(String id) {
return repository.findById(id).map(RenderJobEntity::getResultPng);
}
public void remove(String id) {
jobs.remove(id);
repository.deleteById(id);
}
private void reapOldJobs() {
long cutoff = Instant.now().toEpochMilli() - JOB_TTL_MS;
jobs.entrySet().removeIf(e -> e.getValue().createdAtEpochMs() < cutoff);
}
private void persistTerminal(RenderJob job, String ownerId) {
try {
RenderJobEntity entity = repository.findById(job.id())
.orElseGet(() -> new RenderJobEntity(job.id(), job.label(), ownerId));
entity.setOwnerId(ownerId);
entity.setProgress(job.progress());
entity.setMessage(job.message());
entity.setFinishedAt(Instant.now());
if (job.state() == RenderJob.State.COMPLETED) {
entity.setState(RenderJobEntity.State.COMPLETED);
RenderResult r = job.result();
if (r != null) {
double dpi = 25.4 / r.pixelSizeMm();
entity.setResultPng(PngExporter.toPngBytes(r, dpi));
entity.setResultPixelSizeMm(r.pixelSizeMm());
entity.setResultWidthPx(r.widthPx());
entity.setResultHeightPx(r.heightPx());
}
} else {
entity.setState(RenderJobEntity.State.FAILED);
Throwable t = job.error();
if (t != null) {
String msg = t.getMessage();
entity.setErrorMessage(msg == null ? t.getClass().getSimpleName()
: msg.substring(0, Math.min(2048, msg.length())));
}
}
repository.save(entity);
} catch (IOException | RuntimeException e) {
// Persistence failure must not crash the worker; the in-memory job
// already holds the result for any active SSE subscribers.
}
}
private static RenderJob rehydrate(RenderJobEntity e) {
RenderJob job = new RenderJob(e.getId(), e.getLabel());
switch (e.getState()) {
case COMPLETED -> job.markCompletedExternally(e.getProgress(), e.getMessage());
case FAILED -> job.markFailedExternally(e.getMessage(), e.getErrorMessage());
case RUNNING -> job.reportProgress(e.getProgress(), e.getMessage());
case QUEUED -> { /* nothing */ }
}
return job;
}
private static String currentOwnerOrNull() {
Authentication auth = SecurityContextHolder.getContext().getAuthentication();
if (auth == null || !auth.isAuthenticated()) return null;
String name = auth.getName();
return (name == null || "anonymousUser".equals(name)) ? null : name;
}
/**
* Stop accepting new work and tear down the worker pool when the Spring
* context closes (graceful shutdown / redeploy / test teardown). Without
* this, the executor would keep its threads alive until JVM exit even after
* the application context is destroyed.
*/
@PreDestroy
public void shutdown() {
executor.shutdown();
try {
if (!executor.awaitTermination(5, TimeUnit.SECONDS)) {
executor.shutdownNow();
}
} catch (InterruptedException e) {
executor.shutdownNow();
Thread.currentThread().interrupt();
}
}
}