Skip to main content

Advanced Patterns

Ut enim ad minima veniam, quis nostrum exercitationem ullam corporis suscipit laboriosam. Patterns you reach for once the basics aren't enough.

Conditional branches

import { pipeline, step, when } from '@flux/core';

export default pipeline('orders.route')
.step(step('classify').run(classify))
.branch(
when((ctx) => ctx.data.country === 'SE').then(
step('vat-se').run(applySwedishVat),
),
when((ctx) => ctx.data.country === 'NO').then(
step('vat-no').run(applyNorwegianVat),
),
when.otherwise().then(step('vat-eu').run(applyEuVat)),
);

Retry with jittered back-off

import { step } from '@flux/core';

const publish = step('publish')
.retry({
maxAttempts: 6,
backoff: 'exponential',
initialMs: 250,
maxMs: 30_000,
jitter: 'full',
retryOn: (err) => err.code !== 'EPERM',
})
.to('kafka://orders.v1');

Fan-in / merge

import { pipeline, step, merge } from '@flux/core';

export default pipeline('report.monthly')
.parallel(
step('orders').from('postgres://orders/last-month'),
step('refunds').from('postgres://refunds/last-month'),
step('shipping').from('postgres://shipments/last-month'),
)
.step(
merge('combine').into((parts) => ({
orders: parts.orders.length,
refunds: parts.refunds.length,
shipping: parts.shipping.reduce((n, s) => n + s.weight_kg, 0),
})),
)
.step(step('email').to(`email://finance@example.com`));

Custom source

Implement the Source<T> interface to plug in anything Flux doesn't natively support:

import type { Source } from '@flux/core';

export function shopifyOrdersSince(cursor: string): Source<Order> {
return {
async *read(ctx) {
let next: string | null = cursor;
while (next) {
const page = await ctx.http.get(`/orders.json?since=${next}`);
for (const order of page.orders) yield order;
next = page.nextCursor;
}
},
};
}
Prefer generators

Sources are async iterables so Flux can back-pressure them naturally. Don't pre-materialise large result sets into arrays — you'll just OOM the process.