Paste: Clay parallel-for

Author: Blei
Mode: text
Date: Sat, 24 Dec 2011 22:16:53
Plain Text |
import unix.*;

// Atomic compare-and-swap on a SizeT (variant used when SizeT is 64-bit).
// If the value referenced by `dest` equals `old`, `new` is stored there.
// Either way, the value `dest` held before the operation is returned.
// Callers detect success by comparing the returned value with `old`.
// The operation uses seq_cst ordering.
// The LLVM body receives the arguments as pointers (see the loads of %old and
// %new), so `dest` refers to the caller's storage, not a copy.
// NOTE(review): this single-ordering cmpxchg form that yields the old value is
// the older LLVM syntax; newer LLVM releases require two orderings and return
// a { value, i1 } pair. Confirm against the LLVM version Clay links with.
[| SizeT == UInt64]
inline cmpxchgSizeT(dest: SizeT, old: SizeT, new: SizeT) --> returned: SizeT __llvm__ {
    %1 = load i64* %old
    %2 = load i64* %new
    %3 = cmpxchg i64* %dest, i64 %1, i64 %2 seq_cst
    store i64 %3, i64* %returned
    ret i8* null
}

// 32-bit counterpart of the cmpxchgSizeT above, selected when SizeT is UInt32.
// It atomically replaces the value referenced by `dest` with `new` if that
// value equals `old`, and returns the value `dest` held beforehand.
// The operation uses seq_cst ordering.
[| SizeT == UInt32]
inline overload cmpxchgSizeT(dest: SizeT, old: SizeT, new: SizeT) --> returned: SizeT __llvm__ {
    %1 = load i32* %old
    %2 = load i32* %new
    %3 = cmpxchg i32* %dest, i32 %1, i32 %2 seq_cst
    store i32 %3, i32* %returned
    ret i8* null
}

// Returns the number of online processors reported by
// sysconf(_SC_NPROCESSORS_ONLN).
// sysconf returns -1 on error. That value is replaced by 1 (as is any other
// value below 1), so callers always get a usable, positive thread count
// instead of a negative number that would wrap to a huge SizeT.
// The result has the same type sysconf returns.
private getNumberOfCpus() {
    var n = sysconf(_SC_NPROCESSORS_ONLN);
    if (n < Type(n)(1)) {
        return Type(n)(1);
    }
    return n;
}

// Result value pthread_join reports for a thread that was cancelled.
// It is compared against thread results before they are freed.
// NOTE(review): RawPointer(-1) mirrors glibc's ((void *) -1). POSIX only
// promises a distinct value, so confirm this on other platforms.
// TODO this should be in unix
alias PTHREAD_CANCELED = RawPointer(-1);

// State shared by all worker threads of one parallelEachNumber call.
// A pointer to it is passed to each thread as the pthread argument.
private record ParallelForInfo (
    // Procedure invoked once per claimed index.
    callback : CodePointer[[SizeT], []],
    // Next index to hand out. Workers advance it with cmpxchgSizeT.
    index : SizeT,
    // One past the last index; workers stop once `index` reaches it.
    end : SizeT,
);

// pthread entry point run by every worker thread.
// `data` points to the ParallelForInfo prepared by parallelEachNumber.
// Each worker claims the next unclaimed index by compare-and-swapping
// info^.index from `index` to `index + 1`, and calls the callback only for
// indices it claimed. Every index below info^.end is therefore passed to
// exactly one callback call.
// Always returns RawPointer(0).
private external parallelForCallback(data: RawPointer) : RawPointer {
    var info = Pointer[ParallelForInfo](data);
    // `end` is only written when the record is constructed, before the threads
    // start, so a plain read is safe.
    var limit = info^.end;
    // Local guess of the shared counter.
    // info^.index is only accessed through cmpxchgSizeT. Plain reads of it
    // would race with other workers' atomic updates. A stale guess just makes
    // the CAS fail and return the current value, which becomes the new guess.
    var index = SizeT(0);
    while (index < limit) {
        var seen = cmpxchgSizeT(info^.index, index, index + 1);
        if (seen == index) {
            // The CAS succeeded: this worker owns `index`.
            info^.callback(index);
            index = index + 1;
        } else {
            index = seen;
        }
    }
    return RawPointer(0);
}

// Convenience form: runs `callback(i)` for each i in [0, numCalls).
// It passes the online CPU count from getNumberOfCpus() as the thread count
// to the three-argument overload.
parallelEachNumber(numCalls: SizeT, callback) {
    parallelEachNumber(numCalls, SizeT(getNumberOfCpus()), callback);
}

// Calls `callback(i)` exactly once for every i in [0, numCalls), spreading the
// calls over at most `nThreads` POSIX threads. Returns once all calls are done.
// The calls run concurrently and in no particular order.
//
// `callback` is converted to CodePointer[[SizeT], []] before any thread starts.
// NOTE(review): that conversion presumably rejects lambdas that capture local
// variables (e.g. one that reads or writes local arrays); confirm this. Such
// callers need another way to hand their state to the workers.
//
// Raises via error() if a pthread call fails. Every thread that was started is
// joined before any error is raised, because the workers hold a pointer to the
// stack-allocated `info`.
overload parallelEachNumber(numCalls: SizeT, nThreads: SizeT, callback) {
    if (numCalls == 0) {
        return;
    }

    var info = ParallelForInfo(CodePointer[[SizeT], []](callback), SizeT(0), numCalls);

    // Zero workers would leave every call undone. Extra workers beyond numCalls
    // would only start threads that find nothing to claim.
    var workerCount = nThreads;
    if (workerCount > numCalls) {
        workerCount = numCalls;
    }
    if (workerCount == 0) {
        workerCount = SizeT(1);
    }

    var threadIds = Vector[pthread_t]();
    resize(threadIds, workerCount);

    var attr = pthread_attr_t();
    var res = pthread_attr_init(&attr);
    if (res != 0) {
        error("pthread_attr_init failed");
    }

    // Stop at the first failed pthread_create instead of raising right away.
    // Threads already running reference `info` and must be joined first.
    var started = SizeT(0);
    var createFailed = false;
    while (started < workerCount) {
        res = pthread_create(&threadIds[started], &attr, parallelForCallback, RawPointer(&info));
        if (res != 0) {
            createFailed = true;
            break;
        }
        started = started + 1;
    }

    // Destroy the attribute object on every path, including the failure path.
    var destroyFailed = pthread_attr_destroy(&attr) != 0;

    // Join every started thread, even if one join fails, so no worker outlives
    // `info`.
    var joinFailed = false;
    for (i in range(started)) {
        // parallelForCallback always returns RawPointer(0), so the thread
        // result never owns memory and is not freed.
        var threadResult = RawPointer();
        if (pthread_join(threadIds[i], &threadResult) != 0) {
            joinFailed = true;
        }
    }

    if (createFailed) {
        error("pthread_create failed");
    }
    if (destroyFailed) {
        error("pthread_attr_destroy failed");
    }
    if (joinFailed) {
        error("pthread_join failed");
    }

    assert(info.index == info.end);
}


// Example that works, except that println is not thread-safe, so output from
// different threads may interleave:
    parallelEachNumber(SizeT(5)) : i -> {
        println(i);
    }

// Example that does not work. Unlike the lambda above, this one refers to the
// local variables a1 and a2.
// NOTE(review): parallelEachNumber converts its callback with
// CodePointer[[SizeT], []](callback). Presumably a lambda that captures locals
// cannot be converted that way, which would explain the failure. Confirm this
// against the actual compiler error.
    var a1 = array(1, 2, 3, 4, 5, 6);
    var a2 = Array[Int, 6]();
    parallelEachNumber(size(a1)) : i -> {
        var x = a1[Int(i)];
        a2[Int(i)] = x*x;
    }
// What am I doing wrong?

New Annotation

Summary:
Author:
Mode:
Body: