最新消息:雨落星辰是一个专注网站SEO优化、网站SEO诊断、搜索引擎研究、网络营销推广、网站策划运营及站长类的自媒体原创博客

javascript - forcing completion of an rxjs observer - Stack Overflow

programmeradmin1浏览0评论

I've got an rxjs observer (really a Subject) that tails a file forever, just like tail -f. It's awesome for monitoring logfiles, for example.

This "forever" behavior is great for my application, but terrible for testing. Currently my application works but my tests hang forever.

I'd like to force an observer change to plete early, because my test code knows how many lines should be in the file. How do I do this?

I tried calling onCompleted on the Subject handle I returned but at that point it's basically cast as an observer and you can't force it to close, the error is:

Object # has no method 'onCompleted'

Here's the source code:

function ObserveTail(filename) {

source = new Rx.Subject();

if (fs.existsSync(filename) == false) {
    console.error("file doesn't exist: " + filename);
}

var lineSep = /[\r]{0,1}\n/;
tail = new Tail(filename, lineSep, {}, true);

tail.on("line", function(line) {
        source.onNext(line);
});
tail.on('close', function(data) {
    console.log("tail closed");
    source.onCompleted();
});     
tail.on('error', function(error) {
    console.error(error);
});     

this.source = source;
}           

And here's the test code that can't figure out how to force forever to end (tape style test). Note the "ILLEGAL" line:

test('tailing a file works correctly', function(tid) {

var lines = 8;
var i = 0;
var filename = 'tape/tail.json';
var handle = new ObserveTail(filename);
touch(filename);

handle.source
.filter(function (x) {
    try {
        JSON.parse(x);
        return true;
    } catch (error) {
        tid.pass("correctly caught illegal JSON");
        return false;
    }
})
.map(function(x) { return JSON.parse(x) })
.map(function(j) { return j.name })
.timeout(10000, "observer timed out")
.subscribe (
    function(name) {
        tid.equal(name, "AssetMgr", "verified name field is AssetMgr");
        i++;
        if (i >= lines) {
            handle.onCompleted();   // XXX ILLEGAL
        }
    },
    function(err) {  
        console.error(err)
        tid.fail("err leaked through to subscriber");
    },
    function() {
        tid.end();
        console.log("Completed");
    }
);

})

I've got an rxjs observer (really a Subject) that tails a file forever, just like tail -f. It's awesome for monitoring logfiles, for example.

This "forever" behavior is great for my application, but terrible for testing. Currently my application works but my tests hang forever.

I'd like to force an observer change to plete early, because my test code knows how many lines should be in the file. How do I do this?

I tried calling onCompleted on the Subject handle I returned but at that point it's basically cast as an observer and you can't force it to close, the error is:

Object # has no method 'onCompleted'

Here's the source code:

function ObserveTail(filename) {

source = new Rx.Subject();

if (fs.existsSync(filename) == false) {
    console.error("file doesn't exist: " + filename);
}

var lineSep = /[\r]{0,1}\n/;
tail = new Tail(filename, lineSep, {}, true);

tail.on("line", function(line) {
        source.onNext(line);
});
tail.on('close', function(data) {
    console.log("tail closed");
    source.onCompleted();
});     
tail.on('error', function(error) {
    console.error(error);
});     

this.source = source;
}           

And here's the test code that can't figure out how to force forever to end (tape style test). Note the "ILLEGAL" line:

test('tailing a file works correctly', function(tid) {

var lines = 8;
var i = 0;
var filename = 'tape/tail.json';
var handle = new ObserveTail(filename);
touch(filename);

handle.source
.filter(function (x) {
    try {
        JSON.parse(x);
        return true;
    } catch (error) {
        tid.pass("correctly caught illegal JSON");
        return false;
    }
})
.map(function(x) { return JSON.parse(x) })
.map(function(j) { return j.name })
.timeout(10000, "observer timed out")
.subscribe (
    function(name) {
        tid.equal(name, "AssetMgr", "verified name field is AssetMgr");
        i++;
        if (i >= lines) {
            handle.onCompleted();   // XXX ILLEGAL
        }
    },
    function(err) {  
        console.error(err)
        tid.fail("err leaked through to subscriber");
    },
    function() {
        tid.end();
        console.log("Completed");
    }
);

})
Share Improve this question edited Jan 28, 2016 at 2:46 Paul S asked Jan 28, 2016 at 2:36 Paul SPaul S 9721 gold badge13 silver badges27 bronze badges 7
  • Use your favorite debugger and have a look at handle. You should find out your mistake. The more time you spend debugging by yourself and the better you bee at debugging and it is a really valuable skill. Otherwise, javascript is not a typed language which a shame, so you have to pay attention to the objects that you pass from one place to another, as there will be no warning from the piler when you use them incorrectly. It takes some practice and a lot of mistakes to get used to it so the more experience the better. – user3743222 Commented Jan 28, 2016 at 3:13
  • but is this the correct pattern for solving this problem? – Paul S Commented Jan 28, 2016 at 3:31
  • Okay, it's handle.source.onCompleted(). Wish I could get the exception handler in javascript to pletely print out the object in question by default. – Paul S Commented Jan 28, 2016 at 3:38
  • But the overall question still stands: Is this the correct pattern for "tell the upstream source to plete" in RXJS? I have another upstream source that's on a setTimeout loop (polls a website), and I have to set a magic boolean to tell it to exit the loop.. I extended the Subject to do that but that's probably bad hackery... The reason I'm posting here is there are a dearth of good real world examples using RXJS, hoping this will help others. – Paul S Commented Jan 28, 2016 at 3:40
  • I don't understand your question. In any case, there are two conditions under which a subject will plete: 1. if you call onCompleted yourself, 2. if the subject is used as an observer, and the source of that observer pletes. Use whichever is more appropriate. – user3743222 Commented Jan 28, 2016 at 13:30
 |  Show 2 more ments

1 Answer 1

Reset to default 5

It sounds like you solved your problem, but to your original question

I'd like to force an observer change to plete early, because my test code knows how many lines should be in the file. How do I do this?

In general the use of Subjects is discouraged when you have better alternatives, since they tend to be a crutch for people to use programming styles they are familiar with. Instead of trying to use a Subject I would suggest that you think about what each event would mean in an Observable life cycles.

Wrap Event Emitters

There already exists wrapper for the EventEmitter#on/off pattern in the form of Observable.fromEvent. It handles clean up and keeping the subscription alive only when there are listeners. Thus ObserveTail can be refactored into

function ObserveTail(filename) {

  return Rx.Observable.create(function(observer) {
    var lineSep = /[\r]{0,1}\n/;
    tail = new Tail(filename, lineSep, {}, true);
    var line = Rx.Observable.fromEvent(tail, "line");
    var close = Rx.Observable.fromEvent(tail, "close");
    var error = Rx.Observable.fromEvent(tail, "error")
                  .flatMap(function(err) { return Rx.Observable.throw(err); });
    //Only take events until close occurs and wrap in the error for good measure
    //The latter two are terminal events in this case.
    return line.takeUntil(close).merge(error).subscribe(observer);
  });
} 

Which has several benefits over the vanilla use of Subjects, one, you will now actually see the error downstream, and two, this will handle clean up of your events when you are done with them.

Avoid *Sync Methods

Then this can be rolled into your file existence checking without the use of readSync

//If it doesn't exist then we are done here
//You could also throw from the filter if you want an error tracked
var source = Rx.Observable.fromNodeCallback(fs.exists)(filename)
    .filter(function(exists) { return exists; })
    .flatMap(ObserveTail(filename));

Next you can simplify your filter/map/map sequence down by using flatMap instead.

var result = source.flatMap(function(x) {
  try {
    return Rx.Observable.just(JSON.parse(x));
  } catch (e) {
    return Rx.Observable.empty();
  }
}, 
//This allows you to map the result of the parsed value
function(x, json) {
  return json.name;
})
.timeout(10000, "observer timed out");

Don't signal, unsubscribe

How do you stop "signal" a stop when streams only travel in one direction. We rarely actually want to have an Observer directly municate with an Observable, so a better pattern is to not actually "signal" a stop but to simply unsubscribe from the Observable and leave it up to the Observable's behavior to determine what it should do from there.

Essentially your Observer really shouldn't care about your Observable more than to say "I'm done here".

To do that you need to declare a condition you want to reach in when stopping.

In this case since you are simply stopping after a set number in your test case you can use take to unsubscribe. Thus the final subscribe block would look like:

result
 //After lines is reached this will plete.
 .take(lines)
 .subscribe (
    function(name) {
        tid.equal(name, "AssetMgr", "verified name field is AssetMgr");
    },
    function(err) {  
        console.error(err)
        tid.fail("err leaked through to subscriber");
    },
    function() {
        tid.end();
        console.log("Completed");
    }
);

Edit 1

As pointed out in the ments, In the case of this particular api there isn't a real "close" event since Tail is essentially an infinite operation. In this sense it is no different from a mouse event handler, we will stop sending events when people stop listening. So your block would probably end up looking like:

function ObserveTail(filename) {

  return Rx.Observable.create(function(observer) {
    var lineSep = /[\r]{0,1}\n/;
    tail = new Tail(filename, lineSep, {}, true);
    var line = Rx.Observable.fromEvent(tail, "line");
    var error = Rx.Observable.fromEvent(tail, "error")
                  .flatMap(function(err) { return Rx.Observable.throw(err); });
    //Only take events until close occurs and wrap in the error for good measure
    //The latter two are terminal events in this case.
    return line
            .finally(function() {  tail.unwatch(); })
            .merge(error).subscribe(observer);
  }).share();
} 

The addition of the finally and the share operators creates an object which will attach to the tail when a new subscriber arrives and will remain attached as long as there is at least one subscriber still listening. Once all the subscribers are done however we can safely unwatch the tail.

发布评论

评论列表(0)

  1. 暂无评论