Task- und Datenparallelität mit Rust

Stefan LankesJens Breitbart

stl

jbreitbart

github.com/stlankes

github.com/jbreitbart

eduOS-rs, HermitCore

jensbreitbart.de

System Software @ RWTH

Here for fun and no profit

Hinweise zum Vortrag

curl https://sh.rustup.rs -sSf | sh

Was ist Rust?

Rust ist eine (relativ) neue Programmiersprache für systemnahe Software

fn main() {
    // Die Statements werden ausgeführt sobald
    // das compilierte Binary gestartet wird.

    // Ausgabe auf stdout
    println!("Hello para//el 2019!");
}

Bekannt u.a. für den Einsatz in Firefox

⇒ Rust Code läuft somit auf Millionen von Rechnern

Woher kommt Rust?

rust
  • Rust ist ein open-source (MIT + Apache) Projekt

  • Wird aktuell primär von Mozilla Research gesponsort

  • Die Weiterentwicklung selbst wird allerdings stark durch die Community getrieben

Vorteile von Rust

  • C/C++ ähnliche Performance

  • Compilerbasierte Überprüfungen welche z.B.

    • Speichersicherheit (ohne Garbage Collection) garantieren

    • Data Races verhindern

Falscher Code compiliert nicht

Inhaltsübersicht

  • Rayon

  • SIMD

  • Tokio.rs

Keine Einfühung zu Threads in Rust, siehe dafür z.B. Heise Developer

Rayon: Parallelism in Rust

  • https://github.com/rayon-rs/rayon

  • Unterstützt parallele Berechungen auf Basis von Task-Parallelität, Fork-Join-Prinzipien und Work Stealing

    • Ähnlich zu Cilk (daher auch der Name)

  • Bietet aber auch Daten-Parallelität über Iteratoren an

Zielsetzung von Rayon (I)

  • Zitat: Rayon is a data-parallelism library for Rust. It is extremely lightweight and makes it easy to convert a sequential computation into a parallel one. It also guarantees data-race freedom.

    • Compiler erkennt mehrfach Benutzung

      • Es kann nur einen (Owner) geben

    • Zudem müssen Datentypen als Threadsafe definiert werden

Zielsetzung von Rayon (II)

  • Einführung spezieller Traits, um Threadsicherheit zu gewährleisten.

// A type is `Send` if it is safe to send it to another thread.
// A type is `Sync` if it is safe to share between threads.

fn foo<T:Send+Sync>(value: T) {
	...
}

Beispiel: N-Body

  • Interative Berechnung der Gravitations- bzw. Coulomb-Kraft zwischen Partikeln im Raum.

  • Kraft: \(\vec{F}(n+1)=\vec{r}(n) \cdot |\vec{r}(n)|^{-3/2}\)

  • Geschwindigkeit: \(\vec{v}(n+1)=\Delta t \cdot \vec{F}(n+1) + \vec{v}(n)\)

  • Position: \(\vec{x}(n+1)=\Delta t \cdot \vec{v}(n+1) + \vec{x}(n)\)

  • Nur zum Erläutern der Prinzipien

  • Besser optimierte Lösungen verfügbar

N-Body

N-Body

N-Body: Naive Lösung

  • Ein Körper wird beschrieben durch seine Postion sowie seine Geschwindigkeit

  • Position und Geschwindigkeit sind Vektoren im 3-dimensionalen Raum

#[derive(Copy, Clone, Debug)]
pub struct Vector<T> {
	pub x: T,
	pub y: T,
	pub z: T
}

N-Body: Vektor-Operationen

  • Für diese Vektoren müssen die Vektor-Operation definiert werden

    • Hier selber geschrieben

    • Standard-Implementierungen (z.B. cgmath) sind verfügbar

/// Definition des Operators +=
impl<T: Add<Output=T> + AddAssign> AddAssign for Vector<T> {
	fn add_assign(&mut self, other: Vector<T>) {
		self.x += other.x;
		self.y += other.y;
		self.z += other.z;
	}
}

N-Body: Vektor-Operationen

  • Die Körper werden durch ein zusammenhängender, erweiterbarer Feld-Typ, Vec<T> beschrieben.

  • Teil der Standard-Laufzeitumgebung

#[derive(Clone, Debug, PartialEq)]
pub struct NBody {
	pub position: Vec<Vector<Precision>>,
	pub velocity: Vec<Vector<Precision>>
}

N-Body: Algorithmus

  • Abstand zwischen allen Körper bestimmen

  • Kraft herleiten, Geschwindigkeit und Position neu bestimmen

position.iter().zip(velocity.iter_mut())
  .for_each(|(item_pi, item_vi)| {
	let mut f: Vector<Precision> = Vector::new(0.0, 0.0, 0.0);

	position.iter().for_each(|item_pj| {
		// Newton’s law of universal gravity calculation.
		let diff = *item_pj - *item_pi;
		let n2 = diff * diff;
		let power = 1.0 / (n2.sqrt() * n2);
		f += diff*power;
	});
	*item_vi += f*DELTA_T;
});

N-Body: Trick

  • 2 Iteratoren über die Geschwindigkeit und Position im Gleichschritt bewegen ⇒ zip-Operator in Rust

position.iter().zip(velocity.iter_mut())
  .for_each(|(item_pi, item_vi)| {
	let mut f: Vector<Precision> = Vector::new(0.0, 0.0, 0.0);

	position.iter().for_each(|item_pj| {
		// Newton’s law of universal gravity calculation.
		let diff = *item_pj - *item_pi;
		let n2 = diff * diff;
		let power = 1.0 / (n2.sqrt() * n2);
		f += diff*power;
	});

	*item_vi += f*DELTA_T;
});

Parallelisierung mit Rayon

  • Definition von parallelen Itertoren (ähnlich zu TBB)

  • Stehen für alle Algorithem aus der Standardumgebung zur Verfügung

position.par_iter().zip(velocity.par_iter_mut())
  .for_each(|(item_pi, item_vi)| {
	let mut f: Vector<Precision> = Vector::new(0.0, 0.0, 0.0);

	position.iter().for_each(|item_pj| {
		// Newton’s law of universal gravity calculation.
		let diff = *item_pj - *item_pi;
		let n2 = diff.square();
		let power = 1.0 / (n2.sqrt() * n2);
		f += diff*power;
	});
	*item_vi += f*DELTA_T;
});

Array of Structures

  • Bisherige Lösung ist anschaulich, aber nicht für SIMD geeignet

  • Einzelne Komponenten (z.B. alle x-Einträge) liege nicht kontinuierlich im Speicher

  • Alignment nicht zwingend garantiert

  • Idee: Structure of Arrays

#[derive(Clone, Debug, PartialEq)]
pub struct NBody {
	pub position: Vec<Vector<Precision>>,
	pub velocity: Vec<Vector<Precision>>
}

Structure of Arrays

  • Statt je Körper eine Struktur in einem Array abzulegen, wird eine Struktur von Arrays angelegt

    • Mögliche Optimierung von Vektor-Operationen

  • Typische Lösung in C

typedef float Scalar;

typedef struct {
	Scalar x[nParticles];
	Scalar y[nParticles];
	Scalar z[nParticles];
	Scalar vx[nParticles];
	Scalar vy[nParticles];
	Scalar vz[nParticles];
} NBody;

SIMD-Programmierung

  • Mögliche Lösungen (allgemein)

    • Optimierungen durch den Compiler (häufig nicht optimal)

    • Hinweise an den Compiler (z.B. OpenMP)

    • Direkte Programmierung durch Intrinsics

// Entspricht direkter Assembler-Progrommierung
__m256 _mm256_add_ps (__m256 a, __m256 b);

packed_simd

  • Portable SIMD-Vektoren durch packed-simd

  • Zurzeit nur mit dem nightly-Compiler verwendbar

  • Einführung neuer Typen, welche SIMD-Register entsprechen

  • Standard-Operatoren existieren bereits für diese Typen

// Vektor-Addition mit 8 einfach, genaue Fließkommazahlen
pub fn sum(a: f32x8, b: f32x8) -> f32x8 {
	a+b
}

Structure of Arrays in Rust

  • T ist hier ein SIMD-Datentyp (z.B. f32x8)

  • Es fehlt ein Iterator, um parallel über x, y und z zu wandern

pub struct Array<T>([T; N_PARTICLES_SOA]);

pub struct StructOfArrays<T> {
	pub x: Array<T>,
	pub y: Array<T>,
	pub z: Array<T>
}

pub struct NBodySoA {
	position: StructOfArrays<PrecisionSoA>,
	velocity: StructOfArrays<PrecisionSoA>
}

Neuer Iterator

  • Aktuelle Position (pos) festhalten und auswerten

pub struct StructOfArraysIter<'a, T: 'a> {
	inner: &'a StructOfArrays<T>,
	pos: usize, len: usize
}

impl<'a, T> Iterator for StructOfArraysIter<'a, T> {
	type Item = (&'a T, &'a T, &'a T);
	fn next(&mut self) -> Option<Self::Item> {
		if self.pos < self.len {
			let result = (&self.inner.x.0[self.pos],
				      &self.inner.y.0[self.pos],
				      &self.inner.z.0[self.pos]);
			self.pos += 1;
			Some(result)
	} } }

Anpassung des Algorithmuses

  • Iteratoren liefern nun (x, y, z) zurück.

position.iter().zip(velocity.iter_mut())
  .for_each(|((pix, piy, piz), (vix, viy, viz))| {
	let mut fx: PrecisionSoA = ...
	position.iter().for_each(|(pjx, pjy, pjz)| {
		let mut dx: ...
		for lane in 0..PrecisionSoA::lanes() {
			dx +=  *pjx - PrecisionSoA::splat(pix.extract(lane));
			dy +=  ..
		}
		let n2 = dx*dx + dy*dy + dz*dz;
		let power = 1.0 / (n2.sqrt() * n2);
		fx += dx*power; fy += dy*power; fz += dz*power;
	});
	*vix += fx * dt; *viy += fy * dt; *viz += fz * dt;
});

Paralleler Algorithmus

  • Parallele Iteratoren existiert nicht für eigene Iteratoren

  • Lösung:

    • Selber definieren

    • Oder aus sequentielle Iteratoren ableiten

      • Eventuell sind diese nicht optimal

position.iter().zip(velocity.iter_mut()).par_bridge()
  .for_each(|((pix, piy, piz), (vix, viy, viz))| {
	..
}

Performance / Traue keiner Statistik

nbody results

tokio.rs

  • Eine Bibliothek für Nebenläufigkeit

    • basierend auf Futures

    • und einer Laufzeitumgebung

  • Anwendungsgebiet: asynchrones I/O

  • Link

Laufzeitumgebung

  • Multithreaded + work stealing

    • Multiplexing

  • Futures sind poll-basiert

    • Backpressure im allgemeinen kein Problem

    • Unnötige Futures werden automatisch verworfen und erzeugen keine Last

Beispiel: Client / Server

  • Einfacher TCP Client und Server

    • Client sendet viele Anfragen parallel

    • Server sendet empfangene Daten wieder zurück

  • Idee

    • Eine Task pro Anfrage

    • Wechsel der Task, wenn sie nichts tun kann

Server (i)

  • Unendlicher Stream an Verbindungen

let listener = TcpListener::bind(&addr).unwrap();
let server = listener
    .incoming()
    .for_each(|socket| {
        // für jede Verbindung
        // siehe nächste Folie
    })
    .map_err(|err| {
        // Fehlerbehandlung
        println!("accept error = {:?}", err);
    });
tokio::run(server);

Server (ii) - pro Verbindung

  • Wird ausgeführt sobald eine Verbindung aufgebaut wurde

let (reader, writer) = socket.split();
let amountF = io::copy(reader, writer);

let msg = amountF.then(|result| {
    match result {
        Ok((amount, _, _)) => println!("wrote {} bytes", amount),
        Err(e) => println!("error: {}", e),
    }

    Ok(())
});

tokio::spawn(msg);
Ok(())

Client (i) - Server Verbindung

  • Future für das Versenden von "hello para//el"

TcpStream::connect(&addr)
    .and_then(|stream| {
        println!("connected");

        io::write_all(stream, "hello para//el\n").then(|result| {
            println!("wrote to connection; success={:?}",
                     result.is_ok());
            Ok(())
        })
    })
    .map_err(|err| {
        println!("connection error = {:?}", err);
    })

Client (ii) - Fehlerbehandlung

  • Retry mit exponential backoff

let number_of_connections = 100_000;
let retry_strategy = ExponentialBackoff::from_millis(10)
                     .map(jitter).take(3);

let client = stream::iter_ok(0..number_of_connections)
    .for_each(move |_| Retry::spawn(
        retry_strategy.clone(),
        action
    ).then(|_| Ok(())));

Performance

  • Es passiert nicht viel in dem Beispiel

  • Die meiste Zeit wird im OS verbracht (oder gewartet)

  • Linux Desktop (Ryzen 2700X): ~33.000 Verbindungen pro Sekunde

    • <25% last auf allen 16 Kernen

Ausblick Nebenläufigkeit

fn main() {
    let closure = async || {
         println("Hello from async closure.");
    };
    println!("Hello from main");
    let future = closure();
    println!("Hello from main again");
    futures::block_on(future);
}

Zusammenfassung

  • Rust ist für einen old school Entwickler gewönnungsbedürftig

  • Rayon und Tokio vereinfachen viele Aufgaben

  • packed-simd emöglicht eine transparente Verwendung von Vektorregister

  • Performance ist im allgemeinen sehr gut