The aim of the course is to introduce methods required in natural language
processing (processing huge data sets in a distributed environment and performing
machine learning) and to show how to execute them effectively on the ÚFAL
computational Linux cluster. The course will cover the ÚFAL network and cluster architecture,
Slurm, Spark, related Linux tools, and best practices.
The course follows the outline in the ÚFAL wiki:
Introduction to ÚFAL
(you will need an ÚFAL wiki account to access that site; each ÚFAL PhD student
is entitled to get a wiki account).
The whole course is taught during the first several weeks of the semester.
To be able to meaningfully participate in the course and to complete the
assignments, it is necessary to have access to the ÚFAL computational cluster.
The course is therefore highly suitable for ÚFAL PhD students, but unsuitable
for other students, apart from exceptional cases.
About
SIS code: NPFL118
Semester: winter
E-credits: 3
Examination: 0/2 C
Guarantors: Martin Popel, Rudolf Rosa, Milan Straka
Requirements
In order to pass the course, you have to attend the meetings and do all the
required assignments.
License
Unless otherwise stated, teaching materials for this course are available under CC BY-SA 4.0.
This page describes a possible initial configuration of your Linux environment.
You are of course free to modify it in any way you want :-)
.profile
The
.profile
is run automatically when you log in (i.e., when you log in
to your desktop or to a machine via SSH). Note that
.bash_profile
has
precedence, so if you have it,
.profile
will not be used. Therefore,
if you have
.bash_profile
, move its content to
.profile
and then remove
.bash_profile
(or the other way around).
export EDITOR=vim
export PATH="$HOME/bin:$PATH"
export PATH="$HOME/.local/bin:$PATH"
if [ -f /etc/profile.d/modules.sh ]; then
  source /etc/profile.d/modules.sh
fi

export PATH="/net/projects/spark/bin:/net/projects/spark/slurm:/net/projects/spark/sbt/bin:$PATH"

if false; then  # change to true to use a system CUDA installation (see the GPU section)
  export PATH="/opt/cuda/12.3/bin:$PATH"
  export XLA_FLAGS=--xla_gpu_cuda_data_dir=/opt/cuda/12.3
  export LD_LIBRARY_PATH="/opt/cuda/12.3/lib64:/opt/cuda/12.3/cudnn/8.9.7/lib:/opt/cuda/12.3/nccl/2.20.3/lib:/opt/cuda/12.3/extras/CUPTI/lib64:$LD_LIBRARY_PATH"
fi

export LC_CTYPE=cs_CZ.UTF-8 LC_COLLATE=cs_CZ.UTF-8

[ "$TERM" = linux ] && export TERM=xterm

if [ -n "$BASH_VERSION" ]; then
  [ -f ~/.bashrc ] && . ~/.bashrc
fi
.bashrc
The .bashrc
is run whenever you open a new terminal window. Note that
it is customary for your .profile
to also run .bashrc
(the last
three lines of the above file).
[ -z "$PS1" ] && return
export HISTCONTROL=ignorespace:erasedups HISTSIZE=8192 HISTFILESIZE=16384
shopt -s checkwinsize cmdhist histappend
PS1='${debian_chroot:+($debian_chroot)}\[\033[01;32m\]\u@\h\[\033[00m\]:\[\033[01;34m\]\w\[\033[00m\]\$ '
if [ -x /usr/bin/dircolors ]; then
  eval "`dircolors -b`"
  alias ls='ls --color=auto'
fi

export GREP_COLORS="ms=01;32:mc=01;32:sl=:cx=:fn=34:ln=36:bn=36:se=36"
alias grep='grep --color'

export LESS_TERMCAP_{md=$'\E[01;32m',me=$'\E[0m',us=$'\E[01;33m',ue=$'\E[0m'}

if [ -f /etc/bash_completion ]; then
  . /etc/bash_completion
fi
.screenrc
If you submit screen -D -m
as a Slurm job, screen
starts without
a connected terminal and cannot copy sane settings from it; it is
therefore a good idea to have them in .screenrc
.
altscreen
defbce on
defencoding utf-8
defflow off
defscrollback 65536
term xterm #or other, if you prefer
hardstatus on
hardstatus alwayslastline
hardstatus string "%{= bw}Screen: %Lw"
caption string "%?%F%{.R.}%?%3n %t%? [%h]%?"
setenv PROMPT_COMMAND "echo -ne '\ek\e\\\\\eksh '\${PWD##*/}'\e\\\\'"
shelltitle "$ |bash"
startup_message off
Quick Intro
sbatch [options] script
: submit a job for execution
-J, --job-name=name
set name of the job
-o, --output=outpath
set file with the standard output of the job; default slurm-%j.out
-e, --error=outpath
set file with the standard error of the job; default is to merge with the standard output
-W, --wait
wait till the job finishes
-n, --ntasks=number
number of tasks, allocated on possibly different machines
--spread-job
spread the tasks over many nodes to evenly distribute them
-N, --nodes=<minnodes>[-maxnodes]
minimum to maximum number of nodes (machines) to use
-N 1
allocates all the tasks on a single node
-c, --cpus-per-task=number
number of CPUs per task
-G, --gpus=number
number of GPUs for the whole job
--gpus-per-task=number
number of GPUs per task
--mem=size[KMGT]
memory per node
--mem-per-cpu=size[KMGT]
memory per CPU
--mem-per-gpu=size[KMGT]
memory per GPU
-p partition[,partition2,...]
submit only to a given partition (queue)
-w, --nodelist=node[,node[1-8],...]
submit the job to all these nodes (machines)
-x, --exclude=node[,node[1-8],...]
exclude these nodes when submitting the job
-q priority
sets the priority of your job; low
, normal
, high
are available, with normal
being the default
-d, --dependency=afterok:job_id[:job_id]
run when the mentioned jobs successfully finish
apart from afterok
, there are also afternotok
, after
, afterany
, aftercorr
, singleton
--mail-type=[NONE|BEGIN|END|FAIL|REQUEUE|ALL|ARRAY_TASKS|...]
send a notification email on the specified events
SLURM automatically sets up many environment variables, which you can use in your jobs. A few of them are listed below, followed by an example script:
SLURM_JOB_ID
the job ID
SLURM_JOB_NAME
the job name
CUDA_VISIBLE_DEVICES
the GPU devices allocated for the job
TMPDIR
local temporary directory reserved for the job; should be used instead of /tmp
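To illustrate how the options and variables above fit together, here is a minimal sketch of a batch script (the job name, partition, resource values, and the computation itself are made up for the example):
#!/bin/bash
#SBATCH -J demo                  # job name
#SBATCH -o demo-%j.out           # standard output (stderr is merged, since -e is not given)
#SBATCH -p cpu-ms                # partition to submit to
#SBATCH -c 4                     # CPUs for the task
#SBATCH --mem=8G                 # memory for the job (per node)
echo "Job $SLURM_JOB_ID ($SLURM_JOB_NAME) running on $(hostname)"
cd "$TMPDIR"                     # use the job-local temporary directory instead of /tmp
# ... the actual computation would go here ...
You submit it with sbatch demo.sh; the #SBATCH options can equally be given on the sbatch command line.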
You can submit only a shell script via sbatch
. If you want to just execute
a command, you can write a simple shell script that just executes the command it
is given. Such a script is for example ~straka/bin.local/run
. If you copy/link
it somewhere to your $PATH
, you can then ruse sbatch run command
.
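For reference, such a wrapper can be as simple as the following sketch (this is just an illustration, not necessarily the content of the script mentioned above):
#!/bin/bash
# Execute the command given to the script as arguments.
exec "$@"
With this wrapper on your $PATH (named, say, run), sbatch -c4 --mem=4G run command arguments... submits the command as a job.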
Array Jobs
-a, --array=1-n
start array job with n tasks numbered 1…n
environment variable SLURM_ARRAY_TASK_ID
output file slurm-%A_%a.out
-a, --array=m-n[:s]
start array job with tasks m,m+s,…,n
environment variables SLURM_ARRAY_TASK_MIN
, SLURM_ARRAY_TASK_MAX
, SLURM_ARRAY_TASK_STEP
-a, --array=m-n[:s]%p
run at most p tasks simultaneously
scontrol update JobId=job_id ArrayTaskThrottle=p
changes the limit of
simultaneously running tasks of the given array job
-d, --dependency=aftercorr:job_id[:job_id]
run when the corresponding array task finishes successfully
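A minimal sketch of an array job script using the options above (the input/output naming scheme and process.sh are hypothetical):
#!/bin/bash
#SBATCH -a 1-10%3                # 10 tasks, at most 3 running at the same time
# every task processes its own input, selected by the task id
./process.sh "input.$SLURM_ARRAY_TASK_ID" > "output.$SLURM_ARRAY_TASK_ID"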
Partitions
We currently have the following partitions:
cpu-ms, cpu-troja
gpu-ms, gpu-troja, gpu-amd
squeue
: list of running jobs
all users by default
squeue --me
only me
squeue --user=user[,user2,...]
show the given users
scontrol show job -d $job_id
: detailed information about a job
scontrol show node -d $node_name
: detailed information about a node
~straka/bin.local/sq
: show overview of the whole cluster
~straka/bin.local/sq cpu
shows only cpu partitions
~straka/bin.local/sq gpu
shows only gpu partitions
scontrol update
: modify properties of a submitted/running job
only some properties can be modified, of course
scancel
: stops jobs
job_id [job_id2 ...]
stop the jobs with the given IDs
-n, --name=name, --jobname=name
stop jobs with the given name
for array jobs:
scancel $job_id
cancels the whole array job (all tasks)
scancel ${job_id}_${array_id}
cancels a single task
srun
: start an interactive shell
srun --pty bash
runs an interactive terminal (think ssh
)
--pty
connects not just standard input, output, and error, but also the
pseudo terminal
if you want a "lasting" interactive terminal, you can submit screen -D -m
using sbatch
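A possible way to do that, assuming the run wrapper mentioned in the Quick Intro is on your $PATH (the resource values are arbitrary):
sbatch -J screen -c2 --mem=4G run screen -D -m
squeue --me                      # note the node the job was scheduled on
ssh -t "$node" screen -rd        # attach there; $node is the node name shown by squeue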
Simple Array Job Example
The following script processes a Wikipedia file described in Assignments
and returns a sorted list of article names.
articles.sh
(available in /net/data/npfl118/examples/
)
#!/bin/bash
set -e
[ "$#" -ge 3 ] || { echo Usage: "$0 input_file outdir tasks [conc_tasks]" >&2; exit 1; }
input_file="$1"
output_dir="$2"
tasks="$3"
conc_tasks="$4"
[ -f "$input_file" ] || { echo File $input_file does not exist >&2; exit 1; }
[ -d "$output_dir" ] && { echo Directory $output_dir already exists >&2; exit 1; }
mkdir -p "$output_dir"
sbatch --wait -o "$output_dir/task-%a.log" -a 1-"$tasks"${conc_tasks:+%$conc_tasks} \
./articles_distributed.sh "$tasks" "$input_file" "$output_dir"/articles.txt
sort -m $(seq -f "$output_dir/articles.txt.%g" 1 "$tasks") > "$output_dir"/articles.txt
rm $(seq -f "$output_dir/articles.txt.%g" 1 "$tasks")
articles_distributed.sh
(available in /net/data/npfl118/examples/
)
#!/bin/bash
set -e
[ "$#" -ge 3 ] || { echo Usage: $0 total_tasks input output_file >&2; exit 1; }
tasks="$1"
input_file="$2"
output_file="$3"
[ -n "$SLURM_ARRAY_TASK_ID" ] || { echo Variable SLURM_ARRAY_TASK_ID is not set >&2; exit 1; }
task="$SLURM_ARRAY_TASK_ID"
output_file="$output_file.$task"
tmp_file="$(mktemp)"
trap "rm -f \"$tmp_file\"" EXIT
split -n l/$task/$tasks "$input_file" | cut -f1 | sort > "$tmp_file"
mv "$tmp_file" "$output_file"
We give only a quick overview here; a more detailed treatment of the GPU cluster
can be found in the ÚFAL LRC wiki.
GPU jobs are scheduled as usual jobs, but in the gpu-ms
or gpu-troja
partition.
You need to specify how many GPUs and of what kind you want, using
-G, --gpus=number
number of GPUs for the whole job
--gpus-per-task=number
number of GPUs per task
-C, --constraint=gpuramXXG
: only GPUs with the given RAM (11, 16, 24, 40, 48, 95)
are considered
--constraint=gpu_ccX.Y
: only consider GPUs with the given Compute
Capability (6.1, 7.5, 8.0, 8.6, 8.9, 9.0)
multiple constraints can be combined with -C "constraint1|constraint2|..."
During execution, CUDA_VISIBLE_DEVICES
is set to the allocated GPUs.
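For example, the following sketch asks for a single GPU with at least 40GB of RAM (the resource values, the run wrapper from the Quick Intro, and train.py are just for illustration):
sbatch -p gpu-ms,gpu-troja -G 1 -C "gpuram40G|gpuram48G|gpuram95G" -c2 --mem=16G run python3 train.py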
Then, you need a framework which can use the GPU, and you might also need to set
paths correctly (note that PyTorch installs its own CUDA, so no configuration is
needed).
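To quickly check that your framework really sees the allocated GPUs, you can run something along these lines inside the job (a PyTorch sketch):
import torch
# Slurm restricts the visible GPUs via CUDA_VISIBLE_DEVICES, so this reflects the allocation.
print("CUDA available:", torch.cuda.is_available())
print("Visible GPUs:", torch.cuda.device_count())
if torch.cuda.is_available():
    print("Device 0:", torch.cuda.get_device_name(0))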
To use CUDA 12.3 and cuDNN 8.9.7, use
export PATH="/opt/cuda/12.3/bin:$PATH"
export LD_LIBRARY_PATH="/opt/cuda/12.3/lib64:/opt/cuda/12.3/cudnn/8.9.7/lib:/opt/cuda/12.3/nccl/2.20.3/lib:/opt/cuda/12.3/extras/CUPTI/lib64:$LD_LIBRARY_PATH"
Alternatively, you might use modules. You can list the available modules using module avail
and enable specific modules using, for example, module load cuda/12.3.
See https://modules.readthedocs.io/en/latest/ for reference.
Spark is a framework for distributed computations.
Natively it works in Python, Scala and Java.
Apart from embarrassingly parallel computations, the Spark framework is suitable for
in-memory and/or iterative computations, which makes it useful even for machine
learning and complex data processing. Spark can run either locally
using a single thread, locally using multiple threads, or in a distributed fashion.
Initialization
You need to set the PATH variable to include the Spark binaries; see
.profile
in the Config tab.
Running
An interactive ipython shell can be started using
PYSPARK_DRIVER_PYTHON=ipython3 pyspark
(use pip3 install --user ipython
if you do not have ipython3
).
Such a command will use the current Spark cluster (detected through the MASTER
environment variable), starting a local cluster with as many threads as cores if
no cluster exists. Using MASTER=local PYSPARK_DRIVER_PYTHON=ipython3 pyspark
starts a local cluster with just a single thread.
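The same mechanism should also accept a specific number of local threads, because local[N] is a standard Spark master specification:
MASTER="local[4]" PYSPARK_DRIVER_PYTHON=ipython3 pyspark   # local cluster with 4 threads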
To create a distributed cluster using Slurm, you can run one of the following
commands:
spark-srun [salloc args] workers memory_per_workerG[:python_memoryG]
: start a Spark cluster and run srun inside it
spark-sbatch [sbatch args] workers memory_per_workerG[:python_memoryG] command [arguments...]
: start a Spark cluster and execute the given command inside it
A good default for memory per worker is 2G
; the default value for the
python_memoryG
is 2G
. If you want to save memory, use memory specification
1G:1G
.
Example
Start by running spark-srun 50 2G
. When the cluster starts, it prints a URL
where it can be monitored. After the cluster starts, execute
PYSPARK_DRIVER_PYTHON=ipython3 pyspark
.
Then, try running the following:
(sc.textFile("/net/data/npfl118/wiki/en/wiki.txt", 3*sc.defaultParallelism)
.flatMap(lambda line: line.split())
.map(lambda word: (word, 1))
.reduceByKey(lambda c1, c2: c1 + c2)
.sortBy(lambda word_count: word_count[1], ascending=False)
.take(10))
Running a Script
To execute a script instead of running from an interactive shell, you need to
create the SparkContext
manually:
word_count.py
(available in /net/data/npfl118/examples/
)
import argparse
parser = argparse.ArgumentParser()
parser.add_argument("input", type=str, help="Input file/path")
parser.add_argument("output", type=str, help="Output directory")
args = parser.parse_args()
import pyspark
sc = pyspark.SparkContext()
input = sc.textFile(args.input, 3*sc.defaultParallelism)
words = input.flatMap(lambda line: line.split())
counts = words.map(lambda word: (word, 1)).reduceByKey(lambda c1, c2: c1 + c2)
sorted = counts.sortBy(lambda word_count: word_count[1], ascending=False)
sorted.saveAsTextFile(args.output)
You can run such a script using
spark-submit script.py input_path output_path
When executed inside a spark-srun/spark-sbatch
session, it connects to the
running cluster; otherwise, it starts a local cluster with as many threads as
cores (or with just a single thread if MASTER=local
is used).
If you want to use a specific virtual environment, you can use
PYSPARK_PYTHON=path_to_python_in_virtual_env spark-submit ...
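For example, a non-interactive run of the above script on a Slurm-backed Spark cluster might look as follows (the worker count, memory, and output directory are placeholders; word_count.py is assumed to be in the current directory):
spark-sbatch 10 2G spark-submit word_count.py /net/data/npfl118/wiki/en/wiki-small.txt outdir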
Basic Methods
SparkContext.textFile
,
SparkContext.parallelize
, …
RDD.collect
,
RDD.take
,
RDD.saveAsTextFile
,
RDD.coalesce
,
RDD.repartition
, …
RDD.map
,
RDD.flatMap
,
RDD.count
,
RDD.distinct
,
RDD.sortByKey
,
RDD.reduceByKey
,
RDD.groupByKey
,
RDD.sample
, …
RDD.cache
,
RDD.unpersist
,
RDD.pipe
, …
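As a small illustration of RDD.pipe, which streams the elements of every partition through an external command (here just the standard tr), one line per element:
import pyspark
sc = pyspark.SparkContext()
# every element is written as one line to the command's stdin;
# every output line becomes one element of the resulting RDD
lines = sc.parallelize(["hello world", "spark rdd pipe"], 2)
print(lines.pipe("tr 'a-z' 'A-Z'").collect())   # ['HELLO WORLD', 'SPARK RDD PIPE']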
Further Pointers
Apache Spark overview
Apache Spark documentation
PySpark API reference
Spark on ÚFAL
Reading Input on ÚFAL wiki
Writing Output on ÚFAL wiki
No Cheating
Cheating is strictly prohibited and any student found cheating will be punished. The punishment can involve failing the whole course, or, in grave cases, being expelled from the faculty.
Discussing homework assignments with your classmates is OK. Sharing code is not OK (unless explicitly allowed); by default, you must complete the assignments yourself.
All students involved in cheating will be punished. If you share your assignment with a friend, both you and your friend will be punished.
Assignment Submission
Please send us a directory with your solutions (both the sources and the
outputs of your solutions) by November 17.
Input Data
For the assignments, you can find the input data in /net/data/npfl118
.
Most assignments use the following Wikipedia data:
wiki/cs/wiki.txt
: Czech Wikipedia data (Sep 2009), file size 195MB, 124k articles.
wiki/en/wiki.txt
: English Wikipedia data (Sep 2009), file size 4.9GB, 2.9M articles.
wiki/cs/wiki-small.txt
, wiki/en/wiki-small.txt
: First 1000 articles of the
respective Wikipedias.
The files are in UTF-8 and contain one article per line. The article name is
separated from the article content by a \t character.
unique_words
required
Template: You can start with /net/data/npfl118/examples/{articles.sh,articles_distributed.sh}
Implement a Slurm distributed job to create a list of unique words used in all
article texts (the article titles are not considered part of article texts).
Convert the texts to lowercase to ignore case (and make sure the lowercasing
works also for non-ASCII characters).
Because the article data is not tokenized, use one of the following scripts:
/net/data/npfl118/wiki/cs/tokenizer
for tokenization of Czech,
/net/data/npfl118/wiki/en/tokenizer
for tokenization of English.
Both scripts read untokenized UTF-8 text from standard input and produce
tokenized UTF-8 text on standard output; the line breaks are preserved and
tokens on every line are separated by exactly one space.
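For example, you can try the tokenizer out from the command line on the small data (cut -f2- strips the article names):
cut -f2- /net/data/npfl118/wiki/en/wiki-small.txt | /net/data/npfl118/wiki/en/tokenizer | head -n 3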
inverted_index
either this or spark_inverted_index
is required
In a distributed way, compute an inverted index – for every lemma from the articles, compute ascending
(article id, ascending positions of occurrences as word indices) pairs. In
order to do so, number the articles using consecutive integers and produce also
a list of articles representing this mapping (the article on line i is the
article with id i; you can use the example articles.sh
).
The output should be a file with the list of articles ordered by article id,
and a file with one lemma on a line in this format:
lemma \t article_id \t space separated occurrence indices \t article_id \t space separated occurrence indices ...
The lemmas should be sorted alphabetically, and on a single line, both the
article_id
s and the occurrence indices
should be in ascending order.
To generate the lemmas, use the provided
/net/data/npfl118/wiki/{cs,en}/lemmatizer
, which again reads untokenized UTF-8
text and outputs the space-separated lemmas, preserving line
breaks.
gpu_multiplication
required
Template: /net/data/npfl118/assignments/gpu_multiplication.py
Install PyTorch 2.4.* in a virtual environment directory venv
:
/opt/python/3.11.4/bin/python3 -m venv venv
venv/bin/python -m pip install torch~=2.4.1 numpy
Note that CUDA 12.1 will be installed automatically, so no CUDA configuration is
necessary. To use a different GPU backend (older CUDA, ROCm, only CPU), see
PyTorch installation page.
Then use /net/data/npfl118/assignments/gpu_multiplication.py to measure how long
matrix multiplication takes, both in the CPU and the GPU version.
The given script measures the required time for all given matrix dimensions.
For the CPU version with 1 thread, use dimensions up to 10000 with step 1000.
For the CPU version with 8 threads, use dimensions up to 10000 with step 1000.
For the GPU version, use dimensions up to 20000 with step 1000.
Finally, evaluate the GPU version with --tf32
(with the same dimensions),
which enables TensorFloat-32 in matrix multiplication. You need to run it on
a card with Compute Capability at least 8.0 (the Ampere microarchitecture).
Finally, estimate the speedup on this task of using:
8 CPU threads instead of 1 CPU thread;
a GPU instead of 1 CPU thread;
TensorFloat-32 instead of regular floats on the GPU.
spark_lemmas
required
Template: /net/data/npfl118/assignments/spark_lemmas.py
Using the provided /net/data/npfl118/wiki/{cs,en}/lemmatizer
, generate a list of the 100 most frequent lemmas in the Czech and English wikis on standard output.
To utilize the lemmatizer, use rdd.pipe
.
spark_anagrams
required
Template: /net/data/npfl118/assignments/spark_anagrams.py
Two words are anagrams if one is a character permutation of the other
(ignoring case).
For a given wiki language, find all anagram classes that contain at least A
different words (a parameter of the script). Output each anagram class (unique words with
the same character permutation) on a separate line.
Use the /net/data/npfl118/wiki/{cs,en}/tokenizer
to tokenize the input,
again using rdd.pipe
.
spark_inverted_index
either this or inverted_index
is required
Template: /net/data/npfl118/assignments/spark_inverted_index.py
In a distributed way, compute an inverted index – for every lemma from the
articles, compute ascending (article id, ascending positions of occurrences as word indices)
pairs. In order to do so, number the articles (in any order) using consecutive
integers (the article id), and produce also a file (or several files) with
a list of article titles representing this mapping (the article title on line
i is the article with id i).
The output should be:
a single file with the list of article titles ordered by article id;
the inverted index consisting of several files (the whole index could be
large, so it should be split into files of tens or hundreds of MBs).
Each file should contain several lemmas, each on a single line in this format:
lemma \t article_id \t space separated occurrence indices \t article_id \t space separated occurrence indices ...
where both the article_id
s and the occurrence indices
are in ascending order.
Lemmas should be sorted alphabetically (inside files and also across files),
but we allow a lemma to appear in multiple consecutive files; in that case,
the article_id
s should be in ascending order even across all lemma lines.
To be able to perform the computation in a distributed way, each worker should
always process data of a constant size. Therefore:
you cannot fit the whole dictionary mapping article names to ids in memory
(to avoid it, see for example zipWithIndex);
you cannot fit all articles and all occurrences of a single lemma in memory at
once (that is why a lemma can appear in multiple consecutive files; the
required output format can be generated for example using
mapPartitions);
on the other hand, you can fit a single article in memory (and therefore also
occurrences of a lemma in a single document), and you can also fit one of
the index files (i.e., an RDD partition) in memory.
To generate the lemmas, use the provided
/net/data/npfl118/wiki/{cs,en}/lemmatizer
, which again reads untokenized UTF-8
text and outputs the space-separated lemmas, preserving line
breaks.
spark_kmeans
required
Template: /net/data/npfl118/assignments/spark_kmeans.py
Implement the basic variant of the
K-means clustering algorithm.
The algorithm should be implemented directly using RDDs
,
without using existing implementations like pyspark.ml
.
Use the provided datasets; they differ in the number of points, the number of
dimensions, and the number of clusters.
Run for the given number of iterations. Print the distance by which the centers
move each iteration, and stop the algorithm if this distance is less than
a given epsilon. Once finished, print the found centers. Your implementation
should be able to converge on all three datasets.
Materials
Introduction to ÚFAL
at ÚFAL wiki (you will need an ÚFAL wiki account to access that site; each
ÚFAL PhD student is entitled to get a wiki account).
ÚFAL cluster
Apache Spark overview,
Apache Spark documentation,
PySpark API reference,
Spark on ÚFAL.