*
* Copyright (C) 2008 Steven Rostedt <srostedt@redhat.com>
*/
-#include <linux/ftrace_event.h>
+#include <linux/trace_events.h>
#include <linux/ring_buffer.h>
#include <linux/trace_clock.h>
#include <linux/trace_seq.h>
*
*/
-/*
- * A fast way to enable or disable all ring buffers is to
- * call tracing_on or tracing_off. Turning off the ring buffers
- * prevents all ring buffers from being recorded to.
- * Turning this switch on, makes it OK to write to the
- * ring buffer, if the ring buffer is enabled itself.
- *
- * There's three layers that must be on in order to write
- * to the ring buffer.
- *
- * 1) This global flag must be set.
- * 2) The ring buffer must be enabled for recording.
- * 3) The per cpu buffer must be enabled for recording.
- *
- * In case of an anomaly, this global flag has a bit set that
- * will permantly disable all ring buffers.
- */
-
-/*
- * Global flag to disable all recording to ring buffers
- * This has two bits: ON, DISABLED
- *
- * ON DISABLED
- * ---- ----------
- * 0 0 : ring buffers are off
- * 1 0 : ring buffers are on
- * X 1 : ring buffers are permanently disabled
- */
-
-enum {
- RB_BUFFERS_ON_BIT = 0,
- RB_BUFFERS_DISABLED_BIT = 1,
-};
-
-enum {
- RB_BUFFERS_ON = 1 << RB_BUFFERS_ON_BIT,
- RB_BUFFERS_DISABLED = 1 << RB_BUFFERS_DISABLED_BIT,
-};
-
-static unsigned long ring_buffer_flags __read_mostly = RB_BUFFERS_ON;
-
/* Used for individual buffers (after the counter) */
#define RB_BUFFER_OFF (1 << 20)
#define BUF_PAGE_HDR_SIZE offsetof(struct buffer_data_page, data)
-/**
- * tracing_off_permanent - permanently disable ring buffers
- *
- * This function, once called, will disable all ring buffers
- * permanently.
- */
-void tracing_off_permanent(void)
-{
- set_bit(RB_BUFFERS_DISABLED_BIT, &ring_buffer_flags);
-}
-
#define RB_EVNT_HDR_SIZE (offsetof(struct ring_buffer_event, array))
#define RB_ALIGNMENT 4U
#define RB_MAX_SMALL_DATA (RB_ALIGNMENT * RINGBUF_TYPE_DATA_TYPE_LEN_MAX)
bool wakeup_full;
};
+/*
+ * Structure to hold event state and handle nested events.
+ */
+struct rb_event_info {
+ u64 ts;
+ u64 delta;
+ unsigned long length;
+ struct buffer_page *tail_page;
+ int add_timestamp;
+};
+
+/*
+ * Identifies which context the event is being recorded in:
+ * NMI = 0
+ * IRQ = 1
+ * SOFTIRQ = 2
+ * NORMAL = 3
+ *
+ * See trace_recursive_lock() comment below for more details.
+ */
+enum {
+ RB_CTX_NMI,
+ RB_CTX_IRQ,
+ RB_CTX_SOFTIRQ,
+ RB_CTX_NORMAL,
+ RB_CTX_MAX
+};
+
/*
* head_page == tail_page && head == tail then buffer is empty.
*/
arch_spinlock_t lock;
struct lock_class_key lock_key;
unsigned int nr_pages;
+ unsigned int current_context;
struct list_head *pages;
struct buffer_page *head_page; /* read from head */
struct buffer_page *tail_page; /* write to tail */
* writer is ever on it, the previous pointer never points
* back to the reader page.
*/
-static int rb_is_reader_page(struct buffer_page *page)
+static bool rb_is_reader_page(struct buffer_page *page)
{
struct list_head *list = page->list.prev;
return (addr & ~PAGE_MASK) - BUF_PAGE_HDR_SIZE;
}
-static inline int
-rb_event_is_commit(struct ring_buffer_per_cpu *cpu_buffer,
- struct ring_buffer_event *event)
-{
- unsigned long addr = (unsigned long)event;
- unsigned long index;
-
- index = rb_event_index(event);
- addr &= PAGE_MASK;
-
- return cpu_buffer->commit_page->page == (void *)addr &&
- rb_commit_index(cpu_buffer) == index;
-}
-
-static void
-rb_set_commit_to_write(struct ring_buffer_per_cpu *cpu_buffer)
-{
- unsigned long max_count;
-
- /*
- * We only race with interrupts and NMIs on this CPU.
- * If we own the commit event, then we can commit
- * all others that interrupted us, since the interruptions
- * are in stack format (they finish before they come
- * back to us). This allows us to do a simple loop to
- * assign the commit to the tail.
- */
- again:
- max_count = cpu_buffer->nr_pages * 100;
-
- while (cpu_buffer->commit_page != cpu_buffer->tail_page) {
- if (RB_WARN_ON(cpu_buffer, !(--max_count)))
- return;
- if (RB_WARN_ON(cpu_buffer,
- rb_is_reader_page(cpu_buffer->tail_page)))
- return;
- local_set(&cpu_buffer->commit_page->page->commit,
- rb_page_write(cpu_buffer->commit_page));
- rb_inc_page(cpu_buffer, &cpu_buffer->commit_page);
- cpu_buffer->write_stamp =
- cpu_buffer->commit_page->page->time_stamp;
- /* add barrier to keep gcc from optimizing too much */
- barrier();
- }
- while (rb_commit_index(cpu_buffer) !=
- rb_page_write(cpu_buffer->commit_page)) {
-
- local_set(&cpu_buffer->commit_page->page->commit,
- rb_page_write(cpu_buffer->commit_page));
- RB_WARN_ON(cpu_buffer,
- local_read(&cpu_buffer->commit_page->page->commit) &
- ~RB_WRITE_MASK);
- barrier();
- }
-
- /* again, keep gcc from optimizing */
- barrier();
-
- /*
- * If an interrupt came in just after the first while loop
- * and pushed the tail page forward, we will be left with
- * a dangling commit that will never go forward.
- */
- if (unlikely(cpu_buffer->commit_page != cpu_buffer->tail_page))
- goto again;
-}
-
-static void rb_reset_reader_page(struct ring_buffer_per_cpu *cpu_buffer)
-{
- cpu_buffer->read_stamp = cpu_buffer->reader_page->page->time_stamp;
- cpu_buffer->reader_page->read = 0;
-}
-
static void rb_inc_iter(struct ring_buffer_iter *iter)
{
struct ring_buffer_per_cpu *cpu_buffer = iter->cpu_buffer;
iter->head = 0;
}
-/* Slow path, do not inline */
-static noinline struct ring_buffer_event *
-rb_add_time_stamp(struct ring_buffer_event *event, u64 delta)
-{
- event->type_len = RINGBUF_TYPE_TIME_EXTEND;
-
- /* Not the first event on the page? */
- if (rb_event_index(event)) {
- event->time_delta = delta & TS_MASK;
- event->array[0] = delta >> TS_SHIFT;
- } else {
- /* nope, just zero it */
- event->time_delta = 0;
- event->array[0] = 0;
- }
-
- return skip_time_extend(event);
-}
-
-/**
- * rb_update_event - update event type and data
- * @event: the event to update
- * @type: the type of event
- * @length: the size of the event field in the ring buffer
- *
- * Update the type and data fields of the event. The length
- * is the actual size that is written to the ring buffer,
- * and with this, we can determine what to place into the
- * data field.
- */
-static void
-rb_update_event(struct ring_buffer_per_cpu *cpu_buffer,
- struct ring_buffer_event *event, unsigned length,
- int add_timestamp, u64 delta)
-{
- /* Only a commit updates the timestamp */
- if (unlikely(!rb_event_is_commit(cpu_buffer, event)))
- delta = 0;
-
- /*
- * If we need to add a timestamp, then we
- * add it to the start of the resevered space.
- */
- if (unlikely(add_timestamp)) {
- event = rb_add_time_stamp(event, delta);
- length -= RB_LEN_TIME_EXTEND;
- delta = 0;
- }
-
- event->time_delta = delta;
- length -= RB_EVNT_HDR_SIZE;
- if (length > RB_MAX_SMALL_DATA || RB_FORCE_8BYTE_ALIGNMENT) {
- event->type_len = 0;
- event->array[0] = length;
- } else
- event->type_len = DIV_ROUND_UP(length, RB_ALIGNMENT);
-}
-
/*
* rb_handle_head_page - writer hit the head page
*
return 0;
}
-static unsigned rb_calculate_event_length(unsigned length)
-{
- struct ring_buffer_event event; /* Used only for sizeof array */
-
- /* zero length can cause confusions */
- if (!length)
- length = 1;
-
- if (length > RB_MAX_SMALL_DATA || RB_FORCE_8BYTE_ALIGNMENT)
- length += sizeof(event.array[0]);
-
- length += RB_EVNT_HDR_SIZE;
- length = ALIGN(length, RB_ARCH_ALIGNMENT);
-
- return length;
-}
-
static inline void
rb_reset_tail(struct ring_buffer_per_cpu *cpu_buffer,
- struct buffer_page *tail_page,
- unsigned long tail, unsigned long length)
+ unsigned long tail, struct rb_event_info *info)
{
+ struct buffer_page *tail_page = info->tail_page;
struct ring_buffer_event *event;
+ unsigned long length = info->length;
/*
* Only the event that crossed the page boundary
*/
static noinline struct ring_buffer_event *
rb_move_tail(struct ring_buffer_per_cpu *cpu_buffer,
- unsigned long length, unsigned long tail,
- struct buffer_page *tail_page, u64 ts)
+ unsigned long tail, struct rb_event_info *info)
{
+ struct buffer_page *tail_page = info->tail_page;
struct buffer_page *commit_page = cpu_buffer->commit_page;
struct ring_buffer *buffer = cpu_buffer->buffer;
struct buffer_page *next_page;
int ret;
+ u64 ts;
next_page = tail_page;
out_again:
- rb_reset_tail(cpu_buffer, tail_page, tail, length);
+ rb_reset_tail(cpu_buffer, tail, info);
/* fail and let the caller try again */
return ERR_PTR(-EAGAIN);
out_reset:
/* reset write */
- rb_reset_tail(cpu_buffer, tail_page, tail, length);
+ rb_reset_tail(cpu_buffer, tail, info);
return NULL;
}
-static struct ring_buffer_event *
-__rb_reserve_next(struct ring_buffer_per_cpu *cpu_buffer,
- unsigned long length, u64 ts,
- u64 delta, int add_timestamp)
+/* Slow path, do not inline */
+static noinline struct ring_buffer_event *
+rb_add_time_stamp(struct ring_buffer_event *event, u64 delta)
{
- struct buffer_page *tail_page;
- struct ring_buffer_event *event;
- unsigned long tail, write;
+ event->type_len = RINGBUF_TYPE_TIME_EXTEND;
- /*
- * If the time delta since the last event is too big to
- * hold in the time field of the event, then we append a
- * TIME EXTEND event ahead of the data event.
- */
- if (unlikely(add_timestamp))
- length += RB_LEN_TIME_EXTEND;
+ /* Not the first event on the page? */
+ if (rb_event_index(event)) {
+ event->time_delta = delta & TS_MASK;
+ event->array[0] = delta >> TS_SHIFT;
+ } else {
+ /* nope, just zero it */
+ event->time_delta = 0;
+ event->array[0] = 0;
+ }
+
+ return skip_time_extend(event);
+}
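/*
 * Editor's illustrative sketch (not part of the patch): how a TIME_EXTEND
 * event splits a delta that no longer fits in the 27-bit time_delta field,
 * and how the reader side puts it back together.  Assumes the ring buffer's
 * TS_SHIFT of 27; the SKETCH_* names are stand-ins for the real macros.
 */
#include <assert.h>
#include <stdint.h>

#define SKETCH_TS_SHIFT	27
#define SKETCH_TS_MASK	((1ULL << SKETCH_TS_SHIFT) - 1)

static void sketch_time_extend_split(void)
{
	uint64_t delta = (123ULL << SKETCH_TS_SHIFT) | 456;	/* too big for 27 bits */
	uint32_t time_delta = delta & SKETCH_TS_MASK;		/* goes in event->time_delta */
	uint32_t high_bits  = delta >> SKETCH_TS_SHIFT;		/* goes in event->array[0] */

	/* rb_update_write_stamp()-style reassembly on the reader side */
	uint64_t reassembled = ((uint64_t)high_bits << SKETCH_TS_SHIFT) + time_delta;

	assert(reassembled == delta);
}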
- tail_page = cpu_buffer->tail_page;
- write = local_add_return(length, &tail_page->write);
+static inline bool rb_event_is_commit(struct ring_buffer_per_cpu *cpu_buffer,
+ struct ring_buffer_event *event);
- /* set write to only the index of the write */
- write &= RB_WRITE_MASK;
- tail = write - length;
+/**
+ * rb_update_event - update event type and data
+ * @cpu_buffer: the per-CPU ring buffer the event was reserved on
+ * @event: the event to update
+ * @info: the reserved-event state (length, delta, add_timestamp)
+ *
+ * Update the type and data fields of the event. The length
+ * is the actual size that is written to the ring buffer,
+ * and with this, we can determine what to place into the
+ * data field.
+ */
+static void
+rb_update_event(struct ring_buffer_per_cpu *cpu_buffer,
+ struct ring_buffer_event *event,
+ struct rb_event_info *info)
+{
+ unsigned length = info->length;
+ u64 delta = info->delta;
+
+ /* Only a commit updates the timestamp */
+ if (unlikely(!rb_event_is_commit(cpu_buffer, event)))
+ delta = 0;
/*
- * If this is the first commit on the page, then it has the same
- * timestamp as the page itself.
+ * If we need to add a timestamp, then we
+ * add it to the start of the reserved space.
*/
- if (!tail)
+ if (unlikely(info->add_timestamp)) {
+ event = rb_add_time_stamp(event, delta);
+ length -= RB_LEN_TIME_EXTEND;
delta = 0;
+ }
- /* See if we shot pass the end of this buffer page */
- if (unlikely(write > BUF_PAGE_SIZE))
- return rb_move_tail(cpu_buffer, length, tail,
- tail_page, ts);
+ event->time_delta = delta;
+ length -= RB_EVNT_HDR_SIZE;
+ if (length > RB_MAX_SMALL_DATA || RB_FORCE_8BYTE_ALIGNMENT) {
+ event->type_len = 0;
+ event->array[0] = length;
+ } else
+ event->type_len = DIV_ROUND_UP(length, RB_ALIGNMENT);
+}
- /* We reserved something on the buffer */
+static unsigned rb_calculate_event_length(unsigned length)
+{
+ struct ring_buffer_event event; /* Used only for sizeof array */
- event = __rb_page_index(tail_page, tail);
- kmemcheck_annotate_bitfield(event, bitfield);
- rb_update_event(cpu_buffer, event, length, add_timestamp, delta);
+ /* zero length can cause confusion */
+ if (!length)
+ length++;
- local_inc(&tail_page->entries);
+ if (length > RB_MAX_SMALL_DATA || RB_FORCE_8BYTE_ALIGNMENT)
+ length += sizeof(event.array[0]);
+
+ length += RB_EVNT_HDR_SIZE;
+ length = ALIGN(length, RB_ARCH_ALIGNMENT);
/*
- * If this is the first commit on the page, then update
- * its timestamp.
+ * In case the time delta is larger than the 27 bits for it
+ * in the header, we need to add a timestamp. If another
+ * event comes in when trying to discard this one to increase
+ * the length, then the timestamp will be added in the allocated
+ * space of this event. If length is bigger than the size needed
+ * for the TIME_EXTEND, then padding has to be used. The events
+ * length must be either RB_LEN_TIME_EXTEND, or greater than or equal
+ * to RB_LEN_TIME_EXTEND + 8, as 8 is the minimum size for padding.
+ * As length is a multiple of 4, we only need to worry if it
+ * is 12 (RB_LEN_TIME_EXTEND + 4).
*/
- if (!tail)
- tail_page->page->time_stamp = ts;
+ if (length == RB_LEN_TIME_EXTEND + RB_ALIGNMENT)
+ length += RB_ALIGNMENT;
- /* account for these added bytes */
- local_add(length, &cpu_buffer->entries_bytes);
+ return length;
+}
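/*
 * Editor's worked example (not part of the patch) for the length rule in
 * the comment above: with a 4-byte event header and 4-byte alignment, a
 * 6-byte payload rounds up to 12 bytes, which is exactly
 * RB_LEN_TIME_EXTEND + 4, so it is bumped to 16 so the slot can later be
 * replaced by a TIME_EXTEND or padding event.  Simplified sketch that
 * ignores RB_FORCE_8BYTE_ALIGNMENT and large payloads; SKETCH_* values
 * mirror the real macros.
 */
#include <assert.h>

#define SKETCH_ALIGN(x, a)		(((x) + (a) - 1) & ~((unsigned int)(a) - 1))
#define SKETCH_RB_EVNT_HDR_SIZE		4	/* offsetof(struct ring_buffer_event, array) */
#define SKETCH_RB_ALIGNMENT		4
#define SKETCH_RB_LEN_TIME_EXTEND	8

static unsigned int sketch_event_length(unsigned int length)
{
	if (!length)
		length++;
	length += SKETCH_RB_EVNT_HDR_SIZE;
	length = SKETCH_ALIGN(length, SKETCH_RB_ALIGNMENT);
	if (length == SKETCH_RB_LEN_TIME_EXTEND + SKETCH_RB_ALIGNMENT)
		length += SKETCH_RB_ALIGNMENT;
	return length;
}

static void sketch_length_demo(void)
{
	assert(sketch_event_length(4) == 8);	/* no special case hit */
	assert(sketch_event_length(6) == 16);	/* 12-byte case padded to 16 */
}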
- return event;
+#ifndef CONFIG_HAVE_UNSTABLE_SCHED_CLOCK
+static inline bool sched_clock_stable(void)
+{
+ return true;
}
+#endif
static inline int
rb_try_to_discard(struct ring_buffer_per_cpu *cpu_buffer,
local_inc(&cpu_buffer->commits);
}
+static void
+rb_set_commit_to_write(struct ring_buffer_per_cpu *cpu_buffer)
+{
+ unsigned long max_count;
+
+ /*
+ * We only race with interrupts and NMIs on this CPU.
+ * If we own the commit event, then we can commit
+ * all others that interrupted us, since the interruptions
+ * are in stack format (they finish before they come
+ * back to us). This allows us to do a simple loop to
+ * assign the commit to the tail.
+ */
+ again:
+ max_count = cpu_buffer->nr_pages * 100;
+
+ while (cpu_buffer->commit_page != cpu_buffer->tail_page) {
+ if (RB_WARN_ON(cpu_buffer, !(--max_count)))
+ return;
+ if (RB_WARN_ON(cpu_buffer,
+ rb_is_reader_page(cpu_buffer->tail_page)))
+ return;
+ local_set(&cpu_buffer->commit_page->page->commit,
+ rb_page_write(cpu_buffer->commit_page));
+ rb_inc_page(cpu_buffer, &cpu_buffer->commit_page);
+ cpu_buffer->write_stamp =
+ cpu_buffer->commit_page->page->time_stamp;
+ /* add barrier to keep gcc from optimizing too much */
+ barrier();
+ }
+ while (rb_commit_index(cpu_buffer) !=
+ rb_page_write(cpu_buffer->commit_page)) {
+
+ local_set(&cpu_buffer->commit_page->page->commit,
+ rb_page_write(cpu_buffer->commit_page));
+ RB_WARN_ON(cpu_buffer,
+ local_read(&cpu_buffer->commit_page->page->commit) &
+ ~RB_WRITE_MASK);
+ barrier();
+ }
+
+ /* again, keep gcc from optimizing */
+ barrier();
+
+ /*
+ * If an interrupt came in just after the first while loop
+ * and pushed the tail page forward, we will be left with
+ * a dangling commit that will never go forward.
+ */
+ if (unlikely(cpu_buffer->commit_page != cpu_buffer->tail_page))
+ goto again;
+}
+
static inline void rb_end_commit(struct ring_buffer_per_cpu *cpu_buffer)
{
unsigned long commits;
}
}
-static struct ring_buffer_event *
-rb_reserve_next_event(struct ring_buffer *buffer,
- struct ring_buffer_per_cpu *cpu_buffer,
- unsigned long length)
+static inline void rb_event_discard(struct ring_buffer_event *event)
{
- struct ring_buffer_event *event;
- u64 ts, delta;
- int nr_loops = 0;
- int add_timestamp;
- u64 diff;
+ if (event->type_len == RINGBUF_TYPE_TIME_EXTEND)
+ event = skip_time_extend(event);
- rb_start_commit(cpu_buffer);
+ /* array[0] holds the actual length for the discarded event */
+ event->array[0] = rb_event_data_length(event) - RB_EVNT_HDR_SIZE;
+ event->type_len = RINGBUF_TYPE_PADDING;
+ /* time delta must be non zero */
+ if (!event->time_delta)
+ event->time_delta = 1;
+}
-#ifdef CONFIG_RING_BUFFER_ALLOW_SWAP
- /*
- * Due to the ability to swap a cpu buffer from a buffer
- * it is possible it was swapped before we committed.
- * (committing stops a swap). We check for it here and
- * if it happened, we have to fail the write.
- */
- barrier();
- if (unlikely(ACCESS_ONCE(cpu_buffer->buffer) != buffer)) {
- local_dec(&cpu_buffer->committing);
- local_dec(&cpu_buffer->commits);
- return NULL;
- }
-#endif
+static inline bool
+rb_event_is_commit(struct ring_buffer_per_cpu *cpu_buffer,
+ struct ring_buffer_event *event)
+{
+ unsigned long addr = (unsigned long)event;
+ unsigned long index;
- length = rb_calculate_event_length(length);
- again:
- add_timestamp = 0;
- delta = 0;
+ index = rb_event_index(event);
+ addr &= PAGE_MASK;
+
+ return cpu_buffer->commit_page->page == (void *)addr &&
+ rb_commit_index(cpu_buffer) == index;
+}
+
+static void
+rb_update_write_stamp(struct ring_buffer_per_cpu *cpu_buffer,
+ struct ring_buffer_event *event)
+{
+ u64 delta;
/*
- * We allow for interrupts to reenter here and do a trace.
- * If one does, it will cause this original code to loop
- * back here. Even with heavy interrupts happening, this
- * should only happen a few times in a row. If this happens
- * 1000 times in a row, there must be either an interrupt
- * storm or we have something buggy.
- * Bail!
+ * The first event in the commit queue updates the
+ * time stamp.
*/
- if (RB_WARN_ON(cpu_buffer, ++nr_loops > 1000))
- goto out_fail;
+ if (rb_event_is_commit(cpu_buffer, event)) {
+ /*
+ * A commit event that is first on a page
+ * updates the write timestamp with the page stamp
+ */
+ if (!rb_event_index(event))
+ cpu_buffer->write_stamp =
+ cpu_buffer->commit_page->page->time_stamp;
+ else if (event->type_len == RINGBUF_TYPE_TIME_EXTEND) {
+ delta = event->array[0];
+ delta <<= TS_SHIFT;
+ delta += event->time_delta;
+ cpu_buffer->write_stamp += delta;
+ } else
+ cpu_buffer->write_stamp += event->time_delta;
+ }
+}
- ts = rb_time_stamp(cpu_buffer->buffer);
- diff = ts - cpu_buffer->write_stamp;
+static void rb_commit(struct ring_buffer_per_cpu *cpu_buffer,
+ struct ring_buffer_event *event)
+{
+ local_inc(&cpu_buffer->entries);
+ rb_update_write_stamp(cpu_buffer, event);
+ rb_end_commit(cpu_buffer);
+}
- /* make sure this diff is calculated here */
- barrier();
+static __always_inline void
+rb_wakeups(struct ring_buffer *buffer, struct ring_buffer_per_cpu *cpu_buffer)
+{
+ bool pagebusy;
- /* Did the write stamp get updated already? */
- if (likely(ts >= cpu_buffer->write_stamp)) {
- delta = diff;
- if (unlikely(test_time_stamp(delta))) {
- int local_clock_stable = 1;
-#ifdef CONFIG_HAVE_UNSTABLE_SCHED_CLOCK
- local_clock_stable = sched_clock_stable();
-#endif
- WARN_ONCE(delta > (1ULL << 59),
- KERN_WARNING "Delta way too big! %llu ts=%llu write stamp = %llu\n%s",
- (unsigned long long)delta,
- (unsigned long long)ts,
- (unsigned long long)cpu_buffer->write_stamp,
- local_clock_stable ? "" :
- "If you just came from a suspend/resume,\n"
- "please switch to the trace global clock:\n"
- " echo global > /sys/kernel/debug/tracing/trace_clock\n");
- add_timestamp = 1;
- }
+ if (buffer->irq_work.waiters_pending) {
+ buffer->irq_work.waiters_pending = false;
+ /* irq_work_queue() supplies its own memory barriers */
+ irq_work_queue(&buffer->irq_work.work);
}
- event = __rb_reserve_next(cpu_buffer, length, ts,
- delta, add_timestamp);
- if (unlikely(PTR_ERR(event) == -EAGAIN))
- goto again;
-
- if (!event)
- goto out_fail;
+ if (cpu_buffer->irq_work.waiters_pending) {
+ cpu_buffer->irq_work.waiters_pending = false;
+ /* irq_work_queue() supplies its own memory barriers */
+ irq_work_queue(&cpu_buffer->irq_work.work);
+ }
- return event;
+ pagebusy = cpu_buffer->reader_page == cpu_buffer->commit_page;
- out_fail:
- rb_end_commit(cpu_buffer);
- return NULL;
+ if (!pagebusy && cpu_buffer->irq_work.full_waiters_pending) {
+ cpu_buffer->irq_work.wakeup_full = true;
+ cpu_buffer->irq_work.full_waiters_pending = false;
+ /* irq_work_queue() supplies its own memory barriers */
+ irq_work_queue(&cpu_buffer->irq_work.work);
+ }
}
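/*
 * Editor's illustrative sketch (not part of the patch): the deferred-wakeup
 * pattern rb_wakeups() uses.  Waking waiters directly from the write path
 * could itself generate trace events, so the wakeup is pushed through
 * irq_work and performed later in a safe context.  The sk_* names are
 * hypothetical; init_irq_work()/irq_work_queue() are the same helpers this
 * file already uses.
 */
#include <linux/irq_work.h>
#include <linux/wait.h>

static DECLARE_WAIT_QUEUE_HEAD(sk_waiters);
static struct irq_work sk_wakeup_work;
static bool sk_waiters_pending;

static void sk_wake_up_waiters(struct irq_work *work)
{
	wake_up_all(&sk_waiters);
}

static void sk_wakeup_init(void)
{
	init_irq_work(&sk_wakeup_work, sk_wake_up_waiters);
}

static void sk_writer_side_wakeup(void)
{
	if (sk_waiters_pending) {
		sk_waiters_pending = false;
		/* irq_work_queue() supplies its own memory barriers */
		irq_work_queue(&sk_wakeup_work);
	}
}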
-#ifdef CONFIG_TRACING
-
/*
* The lock and unlock are done within a preempt disable section.
* The current_context per_cpu variable can only be modified
* just so happens that it is the same bit corresponding to
* the current context.
*/
-static DEFINE_PER_CPU(unsigned int, current_context);
-static __always_inline int trace_recursive_lock(void)
+static __always_inline int
+trace_recursive_lock(struct ring_buffer_per_cpu *cpu_buffer)
{
- unsigned int val = __this_cpu_read(current_context);
+ unsigned int val = cpu_buffer->current_context;
int bit;
if (in_interrupt()) {
if (in_nmi())
- bit = 0;
+ bit = RB_CTX_NMI;
else if (in_irq())
- bit = 1;
+ bit = RB_CTX_IRQ;
else
- bit = 2;
+ bit = RB_CTX_SOFTIRQ;
} else
- bit = 3;
+ bit = RB_CTX_NORMAL;
if (unlikely(val & (1 << bit)))
return 1;
val |= (1 << bit);
- __this_cpu_write(current_context, val);
+ cpu_buffer->current_context = val;
return 0;
}
-static __always_inline void trace_recursive_unlock(void)
+static __always_inline void
+trace_recursive_unlock(struct ring_buffer_per_cpu *cpu_buffer)
{
- __this_cpu_and(current_context, __this_cpu_read(current_context) - 1);
+ cpu_buffer->current_context &= cpu_buffer->current_context - 1;
}
-#else
-
-#define trace_recursive_lock() (0)
-#define trace_recursive_unlock() do { } while (0)
-
-#endif
-
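/*
 * Editor's illustrative sketch (not part of the patch): the per-buffer
 * recursion guard keeps one bit per context in current_context.  Deeper
 * contexts (NMI) use lower bit numbers, so "val & (val - 1)" in
 * trace_recursive_unlock() clears the lowest set bit, i.e. the context
 * entered most recently.  Standalone userspace demo; sk_* names are made up.
 */
#include <assert.h>

enum { SK_NMI, SK_IRQ, SK_SOFTIRQ, SK_NORMAL };

static unsigned int sk_ctx;

static int sk_recursive_lock(int bit)
{
	if (sk_ctx & (1U << bit))
		return 1;		/* recursion in the same context: reject */
	sk_ctx |= 1U << bit;
	return 0;
}

static void sk_recursive_unlock(void)
{
	sk_ctx &= sk_ctx - 1;		/* drop the most recently set context bit */
}

static void sk_recursion_demo(void)
{
	assert(!sk_recursive_lock(SK_NORMAL));	/* normal context starts tracing */
	assert(!sk_recursive_lock(SK_IRQ));	/* an IRQ interrupts and traces */
	assert(sk_recursive_lock(SK_IRQ));	/* the IRQ handler recursing is rejected */
	sk_recursive_unlock();			/* clears the IRQ bit */
	sk_recursive_unlock();			/* clears the NORMAL bit */
	assert(sk_ctx == 0);
}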
/**
- * ring_buffer_lock_reserve - reserve a part of the buffer
- * @buffer: the ring buffer to reserve from
- * @length: the length of the data to reserve (excluding event header)
- *
- * Returns a reseverd event on the ring buffer to copy directly to.
- * The user of this interface will need to get the body to write into
- * and can use the ring_buffer_event_data() interface.
+ * ring_buffer_unlock_commit - commit a reserved event
+ * @buffer: The buffer to commit to
+ * @event: The event pointer to commit.
*
- * The length is the length of the data needed, not the event length
- * which also includes the event header.
+ * This commits the data to the ring buffer, and releases any locks held.
*
- * Must be paired with ring_buffer_unlock_commit, unless NULL is returned.
- * If NULL is returned, then nothing has been allocated or locked.
+ * Must be paired with ring_buffer_lock_reserve.
*/
-struct ring_buffer_event *
-ring_buffer_lock_reserve(struct ring_buffer *buffer, unsigned long length)
+int ring_buffer_unlock_commit(struct ring_buffer *buffer,
+ struct ring_buffer_event *event)
{
struct ring_buffer_per_cpu *cpu_buffer;
- struct ring_buffer_event *event;
- int cpu;
+ int cpu = raw_smp_processor_id();
- if (ring_buffer_flags != RB_BUFFERS_ON)
- return NULL;
+ cpu_buffer = buffer->buffers[cpu];
- /* If we are tracing schedule, we don't want to recurse */
- preempt_disable_notrace();
+ rb_commit(cpu_buffer, event);
- if (atomic_read(&buffer->record_disabled))
- goto out_nocheck;
+ rb_wakeups(buffer, cpu_buffer);
- if (trace_recursive_lock())
- goto out_nocheck;
+ trace_recursive_unlock(cpu_buffer);
- cpu = raw_smp_processor_id();
+ preempt_enable_notrace();
- if (!cpumask_test_cpu(cpu, buffer->cpumask))
- goto out;
+ return 0;
+}
+EXPORT_SYMBOL_GPL(ring_buffer_unlock_commit);
- cpu_buffer = buffer->buffers[cpu];
+static noinline void
+rb_handle_timestamp(struct ring_buffer_per_cpu *cpu_buffer,
+ struct rb_event_info *info)
+{
+ WARN_ONCE(info->delta > (1ULL << 59),
+ KERN_WARNING "Delta way too big! %llu ts=%llu write stamp = %llu\n%s",
+ (unsigned long long)info->delta,
+ (unsigned long long)info->ts,
+ (unsigned long long)cpu_buffer->write_stamp,
+ sched_clock_stable() ? "" :
+ "If you just came from a suspend/resume,\n"
+ "please switch to the trace global clock:\n"
+ " echo global > /sys/kernel/debug/tracing/trace_clock\n");
+ info->add_timestamp = 1;
+}
- if (atomic_read(&cpu_buffer->record_disabled))
- goto out;
+static struct ring_buffer_event *
+__rb_reserve_next(struct ring_buffer_per_cpu *cpu_buffer,
+ struct rb_event_info *info)
+{
+ struct ring_buffer_event *event;
+ struct buffer_page *tail_page;
+ unsigned long tail, write;
- if (length > BUF_MAX_DATA_SIZE)
- goto out;
+ /*
+ * If the time delta since the last event is too big to
+ * hold in the time field of the event, then we append a
+ * TIME EXTEND event ahead of the data event.
+ */
+ if (unlikely(info->add_timestamp))
+ info->length += RB_LEN_TIME_EXTEND;
- event = rb_reserve_next_event(buffer, cpu_buffer, length);
- if (!event)
- goto out;
+ tail_page = info->tail_page = cpu_buffer->tail_page;
+ write = local_add_return(info->length, &tail_page->write);
- return event;
+ /* set write to only the index of the write */
+ write &= RB_WRITE_MASK;
+ tail = write - info->length;
- out:
- trace_recursive_unlock();
+ /*
+ * If this is the first commit on the page, then it has the same
+ * timestamp as the page itself.
+ */
+ if (!tail)
+ info->delta = 0;
- out_nocheck:
- preempt_enable_notrace();
- return NULL;
-}
-EXPORT_SYMBOL_GPL(ring_buffer_lock_reserve);
+ /* See if we shot past the end of this buffer page */
+ if (unlikely(write > BUF_PAGE_SIZE))
+ return rb_move_tail(cpu_buffer, tail, info);
-static void
-rb_update_write_stamp(struct ring_buffer_per_cpu *cpu_buffer,
- struct ring_buffer_event *event)
-{
- u64 delta;
+ /* We reserved something on the buffer */
+
+ event = __rb_page_index(tail_page, tail);
+ kmemcheck_annotate_bitfield(event, bitfield);
+ rb_update_event(cpu_buffer, event, info);
+
+ local_inc(&tail_page->entries);
/*
- * The event first in the commit queue updates the
- * time stamp.
+ * If this is the first commit on the page, then update
+ * its timestamp.
*/
- if (rb_event_is_commit(cpu_buffer, event)) {
- /*
- * A commit event that is first on a page
- * updates the write timestamp with the page stamp
- */
- if (!rb_event_index(event))
- cpu_buffer->write_stamp =
- cpu_buffer->commit_page->page->time_stamp;
- else if (event->type_len == RINGBUF_TYPE_TIME_EXTEND) {
- delta = event->array[0];
- delta <<= TS_SHIFT;
- delta += event->time_delta;
- cpu_buffer->write_stamp += delta;
- } else
- cpu_buffer->write_stamp += event->time_delta;
- }
-}
+ if (!tail)
+ tail_page->page->time_stamp = info->ts;
-static void rb_commit(struct ring_buffer_per_cpu *cpu_buffer,
- struct ring_buffer_event *event)
-{
- local_inc(&cpu_buffer->entries);
- rb_update_write_stamp(cpu_buffer, event);
- rb_end_commit(cpu_buffer);
+ /* account for these added bytes */
+ local_add(info->length, &cpu_buffer->entries_bytes);
+
+ return event;
}
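/*
 * Editor's illustrative sketch (not part of the patch): the lock-free
 * reservation step of __rb_reserve_next().  The writer atomically advances
 * the page's write index and derives its own slot from the returned value;
 * if the new index overshoots the page, the caller has to move the tail.
 * Written with C11 atomics instead of the kernel's local_t, and the
 * SKETCH_PAGE_DATA_SIZE constant is made up.
 */
#include <stdatomic.h>
#include <stddef.h>

#define SKETCH_PAGE_DATA_SIZE 4096

struct sketch_page {
	atomic_ulong write;			/* bytes reserved so far on this page */
	char data[SKETCH_PAGE_DATA_SIZE];
};

/* Returns a pointer to the reserved bytes, or NULL if the page is full. */
static void *sketch_reserve(struct sketch_page *page, unsigned long length)
{
	unsigned long write, tail;

	write = atomic_fetch_add(&page->write, length) + length;
	tail = write - length;			/* our slot starts at the old index */

	if (write > SKETCH_PAGE_DATA_SIZE)
		return NULL;			/* crossed the page: caller moves the tail */

	return page->data + tail;
}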
-static __always_inline void
-rb_wakeups(struct ring_buffer *buffer, struct ring_buffer_per_cpu *cpu_buffer)
+static struct ring_buffer_event *
+rb_reserve_next_event(struct ring_buffer *buffer,
+ struct ring_buffer_per_cpu *cpu_buffer,
+ unsigned long length)
{
- bool pagebusy;
+ struct ring_buffer_event *event;
+ struct rb_event_info info;
+ int nr_loops = 0;
+ u64 diff;
- if (buffer->irq_work.waiters_pending) {
- buffer->irq_work.waiters_pending = false;
- /* irq_work_queue() supplies it's own memory barriers */
- irq_work_queue(&buffer->irq_work.work);
+ rb_start_commit(cpu_buffer);
+
+#ifdef CONFIG_RING_BUFFER_ALLOW_SWAP
+ /*
+ * Due to the ability to swap a cpu buffer from a buffer
+ * it is possible it was swapped before we committed.
+ * (committing stops a swap). We check for it here and
+ * if it happened, we have to fail the write.
+ */
+ barrier();
+ if (unlikely(ACCESS_ONCE(cpu_buffer->buffer) != buffer)) {
+ local_dec(&cpu_buffer->committing);
+ local_dec(&cpu_buffer->commits);
+ return NULL;
}
+#endif
- if (cpu_buffer->irq_work.waiters_pending) {
- cpu_buffer->irq_work.waiters_pending = false;
- /* irq_work_queue() supplies it's own memory barriers */
- irq_work_queue(&cpu_buffer->irq_work.work);
+ info.length = rb_calculate_event_length(length);
+ again:
+ info.add_timestamp = 0;
+ info.delta = 0;
+
+ /*
+ * We allow for interrupts to reenter here and do a trace.
+ * If one does, it will cause this original code to loop
+ * back here. Even with heavy interrupts happening, this
+ * should only happen a few times in a row. If this happens
+ * 1000 times in a row, there must be either an interrupt
+ * storm or we have something buggy.
+ * Bail!
+ */
+ if (RB_WARN_ON(cpu_buffer, ++nr_loops > 1000))
+ goto out_fail;
+
+ info.ts = rb_time_stamp(cpu_buffer->buffer);
+ diff = info.ts - cpu_buffer->write_stamp;
+
+ /* make sure this diff is calculated here */
+ barrier();
+
+ /* Did the write stamp get updated already? */
+ if (likely(info.ts >= cpu_buffer->write_stamp)) {
+ info.delta = diff;
+ if (unlikely(test_time_stamp(info.delta)))
+ rb_handle_timestamp(cpu_buffer, &info);
}
- pagebusy = cpu_buffer->reader_page == cpu_buffer->commit_page;
+ event = __rb_reserve_next(cpu_buffer, &info);
- if (!pagebusy && cpu_buffer->irq_work.full_waiters_pending) {
- cpu_buffer->irq_work.wakeup_full = true;
- cpu_buffer->irq_work.full_waiters_pending = false;
- /* irq_work_queue() supplies it's own memory barriers */
- irq_work_queue(&cpu_buffer->irq_work.work);
+ if (unlikely(PTR_ERR(event) == -EAGAIN)) {
+ if (info.add_timestamp)
+ info.length -= RB_LEN_TIME_EXTEND;
+ goto again;
}
+
+ if (!event)
+ goto out_fail;
+
+ return event;
+
+ out_fail:
+ rb_end_commit(cpu_buffer);
+ return NULL;
}
/**
- * ring_buffer_unlock_commit - commit a reserved
- * @buffer: The buffer to commit to
- * @event: The event pointer to commit.
+ * ring_buffer_lock_reserve - reserve a part of the buffer
+ * @buffer: the ring buffer to reserve from
+ * @length: the length of the data to reserve (excluding event header)
*
- * This commits the data to the ring buffer, and releases any locks held.
+ * Returns a reserved event on the ring buffer to copy directly to.
+ * The user of this interface will need to get the body to write into
+ * and can use the ring_buffer_event_data() interface.
*
- * Must be paired with ring_buffer_lock_reserve.
+ * The length is the length of the data needed, not the event length
+ * which also includes the event header.
+ *
+ * Must be paired with ring_buffer_unlock_commit, unless NULL is returned.
+ * If NULL is returned, then nothing has been allocated or locked.
*/
-int ring_buffer_unlock_commit(struct ring_buffer *buffer,
- struct ring_buffer_event *event)
+struct ring_buffer_event *
+ring_buffer_lock_reserve(struct ring_buffer *buffer, unsigned long length)
{
struct ring_buffer_per_cpu *cpu_buffer;
- int cpu = raw_smp_processor_id();
+ struct ring_buffer_event *event;
+ int cpu;
- cpu_buffer = buffer->buffers[cpu];
+ /* If we are tracing schedule, we don't want to recurse */
+ preempt_disable_notrace();
- rb_commit(cpu_buffer, event);
+ if (unlikely(atomic_read(&buffer->record_disabled)))
+ goto out;
- rb_wakeups(buffer, cpu_buffer);
+ cpu = raw_smp_processor_id();
- trace_recursive_unlock();
+ if (unlikely(!cpumask_test_cpu(cpu, buffer->cpumask)))
+ goto out;
- preempt_enable_notrace();
+ cpu_buffer = buffer->buffers[cpu];
- return 0;
-}
-EXPORT_SYMBOL_GPL(ring_buffer_unlock_commit);
+ if (unlikely(atomic_read(&cpu_buffer->record_disabled)))
+ goto out;
-static inline void rb_event_discard(struct ring_buffer_event *event)
-{
- if (event->type_len == RINGBUF_TYPE_TIME_EXTEND)
- event = skip_time_extend(event);
+ if (unlikely(length > BUF_MAX_DATA_SIZE))
+ goto out;
- /* array[0] holds the actual length for the discarded event */
- event->array[0] = rb_event_data_length(event) - RB_EVNT_HDR_SIZE;
- event->type_len = RINGBUF_TYPE_PADDING;
- /* time delta must be non zero */
- if (!event->time_delta)
- event->time_delta = 1;
+ if (unlikely(trace_recursive_lock(cpu_buffer)))
+ goto out;
+
+ event = rb_reserve_next_event(buffer, cpu_buffer, length);
+ if (!event)
+ goto out_unlock;
+
+ return event;
+
+ out_unlock:
+ trace_recursive_unlock(cpu_buffer);
+ out:
+ preempt_enable_notrace();
+ return NULL;
}
+EXPORT_SYMBOL_GPL(ring_buffer_lock_reserve);
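/*
 * Editor's usage sketch (not part of the patch): the reserve/commit pairing
 * the comment above describes.  "my_buffer", "value" and struct layout are
 * hypothetical; the ring_buffer_* calls are the ones declared in
 * linux/ring_buffer.h at this point in time.
 */
static int sketch_write_sample(struct ring_buffer *my_buffer, u32 value)
{
	struct ring_buffer_event *event;
	u32 *body;

	event = ring_buffer_lock_reserve(my_buffer, sizeof(*body));
	if (!event)
		return -EBUSY;		/* nothing reserved, so nothing to commit */

	body = ring_buffer_event_data(event);
	*body = value;

	return ring_buffer_unlock_commit(my_buffer, event);
}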
/*
* Decrement the entries to the page that an event is on.
out:
rb_end_commit(cpu_buffer);
- trace_recursive_unlock();
+ trace_recursive_unlock(cpu_buffer);
preempt_enable_notrace();
int ret = -EBUSY;
int cpu;
- if (ring_buffer_flags != RB_BUFFERS_ON)
- return -EBUSY;
-
preempt_disable_notrace();
if (atomic_read(&buffer->record_disabled))
if (length > BUF_MAX_DATA_SIZE)
goto out;
+ if (unlikely(trace_recursive_lock(cpu_buffer)))
+ goto out;
+
event = rb_reserve_next_event(buffer, cpu_buffer, length);
if (!event)
- goto out;
+ goto out_unlock;
body = rb_event_data(event);
rb_wakeups(buffer, cpu_buffer);
ret = 0;
+
+ out_unlock:
+ trace_recursive_unlock(cpu_buffer);
+
out:
preempt_enable_notrace();
}
EXPORT_SYMBOL_GPL(ring_buffer_write);
-static int rb_per_cpu_empty(struct ring_buffer_per_cpu *cpu_buffer)
+static bool rb_per_cpu_empty(struct ring_buffer_per_cpu *cpu_buffer)
{
struct buffer_page *reader = cpu_buffer->reader_page;
struct buffer_page *head = rb_set_head_page(cpu_buffer);
/* In case of error, head will be NULL */
if (unlikely(!head))
- return 1;
+ return true;
return reader->read == rb_page_commit(reader) &&
(commit == reader ||
/* Finally update the reader page to the new head */
cpu_buffer->reader_page = reader;
- rb_reset_reader_page(cpu_buffer);
+ cpu_buffer->reader_page->read = 0;
if (overwrite != cpu_buffer->last_overrun) {
cpu_buffer->lost_events = overwrite - cpu_buffer->last_overrun;
goto again;
out:
+ /* Update the read_stamp on the first event */
+ if (reader && reader->read == 0)
+ cpu_buffer->read_stamp = reader->page->time_stamp;
+
arch_spin_unlock(&cpu_buffer->lock);
local_irq_restore(flags);
}
EXPORT_SYMBOL_GPL(ring_buffer_iter_peek);
-static inline int rb_ok_to_lock(void)
+static inline bool rb_reader_lock(struct ring_buffer_per_cpu *cpu_buffer)
{
+ if (likely(!in_nmi())) {
+ raw_spin_lock(&cpu_buffer->reader_lock);
+ return true;
+ }
+
/*
 * If an NMI die dumps out the content of the ring buffer,
- * do not grab locks. We also permanently disable the ring
- * buffer too. A one time deal is all you get from reading
- * the ring buffer from an NMI.
+ * trylock must be used to prevent a deadlock if the NMI
+ * preempted a task that holds the ring buffer locks. If
+ * we get the lock then all is fine, if not, then continue
+ * to do the read, but this can corrupt the ring buffer,
+ * so it must be permanently disabled from future writes.
+ * Reading from NMI is a one-shot deal.
*/
- if (likely(!in_nmi()))
- return 1;
+ if (raw_spin_trylock(&cpu_buffer->reader_lock))
+ return true;
- tracing_off_permanent();
- return 0;
+ /* Continue without locking, but disable the ring buffer */
+ atomic_inc(&cpu_buffer->record_disabled);
+ return false;
+}
+
+static inline void
+rb_reader_unlock(struct ring_buffer_per_cpu *cpu_buffer, bool locked)
+{
+ if (likely(locked))
+ raw_spin_unlock(&cpu_buffer->reader_lock);
+ return;
}
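/*
 * Editor's illustrative sketch (not part of the patch): the trylock-or-
 * disable pattern used by rb_reader_lock() above, written with pthreads
 * instead of raw_spinlock_t so it can run standalone.  "record_disabled"
 * stands in for the per-cpu counter of the same name; sk_* names are made up.
 */
#include <pthread.h>
#include <stdatomic.h>
#include <stdbool.h>

static pthread_mutex_t sk_reader_lock = PTHREAD_MUTEX_INITIALIZER;
static atomic_int sk_record_disabled;

static bool sk_reader_lock_nmi_safe(bool in_nmi)
{
	if (!in_nmi) {
		pthread_mutex_lock(&sk_reader_lock);
		return true;
	}

	/*
	 * In NMI context only a trylock is safe: the lock holder may be the
	 * very task the NMI interrupted, so blocking would deadlock.
	 */
	if (pthread_mutex_trylock(&sk_reader_lock) == 0)
		return true;

	/*
	 * Read anyway, but disable further writes so the unlocked read
	 * cannot race with new commits.
	 */
	atomic_fetch_add(&sk_record_disabled, 1);
	return false;
}

static void sk_reader_unlock(bool locked)
{
	if (locked)
		pthread_mutex_unlock(&sk_reader_lock);
}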
/**
struct ring_buffer_per_cpu *cpu_buffer = buffer->buffers[cpu];
struct ring_buffer_event *event;
unsigned long flags;
- int dolock;
+ bool dolock;
if (!cpumask_test_cpu(cpu, buffer->cpumask))
return NULL;
- dolock = rb_ok_to_lock();
again:
local_irq_save(flags);
- if (dolock)
- raw_spin_lock(&cpu_buffer->reader_lock);
+ dolock = rb_reader_lock(cpu_buffer);
event = rb_buffer_peek(cpu_buffer, ts, lost_events);
if (event && event->type_len == RINGBUF_TYPE_PADDING)
rb_advance_reader(cpu_buffer);
- if (dolock)
- raw_spin_unlock(&cpu_buffer->reader_lock);
+ rb_reader_unlock(cpu_buffer, dolock);
local_irq_restore(flags);
if (event && event->type_len == RINGBUF_TYPE_PADDING)
struct ring_buffer_per_cpu *cpu_buffer;
struct ring_buffer_event *event = NULL;
unsigned long flags;
- int dolock;
-
- dolock = rb_ok_to_lock();
+ bool dolock;
again:
/* might be called in atomic */
cpu_buffer = buffer->buffers[cpu];
local_irq_save(flags);
- if (dolock)
- raw_spin_lock(&cpu_buffer->reader_lock);
+ dolock = rb_reader_lock(cpu_buffer);
event = rb_buffer_peek(cpu_buffer, ts, lost_events);
if (event) {
rb_advance_reader(cpu_buffer);
}
- if (dolock)
- raw_spin_unlock(&cpu_buffer->reader_lock);
+ rb_reader_unlock(cpu_buffer, dolock);
local_irq_restore(flags);
out:
 * ring_buffer_empty - is the ring buffer empty?
* @buffer: The ring buffer to test
*/
-int ring_buffer_empty(struct ring_buffer *buffer)
+bool ring_buffer_empty(struct ring_buffer *buffer)
{
struct ring_buffer_per_cpu *cpu_buffer;
unsigned long flags;
- int dolock;
+ bool dolock;
int cpu;
int ret;
- dolock = rb_ok_to_lock();
-
/* yes this is racy, but if you don't like the race, lock the buffer */
for_each_buffer_cpu(buffer, cpu) {
cpu_buffer = buffer->buffers[cpu];
local_irq_save(flags);
- if (dolock)
- raw_spin_lock(&cpu_buffer->reader_lock);
+ dolock = rb_reader_lock(cpu_buffer);
ret = rb_per_cpu_empty(cpu_buffer);
- if (dolock)
- raw_spin_unlock(&cpu_buffer->reader_lock);
+ rb_reader_unlock(cpu_buffer, dolock);
local_irq_restore(flags);
if (!ret)
- return 0;
+ return false;
}
- return 1;
+ return true;
}
EXPORT_SYMBOL_GPL(ring_buffer_empty);
* @buffer: The ring buffer
* @cpu: The CPU buffer to test
*/
-int ring_buffer_empty_cpu(struct ring_buffer *buffer, int cpu)
+bool ring_buffer_empty_cpu(struct ring_buffer *buffer, int cpu)
{
struct ring_buffer_per_cpu *cpu_buffer;
unsigned long flags;
- int dolock;
+ bool dolock;
int ret;
if (!cpumask_test_cpu(cpu, buffer->cpumask))
- return 1;
-
- dolock = rb_ok_to_lock();
+ return true;
cpu_buffer = buffer->buffers[cpu];
local_irq_save(flags);
- if (dolock)
- raw_spin_lock(&cpu_buffer->reader_lock);
+ dolock = rb_reader_lock(cpu_buffer);
ret = rb_per_cpu_empty(cpu_buffer);
- if (dolock)
- raw_spin_unlock(&cpu_buffer->reader_lock);
+ rb_reader_unlock(cpu_buffer, dolock);
local_irq_restore(flags);
return ret;
ret = -EAGAIN;
- if (ring_buffer_flags != RB_BUFFERS_ON)
- goto out;
-
if (atomic_read(&buffer_a->record_disabled))
goto out;