AudioRingBuffer.java

package org.hammer.audio.buffer;

import java.util.concurrent.atomic.AtomicLong;

/**
 * Bounded, lock-free single-producer / single-consumer (SPSC) ring buffer.
 *
 * <p>This buffer is designed for realtime audio workloads where the audio capture thread is the
 * sole producer and a downstream DSP/analysis thread is the sole consumer. It avoids locks on the
 * hot path entirely: synchronization is performed via two {@link AtomicLong} sequences using
 * acquire/release semantics implicit in {@link AtomicLong#get()} / {@link AtomicLong#lazySet(long)}
 * /{@link AtomicLong#set(long)}.
 *
 * <p><strong>Capacity</strong> is rounded up to the next power of two so that the index calculation
 * can use a bitmask instead of modulo.
 *
 * <p><strong>Allocation</strong>: zero allocations on the hot path. The internal storage array is
 * allocated once at construction; {@link #offer(Object)} and {@link #poll()} only update sequence
 * counters.
 *
 * <p><strong>Concurrency contract</strong>: at most one producer thread may call {@link
 * #offer(Object)}, and at most one consumer thread may call {@link #poll()}. Read-only inspection
 * methods ({@link #size()}, {@link #isEmpty()}, {@link #isFull()}, {@link #capacity()}) are safe to
 * call from any thread.
 *
 * <p>This is intentionally specialized for SPSC: it is faster than a general-purpose queue and
 * matches the producer/consumer topology of an audio capture pipeline. For multi-producer or
 * multi-consumer scenarios use {@link java.util.concurrent.LinkedBlockingQueue} or similar.
 *
 * @param <T> element type; typically {@link org.hammer.audio.core.AudioBlock}
 * @author refactoring
 */
public final class AudioRingBuffer<T> {

  private final Object[] elements;
  private final int mask;
  private final int capacity;

  /** Next index a producer will write to (modulo capacity). Producer-write, consumer-read. */
  private final AtomicLong tail = new AtomicLong(0);

  /** Next index a consumer will read from (modulo capacity). Consumer-write, producer-read. */
  private final AtomicLong head = new AtomicLong(0);

  /**
   * Create a new SPSC ring buffer with at least the requested capacity.
   *
   * @param requestedCapacity minimum capacity; will be rounded up to the next power of two. Must be
   *     {@code >= 1} and {@code <= 2^30}.
   * @throws IllegalArgumentException if {@code requestedCapacity} is out of range
   */
  public AudioRingBuffer(int requestedCapacity) {
    if (requestedCapacity < 1) {
      throw new IllegalArgumentException("requestedCapacity must be >= 1");
    }
    if (requestedCapacity > (1 << 30)) {
      throw new IllegalArgumentException("requestedCapacity too large (max 2^30)");
    }
    int cap = 1;
    while (cap < requestedCapacity) {
      cap <<= 1;
    }
    this.capacity = cap;
    this.mask = cap - 1;
    this.elements = new Object[cap];
  }

  /**
   * @return the actual capacity (next power of two &ge; the requested value)
   */
  public int capacity() {
    return capacity;
  }

  /**
   * Approximate current number of elements in the buffer. Safe to call from any thread; may be
   * slightly stale because head and tail are not read atomically together.
   *
   * @return number of elements currently buffered
   */
  public int size() {
    long t = tail.get();
    long h = head.get();
    long s = t - h;
    if (s < 0) {
      return 0;
    }
    return s > capacity ? capacity : (int) s;
  }

  /**
   * @return {@code true} if empty (snapshot, may be stale)
   */
  public boolean isEmpty() {
    return tail.get() == head.get();
  }

  /**
   * @return {@code true} if full (snapshot, may be stale)
   */
  public boolean isFull() {
    return (tail.get() - head.get()) >= capacity;
  }

  /**
   * Offer an element to the buffer. Producer-only operation.
   *
   * <p>This method never blocks. If the buffer is full the element is rejected and {@code false} is
   * returned: realtime audio capture should drop the oldest data only via an explicit overwrite
   * strategy (see {@link #offerOverwrite(Object)}), never silently stall.
   *
   * @param element element to enqueue; must not be {@code null}
   * @return {@code true} if accepted, {@code false} if the buffer is full
   * @throws NullPointerException if {@code element} is {@code null}
   */
  public boolean offer(T element) {
    if (element == null) {
      throw new NullPointerException("element");
    }
    long t = tail.get();
    long h = head.get();
    if (t - h >= capacity) {
      return false;
    }
    elements[(int) (t & mask)] = element;
    tail.lazySet(t + 1);
    return true;
  }

  /**
   * Offer an element to the buffer, dropping the oldest element if the buffer is full.
   *
   * <p><strong>Concurrency restriction.</strong> This method writes to {@code head} from the
   * producer thread (it advances {@code head} past the dropped element). The strict SPSC contract
   * documented on this class — where {@code head} is consumer-write and {@code tail} is
   * producer-write — therefore <em>does not</em> hold while {@code offerOverwrite} is in use:
   *
   * <ul>
   *   <li><strong>Safe</strong>: single-threaded usage, or any topology where the consumer is
   *       <em>not</em> concurrently calling {@link #poll()} / {@link #drainTo(Object[], int)} /
   *       {@link #clear()}. Typical example: a producer that owns the buffer and drains it from the
   *       same thread (capture loop publishing to a "latest-wins" cache it later reads).
   *   <li><strong>Not safe</strong>: a separate consumer thread concurrently calling {@link
   *       #poll()} or {@link #drainTo(Object[], int)}. Use {@link #offer(Object)} and let the
   *       caller handle the {@code false} return instead.
   * </ul>
   *
   * <p>If you need "drop oldest" semantics with a concurrent consumer, prefer publishing the latest
   * element through a separate {@code volatile} reference (see {@link
   * org.hammer.audio.AudioCaptureServiceImpl#getLatestBlock()}) while calling {@link
   * #offer(Object)} on this buffer.
   *
   * @param element element to enqueue; must not be {@code null}
   * @return the element dropped because the buffer was full, or {@code null} if nothing was dropped
   * @throws NullPointerException if {@code element} is {@code null}
   */
  @SuppressWarnings("unchecked")
  public T offerOverwrite(T element) {
    if (element == null) {
      throw new NullPointerException("element");
    }
    long t = tail.get();
    long h = head.get();
    T dropped = null;
    if (t - h >= capacity) {
      long desiredHead = t - capacity + 1;
      // Best-effort advance: only move head forward.
      while (true) {
        long curHead = head.get();
        if (curHead >= desiredHead) {
          break;
        }
        if (head.compareAndSet(curHead, desiredHead)) {
          dropped = (T) elements[(int) (curHead & mask)];
          break;
        }
      }
    }
    elements[(int) (t & mask)] = element;
    tail.lazySet(t + 1);
    return dropped;
  }

  /**
   * Remove and return the oldest element, or {@code null} if the buffer is empty.
   *
   * <p>Consumer-only operation.
   *
   * @return the dequeued element, or {@code null} if the buffer is empty
   */
  @SuppressWarnings("unchecked")
  public T poll() {
    long h = head.get();
    long t = tail.get();
    if (h >= t) {
      return null;
    }
    int idx = (int) (h & mask);
    T element = (T) elements[idx];
    elements[idx] = null; // help GC
    head.lazySet(h + 1);
    return element;
  }

  /**
   * Drain up to {@code max} elements into the supplied destination array, starting at index 0.
   * Consumer-only operation.
   *
   * @param dest destination array; must not be {@code null} and must have length {@code >= max}
   * @param max maximum number of elements to drain
   * @return number of elements actually drained ({@code 0..max})
   * @throws IllegalArgumentException if {@code max < 0} or {@code dest.length < max}
   */
  @SuppressWarnings("unchecked")
  public int drainTo(T[] dest, int max) {
    if (max < 0) {
      throw new IllegalArgumentException("max must be >= 0");
    }
    if (dest.length < max) {
      throw new IllegalArgumentException("dest is too small");
    }
    int drained = 0;
    long h = head.get();
    long t = tail.get();
    while (drained < max && h < t) {
      int idx = (int) (h & mask);
      dest[drained++] = (T) elements[idx];
      elements[idx] = null;
      h++;
    }
    head.lazySet(h);
    return drained;
  }

  /**
   * Reset the buffer to empty. Not safe to call concurrently with {@link #offer} or {@link #poll}.
   */
  public void clear() {
    long h = head.get();
    long t = tail.get();
    while (h < t) {
      elements[(int) (h & mask)] = null;
      h++;
    }
    head.set(t);
  }
}