Created 2024/12/02 at 06:59PM
Last Modified 2025/01/09 at 09:14PM
Modern data processing workflows require scalable systems capable of handling high-throughput workloads. Traditionally, such systems rely on tools like Kafka, Celery, or Airflow for orchestration. However, with Go's concurrency model and PostgreSQL's transactional guarantees, we can build a lightweight yet distributed pipeline using only these two tools.
Code available at https://github.com/mpsingh97/simple-pipeline
In this post, I’ll walk you through the architecture and implementation of such a pipeline. We'll process video tasks across five stages:
- Ingestion - Fetching the video and storing it in centralised storage.
- Transcoding - Converting the video to the desired formats.
- Metadata Extraction - Extracting metadata like duration, resolution, codec, etc. from the video.
- Assembly - Merging metadata and transcoded files.
- Publishing - Publishing the final video to a CDN for users.
By combining these, you get a pipeline that's simple to deploy, easy to scale, and avoids unnecessary dependencies.
Core Features
- Distributed task orchestration using PostgreSQL row-level locks.
- Lightweight, asynchronous task processing with Go goroutines.
- Fault tolerance through retry mechanisms.
Workflow
The pipeline revolves around task orchestrators for each processing stage. These orchestrators handle:
- Polling the database for pending tasks.
- Executing stage-specific operations.
- Marking tasks as succeeded or failed.
Each orchestrator works independently but respects dependencies, ensuring tasks proceed in the correct order. Orchestrators use Go's goroutines for concurrent processing and PostgreSQL for task status tracking.
Database Schema
CREATE TABLE tasks (
    task_id character varying(40) NOT NULL,
    ingest_status character varying(10),
    transcode_status character varying(10),
    metadata_gen_status character varying(10),
    assemble_status character varying(10),
    publish_status character varying(10),
    overridden_by character varying(40),
    host_machine character varying(48),
    process_id integer,
    retries integer,
    start_time timestamp without time zone,
    error_message text
);
Milestone
Each row of this table is what we call a Milestone: it holds the status (pending, inprogress, completed, failed, or overridden) for all stages of the pipeline for a single datapoint (in our example, a video). start_time gets updated every time a new stage starts. The retries counter allows a FAILED stage to be retried n number of times. A task that supersedes an older one sets the older row's overridden_by to its own task_id.
Task Lifecycle
- Tasks are inserted with all statuses set to pending (see the sketch after this list).
- Orchestrators poll for tasks whose statuses are either pending or failed (but haven't reached the retry threshold yet) and claim them by setting their status to inprogress.
- Each task is processed in a goroutine, and its status is updated to completed or failed.
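For reference, inserting a new task could look like the sketch below. It uses the standard database/sql package rather than the repo's postgres.Client wrapper, and the helper name is mine, so treat it as an illustration of the lifecycle's first step rather than the repo's actual code.
import (
	"context"
	"database/sql"
)

// EnqueueVideo creates a fresh milestone row with every stage set to pending,
// which makes the video visible to the ingestion orchestrator's next poll.
func EnqueueVideo(ctx context.Context, db *sql.DB, taskID string) error {
	_, err := db.ExecContext(ctx, `
		INSERT INTO tasks (
			task_id, ingest_status, transcode_status,
			metadata_gen_status, assemble_status, publish_status, retries
		) VALUES ($1, 'pending', 'pending', 'pending', 'pending', 'pending', 0)`,
		taskID)
	return err
}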
type TaskType string

const (
	INGEST       TaskType = "ingest_status"
	TRANSCODE    TaskType = "transcode_status"
	METADATA_GEN TaskType = "metadata_gen_status"
	ASSEMBLE     TaskType = "assemble_status"
	PUBLISH      TaskType = "publish_status"
)

type TaskStatus string

const (
	PENDING    TaskStatus = "pending"
	FAILED     TaskStatus = "failed"
	COMPLETED  TaskStatus = "completed"
	OVERRIDDEN TaskStatus = "overridden"
	INPROGRESS TaskStatus = "inprogress"
)

type Task struct {
	For          TaskType
	Dependencies []TaskType
}
In our example, the Dependencies amongst stages would be:
- [] <- Ingest
- [Ingest] <- Transcoding
- [Ingest] <- Metadata Extraction
- [Transcoding, Metadata Extraction] <- Assemble
- [Assemble] <- Publish
For a single datapoint, Transcoding and Metadata Extraction can be executed in parallel.
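Spelled out with the Task struct above, that dependency graph could be declared like this (the variable names are just for illustration):
var (
	ingestTask      = Task{For: INGEST, Dependencies: []TaskType{}}
	transcodeTask   = Task{For: TRANSCODE, Dependencies: []TaskType{INGEST}}
	metadataGenTask = Task{For: METADATA_GEN, Dependencies: []TaskType{INGEST}}
	assembleTask    = Task{For: ASSEMBLE, Dependencies: []TaskType{TRANSCODE, METADATA_GEN}}
	publishTask     = Task{For: PUBLISH, Dependencies: []TaskType{ASSEMBLE}}
)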
type TaskScheduler struct {
	TaskProcessor TaskProcessor
	Interval      time.Duration
	PgClient      *postgres.Client
}
func (t *TaskScheduler) Run(ctx context.Context) {
	milestones := t.TaskProcessor.Poll(ctx, t.Interval, t.PgClient)
	for {
		select {
		case <-ctx.Done():
			log.Printf("[%v] scheduler stopped: %v", t.TaskProcessor.GetFor(), ctx.Err())
			return
		case m, ok := <-milestones:
			if !ok {
				log.Printf("[%v] polling stopped", t.TaskProcessor.GetFor())
				return
			}
			go func(m Milestone) {
				if err := t.TaskProcessor.Process(ctx, m, t.PgClient); err != nil {
					log.Printf("[%v] task %s failed: %v", t.TaskProcessor.GetFor(), m.TaskID, err)
				} else {
					log.Printf("[%v] task %s completed", t.TaskProcessor.GetFor(), m.TaskID)
				}
			}(m)
		}
	}
}
type TaskProcessor interface {
	PreProcess(context.Context, Milestone, *postgres.Client) error
	Process(context.Context, Milestone, *postgres.Client) error
	PostProcess(context.Context, error, Milestone, *postgres.Client) error
	Poll(context.Context, time.Duration, *postgres.Client) <-chan Milestone
	GetFor() TaskType
}
type DefaultTaskProcessor struct {
	Task // used in query creation for checking the statuses of dependencies
}
func (d DefaultTaskProcessor) PreProcess(ctx context.Context, m Milestone, pgClient *postgres.Client) error {
	// Claim the milestone for processing: change the status from pending to inprogress.
	// We discuss below how we avoid races and deadlocks among goroutines contending for this same row.
}
func (d DefaultTaskProcessor) PostProcess(ctx context.Context, pErr error, m Milestone, pgClient *postgres.Client) error {
	// pErr is the error returned from Process(...), which contains the business logic of a stage.
	// Based on whether pErr is nil, mark the status as `completed` or `failed`.
}
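A minimal sketch of what that update could look like, again using database/sql instead of the repo's postgres.Client (the helper name and the way the retries counter is bumped are my assumptions):
import (
	"context"
	"database/sql"
	"fmt"
)

// markStageResult records the outcome of a stage. On failure it bumps the
// retries counter and stores the error so the next poll can decide whether
// the milestone is still eligible for a retry. stage is one of the *_status
// column-name constants above, so interpolating it into the query is safe.
func markStageResult(ctx context.Context, db *sql.DB, stage TaskType, taskID string, pErr error) error {
	if pErr == nil {
		query := fmt.Sprintf(`UPDATE tasks SET %s = 'completed' WHERE task_id = $1`, stage)
		_, err := db.ExecContext(ctx, query, taskID)
		return err
	}
	query := fmt.Sprintf(`UPDATE tasks SET %s = 'failed', retries = retries + 1, error_message = $2 WHERE task_id = $1`, stage)
	_, err := db.ExecContext(ctx, query, taskID, pErr.Error())
	return err
}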
func (d DefaultTaskProcessor) Poll(ctx context.Context, interval time.Duration, pgClient *postgres.Client) <-chan Milestone {
	ch := make(chan Milestone, 100)
	// Conditions for a milestone to be picked up for processing:
	// 1. Its status should be pending, or failed with the number of retries below the threshold.
	// 2. All of its dependencies should have status completed.
	// A dynamic query is built based on the above conditions.
	go func() {
		defer close(ch)
		for {
			select {
			case <-ctx.Done():
				return // closing ch lets Run exit via its "polling stopped" branch
			case <-time.After(interval):
			}
			// fetch all rows valid for processing
			for rows.Next() {
				// decode each row into a Milestone object and pass it on to the channel
				ch <- m
			}
		}
	}()
	return ch
}
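For the assemble stage, the dynamically built query could come out looking roughly like this (a sketch; the retry threshold of 3 matches the claim query shown later):
// Hypothetical output of the query builder for the assemble stage: the row is
// eligible when its own status is retryable and both dependencies are done.
const assemblePollQuery = `
SELECT * FROM tasks
WHERE (assemble_status = 'pending'
       OR (assemble_status = 'failed' AND retries < 3))
  AND transcode_status = 'completed'
  AND metadata_gen_status = 'completed';`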
type AssembleOrchestrator struct {
	DefaultTaskProcessor
}
func (a *AssembleOrchestrator) Process(ctx context.Context, m Milestone, pgClient *postgres.Client) error {
	if err := a.PreProcess(ctx, m, pgClient); err != nil {
		return err
	}
	var pErr error
	// business logic: merge the transcoded renditions with the extracted metadata
	if err := a.PostProcess(ctx, pErr, m, pgClient); err != nil {
		return err
	}
	return nil
}
assembleScheduler := TaskScheduler{
	TaskProcessor: &AssembleOrchestrator{
		DefaultTaskProcessor: DefaultTaskProcessor{
			Task: Task{
				For:          ASSEMBLE,
				Dependencies: []TaskType{TRANSCODE, METADATA_GEN},
			},
		},
	},
	Interval: time.Second * 5,
	PgClient: pgClient,
}
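Each stage gets its own scheduler configured the same way. A sketch of wiring them together (the helper is mine and omits graceful-shutdown details):
// runSchedulers starts one polling loop per stage and blocks until the
// context is cancelled.
func runSchedulers(ctx context.Context, schedulers ...TaskScheduler) {
	for _, s := range schedulers {
		s := s // copy for the goroutine (pre-Go 1.22 loop variable semantics)
		go s.Run(ctx)
	}
	<-ctx.Done()
}
The assembleScheduler above would be passed in alongside the schedulers for the other four stages.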
Tasks are claimed from the database using FOR UPDATE SKIP LOCKED, ensuring distributed orchestrators can process tasks without conflict. All of this happens in PreProcess(...). Here's an example query for the ingestion orchestrator:
UPDATE tasks
SET ingest_status = 'inprogress'
WHERE task_id = (
    SELECT task_id FROM tasks
    WHERE task_id = '<milestone.TaskID>'
      AND (ingest_status = 'pending' OR (ingest_status = 'failed' AND retries < 3))
    FOR UPDATE SKIP LOCKED
)
RETURNING *;
With the combination of FOR UPDATE and SKIP LOCKED, our goroutines can work efficiently: if two goroutines try to process the same milestone, one of them either exits early because of SKIP LOCKED, or its subquery no longer returns a row because the status was already updated while the other goroutine held the lock via FOR UPDATE.
Estimated lock acquisition time under normal conditions is ~1–2ms per lock. PostgreSQL can handle tens of thousands of such locks per second for simple FOR UPDATE operations, assuming no heavy contention or long-running transactions.
The FOR UPDATE mechanism introduces row-level locks that can become a bottleneck if multiple orchestrators frequently try to lock the same rows. This could lead to increased wait times, deadlocks, or failed transactions. To recover tasks that get stuck in inprogress, add a locked_at column and periodically check for tasks that were locked but remain unprocessed beyond a reasonable timeframe. Reset their status to pending for reprocessing.
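A sketch of such a recovery query for the ingestion stage, assuming the locked_at column suggested above and an arbitrary 10-minute threshold (neither is part of the current schema or repo):
// Hypothetical reaper query: release milestones that were claimed but never
// finished, so a healthy orchestrator can pick them up again.
const releaseStuckIngestsQuery = `
UPDATE tasks
SET ingest_status = 'pending'
WHERE ingest_status = 'inprogress'
  AND locked_at < now() - interval '10 minutes';`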