Modern data processing workflows require scalable systems capable of handling high-throughput workloads. Traditionally, such systems rely on tools like Kafka, Celery, or Airflow for orchestration. However, with Go's concurrency model and PostgreSQL's transactional guarantees, we can build a lightweight yet distributed pipeline using only these two tools.
Code available at https://github.com/mpsingh97/simple-pipeline
In this post, I’ll walk you through the architecture and implementation of such a pipeline. We'll process video tasks across five stages:
- Ingestion - Fetching the video and storing it in centralised storage.
- Transcoding - Converting video to desired formats.
- Metadata Extraction - Extracting metadata such as duration, resolution, and codec from the video.
- Assembly - Merging metadata and transcoded files.
- Publishing - Publishing the final video to a CDN for users.
Why Go and PostgreSQL?
- Go's Concurrency Model: With goroutines and channels, Go is ideal for handling concurrent tasks with minimal overhead.
- PostgreSQL as a Queue: PostgreSQL's FOR UPDATE SKIP LOCKED feature provides an elegant way to manage task queues without additional infrastructure.
By combining these, you get a pipeline that's simple to deploy, easy to scale, and avoids unnecessary dependencies.
System Overview
Core Features
- Distributed task orchestration using PostgreSQL row-level locks.
- Lightweight, asynchronous task processing with Go goroutines.
- Fault tolerance through retry mechanisms.
Workflow
The pipeline revolves around task orchestrators for each processing stage. These orchestrators handle:
- Polling the database for pending tasks.
- Executing stage-specific operations.
- Marking tasks as succeeded or failed.
Each orchestrator works independently but respects dependencies, ensuring tasks proceed in the correct order. Orchestrators use Go's goroutines for concurrent processing and PostgreSQL for task status tracking.
Database Schema
CREATE TABLE tasks (
    task_id character varying(40) NOT NULL,
    ingest_status character varying(10),
    transcode_status character varying(10),
    metadata_gen_status character varying(10),
    assemble_status character varying(10),
    publish_status character varying(10),
    overridden_by character varying(40),
    host_machine character varying(48),
    process_id integer,
    retries integer,
    start_time timestamp without time zone,
    error_message text
);
- We have a single row (we call this a Milestone) covering all stages of the pipeline for a single datapoint (in our example, a video). start_time gets updated every time a new stage starts.
- We have a retry mechanism which allows a FAILED stage to be retried n number of times.
- If an issue causes a stage to crash repeatedly and is later fixed, all failed datapoints that have reached the max retry threshold can be re-driven by generating new entries in the table. Each new entry updates the old entry by setting its overridden_by to the new entry's task_id (see the sketch after this list).
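As a rough sketch of how rows enter and get superseded in this table, here is what inserting a fresh milestone and overriding an exhausted one might look like, using database/sql directly rather than the repo's postgres.Client wrapper (the function names are illustrative):

import (
    "context"
    "database/sql"
)

// Sketch only: plain database/sql with a Postgres driver; the repo uses its own wrapper.
func insertMilestone(ctx context.Context, db *sql.DB, taskID string) error {
    _, err := db.ExecContext(ctx, `
        INSERT INTO tasks (task_id, ingest_status, transcode_status, metadata_gen_status,
                           assemble_status, publish_status, retries)
        VALUES ($1, 'pending', 'pending', 'pending', 'pending', 'pending', 0)`, taskID)
    return err
}

// overrideMilestone re-drives a datapoint whose old entry exhausted its retries:
// insert a fresh row and mark the old row as overridden by the new task_id.
func overrideMilestone(ctx context.Context, db *sql.DB, oldTaskID, newTaskID string) error {
    if err := insertMilestone(ctx, db, newTaskID); err != nil {
        return err
    }
    _, err := db.ExecContext(ctx,
        `UPDATE tasks SET overridden_by = $1 WHERE task_id = $2`, newTaskID, oldTaskID)
    return err
}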
Task Lifecycle
- Tasks are inserted with all statuses set to pending.
- Orchestrators poll for tasks whose status is either pending or failed (but hasn't reached the retry threshold yet) and claim them by setting their status to inprogress.
- Each task is processed in a goroutine, and its status is updated to completed or failed.
type TaskType string

const (
    INGEST       TaskType = "ingest_status"
    TRANSCODE    TaskType = "transcode_status"
    METADATA_GEN TaskType = "metadata_gen_status"
    ASSEMBLE     TaskType = "assemble_status"
    PUBLISH      TaskType = "publish_status"
)

type TaskStatus string

const (
    PENDING    TaskStatus = "pending"
    FAILED     TaskStatus = "failed"
    COMPLETED  TaskStatus = "completed"
    OVERRIDDEN TaskStatus = "overridden"
    INPROGRESS TaskStatus = "inprogress"
)

type Task struct {
    For          TaskType
    Dependencies []TaskType
}
In our example, the dependencies amongst stages would be:
- [] <- Ingest
- [Ingest] <- Transcoding
- [Ingest] <- Metadata Extraction
- [Transcoding, Metadata Extraction] <- Assemble
- [Assemble] <- Publish
Transcoding and Metadata Extraction can be executed in parallel for a single datapoint.
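Expressed with the Task struct above, that dependency graph could be declared roughly like this (the variable names are illustrative, not taken from the repo):

var (
    ingestTask      = Task{For: INGEST, Dependencies: []TaskType{}}
    transcodeTask   = Task{For: TRANSCODE, Dependencies: []TaskType{INGEST}}
    metadataGenTask = Task{For: METADATA_GEN, Dependencies: []TaskType{INGEST}}
    assembleTask    = Task{For: ASSEMBLE, Dependencies: []TaskType{TRANSCODE, METADATA_GEN}}
    publishTask     = Task{For: PUBLISH, Dependencies: []TaskType{ASSEMBLE}}
)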
High Level Code Walkthrough
Key components
- TaskScheduler: Manages polling and task execution.
type TaskScheduler struct {
    TaskProcessor TaskProcessor
    Interval      time.Duration
    PgClient      *postgres.Client
}

func (t *TaskScheduler) Run(ctx context.Context) {
    milestones := t.TaskProcessor.Poll(ctx, t.Interval, t.PgClient)
    for {
        select {
        case m, ok := <-milestones:
            if !ok {
                log.Printf("[%v] polling stopped", t.TaskProcessor.GetFor())
                return
            }
            go func(m Milestone) {
                if err := t.TaskProcessor.Process(ctx, m, t.PgClient); err != nil {
                    ... // log
                } else {
                    ... // log
                }
            }(m)
        }
    }
}
- TaskProcessor: Each orchestrator must implement this interface
type TaskProcessor interface {
    PreProcess(context.Context, Milestone, *postgres.Client) error
    Process(context.Context, Milestone, *postgres.Client) error
    PostProcess(context.Context, error, Milestone, *postgres.Client) error
    Poll(context.Context, time.Duration, *postgres.Client) <-chan Milestone
    GetFor() TaskType
}
- DefaultTaskProcessor: Handles the task lifecycle methods (pre-process, process, post-process). This lets orchestrators define only the main business logic while everything else is abstracted away.
type DefaultTaskProcessor struct {
    Task // will be used in query creation for checking statuses of dependencies
}

func (d DefaultTaskProcessor) PreProcess(ctx context.Context, m Milestone, pgClient *postgres.Client) error {
    // Claim the milestone object for processing - change the status from pending to inprogress.
    // We discuss below how we avoid races/deadlocks among goroutines on this same object.
}

func (d DefaultTaskProcessor) PostProcess(ctx context.Context, pErr error, m Milestone, pgClient *postgres.Client) error {
    // pErr is the error returned from Process(...), which contains the business logic of a stage.
    // Based on whether pErr is nil or not, mark the status as `completed` or `failed`.
}

func (d DefaultTaskProcessor) Poll(ctx context.Context, interval time.Duration, pgClient *postgres.Client) <-chan Milestone {
    ch := make(chan Milestone, 100)
    // Conditions for a milestone to be picked up for processing:
    // 1. Its status should be pending or failed (but with the number of retries below the threshold).
    // 2. All of its dependencies should have status completed.
    // Create a dynamic query based on the above conditions.
    go func() {
        defer close(ch)
        for {
            time.Sleep(interval)
            // fetch all rows valid for processing
            for rows.Next() {
                // decode each row into a milestone object and pass it on to the channel
                ch <- m
            }
        }
    }()
    return ch
}
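To make the "dynamic query" comment in Poll concrete, here is one sketch of how such a query could be assembled from the embedded Task; maxRetries and the overridden_by guard are assumptions for illustration, not code from the repo:

import (
    "fmt"
    "strings"
)

// buildPollQuery builds a SELECT matching the two polling conditions above,
// plus a guard against rows that have already been superseded.
func buildPollQuery(t Task, maxRetries int) string {
    conds := []string{
        fmt.Sprintf("(%s = 'pending' OR (%s = 'failed' AND retries < %d))", t.For, t.For, maxRetries),
        "overridden_by IS NULL", // assumption: superseded rows should not be picked up again
    }
    for _, dep := range t.Dependencies {
        conds = append(conds, fmt.Sprintf("%s = 'completed'", dep))
    }
    return "SELECT * FROM tasks WHERE " + strings.Join(conds, " AND ")
}

For the Assemble stage this would produce a WHERE clause that checks assemble_status, transcode_status, and metadata_gen_status together.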
Example - Assemble Orchestrator
type AssembleOrchestrator struct {
    DefaultTaskProcessor
}

func (a *AssembleOrchestrator) Process(ctx context.Context, m models.Milestone, pgClient *postgres.Client) error {
    if err := a.PreProcess(ctx, m, pgClient); err != nil {
        return err
    }

    var pErr error
    // business logic

    if err := a.PostProcess(ctx, pErr, m, pgClient); err != nil {
        return err
    }
    return nil
}

assembleScheduler := TaskScheduler{
    TaskProcessor: &AssembleOrchestrator{
        DefaultTaskProcessor: DefaultTaskProcessor{
            Task: Task{
                For:          ASSEMBLE,
                Dependencies: []TaskType{TRANSCODE, METADATA_GEN},
            },
        },
    },
    Interval: time.Second * 5,
    PgClient: pgClient,
}
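Wiring this into a service is then just a matter of running each scheduler's loop in its own goroutine. A minimal sketch, with shutdown handling left out:

// Sketch: start the assemble scheduler (and, in the same way, one scheduler per stage).
ctx := context.Background()
go assembleScheduler.Run(ctx)
// ... start the ingest, transcode, metadata and publish schedulers the same way ...
select {} // block forever; a real service would wait for a shutdown signal instead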
Polling with PostgreSQL
Tasks are claimed from the database using FOR UPDATE SKIP LOCKED, ensuring distributed orchestrators can process tasks without conflict. All this happens in PreProcess(...). Here's an example query for the ingestion orchestrator:
UPDATE tasks
SET ingest_status = 'inprogress'
WHERE task_id = (
    SELECT task_id FROM tasks
    WHERE task_id = '<milestone.TaskID>'
      AND (ingest_status = 'pending' OR (ingest_status = 'failed' AND retries < 3))
    FOR UPDATE SKIP LOCKED)
RETURNING *;
How does this query work?
- FOR UPDATE: This clause places a lock on the rows selected by the subquery. The lock prevents other transactions from modifying or locking these rows until the current transaction releases the lock.
- SKIP LOCKED: When a row is already locked by another transaction, SKIP LOCKED causes PostgreSQL to skip over it instead of waiting for the lock to be released. This is particularly useful when multiple threads or processes are processing a work queue, as it avoids deadlocks and contention.
- RETURNING *: After updating the rows, this clause returns the updated rows, which is helpful for processing the records further in the application.
With the combination of FOR UPDATE and SKIP LOCKED, our goroutines can operate efficiently: any two goroutines trying to process the same milestone either exit early due to SKIP LOCKED, or their subquery no longer returns a row because it was already updated while another goroutine held the FOR UPDATE lock.
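As a rough sketch of how PreProcess might execute that claim, again using database/sql directly instead of the repo's postgres.Client wrapper (claimIngest is an illustrative name):

import (
    "context"
    "database/sql"
    "errors"
)

// claimIngest tries to move a milestone's ingest_status from pending/failed to inprogress.
// Getting sql.ErrNoRows back simply means another goroutine or worker already claimed the row.
func claimIngest(ctx context.Context, db *sql.DB, taskID string) (bool, error) {
    const q = `
        UPDATE tasks SET ingest_status = 'inprogress'
        WHERE task_id = (
            SELECT task_id FROM tasks
            WHERE task_id = $1
              AND (ingest_status = 'pending' OR (ingest_status = 'failed' AND retries < 3))
            FOR UPDATE SKIP LOCKED)
        RETURNING task_id`
    var claimed string
    err := db.QueryRowContext(ctx, q, taskID).Scan(&claimed)
    if errors.Is(err, sql.ErrNoRows) {
        return false, nil // locked or already claimed elsewhere; skip this milestone
    }
    if err != nil {
        return false, err
    }
    return true, nil
}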
Estimated lock acquisition time under normal conditions is ~1–2ms per lock. PostgreSQL can handle tens of thousands of such locks per second for simple FOR UPDATE operations, assuming no heavy contention or long-running transactions.
Scalability
- Horizontal Scaling: Spin up more worker instances. PostgreSQL ensures task distribution through locks.
- Read Replicas: Use replicas for polling tasks without overloading the primary database.
- Dynamic Intervals: Adjust polling frequency based on system load, as sketched below.
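One hypothetical way to implement those dynamic intervals is to back off while polls come back empty and speed up as soon as there is work. This is not part of the repo, just a sketch:

// nextInterval adapts the polling interval between a floor and a ceiling,
// based on whether the last poll returned any milestones.
func nextInterval(current time.Duration, foundWork bool) time.Duration {
    const (
        minInterval = 1 * time.Second
        maxInterval = 30 * time.Second
    )
    if foundWork {
        return minInterval // busy: poll aggressively
    }
    next := current * 2 // idle: back off exponentially
    if next > maxInterval {
        next = maxInterval
    }
    return next
}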
Advantages
- Simplicity: No need for external message brokers or task schedulers.
- Cost Efficiency: PostgreSQL serves as both the task queue and storage layer, reducing infrastructure complexity.
- Fine-Grained Control: Preprocessing and postprocessing steps allow for fine-tuned handling of task workflows.
Challenges & Mitigations (to some extent)
- Database Lock Contention: The FOR UPDATE mechanism introduces row-level locks that can become a bottleneck if multiple orchestrators frequently try to lock the same rows. This could lead to increased wait times, deadlocks, or failed transactions.
  - Mitigations
    - Provide contexts with deadlines when querying, so that orchestrators can exit early and reduce contention when it is high (see the sketch at the end of this section).
    - Partition tasks by categories (e.g., task type) and distribute them across multiple task tables.
    - Add a locked_at column and periodically check for tasks that have been locked but left unprocessed beyond a reasonable timeframe. Reset their status to pending for reprocessing.
- Limited Flexibility: The database-centric design tightly couples application logic with PostgreSQL, making it harder to switch to another task queueing or coordination system in the future.
- Task Execution Latency: While task execution primarily depends on business logic, the preprocessing (locking) and postprocessing steps add latency.
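For the first lock-contention mitigation (contexts with deadlines), here is a rough sketch of what a deadline-bound claim could look like, reusing the illustrative claimIngest helper from earlier:

// Bound how long a single claim attempt may wait on the database.
claimCtx, cancel := context.WithTimeout(ctx, 500*time.Millisecond)
defer cancel()

claimed, err := claimIngest(claimCtx, db, m.TaskID)
if err != nil {
    // A deadline error here just means "too contended right now"; the next poll will retry.
    log.Printf("claim attempt gave up: %v", err)
    return nil
}
if !claimed {
    return nil // another worker picked this milestone up
}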