最新消息:雨落星辰是一个专注网站SEO优化、网站SEO诊断、搜索引擎研究、网络营销推广、网站策划运营及站长类的自媒体原创博客

Strange behavior when running multiple parallel Python processes calling C DLL - Stack Overflow

programmeradmin3浏览0评论

I'm running a Python simulations that calls compiled C code via a DLL. Specifically, Python passes and receives data using ctypes and a custom class. The C code run a loop of 100,000 iterations of a stochastic process that has 15000 iterations to converge to a value. The python code run that C code in a loop of 9 iterations. And all this is within a multiprocessing.Pool(processes=12) that has 24 iterations to run.

On my Dell Precision laptop i9, and MacBook Pro M4, everything runs fine when this code is run "solo". But sometimes (to save some time), I may open multiple parallel consoles and run this script simultaneously (on different models), I sometimes get corrupted return values.

Out of the 9x100,000 iterations (returning 7 variables each), random number of values were incorrect (approx 2,900 last time i checked) — e.g., values that should be a + b returned just b, as if a became 0 unexpectedly which IS impossible in the C code. This seems like a memory overwrite or pointer issue.

Since I only noticed this problem now, when specifically looking at one value, there may be other incorrect values that I haven't seen. I would like to know where the problem is coming from to avoid wrong values and to be able to trust the simulation process.

Python code using the shared library:

# -*- coding: utf-8 -*-
import ctypes
import re
import os
import multiprocessing
import numpy as np
import pandas as pd
from scipy.stats import norm

PATH_ = re.match(r"^(.*?fits)", os.path.realpath(__file__)).group(1)
os.chdir(PATH_ + '/GCD_rTru')

myso = ctypes.cdll.LoadLibrary(f'../models/GCDM.dll')
algo = myso.GCDM

class params:
    def __init__(self, x0, v, r, g, Te, Tr, xi, leak, u=0, n_sim_trials=100000):
        self.x0 = x0
        self.v = v
        self.r = r
        self.g = g
        self.Te = Te
        self.Tr = Tr
        self.xi = xi
        self.leak = leak
        self.u = u
        self.drift_guess = 0
        self.sx0 = 0
        self.sv = 0
        self.sTe = 0
        self.sTr = 0
        self.sLeak = 0
        self.s = .1
        self.dt = .05
        self.n = n_sim_trials

        self.resp = np.zeros(self.n)
        self.RT = np.zeros(self.n)
        self.PMT = np.zeros(self.n)
        self.MT = np.zeros(self.n)
        self.firstHit = np.zeros(self.n)
        self.firstHitLoc = np.zeros(self.n)
        self.coactiv = np.zeros(self.n)

        self.maxiter = 15000
        self.randomTable = norm.ppf(np.arange(.0001, 1, .0001))
        self.rangeLow = 0
        self.rangeHigh = len(self.randomTable)

def run_sx(s):
    cls = []
    par = params_df.loc[s]
    for f in [200, 1000, 2000]:
        for mc in [.02, .10, .40]:
            sim = params( x0=0, v=par['k']*mc, r=par[f'r_{f}'], g=par['g'], Te=par['Te'], Tr=par[f'Tr_{f}'], xi=par['xi'], leak=par[f'leak'], u=par[f'u_{f}'] )

            algo(
                ctypes.c_double(sim.x0), ctypes.c_double(sim.v), ctypes.c_double(sim.r), ctypes.c_double(sim.g), ctypes.c_double(sim.Te), ctypes.c_double(sim.Tr),
                ctypes.c_double(sim.xi), ctypes.c_double(sim.leak), ctypes.c_double(sim.u), ctypes.c_double(sim.drift_guess),
                ctypes.c_double(sim.sx0), ctypes.c_double(sim.sv), ctypes.c_double(sim.sTe), ctypes.c_double(sim.sTr), ctypes.c_double(sim.sLeak),
                ctypes.c_double(sim.s), ctypes.c_double(sim.dt),
                ctypes.c_void_p(sim.resp.ctypes.data), ctypes.c_void_p(sim.RT.ctypes.data),
                ctypes.c_void_p(sim.PMT.ctypes.data), ctypes.c_void_p(sim.MT.ctypes.data),
                ctypes.c_void_p(sim.firstHit.ctypes.data), ctypes.c_void_p(sim.firstHitLoc.ctypes.data),
                ctypes.c_int(sim.n), ctypes.c_int(sim.maxiter),
                ctypes.c_int(sim.rangeLow), ctypes.c_int(sim.rangeHigh), ctypes.c_void_p(sim.randomTable.ctypes.data)
            )

            temp = pd.DataFrame(columns=['s', 'condition_sat', 'condition_force', 'condition_mc', 'accuracy', 'RT', 'PMT', 'MT', 'latency_first_pB', 'location_first_pB'])
            temp['response'] = sim.resp
            temp['accuracy'] = [1 if r==1 else 0 for r in sim.resp]
            temp['RT'] = sim.RT
            temp['PMT'] = sim.PMT
            temp['MT'] = sim.MT
            temp['latency_first_pB'] = sim.firstHit
            temp['location_first_pB'] = [1 if loc==1 else 0 for loc in sim.firstHitLoc]
            temp['condition_force'] = np.repeat(f, 100000)
            temp['condition_mc'] = np.repeat(mc, 100000)
            temp['s'] = np.repeat(s, 100000)
            cls.append(temp)

    classifier = pd.concat(cls, ignore_index=True)
    classifier.to_csv(f'processing/data/s{s}.csv')

if __name__=='__main__':
    params_df = pd.read_csv('params.csv', index_col=0)
    n_subjects = len(params_df.index)
    with multiprocessing.Pool(processes=12) as pool:
        res = pool.map(func=run_sx, iterable=params_df.index)
        pool.close()
        pool.join()

and C code:

#include <stdlib.h>
#include <stdio.h>
#include <sys/time.h>
#include <math.h>

void GCDM(double x0, double v, double r, double g, double Te, double Tr, double xi, double leak, double u, double drift_guess,
          double sx0, double sv, double sTe, double sTr, double sleak, double s, double dt, 
          double *resp, double *RT, double *PMT, double *MT, double *firstHit, double *firstHitLoc,          
          int n, int maxiter, int rangeLow, int rangeHigh, double *randomTable)
{
    double dv, x, y, urg, zU, zL, randNum;
    int i, iter, randIndex, outOfGate, hit;
    struct timeval t1;
    gettimeofday(&t1, NULL);
    srand(t1.tv_usec * t1.tv_sec);

    for (i=0; i<n; i++) {
        dv = x0;
        x = dv;
        y = dv;
        
        resp[i] = -1.0;
        RT[i] = -1.0;
        PMT[i] = -1.0;
        MT[i] = -1.0;
        firstHit[i] = -1.0;
        firstHitLoc[i] = -1.0;

        iter = 0;
        outOfGate = 0;
        hit = 0;
        do {
            iter = iter+1;
            
            randNum = rand()/(1.0 + RAND_MAX);
            randIndex = (randNum * (rangeHigh - rangeLow + 1)) + rangeLow;
            randNum = randomTable[randIndex];
            dv = dv + (v*dt) + (sqrt(dt)*s*randNum); // decision variable
            
            randNum = rand()/(1.0 + RAND_MAX);
            randIndex = (randNum * (rangeHigh - rangeLow + 1)) + rangeLow;
            randNum = randomTable[randIndex];
            x = dv + (xi/sqrt(dt))*randNum; // if (xi != 0) then noisy transmission else pure transmission
            
            y = y + leak*(x - y)*dt + drift_guess*dt; // motor preparation variable, constant gain
            urg = u*iter*dt; // if (u != 0) then urgency-force signal helping else nothing
            zU = y - g + urg; // neural drives
            zL = -y - g + urg;
            
            if (((zU > 0) || (zL > 0)) && (outOfGate == 0)) {
                if (hit == 0) {
                    hit = 1; // this is the first time the gate is overcome
                    firstHit[i] = ((iter*dt) - (dt/2.0)) + Te; // latency of this first gate overcoming
                    if (zU > 0) { firstHitLoc[i] = 1.0; }
                    if (zL > 0) { firstHitLoc[i] = 2.0; } // location
                }
                outOfGate=1; // the decision variable is now located in the region between an EMG bound and the corresponding response bound
                PMT[i] = ((iter*dt) - (dt/2.0)) + Te; // in case several EMG bound hits occur, PMT will store the latency of the last hit before the response 
            }
            
            if ((outOfGate == 1) && ((zU <= 0) && (zL <= 0))) { outOfGate = 0; }
            
            if (zU >= r) {
                resp[i] = 1.0;
                RT[i] = (iter*dt) - (dt/2.0) + Te + Tr;
                MT[i] = RT[i] - PMT[i];
                break;
            }

            if (zL >= r) {
                resp[i] = 2.0; 
                RT[i] = (iter*dt) - (dt/2.0) + Te + Tr;
                MT[i] = RT[i] - PMT[i];
                break;
            }
        } while (iter<maxiter);
    }
}
发布评论

评论列表(0)

  1. 暂无评论