Calcul parallèle#
Julia permet nativement le calcul parallèle plus précisément le calcul parallèle à mémoire partagée ou distribuée.
Le document référence officiel : https://docs.julialang.org/en/v1/manual/parallel-computing/
using Plots
gr(fmt=:png)
Plots.GRBackend()
En tout premier lieu on peut lancer une instance de Julia avec plusieurs processeurs (ou plusieurs instances de Julia) à l’aide de la commande shell
julia -p 4
Avec cette option le package Distributed
est automatiquement importé.
Autrement on peut à la volée ajouter/enlever des processeurs ou plutôt workers à l’aide des commandes addproc()
rmproc()
et worker()
ainsi que les commandes nprocs
et nworker
using Distributed
workers() # par défaut un worker le "maitre"
1-element Vector{Int64}:
1
addprocs(2) #ajout de 2 workers
2-element Vector{Int64}:
2
3
workers() # le processeur 1 n'est plus worker il est devenu master
2-element Vector{Int64}:
2
3
Parallélisation avec Distributed#
Julia offre une façon simple de programmer des processus asynchrones à l’aide de fonction pmap
et la macro @spawnat
@everywhere begin # la fonction slow_add est définie sur tous les processus
function slow_add( x )
sleep(1)
x + 1
end
end
@elapsed results = [slow_add(x) for x in 1:8]
8.028307405
results
8-element Vector{Int64}:
2
3
4
5
6
7
8
9
@elapsed results = pmap(slow_add, 1:8 )
4.491755599
results
8-element Vector{Int64}:
2
3
4
5
6
7
8
9
futures = []
for x in 1:8
future = @spawnat :any slow_add(x) # remplacez :any par un id pour exécuter
push!(futures, future) # sur un processus particulier
end
@elapsed results = [fetch(f) for f in futures] # les futures ont déjà été exécutées
1.451029865
results
8-element Vector{Int64}:
2
3
4
5
6
7
8
9
Regardons plus exactement la répartition du calcul pour cela utilisons un tableau partagé SharedArray
(voir également SharedMatrix
, SharedVector
). Autrement dit un tableau qui sera accessible en lecture et en écriture pour chaque worker (et le master).
using SharedArrays
taille = 8
a = SharedArray{Int}(taille) # Tableau partagé
@distributed for i=1:taille
a[i]=myid() # renvoie le numéro du worker stocké dans a
end
Task (runnable) @0x00007fae4823a720
a
8-element SharedVector{Int64}:
2
2
2
2
3
3
3
3
La boucle à été partagée et envoyée pour \(i=1:4\) sur le worker 2 et pour \(i=5:7\) sur le worker 3.
Regardons si nous n’avions pas utilisé de tableau partagé
taille=8
a = zeros(taille) # Tableau non partagé il est ici en local myid()==1
@sync begin
@distributed for i=1:taille
a[i]=myid() # renvoie le numéro du worker
println(a)
end
end
From worker 2: [2.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0]
From worker 3: [0.0, 0.0, 0.0, 0.0, 3.0, 0.0, 0.0, 0.0]
From worker 3: [0.0, 0.0, 0.0, 0.0, 3.0, 3.0, 0.0, 0.0]
From worker 3: [0.0, 0.0, 0.0, 0.0, 3.0, 3.0, 3.0, 0.0]
From worker 3: [0.0, 0.0, 0.0, 0.0, 3.0, 3.0, 3.0, 3.0]
From worker 2: [2.0, 2.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0]
From worker 2: [2.0, 2.0, 2.0, 0.0, 0.0, 0.0, 0.0, 0.0]
From worker 2: [2.0, 2.0, 2.0, 2.0, 0.0, 0.0, 0.0, 0.0]
Task (done) @0x00007fadac225910
On peut voire que localement les workers ont accès à une copie de a
dont ils peuvent modifier les valeurs “localement”.
En revanche
println(a)
[0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0]
la valeur sur master n’a pas été modifiée. Autrement dit chaque worker a utilisé une copie locale de a, localement modifiable mais une fois l’exécution terminée cette copie est effacée.
a = ones(taille)
@sync begin
@distributed for i=1:2
println(a) # nouvelle copie locale
end
end
From worker 2: [1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0]
From worker 3: [1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0]
Task (done) @0x00007fae46a6b850
Application Bassin d’attraction de Newton :#
Plaçons nous sur un exemple de calcul de bassin d’attraction de Newton
premier calcul mono-processeur (temps de référence)#
using LinearAlgebra # Pour la fonction `norm`
" Algorithme de Newton "
function newton(x0,f,df,epsi)
k = 0
x = x0
xnew = x - df(x)\f(x)
while (norm(x-xnew)>epsi)&(k<1000)
x = xnew
xnew = x-df(x)\f(x)
end
return xnew
end
newton
# fonction f(x)=0 à résoudre ici z=x+iy et f(z)=z^3-1
f(x)=[x[1]^3-3*x[1]*x[2]^2-1,3*x[1]^2*x[2]-x[2]^3]
# le jacobien de f
df(x)=[3*x[1]^2-3*x[2]^2 -6*x[1]*x[2];6*x[1]*x[2] 3*x[1]^2-3*x[2]^2]
"""
Calcul du bassin si on converge vers la Ieme racine suivant le point de départ
"""
function calc_bassin(f,df,n)
x=LinRange(-1,1,n);
y=LinRange(-1,1,n);
zi =zeros(n,n);
for i=1:n
for j=1:n
r=newton([x[i],y[n-j+1]],f,df,1e-10)
zi[j,i]=(round(atan(r[2],r[1])*3/pi)+3)%3;
end
end
return zi
end
calc_bassin
zi = calc_bassin(f,df,4); # pour la compilation
n = 1024 # un deuxième appel est plus rapide
@time zi = calc_bassin(f,df,n);
5.760837 seconds (111.59 M allocations: 5.884 GiB, 7.13% gc time)
contourf(zi, aspect_ratio = :equal)
Newton en parallèle#
workers()
2-element Vector{Int64}:
2
3
Dans un premier temps on redéfinit les fonctions newton
, f
et df
pour tous les workers à l’aide de la macro @everywhere
@everywhere begin
using LinearAlgebra
# algorithme de Newton
function newton(x0,f,df,epsi)
k=0;
x=x0;
xnew=x-df(x)\f(x);
while (norm(x-xnew)>epsi) & (k<1000)
x=xnew
xnew=x-df(x)\f(x);
end
return xnew
end
# fonction f(x)=0 à résoudre ici z=x+iy et f(z)=z^3-1
f(x)=[x[1]^3-3*x[1]*x[2]^2-1,3*x[1]^2*x[2]-x[2]^3]
# le jacobien de f
df(x)=[3*x[1]^2-3*x[2]^2 -6*x[1]*x[2];6*x[1]*x[2] 3*x[1]^2-3*x[2]^2]
# Calcul du bassin si on converge vers la Ieme racine suivant le point de départ
end
Puis on uitlise @parallel
dans la boucle extérieure ainsi que @sync
pour resynchroniser les process autrement attendre que tous les workers aient fini leurs calculs
function calc_bassin_distributed(f,df,n)
x = LinRange(-1,1,n);
y = LinRange(-1,1,n);
zi = SharedArray{Int}(n,n);
@sync begin
@distributed for i=1:n
for j=1:n
r=newton([x[i],y[n-j+1]],f,df,1e-10)
zi[j,i]=(round(atan(r[2],r[1])*3/pi)+3)%3;
end
end
end
return zi
end
calc_bassin_distributed (generic function with 1 method)
n = 1024 # pour efficacité il faut relancer 2 fois
@time zi = calc_bassin_distributed(f,df,n);
4.508568 seconds (296.81 k allocations: 20.249 MiB, 4.95% compilation time)
contourf( zi, aspect_ratio = :equal)
Encore plus de //#
On a utilisé les macros @everywhere
@distributed
@sync
regardons
remotecall
appel asynchrone (n’attends pas le résultat dit appel non bloquant)fetch
récupère le résultat (synchronise)remotecall_fetch
les 2 commandes précédentes réunies ou appel bloquant (tant que le résultat n’est pas calculé la fonction ne rend pas la main.remotecall_wait
appel bloquant attend la fin de l’éxacution.@async
exécute le bloc en mode asynchrone.@spawn
et@spawnat
macros pourremotecall
for proc=workers()
println(remotecall_fetch(myid, proc)) # demande l'exécution de myid() sur chaque proc
end
2
3
data=zeros(nworkers())
for (i,w) in enumerate(workers())
data[i]=remotecall_fetch(myid, w)
end
data # data contient les numéros des workers
2-element Vector{Float64}:
2.0
3.0
for i=1:10
r = @spawn myid()
println("process: $(r.where) int: $(fetch(r))")
end
process: 2 int: 2
process: 3 int: 3
process: 2 int: 2
process: 3 int: 3
process: 2 int: 2
process: 3 int: 3
process: 2 int: 2
process: 3 int: 3
process: 2 int: 2
process: 3 int: 3
Newton parallel 2#
Reprenons notre algorithme de calcul de bassin
# Calcul du bassin si on converge vers la Ieme racine suivant le point de départ
function calc_bassin_async(f,df,n)
x=LinRange(-1,1,n);
y=LinRange(-1,1,n);
zi=zeros(n,n);
k=0
wo=workers()
nw=nworkers()
@sync begin
for i=1:n
for j=1:n
@async begin # lancement asynchrone
k=k+1
wk=wo[k%nw+1] # worker suivant
r = @spawnat :any newton([x[i],y[n-j+1]],f,df,1e-10)
zi[j,i]=(round(atan(r[2],r[1])*3/pi)+3)%3;
end # ens @async
end
end
end # end @sync
return zi
end
calc_bassin_async (generic function with 1 method)
n=32 # efficacité pas très grande voir catastrophique !
@time zi = calc_bassin_async(f,df,n);
0.562257 seconds (222.36 k allocations: 13.892 MiB, 18.21% compilation time)
On peut voir que le code n’est pas très efficace on perd beaucoup de temps en communication on fait trop d’appels aux workers (\(n^2\)).
Changeons de principe et divisons la tache en nombre de workers :
# résoltion d'un block du probleme du bassin de Newtown
@everywhere function calc_block(f,df,x,y)
n=length(x)
m=length(y)
zi = zeros(m,n)
for i=1:n
for j=1:m
@inbounds r=newton([x[i],y[m-j+1]],f,df,1e-10)
@inbounds zi[j,i]=(round(atan(r[2],r[1])*3/pi)+3)%3;
end
end
return zi
end
function calc_bassin_parallel(f,df,n)
x=LinRange(-1,1,n)
y=LinRange(1,-1,n)
zi=zeros(n,n)
# permet la répartition du travail
wo=workers()
nw=nworkers()
parts = Iterators.partition(1:n, n ÷ nw)
@sync for (i,part) in enumerate(parts) # répartition sur tous les workers
@async begin
zi[:,part] .=
remotecall_fetch(calc_block, wo[i], f,df,x[part],y)
end # end @async
end
return zi
end
calc_bassin_parallel (generic function with 1 method)
n=1024 # efficacité correcte
@time zi=calc_bassin_parallel(f,df,n);
3.165582 seconds (427.35 k allocations: 44.737 MiB, 0.54% gc time, 9.16% compilation time)
contourf( zi, aspect_ratio = :equal)
En diminuant le nombre d’appels on à retrouver une efficacité de l’ordre de 88% \(\left(\dfrac{T_{ref}}{T_{multiproc} * nbproc}\right)\) comme avec la macro @parallel
précédente.
rmprocs(workers())
nworkers()
1
Threads#
Lisez ce billet de blog pour une présentation des fonctionnalités multi-threading de Julia.
Par défaut Julia n’utilise qu’un seul thread. Pour le démarrer avec plusieurs threads nous devons le lui dire explicitement :
using IJulia
installkernel("Julia (4 threads)", "--project=@.", env=Dict("JULIA_NUM_THREADS"=>"4"))
Note : Changer manuellement dans le menu “Kernel” pour chaque nouvelle version de Julia et vous devez recharger votre navigateur pour voir un effet.
Pour vérifier que cela a fonctionné :
import Base.Threads
using LinearAlgebra
Threads.nthreads()
8
@elapsed begin
Threads.@threads for i in 1:Threads.nthreads()
sleep(1)
println(" Bonjour du thread $(Threads.threadid())")
end
end
Bonjour du thread 1
Bonjour du thread 4
Bonjour du thread 8
Bonjour du thread 7
Bonjour du thread 2
Bonjour du thread 6
Bonjour du thread 5
Bonjour du thread 3
1.057713378
function slow_add(x, y)
sleep(1)
x + y
end
v = 1:8
w = zero(v)
etime = @elapsed begin
Threads.@threads for i in eachindex(v)
w[i] = slow_add( v[i], v[i])
end
end
etime, w
(1.050349191, [2, 4, 6, 8, 10, 12, 14, 16])
Je reprends l’exemple ci-dessus de la fractale et je parallélise avec la mémoire partagée.
function newton(x0,f,df,epsi)
k=0
x=x0
xnew=x-df(x)\f(x)
while (norm(x-xnew)>epsi)&(k<1000)
x=xnew
xnew=x-df(x)\f(x)
end
return xnew
end
# fonction f(x)=0 à résoudre ici z=x+iy et f(z)=z^3-1
f(x)=[x[1]^3-3*x[1]*x[2]^2-1,3*x[1]^2*x[2]-x[2]^3]
# le jacobien de f
df(x)=[3*x[1]^2-3*x[2]^2 -6*x[1]*x[2];6*x[1]*x[2] 3*x[1]^2-3*x[2]^2]
# Calcul du bassin si on converge vers la Ieme racine suivant le point de départ
function calc_bassin_threads(f,df,n)
x = LinRange(-1,1,n)
y = LinRange(-1,1,n)
zi = zeros(n,n)
Threads.@threads for i in eachindex(x)
for j in eachindex(y)
@inbounds r = newton([x[i],y[n-j+1]],f,df,1e-10)
@inbounds zi[j,i]=(round(atan(r[2],r[1])*3/pi)+3)%3
end
end
return zi
end
calc_bassin_threads (generic function with 1 method)
n=1024 # un deuxième appel est plus rapide
@time zi=calc_bassin_threads(f,df,n);
14.679266 seconds (103.27 M allocations: 5.764 GiB, 3.90% gc time, 2.60% compilation time)
contourf(zi, aspect_ratio = :equal)
Cas particulier des opérations de réduction parallèle#
La fonction suivante permet de calculer l’histogramme de particules tirées au hasard avec une distribution normale
using Random, Plots
const nx = 64
const np = 100000
const xmin = -6
const xmax = +6
rng = MersenneTwister(123)
xp = randn(rng, np)
function serial_deposition( xp )
Lx = xmax - xmin
rho = zeros(Float64, nx)
fill!(rho, 0.0)
for i in eachindex(xp)
x_norm = (xp[i]-xmin) / Lx
ip = trunc(Int, x_norm * nx)+1
rho[ip] += 1
end
rho
end
histogram(xp, bins=nx)
plot!(LinRange(xmin, xmax, nx), serial_deposition(xp), lw = 5)
Si on essaye de paralléliser cette boucle, on se heurte à un problème d’accès concurrents. Chaque fil d’execution a accès à la mémoire occupée par le tableau rho
. Lorsque deux threads accèdent à la mémoire en même temps c’est le dernier qui met à jour le tableau qui gagne et la modification faite par l’autre thread n’est pas toujours comptabilisé. Ce type d’opération de réduction n’est pas (encore) prise en charge par Julia.
function parallel_deposition_bad( xp )
Lx = xmax - xmin
rho = zeros(Float64, nx)
Threads.@threads :static for i in eachindex(xp)
x_norm = (xp[i]-xmin) / Lx
ip = trunc(Int, x_norm * nx)+1
rho[ip] += 1
end
rho
end
histogram(xp, bins=nx)
plot!(LinRange(xmin, xmax, nx), parallel_deposition_bad(xp), lw = 5)
On peut résoudre ce problème en créant une version de rho
nommée rho_local
qui sera propre à chaque thread.
Il suffira ensuite de sommer ces tableaux locaux pour calculer le tableau global.
function parallel_deposition( xp )
Lx = xmax - xmin
rho = zeros(Float64, nx)
ntid = Threads.nthreads()
rho_local = [zero(rho) for _ in 1:ntid]
chunks = Iterators.partition(1:np, np÷ntid)
Threads.@sync for chunk in chunks
Threads.@spawn begin
tid = Threads.threadid()
fill!(rho_local[tid], 0.0)
for i in chunk
x_norm = (xp[i]-xmin) / Lx
ip = trunc(Int, x_norm * nx)+1
rho_local[tid][ip] += 1
end
end
end
rho .= reduce(+,rho_local)
rho
end
histogram(xp, bins=nx)
plot!(LinRange(xmin, xmax, nx), parallel_deposition(xp), lw = 5)