Description
Fluent Bit Scheduled Timer with coroutine design
Background
Coroutines
Fluent Bit currently uses coroutines for concurrency; see the description here: https://github.com/fluent/fluent-bit/blob/master/DEVELOPER_GUIDE.md#concurrency
Currently, the code internally only supports coroutines for output flush tasks: jobs that flush data to a destination via an output plugin. Each coroutine flushes a single chunk (~2 MB of data) via a single output plugin.
Async vs Sync network code
Fluent Bit has two networking stacks in its core code, sync and async.
The async stack is actively maintained, receives bug fixes, and is used by all output plugins except the S3 output. It is also more performant because it uses coroutines: when one job is waiting on network IO, others can run.
The sync networking stack is less actively maintained, and S3 is the only output plugin that uses it. The AWS Distro for Fluent Bit team has seen issues with the sync networking stack, including the high-impact “CloudWatch output hang issue”. The sync stack is also wasteful, because it blocks an entire thread while network IO is pending.
The async stack requires the use of coroutines.
S3 output plugin unique use case
The S3 plugin generally does not upload data from the output flush coroutines. This is because customers want large log files in S3, larger than the ~2 MB chunk size. Instead, the flush coroutines simply buffer data on the filesystem to build up large files.
Please see full explanation of S3 uniqueness here: https://docs.fluentbit.io/manual/pipeline/outputs/s3#differences-between-s3-and-other-fluent-bit-outputs
Therefore, S3 uses a “timer callback”, which is invoked periodically to check for ready or timed-out files and upload them to S3. The Fluent Bit engine invokes the timer callback at a set frequency. However, since timer callbacks do not currently run in coroutines, the engine invokes the callback directly, outside of any coroutine. This means the callback must use the sync networking stack.
Given the past issues with sync networking in outputs, the AWS Distro team would like to migrate the S3 output to async networking.
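To make the timer callback's job concrete, here is a minimal sketch of the check it performs on each tick. The names demo_upload_file, demo_upload and demo_timer_tick are hypothetical and are not the S3 plugin's real code; the two thresholds are named after the S3 output's total_file_size and upload_timeout options. Under this proposal, the upload step would run inside a timer coroutine on the async stack instead of blocking the engine.

```c
#include <assert.h>
#include <stddef.h>
#include <time.h>

/* Hypothetical record for one file buffered on the filesystem. */
struct demo_upload_file {
    size_t size;          /* bytes buffered so far */
    time_t first_write;   /* when the first chunk was buffered */
    int    uploaded;      /* set once the file has been sent */
};

/* Hypothetical stand-in for the upload itself. In the proposed design this
 * is the network call that would use the async stack inside a coroutine. */
static int demo_upload(struct demo_upload_file *f)
{
    f->uploaded = 1;
    return 0;
}

/* One timer tick: upload every file that is either large enough
 * (ready) or has waited at least upload_timeout seconds (timed out).
 * Returns the number of files uploaded on this tick. */
static int demo_timer_tick(struct demo_upload_file *files, size_t n,
                           size_t total_file_size, time_t upload_timeout,
                           time_t now)
{
    int count = 0;

    for (size_t i = 0; i < n; i++) {
        struct demo_upload_file *f = &files[i];

        if (f->uploaded) {
            continue;
        }
        int ready = f->size >= total_file_size;
        int timed_out = (now - f->first_write) >= upload_timeout;
        if ((ready || timed_out) && demo_upload(f) == 0) {
            count++;
        }
    }
    return count;
}
```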
Problem Statement and Goal
To migrate to the async networking stack for S3 output file uploads, the timer callback must run in a coroutine.
Therefore, this document outlines a proposal to implement a scheduled timer that runs its callback in a coroutine.
Design
Creating output timer coroutines
[Recommended] Option 1: Re-use the existing flb_sched_timer_cb_create
The existing flb_sched_timer_cb_create code can be re-used without modification: https://github.com/fluent/fluent-bit/blob/1.9/src/flb_scheduler.c#L435
This function tells the event loop to invoke a callback at a set frequency.
Currently, the function accepts a callback function pointer and a context pointer.
We will create a new function in flb_output.h that creates and invokes coroutines for timers.
void flb_output_coro_timer_cb(struct flb_config *config, void *data)
This function will be passed as the callback function pointer in flb_sched_timer_cb_create.
The data pointer passed to this callback must allow it to invoke a callback function in the output plugin and pass that callback some data. flb_output_coro_timer_cb also needs the flb_output_instance in order to associate the coroutine with an output instance. Thus, we will create a new structure:
struct flb_output_coro_timer_data {
    struct flb_output_instance *ins;                      /* associate coro with this output instance */
    void (*cb) (struct flb_config *config, void *data);   /* call this output callback in the coro */
    void *data;                                           /* opaque data to pass to the above cb */
};
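A minimal sketch of how flb_output_coro_timer_cb could unpack this structure, assuming ins is a pointer and using stand-in definitions of flb_config and flb_output_instance (not the real ones). Coroutine creation is replaced by a counter so the sketch runs on its own, and demo_s3_upload_cb is a hypothetical plugin callback.

```c
#include <assert.h>
#include <stddef.h>

/* Minimal stand-ins for Fluent Bit core types (not the real definitions). */
struct flb_config { int dummy; };
struct flb_output_instance {
    const char *name;
    int active_timer_coros;   /* stand-in for the timer_coro_list length */
};

/* Structure proposed in this design. */
struct flb_output_coro_timer_data {
    struct flb_output_instance *ins;                     /* owning output */
    void (*cb) (struct flb_config *config, void *data);  /* plugin callback */
    void *data;                                          /* opaque plugin data */
};

/* Generic callback that would be registered with flb_sched_timer_cb_create.
 * A real version would create a coroutine bound to t->ins and run t->cb
 * inside it; this sketch records the association and calls cb directly. */
void flb_output_coro_timer_cb(struct flb_config *config, void *data)
{
    struct flb_output_coro_timer_data *t = data;

    assert(t != NULL && t->ins != NULL && t->cb != NULL);
    t->ins->active_timer_coros++;   /* stand-in for "coroutine created" */
    t->cb(config, t->data);         /* would execute inside the coroutine */
}

/* Hypothetical plugin callback, e.g. S3's periodic upload check. */
static int demo_calls;
static void demo_s3_upload_cb(struct flb_config *config, void *data)
{
    int *files_ready = data;

    (void) config;
    demo_calls += *files_ready;
}
```

With this shape, the plugin only fills in an flb_output_coro_timer_data and passes flb_output_coro_timer_cb plus a pointer to that structure to flb_sched_timer_cb_create, so the scheduler code itself does not change.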
[Not recommended] Option 2: New flb_sched_timer_output_coro_create
An alternate option would be creating a new timer create function in flb_scheduler.c.
int flb_sched_timer_output_coro_create(struct flb_sched *sched, int type, int ms,
                                       struct flb_output_instance *ins,
                                       void (*cb)(struct flb_config *, void *),
                                       void *data, struct flb_sched_timer **out_timer);
This function would need to call out to some function in flb_output.h that would create a coroutine and invoke a callback. This output function would be set as the timer event callback, and it would then call the passed-in callback. That function would need to be similar to, or the same as, the Option 1 function in flb_output.h. It would also need a custom struct to hold the output instance, the passed-in callback, and the callback data.
This option therefore needs the same output-side code as Option 1 and adds further code changes in flb_scheduler.c, so it is strictly more complicated. Therefore, it is not recommended.
Yielding and resuming output timer coroutines
Once the coroutine is created, its execution is no different than other/existing coroutines. It can use the async network stack and yield and resume on network IO events the same as existing flush coroutines.
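For readers less familiar with the pattern, the yield/resume cycle can be sketched with POSIX ucontext standing in for Fluent Bit's own coroutine layer (flb_coro); this is an illustration of the technique, not Fluent Bit's implementation. All demo_ names are hypothetical. The coroutine yields where a real timer coroutine would wait on a socket, and the event loop resumes it when that socket becomes ready.

```c
#include <assert.h>
#include <ucontext.h>

static ucontext_t demo_engine_ctx;             /* the "event loop" side */
static ucontext_t demo_coro_ctx;               /* the timer coroutine */
static char demo_coro_stack[64 * 1024];        /* coroutine stack */
static int demo_step;                          /* progress marker */
static int demo_done;

/* Body of a timer coroutine: send a request, yield while the response is
 * pending, then continue once the engine resumes it. */
static void demo_timer_coro(void)
{
    demo_step = 1;                                  /* request sent */
    swapcontext(&demo_coro_ctx, &demo_engine_ctx);  /* yield on pending IO */
    demo_step = 2;                                  /* response handled */
    demo_done = 1;
    /* returning switches to uc_link, i.e. back to the engine */
}

/* Create the coroutine and run it until its first yield. */
static void demo_coro_start(void)
{
    getcontext(&demo_coro_ctx);
    demo_coro_ctx.uc_stack.ss_sp = demo_coro_stack;
    demo_coro_ctx.uc_stack.ss_size = sizeof(demo_coro_stack);
    demo_coro_ctx.uc_link = &demo_engine_ctx;
    makecontext(&demo_coro_ctx, demo_timer_coro, 0);
    swapcontext(&demo_engine_ctx, &demo_coro_ctx);
}

/* Called by the event loop when the coroutine's socket is ready. */
static void demo_coro_resume(void)
{
    swapcontext(&demo_engine_ctx, &demo_coro_ctx);
}
```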
Tracking output timer coroutines
Currently, flush coroutines are tracked via a flush_list. This list exists either on the flb_output_instance in the zero worker case, or on the flb_thread_instance for worker mode.
There are two reasons why coroutines must be tracked:
- Clean Up: Coroutines cannot clean up and free themselves. Instead, they put themselves on a destroy list and yield to the engine, which subsequently frees them.
- Graceful Shutdown: on graceful shutdown, if a coroutine is still performing work, we want it to complete before the engine shuts down Fluent Bit. Tracking coroutines that are active makes this easy.
For flush coroutines, the following lists currently exist. The same design will be followed for output scheduled timer coroutines:
- No worker case: There is a flush_list and flush_list_destroy on each flb_output_instance. For output timer coroutines, we can call these timer_coro_list and timer_coro_list_destroy.
- Worker case: Same as above, except the lists are stored on the flb_thread_instance.
Currently, the structure stored on the list is an flb_output_flush, which holds the necessary metadata about the coroutine. We can create a similar structure:
struct flb_output_timer_coro {
struct flb_config *config; /* FLB context */
struct flb_output_instance *o_ins; /* output instance */
struct flb_coro *coro; /* coro addr */
struct mk_list _head; /* list */
};
For discussion: Unlike flb_output_flush, this structure lacks an ID. I was not able to discern the purpose of the ID in the current code for flush coroutines. Do we need an ID?
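A minimal sketch of tracking in the no-worker case, assuming a tiny circular doubly linked list (demo_list) as a stand-in for mk_list and stand-in core types that carry only the two proposed lists. The helper demo_timer_coro_track is hypothetical; it shows where a tracking entry would be allocated and linked when a timer coroutine is created.

```c
#include <assert.h>
#include <stdlib.h>

/* Minimal circular doubly linked list, standing in for mk_list. */
struct demo_list { struct demo_list *prev, *next; };

static void demo_list_init(struct demo_list *l) { l->prev = l->next = l; }

/* Append e at the tail of the list headed by head. */
static void demo_list_add(struct demo_list *e, struct demo_list *head)
{
    e->prev = head->prev;
    e->next = head;
    head->prev->next = e;
    head->prev = e;
}

static int demo_list_size(const struct demo_list *head)
{
    int n = 0;
    for (const struct demo_list *p = head->next; p != head; p = p->next) {
        n++;
    }
    return n;
}

/* Stand-ins with only the fields this sketch needs. */
struct flb_config { int dummy; };
struct flb_coro { int dummy; };
struct flb_output_instance {
    struct demo_list timer_coro_list;          /* active timer coroutines */
    struct demo_list timer_coro_list_destroy;  /* finished, awaiting free */
};

/* Structure proposed above, with the list head type swapped for demo_list. */
struct flb_output_timer_coro {
    struct flb_config *config;
    struct flb_output_instance *o_ins;
    struct flb_coro *coro;
    struct demo_list _head;
};

/* Hypothetical helper: allocate a tracking entry and link it onto the
 * output instance's active list. Returns NULL on allocation failure. */
static struct flb_output_timer_coro *
demo_timer_coro_track(struct flb_config *config,
                      struct flb_output_instance *ins, struct flb_coro *coro)
{
    struct flb_output_timer_coro *tc = calloc(1, sizeof(*tc));

    if (tc == NULL) {
        return NULL;
    }
    tc->config = config;
    tc->o_ins = ins;
    tc->coro = coro;
    demo_list_add(&tc->_head, &ins->timer_coro_list);
    return tc;
}
```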
Cleaning up output timer coroutines
When a coroutine is complete, it needs to be added to the destroy list and then yield back to the engine.
For flush coroutines, the output is required to call the FLB_OUTPUT_RETURN macro, which adds the coro to the destroy list and yields.
For output timer coroutines, we could follow a similar model with a new FLB_OUTPUT_TIMER_RETURN macro. The callback passed by the output plugin would be required to call this macro when it completes. The macro would add the timer coroutine to the destroy list and yield back to the engine.
For discussion: Is there a simpler way to do this without a new macro?
Currently, the event loops (both engine and worker) run clean-up functions at the end of each iteration. Similarly, for output timer coroutines, we would need to add a new clean-up function that checks timer_coro_list_destroy for coroutines that are ready to be destroyed.
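A minimal sketch of the completion and clean-up path, assuming the same mk_list-like stand-in (demo_list) and a no-op demo_yield in place of a real coroutine yield. FLB_OUTPUT_TIMER_RETURN here is a hypothetical rendering of the proposed macro, and demo_timer_coro_cleanup is a hypothetical name for the new per-iteration clean-up function.

```c
#include <assert.h>
#include <stddef.h>
#include <stdlib.h>

/* Minimal circular doubly linked list, standing in for mk_list. */
struct demo_list { struct demo_list *prev, *next; };

static void demo_list_init(struct demo_list *l) { l->prev = l->next = l; }

static void demo_list_add(struct demo_list *e, struct demo_list *head)
{
    e->prev = head->prev;
    e->next = head;
    head->prev->next = e;
    head->prev = e;
}

static void demo_list_del(struct demo_list *e)
{
    e->prev->next = e->next;
    e->next->prev = e->prev;
    demo_list_init(e);   /* leave the node self-linked after unlinking */
}

static int demo_list_size(const struct demo_list *head)
{
    int n = 0;
    for (const struct demo_list *p = head->next; p != head; p = p->next) {
        n++;
    }
    return n;
}

#define demo_container_of(ptr, type, member) \
    ((type *) ((char *) (ptr) - offsetof(type, member)))

/* Stand-ins with only the fields this sketch needs. */
struct flb_output_instance {
    struct demo_list timer_coro_list;
    struct demo_list timer_coro_list_destroy;
};
struct flb_output_timer_coro {
    struct flb_output_instance *o_ins;
    struct demo_list _head;
};

/* Stand-in for yielding back to the engine; a no-op in this sketch. */
static void demo_yield(void) { }

/* Hypothetical macro: move the timer coroutine from the active list to the
 * destroy list, then yield so the engine can free it later. */
#define FLB_OUTPUT_TIMER_RETURN(tc)                                         \
    do {                                                                    \
        demo_list_del(&(tc)->_head);                                        \
        demo_list_add(&(tc)->_head, &(tc)->o_ins->timer_coro_list_destroy); \
        demo_yield();                                                       \
    } while (0)

/* Hypothetical plugin callback body: do the work, then signal completion. */
static void demo_s3_timer_body(struct flb_output_timer_coro *tc)
{
    /* ... upload ready files with the async network stack ... */
    FLB_OUTPUT_TIMER_RETURN(tc);
}

/* Clean-up run at the end of each event loop iteration: free every entry
 * on the destroy list and return how many were freed. */
static int demo_timer_coro_cleanup(struct flb_output_instance *ins)
{
    struct demo_list *head = &ins->timer_coro_list_destroy;
    int n = 0;

    while (head->next != head) {
        struct demo_list *e = head->next;
        struct flb_output_timer_coro *tc =
            demo_container_of(e, struct flb_output_timer_coro, _head);

        demo_list_del(e);
        free(tc);   /* a real version would also destroy the coroutine */
        n++;
    }
    return n;
}
```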
Output timer coroutines and graceful shutdown
Tracking active coroutines is necessary for graceful shutdown. This ensures that pending uploads can complete when the engine receives a SIGTERM. Currently, active coroutines interact with graceful shutdown in the following places:
- Worker: [flb_output_thread.c](https://github.com/fluent/fluent-bit/blob/master/src/flb_output_thread.c#L336): Each worker thread waits for its active flush coroutines to complete by checking the flush_list length on shutdown.
- Engine: flb_engine.c: The engine calls flb_task_running_count to determine if any tasks are pending. This is essentially a proxy for active and pending flushes (which use flush coroutines). It also calls flb_task_running_print to show the user that the graceful shutdown is working on finishing pending tasks.
1. Worker change: checking output timer coroutines on shutdown
On shutdown, the worker must check the length of both the flush_list and the timer_coro_list.
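A minimal sketch of the worker-side check, assuming hypothetical counters in place of the real flush_list and timer_coro_list lengths on flb_thread_instance. The simulated event loop iteration simply lets one coroutine finish per call, and max_iterations stands in for the configured grace period.

```c
#include <assert.h>

/* Hypothetical stand-in for the per-worker list lengths. */
struct demo_worker {
    int flush_list_len;       /* active flush coroutines */
    int timer_coro_list_len;  /* active output timer coroutines */
};

/* The worker may only exit once BOTH lists are empty. */
static int demo_worker_has_pending(const struct demo_worker *w)
{
    return w->flush_list_len > 0 || w->timer_coro_list_len > 0;
}

/* Simulated event loop iteration: one coroutine completes per call. */
static void demo_event_loop_iteration(struct demo_worker *w)
{
    if (w->flush_list_len > 0) {
        w->flush_list_len--;
    }
    else if (w->timer_coro_list_len > 0) {
        w->timer_coro_list_len--;
    }
}

/* Graceful shutdown: keep iterating while work is pending, up to a limit.
 * Returns the iterations used, or -1 if work was still pending at the limit. */
static int demo_worker_shutdown(struct demo_worker *w, int max_iterations)
{
    int i = 0;

    while (demo_worker_has_pending(w)) {
        if (i == max_iterations) {
            return -1;
        }
        demo_event_loop_iteration(w);
        i++;
    }
    return i;
}
```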
2. Engine change: tracking and printing running tasks and output coroutines
The engine's graceful shutdown should wait for scheduled timer coroutines to complete. In addition, the timer coroutines created by S3 count as in-progress work for an output, just as tasks do, so this code must be changed to support the new coroutine type.
In flb_engine.c, we will create new functions that wrap the existing and new use cases:
- flb_running_count: count of running tasks + output timer coroutines
- flb_running_print: print running tasks + output timer coroutines
For the print function, the format of the output will be:
[output] {flb_output_name(out)} has %d pending custom job(s)
For example:
[output] s3.1 has 1 pending custom job(s)
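A minimal sketch of the two wrappers, assuming a hypothetical demo_output record that already holds each output's running task count and timer coroutine count. Only the timer-coroutine part of the print function is shown; it writes into a caller buffer using the message format proposed above, rather than calling Fluent Bit's logging.

```c
#include <assert.h>
#include <stdio.h>
#include <string.h>

/* Hypothetical per-output summary used by this sketch. */
struct demo_output {
    const char *name;     /* what flb_output_name() would return, e.g. "s3.1" */
    int running_tasks;    /* contribution counted by flb_task_running_count */
    int timer_coros;      /* length of the output's timer_coro_list */
};

/* Sketch of flb_running_count: running tasks + output timer coroutines. */
static int demo_running_count(const struct demo_output *outs, int n)
{
    int total = 0;

    for (int i = 0; i < n; i++) {
        total += outs[i].running_tasks + outs[i].timer_coros;
    }
    return total;
}

/* Sketch of the timer-coroutine part of flb_running_print. Appends one line
 * per output with pending timer coroutines; returns the lines written. */
static int demo_running_print(const struct demo_output *outs, int n,
                              char *buf, size_t size)
{
    size_t used = 0;
    int lines = 0;

    assert(size > 0);
    buf[0] = '\0';
    for (int i = 0; i < n; i++) {
        if (outs[i].timer_coros == 0) {
            continue;
        }
        int w = snprintf(buf + used, size - used,
                         "[output] %s has %d pending custom job(s)\n",
                         outs[i].name, outs[i].timer_coros);
        if (w < 0 || (size_t) w >= size - used) {
            break;   /* output would be truncated; stop here */
        }
        used += (size_t) w;
        lines++;
    }
    return lines;
}
```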
Questions for comment:
- Is flb_engine.c the ideal place for this new code?
- Is “custom job” the right term to show the user? An alternative would be to allow the output to expose a timer coroutine “name” which would be used instead. S3 could set this to upload so that the message says: [output] s3.1 has 1 pending upload(s)
Other changes
Currently, there is a function flb_output_coros_size. It appears to be used for only one purpose: enforcing the FLB_OUTPUT_NO_MULTIPLEX flag, which ensures that only one flush runs at a time: https://github.com/fluent/fluent-bit/blob/v2.1.3/src/flb_engine_dispatch.c#L193
Consequently, it appears that we do not need to add the timer coroutine count to this function.
For Discussion: Is this the right long term decision?