Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions include/boost/corosio/native/detail/kqueue/kqueue_op.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -282,6 +282,21 @@ struct kqueue_connect_op final : kqueue_op
socklen_t len = sizeof(err);
if (::getsockopt(fd, SOL_SOCKET, SO_ERROR, &err, &len) < 0)
err = errno;
// Guard against stale EVFILT_WRITE events from socket creation.
// kqueue registers EVFILT_WRITE at open() time; a freshly created
// socket is "writable" so the filter fires immediately. If the
// reactor processes this event after connect() returns EINPROGRESS
// but before the kernel delivers the connect result (e.g. RST for
// ECONNREFUSED), SO_ERROR is still 0. Use getpeername() to verify
// the connection is actually established.
if (err == 0)
{
sockaddr_storage peer{};
socklen_t peer_len = sizeof(peer);
if (::getpeername(
fd, reinterpret_cast<sockaddr*>(&peer), &peer_len) < 0)
err = (errno == ENOTCONN) ? EAGAIN : errno;
}
complete(err, 0);
}

Expand Down
38 changes: 35 additions & 3 deletions include/boost/corosio/native/detail/kqueue/kqueue_scheduler.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -604,8 +604,16 @@ descriptor_state::operator()()
cn->complete(err, 0);
else
cn->perform_io();
local_ops.push(cn);
cn = nullptr;

if (cn->errn == EAGAIN || cn->errn == EWOULDBLOCK)
{
cn->errn = 0;
}
else
{
local_ops.push(cn);
cn = nullptr;
}
}

if (wr)
Expand All @@ -630,7 +638,7 @@ descriptor_state::operator()()
// have set read_ready/write_ready while we held the op (no read_op
// was registered, so it cached the edge event). Check the flags
// under the same lock as re-registration so no edge is lost.
while (rd || wr)
while (rd || wr || cn)
{
bool retry = false;
{
Expand Down Expand Up @@ -661,6 +669,19 @@ descriptor_state::operator()()
wr = nullptr;
}
}
if (cn)
{
if (write_ready)
{
write_ready = false;
retry = true;
}
else
{
connect_op = cn;
cn = nullptr;
}
}
}

if (!retry)
Expand Down Expand Up @@ -688,6 +709,17 @@ descriptor_state::operator()()
wr = nullptr;
}
}
if (cn)
{
cn->perform_io();
if (cn->errn == EAGAIN || cn->errn == EWOULDBLOCK)
cn->errn = 0;
else
{
local_ops.push(cn);
cn = nullptr;
}
}
}

// Execute first handler inline — the scheduler's work_cleanup
Expand Down
Loading