Created 2024/12/02 at 06:59PM

Last Modified 2025/01/09 at 09:14PM

Modern data processing workflows require scalable systems capable of handling high-throughput workloads. Traditionally, such systems rely on tools like Kafka, Celery, or Airflow for orchestration. However, with Go's concurrency model and PostgreSQL's transactional guarantees, we can build a lightweight yet distributed pipeline using only these two tools.

Code available at https://github.com/mpsingh97/simple-pipeline

In this post, I’ll walk you through the architecture and implementation of such a pipeline. We'll process video tasks across five stages:

- Ingestion - Fetching the video and storing it in centralised storage.
- Transcoding - Converting the video to the desired formats.
- Metadata Extraction - Extracting metadata like duration, resolution, and codec from the video.
- Assembly - Merging metadata and transcoded files.
- Publishing - Publishing the final video to a CDN for users.

Why Go and PostgreSQL?

- Go - Goroutines and channels make lightweight, concurrent task processing straightforward.
- PostgreSQL - Transactional guarantees and row-level locking (FOR UPDATE SKIP LOCKED) let multiple workers claim tasks safely.

By combining these, you get a pipeline that's simple to deploy, easy to scale, and free of unnecessary dependencies.

System Overview

Core Features

- Distributed task orchestration using PostgreSQL row-level locks.
- Lightweight, asynchronous task processing with Go goroutines.
- Fault tolerance through retry mechanisms.

Workflow

The pipeline revolves around task orchestrators for each processing stage. These orchestrators handle:

- Polling the database for pending tasks.
- Executing stage-specific operations.
- Marking tasks as succeeded or failed.

Each orchestrator works independently but respects dependencies, ensuring tasks proceed in the correct order. Orchestrators use Go's goroutines for concurrent processing and PostgreSQL for task status tracking.

Database Schema

CREATE TABLE tasks (
    task_id character varying(40) NOT NULL,
    ingest_status character varying(10),
    transcode_status character varying(10),
    metadata_gen_status character varying(10),
    assemble_status character varying(10),
    publish_status character varying(10),
    overridden_by character varying(40),
    host_machine character varying(48),
    process_id integer,
    retries integer,
    start_time timestamp without time zone,
    error_message text
);

Task Lifecycle

- Tasks are inserted with all statuses set to pending (see the example INSERT below).
- Orchestrators poll for tasks whose status is either pending or failed (but still below the retry threshold) and claim them by setting the status to inprogress.
- Each task is processed in a goroutine, and its status is updated to completed or failed.
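For illustration, inserting a new task could look like the following sketch. The task_id value is hypothetical, and since the schema above defines no column defaults, every status is set explicitly:

INSERT INTO tasks (task_id, ingest_status, transcode_status, metadata_gen_status, assemble_status, publish_status, retries)
VALUES ('video-0001', 'pending', 'pending', 'pending', 'pending', 'pending', 0);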

type TaskType string

const (
    INGEST       TaskType = "ingest_status"
    TRANSCODE    TaskType = "transcode_status"
    METADATA_GEN TaskType = "metadata_gen_status"
    ASSEMBLE     TaskType = "assemble_status"
    PUBLISH      TaskType = "publish_status"
)

type TaskStatus string

const (
    PENDING    TaskStatus = "pending"
    FAILED     TaskStatus = "failed"
    COMPLETED  TaskStatus = "completed"
    OVERRIDDEN TaskStatus = "overridden"
    INPROGRESS TaskStatus = "inprogress"
)

type Task struct {
    For          TaskType
    Dependencies []TaskType
}

In our example, the dependencies amongst stages are:

- [] <- Ingest
- [Ingest] <- Transcoding
- [Ingest] <- Metadata Extraction
- [Transcoding, Metadata Extraction] <- Assemble
- [Assemble] <- Publish

Transcoding and Metadata Extraction both depend only on Ingest, so they can be executed in parallel for a single datapoint.
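As a rough sketch, these dependencies map directly onto the Task type defined earlier (the variable name stages is just for illustration):

// Stage definitions mirroring the dependency list above.
var stages = []Task{
    {For: INGEST, Dependencies: []TaskType{}},
    {For: TRANSCODE, Dependencies: []TaskType{INGEST}},
    {For: METADATA_GEN, Dependencies: []TaskType{INGEST}},
    {For: ASSEMBLE, Dependencies: []TaskType{TRANSCODE, METADATA_GEN}},
    {For: PUBLISH, Dependencies: []TaskType{ASSEMBLE}},
}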

High Level Code Walkthrough

Key components

type TaskScheduler struct {
    TaskProcessor TaskProcessor
    Interval      time.Duration
    PgClient      *postgres.Client
}

func (t *TaskScheduler) Run(ctx context.Context) {
    milestones := t.TaskProcessor.Poll(ctx, t.Interval, t.PgClient)
    for {
        select {
        case <-ctx.Done():
            log.Printf("[%v] context cancelled, stopping scheduler", t.TaskProcessor.GetFor())
            return
        case m, ok := <-milestones:
            if !ok {
                log.Printf("[%v] polling stopped", t.TaskProcessor.GetFor())
                return
            }
            // each milestone is processed in its own goroutine
            go func(m Milestone) {
                if err := t.TaskProcessor.Process(ctx, m, t.PgClient); err != nil {
                    log.Printf("[%v] processing failed for %v: %v", t.TaskProcessor.GetFor(), m.TaskID, err)
                } else {
                    log.Printf("[%v] processing completed for %v", t.TaskProcessor.GetFor(), m.TaskID)
                }
            }(m)
        }
    }
}
type TaskProcessor interface {
    PreProcess(context.Context, Milestone, *postgres.Client) error
    Process(context.Context, Milestone, *postgres.Client) error
    PostProcess(context.Context, error, Milestone, *postgres.Client) error
    Poll(context.Context, time.Duration, *postgres.Client) <-chan Milestone
    GetFor() TaskType
}
type DefaultTaskProcessor struct {
    Task // will be used in query creation for checking statuses of dependencies
}

func (d DefaultTaskProcessor) PreProcess(ctx context.Context, m Milestone, pgClient *postgres.Client) error {
    // Claim the milestone for processing - change the status from pending to inprogress.
    // We will discuss below how we avoid races and deadlocks among goroutines competing for the same row.
}

func (d DefaultTaskProcessor) PostProcess(ctx context.Context, pErr error, m Milestone, pgClient *postgres.Client) error {
    // pErr is the error returned from Process(...), which contains the business logic of a stage.
    // Based on whether pErr is nil, mark the status as `completed` or `failed`.
}

func (d DefaultTaskProcessor) Poll(ctx context.Context, interval time.Duration, pgClient *postgres.Client) <-chan Milestone {
    ch := make(chan Milestone, 100)
    // Conditions for a milestone to be picked up for processing:
    // 1. Its status is pending, or failed with the number of retries below the threshold.
    // 2. All of its dependencies have status completed.
    // A query is built dynamically from these conditions (an example is shown after this function).
    go func() {
        defer close(ch)
        for {
            select {
            case <-ctx.Done():
                return
            case <-time.After(interval):
            }
            // fetch all rows valid for processing
            for rows.Next() {
                // decode each row into a Milestone and pass it on to the channel
                ch <- m
            }
        }
    }()
    return ch
}
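For example, for the Assemble stage the generated polling query might look roughly like this (a sketch; the actual query builder lives in the repo):

SELECT * FROM tasks
WHERE (assemble_status = 'pending' OR (assemble_status = 'failed' AND retries < 3))
  AND transcode_status = 'completed'
  AND metadata_gen_status = 'completed';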

Example - Assemble Orchestrator

type AssembleOrchestrator struct {
    DefaultTaskProcessor
}

func (a *AssembleOrchestrator) Process(ctx context.Context, m models.Milestone, pgClient *postgres.Client) error {
    if err := a.PreProcess(ctx, m, pgClient); err != nil {
        return err
    }
    var pErr error
    // business logic: merge the transcoded outputs with the extracted metadata, capturing any failure in pErr
    if err := a.PostProcess(ctx, pErr, m, pgClient); err != nil {
        return err
    }
    return nil
}

assembleScheduler := TaskScheduler{
    TaskProcessor: &AssembleOrchestrator{
        DefaultTaskProcessor: DefaultTaskProcessor{
            Task: Task{
                For:          ASSEMBLE,
                Dependencies: []TaskType{TRANSCODE, METADATA_GEN},
            },
        },
    },
    Interval: time.Second * 5,
    PgClient: pgClient,
}
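A minimal sketch of starting this scheduler; the cancellation plumbing here is illustrative and not taken from the repo:

ctx, cancel := context.WithCancel(context.Background())
defer cancel()
// Run polls and dispatches milestones until the context is cancelled
// or the polling channel is closed.
go assembleScheduler.Run(ctx)
// The other schedulers (ingest, transcode, metadata, publish) are started the same way.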

Polling with PostgreSQL

Tasks are claimed from the database using FOR UPDATE SKIP LOCKED, ensuring distributed orchestrators can process tasks without conflict. All of this happens in PreProcess(...). Here's an example query for the ingestion orchestrator:

UPDATE tasks
SET ingest_status = 'inprogress'
WHERE task_id = (
    SELECT task_id FROM tasks
    WHERE task_id = '<milestone.TaskID>'
      AND (ingest_status = 'pending' OR (ingest_status = 'failed' AND retries < 3))
    FOR UPDATE SKIP LOCKED
)
RETURNING *;

How does this query work?

The combination of FOR UPDATE and SKIP LOCKED lets many goroutines (and orchestrators on different machines) work efficiently: if two goroutines try to process the same milestone, one of them either skips the row because it is already locked (SKIP LOCKED), or its subquery returns no row because the status was updated while the other goroutine held the lock (FOR UPDATE).
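To make this concrete, here is a minimal sketch of the claim step. It is an assumption for illustration: it uses database/sql directly rather than the repo's postgres.Client wrapper, and the helper name claimMilestone is hypothetical. The status column name comes from the TaskType constants defined earlier, so it is formatted into the query rather than bound as a parameter.

import (
    "context"
    "database/sql"
    "errors"
    "fmt"
)

// claimMilestone attempts to claim a task for the given stage by flipping its
// status column to 'inprogress'. It returns false if another worker already
// holds the row lock or the row is no longer eligible.
func claimMilestone(ctx context.Context, db *sql.DB, stage TaskType, taskID string) (bool, error) {
    query := fmt.Sprintf(`
        UPDATE tasks SET %[1]s = 'inprogress'
        WHERE task_id = (
            SELECT task_id FROM tasks
            WHERE task_id = $1
              AND (%[1]s = 'pending' OR (%[1]s = 'failed' AND retries < 3))
            FOR UPDATE SKIP LOCKED
        )
        RETURNING task_id`, string(stage))

    var claimed string
    err := db.QueryRowContext(ctx, query, taskID).Scan(&claimed)
    if errors.Is(err, sql.ErrNoRows) {
        return false, nil // skipped: locked by another worker or already claimed
    }
    if err != nil {
        return false, err
    }
    return true, nil
}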

Estimated lock acquisition time under normal conditions is ~1–2 ms per lock, and PostgreSQL can handle tens of thousands of such simple FOR UPDATE operations per second, assuming no heavy contention or long-running transactions. (Row locks are recorded on the tuples themselves rather than in a shared lock table, so claiming many rows doesn't exhaust lock memory.)

Scalability

Advantages

Challenges & Mitigations (to some extent)