最新消息:雨落星辰是一个专注网站SEO优化、网站SEO诊断、搜索引擎研究、网络营销推广、网站策划运营及站长类的自媒体原创博客

memory management - Leak errors in fifo application in c - Stack Overflow

programmeradmin2浏览0评论

I'm not sure why my valgrind is spitting heap errors and I've been tracing my code left and right. I have some code, but I'm not sure if more code is needed.

My jobs.h

#include "piper.h"
#include "broadcast.h"
#include <pthread.h>    // for threads

/*
this depends on everything above so make sure you test with
piperTest first!
*/
/*
the starting and entry point for piper communicator
with its own initialization and free
*/

struct PiperThreadData{
    int running;
    pthread_mutex_t* flagMutex;
    Environment* e;
    pthread_t* piperJob;
};

struct PiperThreadData* startPiperThread();
int isRunning(struct PiperThreadData* tData);
void changeRunningState(int newState, struct PiperThreadData* tData);
void endPiperThread(struct PiperThreadData* tData);
/*
This is the main thread of the piper
*/
void *piperCommunicatorJob(void* tData);

My jobs.c

#include "jobs.h"

#include <stdlib.h>

struct PiperThreadData* startPiperThread(){
    struct PiperThreadData* tData = malloc(sizeof(struct PiperThreadData));
    tData->running = 1;
    //printf("creation pointer: %p\n", tData);

    tData->e = malloc(sizeof(Environment));
    int ret = makeEnv(tData->e);
    if(ret == -1){
        free(tData->e);
        free(tData);
        printErrorNicely("Creation of environment failed",
                            "startPiperThread() in piper.c",
                            1);
    }

    tData->flagMutex = malloc(sizeof(pthread_mutex_t));
    if (!tData->flagMutex) {
        free(tData->e);
        free(tData);
        printErrorNicely("Creation of flag mutex failed",
                            "startPiperThread() in piper.c",
                            1);
    }
    pthread_mutex_init(tData->flagMutex, NULL);

    tData->running = 1;

    tData->piperJob = malloc(sizeof(pthread_t));

    int rc = pthread_create(tData->piperJob, NULL, piperCommunicatorJob, tData);
    if (rc) {
        printf("ERROR; return code from pthread_create() is %d\n", rc);
        free(tData->piperJob);
        
        pthread_mutex_destroy(tData->flagMutex);
        free(tData->flagMutex);

        cleanEnv(tData->e);
        free(tData->e);

        free(tData);
        exit(-1);
    }

    return tData;
}

int isRunning(struct PiperThreadData* tData){
    pthread_mutex_lock(tData->flagMutex);
    int retVal = tData->running;
    pthread_mutex_unlock(tData->flagMutex);
    return retVal;
}

void changeRunningState(int newState, struct PiperThreadData* tData){
    pthread_mutex_lock(tData->flagMutex);
    tData->running = newState;
    pthread_mutex_unlock(tData->flagMutex);
}

void endPiperThread(struct PiperThreadData* tData){
    //printf("close pointer: %p\n", tData);
    pthread_mutex_lock(tData->flagMutex);
    tData->running = 0;
    pthread_mutex_unlock(tData->flagMutex);
    
    void* status; 
    printf("Trying to close thread...\n");
    pthread_join(*(tData->piperJob), &status);

    pthread_exit(tData->piperJob);

    pthread_mutex_destroy(tData->flagMutex);
    free(tData->flagMutex);

    cleanEnv(tData->e);
    free(tData->e);

    printf("Last free\n");
    free(tData);
}

void *piperCommunicatorJob(void* arg){
    struct PiperThreadData* tData = (struct PiperThreadData*)arg;
    //printf("thread pointer: %p\n", tData);
    printf("Environment status: %p\n", tData->e->status);

    int res;
    while(isRunning(tData)){
        printf("PIPER: checking status\n");
        res = checkStatus(tData->e);
        if(tData->e->status->clientOnTheOtherSide){
            res = subscribe(tData->e);
            if(res == NOCLIENT){
                printf("PIPER ERROR: get an error on subscribing\n");
                printf(">> in piperCommunicatorJob() thread\n");
            }
        }

        if(isClientInputPending(tData->e)){
            printf("PIPER: reading input...\n");
            unsigned int cRead = 0;
            res = readFIFO(&cRead, tData->e);
            if(res == 0){
                printf("PIPER ERROR: somehow poll timed out on the other side\n");
                printf(">> throw away that input\n");
                continue;
            }
            if(res == -1){
                printf("PIPER ERROR: too much stuff in our buffer\n");
                printf(">> throw away that input\n");
                continue;
            }
        }

        if(isServerOutputPending(tData->e)){
            printf("PIPER: writing...\n");
            res = writeToFIFO(PACKETSIZE, tData->e);
            if(res == 0){
                printf("PIPER ERROR: somehow we got a server pending, but nothing to output\n");
            }
        }
    }
    return NULL;
}

here is piper.h which contains the definition for Environment and Status

//Only once header
#ifndef PIPER
#define PIPER

#include "ring.h"
#include "merror.h"

#include <stdio.h>      // For printf(), fgets()
#include <string.h>     // For strlen(), strerror()
#include <fcntl.h>      // For open() and O_RDONLY, O_WRONLY
#include <sys/stat.h>   // For mkfifo() and file permission macros
#include <sys/types.h>  // For mkfifo() and data types like mode_t
#include <unistd.h>     // For read(), write(), close(), access()

#include <poll.h>       // For poll(), pollfd
#include <time.h>

#include <errno.h>    // For errno

#define TIMEOUT 10000
#define BUFFERSIZE 1024
#define PACKETSIZE 100

#define ENVINITIALIZED -9
#define ENVINITIALIZED_STR "Environment was already initialized"
#define ENVNOTINITIALIZED -10
#define ENVNOTINITIALIZED_STR "Environment was not initialized"
#define CLIENTCLOSED -11
#define CLIENTCLOSE_STR "Client closed the connection"
#define NOCLIENT -12
#define NOTCLIENT_STR "There is nobody on the listening side"

#define numInput 1
#define inputFIFO "/tmp/publisherInput.fifo"
#define numOutput 1
#define outputFIFO "/tmp/publisherOutput.fifo"

extern int killSwitch;

/*
checks if path exists and then deletes it if it does

because of race conditions, it's hard to make a function that just checks 
a path (it might be misunderstood and TOCTOU vulnerabilities might happen)

*/
int deletePathIfExists(const char* path);

/*
a data structure that represents the status of Environment
You can look at the variables to see what kind of statuses
are maintained
*/
struct Status{
    //Should be a binary where a client is ready to be read
    // but the client has not been "subscribed" yet
    int clientOnTheOtherSide;

    //Should be a binary indicating whether or not
    //a client has connected. Any instance where multiple 
    //connected should be an error and not permitted
    int clientConnected;

    //should be true once a client is verified. 
    //an unverified client should be kicked immediately
    int clientVerified;
};

/*
    A data structure that is just a collection of the state
of the fifos (whether they exist or not)
*/
struct Environment{
    int created;
    //int numConnected
    struct Status* status;
    struct pollfd inputfds[numInput], outputfds[numOutput];
    //this gotta be a pointer 
    // becuase it doesn't know the actual size
    struct RingBuffer *inputRB, *outputRB;

    //whether or not the client side put input
    int clientInputPending;
    //whether or not the server is ready to output something
    int serverOutputPending;


    //gotta make it thread safe?
    pthread_mutex_t* mutex;
};
typedef struct Environment Environment;

/*
checks the existence of input/output fifos. if it exists then it
makes the two input/output FIFOS:
    - one called "input"
    - one called "output"
then it returns the filedescriptors of both

"You cannot open a FIFO for writing without blocking unless a reader 
already has the FIFO open. The simplest fix is just to wait a second 
and repeat the open if it returns ENXIO. Repeat until the other side 
is running."

this means the filedescriptor of write fifo cannot be opened
- it does mean it can be made
- it also does mean that it can only be read once a connection
has been established to this system
(creating further really annoying complications >w<)
    -=> checkout subscribe() function

returns 1 on success
returns ENVINITIALIZED if environment was already initialized
returns -1 on any failure
*/
int makeEnv(Environment* env);

/*
this changes the status of env in order 
to allow write calls to FIFO

should be called when someone connects to the input FIFO
- BUT the only way to check if someone connected to THAT is 
by poll() and THAT is only called in 
    - readFIFO
    - checkStatus
---> SOOO
The solution is to make entrust this function call to
the user. If the user calls check status and finds that
a client connected. Then it calls subscribe (or disconnected,
unsubscribe)
But that implementation is left to the user, it's not
listed here

the read fifo
returns 1 on a successful connect
returns ENVNOTINITIALIZED if env is not initialized
returns NOCLIENT if there is nobody to connect
returns -1 on a "nobody is listening"
*/
int subscribe(Environment* env);

/*
decreases the numberConnected

returns 1 on success
returns -1 if nobody else there (but doesnt crash)
*/
int unsubscribe(Environment* env);

/*
updates the status field in the environment
- status is updated via poll function

Note that this does NOT verify the connection
and change the status of verifiedConnection

returns 1 if there is someone on the other side
    (i.e. env is initialized, someone is on the other side) 
returns -1 if any fail
*/
int checkStatus(Environment* env);

/*
read BUFFERSIZE (100) characters off the contents of input FIFO
and changes the buffer to reflect the values

note that buffer becomes overwritten, 
even on certain errors

also note,
the his function hangs until it receives something from the fifo

also also note,
the client side may actually crash on their end while 
fifo is reading
there is a special errno that happens that must be accounted for

returns 1 on success 
return ENVNOTINITIALIZED when call read when env was not initialized
returns 0 on a poll timeout
returns -1 if our wanted read size exceeds the remaining 
size in ring buffer
*/
int readFIFO(unsigned int* charsRead, Environment* env);

/*
writes numChars characters onto the FIFO. Then it fills the
remainder of the buffer with space characters

returns 1 on success
return ENVNOTINITIALIZED when call read when env was not initialized
returns 0 if outputRB was empty
returns -1 if the outputRB pops started to exceed its own size (non-0)
    (which implies the outputRBs started to stop being multiples of 100)
*/
int writeToFIFO(unsigned int numChars, Environment* env);

/*
essentially extracts a string from the inputRingBuffer

note that there is not null-byte received. Every
packet should be exactly 100 bytes

This is where this fits in the picture:
=> INPUT FIFO>RINGBUFFER => *[READ RINGBUFFER]* => 
    PROCESSES THE QUERY
=> WRITE TO RINGBUFFER => RINGBUFFER>OUTPUT FIFO

returns 1;
returns ENVNOTINITIALIZED if env is not initialized
returns 0 if inputRingBuffer is empty
returns -1 in a weird case where inputRingBuffer pop
    exceeded is size (even though we checked in code)
*/
int getQuery(char* buffer, Environment* env);

/*
essentially pushes a string into the outputRingBuffer 

This is where this fits in the picture:
=> INPUT FIFO>RINGBUFFER => READ RINGBUFFER => 
    PROCESSES THE QUERY
=> *[WRITE TO RINGBUFFER]* => RINGBUFFER>OUTPUT FIFO

returns 1
returns ENVNOTINITIIALIZED if env is not initialized
returns 0 if we want to send a response
    but our outputRingBuffer is too small
*/
int sendResponse(char* response, Environment* env);

int isClientInputPending(Environment* env);
int isServerOutputPending(Environment* env);

/*
it looks at fd to see if it's an open fd
    if it is, then it calls close
it looks at the directory to see if inputfifo, outputfifo exists
    if it exists it is deleted

**Don't fet to free the RingBuffer 
*/
int cleanEnv(Environment* env);

/*
prints the status
*/
void printStatus(struct Status* s);

/*
TODO...
This one requires a bit of research before implementation
because we don't know good ways to verify connection that
provide security

however, verification will NOT be required for the other 
functions to work. This will have to be a check by the user
*/
int verifyClient(Environment* env); // this one requires research


#endif 

And also the makeEnv() and cleanEnv() should be handling the Environment initialization

int makeEnv(Environment* env){
    /*
    //some defensive programming
    if(env->created == 1){
        printErrorNicely("Environment seems to be created already. Did you call this twice?", "makeEnv()", 0);
        return ENVINITIALIZED;
    }
    */
    env->created = 1;
    env->clientInputPending = 0;
    env->serverOutputPending = 0;

    //printf("Creating FIFOS\n");

    //giving these fifos read and write permissions to everybody 
    // (not execute permissions of course)
    int res = mkfifo(inputFIFO, 0666);
    if(res == -1){
        printErrorNicely("Trying to create input fifo", "makeEnv()", 0);
    }

    res = mkfifo(outputFIFO, 0666);
    if(res == -1){
        printErrorNicely("Trying to create output fifo", "makeEnv()", 0);
    }

    //printf("FIFOS made\n");

    env->created = 1;

    env->status = malloc(sizeof(struct Status));
    env->status->clientOnTheOtherSide = 0;
    env->status->clientConnected = 0;
    env->status->clientVerified = 0;

    int inputfdInt = open(inputFIFO, O_RDONLY | O_NONBLOCK);
    if(inputfdInt == -1){
        printErrorNicely("Trying to get file descriptor of input fifo with open()", "makeEnv()", 0);
        return -1;
    }

    /*
    "You cannot open a FIFO for writing without blocking unless a reader 
    already has the FIFO open. The simplest fix is just to wait a second 
    and repeat the open if it returns ENXIO. Repeat until the other side 
    is running."
    */
    /*
    int outputfdInt = open(outputFIFO, O_WRONLY | O_NONBLOCK);
    if(outputfdInt == -1){
        printErrorNicely("Trying to get file descriptor of output fifo with open()", "makeEnv()", 0);
        return -1;
    }
    */

    //printf("Environment set. Setting vars\n");

    struct pollfd inputfd;
    inputfd.fd = inputfdInt;
    inputfd.events = POLLIN;
    env->inputfds[0] = inputfd;

    struct pollfd outputfd;
    outputfd.fd = -1;
    outputfd.events = POLLOUT;
    env->outputfds[0] = outputfd;

    //printf("Setting up buffers\n");

    env->inputRB = malloc(sizeof(struct RingBuffer));
    env->outputRB = malloc(sizeof(struct RingBuffer));
    createRingBuffer(env->inputRB, BUFFERSIZE);
    createRingBuffer(env->outputRB, BUFFERSIZE);

    env->mutex = malloc(sizeof(pthread_mutex_t));
    pthread_mutex_init(env->mutex, NULL);
    return 1;
}

int cleanEnv(Environment* env){
    close(env->inputfds[0].fd);
    close(env->outputfds[0].fd);

    unlink(inputFIFO);
    unlink(outputFIFO);

    deleteRingBuffer(env->inputRB);
    deleteRingBuffer(env->outputRB);
    free(env->inputRB);
    free(env->outputRB);
    
    free(env->status);

    pthread_mutex_destroy(env->mutex);
    free(env->mutex);

    return 1;
}

Sorry if this post sucks. I can edit it if there is a better way to write it.

Also my valgrind posts this:

==1236== Memcheck, a memory error detector
==1236== Copyright (C) 2002-2022, and GNU GPL'd, by Julian Seward et al.
==1236== Using Valgrind-3.22.0 and LibVEX; rerun with -h for copyright info
==1236== Command: ./test
==1236== Parent PID: 1235
==1236== 
==1236== 
==1236== HEAP SUMMARY:
==1236==     in use at exit: 2,292 bytes in 10 blocks
==1236==   total heap usage: 20 allocs, 10 frees, 8,882 bytes allocated
==1236== 
==1236== 8 bytes in 1 blocks are still reachable in loss record 1 of 10
==1236==    at 0x4846828: malloc (in /usr/libexec/valgrind/vgpreload_memcheck-amd64-linux.so)
==1236==    by 0x1096F4: startPiperThread (jobs.c:32)
==1236==    by 0x1094CD: piperJobTest (jobsTester.c:8)
==1236==    by 0x1095DC: main (jobsTester.c:30)
==1236== 
==1236== 12 bytes in 1 blocks are indirectly lost in loss record 2 of 10
==1236==    at 0x4846828: malloc (in /usr/libexec/valgrind/vgpreload_memcheck-amd64-linux.so)
==1236==    by 0x10A0D0: makeEnv (piper.c:51)
==1236==    by 0x10962C: startPiperThread (jobs.c:11)
==1236==    by 0x1094CD: piperJobTest (jobsTester.c:8)
==1236==    by 0x1095DC: main (jobsTester.c:30)
==1236== 
==1236== 24 bytes in 1 blocks are indirectly lost in loss record 3 of 10
==1236==    at 0x4846828: malloc (in /usr/libexec/valgrind/vgpreload_memcheck-amd64-linux.so)
==1236==    by 0x10A18C: makeEnv (piper.c:90)
==1236==    by 0x10962C: startPiperThread (jobs.c:11)
==1236==    by 0x1094CD: piperJobTest (jobsTester.c:8)
==1236==    by 0x1095DC: main (jobsTester.c:30)
==1236== 
==1236== 24 bytes in 1 blocks are indirectly lost in loss record 4 of 10
==1236==    at 0x4846828: malloc (in /usr/libexec/valgrind/vgpreload_memcheck-amd64-linux.so)
==1236==    by 0x10A1A1: makeEnv (piper.c:91)
==1236==    by 0x10962C: startPiperThread (jobs.c:11)
==1236==    by 0x1094CD: piperJobTest (jobsTester.c:8)
==1236==    by 0x1095DC: main (jobsTester.c:30)
==1236== 
==1236== 40 bytes in 1 blocks are indirectly lost in loss record 5 of 10
==1236==    at 0x4846828: malloc (in /usr/libexec/valgrind/vgpreload_memcheck-amd64-linux.so)
==1236==    by 0x10A1E0: makeEnv (piper.c:95)
==1236==    by 0x10962C: startPiperThread (jobs.c:11)
==1236==    by 0x1094CD: piperJobTest (jobsTester.c:8)
==1236==    by 0x1095DC: main (jobsTester.c:30)
==1236== 
==1236== 40 bytes in 1 blocks are indirectly lost in loss record 6 of 10
==1236==    at 0x4846828: malloc (in /usr/libexec/valgrind/vgpreload_memcheck-amd64-linux.so)
==1236==    by 0x109679: startPiperThread (jobs.c:20)
==1236==    by 0x1094CD: piperJobTest (jobsTester.c:8)
==1236==    by 0x1095DC: main (jobsTester.c:30)
==1236== 
==1236== 64 bytes in 1 blocks are indirectly lost in loss record 7 of 10
==1236==    at 0x4846828: malloc (in /usr/libexec/valgrind/vgpreload_memcheck-amd64-linux.so)
==1236==    by 0x109611: startPiperThread (jobs.c:10)
==1236==    by 0x1094CD: piperJobTest (jobsTester.c:8)
==1236==    by 0x1095DC: main (jobsTester.c:30)
==1236== 
==1236== 1,024 bytes in 1 blocks are indirectly lost in loss record 8 of 10
==1236==    at 0x4846828: malloc (in /usr/libexec/valgrind/vgpreload_memcheck-amd64-linux.so)
==1236==    by 0x10AEF7: createRingBuffer (ring.c:14)
==1236==    by 0x10A1C1: makeEnv (piper.c:92)
==1236==    by 0x10962C: startPiperThread (jobs.c:11)
==1236==    by 0x1094CD: piperJobTest (jobsTester.c:8)
==1236==    by 0x1095DC: main (jobsTester.c:30)
==1236== 
==1236== 1,024 bytes in 1 blocks are indirectly lost in loss record 9 of 10
==1236==    at 0x4846828: malloc (in /usr/libexec/valgrind/vgpreload_memcheck-amd64-linux.so)
==1236==    by 0x10AEF7: createRingBuffer (ring.c:14)
==1236==    by 0x10A1D6: makeEnv (piper.c:93)
==1236==    by 0x10962C: startPiperThread (jobs.c:11)
==1236==    by 0x1094CD: piperJobTest (jobsTester.c:8)
==1236==    by 0x1095DC: main (jobsTester.c:30)
==1236== 
==1236== 2,284 (32 direct, 2,252 indirect) bytes in 1 blocks are definitely lost in loss record 10 of 10
==1236==    at 0x4846828: malloc (in /usr/libexec/valgrind/vgpreload_memcheck-amd64-linux.so)
==1236==    by 0x1095F9: startPiperThread (jobs.c:6)
==1236==    by 0x1094CD: piperJobTest (jobsTester.c:8)
==1236==    by 0x1095DC: main (jobsTester.c:30)
==1236== 
==1236== LEAK SUMMARY:
==1236==    definitely lost: 32 bytes in 1 blocks
==1236==    indirectly lost: 2,252 bytes in 8 blocks
==1236==      possibly lost: 0 bytes in 0 blocks
==1236==    still reachable: 8 bytes in 1 blocks
==1236==         suppressed: 0 bytes in 0 blocks
==1236== 
==1236== For lists of detected and suppressed errors, rerun with: -s
==1236== ERROR SUMMARY: 1 errors from 1 contexts (suppressed: 0 from 0)

I'm not sure why my valgrind is spitting heap errors and I've been tracing my code left and right. I have some code, but I'm not sure if more code is needed.

My jobs.h

#include "piper.h"
#include "broadcast.h"
#include <pthread.h>    // for threads

/*
this depends on everything above so make sure you test with
piperTest first!
*/
/*
the starting and entry point for piper communicator
with its own initialization and free
*/

struct PiperThreadData{
    int running;
    pthread_mutex_t* flagMutex;
    Environment* e;
    pthread_t* piperJob;
};

struct PiperThreadData* startPiperThread();
int isRunning(struct PiperThreadData* tData);
void changeRunningState(int newState, struct PiperThreadData* tData);
void endPiperThread(struct PiperThreadData* tData);
/*
This is the main thread of the piper
*/
void *piperCommunicatorJob(void* tData);

My jobs.c

#include "jobs.h"

#include <stdlib.h>

struct PiperThreadData* startPiperThread(){
    struct PiperThreadData* tData = malloc(sizeof(struct PiperThreadData));
    tData->running = 1;
    //printf("creation pointer: %p\n", tData);

    tData->e = malloc(sizeof(Environment));
    int ret = makeEnv(tData->e);
    if(ret == -1){
        free(tData->e);
        free(tData);
        printErrorNicely("Creation of environment failed",
                            "startPiperThread() in piper.c",
                            1);
    }

    tData->flagMutex = malloc(sizeof(pthread_mutex_t));
    if (!tData->flagMutex) {
        free(tData->e);
        free(tData);
        printErrorNicely("Creation of flag mutex failed",
                            "startPiperThread() in piper.c",
                            1);
    }
    pthread_mutex_init(tData->flagMutex, NULL);

    tData->running = 1;

    tData->piperJob = malloc(sizeof(pthread_t));

    int rc = pthread_create(tData->piperJob, NULL, piperCommunicatorJob, tData);
    if (rc) {
        printf("ERROR; return code from pthread_create() is %d\n", rc);
        free(tData->piperJob);
        
        pthread_mutex_destroy(tData->flagMutex);
        free(tData->flagMutex);

        cleanEnv(tData->e);
        free(tData->e);

        free(tData);
        exit(-1);
    }

    return tData;
}

int isRunning(struct PiperThreadData* tData){
    pthread_mutex_lock(tData->flagMutex);
    int retVal = tData->running;
    pthread_mutex_unlock(tData->flagMutex);
    return retVal;
}

void changeRunningState(int newState, struct PiperThreadData* tData){
    pthread_mutex_lock(tData->flagMutex);
    tData->running = newState;
    pthread_mutex_unlock(tData->flagMutex);
}

void endPiperThread(struct PiperThreadData* tData){
    //printf("close pointer: %p\n", tData);
    pthread_mutex_lock(tData->flagMutex);
    tData->running = 0;
    pthread_mutex_unlock(tData->flagMutex);
    
    void* status; 
    printf("Trying to close thread...\n");
    pthread_join(*(tData->piperJob), &status);

    pthread_exit(tData->piperJob);

    pthread_mutex_destroy(tData->flagMutex);
    free(tData->flagMutex);

    cleanEnv(tData->e);
    free(tData->e);

    printf("Last free\n");
    free(tData);
}

void *piperCommunicatorJob(void* arg){
    struct PiperThreadData* tData = (struct PiperThreadData*)arg;
    //printf("thread pointer: %p\n", tData);
    printf("Environment status: %p\n", tData->e->status);

    int res;
    while(isRunning(tData)){
        printf("PIPER: checking status\n");
        res = checkStatus(tData->e);
        if(tData->e->status->clientOnTheOtherSide){
            res = subscribe(tData->e);
            if(res == NOCLIENT){
                printf("PIPER ERROR: get an error on subscribing\n");
                printf(">> in piperCommunicatorJob() thread\n");
            }
        }

        if(isClientInputPending(tData->e)){
            printf("PIPER: reading input...\n");
            unsigned int cRead = 0;
            res = readFIFO(&cRead, tData->e);
            if(res == 0){
                printf("PIPER ERROR: somehow poll timed out on the other side\n");
                printf(">> throw away that input\n");
                continue;
            }
            if(res == -1){
                printf("PIPER ERROR: too much stuff in our buffer\n");
                printf(">> throw away that input\n");
                continue;
            }
        }

        if(isServerOutputPending(tData->e)){
            printf("PIPER: writing...\n");
            res = writeToFIFO(PACKETSIZE, tData->e);
            if(res == 0){
                printf("PIPER ERROR: somehow we got a server pending, but nothing to output\n");
            }
        }
    }
    return NULL;
}

here is piper.h which contains the definition for Environment and Status

//Only once header
#ifndef PIPER
#define PIPER

#include "ring.h"
#include "merror.h"

#include <stdio.h>      // For printf(), fgets()
#include <string.h>     // For strlen(), strerror()
#include <fcntl.h>      // For open() and O_RDONLY, O_WRONLY
#include <sys/stat.h>   // For mkfifo() and file permission macros
#include <sys/types.h>  // For mkfifo() and data types like mode_t
#include <unistd.h>     // For read(), write(), close(), access()

#include <poll.h>       // For poll(), pollfd
#include <time.h>

#include <errno.h>    // For errno

#define TIMEOUT 10000
#define BUFFERSIZE 1024
#define PACKETSIZE 100

#define ENVINITIALIZED -9
#define ENVINITIALIZED_STR "Environment was already initialized"
#define ENVNOTINITIALIZED -10
#define ENVNOTINITIALIZED_STR "Environment was not initialized"
#define CLIENTCLOSED -11
#define CLIENTCLOSE_STR "Client closed the connection"
#define NOCLIENT -12
#define NOTCLIENT_STR "There is nobody on the listening side"

#define numInput 1
#define inputFIFO "/tmp/publisherInput.fifo"
#define numOutput 1
#define outputFIFO "/tmp/publisherOutput.fifo"

extern int killSwitch;

/*
checks if path exists and then deletes it if it does

because of race conditions, it's hard to make a function that just checks 
a path (it might be misunderstood and TOCTOU vulnerabilities might happen)

*/
int deletePathIfExists(const char* path);

/*
a data structure that represents the status of Environment
You can look at the variables to see what kind of statuses
are maintained
*/
struct Status{
    //Should be a binary where a client is ready to be read
    // but the client has not been "subscribed" yet
    int clientOnTheOtherSide;

    //Should be a binary indicating whether or not
    //a client has connected. Any instance where multiple 
    //connected should be an error and not permitted
    int clientConnected;

    //should be true once a client is verified. 
    //an unverified client should be kicked immediately
    int clientVerified;
};

/*
    A data structure that is just a collection of the state
of the fifos (whether they exist or not)
*/
struct Environment{
    int created;
    //int numConnected
    struct Status* status;
    struct pollfd inputfds[numInput], outputfds[numOutput];
    //this gotta be a pointer 
    // becuase it doesn't know the actual size
    struct RingBuffer *inputRB, *outputRB;

    //whether or not the client side put input
    int clientInputPending;
    //whether or not the server is ready to output something
    int serverOutputPending;


    //gotta make it thread safe?
    pthread_mutex_t* mutex;
};
typedef struct Environment Environment;

/*
checks the existence of input/output fifos. if it exists then it
makes the two input/output FIFOS:
    - one called "input"
    - one called "output"
then it returns the filedescriptors of both

"You cannot open a FIFO for writing without blocking unless a reader 
already has the FIFO open. The simplest fix is just to wait a second 
and repeat the open if it returns ENXIO. Repeat until the other side 
is running."

this means the filedescriptor of write fifo cannot be opened
- it does mean it can be made
- it also does mean that it can only be read once a connection
has been established to this system
(creating further really annoying complications >w<)
    -=> checkout subscribe() function

returns 1 on success
returns ENVINITIALIZED if environment was already initialized
returns -1 on any failure
*/
int makeEnv(Environment* env);

/*
this changes the status of env in order 
to allow write calls to FIFO

should be called when someone connects to the input FIFO
- BUT the only way to check if someone connected to THAT is 
by poll() and THAT is only called in 
    - readFIFO
    - checkStatus
---> SOOO
The solution is to make entrust this function call to
the user. If the user calls check status and finds that
a client connected. Then it calls subscribe (or disconnected,
unsubscribe)
But that implementation is left to the user, it's not
listed here

the read fifo
returns 1 on a successful connect
returns ENVNOTINITIALIZED if env is not initialized
returns NOCLIENT if there is nobody to connect
returns -1 on a "nobody is listening"
*/
int subscribe(Environment* env);

/*
decreases the numberConnected

returns 1 on success
returns -1 if nobody else there (but doesnt crash)
*/
int unsubscribe(Environment* env);

/*
updates the status field in the environment
- status is updated via poll function

Note that this does NOT verify the connection
and change the status of verifiedConnection

returns 1 if there is someone on the other side
    (i.e. env is initialized, someone is on the other side) 
returns -1 if any fail
*/
int checkStatus(Environment* env);

/*
read BUFFERSIZE (100) characters off the contents of input FIFO
and changes the buffer to reflect the values

note that buffer becomes overwritten, 
even on certain errors

also note,
the his function hangs until it receives something from the fifo

also also note,
the client side may actually crash on their end while 
fifo is reading
there is a special errno that happens that must be accounted for

returns 1 on success 
return ENVNOTINITIALIZED when call read when env was not initialized
returns 0 on a poll timeout
returns -1 if our wanted read size exceeds the remaining 
size in ring buffer
*/
int readFIFO(unsigned int* charsRead, Environment* env);

/*
writes numChars characters onto the FIFO. Then it fills the
remainder of the buffer with space characters

returns 1 on success
return ENVNOTINITIALIZED when call read when env was not initialized
returns 0 if outputRB was empty
returns -1 if the outputRB pops started to exceed its own size (non-0)
    (which implies the outputRBs started to stop being multiples of 100)
*/
int writeToFIFO(unsigned int numChars, Environment* env);

/*
essentially extracts a string from the inputRingBuffer

note that there is not null-byte received. Every
packet should be exactly 100 bytes

This is where this fits in the picture:
=> INPUT FIFO>RINGBUFFER => *[READ RINGBUFFER]* => 
    PROCESSES THE QUERY
=> WRITE TO RINGBUFFER => RINGBUFFER>OUTPUT FIFO

returns 1;
returns ENVNOTINITIALIZED if env is not initialized
returns 0 if inputRingBuffer is empty
returns -1 in a weird case where inputRingBuffer pop
    exceeded is size (even though we checked in code)
*/
int getQuery(char* buffer, Environment* env);

/*
essentially pushes a string into the outputRingBuffer 

This is where this fits in the picture:
=> INPUT FIFO>RINGBUFFER => READ RINGBUFFER => 
    PROCESSES THE QUERY
=> *[WRITE TO RINGBUFFER]* => RINGBUFFER>OUTPUT FIFO

returns 1
returns ENVNOTINITIIALIZED if env is not initialized
returns 0 if we want to send a response
    but our outputRingBuffer is too small
*/
int sendResponse(char* response, Environment* env);

int isClientInputPending(Environment* env);
int isServerOutputPending(Environment* env);

/*
it looks at fd to see if it's an open fd
    if it is, then it calls close
it looks at the directory to see if inputfifo, outputfifo exists
    if it exists it is deleted

**Don't fet to free the RingBuffer 
*/
int cleanEnv(Environment* env);

/*
prints the status
*/
void printStatus(struct Status* s);

/*
TODO...
This one requires a bit of research before implementation
because we don't know good ways to verify connection that
provide security

however, verification will NOT be required for the other 
functions to work. This will have to be a check by the user
*/
int verifyClient(Environment* env); // this one requires research


#endif 

And also the makeEnv() and cleanEnv() should be handling the Environment initialization

int makeEnv(Environment* env){
    /*
    //some defensive programming
    if(env->created == 1){
        printErrorNicely("Environment seems to be created already. Did you call this twice?", "makeEnv()", 0);
        return ENVINITIALIZED;
    }
    */
    env->created = 1;
    env->clientInputPending = 0;
    env->serverOutputPending = 0;

    //printf("Creating FIFOS\n");

    //giving these fifos read and write permissions to everybody 
    // (not execute permissions of course)
    int res = mkfifo(inputFIFO, 0666);
    if(res == -1){
        printErrorNicely("Trying to create input fifo", "makeEnv()", 0);
    }

    res = mkfifo(outputFIFO, 0666);
    if(res == -1){
        printErrorNicely("Trying to create output fifo", "makeEnv()", 0);
    }

    //printf("FIFOS made\n");

    env->created = 1;

    env->status = malloc(sizeof(struct Status));
    env->status->clientOnTheOtherSide = 0;
    env->status->clientConnected = 0;
    env->status->clientVerified = 0;

    int inputfdInt = open(inputFIFO, O_RDONLY | O_NONBLOCK);
    if(inputfdInt == -1){
        printErrorNicely("Trying to get file descriptor of input fifo with open()", "makeEnv()", 0);
        return -1;
    }

    /*
    "You cannot open a FIFO for writing without blocking unless a reader 
    already has the FIFO open. The simplest fix is just to wait a second 
    and repeat the open if it returns ENXIO. Repeat until the other side 
    is running."
    */
    /*
    int outputfdInt = open(outputFIFO, O_WRONLY | O_NONBLOCK);
    if(outputfdInt == -1){
        printErrorNicely("Trying to get file descriptor of output fifo with open()", "makeEnv()", 0);
        return -1;
    }
    */

    //printf("Environment set. Setting vars\n");

    struct pollfd inputfd;
    inputfd.fd = inputfdInt;
    inputfd.events = POLLIN;
    env->inputfds[0] = inputfd;

    struct pollfd outputfd;
    outputfd.fd = -1;
    outputfd.events = POLLOUT;
    env->outputfds[0] = outputfd;

    //printf("Setting up buffers\n");

    env->inputRB = malloc(sizeof(struct RingBuffer));
    env->outputRB = malloc(sizeof(struct RingBuffer));
    createRingBuffer(env->inputRB, BUFFERSIZE);
    createRingBuffer(env->outputRB, BUFFERSIZE);

    env->mutex = malloc(sizeof(pthread_mutex_t));
    pthread_mutex_init(env->mutex, NULL);
    return 1;
}

int cleanEnv(Environment* env){
    close(env->inputfds[0].fd);
    close(env->outputfds[0].fd);

    unlink(inputFIFO);
    unlink(outputFIFO);

    deleteRingBuffer(env->inputRB);
    deleteRingBuffer(env->outputRB);
    free(env->inputRB);
    free(env->outputRB);
    
    free(env->status);

    pthread_mutex_destroy(env->mutex);
    free(env->mutex);

    return 1;
}

Sorry if this post sucks. I can edit it if there is a better way to write it.

Also my valgrind posts this:

==1236== Memcheck, a memory error detector
==1236== Copyright (C) 2002-2022, and GNU GPL'd, by Julian Seward et al.
==1236== Using Valgrind-3.22.0 and LibVEX; rerun with -h for copyright info
==1236== Command: ./test
==1236== Parent PID: 1235
==1236== 
==1236== 
==1236== HEAP SUMMARY:
==1236==     in use at exit: 2,292 bytes in 10 blocks
==1236==   total heap usage: 20 allocs, 10 frees, 8,882 bytes allocated
==1236== 
==1236== 8 bytes in 1 blocks are still reachable in loss record 1 of 10
==1236==    at 0x4846828: malloc (in /usr/libexec/valgrind/vgpreload_memcheck-amd64-linux.so)
==1236==    by 0x1096F4: startPiperThread (jobs.c:32)
==1236==    by 0x1094CD: piperJobTest (jobsTester.c:8)
==1236==    by 0x1095DC: main (jobsTester.c:30)
==1236== 
==1236== 12 bytes in 1 blocks are indirectly lost in loss record 2 of 10
==1236==    at 0x4846828: malloc (in /usr/libexec/valgrind/vgpreload_memcheck-amd64-linux.so)
==1236==    by 0x10A0D0: makeEnv (piper.c:51)
==1236==    by 0x10962C: startPiperThread (jobs.c:11)
==1236==    by 0x1094CD: piperJobTest (jobsTester.c:8)
==1236==    by 0x1095DC: main (jobsTester.c:30)
==1236== 
==1236== 24 bytes in 1 blocks are indirectly lost in loss record 3 of 10
==1236==    at 0x4846828: malloc (in /usr/libexec/valgrind/vgpreload_memcheck-amd64-linux.so)
==1236==    by 0x10A18C: makeEnv (piper.c:90)
==1236==    by 0x10962C: startPiperThread (jobs.c:11)
==1236==    by 0x1094CD: piperJobTest (jobsTester.c:8)
==1236==    by 0x1095DC: main (jobsTester.c:30)
==1236== 
==1236== 24 bytes in 1 blocks are indirectly lost in loss record 4 of 10
==1236==    at 0x4846828: malloc (in /usr/libexec/valgrind/vgpreload_memcheck-amd64-linux.so)
==1236==    by 0x10A1A1: makeEnv (piper.c:91)
==1236==    by 0x10962C: startPiperThread (jobs.c:11)
==1236==    by 0x1094CD: piperJobTest (jobsTester.c:8)
==1236==    by 0x1095DC: main (jobsTester.c:30)
==1236== 
==1236== 40 bytes in 1 blocks are indirectly lost in loss record 5 of 10
==1236==    at 0x4846828: malloc (in /usr/libexec/valgrind/vgpreload_memcheck-amd64-linux.so)
==1236==    by 0x10A1E0: makeEnv (piper.c:95)
==1236==    by 0x10962C: startPiperThread (jobs.c:11)
==1236==    by 0x1094CD: piperJobTest (jobsTester.c:8)
==1236==    by 0x1095DC: main (jobsTester.c:30)
==1236== 
==1236== 40 bytes in 1 blocks are indirectly lost in loss record 6 of 10
==1236==    at 0x4846828: malloc (in /usr/libexec/valgrind/vgpreload_memcheck-amd64-linux.so)
==1236==    by 0x109679: startPiperThread (jobs.c:20)
==1236==    by 0x1094CD: piperJobTest (jobsTester.c:8)
==1236==    by 0x1095DC: main (jobsTester.c:30)
==1236== 
==1236== 64 bytes in 1 blocks are indirectly lost in loss record 7 of 10
==1236==    at 0x4846828: malloc (in /usr/libexec/valgrind/vgpreload_memcheck-amd64-linux.so)
==1236==    by 0x109611: startPiperThread (jobs.c:10)
==1236==    by 0x1094CD: piperJobTest (jobsTester.c:8)
==1236==    by 0x1095DC: main (jobsTester.c:30)
==1236== 
==1236== 1,024 bytes in 1 blocks are indirectly lost in loss record 8 of 10
==1236==    at 0x4846828: malloc (in /usr/libexec/valgrind/vgpreload_memcheck-amd64-linux.so)
==1236==    by 0x10AEF7: createRingBuffer (ring.c:14)
==1236==    by 0x10A1C1: makeEnv (piper.c:92)
==1236==    by 0x10962C: startPiperThread (jobs.c:11)
==1236==    by 0x1094CD: piperJobTest (jobsTester.c:8)
==1236==    by 0x1095DC: main (jobsTester.c:30)
==1236== 
==1236== 1,024 bytes in 1 blocks are indirectly lost in loss record 9 of 10
==1236==    at 0x4846828: malloc (in /usr/libexec/valgrind/vgpreload_memcheck-amd64-linux.so)
==1236==    by 0x10AEF7: createRingBuffer (ring.c:14)
==1236==    by 0x10A1D6: makeEnv (piper.c:93)
==1236==    by 0x10962C: startPiperThread (jobs.c:11)
==1236==    by 0x1094CD: piperJobTest (jobsTester.c:8)
==1236==    by 0x1095DC: main (jobsTester.c:30)
==1236== 
==1236== 2,284 (32 direct, 2,252 indirect) bytes in 1 blocks are definitely lost in loss record 10 of 10
==1236==    at 0x4846828: malloc (in /usr/libexec/valgrind/vgpreload_memcheck-amd64-linux.so)
==1236==    by 0x1095F9: startPiperThread (jobs.c:6)
==1236==    by 0x1094CD: piperJobTest (jobsTester.c:8)
==1236==    by 0x1095DC: main (jobsTester.c:30)
==1236== 
==1236== LEAK SUMMARY:
==1236==    definitely lost: 32 bytes in 1 blocks
==1236==    indirectly lost: 2,252 bytes in 8 blocks
==1236==      possibly lost: 0 bytes in 0 blocks
==1236==    still reachable: 8 bytes in 1 blocks
==1236==         suppressed: 0 bytes in 0 blocks
==1236== 
==1236== For lists of detected and suppressed errors, rerun with: -s
==1236== ERROR SUMMARY: 1 errors from 1 contexts (suppressed: 0 from 0)
Share Improve this question asked Apr 2 at 3:46 minyoung heominyoung heo 412 bronze badges
Add a comment  | 

1 Answer 1

Reset to default 1

congrats on complete/thorough ness.

In endPiperThread +11, is that pthread_exit() supposed to be there? The lines following it won't execute.

发布评论

评论列表(0)

  1. 暂无评论