I'm running a Python simulations that calls compiled C code via a DLL. Specifically, Python passes and receives data using ctypes and a custom class. The C code run a loop of 100,000 iterations of a stochastic process that has 15000 iterations to converge to a value. The python code run that C code in a loop of 9 iterations. And all this is within a multiprocessing.Pool(processes=12) that has 24 iterations to run.
On my Dell Precision laptop i9, and MacBook Pro M4, everything runs fine when this code is run "solo". But sometimes (to save some time), I may open multiple parallel consoles and run this script simultaneously (on different models), I sometimes get corrupted return values.
Out of the 9x100,000 iterations (returning 7 variables each), random number of values were incorrect (approx 2,900 last time i checked) — e.g., values that should be a + b
returned just b
, as if a
became 0 unexpectedly which IS impossible in the C code. This seems like a memory overwrite or pointer issue.
Since I only noticed this problem now, when specifically looking at one value, there may be other incorrect values that I haven't seen. I would like to know where the problem is coming from to avoid wrong values and to be able to trust the simulation process.
Python code using the shared library:
# -*- coding: utf-8 -*-
import ctypes
import re
import os
import multiprocessing
import numpy as np
import pandas as pd
from scipy.stats import norm
PATH_ = re.match(r"^(.*?fits)", os.path.realpath(__file__)).group(1)
os.chdir(PATH_ + '/GCD_rTru')
myso = ctypes.cdll.LoadLibrary(f'../models/GCDM.dll')
algo = myso.GCDM
class params:
def __init__(self, x0, v, r, g, Te, Tr, xi, leak, u=0, n_sim_trials=100000):
self.x0 = x0
self.v = v
self.r = r
self.g = g
self.Te = Te
self.Tr = Tr
self.xi = xi
self.leak = leak
self.u = u
self.drift_guess = 0
self.sx0 = 0
self.sv = 0
self.sTe = 0
self.sTr = 0
self.sLeak = 0
self.s = .1
self.dt = .05
self.n = n_sim_trials
self.resp = np.zeros(self.n)
self.RT = np.zeros(self.n)
self.PMT = np.zeros(self.n)
self.MT = np.zeros(self.n)
self.firstHit = np.zeros(self.n)
self.firstHitLoc = np.zeros(self.n)
self.coactiv = np.zeros(self.n)
self.maxiter = 15000
self.randomTable = norm.ppf(np.arange(.0001, 1, .0001))
self.rangeLow = 0
self.rangeHigh = len(self.randomTable)
def run_sx(s):
cls = []
par = params_df.loc[s]
for f in [200, 1000, 2000]:
for mc in [.02, .10, .40]:
sim = params( x0=0, v=par['k']*mc, r=par[f'r_{f}'], g=par['g'], Te=par['Te'], Tr=par[f'Tr_{f}'], xi=par['xi'], leak=par[f'leak'], u=par[f'u_{f}'] )
algo(
ctypes.c_double(sim.x0), ctypes.c_double(sim.v), ctypes.c_double(sim.r), ctypes.c_double(sim.g), ctypes.c_double(sim.Te), ctypes.c_double(sim.Tr),
ctypes.c_double(sim.xi), ctypes.c_double(sim.leak), ctypes.c_double(sim.u), ctypes.c_double(sim.drift_guess),
ctypes.c_double(sim.sx0), ctypes.c_double(sim.sv), ctypes.c_double(sim.sTe), ctypes.c_double(sim.sTr), ctypes.c_double(sim.sLeak),
ctypes.c_double(sim.s), ctypes.c_double(sim.dt),
ctypes.c_void_p(sim.resp.ctypes.data), ctypes.c_void_p(sim.RT.ctypes.data),
ctypes.c_void_p(sim.PMT.ctypes.data), ctypes.c_void_p(sim.MT.ctypes.data),
ctypes.c_void_p(sim.firstHit.ctypes.data), ctypes.c_void_p(sim.firstHitLoc.ctypes.data),
ctypes.c_int(sim.n), ctypes.c_int(sim.maxiter),
ctypes.c_int(sim.rangeLow), ctypes.c_int(sim.rangeHigh), ctypes.c_void_p(sim.randomTable.ctypes.data)
)
temp = pd.DataFrame(columns=['s', 'condition_sat', 'condition_force', 'condition_mc', 'accuracy', 'RT', 'PMT', 'MT', 'latency_first_pB', 'location_first_pB'])
temp['response'] = sim.resp
temp['accuracy'] = [1 if r==1 else 0 for r in sim.resp]
temp['RT'] = sim.RT
temp['PMT'] = sim.PMT
temp['MT'] = sim.MT
temp['latency_first_pB'] = sim.firstHit
temp['location_first_pB'] = [1 if loc==1 else 0 for loc in sim.firstHitLoc]
temp['condition_force'] = np.repeat(f, 100000)
temp['condition_mc'] = np.repeat(mc, 100000)
temp['s'] = np.repeat(s, 100000)
cls.append(temp)
classifier = pd.concat(cls, ignore_index=True)
classifier.to_csv(f'processing/data/s{s}.csv')
if __name__=='__main__':
params_df = pd.read_csv('params.csv', index_col=0)
n_subjects = len(params_df.index)
with multiprocessing.Pool(processes=12) as pool:
res = pool.map(func=run_sx, iterable=params_df.index)
pool.close()
pool.join()
and C code:
#include <stdlib.h>
#include <stdio.h>
#include <sys/time.h>
#include <math.h>
void GCDM(double x0, double v, double r, double g, double Te, double Tr, double xi, double leak, double u, double drift_guess,
double sx0, double sv, double sTe, double sTr, double sleak, double s, double dt,
double *resp, double *RT, double *PMT, double *MT, double *firstHit, double *firstHitLoc,
int n, int maxiter, int rangeLow, int rangeHigh, double *randomTable)
{
double dv, x, y, urg, zU, zL, randNum;
int i, iter, randIndex, outOfGate, hit;
struct timeval t1;
gettimeofday(&t1, NULL);
srand(t1.tv_usec * t1.tv_sec);
for (i=0; i<n; i++) {
dv = x0;
x = dv;
y = dv;
resp[i] = -1.0;
RT[i] = -1.0;
PMT[i] = -1.0;
MT[i] = -1.0;
firstHit[i] = -1.0;
firstHitLoc[i] = -1.0;
iter = 0;
outOfGate = 0;
hit = 0;
do {
iter = iter+1;
randNum = rand()/(1.0 + RAND_MAX);
randIndex = (randNum * (rangeHigh - rangeLow + 1)) + rangeLow;
randNum = randomTable[randIndex];
dv = dv + (v*dt) + (sqrt(dt)*s*randNum); // decision variable
randNum = rand()/(1.0 + RAND_MAX);
randIndex = (randNum * (rangeHigh - rangeLow + 1)) + rangeLow;
randNum = randomTable[randIndex];
x = dv + (xi/sqrt(dt))*randNum; // if (xi != 0) then noisy transmission else pure transmission
y = y + leak*(x - y)*dt + drift_guess*dt; // motor preparation variable, constant gain
urg = u*iter*dt; // if (u != 0) then urgency-force signal helping else nothing
zU = y - g + urg; // neural drives
zL = -y - g + urg;
if (((zU > 0) || (zL > 0)) && (outOfGate == 0)) {
if (hit == 0) {
hit = 1; // this is the first time the gate is overcome
firstHit[i] = ((iter*dt) - (dt/2.0)) + Te; // latency of this first gate overcoming
if (zU > 0) { firstHitLoc[i] = 1.0; }
if (zL > 0) { firstHitLoc[i] = 2.0; } // location
}
outOfGate=1; // the decision variable is now located in the region between an EMG bound and the corresponding response bound
PMT[i] = ((iter*dt) - (dt/2.0)) + Te; // in case several EMG bound hits occur, PMT will store the latency of the last hit before the response
}
if ((outOfGate == 1) && ((zU <= 0) && (zL <= 0))) { outOfGate = 0; }
if (zU >= r) {
resp[i] = 1.0;
RT[i] = (iter*dt) - (dt/2.0) + Te + Tr;
MT[i] = RT[i] - PMT[i];
break;
}
if (zL >= r) {
resp[i] = 2.0;
RT[i] = (iter*dt) - (dt/2.0) + Te + Tr;
MT[i] = RT[i] - PMT[i];
break;
}
} while (iter<maxiter);
}
}