import unix.*;

// Atomic compare-and-swap on a SizeT: if `dest` currently equals `old`,
// `new` is stored into it. The value `dest` held before the operation is
// returned, so the swap took effect exactly when the result equals `old`.
// The operation is sequentially consistent (seq_cst).
// This overload is chosen when SizeT is 64 bits wide.
// NOTE(review): this is the older `cmpxchg` form that yields a bare value;
// newer LLVM versions return a {value, success} pair - confirm against the
// LLVM version this compiler targets.
[| SizeT == UInt64]
inline cmpxchgSizeT(dest: SizeT, old: SizeT, new: SizeT) --> returned: SizeT __llvm__ {
    %1 = load i64* %old
    %2 = load i64* %new
    %3 = cmpxchg i64* %dest, i64 %1, i64 %2 seq_cst
    store i64 %3, i64* %returned
    ret i8* null
}

// Same as above, for targets where SizeT is 32 bits wide.
[| SizeT == UInt32]
inline overload cmpxchgSizeT(dest: SizeT, old: SizeT, new: SizeT) --> returned: SizeT __llvm__ {
    %1 = load i32* %old
    %2 = load i32* %new
    %3 = cmpxchg i32* %dest, i32 %1, i32 %2 seq_cst
    store i32 %3, i32* %returned
    ret i8* null
}

// Number of processors currently online, as reported by
// sysconf(_SC_NPROCESSORS_ONLN). Per POSIX, sysconf returns -1 on failure.
private getNumberOfCpus() {
    return sysconf(_SC_NPROCESSORS_ONLN);
}

// Value pthread_join stores as the result of a thread that was cancelled.
// TODO this should be in unix
alias PTHREAD_CANCELED = RawPointer(-1);

// State shared by all worker threads of a single parallelEachNumber call.
private record ParallelForInfo (
    callback : CodePointer[[SizeT], []], // run once for each claimed index
    index : SizeT,                       // next unclaimed index; shared, updated only via cmpxchgSizeT
    end : SizeT,                         // one past the last index to process
);

// pthread entry point; `data` must point at a ParallelForInfo.
// Each worker repeatedly claims the next unclaimed index with a
// compare-and-swap and runs the callback on it, until every index in
// [0, end) has been claimed by some worker. Always returns a null pointer.
private external parallelForCallback(data: RawPointer) : RawPointer {
    var info = Pointer[ParallelForInfo](data);
    // Other workers update `index` concurrently, so never read it with a
    // plain load: a compare-and-swap of 0 -> 0 leaves the value unchanged
    // and returns it atomically.
    var index = cmpxchgSizeT(info^.index, SizeT(0), SizeT(0));
    while (index < info^.end) {
        var seen = cmpxchgSizeT(info^.index, index, index + 1);
        if (seen == index) {
            // This worker claimed `index`; no other worker will run it.
            info^.callback(index);
            index = index + 1;
        } else {
            // Another worker advanced the counter first; `seen` is its
            // current value, so retry from there.
            index = seen;
        }
    }
    return RawPointer(0);
}

// Calls `callback(i)` exactly once for every i in [0, numCalls), using one
// worker thread per online CPU, and returns once all calls have finished.
// The callback runs concurrently on several threads in no particular order.
parallelEachNumber(numCalls: SizeT, callback) {
    var numCpus = getNumberOfCpus();
    // sysconf reports -1 on failure; use a single worker rather than
    // converting -1 into an enormous SizeT thread count.
    var nThreads = SizeT(1);
    if (numCpus > 0) {
        nThreads = SizeT(numCpus);
    }
    parallelEachNumber(numCalls, nThreads, callback);
}

// Calls `callback(i)` exactly once for every i in [0, numCalls) using up to
// `nThreads` pthreads, and returns once all of them have been joined.
// A request for 0 threads is treated as 1. No more than numCalls threads are
// started, since any extra thread would have nothing to do.
// Raises error(...) if thread setup or teardown fails. The error is raised
// only after every started thread has been joined, because the workers hold
// a pointer to `info` in this stack frame.
// NOTE(review): `callback` is converted to CodePointer[[SizeT], []]. That
// presumably only works for callables without captured state - confirm
// against Clay's CodePointer conversion rules.
overload parallelEachNumber(numCalls: SizeT, nThreads: SizeT, callback) {
    if (numCalls == 0) {
        return;
    }

    var threadCount = nThreads;
    if (threadCount == 0) {
        threadCount = SizeT(1);
    }
    if (threadCount > numCalls) {
        threadCount = numCalls;
    }

    var info = ParallelForInfo(CodePointer[[SizeT], []](callback), SizeT(0), numCalls);
    var threadIds = Vector[pthread_t]();
    resize(threadIds, threadCount);

    var attr = pthread_attr_t();
    var res = pthread_attr_init(&attr);
    if (res != 0) {
        error("pthread_attr_init failed");
    }

    // Start the workers, stopping at the first failure but remembering how
    // many actually started - each of those must be joined before `info`
    // goes out of scope.
    var started = SizeT(0);
    var createFailed = false;
    while (started < threadCount and not createFailed) {
        res = pthread_create(&threadIds[started], &attr, parallelForCallback, RawPointer(&info));
        if (res == 0) {
            started = started + 1;
        } else {
            createFailed = true;
        }
    }

    var destroyFailed = pthread_attr_destroy(&attr) != 0;

    // Join every started worker, even if something above failed.
    var joinFailed = false;
    for (k in range(started)) {
        var threadResult = RawPointer();
        if (pthread_join(threadIds[k], &threadResult) != 0) {
            joinFailed = true;
        } else if (not null?(threadResult) and threadResult != PTHREAD_CANCELED) {
            free(threadResult);
        }
    }

    if (createFailed) {
        error("pthread_create failed");
    }
    if (destroyFailed) {
        error("pthread_attr_destroy failed");
    }
    if (joinFailed) {
        error("pthread_join failed");
    }

    assert(info.index == info.end);
}

// Example: this works (apart from println not being thread-safe, so output
// lines may interleave):
parallelEachNumber(SizeT(5)) : i -> { println(i); }

// Example: this does not work:
var a1 = array(1, 2, 3, 4, 5, 6);
var a2 = Array[Int, 6]();
parallelEachNumber(size(a1)) : i -> { var x = a1[Int(i)]; a2[Int(i)] = x*x; }
// What am I doing wrong?
// NOTE(review): unlike the first example, this lambda refers to a1 and a2.
// If this snippet lives inside a function, those are captured variables.
// parallelEachNumber converts its callback to CodePointer[[SizeT], []], which
// presumably cannot carry captured state - likely the cause; confirm against
// Clay's CodePointer/lambda documentation.