mirror of
https://git.libssh.org/projects/libssh.git
synced 2025-07-31 00:03:07 +03:00
Fixed outgoing flow control + writes behaviours
This commit is contained in:
@ -9,7 +9,7 @@ int main(void) {
|
|||||||
char buffer[1024*1024];
|
char buffer[1024*1024];
|
||||||
int rc;
|
int rc;
|
||||||
|
|
||||||
session = connect_ssh("172.16.104.134", NULL, 0);
|
session = connect_ssh("localhost", NULL, 0);
|
||||||
if (session == NULL) {
|
if (session == NULL) {
|
||||||
return 1;
|
return 1;
|
||||||
}
|
}
|
||||||
|
@ -48,8 +48,8 @@ void ssh_socket_fd_set(ssh_socket s, fd_set *set, int *fd_max);
|
|||||||
void ssh_socket_set_fd_in(ssh_socket s, socket_t fd);
|
void ssh_socket_set_fd_in(ssh_socket s, socket_t fd);
|
||||||
void ssh_socket_set_fd_out(ssh_socket s, socket_t fd);
|
void ssh_socket_set_fd_out(ssh_socket s, socket_t fd);
|
||||||
int ssh_socket_nonblocking_flush(ssh_socket s);
|
int ssh_socket_nonblocking_flush(ssh_socket s);
|
||||||
void ssh_socket_set_towrite(ssh_socket s);
|
void ssh_socket_set_write_wontblock(ssh_socket s);
|
||||||
void ssh_socket_set_toread(ssh_socket s);
|
void ssh_socket_set_read_wontblock(ssh_socket s);
|
||||||
void ssh_socket_set_except(ssh_socket s);
|
void ssh_socket_set_except(ssh_socket s);
|
||||||
int ssh_socket_get_status(ssh_socket s);
|
int ssh_socket_get_status(ssh_socket s);
|
||||||
int ssh_socket_data_available(ssh_socket s);
|
int ssh_socket_data_available(ssh_socket s);
|
||||||
|
@ -2469,13 +2469,13 @@ int ssh_channel_select(ssh_channel *readchans, ssh_channel *writechans,
|
|||||||
|
|
||||||
for (i = 0; readchans[i] != NULL; i++) {
|
for (i = 0; readchans[i] != NULL; i++) {
|
||||||
if (ssh_socket_fd_isset(readchans[i]->session->socket, &rset)) {
|
if (ssh_socket_fd_isset(readchans[i]->session->socket, &rset)) {
|
||||||
ssh_socket_set_toread(readchans[i]->session->socket);
|
ssh_socket_set_read_wontblock(readchans[i]->session->socket);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
for (i = 0; writechans[i] != NULL; i++) {
|
for (i = 0; writechans[i] != NULL; i++) {
|
||||||
if (ssh_socket_fd_isset(writechans[i]->session->socket, &wset)) {
|
if (ssh_socket_fd_isset(writechans[i]->session->socket, &wset)) {
|
||||||
ssh_socket_set_towrite(writechans[i]->session->socket);
|
ssh_socket_set_write_wontblock(writechans[i]->session->socket);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -216,9 +216,6 @@ int ssh_send_banner(ssh_session session, int server) {
|
|||||||
if (ssh_socket_write(session->socket, buffer, strlen(buffer)) == SSH_ERROR) {
|
if (ssh_socket_write(session->socket, buffer, strlen(buffer)) == SSH_ERROR) {
|
||||||
goto end;
|
goto end;
|
||||||
}
|
}
|
||||||
if (ssh_socket_nonblocking_flush(session->socket) == SSH_ERROR){
|
|
||||||
goto end;
|
|
||||||
}
|
|
||||||
#ifdef WITH_PCAP
|
#ifdef WITH_PCAP
|
||||||
if(session->pcap_ctx)
|
if(session->pcap_ctx)
|
||||||
ssh_pcap_context_write(session->pcap_ctx,SSH_PCAP_DIR_OUT,buffer,strlen(buffer),strlen(buffer));
|
ssh_pcap_context_write(session->pcap_ctx,SSH_PCAP_DIR_OUT,buffer,strlen(buffer),strlen(buffer));
|
||||||
|
@ -565,7 +565,7 @@ int ssh_select(ssh_channel *channels, ssh_channel *outchannels, socket_t maxfd,
|
|||||||
for (i = 0; channels[i]; i++) {
|
for (i = 0; channels[i]; i++) {
|
||||||
if (channels[i]->session->alive &&
|
if (channels[i]->session->alive &&
|
||||||
ssh_socket_fd_isset(channels[i]->session->socket,&localset)) {
|
ssh_socket_fd_isset(channels[i]->session->socket,&localset)) {
|
||||||
ssh_socket_set_toread(channels[i]->session->socket);
|
ssh_socket_set_read_wontblock(channels[i]->session->socket);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -433,9 +433,6 @@ static int ssh_packet_write(ssh_session session) {
|
|||||||
rc=ssh_socket_write(session->socket,
|
rc=ssh_socket_write(session->socket,
|
||||||
ssh_buffer_get_begin(session->out_buffer),
|
ssh_buffer_get_begin(session->out_buffer),
|
||||||
ssh_buffer_get_len(session->out_buffer));
|
ssh_buffer_get_len(session->out_buffer));
|
||||||
if(rc == SSH_OK){
|
|
||||||
rc=ssh_socket_nonblocking_flush(session->socket);
|
|
||||||
}
|
|
||||||
leave_function();
|
leave_function();
|
||||||
return rc;
|
return rc;
|
||||||
}
|
}
|
||||||
|
@ -308,7 +308,7 @@ void ssh_set_fd_toread(ssh_session session) {
|
|||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
ssh_socket_set_toread(session->socket);
|
ssh_socket_set_read_wontblock(session->socket);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -321,7 +321,7 @@ void ssh_set_fd_towrite(ssh_session session) {
|
|||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
ssh_socket_set_towrite(session->socket);
|
ssh_socket_set_write_wontblock(session->socket);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -362,7 +362,7 @@ int ssh_handle_packets(ssh_session session, int timeout) {
|
|||||||
enter_function();
|
enter_function();
|
||||||
spoll_in=ssh_socket_get_poll_handle_in(session->socket);
|
spoll_in=ssh_socket_get_poll_handle_in(session->socket);
|
||||||
spoll_out=ssh_socket_get_poll_handle_out(session->socket);
|
spoll_out=ssh_socket_get_poll_handle_out(session->socket);
|
||||||
ssh_poll_set_events(spoll_in, POLLIN | POLLERR);
|
ssh_poll_add_events(spoll_in, POLLIN | POLLERR);
|
||||||
ctx=ssh_poll_get_ctx(spoll_in);
|
ctx=ssh_poll_get_ctx(spoll_in);
|
||||||
if(ctx==NULL){
|
if(ctx==NULL){
|
||||||
ctx=ssh_get_global_poll_ctx(session);
|
ctx=ssh_get_global_poll_ctx(session);
|
||||||
|
52
src/socket.c
52
src/socket.c
@ -81,9 +81,9 @@ struct ssh_socket_struct {
|
|||||||
socket_t fd_out;
|
socket_t fd_out;
|
||||||
int fd_is_socket;
|
int fd_is_socket;
|
||||||
int last_errno;
|
int last_errno;
|
||||||
int data_to_read; /* reading now on socket will
|
int read_wontblock; /* reading now on socket will
|
||||||
not block */
|
not block */
|
||||||
int data_to_write;
|
int write_wontblock;
|
||||||
int data_except;
|
int data_except;
|
||||||
enum ssh_socket_states_e state;
|
enum ssh_socket_states_e state;
|
||||||
ssh_buffer out_buffer;
|
ssh_buffer out_buffer;
|
||||||
@ -152,8 +152,8 @@ ssh_socket ssh_socket_new(ssh_session session) {
|
|||||||
SAFE_FREE(s);
|
SAFE_FREE(s);
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
s->data_to_read = 0;
|
s->read_wontblock = 0;
|
||||||
s->data_to_write = 0;
|
s->write_wontblock = 0;
|
||||||
s->data_except = 0;
|
s->data_except = 0;
|
||||||
s->poll_in=s->poll_out=NULL;
|
s->poll_in=s->poll_out=NULL;
|
||||||
s->state=SSH_SOCKET_NONE;
|
s->state=SSH_SOCKET_NONE;
|
||||||
@ -175,7 +175,7 @@ void ssh_socket_set_callbacks(ssh_socket s, ssh_socket_callbacks callbacks){
|
|||||||
int ssh_socket_pollcallback(struct ssh_poll_handle_struct *p, socket_t fd, int revents, void *v_s){
|
int ssh_socket_pollcallback(struct ssh_poll_handle_struct *p, socket_t fd, int revents, void *v_s){
|
||||||
ssh_socket s=(ssh_socket )v_s;
|
ssh_socket s=(ssh_socket )v_s;
|
||||||
char buffer[4096];
|
char buffer[4096];
|
||||||
int r,w;
|
int r;
|
||||||
int err=0;
|
int err=0;
|
||||||
socklen_t errlen=sizeof(err);
|
socklen_t errlen=sizeof(err);
|
||||||
/* Do not do anything if this socket was already closed */
|
/* Do not do anything if this socket was already closed */
|
||||||
@ -199,7 +199,7 @@ int ssh_socket_pollcallback(struct ssh_poll_handle_struct *p, socket_t fd, int r
|
|||||||
revents |= POLLIN;
|
revents |= POLLIN;
|
||||||
}
|
}
|
||||||
if(revents & POLLIN){
|
if(revents & POLLIN){
|
||||||
s->data_to_read=1;
|
s->read_wontblock=1;
|
||||||
r=ssh_socket_unbuffered_read(s,buffer,sizeof(buffer));
|
r=ssh_socket_unbuffered_read(s,buffer,sizeof(buffer));
|
||||||
if(r<0){
|
if(r<0){
|
||||||
err=-1;
|
err=-1;
|
||||||
@ -246,18 +246,16 @@ int ssh_socket_pollcallback(struct ssh_poll_handle_struct *p, socket_t fd, int r
|
|||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
/* So, we can write data */
|
/* So, we can write data */
|
||||||
s->data_to_write=1;
|
s->write_wontblock=1;
|
||||||
|
ssh_poll_remove_events(p,POLLOUT);
|
||||||
|
|
||||||
/* If buffered data is pending, write it */
|
/* If buffered data is pending, write it */
|
||||||
if(buffer_get_rest_len(s->out_buffer) > 0){
|
if(buffer_get_rest_len(s->out_buffer) > 0){
|
||||||
w=ssh_socket_unbuffered_write(s, buffer_get_rest(s->out_buffer),
|
ssh_socket_nonblocking_flush(s);
|
||||||
buffer_get_rest_len(s->out_buffer));
|
|
||||||
if(w>0)
|
|
||||||
buffer_pass_bytes(s->out_buffer,w);
|
|
||||||
} else if(s->callbacks && s->callbacks->controlflow){
|
} else if(s->callbacks && s->callbacks->controlflow){
|
||||||
/* Otherwise advertise the upper level that write can be done */
|
/* Otherwise advertise the upper level that write can be done */
|
||||||
s->callbacks->controlflow(SSH_SOCKET_FLOW_WRITEWONTBLOCK,s->callbacks->userdata);
|
s->callbacks->controlflow(SSH_SOCKET_FLOW_WRITEWONTBLOCK,s->callbacks->userdata);
|
||||||
}
|
}
|
||||||
ssh_poll_remove_events(p,POLLOUT);
|
|
||||||
/* TODO: Find a way to put back POLLOUT when buffering occurs */
|
/* TODO: Find a way to put back POLLOUT when buffering occurs */
|
||||||
}
|
}
|
||||||
return err;
|
return err;
|
||||||
@ -432,7 +430,7 @@ static int ssh_socket_unbuffered_read(ssh_socket s, void *buffer, uint32_t len)
|
|||||||
#else
|
#else
|
||||||
s->last_errno = errno;
|
s->last_errno = errno;
|
||||||
#endif
|
#endif
|
||||||
s->data_to_read = 0;
|
s->read_wontblock = 0;
|
||||||
|
|
||||||
if (rc < 0) {
|
if (rc < 0) {
|
||||||
s->data_except = 1;
|
s->data_except = 1;
|
||||||
@ -460,7 +458,7 @@ static int ssh_socket_unbuffered_write(ssh_socket s, const void *buffer,
|
|||||||
#else
|
#else
|
||||||
s->last_errno = errno;
|
s->last_errno = errno;
|
||||||
#endif
|
#endif
|
||||||
s->data_to_write = 0;
|
s->write_wontblock = 0;
|
||||||
/* Reactive the POLLOUT detector in the poll multiplexer system */
|
/* Reactive the POLLOUT detector in the poll multiplexer system */
|
||||||
if(s->poll_out){
|
if(s->poll_out){
|
||||||
ssh_log(s->session, SSH_LOG_PACKET, "Enabling POLLOUT for socket");
|
ssh_log(s->session, SSH_LOG_PACKET, "Enabling POLLOUT for socket");
|
||||||
@ -519,7 +517,7 @@ int ssh_socket_write(ssh_socket s, const void *buffer, int len) {
|
|||||||
if (buffer_add_data(s->out_buffer, buffer, len) < 0) {
|
if (buffer_add_data(s->out_buffer, buffer, len) < 0) {
|
||||||
return SSH_ERROR;
|
return SSH_ERROR;
|
||||||
}
|
}
|
||||||
ssh_socket_set_towrite(s);
|
ssh_socket_nonblocking_flush(s);
|
||||||
}
|
}
|
||||||
leave_function();
|
leave_function();
|
||||||
return SSH_OK;
|
return SSH_OK;
|
||||||
@ -549,7 +547,13 @@ int ssh_socket_nonblocking_flush(ssh_socket s) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
len = buffer_get_rest_len(s->out_buffer);
|
len = buffer_get_rest_len(s->out_buffer);
|
||||||
if (s->data_to_write && len > 0) {
|
if (!s->write_wontblock && s->poll_out && len > 0) {
|
||||||
|
/* force the poll system to catch pollout events */
|
||||||
|
ssh_poll_add_events(s->poll_out, POLLOUT);
|
||||||
|
leave_function();
|
||||||
|
return SSH_AGAIN;
|
||||||
|
}
|
||||||
|
if (s->write_wontblock && len > 0) {
|
||||||
w = ssh_socket_unbuffered_write(s, buffer_get_rest(s->out_buffer), len);
|
w = ssh_socket_unbuffered_write(s, buffer_get_rest(s->out_buffer), len);
|
||||||
if (w < 0) {
|
if (w < 0) {
|
||||||
session->alive = 0;
|
session->alive = 0;
|
||||||
@ -569,7 +573,7 @@ int ssh_socket_nonblocking_flush(ssh_socket s) {
|
|||||||
len = buffer_get_rest_len(s->out_buffer);
|
len = buffer_get_rest_len(s->out_buffer);
|
||||||
if (s->poll_out && len > 0) {
|
if (s->poll_out && len > 0) {
|
||||||
/* force the poll system to catch pollout events */
|
/* force the poll system to catch pollout events */
|
||||||
ssh_poll_set_events(s->poll_out, ssh_poll_get_events(s->poll_out) | POLLOUT);
|
ssh_poll_add_events(s->poll_out, POLLOUT);
|
||||||
leave_function();
|
leave_function();
|
||||||
return SSH_AGAIN;
|
return SSH_AGAIN;
|
||||||
}
|
}
|
||||||
@ -579,12 +583,12 @@ int ssh_socket_nonblocking_flush(ssh_socket s) {
|
|||||||
return SSH_OK;
|
return SSH_OK;
|
||||||
}
|
}
|
||||||
|
|
||||||
void ssh_socket_set_towrite(ssh_socket s) {
|
void ssh_socket_set_write_wontblock(ssh_socket s) {
|
||||||
s->data_to_write = 1;
|
s->write_wontblock = 1;
|
||||||
}
|
}
|
||||||
|
|
||||||
void ssh_socket_set_toread(ssh_socket s) {
|
void ssh_socket_set_read_wontblock(ssh_socket s) {
|
||||||
s->data_to_read = 1;
|
s->read_wontblock = 1;
|
||||||
}
|
}
|
||||||
|
|
||||||
void ssh_socket_set_except(ssh_socket s) {
|
void ssh_socket_set_except(ssh_socket s) {
|
||||||
@ -592,17 +596,17 @@ void ssh_socket_set_except(ssh_socket s) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
int ssh_socket_data_available(ssh_socket s) {
|
int ssh_socket_data_available(ssh_socket s) {
|
||||||
return s->data_to_read;
|
return s->read_wontblock;
|
||||||
}
|
}
|
||||||
|
|
||||||
int ssh_socket_data_writable(ssh_socket s) {
|
int ssh_socket_data_writable(ssh_socket s) {
|
||||||
return s->data_to_write;
|
return s->write_wontblock;
|
||||||
}
|
}
|
||||||
|
|
||||||
int ssh_socket_get_status(ssh_socket s) {
|
int ssh_socket_get_status(ssh_socket s) {
|
||||||
int r = 0;
|
int r = 0;
|
||||||
|
|
||||||
if (s->data_to_read) {
|
if (s->read_wontblock) {
|
||||||
r |= SSH_READ_PENDING;
|
r |= SSH_READ_PENDING;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Reference in New Issue
Block a user