Paste: Clay parallel for
Author: Blei
Mode: text
Date: Sat, 24 Dec 2011 22:16:53
Plain Text
import unix.*;
// Atomic compare-and-swap on a SizeT, selected when SizeT is 64 bits wide.
// If `dest` currently equals `old`, `new` is stored into `dest`; in either
// case the value `dest` held before the operation is returned, so the swap
// succeeded exactly when the returned value == old. The exchange uses
// seq_cst ordering.
// NOTE(review): this is the pre-LLVM-3.5 `cmpxchg` form (single ordering,
// returns the loaded value rather than a { value, i1 } pair); newer LLVM
// versions reject it — confirm against the LLVM version Clay targets.
[| SizeT == UInt64]
inline cmpxchgSizeT(dest: SizeT, old: SizeT, new: SizeT) --> returned: SizeT __llvm__ {
%1 = load i64* %old
%2 = load i64* %new
%3 = cmpxchg i64* %dest, i64 %1, i64 %2 seq_cst
store i64 %3, i64* %returned
ret i8* null
}
// 32-bit overload of cmpxchgSizeT, selected when SizeT is 32 bits wide.
// Same contract: atomically store `new` into `dest` if `dest` equals `old`,
// and return the value `dest` held before the operation (seq_cst ordering).
// NOTE(review): uses the pre-LLVM-3.5 `cmpxchg` syntax — confirm against the
// LLVM version Clay targets.
[| SizeT == UInt32]
inline overload cmpxchgSizeT(dest: SizeT, old: SizeT, new: SizeT) --> returned: SizeT __llvm__ {
%1 = load i32* %old
%2 = load i32* %new
%3 = cmpxchg i32* %dest, i32 %1, i32 %2 seq_cst
store i32 %3, i32* %returned
ret i8* null
}
// Number of processors currently online, straight from
// sysconf(_SC_NPROCESSORS_ONLN). The result is not validated here: sysconf
// reports failure as -1, so callers must handle non-positive values.
private getNumberOfCpus() {
    var onlineCpus = sysconf(_SC_NPROCESSORS_ONLN);
    return onlineCpus;
}
// TODO this should be in unix
// Sentinel that pthread_join stores as the thread result when the joined
// thread was cancelled (POSIX PTHREAD_CANCELED).
// NOTE(review): RawPointer(-1) matches glibc's ((void *) -1); confirm the
// value on other platforms before relying on it.
alias PTHREAD_CANCELED = RawPointer(-1);
// State shared by all worker threads of a single parallelEachNumber call;
// every worker receives a pointer to the same instance.
//   callback: called once for each index in [0, end)
//   index:    next index not yet claimed by a worker; workers advance it
//             with cmpxchgSizeT
//   end:      exclusive upper bound of the index range (the number of calls)
private record ParallelForInfo (
callback : CodePointer[[SizeT], []],
index : SizeT,
end : SizeT,
);
// Worker thread entry point handed to pthread_create.
//
// `data` points at the ParallelForInfo shared by every worker. The worker
// repeatedly claims the next unprocessed index with a compare-and-swap on
// info^.index and calls info^.callback with each index it wins, stopping
// once every index in [0, end) has been claimed. Always returns null.
//
// The shared counter is touched only through cmpxchgSizeT. A failed exchange
// returns the counter's current value, which becomes the next guess; a
// successful one means `expected + 1` is the best next guess. Plain loads of
// info^.index would race with other workers' atomic exchanges.
private external parallelForCallback(data: RawPointer) : RawPointer {
    var info = Pointer[ParallelForInfo](data);
    // `end` is never modified while workers run, so a plain read is fine.
    var end = info^.end;
    // Starting guess; if other workers have already advanced the counter the
    // first exchange simply fails and reports the real value.
    var expected = SizeT(0);
    while (expected < end) {
        var observed = cmpxchgSizeT(info^.index, expected, expected + 1);
        if (observed == expected) {
            // This worker claimed `expected`; no other worker will run it.
            info^.callback(expected);
            expected = expected + 1;
        } else {
            expected = observed;
        }
    }
    return RawPointer(0);
}
// Calls `callback(i)` for every i in [0, numCalls), spreading the calls over
// one worker thread per online CPU.
//
// sysconf reports failure as -1; converting that straight to SizeT would
// request an enormous number of threads. So any non-positive CPU count falls
// back to a single worker.
parallelEachNumber(numCalls: SizeT, callback) {
    var numCpus = getNumberOfCpus();
    var nThreads = SizeT(1);
    if (numCpus > 0) {
        nThreads = SizeT(numCpus);
    }
    parallelEachNumber(numCalls, nThreads, callback);
}
// Calls `callback(i)` exactly once for every i in [0, numCalls), using up to
// `nThreads` pthreads that claim indices from a shared atomic counter. The
// order of calls is unspecified and calls may run concurrently.
//
// `callback` is converted to CodePointer[[SizeT], []] before being shared
// with the workers.
//
// At least one worker is always started, and never more workers than
// calls. Raises error(...) if pthread attribute setup, thread creation,
// attribute cleanup, or joining fails. Every worker that was started is
// joined before any error is raised, because the workers hold a pointer to
// `info`, which lives in this stack frame.
overload parallelEachNumber(numCalls: SizeT, nThreads: SizeT, callback) {
    if (numCalls == 0) {
        return;
    }

    // Clamp the worker count to [1, numCalls]: zero workers would leave the
    // work undone, and workers beyond numCalls would have nothing to claim.
    var workerCount = nThreads;
    if (workerCount > numCalls) {
        workerCount = numCalls;
    }
    if (workerCount == 0) {
        workerCount = SizeT(1);
    }

    var info = ParallelForInfo(CodePointer[[SizeT], []](callback), SizeT(0), numCalls);
    var threadIds = Vector[pthread_t]();
    resize(threadIds, workerCount);

    var attr = pthread_attr_t();
    if (pthread_attr_init(&attr) != 0) {
        error("pthread_attr_init failed");
    }

    // Start workers until all are running or creation fails. Do not raise
    // yet: workers already started must be joined first.
    var started = SizeT(0);
    var createFailed = false;
    while (started < workerCount and not createFailed) {
        var res = pthread_create(&threadIds[started], &attr, parallelForCallback, RawPointer(&info));
        if (res == 0) {
            started = started + 1;
        } else {
            createFailed = true;
        }
    }

    var destroyFailed = (pthread_attr_destroy(&attr) != 0);

    // Join every started worker, even if an earlier join failed.
    // parallelForCallback always returns a null pointer, so there is no
    // thread result to release.
    var joinFailed = false;
    for (i in range(started)) {
        var threadResult = RawPointer();
        if (pthread_join(threadIds[i], &threadResult) != 0) {
            joinFailed = true;
        }
    }

    if (createFailed) {
        error("pthread_create failed");
    }
    if (destroyFailed) {
        error("pthread_attr_destroy failed");
    }
    if (joinFailed) {
        error("pthread_join failed");
    }
    assert(info.index == info.end);
}
// now, this works (modulo non-thread safe print):
// NOTE(review): println runs on several worker threads at once, so output
// from different indices may interleave or appear in any order.
parallelEachNumber(SizeT(5)) : i -> {
println(i);
}
// this doesn't
// NOTE(review): unlike the lambda above, this lambda captures the variables
// a1 and a2. parallelEachNumber converts its callback with
// CodePointer[[SizeT], []](callback), which presumably only works for
// callables without captured state — confirm. A capturing closure would have
// to reach the worker threads some other way, e.g. by passing a pointer to
// the closure object alongside a non-capturing trampoline.
// NOTE(review): also confirm that size(a1) has type SizeT, since both
// parallelEachNumber overloads declare numCalls: SizeT.
var a1 = array(1, 2, 3, 4, 5, 6);
var a2 = Array[Int, 6]();
parallelEachNumber(size(a1)) : i -> {
var x = a1[Int(i)];
a2[Int(i)] = x*x;
}
// what am i doing wrong?
New Annotation