En el artículo anterior vimos los conceptos básicos de Nextflow (process, channel y workflow). En este, vamos a profundizar un poco más en el concepto de canales (channel)
El Channel es el medio por el que "hacemos llegar" datos a los procesos.
Existen múltiples formas de crear un canal. Algunos ejemplos:
ch1 = Channel.create()
ch1 << 1
ch1 << 'hola'
ch2 = Channel.of( 1, 3, 5, 7 )
ch3 = Channel.fromPath( '/data/some/*.txt' )
ch4 = Channel.watchPath( '/path/*.fa' )
y el canal irá emitiendo los elementos disponibles según se vayan consumiendo (es decir, el canal es una cola no bloqueante que conecta consumidores)
- INFO
-
También existe el tipo de Channel
value
que emite siempre el mismo valor y que puede ser leído mútiples veces. Por ejemplochn = Channel.value('hi')
crea un canal que siempre emitirá el stringhi
cuando se lea de él.
Operadores
Los canales disponen de un conjunto de operadores (extensibles, puedes añadir los tuyos mediante el mecanismo de plugins que hablaremos en otro post) que permiten modificar los valores emitidos por estos (por ejemplo, un canal emite todos los ficheros de un directorio pero queremos filtrar por un criterio diferente al nombre)
Algunos de los operadores disponibles:
filter (filtra por una expresion regular o por una lógica tuya, por ejemplo
channel.of(1,2,3).filter{ it % 2 == 1 }.view()
unique
first, last
until
map
collect
toSortedList
… (la lista completa de los operadores por defecto está en https://www.nextflow.io/docs/latest/operator.html#)
Ejemplo
Como caso "práctico" vamos a crear un pipeline que descargue un CSV del Ayuntamiento de Madrid, filtre algunos registros y los muestre
El csv en concreto es la relacion de accidentes de trafico ocurridos durante el último año, donde se especifican, entre otros, el distrito y si hubo rastros de alcohol (campos 6 y 17 respectivamente) y el tipo de accidente (campo 7)
La página del catálogo de datos
parsecsv.nf
workflow {
Channel.fromPath( "https://datos.madrid.es/egob/catalogo/300228-24-accidentes-trafico-detalle.csv" )
| splitCsv(sep:';')
| filter{ row ->
// distrito && alchohol
row[6] == 'MORATALAZ' && row[17] == 'S'
}
| map { row ->
// tipo
row[7]
}
| view
}
Si ejecutas este pipeline obtendras algo como:
nextflow run ./parsecsv.nf
N E X T F L O W ~ version 22.04.5
Launching `./parsecsv.nf` [shrivelled_meninsky] DSL2 - revision: 93c84a05a0
Atropello a persona
Vuelco
Colisión múltiple
Colisión múltiple
Choque contra obstáculo fijo
Vuelco
Choque contra obstáculo fijo
Choque contra obstáculo fijo
Colisión fronto-lateral
Alcance
Choque contra obstáculo fijo
Vuelco
Colisión fronto-lateral
Alcance
Choque contra obstáculo fijo
Vuelco
Choque contra obstáculo fijo
Choque contra obstáculo fijo
Choque contra obstáculo fijo
- WARNING
-
la utilidad real de los canales, y en realidad de Nextflow, va mucho más allá de este ejemplo pero creo que estos pequeños ejercicios nos sirven para ir aproximandonos al lenguaje e ir intuyendo su potencial
- INFO
-
Como puedes observar el DSL permite concatenar operadores mediante el uso del pipe "|"
-
Al ejecutar este pipeline, Nextflow creará un canal (anónimo) que emitirá un elemento
path
(Nextflow es capaz de manejar URLs como si fueran ficheros locales, como en este caso) -
Este
path
será consumido por el operadorsplitCSV
el cual a su ve emitirá las líneas del CSV parseadas como una lista -
Cada línea será filtrada por la ejecución de una closure que devolverá
true
ofalse
para indicar si el elemento tiene que ser emitido o no -
El operador
map
recibe una lista de elementos (por cada linea filtrada) y emite un sólo campo de esta lista -
Por último usamos un operador
view
que simplemente muestra por consola lo que lee por su canal
-
Conclusión
Como se puede intuir, los canales son una herramienta imprescindible para diseñar nuestros pipelines. Su naturaleza asíncrona y capacidad de paralelizar el envío de los mensajes nos ofrecen una potencia para realizar tareas complejas muy interesante.