Operators (typed)

New in version 26.04.0.

This page describes the core operators that are recommended for use with static typing.

collect

Channel<E> collect() -> Value<Bag<E>>

The collect operator collects all values from a source channel into a collection and emits it as a dataflow value:

channel.of( 1, 2, 3, 4 )
    .collect()
    .view()
[1, 2, 3, 4]

combine

Channel<L> combine( other: Channel<R> ) -> Channel<Tuple>

Channel<L> combine( other: Value<R> ) -> Channel<Tuple>

The combine operator emits every pairwise combination of a source channel with another channel or dataflow value:

numbers = channel.of(1, 2, 3)
words = channel.of('hello', 'ciao')

numbers
    .combine(words)
    .view()
[1, hello]
[2, hello]
[3, hello]
[1, ciao]
[2, ciao]
[3, ciao]

Tuples in both the left- and right-hand sources are flattened in the combined tuple. For example, tuple(1, 2) and tuple('red', 'blue') are combined as tuple(1, 2, 'red', 'blue').
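
As a minimal sketch of this flattening, the snippet below combines two channels that each emit a single tuple, using only calls shown elsewhere on this page. The variable names left and right are illustrative.

```nextflow
// Illustrative names; each channel emits a single 2-tuple.
left  = channel.of( tuple(1, 2) )
right = channel.of( tuple('red', 'blue') )

// Per the rule above, the emitted value should be the flat 4-tuple
// tuple(1, 2, 'red', 'blue') rather than a pair of nested tuples.
left.combine(right).view()
```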

Channel<Record> combine( [opts] ) -> Channel<Record>

When the combine operator is called on a channel of records with named arguments, the named arguments are appended to each record in the source channel. Each named argument can be a value or dataflow value.
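
The following sketch illustrates the named-argument form. It assumes that a plain value and a dataflow value can be passed in the same call, as the description above suggests. The field names (id, genome, min_quality) and their values are hypothetical.

```nextflow
// Hypothetical field names and values, for illustration only.
samples = channel.of( record(id: 'X'), record(id: 'Y') )
genome  = channel.value( 'GRCh38' )

samples
    .combine( genome: genome, min_quality: 20 )   // a dataflow value and a plain value
    .view()
// Each emitted record should carry its original id field
// plus the appended genome and min_quality fields.
```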

filter

Channel<E> filter( condition: (E) -> Boolean ) -> Channel<E>

The filter operator emits the values from a source channel that satisfy a condition, discarding all other values:

channel.of( 1, 2, 3, 4, 5 )
    .filter { v -> v % 2 == 1 }
    .view()
1
3
5

flatMap

Channel<E> flatMap( transform: (E) -> Iterable<R> ) -> Channel<R>

The flatMap operator applies a mapping function to each value from a source channel. The mapping function should return a collection, and each element in the collection is emitted separately.

For example:

channel.of( 1, 2, 3 )
    .flatMap { n -> [ n, n*2, n*3 ] }
    .view()
1
2
3
2
4
6
3
6
9

groupBy

Channel<Tuple<K, V>> groupBy() -> Channel<Tuple<K, Bag<V>>>

Channel<Tuple<K, Integer, V>> groupBy() -> Channel<Tuple<K, Bag<V>>>

The groupBy operator collects values from a source channel into groups based on a grouping key. A tuple is emitted for each group, containing the grouping key and the collection of values for that key.

The source channel should supply either 2-tuples of the form (<key>, <value>) or 3-tuples of the form (<key>, <size>, <value>).

If the source tuples do not specify a size, groupBy will not emit any groups until all inputs have been received:

channel.of( tuple(1, 'A'), tuple(1, 'B'), tuple(2, 'C'), tuple(3, 'B'), tuple(1, 'C'), tuple(2, 'A'), tuple(3, 'D') )
    .groupBy()
    .map { key, values -> tuple(key, values.toSorted()) }
    .view()
[1, [A, B, C]]
[2, [A, C]]
[3, [B, D]]
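
When the source values are not already keyed, a map step can build the 2-tuples first. The sketch below groups words by their first letter. The word list is illustrative, and substring is the standard Java String method, not a Nextflow operator.

```nextflow
channel.of( 'apple', 'avocado', 'banana', 'blueberry', 'cherry' )
    .map { word -> tuple(word.substring(0, 1), word) }     // (<key>, <value>)
    .groupBy()
    .map { key, values -> tuple(key, values.toSorted()) }  // sort for stable display
    .view()
// Expected groups: a -> [apple, avocado], b -> [banana, blueberry], c -> [cherry]
// The order in which the groups are emitted is not assumed here.
```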

If the source tuples do specify a size, then groupBy will emit each group as soon as it is ready:

channel.of(
        tuple('chr1', ['/path/to/region1_chr1.vcf', '/path/to/region2_chr1.vcf']),
        tuple('chr2', ['/path/to/region1_chr2.vcf', '/path/to/region2_chr2.vcf', '/path/to/region3_chr2.vcf']),
    )
    .flatMap { chr, vcfs ->
        vcfs.collect { vcf ->
            tuple(chr, vcfs.size(), vcf)    // preserve group size
        }
    }
    .view { v -> "scattered: ${v}" }
    .groupBy()
    .map { key, values -> tuple(key, values.toSorted()) }
    .view { v -> "gathered: ${v}" }
scattered: [chr1, 2, /path/to/region1_chr1.vcf]
scattered: [chr1, 2, /path/to/region2_chr1.vcf]
scattered: [chr2, 3, /path/to/region1_chr2.vcf]
scattered: [chr2, 3, /path/to/region2_chr2.vcf]
scattered: [chr2, 3, /path/to/region3_chr2.vcf]
gathered: [chr1, [/path/to/region1_chr1.vcf, /path/to/region2_chr1.vcf]]
gathered: [chr2, [/path/to/region1_chr2.vcf, /path/to/region2_chr2.vcf, /path/to/region3_chr2.vcf]]

Note

When specifying the group size, make sure that the number of inputs for a given group matches the specified size for that group. Otherwise, the run will fail.

join

Channel<Record> join( other: Channel<Record>, [opts] ) -> Channel<Record>

The join operator emits the relational join of two channels of records, using a matching key given by the by option:

left  = channel.of( record(id: 'X', a: 1), record(id: 'Y', a: 2), record(id: 'Z', a: 3), record(id: 'P', a: 7) )
right = channel.of( record(id: 'Z', b: 6), record(id: 'Y', b: 5), record(id: 'X', b: 4) )

left.join(right, by: 'id').view()
[id:Z, a:3, b:6]
[id:Y, a:2, b:5]
[id:X, a:1, b:4]

Duplicate matching keys are handled by emitting each matching combination (like a relational join):

left  = channel.of( record(id: 'X', a: 1), record(id: 'X', a: 3) )
right = channel.of( record(id: 'X', b: 2), record(id: 'X', b: 4) )

left.join(right, by: 'id').view()
[id:X, a:1, b:2]
[id:X, a:1, b:4]
[id:X, a:3, b:2]
[id:X, a:3, b:4]

By default, unmatched values are discarded. The remainder option can be used to emit them at the end:

left  = channel.of( record(id: 'X', a: 1), record(id: 'Y', a: 2), record(id: 'Z', a: 3), record(id: 'P', a: 7) )
right = channel.of( record(id: 'Z', b: 6), record(id: 'Y', b: 5), record(id: 'X', b: 4), record(id: 'Q', b: 8) )

left.join(right, by: 'id', remainder: true).view()
[id:Y, a:2, b:5]
[id:Z, a:3, b:6]
[id:X, a:1, b:4]
[id:P, a:7]
[id:Q, b:8]

Available options:

by: String

This option is required.

The record field to use as the matching key.

remainder: Boolean

When true, unmatched values are emitted at the end. Otherwise, they are discarded (default: false).

map

Channel<E> map( transform: (E) -> R ) -> Channel<R>

The map operator applies a mapping function to each value from a source channel:

channel.of( 1, 2, 3, 4, 5 )
    .map { v -> v * v }
    .view()
1
4
9
16
25

mix

Channel<E> mix( other: Channel<E> ) -> Channel<E>

Channel<E> mix( other: Value<E> ) -> Channel<E>

The mix operator merges the values from a source channel and another channel or dataflow value into a single output channel:

c1 = channel.of( '1', '2', '3' )
c2 = channel.of( 'a', 'b' )
v3 = channel.value( 'z' )

c1.mix(c2).mix(v3).view()
1
2
3
a
b
z

The values in the output channel may be emitted in any order. For example, the same script could also print:

z
1
a
2
b
3

reduce

Channel<E> reduce( seed: R, accumulator: (R, E) -> R ) -> Value<R>

Channel<E> reduce( accumulator: (E, E) -> E ) -> Value<E>

The reduce operator applies an accumulator function sequentially to each value in a source channel, and emits the final accumulated value.

The accumulator function takes two parameters – the accumulated value and the i-th emitted value – and it should return the accumulated result, which is passed to the next invocation with the i+1-th value. This process is repeated for each value in the source channel.

For example:

channel.of( 1, 2, 3, 4, 5 )
    .reduce { a, b ->
        println "a: $a b: $b"
        a + b
    }
    .view { result -> "result = $result" }
a: 1 b: 2
a: 3 b: 3
a: 6 b: 4
a: 10 b: 5
result = 15

By default, the first value is used as the initial accumulated value (the seed). You can optionally specify a different initial value as shown below:

channel.of( 1, 2, 3, 4, 5 )
    .reduce( 'result:' ) { acc, v ->
        println acc
        acc + ' ' + v
    }
    .view { result -> "final $result" }
result:
result: 1
result: 1 2
result: 1 2 3
result: 1 2 3 4
final result: 1 2 3 4 5
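
A seed is also what allows the accumulated type to differ from the element type, as in the first signature above. The sketch below sums the lengths of a channel of strings, starting from an integer seed. The input values are illustrative.

```nextflow
channel.of( 'alpha', 'beta', 'delta' )
    .reduce( 0 ) { total, str -> total + str.length() }   // Integer seed, String elements
    .view { total -> "total length = $total" }
// 5 + 4 + 5 = 14, so the emitted value should be 14.
```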

subscribe

Channel<E> subscribe( action: (E) -> () )

Channel<E> subscribe( [opts] )

The subscribe operator invokes a custom function for each value from a source channel:

source = channel.of( 'alpha', 'beta', 'delta' )

source.subscribe { str ->
    println "Got: ${str}; len: ${str.length()}"
}
Got: alpha; len: 5
Got: beta; len: 4
Got: delta; len: 5

The subscribe operator supports multiple types of event handlers:

channel.of( 1, 2, 3 ).subscribe(
    onNext: { v -> println v },
    onComplete: { println 'Done' }
)
1
2
3
Done

Note

Unlike most operators, subscribe does not return anything. It should only be used for side effects, such as printing to the console, writing to a file, or making HTTP requests.

Available options:

onNext: (E) -> ()

Closure that is invoked when a value is emitted. Equivalent to providing a single closure argument.

onComplete: () -> ()

Closure that is invoked after the last value is emitted by the channel.

onError: (T) -> ()

Closure that is invoked when an exception is raised while handling the onNext event. Once onError has been invoked, no further calls are made to onNext or onComplete. The closure receives the Throwable that caused the error as its parameter.

unique

Channel<E> unique( transform: (E) -> ? ) -> Channel<E>

Channel<E> unique() -> Channel<E>

The unique operator emits the unique values from a source channel:

channel.of( 1, 1, 2, 2, 2, 3, 1, 1, 2, 2, 3 )
    .unique()
    .view()
1
2
3

An optional closure can be used to transform each value before it is evaluated for uniqueness:

channel.of( 1, 1, 2, 2, 2, 3, 1, 1, 2, 4, 6 )
    .unique { v -> v % 2 }
    .view()
1
2

until

Channel<E> until( condition: (E) -> Boolean ) -> Channel<E>

The until operator emits each value from a source channel until a stopping condition is satisfied:

channel.of( 3, 2, 1, 5, 1, 5 )
    .until { v -> v == 5 }
    .view()
3
2
1

view

Channel<E> view( transform: (E) -> String, [opts] ) -> Channel<E>

Channel<E> view( [opts] ) -> Channel<E>

The view operator prints each value from a source channel to standard output:

channel.of(1, 2, 3).view()
1
2
3

An optional closure can be used to transform each value before it is printed:

channel.of(1, 2, 3)
    .map { v -> [v, v*v] }
    .view { num, sqr -> "The square of $num is $sqr" }
The square of 1 is 1
The square of 2 is 4
The square of 3 is 9

The tag option can be used to print the channel only when the -dump-channels command-line option is specified with the given tag:

channel.of( 1, 2, 3 )
    .map { v -> v + 1 }
    .view(tag: 'plus1')

channel.of( 1, 2, 3 )
    .map { v -> v ** 2 }
    .view(tag: 'exp2')

You can run this script with -dump-channels plus1 or -dump-channels exp2 to print either channel, or -dump-channels plus1,exp2 to print both.

The view operator also emits every value that it receives, allowing it to be chained with other operators.
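
As a small sketch of this chaining, the snippet below inspects a channel before and after a map step. The closure passed to view only affects what is printed, so the map step still receives the original integers.

```nextflow
channel.of( 1, 2, 3 )
    .view { v -> "before: $v" }   // prints the string, passes v through unchanged
    .map { v -> v * 10 }
    .view { v -> "after: $v" }
// The "before" and "after" lines may be interleaved in the console output.
```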

Available options:

newLine: Boolean

Print each value on a separate line (default: true).

tag: String

Print the channel values only when -dump-channels is specified on the command line with the given tag.