From 26213120e3aa7a8c0fba53170cc798571b9295b7 Mon Sep 17 00:00:00 2001 From: Simeon David Schaub Date: Tue, 31 Oct 2023 13:42:08 +0100 Subject: [PATCH] allow initiating peer to close stream gracefully I have a use case where I would like to remove a worker without killing it. Currently, trying to disconnect from the head node will cause the message handler loop to throw a fatal exception, so this adds a check that the connection is still open when trying to read new messages. --- src/process_messages.jl | 2 +- test/persistent_workers.jl | 44 +++++++++++++++++ test/runtests.jl | 2 + test/testhelpers/PersistentWorkers.jl | 70 +++++++++++++++++++++++++++ 4 files changed, 117 insertions(+), 1 deletion(-) create mode 100644 test/persistent_workers.jl create mode 100644 test/testhelpers/PersistentWorkers.jl diff --git a/src/process_messages.jl b/src/process_messages.jl index 211c225..59a5ab4 100644 --- a/src/process_messages.jl +++ b/src/process_messages.jl @@ -167,7 +167,7 @@ function message_handler_loop(r_stream::IO, w_stream::IO, incoming::Bool) readbytes!(r_stream, boundary, length(MSG_BOUNDARY)) - while true + while !(incoming && eof(r_stream)) reset_state(serializer) header = deserialize_hdr_raw(r_stream) # println("header: ", header) diff --git a/test/persistent_workers.jl b/test/persistent_workers.jl new file mode 100644 index 0000000..fd5c058 --- /dev/null +++ b/test/persistent_workers.jl @@ -0,0 +1,44 @@ +include("testhelpers/PersistentWorkers.jl") +using .PersistentWorkers +using Test +using Random +using DistributedNext + +@testset "PersistentWorkers.jl" begin + cookie = randstring(16) + port = rand(9128:9999) # TODO: make sure port is available? 
+ helpers_path = joinpath(@__DIR__, "testhelpers", "PersistentWorkers.jl") + cmd = `$(Base.julia_exename()) --startup=no --project=$(Base.active_project()) -L $(helpers_path) -e "using .PersistentWorkers; wait(start_worker_loop($port; cluster_cookie=$(repr(cookie)))[1])"` + worker = run(pipeline(cmd; stdout, stderr); wait=false) + try + @show worker.cmd + cluster_cookie(cookie) + sleep(10) + + p = addprocs(PersistentWorkerManager(port))[] + @test procs() == [1, p] + @test workers() == [p] + @test remotecall_fetch(myid, p) == p + rmprocs(p) + @test procs() == [1] + @test workers() == [1] + @test process_running(worker) + # this shouldn't error + @everywhere 1+1 + + # try the same thing again for the same worker + p = addprocs(PersistentWorkerManager(port))[] + @test procs() == [1, p] + @test workers() == [p] + @test remotecall_fetch(myid, p) == p + rmprocs(p) + @test procs() == [1] + @test workers() == [1] + @test process_running(worker) + # this shouldn't error + @everywhere 1+1 + finally + kill(worker) + wait(worker) + end +end diff --git a/test/runtests.jl b/test/runtests.jl index ab596e9..956aa49 100644 --- a/test/runtests.jl +++ b/test/runtests.jl @@ -34,6 +34,8 @@ include("managers.jl") include("distributed_stdlib_detection.jl") +include("persistent_workers.jl") + @testset "Aqua" begin Aqua.test_all(DistributedNext) end diff --git a/test/testhelpers/PersistentWorkers.jl b/test/testhelpers/PersistentWorkers.jl new file mode 100644 index 0000000..c79f7bd --- /dev/null +++ b/test/testhelpers/PersistentWorkers.jl @@ -0,0 +1,70 @@ +module PersistentWorkers + +using DistributedNext: DistributedNext, ClusterManager, WorkerConfig, worker_from_id, set_worker_state, W_TERMINATED +using Sockets: InetAddr, localhost + +export PersistentWorkerManager, start_worker_loop + +struct PersistentWorkerManager{IP} <: ClusterManager + addr::InetAddr{IP} +end + +PersistentWorkerManager(host, port::Integer) = PersistentWorkerManager(InetAddr(host, port)) 
+PersistentWorkerManager(port::Integer) = PersistentWorkerManager(localhost, port) + +function DistributedNext.launch(cm::PersistentWorkerManager, ::Dict, launched::Array, launch_ntfy::Base.GenericCondition{Base.AlwaysLockedST}) + (; host, port) = cm.addr + wc = WorkerConfig() + wc.io = nothing + wc.host = string(host) + wc.bind_addr = string(host) + wc.port = Int(port) + push!(launched, wc) + notify(launch_ntfy) + return nothing +end + +function DistributedNext.manage(::PersistentWorkerManager, ::Int, ::WorkerConfig, ::Symbol) end + +# don't actually kill the worker, just close the streams +function Base.kill(::PersistentWorkerManager, pid::Int, ::WorkerConfig) + w = worker_from_id(pid) + close(w.r_stream) + close(w.w_stream) + set_worker_state(w, W_TERMINATED) + return nothing +end + +using DistributedNext: LPROC, init_worker, process_messages, cluster_cookie +using Sockets: IPAddr, listen, listenany, accept + +function start_worker_loop(host::IPAddr, port::Union{Nothing, Integer}; cluster_cookie=cluster_cookie()) + init_worker(cluster_cookie) + LPROC.bind_addr = string(host) + if port === nothing + port_hint = 9000 + (getpid() % 1000) + port, sock = listenany(host, UInt16(port_hint)) + else + sock = listen(host, port) + end + LPROC.bind_port = port + t = let sock=sock + @async while isopen(sock) + client = accept(sock) + process_messages(client, client, true) + end + end + errormonitor(t) + @info "Listening on $host:$port, cluster_cookie=$cluster_cookie" + return t, host, port +end + +function start_worker_loop((; host, port)::InetAddr; cluster_cookie=cluster_cookie()) + return start_worker_loop(host, port; cluster_cookie) +end + +function start_worker_loop(port::Union{Nothing, Integer}=nothing; cluster_cookie=cluster_cookie()) + return start_worker_loop(localhost, port; cluster_cookie) +end + +end