mirror of
https://github.com/libssh2/libssh2.git
synced 2025-11-02 10:53:16 +03:00
SFTP: use multiple outgoing packets when writing
sftp_write was rewritten to split up outgoing data into multiple packets and deal with the acks in a more asynchronous manner. This is meant to help overcome latency and round-trip problems with the SFTP protocol.
This commit is contained in:
@@ -556,6 +556,7 @@ struct _LIBSSH2_SFTP_HANDLE
|
|||||||
struct _libssh2_sftp_handle_file_data
|
struct _libssh2_sftp_handle_file_data
|
||||||
{
|
{
|
||||||
libssh2_uint64_t offset;
|
libssh2_uint64_t offset;
|
||||||
|
libssh2_uint64_t offset_sent;
|
||||||
} file;
|
} file;
|
||||||
struct _libssh2_sftp_handle_dir_data
|
struct _libssh2_sftp_handle_dir_data
|
||||||
{
|
{
|
||||||
@@ -569,6 +570,10 @@ struct _LIBSSH2_SFTP_HANDLE
|
|||||||
libssh2_nonblocking_states close_state;
|
libssh2_nonblocking_states close_state;
|
||||||
unsigned long close_request_id;
|
unsigned long close_request_id;
|
||||||
unsigned char *close_packet;
|
unsigned char *close_packet;
|
||||||
|
|
||||||
|
/* list of chunks being written to server */
|
||||||
|
struct list_head write_list;
|
||||||
|
|
||||||
};
|
};
|
||||||
|
|
||||||
struct _LIBSSH2_SFTP
|
struct _LIBSSH2_SFTP
|
||||||
|
|||||||
201
src/sftp.c
201
src/sftp.c
@@ -43,6 +43,7 @@
|
|||||||
#include "libssh2_sftp.h"
|
#include "libssh2_sftp.h"
|
||||||
#include "channel.h"
|
#include "channel.h"
|
||||||
#include "session.h"
|
#include "session.h"
|
||||||
|
#include "sftp.h"
|
||||||
|
|
||||||
/* Note: Version 6 was documented at the time of writing
|
/* Note: Version 6 was documented at the time of writing
|
||||||
* However it was marked as "DO NOT IMPLEMENT" due to pending changes
|
* However it was marked as "DO NOT IMPLEMENT" due to pending changes
|
||||||
@@ -1007,6 +1008,7 @@ sftp_open(LIBSSH2_SFTP *sftp, const char *filename,
|
|||||||
fp->sftp = sftp; /* point to the parent struct */
|
fp->sftp = sftp; /* point to the parent struct */
|
||||||
|
|
||||||
fp->u.file.offset = 0;
|
fp->u.file.offset = 0;
|
||||||
|
fp->u.file.offset_sent = 0;
|
||||||
|
|
||||||
_libssh2_debug(session, LIBSSH2_TRACE_SFTP, "Open command successful");
|
_libssh2_debug(session, LIBSSH2_TRACE_SFTP, "Open command successful");
|
||||||
return fp;
|
return fp;
|
||||||
@@ -1386,7 +1388,32 @@ libssh2_sftp_readdir_ex(LIBSSH2_SFTP_HANDLE *hnd, char *buffer,
|
|||||||
*
|
*
|
||||||
* Write data to an SFTP handle. Returns the number of bytes written, or
|
* Write data to an SFTP handle. Returns the number of bytes written, or
|
||||||
* a negative error code.
|
* a negative error code.
|
||||||
|
*
|
||||||
|
* We recommend sending very large data buffers to this function!
|
||||||
|
*
|
||||||
|
* Concept:
|
||||||
|
*
|
||||||
|
* - Split all outgoing data in packets no larger than N.
|
||||||
|
* - Each N bytes packet gets created as a separate SFTP packet.
|
||||||
|
* - Put all created packets in a linked list.
|
||||||
|
* - Send the parts, as many as possible until EAGAIN, and check for
|
||||||
|
* ACKs.
|
||||||
|
* - If the first packet(s) in the list has been ACKed, remove them from
|
||||||
|
* the list and return back the total byte count (TOTAL) for those packets.
|
||||||
|
*
|
||||||
|
* - On following call, the 'buffer' is expected to have advanced TOTAL
|
||||||
|
* bytes.
|
||||||
|
* - Detect how much of the given buffer that was already sent in a previous
|
||||||
|
* call by inspecting the linked list.
|
||||||
|
* - Split up the remainder and put in list to send.
|
||||||
|
* - Move through list and count ACKed packets and try to send more.
|
||||||
|
* - Return TOTAL bytes acked so far.
|
||||||
|
*
|
||||||
|
* Caveats:
|
||||||
|
* - be careful: we must not return a higher number than what was given!
|
||||||
|
*
|
||||||
*/
|
*/
|
||||||
|
|
||||||
static ssize_t sftp_write(LIBSSH2_SFTP_HANDLE *handle, const char *buffer,
|
static ssize_t sftp_write(LIBSSH2_SFTP_HANDLE *handle, const char *buffer,
|
||||||
size_t count)
|
size_t count)
|
||||||
{
|
{
|
||||||
@@ -1394,77 +1421,157 @@ static ssize_t sftp_write(LIBSSH2_SFTP_HANDLE *handle, const char *buffer,
|
|||||||
LIBSSH2_CHANNEL *channel = sftp->channel;
|
LIBSSH2_CHANNEL *channel = sftp->channel;
|
||||||
LIBSSH2_SESSION *session = channel->session;
|
LIBSSH2_SESSION *session = channel->session;
|
||||||
size_t data_len, retcode;
|
size_t data_len, retcode;
|
||||||
/* 25 = packet_len(4) + packet_type(1) + request_id(4) + handle_len(4) +
|
|
||||||
offset(8) + count(4) */
|
|
||||||
ssize_t packet_len;
|
ssize_t packet_len;
|
||||||
unsigned char *s, *data;
|
unsigned char *s, *data;
|
||||||
int rc;
|
int rc;
|
||||||
|
struct sftp_write_chunk *chunk;
|
||||||
|
struct sftp_write_chunk *next;
|
||||||
|
size_t sent = 0;
|
||||||
|
size_t acked = 0;
|
||||||
|
size_t org_count = count;
|
||||||
|
|
||||||
packet_len = handle->handle_len + count + 25;
|
chunk = _libssh2_list_first(&handle->write_list);
|
||||||
|
|
||||||
if (sftp->write_state == libssh2_NB_state_idle) {
|
/* the list of staged packets is in send order, the first to be sent
|
||||||
_libssh2_debug(session, LIBSSH2_TRACE_SFTP, "Writing %lu bytes",
|
next */
|
||||||
(unsigned long) count);
|
while(chunk) {
|
||||||
s = sftp->write_packet = LIBSSH2_ALLOC(session, packet_len);
|
if(SFTP_WITHIN(chunk->org_buffer, buffer, count)) {
|
||||||
if (!sftp->write_packet) {
|
/* parts of the given buffer have already been staged for
|
||||||
return _libssh2_error(session, LIBSSH2_ERROR_ALLOC,
|
sending, advance buffer to the end of the staged buffer and
|
||||||
"Unable to allocate memory for FXP_WRITE");
|
decrease count with the amount */
|
||||||
|
size_t delta = chunk->org_buflen-(chunk->org_buffer-buffer);
|
||||||
|
buffer += delta;
|
||||||
|
count -= delta;
|
||||||
|
chunk = _libssh2_list_next(&chunk->node);
|
||||||
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
while(count) {
|
||||||
|
/* Possibly this should have some logic to prevent a very very
|
||||||
|
small fraction to be left but lets ignore that for now */
|
||||||
|
size_t size = MIN(MAX_SFTP_OUTGOING_SIZE, count);
|
||||||
|
|
||||||
|
/* 25 = packet_len(4) + packet_type(1) + request_id(4) +
|
||||||
|
handle_len(4) + offset(8) + count(4) */
|
||||||
|
packet_len = handle->handle_len + size + 25;
|
||||||
|
|
||||||
|
chunk = LIBSSH2_ALLOC(session, packet_len +
|
||||||
|
sizeof(struct sftp_write_chunk));
|
||||||
|
if (!chunk)
|
||||||
|
return _libssh2_error(session, LIBSSH2_ERROR_ALLOC,
|
||||||
|
"malloc fail for FXP_WRITE");
|
||||||
|
|
||||||
|
chunk->org_buffer = buffer;
|
||||||
|
chunk->org_buflen = size;
|
||||||
|
chunk->sent = 0;
|
||||||
|
chunk->lefttosend = packet_len;
|
||||||
|
|
||||||
|
s = chunk->packet;
|
||||||
_libssh2_store_u32(&s, packet_len - 4);
|
_libssh2_store_u32(&s, packet_len - 4);
|
||||||
|
|
||||||
*(s++) = SSH_FXP_WRITE;
|
*(s++) = SSH_FXP_WRITE;
|
||||||
sftp->write_request_id = sftp->request_id++;
|
sftp->write_request_id = sftp->request_id++;
|
||||||
|
chunk->request_id = sftp->write_request_id;
|
||||||
_libssh2_store_u32(&s, sftp->write_request_id);
|
_libssh2_store_u32(&s, sftp->write_request_id);
|
||||||
_libssh2_store_str(&s, handle->handle, handle->handle_len);
|
_libssh2_store_str(&s, handle->handle, handle->handle_len);
|
||||||
_libssh2_store_u64(&s, handle->u.file.offset);
|
_libssh2_store_u64(&s, handle->u.file.offset_sent);
|
||||||
_libssh2_store_str(&s, buffer, count);
|
handle->u.file.offset_sent += size; /* advance offset at once */
|
||||||
|
_libssh2_store_str(&s, buffer, size);
|
||||||
|
|
||||||
sftp->write_state = libssh2_NB_state_created;
|
/* add this new entry LAST in the list */
|
||||||
|
_libssh2_list_add(&handle->write_list, &chunk->node);
|
||||||
|
|
||||||
|
buffer += size;
|
||||||
|
count -= size; /* deduct the size we used, as we might have
|
||||||
|
to create more packets */
|
||||||
}
|
}
|
||||||
|
|
||||||
if (sftp->write_state == libssh2_NB_state_created) {
|
/* move through the packets that haven't been sent and send as many
|
||||||
rc = _libssh2_channel_write(channel, 0, sftp->write_packet,
|
as possible - remember that we don't block */
|
||||||
packet_len);
|
chunk = _libssh2_list_first(&handle->write_list);
|
||||||
if(rc < 0) {
|
|
||||||
/* error */
|
while(chunk) {
|
||||||
return rc;
|
if(chunk->lefttosend) {
|
||||||
|
rc = _libssh2_channel_write(channel, 0,
|
||||||
|
&chunk->packet[chunk->sent],
|
||||||
|
chunk->lefttosend);
|
||||||
|
if(rc < 0) {
|
||||||
|
if(rc != LIBSSH2_ERROR_EAGAIN)
|
||||||
|
/* error */
|
||||||
|
return rc;
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
/* remember where to continue sending the next time */
|
||||||
|
chunk->lefttosend -= rc;
|
||||||
|
chunk->sent += rc;
|
||||||
|
sent += rc;
|
||||||
|
|
||||||
|
if(chunk->lefttosend)
|
||||||
|
/* data left to send, get out of loop */
|
||||||
|
break;
|
||||||
|
|
||||||
|
chunk = _libssh2_list_next(&chunk->node);
|
||||||
|
|
||||||
|
sftp->write_state = libssh2_NB_state_sent;
|
||||||
}
|
}
|
||||||
else if(0 == rc) {
|
else {
|
||||||
/* nothing sent is an error */
|
/* move on to the next chunk with data to send */
|
||||||
return LIBSSH2_ERROR_SOCKET_SEND;
|
chunk = _libssh2_list_next(&chunk->node);
|
||||||
}
|
}
|
||||||
else if (packet_len != rc) {
|
|
||||||
return rc;
|
|
||||||
}
|
|
||||||
LIBSSH2_FREE(session, sftp->write_packet);
|
|
||||||
sftp->write_packet = NULL;
|
|
||||||
sftp->write_state = libssh2_NB_state_sent;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
rc = sftp_packet_require(sftp, SSH_FXP_STATUS,
|
/*
|
||||||
sftp->write_request_id, &data, &data_len);
|
* Count all ACKed packets
|
||||||
if (rc == LIBSSH2_ERROR_EAGAIN) {
|
*/
|
||||||
return rc;
|
chunk = _libssh2_list_first(&handle->write_list);
|
||||||
}
|
|
||||||
else if (rc) {
|
while(chunk) {
|
||||||
sftp->write_state = libssh2_NB_state_idle;
|
/* we can expect the packets to be ACKed in order */
|
||||||
return _libssh2_error(session, rc,
|
rc = sftp_packet_require(sftp, SSH_FXP_STATUS,
|
||||||
"Timeout waiting for status message");
|
chunk->request_id, &data, &data_len);
|
||||||
|
if (rc == LIBSSH2_ERROR_EAGAIN)
|
||||||
|
break;
|
||||||
|
else if (rc) {
|
||||||
|
sftp->write_state = libssh2_NB_state_idle;
|
||||||
|
return _libssh2_error(session, rc, "Waiting for SFTP status");
|
||||||
|
}
|
||||||
|
retcode = _libssh2_ntohu32(data + 5);
|
||||||
|
LIBSSH2_FREE(session, data);
|
||||||
|
|
||||||
|
sftp->last_errno = retcode;
|
||||||
|
if (retcode == LIBSSH2_FX_OK) {
|
||||||
|
acked += chunk->org_buflen; /* number of payload data that was
|
||||||
|
acked here */
|
||||||
|
|
||||||
|
/* we increase the offset value for all acks */
|
||||||
|
handle->u.file.offset += chunk->org_buflen;
|
||||||
|
|
||||||
|
next = _libssh2_list_next(&chunk->node);
|
||||||
|
|
||||||
|
_libssh2_list_remove(&chunk->node); /* remove from list */
|
||||||
|
LIBSSH2_FREE(session, chunk); /* free memory */
|
||||||
|
|
||||||
|
chunk = next;
|
||||||
|
}
|
||||||
|
else {
|
||||||
|
/* TODO: handle errors here! */
|
||||||
|
break;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
sftp->write_state = libssh2_NB_state_idle;
|
sftp->write_state = libssh2_NB_state_idle;
|
||||||
|
|
||||||
retcode = _libssh2_ntohu32(data + 5);
|
if(acked)
|
||||||
LIBSSH2_FREE(session, data);
|
/* we got data acked so return that amount, but no more than what
|
||||||
|
was asked to get sent! */
|
||||||
|
return MIN(acked, org_count);
|
||||||
|
|
||||||
if (retcode == LIBSSH2_FX_OK) {
|
return _libssh2_error(session, LIBSSH2_ERROR_EAGAIN,
|
||||||
handle->u.file.offset += count;
|
"Would block");
|
||||||
return count;
|
|
||||||
}
|
|
||||||
sftp->last_errno = retcode;
|
|
||||||
|
|
||||||
return _libssh2_error(session, LIBSSH2_ERROR_SFTP_PROTOCOL,
|
|
||||||
"SFTP Protocol Error");
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/* libssh2_sftp_write
|
/* libssh2_sftp_write
|
||||||
|
|||||||
Reference in New Issue
Block a user