Calcul parallèle#

Julia permet nativement le calcul parallèle plus précisément le calcul parallèle à mémoire partagée ou distribuée.

Le document référence officiel : https://docs.julialang.org/en/v1/manual/parallel-computing/

using Plots
gr(fmt=:png)
Plots.GRBackend()

En tout premier lieu on peut lancer une instance de Julia avec plusieurs processeurs (ou plusieurs instances de Julia) à l’aide de la commande shell

julia -p 4

Avec cette option le package Distributed est automatiquement importé.

Autrement on peut à la volée ajouter/enlever des processeurs ou plutôt workers à l’aide des commandes addproc() rmproc() et worker() ainsi que les commandes nprocs et nworker

using Distributed

workers() # par défaut un worker le "maitre"
1-element Vector{Int64}:
 1
addprocs(2) #ajout de 2 workers
2-element Vector{Int64}:
 2
 3
workers() # le processeur 1 n'est plus worker il est devenu master
2-element Vector{Int64}:
 2
 3

Parallélisation avec Distributed#

Julia offre une façon simple de programmer des processus asynchrones à l’aide de fonction pmap et la macro @spawnat

@everywhere begin  # la fonction slow_add est définie sur tous les processus
    function slow_add( x )
        sleep(1)
        x + 1
     end
end

@elapsed results = [slow_add(x) for x in 1:8]
8.028307405
results
8-element Vector{Int64}:
 2
 3
 4
 5
 6
 7
 8
 9
@elapsed results = pmap(slow_add, 1:8 )
4.491755599
results
8-element Vector{Int64}:
 2
 3
 4
 5
 6
 7
 8
 9
futures = []
for x in 1:8
    future = @spawnat :any slow_add(x)  # remplacez :any par un id pour exécuter 
    push!(futures, future)              # sur un processus particulier
end
@elapsed results = [fetch(f) for f in futures] # les futures ont déjà été exécutées
1.451029865
results
8-element Vector{Int64}:
 2
 3
 4
 5
 6
 7
 8
 9

Regardons plus exactement la répartition du calcul pour cela utilisons un tableau partagé SharedArray (voir également SharedMatrix, SharedVector). Autrement dit un tableau qui sera accessible en lecture et en écriture pour chaque worker (et le master).

using SharedArrays
taille = 8
a = SharedArray{Int}(taille) # Tableau partagé
@distributed for i=1:taille
    a[i]=myid() # renvoie le numéro du worker stocké dans a
end
Task (runnable) @0x00007fae4823a720
a
8-element SharedVector{Int64}:
 2
 2
 2
 2
 3
 3
 3
 3

La boucle à été partagée et envoyée pour \(i=1:4\) sur le worker 2 et pour \(i=5:7\) sur le worker 3.

Regardons si nous n’avions pas utilisé de tableau partagé

taille=8
a = zeros(taille) # Tableau non partagé il est ici en local myid()==1
@sync begin
    @distributed for i=1:taille
    a[i]=myid() # renvoie le numéro du worker
    println(a)
    end
end
      From worker 2:	[2.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0]
      From worker 3:	[0.0, 0.0, 0.0, 0.0, 3.0, 0.0, 0.0, 0.0]
      From worker 3:	[0.0, 0.0, 0.0, 0.0, 3.0, 3.0, 0.0, 0.0]
      From worker 3:	[0.0, 0.0, 0.0, 0.0, 3.0, 3.0, 3.0, 0.0]
      From worker 3:	[0.0, 0.0, 0.0, 0.0, 3.0, 3.0, 3.0, 3.0]
      From worker 2:	[2.0, 2.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0]
      From worker 2:	[2.0, 2.0, 2.0, 0.0, 0.0, 0.0, 0.0, 0.0]
      From worker 2:	[2.0, 2.0, 2.0, 2.0, 0.0, 0.0, 0.0, 0.0]
Task (done) @0x00007fadac225910

On peut voire que localement les workers ont accès à une copie de a dont ils peuvent modifier les valeurs “localement”.

En revanche

println(a)
[0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0]

la valeur sur master n’a pas été modifiée. Autrement dit chaque worker a utilisé une copie locale de a, localement modifiable mais une fois l’exécution terminée cette copie est effacée.

a = ones(taille)
@sync begin
    @distributed for i=1:2
        println(a) # nouvelle copie locale
    end
end
      From worker 2:	[1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0]
      From worker 3:	[1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0]
Task (done) @0x00007fae46a6b850

Application Bassin d’attraction de Newton :#

Plaçons nous sur un exemple de calcul de bassin d’attraction de Newton

premier calcul mono-processeur (temps de référence)#

using LinearAlgebra    # Pour la fonction `norm`
" Algorithme de Newton "
function newton(x0,f,df,epsi)
    k = 0
    x = x0
    xnew = x - df(x)\f(x)
    while (norm(x-xnew)>epsi)&(k<1000)
        x = xnew
        xnew = x-df(x)\f(x)
    end
    return xnew
end
newton
# fonction f(x)=0 à résoudre ici z=x+iy et f(z)=z^3-1
f(x)=[x[1]^3-3*x[1]*x[2]^2-1,3*x[1]^2*x[2]-x[2]^3]
# le jacobien de f
df(x)=[3*x[1]^2-3*x[2]^2 -6*x[1]*x[2];6*x[1]*x[2] 3*x[1]^2-3*x[2]^2]

"""
Calcul du bassin si on converge vers la Ieme racine suivant le point de départ
"""
function calc_bassin(f,df,n)
    x=LinRange(-1,1,n);
    y=LinRange(-1,1,n);
    zi =zeros(n,n);
    for i=1:n
        for j=1:n
            r=newton([x[i],y[n-j+1]],f,df,1e-10)
            zi[j,i]=(round(atan(r[2],r[1])*3/pi)+3)%3;
        end
    end
    return zi
end
calc_bassin
zi = calc_bassin(f,df,4); # pour la compilation
n = 1024 # un deuxième appel est plus rapide
@time zi = calc_bassin(f,df,n); 
  5.760837 seconds (111.59 M allocations: 5.884 GiB, 7.13% gc time)
contourf(zi, aspect_ratio = :equal)

Newton en parallèle#

workers()
2-element Vector{Int64}:
 2
 3

Dans un premier temps on redéfinit les fonctions newton, f et df pour tous les workers à l’aide de la macro @everywhere

@everywhere begin
    
using LinearAlgebra

# algorithme de Newton
function newton(x0,f,df,epsi)
    k=0;
    x=x0;
    xnew=x-df(x)\f(x);
    while (norm(x-xnew)>epsi) & (k<1000)
        x=xnew
        xnew=x-df(x)\f(x);
    end
    return xnew
end
# fonction f(x)=0 à résoudre ici z=x+iy et f(z)=z^3-1
f(x)=[x[1]^3-3*x[1]*x[2]^2-1,3*x[1]^2*x[2]-x[2]^3]
# le jacobien de f
df(x)=[3*x[1]^2-3*x[2]^2 -6*x[1]*x[2];6*x[1]*x[2] 3*x[1]^2-3*x[2]^2]
# Calcul du bassin si on converge vers la Ieme racine suivant le point de départ
end

Puis on uitlise @parallel dans la boucle extérieure ainsi que @sync pour resynchroniser les process autrement attendre que tous les workers aient fini leurs calculs

function calc_bassin_distributed(f,df,n)
    x = LinRange(-1,1,n);
    y = LinRange(-1,1,n);
    zi = SharedArray{Int}(n,n);
    @sync begin
        @distributed for i=1:n
             for j=1:n
                r=newton([x[i],y[n-j+1]],f,df,1e-10)
                zi[j,i]=(round(atan(r[2],r[1])*3/pi)+3)%3;
            end
        end
    end
    return zi
end
calc_bassin_distributed (generic function with 1 method)
n = 1024 # pour efficacité il faut relancer 2 fois
@time zi = calc_bassin_distributed(f,df,n); 
  4.508568 seconds (296.81 k allocations: 20.249 MiB, 4.95% compilation time)
contourf( zi, aspect_ratio = :equal)

Encore plus de //#

On a utilisé les macros @everywhere @distributed @sync regardons

  • remotecall appel asynchrone (n’attends pas le résultat dit appel non bloquant)

  • fetch récupère le résultat (synchronise)

  • remotecall_fetch les 2 commandes précédentes réunies ou appel bloquant (tant que le résultat n’est pas calculé la fonction ne rend pas la main.

  • remotecall_wait appel bloquant attend la fin de l’éxacution.

  • @async exécute le bloc en mode asynchrone.

  • @spawn et @spawnat macros pour remotecall

for proc=workers()
    println(remotecall_fetch(myid, proc)) # demande l'exécution de myid() sur chaque proc
end
2
3
data=zeros(nworkers())
for (i,w) in enumerate(workers())
    data[i]=remotecall_fetch(myid, w)
end
data # data contient les numéros des workers  
2-element Vector{Float64}:
 2.0
 3.0
for i=1:10
    r = @spawn myid() 
    println("process: $(r.where) int: $(fetch(r))")  
end  
process: 2 int: 2
process: 3 int: 3
process: 2 int: 2
process: 3 int: 3
process: 2 int: 2
process: 3 int: 3
process: 2 int: 2
process: 3 int: 3
process: 2 int: 2
process: 3 int: 3

Newton parallel 2#

Reprenons notre algorithme de calcul de bassin

# Calcul du bassin si on converge vers la Ieme racine suivant le point de départ
function calc_bassin_async(f,df,n)
    x=LinRange(-1,1,n);
    y=LinRange(-1,1,n);
    zi=zeros(n,n);
    k=0
    wo=workers()
    nw=nworkers()
    @sync begin
        for i=1:n
            for j=1:n
                @async begin    # lancement asynchrone      
                    k=k+1
                    wk=wo[k%nw+1] # worker suivant
                    r = @spawnat :any newton([x[i],y[n-j+1]],f,df,1e-10)
                    zi[j,i]=(round(atan(r[2],r[1])*3/pi)+3)%3;
            end # ens @async
            end
        end     
    end # end @sync
    return zi
end
calc_bassin_async (generic function with 1 method)
n=32 # efficacité pas très grande voir catastrophique !
@time zi = calc_bassin_async(f,df,n); 
  0.562257 seconds (222.36 k allocations: 13.892 MiB, 18.21% compilation time)

On peut voir que le code n’est pas très efficace on perd beaucoup de temps en communication on fait trop d’appels aux workers (\(n^2\)).

Changeons de principe et divisons la tache en nombre de workers :

# résoltion d'un block du probleme du bassin de Newtown
@everywhere function calc_block(f,df,x,y)
    n=length(x)
    m=length(y)
    zi = zeros(m,n)
    for i=1:n
        for j=1:m
            @inbounds r=newton([x[i],y[m-j+1]],f,df,1e-10)
            @inbounds zi[j,i]=(round(atan(r[2],r[1])*3/pi)+3)%3;
        end
    end
    return zi
end
function calc_bassin_parallel(f,df,n)
    x=LinRange(-1,1,n)
    y=LinRange(1,-1,n)
    zi=zeros(n,n)
    
    # permet la répartition du travail
    wo=workers()
    nw=nworkers()
    parts = Iterators.partition(1:n, n ÷ nw)
    
    @sync for (i,part) in enumerate(parts) # répartition sur tous les workers
        @async begin  
            zi[:,part] .=
                remotecall_fetch(calc_block, wo[i], f,df,x[part],y)
            end # end @async
    end     
    return zi
end
calc_bassin_parallel (generic function with 1 method)
n=1024 # efficacité correcte 
@time zi=calc_bassin_parallel(f,df,n); 
  3.165582 seconds (427.35 k allocations: 44.737 MiB, 0.54% gc time, 9.16% compilation time)
contourf( zi, aspect_ratio = :equal)

En diminuant le nombre d’appels on à retrouver une efficacité de l’ordre de 88% \(\left(\dfrac{T_{ref}}{T_{multiproc} * nbproc}\right)\) comme avec la macro @parallel précédente.

rmprocs(workers())
nworkers()
1

Threads#

Lisez ce billet de blog pour une présentation des fonctionnalités multi-threading de Julia.

Par défaut Julia n’utilise qu’un seul thread. Pour le démarrer avec plusieurs threads nous devons le lui dire explicitement :

using IJulia
installkernel("Julia (4 threads)", "--project=@.", env=Dict("JULIA_NUM_THREADS"=>"4"))

Note : Changer manuellement dans le menu “Kernel” pour chaque nouvelle version de Julia et vous devez recharger votre navigateur pour voir un effet.

Pour vérifier que cela a fonctionné :

import Base.Threads
using LinearAlgebra

Threads.nthreads()
8
@elapsed begin
    Threads.@threads for i in 1:Threads.nthreads()
        sleep(1)
        println(" Bonjour du thread $(Threads.threadid())")
    end
end
 Bonjour du thread 1
 Bonjour du thread 4
 Bonjour du thread 8
 Bonjour du thread 7
 Bonjour du thread 2
 Bonjour du thread 6
 Bonjour du thread 5
 Bonjour du thread 3
1.057713378
function slow_add(x, y)
    sleep(1)
    x + y
end

v = 1:8
w = zero(v)

etime = @elapsed begin
    Threads.@threads for i in eachindex(v)
        w[i] = slow_add( v[i], v[i])
    end
end

etime, w
(1.050349191, [2, 4, 6, 8, 10, 12, 14, 16])

Je reprends l’exemple ci-dessus de la fractale et je parallélise avec la mémoire partagée.

function newton(x0,f,df,epsi)
    k=0
    x=x0
    xnew=x-df(x)\f(x)
    while (norm(x-xnew)>epsi)&(k<1000)
        x=xnew
        xnew=x-df(x)\f(x)
    end
    return xnew
end

# fonction f(x)=0 à résoudre ici z=x+iy et f(z)=z^3-1
f(x)=[x[1]^3-3*x[1]*x[2]^2-1,3*x[1]^2*x[2]-x[2]^3]
# le jacobien de f
df(x)=[3*x[1]^2-3*x[2]^2 -6*x[1]*x[2];6*x[1]*x[2] 3*x[1]^2-3*x[2]^2]

# Calcul du bassin si on converge vers la Ieme racine suivant le point de départ
function calc_bassin_threads(f,df,n)
    x = LinRange(-1,1,n)
    y = LinRange(-1,1,n)
    zi = zeros(n,n)
    Threads.@threads for i in eachindex(x)
        for j in eachindex(y)
            @inbounds r = newton([x[i],y[n-j+1]],f,df,1e-10)
            @inbounds zi[j,i]=(round(atan(r[2],r[1])*3/pi)+3)%3
        end
    end
    return zi
end
calc_bassin_threads (generic function with 1 method)
n=1024 # un deuxième appel est plus rapide
@time zi=calc_bassin_threads(f,df,n); 
 14.679266 seconds (103.27 M allocations: 5.764 GiB, 3.90% gc time, 2.60% compilation time)
contourf(zi, aspect_ratio = :equal)

Cas particulier des opérations de réduction parallèle#

La fonction suivante permet de calculer l’histogramme de particules tirées au hasard avec une distribution normale

using Random, Plots

const nx = 64
const np = 100000
const xmin = -6
const xmax = +6


rng = MersenneTwister(123)
xp = randn(rng, np)

function serial_deposition( xp )

    Lx = xmax - xmin
    rho = zeros(Float64, nx)
    
    fill!(rho, 0.0)

    for i in eachindex(xp)
        x_norm = (xp[i]-xmin) / Lx
        ip = trunc(Int,  x_norm * nx)+1
        rho[ip] += 1
    end

    rho

end

histogram(xp, bins=nx)
plot!(LinRange(xmin, xmax, nx), serial_deposition(xp), lw = 5)

Si on essaye de paralléliser cette boucle, on se heurte à un problème d’accès concurrents. Chaque fil d’execution a accès à la mémoire occupée par le tableau rho. Lorsque deux threads accèdent à la mémoire en même temps c’est le dernier qui met à jour le tableau qui gagne et la modification faite par l’autre thread n’est pas toujours comptabilisé. Ce type d’opération de réduction n’est pas (encore) prise en charge par Julia.

function parallel_deposition_bad( xp )

    Lx = xmax - xmin
    rho = zeros(Float64, nx)
    Threads.@threads :static for i in eachindex(xp)
        x_norm = (xp[i]-xmin) / Lx
        ip = trunc(Int,  x_norm * nx)+1
        rho[ip] += 1
    end

    rho

end

histogram(xp, bins=nx)
plot!(LinRange(xmin, xmax, nx), parallel_deposition_bad(xp), lw = 5)

On peut résoudre ce problème en créant une version de rho nommée rho_local qui sera propre à chaque thread. Il suffira ensuite de sommer ces tableaux locaux pour calculer le tableau global.

function parallel_deposition( xp )

    Lx = xmax - xmin
    rho = zeros(Float64, nx)
    ntid = Threads.nthreads()
    rho_local = [zero(rho) for _ in 1:ntid]
    chunks = Iterators.partition(1:np, np÷ntid)

    Threads.@sync for chunk in chunks
        Threads.@spawn begin
            tid = Threads.threadid()
            fill!(rho_local[tid], 0.0)
            for i in chunk
                x_norm = (xp[i]-xmin) / Lx
                ip = trunc(Int,  x_norm * nx)+1
                rho_local[tid][ip] += 1
            end
        end
    end

    rho .= reduce(+,rho_local)

    rho

end

histogram(xp, bins=nx)
plot!(LinRange(xmin, xmax, nx), parallel_deposition(xp), lw = 5)