Bug Summary

File:rpc/rpc-transport/socket/src/socket.c
Location:line 3185, column 17
Description:Value stored to 'ret' is never read

Annotated Source Code

1/*
2 Copyright (c) 2008-2012 Red Hat, Inc. <http://www.redhat.com>
3 This file is part of GlusterFS.
4
5 This file is licensed to you under your choice of the GNU Lesser
6 General Public License, version 3 or any later version (LGPLv3 or
7 later), or the GNU General Public License, version 2 (GPLv2), in all
8 cases as published by the Free Software Foundation.
9*/
10
11
12#ifndef _CONFIG_H
13#define _CONFIG_H
14#include "config.h"
15#endif
16
17#include "socket.h"
18#include "name.h"
19#include "dict.h"
20#include "rpc-transport.h"
21#include "logging.h"
22#include "xlator.h"
23#include "byte-order.h"
24#include "common-utils.h"
25#include "compat-errno.h"
26
27
28/* ugly #includes below */
29#include "protocol-common.h"
30#include "glusterfs3-xdr.h"
31#include "xdr-nfs3.h"
32#include "rpcsvc.h"
33
34#include <fcntl.h>
35#include <errno(*__errno_location ()).h>
36#include <netinet/tcp.h>
37#include <rpc/xdr.h>
38#include <sys/ioctl.h>
39#define GF_LOG_ERRNO(errno)(((*__errno_location ()) == 107) ? GF_LOG_DEBUG : GF_LOG_ERROR
)
((errno(*__errno_location ()) == ENOTCONN107) ? GF_LOG_DEBUG : GF_LOG_ERROR)
40#define SA(ptr)((struct sockaddr *)ptr) ((struct sockaddr *)ptr)
41
42#define SSL_ENABLED_OPT"transport.socket.ssl-enabled" "transport.socket.ssl-enabled"
43#define SSL_OWN_CERT_OPT"transport.socket.ssl-own-cert" "transport.socket.ssl-own-cert"
44#define SSL_PRIVATE_KEY_OPT"transport.socket.ssl-private-key" "transport.socket.ssl-private-key"
45#define SSL_CA_LIST_OPT"transport.socket.ssl-ca-list" "transport.socket.ssl-ca-list"
46#define OWN_THREAD_OPT"transport.socket.own-thread" "transport.socket.own-thread"
47
48/* TBD: do automake substitutions etc. (ick) to set these. */
49#if !defined(DEFAULT_CERT_PATH"/etc/ssl/glusterfs.pem")
50#define DEFAULT_CERT_PATH"/etc/ssl/glusterfs.pem" "/etc/ssl/glusterfs.pem"
51#endif
52#if !defined(DEFAULT_KEY_PATH"/etc/ssl/glusterfs.key")
53#define DEFAULT_KEY_PATH"/etc/ssl/glusterfs.key" "/etc/ssl/glusterfs.key"
54#endif
55#if !defined(DEFAULT_CA_PATH"/etc/ssl/glusterfs.ca")
56#define DEFAULT_CA_PATH"/etc/ssl/glusterfs.ca" "/etc/ssl/glusterfs.ca"
57#endif
58
59#define POLL_MASK_INPUT(0x001 | 0x002) (POLLIN0x001 | POLLPRI0x002)
60#define POLL_MASK_OUTPUT(0x004) (POLLOUT0x004)
61#define POLL_MASK_ERROR(0x008 | 0x010 | 0x020) (POLLERR0x008 | POLLHUP0x010 | POLLNVAL0x020)
62
63typedef int SSL_unary_func (SSL *);
64typedef int SSL_trinary_func (SSL *, void *, int);
65
66#define __socket_proto_reset_pending(priv)do { struct gf_sock_incoming_frag *frag; frag = &priv->
incoming.frag; memset (&frag->vector, 0, sizeof (frag->
vector)); frag->pending_vector = &frag->vector; frag
->pending_vector->iov_base = frag->fragcurrent; priv
->incoming.pending_vector = frag->pending_vector; } while
(0)
do { \
67 struct gf_sock_incoming_frag *frag; \
68 frag = &priv->incoming.frag; \
69 \
70 memset (&frag->vector, 0, sizeof (frag->vector)); \
71 frag->pending_vector = &frag->vector; \
72 frag->pending_vector->iov_base = frag->fragcurrent; \
73 priv->incoming.pending_vector = frag->pending_vector; \
74 } while (0)
75
76
77#define __socket_proto_update_pending(priv)do { uint32_t remaining; struct gf_sock_incoming_frag *frag; frag
= &priv->incoming.frag; if (frag->pending_vector->
iov_len == 0) { remaining = (((uint32_t)(priv->incoming.fraghdr
& 0x7fffffffU)) - frag->bytes_read); frag->pending_vector
->iov_len = (remaining > frag->remaining_size) ? frag
->remaining_size : remaining; frag->remaining_size -= frag
->pending_vector->iov_len; } } while (0)
\
78 do { \
79 uint32_t remaining; \
80 struct gf_sock_incoming_frag *frag; \
81 frag = &priv->incoming.frag; \
82 if (frag->pending_vector->iov_len == 0) { \
83 remaining = (RPC_FRAGSIZE (priv->incoming.fraghdr)((uint32_t)(priv->incoming.fraghdr & 0x7fffffffU)) \
84 - frag->bytes_read); \
85 \
86 frag->pending_vector->iov_len = \
87 (remaining > frag->remaining_size) \
88 ? frag->remaining_size : remaining; \
89 \
90 frag->remaining_size -= \
91 frag->pending_vector->iov_len; \
92 } \
93 } while (0)
94
95#define __socket_proto_update_priv_after_read(priv, ret, bytes_read){ struct gf_sock_incoming_frag *frag; frag = &priv->incoming
.frag; frag->fragcurrent += bytes_read; frag->bytes_read
+= bytes_read; if ((ret > 0) || (frag->remaining_size !=
0)) { if (frag->remaining_size != 0 && ret == 0) {
do { struct gf_sock_incoming_frag *frag; frag = &priv->
incoming.frag; memset (&frag->vector, 0, sizeof (frag->
vector)); frag->pending_vector = &frag->vector; frag
->pending_vector->iov_base = frag->fragcurrent; priv
->incoming.pending_vector = frag->pending_vector; } while
(0); } do { do { if (0) printf ("partial read on non-blocking socket"
); } while (0); _gf_log (this->name, "socket.c", __FUNCTION__
, 95, GF_LOG_TRACE, "partial read on non-blocking socket"); }
while (0); break; } }
\
96 { \
97 struct gf_sock_incoming_frag *frag; \
98 frag = &priv->incoming.frag; \
99 \
100 frag->fragcurrent += bytes_read; \
101 frag->bytes_read += bytes_read; \
102 \
103 if ((ret > 0) || (frag->remaining_size != 0)) { \
104 if (frag->remaining_size != 0 && ret == 0) { \
105 __socket_proto_reset_pending (priv)do { struct gf_sock_incoming_frag *frag; frag = &priv->
incoming.frag; memset (&frag->vector, 0, sizeof (frag->
vector)); frag->pending_vector = &frag->vector; frag
->pending_vector->iov_base = frag->fragcurrent; priv
->incoming.pending_vector = frag->pending_vector; } while
(0)
; \
106 } \
107 \
108 gf_log (this->name, GF_LOG_TRACE, \do { do { if (0) printf ("partial read on non-blocking socket"
); } while (0); _gf_log (this->name, "socket.c", __FUNCTION__
, 109, GF_LOG_TRACE, "partial read on non-blocking socket"); }
while (0)
109 "partial read on non-blocking socket")do { do { if (0) printf ("partial read on non-blocking socket"
); } while (0); _gf_log (this->name, "socket.c", __FUNCTION__
, 109, GF_LOG_TRACE, "partial read on non-blocking socket"); }
while (0)
; \
110 \
111 break; \
112 } \
113 }
114
115#define __socket_proto_init_pending(priv,size)do { uint32_t remaining = 0; struct gf_sock_incoming_frag *frag
; frag = &priv->incoming.frag; remaining = (((uint32_t
)(priv->incoming.fraghdr & 0x7fffffffU)) - frag->bytes_read
); do { struct gf_sock_incoming_frag *frag; frag = &priv->
incoming.frag; memset (&frag->vector, 0, sizeof (frag->
vector)); frag->pending_vector = &frag->vector; frag
->pending_vector->iov_base = frag->fragcurrent; priv
->incoming.pending_vector = frag->pending_vector; } while
(0); frag->pending_vector->iov_len = (remaining > size
) ? size : remaining; frag->remaining_size = (size - frag->
pending_vector->iov_len); } while(0)
\
116 do { \
117 uint32_t remaining = 0; \
118 struct gf_sock_incoming_frag *frag; \
119 frag = &priv->incoming.frag; \
120 \
121 remaining = (RPC_FRAGSIZE (priv->incoming.fraghdr)((uint32_t)(priv->incoming.fraghdr & 0x7fffffffU)) \
122 - frag->bytes_read); \
123 \
124 __socket_proto_reset_pending (priv)do { struct gf_sock_incoming_frag *frag; frag = &priv->
incoming.frag; memset (&frag->vector, 0, sizeof (frag->
vector)); frag->pending_vector = &frag->vector; frag
->pending_vector->iov_base = frag->fragcurrent; priv
->incoming.pending_vector = frag->pending_vector; } while
(0)
; \
125 \
126 frag->pending_vector->iov_len = \
127 (remaining > size) ? size : remaining; \
128 \
129 frag->remaining_size = (size - frag->pending_vector->iov_len); \
130 \
131 } while(0)
132
133
134/* This will be used in a switch case and breaks from the switch case if all
135 * the pending data is not read.
136 */
137#define __socket_proto_read(priv, ret){ size_t bytes_read = 0; struct gf_sock_incoming *in; in = &
priv->incoming; do { uint32_t remaining; struct gf_sock_incoming_frag
*frag; frag = &priv->incoming.frag; if (frag->pending_vector
->iov_len == 0) { remaining = (((uint32_t)(priv->incoming
.fraghdr & 0x7fffffffU)) - frag->bytes_read); frag->
pending_vector->iov_len = (remaining > frag->remaining_size
) ? frag->remaining_size : remaining; frag->remaining_size
-= frag->pending_vector->iov_len; } } while (0); ret =
__socket_readv (this, in->pending_vector, 1, &in->
pending_vector, &in->pending_count, &bytes_read); if
(ret == -1) { if (priv->read_fail_log) do { do { if (0) printf
("reading from socket failed." "Error (%s), peer (%s)", strerror
((*__errno_location ())), this->peerinfo.identifier); } while
(0); _gf_log (this->name, "socket.c", __FUNCTION__, 137, GF_LOG_WARNING
, "reading from socket failed." "Error (%s), peer (%s)", strerror
((*__errno_location ())), this->peerinfo.identifier); } while
(0); break; } { struct gf_sock_incoming_frag *frag; frag = &
priv->incoming.frag; frag->fragcurrent += bytes_read; frag
->bytes_read += bytes_read; if ((ret > 0) || (frag->
remaining_size != 0)) { if (frag->remaining_size != 0 &&
ret == 0) { do { struct gf_sock_incoming_frag *frag; frag = &
priv->incoming.frag; memset (&frag->vector, 0, sizeof
(frag->vector)); frag->pending_vector = &frag->
vector; frag->pending_vector->iov_base = frag->fragcurrent
; priv->incoming.pending_vector = frag->pending_vector;
} while (0); } do { do { if (0) printf ("partial read on non-blocking socket"
); } while (0); _gf_log (this->name, "socket.c", __FUNCTION__
, 137, GF_LOG_TRACE, "partial read on non-blocking socket"); }
while (0); break; } }; }
\
138 { \
139 size_t bytes_read = 0; \
140 struct gf_sock_incoming *in; \
141 in = &priv->incoming; \
142 \
143 __socket_proto_update_pending (priv)do { uint32_t remaining; struct gf_sock_incoming_frag *frag; frag
= &priv->incoming.frag; if (frag->pending_vector->
iov_len == 0) { remaining = (((uint32_t)(priv->incoming.fraghdr
& 0x7fffffffU)) - frag->bytes_read); frag->pending_vector
->iov_len = (remaining > frag->remaining_size) ? frag
->remaining_size : remaining; frag->remaining_size -= frag
->pending_vector->iov_len; } } while (0)
; \
144 \
145 ret = __socket_readv (this, \
146 in->pending_vector, 1, \
147 &in->pending_vector, \
148 &in->pending_count, \
149 &bytes_read); \
150 if (ret == -1) { \
151 if (priv->read_fail_log) \
152 gf_log (this->name, GF_LOG_WARNING, \do { do { if (0) printf ("reading from socket failed." "Error (%s), peer (%s)"
, strerror ((*__errno_location ())), this->peerinfo.identifier
); } while (0); _gf_log (this->name, "socket.c", __FUNCTION__
, 156, GF_LOG_WARNING, "reading from socket failed." "Error (%s), peer (%s)"
, strerror ((*__errno_location ())), this->peerinfo.identifier
); } while (0)
153 "reading from socket failed." \do { do { if (0) printf ("reading from socket failed." "Error (%s), peer (%s)"
, strerror ((*__errno_location ())), this->peerinfo.identifier
); } while (0); _gf_log (this->name, "socket.c", __FUNCTION__
, 156, GF_LOG_WARNING, "reading from socket failed." "Error (%s), peer (%s)"
, strerror ((*__errno_location ())), this->peerinfo.identifier
); } while (0)
154 "Error (%s), peer (%s)", \do { do { if (0) printf ("reading from socket failed." "Error (%s), peer (%s)"
, strerror ((*__errno_location ())), this->peerinfo.identifier
); } while (0); _gf_log (this->name, "socket.c", __FUNCTION__
, 156, GF_LOG_WARNING, "reading from socket failed." "Error (%s), peer (%s)"
, strerror ((*__errno_location ())), this->peerinfo.identifier
); } while (0)
155 strerror (errno), \do { do { if (0) printf ("reading from socket failed." "Error (%s), peer (%s)"
, strerror ((*__errno_location ())), this->peerinfo.identifier
); } while (0); _gf_log (this->name, "socket.c", __FUNCTION__
, 156, GF_LOG_WARNING, "reading from socket failed." "Error (%s), peer (%s)"
, strerror ((*__errno_location ())), this->peerinfo.identifier
); } while (0)
156 this->peerinfo.identifier)do { do { if (0) printf ("reading from socket failed." "Error (%s), peer (%s)"
, strerror ((*__errno_location ())), this->peerinfo.identifier
); } while (0); _gf_log (this->name, "socket.c", __FUNCTION__
, 156, GF_LOG_WARNING, "reading from socket failed." "Error (%s), peer (%s)"
, strerror ((*__errno_location ())), this->peerinfo.identifier
); } while (0)
; \
157 break; \
158 } \
159 __socket_proto_update_priv_after_read (priv, ret, bytes_read){ struct gf_sock_incoming_frag *frag; frag = &priv->incoming
.frag; frag->fragcurrent += bytes_read; frag->bytes_read
+= bytes_read; if ((ret > 0) || (frag->remaining_size !=
0)) { if (frag->remaining_size != 0 && ret == 0) {
do { struct gf_sock_incoming_frag *frag; frag = &priv->
incoming.frag; memset (&frag->vector, 0, sizeof (frag->
vector)); frag->pending_vector = &frag->vector; frag
->pending_vector->iov_base = frag->fragcurrent; priv
->incoming.pending_vector = frag->pending_vector; } while
(0); } do { do { if (0) printf ("partial read on non-blocking socket"
); } while (0); _gf_log (this->name, "socket.c", __FUNCTION__
, 159, GF_LOG_TRACE, "partial read on non-blocking socket"); }
while (0); break; } }
; \
160 }
161
162int socket_init (rpc_transport_t *this);
163
164void
165ssl_dump_error_stack (const char *caller)
166{
167 unsigned long errnum = 0;
168 char errbuf[120] = {0,};
169
170 /* OpenSSL docs explicitly give 120 as the error-string length. */
171
172 while ((errnum = ERR_get_error())) {
173 ERR_error_string(errnum,errbuf);
174 gf_log(caller,GF_LOG_ERROR," %s",errbuf)do { do { if (0) printf (" %s",errbuf); } while (0); _gf_log
(caller, "socket.c", __FUNCTION__, 174, GF_LOG_ERROR," %s",
errbuf); } while (0)
;
175 }
176}
177
178int
179ssl_do (rpc_transport_t *this, void *buf, size_t len, SSL_trinary_func *func)
180{
181 int r = (-1);
182 struct pollfd pfd = {-1,};
183 socket_private_t *priv = NULL((void*)0);
184
185 GF_VALIDATE_OR_GOTO(this->name,this->private,out)do { if (!this->private) { (*__errno_location ()) = 22; do
{ do { if (0) printf ("invalid argument: " "this->private"
); } while (0); _gf_log_callingfn (this->name, "socket.c",
__FUNCTION__, 185, GF_LOG_ERROR, "invalid argument: " "this->private"
); } while (0); goto out; } } while (0)
;
186 priv = this->private;
187
188 for (;;) {
189 if (buf) {
190 if (priv->connected == -1) {
191 /*
192 * Fields in the SSL structure (especially
193 * the BIO pointers) are not valid at this
194 * point, so we'll segfault if we pass them
195 * to SSL_read/SSL_write.
196 */
197 gf_log(this->name,GF_LOG_INFO,do { do { if (0) printf ("lost connection in %s", __func__); }
while (0); _gf_log (this->name, "socket.c", __FUNCTION__,
198, GF_LOG_INFO, "lost connection in %s", __func__); } while
(0)
198 "lost connection in %s", __func__)do { do { if (0) printf ("lost connection in %s", __func__); }
while (0); _gf_log (this->name, "socket.c", __FUNCTION__,
198, GF_LOG_INFO, "lost connection in %s", __func__); } while
(0)
;
199 break;
200 }
201 r = func(priv->ssl_ssl,buf,len);
202 }
203 else {
204 /*
205 * We actually need these functions to get to
206 * priv->connected == 1.
207 */
208 r = ((SSL_unary_func *)func)(priv->ssl_ssl);
209 }
210 switch (SSL_get_error(priv->ssl_ssl,r)) {
211 case SSL_ERROR_NONE0:
212 return r;
213 case SSL_ERROR_WANT_READ2:
214 pfd.fd = priv->sock;
215 pfd.events = POLLIN0x001;
216 if (poll(&pfd,1,-1) < 0) {
217 gf_log(this->name,GF_LOG_ERROR,"poll error %d",do { do { if (0) printf ("poll error %d", (*__errno_location (
))); } while (0); _gf_log (this->name, "socket.c", __FUNCTION__
, 218, GF_LOG_ERROR,"poll error %d", (*__errno_location ()));
} while (0)
218 errno)do { do { if (0) printf ("poll error %d", (*__errno_location (
))); } while (0); _gf_log (this->name, "socket.c", __FUNCTION__
, 218, GF_LOG_ERROR,"poll error %d", (*__errno_location ()));
} while (0)
;
219 }
220 break;
221 case SSL_ERROR_WANT_WRITE3:
222 pfd.fd = priv->sock;
223 pfd.events = POLLOUT0x004;
224 if (poll(&pfd,1,-1) < 0) {
225 gf_log(this->name,GF_LOG_ERROR,"poll error %d",do { do { if (0) printf ("poll error %d", (*__errno_location (
))); } while (0); _gf_log (this->name, "socket.c", __FUNCTION__
, 226, GF_LOG_ERROR,"poll error %d", (*__errno_location ()));
} while (0)
226 errno)do { do { if (0) printf ("poll error %d", (*__errno_location (
))); } while (0); _gf_log (this->name, "socket.c", __FUNCTION__
, 226, GF_LOG_ERROR,"poll error %d", (*__errno_location ()));
} while (0)
;
227 }
228 break;
229 case SSL_ERROR_SYSCALL5:
230 /* This is what we get when remote disconnects. */
231 gf_log(this->name,GF_LOG_DEBUG,do { do { if (0) printf ("syscall error (probably remote disconnect)"
); } while (0); _gf_log (this->name, "socket.c", __FUNCTION__
, 232, GF_LOG_DEBUG, "syscall error (probably remote disconnect)"
); } while (0)
232 "syscall error (probably remote disconnect)")do { do { if (0) printf ("syscall error (probably remote disconnect)"
); } while (0); _gf_log (this->name, "socket.c", __FUNCTION__
, 232, GF_LOG_DEBUG, "syscall error (probably remote disconnect)"
); } while (0)
;
233 errno(*__errno_location ()) = ENODATA61;
234 goto out;
235 default:
236 errno(*__errno_location ()) = EIO5;
237 goto out; /* "break" would just loop again */
238 }
239 }
240out:
241 return -1;
242}
243
244#define ssl_connect_one(t)ssl_do((t),((void*)0),0,(SSL_trinary_func *)SSL_connect) ssl_do((t),NULL((void*)0),0,(SSL_trinary_func *)SSL_connect)
245#define ssl_accept_one(t)ssl_do((t),((void*)0),0,(SSL_trinary_func *)SSL_accept) ssl_do((t),NULL((void*)0),0,(SSL_trinary_func *)SSL_accept)
246#define ssl_read_one(t,b,l)ssl_do((t),(b),(l),(SSL_trinary_func *)SSL_read) ssl_do((t),(b),(l),(SSL_trinary_func *)SSL_read)
247#define ssl_write_one(t,b,l)ssl_do((t),(b),(l),(SSL_trinary_func *)SSL_write) ssl_do((t),(b),(l),(SSL_trinary_func *)SSL_write)
248
249int
250ssl_setup_connection (rpc_transport_t *this, int server)
251{
252 X509 *peer = NULL((void*)0);
253 char peer_CN[256] = "";
254 int ret = -1;
255 socket_private_t *priv = NULL((void*)0);
256
257 GF_VALIDATE_OR_GOTO(this->name,this->private,done)do { if (!this->private) { (*__errno_location ()) = 22; do
{ do { if (0) printf ("invalid argument: " "this->private"
); } while (0); _gf_log_callingfn (this->name, "socket.c",
__FUNCTION__, 257, GF_LOG_ERROR, "invalid argument: " "this->private"
); } while (0); goto done; } } while (0)
;
258 priv = this->private;
259
260 priv->ssl_ssl = SSL_new(priv->ssl_ctx);
261 if (!priv->ssl_ssl) {
262 gf_log(this->name,GF_LOG_ERROR,"SSL_new failed")do { do { if (0) printf ("SSL_new failed"); } while (0); _gf_log
(this->name, "socket.c", __FUNCTION__, 262, GF_LOG_ERROR,
"SSL_new failed"); } while (0)
;
263 ssl_dump_error_stack(this->name);
264 goto done;
265 }
266 priv->ssl_sbio = BIO_new_socket(priv->sock,BIO_NOCLOSE0x00);
267 if (!priv->ssl_sbio) {
268 gf_log(this->name,GF_LOG_ERROR,"BIO_new_socket failed")do { do { if (0) printf ("BIO_new_socket failed"); } while (0
); _gf_log (this->name, "socket.c", __FUNCTION__, 268, GF_LOG_ERROR
,"BIO_new_socket failed"); } while (0)
;
269 ssl_dump_error_stack(this->name);
270 goto free_ssl;
271 }
272 SSL_set_bio(priv->ssl_ssl,priv->ssl_sbio,priv->ssl_sbio);
273
274 if (server) {
275 ret = ssl_accept_one(this)ssl_do((this),((void*)0),0,(SSL_trinary_func *)SSL_accept);
276 }
277 else {
278 ret = ssl_connect_one(this)ssl_do((this),((void*)0),0,(SSL_trinary_func *)SSL_connect);
279 }
280
281 /* Make sure _the call_ succeeded. */
282 if (ret < 0) {
283 goto ssl_error;
284 }
285
286 /* Make sure _SSL verification_ succeeded, yielding an identity. */
287 if (SSL_get_verify_result(priv->ssl_ssl) != X509_V_OK0) {
288 goto ssl_error;
289 }
290 peer = SSL_get_peer_certificate(priv->ssl_ssl);
291 if (!peer) {
292 goto ssl_error;
293 }
294
295 /* Finally, everything seems OK. */
296 X509_NAME_get_text_by_NID(X509_get_subject_name(peer),
297 NID_commonName13, peer_CN, sizeof(peer_CN)-1);
298 peer_CN[sizeof(peer_CN)-1] = '\0';
299 gf_log(this->name,GF_LOG_INFO,"peer CN = %s", peer_CN)do { do { if (0) printf ("peer CN = %s", peer_CN); } while (0
); _gf_log (this->name, "socket.c", __FUNCTION__, 299, GF_LOG_INFO
,"peer CN = %s", peer_CN); } while (0)
;
300 return 0;
301
302 /* Error paths. */
303ssl_error:
304 gf_log(this->name,GF_LOG_ERROR,"SSL connect error")do { do { if (0) printf ("SSL connect error"); } while (0); _gf_log
(this->name, "socket.c", __FUNCTION__, 304, GF_LOG_ERROR,
"SSL connect error"); } while (0)
;
305 ssl_dump_error_stack(this->name);
306free_ssl:
307 SSL_free(priv->ssl_ssl);
308 priv->ssl_ssl = NULL((void*)0);
309done:
310 return ret;
311}
312
313
314void
315ssl_teardown_connection (socket_private_t *priv)
316{
317 SSL_shutdown(priv->ssl_ssl);
318 SSL_clear(priv->ssl_ssl);
319 SSL_free(priv->ssl_ssl);
320 priv->ssl_ssl = NULL((void*)0);
321}
322
323
324ssize_t
325__socket_ssl_readv (rpc_transport_t *this, struct iovec *opvector, int opcount)
326{
327 socket_private_t *priv = NULL((void*)0);
328 int sock = -1;
329 int ret = -1;
330
331 priv = this->private;
332 sock = priv->sock;
333
334 if (priv->use_ssl) {
335 ret = ssl_read_one (this, opvector->iov_base, opvector->iov_len)ssl_do((this),(opvector->iov_base),(opvector->iov_len),
(SSL_trinary_func *)SSL_read)
;
336 } else {
337 ret = readv (sock, opvector, opcount);
338 }
339
340 return ret;
341}
342
343
344ssize_t
345__socket_ssl_read (rpc_transport_t *this, void *buf, size_t count)
346{
347 struct iovec iov = {0, };
348 int ret = -1;
349
350 iov.iov_base = buf;
351 iov.iov_len = count;
352
353 ret = __socket_ssl_readv (this, &iov, 1);
354
355 return ret;
356}
357
358
359int
360__socket_cached_read (rpc_transport_t *this, struct iovec *opvector, int opcount)
361{
362 socket_private_t *priv = NULL((void*)0);
363 int sock = -1;
364 struct gf_sock_incoming *in = NULL((void*)0);
365 int req_len = -1;
366 int ret = -1;
367
368 priv = this->private;
369 sock = priv->sock;
370 in = &priv->incoming;
371 req_len = iov_length (opvector, opcount);
372
373 if (in->record_state == SP_STATE_READING_FRAGHDR) {
374 in->ra_read = 0;
375 in->ra_served = 0;
376 in->ra_max = 0;
377 in->ra_buf = NULL((void*)0);
378 goto uncached;
379 }
380
381 if (!in->ra_max) {
382 /* first call after passing SP_STATE_READING_FRAGHDR */
383 in->ra_max = min (RPC_FRAGSIZE (in->fraghdr), GF_SOCKET_RA_MAX)((((uint32_t)(in->fraghdr & 0x7fffffffU)))<(1024)?(
((uint32_t)(in->fraghdr & 0x7fffffffU))):(1024))
;
384 /* Note that the in->iobuf is the primary iobuf into which
385 headers are read into. By using this itself as our
386 read-ahead cache, we can avoid memory copies in iov_load
387 */
388 in->ra_buf = iobuf_ptr (in->iobuf)((in->iobuf)->ptr);
389 }
390
391 /* fill read-ahead */
392 if (in->ra_read < in->ra_max) {
393 ret = __socket_ssl_read (this, &in->ra_buf[in->ra_read],
394 (in->ra_max - in->ra_read));
395 if (ret > 0)
396 in->ra_read += ret;
397
398 /* we proceed to test if there is still cached data to
399 be served even if readahead could not progress */
400 }
401
402 /* serve cached */
403 if (in->ra_served < in->ra_read) {
404 ret = iov_load (opvector, opcount, &in->ra_buf[in->ra_served],
405 min (req_len, (in->ra_read - in->ra_served))((req_len)<((in->ra_read - in->ra_served))?(req_len)
:((in->ra_read - in->ra_served)))
);
406
407 in->ra_served += ret;
408 /* Do not read uncached and cached in the same call */
409 goto out;
410 }
411
412 if (in->ra_read < in->ra_max)
413 /* If there was no cached data to be served, (and we are
414 guaranteed to have already performed an attempt to progress
415 readahead above), and we have not yet read out the full
416 readahead capacity, then bail out for now without doing
417 the uncached read below (as that will overtake future cached
418 read)
419 */
420 goto out;
421uncached:
422 ret = __socket_ssl_readv (this, opvector, opcount);
423out:
424 return ret;
425}
426
427
428/*
429 * return value:
430 * 0 = success (completed)
431 * -1 = error
432 * > 0 = incomplete
433 */
434
435int
436__socket_rwv (rpc_transport_t *this, struct iovec *vector, int count,
437 struct iovec **pending_vector, int *pending_count, size_t *bytes,
438 int write)
439{
440 socket_private_t *priv = NULL((void*)0);
441 int sock = -1;
442 int ret = -1;
443 struct iovec *opvector = NULL((void*)0);
444 int opcount = 0;
445 int moved = 0;
446
447 GF_VALIDATE_OR_GOTO ("socket", this, out)do { if (!this) { (*__errno_location ()) = 22; do { do { if (
0) printf ("invalid argument: " "this"); } while (0); _gf_log_callingfn
("socket", "socket.c", __FUNCTION__, 447, GF_LOG_ERROR, "invalid argument: "
"this"); } while (0); goto out; } } while (0)
;
448 GF_VALIDATE_OR_GOTO ("socket", this->private, out)do { if (!this->private) { (*__errno_location ()) = 22; do
{ do { if (0) printf ("invalid argument: " "this->private"
); } while (0); _gf_log_callingfn ("socket", "socket.c", __FUNCTION__
, 448, GF_LOG_ERROR, "invalid argument: " "this->private")
; } while (0); goto out; } } while (0)
;
449
450 priv = this->private;
451 sock = priv->sock;
452
453 opvector = vector;
454 opcount = count;
455
456 if (bytes != NULL((void*)0)) {
457 *bytes = 0;
458 }
459
460 while (opcount > 0) {
461 if (opvector->iov_len == 0) {
462 gf_log(this->name,GF_LOG_DEBUG,do { do { if (0) printf ("would have passed zero length to read/write"
); } while (0); _gf_log (this->name, "socket.c", __FUNCTION__
, 463, GF_LOG_DEBUG, "would have passed zero length to read/write"
); } while (0)
463 "would have passed zero length to read/write")do { do { if (0) printf ("would have passed zero length to read/write"
); } while (0); _gf_log (this->name, "socket.c", __FUNCTION__
, 463, GF_LOG_DEBUG, "would have passed zero length to read/write"
); } while (0)
;
464 ++opvector;
465 --opcount;
466 continue;
467 }
468 if (write) {
469 if (priv->use_ssl) {
470 ret = ssl_write_one(this,ssl_do((this),(opvector->iov_base),(opvector->iov_len),
(SSL_trinary_func *)SSL_write)
471 opvector->iov_base, opvector->iov_len)ssl_do((this),(opvector->iov_base),(opvector->iov_len),
(SSL_trinary_func *)SSL_write)
;
472 }
473 else {
474 ret = writev (sock, opvector, opcount);
475 }
476
477 if (ret == 0 || (ret == -1 && errno(*__errno_location ()) == EAGAIN11)) {
478 /* done for now */
479 break;
480 }
481 this->total_bytes_write += ret;
482 } else {
483 ret = __socket_cached_read (this, opvector, opcount);
484
485 if (ret == 0) {
486 gf_log(this->name,GF_LOG_DEBUG,"EOF on socket")do { do { if (0) printf ("EOF on socket"); } while (0); _gf_log
(this->name, "socket.c", __FUNCTION__, 486, GF_LOG_DEBUG,
"EOF on socket"); } while (0)
;
487 errno(*__errno_location ()) = ENODATA61;
488 ret = -1;
489 }
490 if (ret == -1 && errno(*__errno_location ()) == EAGAIN11) {
491 /* done for now */
492 break;
493 }
494 this->total_bytes_read += ret;
495 }
496
497 if (ret == 0) {
498 /* Mostly due to 'umount' in client */
499
500 gf_log (this->name, GF_LOG_DEBUG,do { do { if (0) printf ("EOF from peer %s", this->peerinfo
.identifier); } while (0); _gf_log (this->name, "socket.c"
, __FUNCTION__, 501, GF_LOG_DEBUG, "EOF from peer %s", this->
peerinfo.identifier); } while (0)
501 "EOF from peer %s", this->peerinfo.identifier)do { do { if (0) printf ("EOF from peer %s", this->peerinfo
.identifier); } while (0); _gf_log (this->name, "socket.c"
, __FUNCTION__, 501, GF_LOG_DEBUG, "EOF from peer %s", this->
peerinfo.identifier); } while (0)
;
502 opcount = -1;
503 errno(*__errno_location ()) = ENOTCONN107;
504 break;
505 }
506 if (ret == -1) {
507 if (errno(*__errno_location ()) == EINTR4)
508 continue;
509
510 if (write || (!write && priv->read_fail_log))
511 gf_log (this->name, GF_LOG_WARNING,do { do { if (0) printf ("%s on %s failed (%s)", write ? "writev"
:"readv", this->peerinfo.identifier, strerror ((*__errno_location
()))); } while (0); _gf_log (this->name, "socket.c", __FUNCTION__
, 515, GF_LOG_WARNING, "%s on %s failed (%s)", write ? "writev"
:"readv", this->peerinfo.identifier, strerror ((*__errno_location
()))); } while (0)
512 "%s on %s failed (%s)",do { do { if (0) printf ("%s on %s failed (%s)", write ? "writev"
:"readv", this->peerinfo.identifier, strerror ((*__errno_location
()))); } while (0); _gf_log (this->name, "socket.c", __FUNCTION__
, 515, GF_LOG_WARNING, "%s on %s failed (%s)", write ? "writev"
:"readv", this->peerinfo.identifier, strerror ((*__errno_location
()))); } while (0)
513 write ? "writev":"readv",do { do { if (0) printf ("%s on %s failed (%s)", write ? "writev"
:"readv", this->peerinfo.identifier, strerror ((*__errno_location
()))); } while (0); _gf_log (this->name, "socket.c", __FUNCTION__
, 515, GF_LOG_WARNING, "%s on %s failed (%s)", write ? "writev"
:"readv", this->peerinfo.identifier, strerror ((*__errno_location
()))); } while (0)
514 this->peerinfo.identifier,do { do { if (0) printf ("%s on %s failed (%s)", write ? "writev"
:"readv", this->peerinfo.identifier, strerror ((*__errno_location
()))); } while (0); _gf_log (this->name, "socket.c", __FUNCTION__
, 515, GF_LOG_WARNING, "%s on %s failed (%s)", write ? "writev"
:"readv", this->peerinfo.identifier, strerror ((*__errno_location
()))); } while (0)
515 strerror (errno))do { do { if (0) printf ("%s on %s failed (%s)", write ? "writev"
:"readv", this->peerinfo.identifier, strerror ((*__errno_location
()))); } while (0); _gf_log (this->name, "socket.c", __FUNCTION__
, 515, GF_LOG_WARNING, "%s on %s failed (%s)", write ? "writev"
:"readv", this->peerinfo.identifier, strerror ((*__errno_location
()))); } while (0)
;
516 if (priv->use_ssl) {
517 ssl_dump_error_stack(this->name);
518 }
519 opcount = -1;
520 break;
521 }
522
523 if (bytes != NULL((void*)0)) {
524 *bytes += ret;
525 }
526
527 moved = 0;
528
529 while (moved < ret) {
530 if (!opcount) {
531 gf_log(this->name,GF_LOG_DEBUG,do { do { if (0) printf ("ran out of iov, moved %d/%d", moved
, ret); } while (0); _gf_log (this->name, "socket.c", __FUNCTION__
, 533, GF_LOG_DEBUG, "ran out of iov, moved %d/%d", moved, ret
); } while (0)
532 "ran out of iov, moved %d/%d",do { do { if (0) printf ("ran out of iov, moved %d/%d", moved
, ret); } while (0); _gf_log (this->name, "socket.c", __FUNCTION__
, 533, GF_LOG_DEBUG, "ran out of iov, moved %d/%d", moved, ret
); } while (0)
533 moved, ret)do { do { if (0) printf ("ran out of iov, moved %d/%d", moved
, ret); } while (0); _gf_log (this->name, "socket.c", __FUNCTION__
, 533, GF_LOG_DEBUG, "ran out of iov, moved %d/%d", moved, ret
); } while (0)
;
534 goto ran_out;
535 }
536 if (!opvector[0].iov_len) {
537 opvector++;
538 opcount--;
539 continue;
540 }
541 if ((ret - moved) >= opvector[0].iov_len) {
542 moved += opvector[0].iov_len;
543 opvector++;
544 opcount--;
545 } else {
546 opvector[0].iov_len -= (ret - moved);
547 opvector[0].iov_base += (ret - moved);
548 moved += (ret - moved);
549 }
550 }
551 }
552
553ran_out:
554
555 if (pending_vector)
556 *pending_vector = opvector;
557
558 if (pending_count)
559 *pending_count = opcount;
560
561out:
562 return opcount;
563}
564
565
566int
567__socket_readv (rpc_transport_t *this, struct iovec *vector, int count,
568 struct iovec **pending_vector, int *pending_count,
569 size_t *bytes)
570{
571 int ret = -1;
572
573 ret = __socket_rwv (this, vector, count,
574 pending_vector, pending_count, bytes, 0);
575
576 return ret;
577}
578
579
580int
581__socket_writev (rpc_transport_t *this, struct iovec *vector, int count,
582 struct iovec **pending_vector, int *pending_count)
583{
584 int ret = -1;
585
586 ret = __socket_rwv (this, vector, count,
587 pending_vector, pending_count, NULL((void*)0), 1);
588
589 return ret;
590}
591
592
593int
594__socket_shutdown (rpc_transport_t *this)
595{
596 int ret = -1;
597 socket_private_t *priv = this->private;
598
599 priv->connected = -1;
600 ret = shutdown (priv->sock, SHUT_RDWRSHUT_RDWR);
601 if (ret) {
602 /* its already disconnected.. no need to understand
603 why it failed to shutdown in normal cases */
604 gf_log (this->name, GF_LOG_DEBUG,do { do { if (0) printf ("shutdown() returned %d. %s", ret, strerror
((*__errno_location ()))); } while (0); _gf_log (this->name
, "socket.c", __FUNCTION__, 606, GF_LOG_DEBUG, "shutdown() returned %d. %s"
, ret, strerror ((*__errno_location ()))); } while (0)
605 "shutdown() returned %d. %s",do { do { if (0) printf ("shutdown() returned %d. %s", ret, strerror
((*__errno_location ()))); } while (0); _gf_log (this->name
, "socket.c", __FUNCTION__, 606, GF_LOG_DEBUG, "shutdown() returned %d. %s"
, ret, strerror ((*__errno_location ()))); } while (0)
606 ret, strerror (errno))do { do { if (0) printf ("shutdown() returned %d. %s", ret, strerror
((*__errno_location ()))); } while (0); _gf_log (this->name
, "socket.c", __FUNCTION__, 606, GF_LOG_DEBUG, "shutdown() returned %d. %s"
, ret, strerror ((*__errno_location ()))); } while (0)
;
607 }
608
609 return ret;
610}
611
612int
613__socket_disconnect (rpc_transport_t *this)
614{
615 int ret = -1;
616 socket_private_t *priv = NULL((void*)0);
617
618 GF_VALIDATE_OR_GOTO ("socket", this, out)do { if (!this) { (*__errno_location ()) = 22; do { do { if (
0) printf ("invalid argument: " "this"); } while (0); _gf_log_callingfn
("socket", "socket.c", __FUNCTION__, 618, GF_LOG_ERROR, "invalid argument: "
"this"); } while (0); goto out; } } while (0)
;
619 GF_VALIDATE_OR_GOTO ("socket", this->private, out)do { if (!this->private) { (*__errno_location ()) = 22; do
{ do { if (0) printf ("invalid argument: " "this->private"
); } while (0); _gf_log_callingfn ("socket", "socket.c", __FUNCTION__
, 619, GF_LOG_ERROR, "invalid argument: " "this->private")
; } while (0); goto out; } } while (0)
;
620
621 priv = this->private;
622
623 if (priv->sock != -1) {
624 ret = __socket_shutdown(this);
625 if (priv->own_thread) {
626 /*
627 * Without this, reconnect (= disconnect + connect)
628 * won't work except by accident.
629 */
630 close(priv->sock);
631 priv->sock = -1;
632 /*
633 * Closing the socket forces an error that will wake
634 * up the polling thread. Wait for it to notice and
635 * respond.
636 */
637 if (priv->ot_state == OT_ALIVE) {
638 priv->ot_state = OT_DYING;
639 pthread_cond_wait(&priv->ot_event,&priv->lock);
640 }
641 }
642 else if (priv->use_ssl) {
643 ssl_teardown_connection(priv);
644 }
645 }
646
647out:
648 return ret;
649}
650
651
652int
653__socket_server_bind (rpc_transport_t *this)
654{
655 socket_private_t *priv = NULL((void*)0);
656 int ret = -1;
657 int opt = 1;
658 int reuse_check_sock = -1;
659 struct sockaddr_storage unix_addr = {0};
660
661 GF_VALIDATE_OR_GOTO ("socket", this, out)do { if (!this) { (*__errno_location ()) = 22; do { do { if (
0) printf ("invalid argument: " "this"); } while (0); _gf_log_callingfn
("socket", "socket.c", __FUNCTION__, 661, GF_LOG_ERROR, "invalid argument: "
"this"); } while (0); goto out; } } while (0)
;
662 GF_VALIDATE_OR_GOTO ("socket", this->private, out)do { if (!this->private) { (*__errno_location ()) = 22; do
{ do { if (0) printf ("invalid argument: " "this->private"
); } while (0); _gf_log_callingfn ("socket", "socket.c", __FUNCTION__
, 662, GF_LOG_ERROR, "invalid argument: " "this->private")
; } while (0); goto out; } } while (0)
;
663
664 priv = this->private;
665
666 ret = setsockopt (priv->sock, SOL_SOCKET1, SO_REUSEADDR2,
667 &opt, sizeof (opt));
668
669 if (ret == -1) {
670 gf_log (this->name, GF_LOG_ERROR,do { do { if (0) printf ("setsockopt() for SO_REUSEADDR failed (%s)"
, strerror ((*__errno_location ()))); } while (0); _gf_log (this
->name, "socket.c", __FUNCTION__, 672, GF_LOG_ERROR, "setsockopt() for SO_REUSEADDR failed (%s)"
, strerror ((*__errno_location ()))); } while (0)
671 "setsockopt() for SO_REUSEADDR failed (%s)",do { do { if (0) printf ("setsockopt() for SO_REUSEADDR failed (%s)"
, strerror ((*__errno_location ()))); } while (0); _gf_log (this
->name, "socket.c", __FUNCTION__, 672, GF_LOG_ERROR, "setsockopt() for SO_REUSEADDR failed (%s)"
, strerror ((*__errno_location ()))); } while (0)
672 strerror (errno))do { do { if (0) printf ("setsockopt() for SO_REUSEADDR failed (%s)"
, strerror ((*__errno_location ()))); } while (0); _gf_log (this
->name, "socket.c", __FUNCTION__, 672, GF_LOG_ERROR, "setsockopt() for SO_REUSEADDR failed (%s)"
, strerror ((*__errno_location ()))); } while (0)
;
673 }
674
675 /* reuse-address doesn't work for unix type sockets */
676 if (AF_UNIX1 == SA (&this->myinfo.sockaddr)((struct sockaddr *)&this->myinfo.sockaddr)->sa_family) {
677 memcpy (&unix_addr, SA (&this->myinfo.sockaddr)((struct sockaddr *)&this->myinfo.sockaddr),
678 this->myinfo.sockaddr_len);
679 reuse_check_sock = socket (AF_UNIX1, SOCK_STREAMSOCK_STREAM, 0);
680 if (reuse_check_sock >= 0) {
681 ret = connect (reuse_check_sock, SA (&unix_addr)((struct sockaddr *)&unix_addr),
682 this->myinfo.sockaddr_len);
683 if ((ret == -1) && (ECONNREFUSED111 == errno(*__errno_location ()))) {
684 unlink (((struct sockaddr_un*)&unix_addr)->sun_path);
685 }
686 close (reuse_check_sock);
687 }
688 }
689
690 ret = bind (priv->sock, (struct sockaddr *)&this->myinfo.sockaddr,
691 this->myinfo.sockaddr_len);
692
693 if (ret == -1) {
694 gf_log (this->name, GF_LOG_ERROR,do { do { if (0) printf ("binding to %s failed: %s", this->
myinfo.identifier, strerror ((*__errno_location ()))); } while
(0); _gf_log (this->name, "socket.c", __FUNCTION__, 696, GF_LOG_ERROR
, "binding to %s failed: %s", this->myinfo.identifier, strerror
((*__errno_location ()))); } while (0)
695 "binding to %s failed: %s",do { do { if (0) printf ("binding to %s failed: %s", this->
myinfo.identifier, strerror ((*__errno_location ()))); } while
(0); _gf_log (this->name, "socket.c", __FUNCTION__, 696, GF_LOG_ERROR
, "binding to %s failed: %s", this->myinfo.identifier, strerror
((*__errno_location ()))); } while (0)
696 this->myinfo.identifier, strerror (errno))do { do { if (0) printf ("binding to %s failed: %s", this->
myinfo.identifier, strerror ((*__errno_location ()))); } while
(0); _gf_log (this->name, "socket.c", __FUNCTION__, 696, GF_LOG_ERROR
, "binding to %s failed: %s", this->myinfo.identifier, strerror
((*__errno_location ()))); } while (0)
;
697 if (errno(*__errno_location ()) == EADDRINUSE98) {
698 gf_log (this->name, GF_LOG_ERROR,do { do { if (0) printf ("Port is already in use"); } while (
0); _gf_log (this->name, "socket.c", __FUNCTION__, 699, GF_LOG_ERROR
, "Port is already in use"); } while (0)
699 "Port is already in use")do { do { if (0) printf ("Port is already in use"); } while (
0); _gf_log (this->name, "socket.c", __FUNCTION__, 699, GF_LOG_ERROR
, "Port is already in use"); } while (0)
;
700 }
701 }
702
703out:
704 return ret;
705}
706
707
708int
709__socket_nonblock (int fd)
710{
711 int flags = 0;
712 int ret = -1;
713
714 flags = fcntl (fd, F_GETFL3);
715
716 if (flags != -1)
717 ret = fcntl (fd, F_SETFL4, flags | O_NONBLOCK04000);
718
719 return ret;
720}
721
722int
723__socket_nodelay (int fd)
724{
725 int on = 1;
726 int ret = -1;
727
728 ret = setsockopt (fd, IPPROTO_TCPIPPROTO_TCP, TCP_NODELAY1,
729 &on, sizeof (on));
730 if (!ret)
731 gf_log (THIS->name, GF_LOG_TRACE,do { do { if (0) printf ("NODELAY enabled for socket %d", fd)
; } while (0); _gf_log ((*__glusterfs_this_location())->name
, "socket.c", __FUNCTION__, 732, GF_LOG_TRACE, "NODELAY enabled for socket %d"
, fd); } while (0)
732 "NODELAY enabled for socket %d", fd)do { do { if (0) printf ("NODELAY enabled for socket %d", fd)
; } while (0); _gf_log ((*__glusterfs_this_location())->name
, "socket.c", __FUNCTION__, 732, GF_LOG_TRACE, "NODELAY enabled for socket %d"
, fd); } while (0)
;
733
734 return ret;
735}
736
737
738static int
739__socket_keepalive (int fd, int family, int keepalive_intvl, int keepalive_idle)
740{
741 int on = 1;
742 int ret = -1;
743
744 ret = setsockopt (fd, SOL_SOCKET1, SO_KEEPALIVE9, &on, sizeof (on));
745 if (ret == -1) {
746 gf_log ("socket", GF_LOG_WARNING,do { do { if (0) printf ("failed to set keep alive option on socket %d"
, fd); } while (0); _gf_log ("socket", "socket.c", __FUNCTION__
, 747, GF_LOG_WARNING, "failed to set keep alive option on socket %d"
, fd); } while (0)
747 "failed to set keep alive option on socket %d", fd)do { do { if (0) printf ("failed to set keep alive option on socket %d"
, fd); } while (0); _gf_log ("socket", "socket.c", __FUNCTION__
, 747, GF_LOG_WARNING, "failed to set keep alive option on socket %d"
, fd); } while (0)
;
748 goto err;
749 }
750
751 if (keepalive_intvl == GF_USE_DEFAULT_KEEPALIVE(-1))
752 goto done;
753
754#if !defined(GF_LINUX_HOST_OS1) && !defined(__NetBSD__)
755#ifdef GF_SOLARIS_HOST_OS
756 ret = setsockopt (fd, SOL_SOCKET1, SO_KEEPALIVE9, &keepalive_intvl,
757 sizeof (keepalive_intvl));
758#else
759 ret = setsockopt (fd, IPPROTO_TCPIPPROTO_TCP, TCP_KEEPALIVE, &keepalive_intvl,
760 sizeof (keepalive_intvl));
761#endif
762 if (ret == -1) {
763 gf_log ("socket", GF_LOG_WARNING,do { do { if (0) printf ("failed to set keep alive interval on socket %d"
, fd); } while (0); _gf_log ("socket", "socket.c", __FUNCTION__
, 764, GF_LOG_WARNING, "failed to set keep alive interval on socket %d"
, fd); } while (0)
764 "failed to set keep alive interval on socket %d", fd)do { do { if (0) printf ("failed to set keep alive interval on socket %d"
, fd); } while (0); _gf_log ("socket", "socket.c", __FUNCTION__
, 764, GF_LOG_WARNING, "failed to set keep alive interval on socket %d"
, fd); } while (0)
;
765 goto err;
766 }
767#else
768 if (family != AF_INET2)
769 goto done;
770
771 ret = setsockopt (fd, IPPROTO_TCPIPPROTO_TCP, TCP_KEEPIDLE4, &keepalive_idle,
772 sizeof (keepalive_intvl));
773 if (ret == -1) {
774 gf_log ("socket", GF_LOG_WARNING,do { do { if (0) printf ("failed to set keep idle %d on socket %d, %s"
, keepalive_idle, fd, strerror((*__errno_location ()))); } while
(0); _gf_log ("socket", "socket.c", __FUNCTION__, 776, GF_LOG_WARNING
, "failed to set keep idle %d on socket %d, %s", keepalive_idle
, fd, strerror((*__errno_location ()))); } while (0)
775 "failed to set keep idle %d on socket %d, %s",do { do { if (0) printf ("failed to set keep idle %d on socket %d, %s"
, keepalive_idle, fd, strerror((*__errno_location ()))); } while
(0); _gf_log ("socket", "socket.c", __FUNCTION__, 776, GF_LOG_WARNING
, "failed to set keep idle %d on socket %d, %s", keepalive_idle
, fd, strerror((*__errno_location ()))); } while (0)
776 keepalive_idle, fd, strerror(errno))do { do { if (0) printf ("failed to set keep idle %d on socket %d, %s"
, keepalive_idle, fd, strerror((*__errno_location ()))); } while
(0); _gf_log ("socket", "socket.c", __FUNCTION__, 776, GF_LOG_WARNING
, "failed to set keep idle %d on socket %d, %s", keepalive_idle
, fd, strerror((*__errno_location ()))); } while (0)
;
777 goto err;
778 }
779 ret = setsockopt (fd, IPPROTO_TCPIPPROTO_TCP , TCP_KEEPINTVL5, &keepalive_intvl,
780 sizeof (keepalive_intvl));
781 if (ret == -1) {
782 gf_log ("socket", GF_LOG_WARNING,do { do { if (0) printf ("failed to set keep interval %d on socket %d, %s"
, keepalive_intvl, fd, strerror((*__errno_location ()))); } while
(0); _gf_log ("socket", "socket.c", __FUNCTION__, 784, GF_LOG_WARNING
, "failed to set keep interval %d on socket %d, %s", keepalive_intvl
, fd, strerror((*__errno_location ()))); } while (0)
783 "failed to set keep interval %d on socket %d, %s",do { do { if (0) printf ("failed to set keep interval %d on socket %d, %s"
, keepalive_intvl, fd, strerror((*__errno_location ()))); } while
(0); _gf_log ("socket", "socket.c", __FUNCTION__, 784, GF_LOG_WARNING
, "failed to set keep interval %d on socket %d, %s", keepalive_intvl
, fd, strerror((*__errno_location ()))); } while (0)
784 keepalive_intvl, fd, strerror(errno))do { do { if (0) printf ("failed to set keep interval %d on socket %d, %s"
, keepalive_intvl, fd, strerror((*__errno_location ()))); } while
(0); _gf_log ("socket", "socket.c", __FUNCTION__, 784, GF_LOG_WARNING
, "failed to set keep interval %d on socket %d, %s", keepalive_intvl
, fd, strerror((*__errno_location ()))); } while (0)
;
785 goto err;
786 }
787#endif
788
789done:
790 gf_log (THIS->name, GF_LOG_TRACE, "Keep-alive enabled for socket %d, interval "do { do { if (0) printf ("Keep-alive enabled for socket %d, interval "
"%d, idle: %d", fd, keepalive_intvl, keepalive_idle); } while
(0); _gf_log ((*__glusterfs_this_location())->name, "socket.c"
, __FUNCTION__, 791, GF_LOG_TRACE, "Keep-alive enabled for socket %d, interval "
"%d, idle: %d", fd, keepalive_intvl, keepalive_idle); } while
(0)
791 "%d, idle: %d", fd, keepalive_intvl, keepalive_idle)do { do { if (0) printf ("Keep-alive enabled for socket %d, interval "
"%d, idle: %d", fd, keepalive_intvl, keepalive_idle); } while
(0); _gf_log ((*__glusterfs_this_location())->name, "socket.c"
, __FUNCTION__, 791, GF_LOG_TRACE, "Keep-alive enabled for socket %d, interval "
"%d, idle: %d", fd, keepalive_intvl, keepalive_idle); } while
(0)
;
792
793err:
794 return ret;
795}
796
797
798int
799__socket_connect_finish (int fd)
800{
801 int ret = -1;
802 int optval = 0;
803 socklen_t optlen = sizeof (int);
804
805 ret = getsockopt (fd, SOL_SOCKET1, SO_ERROR4, (void *)&optval, &optlen);
806
807 if (ret == 0 && optval) {
808 errno(*__errno_location ()) = optval;
809 ret = -1;
810 }
811
812 return ret;
813}
814
815
816void
817__socket_reset (rpc_transport_t *this)
818{
819 socket_private_t *priv = NULL((void*)0);
820
821 GF_VALIDATE_OR_GOTO ("socket", this, out)do { if (!this) { (*__errno_location ()) = 22; do { do { if (
0) printf ("invalid argument: " "this"); } while (0); _gf_log_callingfn
("socket", "socket.c", __FUNCTION__, 821, GF_LOG_ERROR, "invalid argument: "
"this"); } while (0); goto out; } } while (0)
;
822 GF_VALIDATE_OR_GOTO ("socket", this->private, out)do { if (!this->private) { (*__errno_location ()) = 22; do
{ do { if (0) printf ("invalid argument: " "this->private"
); } while (0); _gf_log_callingfn ("socket", "socket.c", __FUNCTION__
, 822, GF_LOG_ERROR, "invalid argument: " "this->private")
; } while (0); goto out; } } while (0)
;
823
824 priv = this->private;
825
826 /* TODO: use mem-pool on incoming data */
827
828 if (priv->incoming.iobref) {
829 iobref_unref (priv->incoming.iobref);
830 priv->incoming.iobref = NULL((void*)0);
831 }
832
833 if (priv->incoming.iobuf) {
834 iobuf_unref (priv->incoming.iobuf);
835 }
836
837 GF_FREE (priv->incoming.request_info)__gf_free (priv->incoming.request_info);
838
839 memset (&priv->incoming, 0, sizeof (priv->incoming));
840
841 event_unregister (this->ctx->event_pool, priv->sock, priv->idx);
842
843 close (priv->sock);
844 priv->sock = -1;
845 priv->idx = -1;
846 priv->connected = -1;
847
848out:
849 return;
850}
851
852
853void
854socket_set_lastfrag (uint32_t *fragsize) {
855 (*fragsize) |= 0x80000000U;
856}
857
858
859void
860socket_set_frag_header_size (uint32_t size, char *haddr)
861{
862 size = htonl (size);
863 memcpy (haddr, &size, sizeof (size));
864}
865
866
867void
868socket_set_last_frag_header_size (uint32_t size, char *haddr)
869{
870 socket_set_lastfrag (&size);
871 socket_set_frag_header_size (size, haddr);
872}
873
874struct ioq *
875__socket_ioq_new (rpc_transport_t *this, rpc_transport_msg_t *msg)
876{
877 struct ioq *entry = NULL((void*)0);
878 int count = 0;
879 uint32_t size = 0;
880
881 GF_VALIDATE_OR_GOTO ("socket", this, out)do { if (!this) { (*__errno_location ()) = 22; do { do { if (
0) printf ("invalid argument: " "this"); } while (0); _gf_log_callingfn
("socket", "socket.c", __FUNCTION__, 881, GF_LOG_ERROR, "invalid argument: "
"this"); } while (0); goto out; } } while (0)
;
882
883 /* TODO: use mem-pool */
884 entry = GF_CALLOC (1, sizeof (*entry), gf_common_mt_ioq)__gf_calloc (1, sizeof (*entry), gf_common_mt_ioq);
885 if (!entry)
886 return NULL((void*)0);
887
888 count = msg->rpchdrcount + msg->proghdrcount + msg->progpayloadcount;
889
890 GF_ASSERT (count <= (MAX_IOVEC - 1))do { if (!(count <= (16 - 1))) { do { do { if (0) printf (
"Assertion failed: " "count <= (MAX_IOVEC - 1)"); } while (
0); _gf_log_callingfn ("", "socket.c", __FUNCTION__, 890, GF_LOG_ERROR
, "Assertion failed: " "count <= (MAX_IOVEC - 1)"); } while
(0); } } while (0)
;
891
892 size = iov_length (msg->rpchdr, msg->rpchdrcount)
893 + iov_length (msg->proghdr, msg->proghdrcount)
894 + iov_length (msg->progpayload, msg->progpayloadcount);
895
896 if (size > RPC_MAX_FRAGMENT_SIZE0x7fffffff) {
897 gf_log (this->name, GF_LOG_ERROR,do { do { if (0) printf ("msg size (%u) bigger than the maximum allowed size on "
"sockets (%u)", size, 0x7fffffff); } while (0); _gf_log (this
->name, "socket.c", __FUNCTION__, 899, GF_LOG_ERROR, "msg size (%u) bigger than the maximum allowed size on "
"sockets (%u)", size, 0x7fffffff); } while (0)
898 "msg size (%u) bigger than the maximum allowed size on "do { do { if (0) printf ("msg size (%u) bigger than the maximum allowed size on "
"sockets (%u)", size, 0x7fffffff); } while (0); _gf_log (this
->name, "socket.c", __FUNCTION__, 899, GF_LOG_ERROR, "msg size (%u) bigger than the maximum allowed size on "
"sockets (%u)", size, 0x7fffffff); } while (0)
899 "sockets (%u)", size, RPC_MAX_FRAGMENT_SIZE)do { do { if (0) printf ("msg size (%u) bigger than the maximum allowed size on "
"sockets (%u)", size, 0x7fffffff); } while (0); _gf_log (this
->name, "socket.c", __FUNCTION__, 899, GF_LOG_ERROR, "msg size (%u) bigger than the maximum allowed size on "
"sockets (%u)", size, 0x7fffffff); } while (0)
;
900 GF_FREE (entry)__gf_free (entry);
901 return NULL((void*)0);
902 }
903
904 socket_set_last_frag_header_size (size, (char *)&entry->fraghdr);
905
906 entry->vector[0].iov_base = (char *)&entry->fraghdr;
907 entry->vector[0].iov_len = sizeof (entry->fraghdr);
908 entry->count = 1;
909
910 if (msg->rpchdr != NULL((void*)0)) {
911 memcpy (&entry->vector[1], msg->rpchdr,
912 sizeof (struct iovec) * msg->rpchdrcount);
913 entry->count += msg->rpchdrcount;
914 }
915
916 if (msg->proghdr != NULL((void*)0)) {
917 memcpy (&entry->vector[entry->count], msg->proghdr,
918 sizeof (struct iovec) * msg->proghdrcount);
919 entry->count += msg->proghdrcount;
920 }
921
922 if (msg->progpayload != NULL((void*)0)) {
923 memcpy (&entry->vector[entry->count], msg->progpayload,
924 sizeof (struct iovec) * msg->progpayloadcount);
925 entry->count += msg->progpayloadcount;
926 }
927
928 entry->pending_vector = entry->vector;
929 entry->pending_count = entry->count;
930
931 if (msg->iobref != NULL((void*)0))
932 entry->iobref = iobref_ref (msg->iobref);
933
934 INIT_LIST_HEAD (&entry->list)do { (&entry->list)->next = (&entry->list)->
prev = &entry->list; } while (0)
;
935
936out:
937 return entry;
938}
939
940
941void
942__socket_ioq_entry_free (struct ioq *entry)
943{
944 GF_VALIDATE_OR_GOTO ("socket", entry, out)do { if (!entry) { (*__errno_location ()) = 22; do { do { if (
0) printf ("invalid argument: " "entry"); } while (0); _gf_log_callingfn
("socket", "socket.c", __FUNCTION__, 944, GF_LOG_ERROR, "invalid argument: "
"entry"); } while (0); goto out; } } while (0)
;
945
946 list_del_init (&entry->list);
947 if (entry->iobref)
948 iobref_unref (entry->iobref);
949
950 /* TODO: use mem-pool */
951 GF_FREE (entry)__gf_free (entry);
952
953out:
954 return;
955}
956
957
958void
959__socket_ioq_flush (rpc_transport_t *this)
960{
961 socket_private_t *priv = NULL((void*)0);
962 struct ioq *entry = NULL((void*)0);
963
964 GF_VALIDATE_OR_GOTO ("socket", this, out)do { if (!this) { (*__errno_location ()) = 22; do { do { if (
0) printf ("invalid argument: " "this"); } while (0); _gf_log_callingfn
("socket", "socket.c", __FUNCTION__, 964, GF_LOG_ERROR, "invalid argument: "
"this"); } while (0); goto out; } } while (0)
;
965 GF_VALIDATE_OR_GOTO ("socket", this->private, out)do { if (!this->private) { (*__errno_location ()) = 22; do
{ do { if (0) printf ("invalid argument: " "this->private"
); } while (0); _gf_log_callingfn ("socket", "socket.c", __FUNCTION__
, 965, GF_LOG_ERROR, "invalid argument: " "this->private")
; } while (0); goto out; } } while (0)
;
966
967 priv = this->private;
968
969 while (!list_empty (&priv->ioq)) {
970 entry = priv->ioq_next;
971 __socket_ioq_entry_free (entry);
972 }
973
974out:
975 return;
976}
977
978
979int
980__socket_ioq_churn_entry (rpc_transport_t *this, struct ioq *entry, int direct)
981{
982 int ret = -1;
983 socket_private_t *priv = NULL((void*)0);
984 char a_byte = 0;
985
986 ret = __socket_writev (this, entry->pending_vector,
987 entry->pending_count,
988 &entry->pending_vector,
989 &entry->pending_count);
990
991 if (ret == 0) {
992 /* current entry was completely written */
993 GF_ASSERT (entry->pending_count == 0)do { if (!(entry->pending_count == 0)) { do { do { if (0) printf
("Assertion failed: " "entry->pending_count == 0"); } while
(0); _gf_log_callingfn ("", "socket.c", __FUNCTION__, 993, GF_LOG_ERROR
, "Assertion failed: " "entry->pending_count == 0"); } while
(0); } } while (0)
;
994 __socket_ioq_entry_free (entry);
995 priv = this->private;
996 if (priv->own_thread) {
997 /*
998 * The pipe should only remain readable if there are
999 * more entries after this, so drain the byte
1000 * representing this entry.
1001 */
1002 if (!direct && read(priv->pipe[0],&a_byte,1) < 1) {
1003 gf_log(this->name,GF_LOG_WARNING,do { do { if (0) printf ("read error on pipe"); } while (0); _gf_log
(this->name, "socket.c", __FUNCTION__, 1004, GF_LOG_WARNING
, "read error on pipe"); } while (0)
1004 "read error on pipe")do { do { if (0) printf ("read error on pipe"); } while (0); _gf_log
(this->name, "socket.c", __FUNCTION__, 1004, GF_LOG_WARNING
, "read error on pipe"); } while (0)
;
1005 }
1006 }
1007 }
1008
1009 return ret;
1010}
1011
1012
1013int
1014__socket_ioq_churn (rpc_transport_t *this)
1015{
1016 socket_private_t *priv = NULL((void*)0);
1017 int ret = 0;
1018 struct ioq *entry = NULL((void*)0);
1019
1020 GF_VALIDATE_OR_GOTO ("socket", this, out)do { if (!this) { (*__errno_location ()) = 22; do { do { if (
0) printf ("invalid argument: " "this"); } while (0); _gf_log_callingfn
("socket", "socket.c", __FUNCTION__, 1020, GF_LOG_ERROR, "invalid argument: "
"this"); } while (0); goto out; } } while (0)
;
1021 GF_VALIDATE_OR_GOTO ("socket", this->private, out)do { if (!this->private) { (*__errno_location ()) = 22; do
{ do { if (0) printf ("invalid argument: " "this->private"
); } while (0); _gf_log_callingfn ("socket", "socket.c", __FUNCTION__
, 1021, GF_LOG_ERROR, "invalid argument: " "this->private"
); } while (0); goto out; } } while (0)
;
1022
1023 priv = this->private;
1024
1025 while (!list_empty (&priv->ioq)) {
1026 /* pick next entry */
1027 entry = priv->ioq_next;
1028
1029 ret = __socket_ioq_churn_entry (this, entry, 0);
1030
1031 if (ret != 0)
1032 break;
1033 }
1034
1035 if (!priv->own_thread && list_empty (&priv->ioq)) {
1036 /* all pending writes done, not interested in POLLOUT */
1037 priv->idx = event_select_on (this->ctx->event_pool,
1038 priv->sock, priv->idx, -1, 0);
1039 }
1040
1041out:
1042 return ret;
1043}
1044
1045
1046int
1047socket_event_poll_err (rpc_transport_t *this)
1048{
1049 socket_private_t *priv = NULL((void*)0);
1050 int ret = -1;
1051
1052 GF_VALIDATE_OR_GOTO ("socket", this, out)do { if (!this) { (*__errno_location ()) = 22; do { do { if (
0) printf ("invalid argument: " "this"); } while (0); _gf_log_callingfn
("socket", "socket.c", __FUNCTION__, 1052, GF_LOG_ERROR, "invalid argument: "
"this"); } while (0); goto out; } } while (0)
;
1053 GF_VALIDATE_OR_GOTO ("socket", this->private, out)do { if (!this->private) { (*__errno_location ()) = 22; do
{ do { if (0) printf ("invalid argument: " "this->private"
); } while (0); _gf_log_callingfn ("socket", "socket.c", __FUNCTION__
, 1053, GF_LOG_ERROR, "invalid argument: " "this->private"
); } while (0); goto out; } } while (0)
;
1054
1055 priv = this->private;
1056
1057 pthread_mutex_lock (&priv->lock);
1058 {
1059 __socket_ioq_flush (this);
1060 __socket_reset (this);
1061 }
1062 pthread_mutex_unlock (&priv->lock);
1063
1064 rpc_transport_notify (this, RPC_TRANSPORT_DISCONNECT, this);
1065
1066out:
1067 return ret;
1068}
1069
1070
1071int
1072socket_event_poll_out (rpc_transport_t *this)
1073{
1074 socket_private_t *priv = NULL((void*)0);
1075 int ret = -1;
1076
1077 GF_VALIDATE_OR_GOTO ("socket", this, out)do { if (!this) { (*__errno_location ()) = 22; do { do { if (
0) printf ("invalid argument: " "this"); } while (0); _gf_log_callingfn
("socket", "socket.c", __FUNCTION__, 1077, GF_LOG_ERROR, "invalid argument: "
"this"); } while (0); goto out; } } while (0)
;
1078 GF_VALIDATE_OR_GOTO ("socket", this->private, out)do { if (!this->private) { (*__errno_location ()) = 22; do
{ do { if (0) printf ("invalid argument: " "this->private"
); } while (0); _gf_log_callingfn ("socket", "socket.c", __FUNCTION__
, 1078, GF_LOG_ERROR, "invalid argument: " "this->private"
); } while (0); goto out; } } while (0)
;
1079
1080 priv = this->private;
1081
1082 pthread_mutex_lock (&priv->lock);
1083 {
1084 if (priv->connected == 1) {
1085 ret = __socket_ioq_churn (this);
1086
1087 if (ret == -1) {
1088 __socket_disconnect (this);
1089 }
1090 }
1091 }
1092 pthread_mutex_unlock (&priv->lock);
1093
1094 ret = rpc_transport_notify (this, RPC_TRANSPORT_MSG_SENT, NULL((void*)0));
1095
1096out:
1097 return ret;
1098}
1099
1100
1101static inline int
1102__socket_read_simple_msg (rpc_transport_t *this)
1103{
1104 int ret = 0;
1105 uint32_t remaining_size = 0;
1106 size_t bytes_read = 0;
1107 socket_private_t *priv = NULL((void*)0);
1108 struct gf_sock_incoming *in = NULL((void*)0);
1109 struct gf_sock_incoming_frag *frag = NULL((void*)0);
1110
1111 GF_VALIDATE_OR_GOTO ("socket", this, out)do { if (!this) { (*__errno_location ()) = 22; do { do { if (
0) printf ("invalid argument: " "this"); } while (0); _gf_log_callingfn
("socket", "socket.c", __FUNCTION__, 1111, GF_LOG_ERROR, "invalid argument: "
"this"); } while (0); goto out; } } while (0)
;
1112 GF_VALIDATE_OR_GOTO ("socket", this->private, out)do { if (!this->private) { (*__errno_location ()) = 22; do
{ do { if (0) printf ("invalid argument: " "this->private"
); } while (0); _gf_log_callingfn ("socket", "socket.c", __FUNCTION__
, 1112, GF_LOG_ERROR, "invalid argument: " "this->private"
); } while (0); goto out; } } while (0)
;
1113
1114 priv = this->private;
1115
1116 in = &priv->incoming;
1117 frag = &in->frag;
1118
1119 switch (frag->simple_state) {
1120
1121 case SP_STATE_SIMPLE_MSG_INIT:
1122 remaining_size = RPC_FRAGSIZE (in->fraghdr)((uint32_t)(in->fraghdr & 0x7fffffffU)) - frag->bytes_read;
1123
1124 __socket_proto_init_pending (priv, remaining_size)do { uint32_t remaining = 0; struct gf_sock_incoming_frag *frag
; frag = &priv->incoming.frag; remaining = (((uint32_t
)(priv->incoming.fraghdr & 0x7fffffffU)) - frag->bytes_read
); do { struct gf_sock_incoming_frag *frag; frag = &priv->
incoming.frag; memset (&frag->vector, 0, sizeof (frag->
vector)); frag->pending_vector = &frag->vector; frag
->pending_vector->iov_base = frag->fragcurrent; priv
->incoming.pending_vector = frag->pending_vector; } while
(0); frag->pending_vector->iov_len = (remaining > remaining_size
) ? remaining_size : remaining; frag->remaining_size = (remaining_size
- frag->pending_vector->iov_len); } while(0)
;
1125
1126 frag->simple_state = SP_STATE_READING_SIMPLE_MSG;
1127
1128 /* fall through */
1129
1130 case SP_STATE_READING_SIMPLE_MSG:
1131 ret = 0;
1132
1133 remaining_size = RPC_FRAGSIZE (in->fraghdr)((uint32_t)(in->fraghdr & 0x7fffffffU)) - frag->bytes_read;
1134
1135 if (remaining_size > 0) {
1136 ret = __socket_readv (this,
1137 in->pending_vector, 1,
1138 &in->pending_vector,
1139 &in->pending_count,
1140 &bytes_read);
1141 }
1142
1143 if (ret == -1) {
1144 gf_log (this->name, GF_LOG_WARNING,do { do { if (0) printf ("reading from socket failed. Error (%s), "
"peer (%s)", strerror ((*__errno_location ())), this->peerinfo
.identifier); } while (0); _gf_log (this->name, "socket.c"
, __FUNCTION__, 1147, GF_LOG_WARNING, "reading from socket failed. Error (%s), "
"peer (%s)", strerror ((*__errno_location ())), this->peerinfo
.identifier); } while (0)
1145 "reading from socket failed. Error (%s), "do { do { if (0) printf ("reading from socket failed. Error (%s), "
"peer (%s)", strerror ((*__errno_location ())), this->peerinfo
.identifier); } while (0); _gf_log (this->name, "socket.c"
, __FUNCTION__, 1147, GF_LOG_WARNING, "reading from socket failed. Error (%s), "
"peer (%s)", strerror ((*__errno_location ())), this->peerinfo
.identifier); } while (0)
1146 "peer (%s)", strerror (errno),do { do { if (0) printf ("reading from socket failed. Error (%s), "
"peer (%s)", strerror ((*__errno_location ())), this->peerinfo
.identifier); } while (0); _gf_log (this->name, "socket.c"
, __FUNCTION__, 1147, GF_LOG_WARNING, "reading from socket failed. Error (%s), "
"peer (%s)", strerror ((*__errno_location ())), this->peerinfo
.identifier); } while (0)
1147 this->peerinfo.identifier)do { do { if (0) printf ("reading from socket failed. Error (%s), "
"peer (%s)", strerror ((*__errno_location ())), this->peerinfo
.identifier); } while (0); _gf_log (this->name, "socket.c"
, __FUNCTION__, 1147, GF_LOG_WARNING, "reading from socket failed. Error (%s), "
"peer (%s)", strerror ((*__errno_location ())), this->peerinfo
.identifier); } while (0)
;
1148 break;
1149 }
1150
1151 frag->bytes_read += bytes_read;
1152 frag->fragcurrent += bytes_read;
1153
1154 if (ret > 0) {
1155 gf_log (this->name, GF_LOG_TRACE,do { do { if (0) printf ("partial read on non-blocking socket."
); } while (0); _gf_log (this->name, "socket.c", __FUNCTION__
, 1156, GF_LOG_TRACE, "partial read on non-blocking socket.")
; } while (0)
1156 "partial read on non-blocking socket.")do { do { if (0) printf ("partial read on non-blocking socket."
); } while (0); _gf_log (this->name, "socket.c", __FUNCTION__
, 1156, GF_LOG_TRACE, "partial read on non-blocking socket.")
; } while (0)
;
1157 break;
1158 }
1159
1160 if (ret == 0) {
1161 frag->simple_state = SP_STATE_SIMPLE_MSG_INIT;
1162 }
1163 }
1164
1165out:
1166 return ret;
1167}
1168
1169
1170static inline int
1171__socket_read_simple_request (rpc_transport_t *this)
1172{
1173 return __socket_read_simple_msg (this);
1174}
1175
1176
1177#define rpc_cred_addr(buf)(buf + 8 + 24 - 4) (buf + RPC_MSGTYPE_SIZE8 + RPC_CALL_BODY_SIZE24 - 4)
1178
1179#define rpc_verf_addr(fragcurrent)(fragcurrent - 4) (fragcurrent - 4)
1180
1181#define rpc_msgtype_addr(buf)(buf + 4) (buf + 4)
1182
1183#define rpc_prognum_addr(buf)(buf + 8 + 4) (buf + RPC_MSGTYPE_SIZE8 + 4)
1184#define rpc_progver_addr(buf)(buf + 8 + 8) (buf + RPC_MSGTYPE_SIZE8 + 8)
1185#define rpc_procnum_addr(buf)(buf + 8 + 12) (buf + RPC_MSGTYPE_SIZE8 + 12)
1186
1187static inline int
1188__socket_read_vectored_request (rpc_transport_t *this, rpcsvc_vector_sizer vector_sizer)
1189{
1190 socket_private_t *priv = NULL((void*)0);
1191 int ret = 0;
1192 uint32_t credlen = 0, verflen = 0;
1193 char *addr = NULL((void*)0);
1194 struct iobuf *iobuf = NULL((void*)0);
1195 uint32_t remaining_size = 0;
1196 ssize_t readsize = 0;
1197 size_t size = 0;
1198 struct gf_sock_incoming *in = NULL((void*)0);
1199 struct gf_sock_incoming_frag *frag = NULL((void*)0);
1200 sp_rpcfrag_request_state_t *request = NULL((void*)0);
1201
1202 GF_VALIDATE_OR_GOTO ("socket", this, out)do { if (!this) { (*__errno_location ()) = 22; do { do { if (
0) printf ("invalid argument: " "this"); } while (0); _gf_log_callingfn
("socket", "socket.c", __FUNCTION__, 1202, GF_LOG_ERROR, "invalid argument: "
"this"); } while (0); goto out; } } while (0)
;
1203 GF_VALIDATE_OR_GOTO ("socket", this->private, out)do { if (!this->private) { (*__errno_location ()) = 22; do
{ do { if (0) printf ("invalid argument: " "this->private"
); } while (0); _gf_log_callingfn ("socket", "socket.c", __FUNCTION__
, 1203, GF_LOG_ERROR, "invalid argument: " "this->private"
); } while (0); goto out; } } while (0)
;
1204
1205 priv = this->private;
1206
1207 /* used to reduce the indirection */
1208 in = &priv->incoming;
1209 frag = &in->frag;
1210 request = &frag->call_body.request;
1211
1212 switch (request->vector_state) {
1213 case SP_STATE_VECTORED_REQUEST_INIT:
1214 request->vector_sizer_state = 0;
1215
1216 addr = rpc_cred_addr (iobuf_ptr (in->iobuf))(((in->iobuf)->ptr) + 8 + 24 - 4);
1217
1218 /* also read verf flavour and verflen */
1219 credlen = ntoh32hton32 (*((uint32_t *)addr))
1220 + RPC_AUTH_FLAVOUR_N_LENGTH_SIZE8;
1221
1222 __socket_proto_init_pending (priv, credlen)do { uint32_t remaining = 0; struct gf_sock_incoming_frag *frag
; frag = &priv->incoming.frag; remaining = (((uint32_t
)(priv->incoming.fraghdr & 0x7fffffffU)) - frag->bytes_read
); do { struct gf_sock_incoming_frag *frag; frag = &priv->
incoming.frag; memset (&frag->vector, 0, sizeof (frag->
vector)); frag->pending_vector = &frag->vector; frag
->pending_vector->iov_base = frag->fragcurrent; priv
->incoming.pending_vector = frag->pending_vector; } while
(0); frag->pending_vector->iov_len = (remaining > credlen
) ? credlen : remaining; frag->remaining_size = (credlen -
frag->pending_vector->iov_len); } while(0)
;
1223
1224 request->vector_state = SP_STATE_READING_CREDBYTES;
1225
1226 /* fall through */
1227
1228 case SP_STATE_READING_CREDBYTES:
1229 __socket_proto_read (priv, ret){ size_t bytes_read = 0; struct gf_sock_incoming *in; in = &
priv->incoming; do { uint32_t remaining; struct gf_sock_incoming_frag
*frag; frag = &priv->incoming.frag; if (frag->pending_vector
->iov_len == 0) { remaining = (((uint32_t)(priv->incoming
.fraghdr & 0x7fffffffU)) - frag->bytes_read); frag->
pending_vector->iov_len = (remaining > frag->remaining_size
) ? frag->remaining_size : remaining; frag->remaining_size
-= frag->pending_vector->iov_len; } } while (0); ret =
__socket_readv (this, in->pending_vector, 1, &in->
pending_vector, &in->pending_count, &bytes_read); if
(ret == -1) { if (priv->read_fail_log) do { do { if (0) printf
("reading from socket failed." "Error (%s), peer (%s)", strerror
((*__errno_location ())), this->peerinfo.identifier); } while
(0); _gf_log (this->name, "socket.c", __FUNCTION__, 1229,
GF_LOG_WARNING, "reading from socket failed." "Error (%s), peer (%s)"
, strerror ((*__errno_location ())), this->peerinfo.identifier
); } while (0); break; } { struct gf_sock_incoming_frag *frag
; frag = &priv->incoming.frag; frag->fragcurrent +=
bytes_read; frag->bytes_read += bytes_read; if ((ret >
0) || (frag->remaining_size != 0)) { if (frag->remaining_size
!= 0 && ret == 0) { do { struct gf_sock_incoming_frag
*frag; frag = &priv->incoming.frag; memset (&frag
->vector, 0, sizeof (frag->vector)); frag->pending_vector
= &frag->vector; frag->pending_vector->iov_base
= frag->fragcurrent; priv->incoming.pending_vector = frag
->pending_vector; } while (0); } do { do { if (0) printf (
"partial read on non-blocking socket"); } while (0); _gf_log (
this->name, "socket.c", __FUNCTION__, 1229, GF_LOG_TRACE, "partial read on non-blocking socket"
); } while (0); break; } }; }
;
1230
1231 request->vector_state = SP_STATE_READ_CREDBYTES;
1232
1233 /* fall through */
1234
1235 case SP_STATE_READ_CREDBYTES:
1236 addr = rpc_verf_addr (frag->fragcurrent)(frag->fragcurrent - 4);
1237 verflen = ntoh32hton32 (*((uint32_t *)addr));
1238
1239 if (verflen == 0) {
1240 request->vector_state = SP_STATE_READ_VERFBYTES;
1241 goto sp_state_read_verfbytes;
1242 }
1243 __socket_proto_init_pending (priv, verflen)do { uint32_t remaining = 0; struct gf_sock_incoming_frag *frag
; frag = &priv->incoming.frag; remaining = (((uint32_t
)(priv->incoming.fraghdr & 0x7fffffffU)) - frag->bytes_read
); do { struct gf_sock_incoming_frag *frag; frag = &priv->
incoming.frag; memset (&frag->vector, 0, sizeof (frag->
vector)); frag->pending_vector = &frag->vector; frag
->pending_vector->iov_base = frag->fragcurrent; priv
->incoming.pending_vector = frag->pending_vector; } while
(0); frag->pending_vector->iov_len = (remaining > verflen
) ? verflen : remaining; frag->remaining_size = (verflen -
frag->pending_vector->iov_len); } while(0)
;
1244
1245 request->vector_state = SP_STATE_READING_VERFBYTES;
1246
1247 /* fall through */
1248
1249 case SP_STATE_READING_VERFBYTES:
1250 __socket_proto_read (priv, ret){ size_t bytes_read = 0; struct gf_sock_incoming *in; in = &
priv->incoming; do { uint32_t remaining; struct gf_sock_incoming_frag
*frag; frag = &priv->incoming.frag; if (frag->pending_vector
->iov_len == 0) { remaining = (((uint32_t)(priv->incoming
.fraghdr & 0x7fffffffU)) - frag->bytes_read); frag->
pending_vector->iov_len = (remaining > frag->remaining_size
) ? frag->remaining_size : remaining; frag->remaining_size
-= frag->pending_vector->iov_len; } } while (0); ret =
__socket_readv (this, in->pending_vector, 1, &in->
pending_vector, &in->pending_count, &bytes_read); if
(ret == -1) { if (priv->read_fail_log) do { do { if (0) printf
("reading from socket failed." "Error (%s), peer (%s)", strerror
((*__errno_location ())), this->peerinfo.identifier); } while
(0); _gf_log (this->name, "socket.c", __FUNCTION__, 1250,
GF_LOG_WARNING, "reading from socket failed." "Error (%s), peer (%s)"
, strerror ((*__errno_location ())), this->peerinfo.identifier
); } while (0); break; } { struct gf_sock_incoming_frag *frag
; frag = &priv->incoming.frag; frag->fragcurrent +=
bytes_read; frag->bytes_read += bytes_read; if ((ret >
0) || (frag->remaining_size != 0)) { if (frag->remaining_size
!= 0 && ret == 0) { do { struct gf_sock_incoming_frag
*frag; frag = &priv->incoming.frag; memset (&frag
->vector, 0, sizeof (frag->vector)); frag->pending_vector
= &frag->vector; frag->pending_vector->iov_base
= frag->fragcurrent; priv->incoming.pending_vector = frag
->pending_vector; } while (0); } do { do { if (0) printf (
"partial read on non-blocking socket"); } while (0); _gf_log (
this->name, "socket.c", __FUNCTION__, 1250, GF_LOG_TRACE, "partial read on non-blocking socket"
); } while (0); break; } }; }
;
1251
1252 request->vector_state = SP_STATE_READ_VERFBYTES;
1253
1254 /* fall through */
1255
1256 case SP_STATE_READ_VERFBYTES:
1257sp_state_read_verfbytes:
1258 /* set the base_addr 'persistently' across multiple calls
1259 into the state machine */
1260 in->proghdr_base_addr = frag->fragcurrent;
1261
1262 request->vector_sizer_state =
1263 vector_sizer (request->vector_sizer_state,
1264 &readsize, in->proghdr_base_addr,
1265 frag->fragcurrent);
1266 __socket_proto_init_pending (priv, readsize)do { uint32_t remaining = 0; struct gf_sock_incoming_frag *frag
; frag = &priv->incoming.frag; remaining = (((uint32_t
)(priv->incoming.fraghdr & 0x7fffffffU)) - frag->bytes_read
); do { struct gf_sock_incoming_frag *frag; frag = &priv->
incoming.frag; memset (&frag->vector, 0, sizeof (frag->
vector)); frag->pending_vector = &frag->vector; frag
->pending_vector->iov_base = frag->fragcurrent; priv
->incoming.pending_vector = frag->pending_vector; } while
(0); frag->pending_vector->iov_len = (remaining > readsize
) ? readsize : remaining; frag->remaining_size = (readsize
- frag->pending_vector->iov_len); } while(0)
;
1267
1268 request->vector_state = SP_STATE_READING_PROGHDR;
1269
1270 /* fall through */
1271
1272 case SP_STATE_READING_PROGHDR:
1273 __socket_proto_read (priv, ret){ size_t bytes_read = 0; struct gf_sock_incoming *in; in = &
priv->incoming; do { uint32_t remaining; struct gf_sock_incoming_frag
*frag; frag = &priv->incoming.frag; if (frag->pending_vector
->iov_len == 0) { remaining = (((uint32_t)(priv->incoming
.fraghdr & 0x7fffffffU)) - frag->bytes_read); frag->
pending_vector->iov_len = (remaining > frag->remaining_size
) ? frag->remaining_size : remaining; frag->remaining_size
-= frag->pending_vector->iov_len; } } while (0); ret =
__socket_readv (this, in->pending_vector, 1, &in->
pending_vector, &in->pending_count, &bytes_read); if
(ret == -1) { if (priv->read_fail_log) do { do { if (0) printf
("reading from socket failed." "Error (%s), peer (%s)", strerror
((*__errno_location ())), this->peerinfo.identifier); } while
(0); _gf_log (this->name, "socket.c", __FUNCTION__, 1273,
GF_LOG_WARNING, "reading from socket failed." "Error (%s), peer (%s)"
, strerror ((*__errno_location ())), this->peerinfo.identifier
); } while (0); break; } { struct gf_sock_incoming_frag *frag
; frag = &priv->incoming.frag; frag->fragcurrent +=
bytes_read; frag->bytes_read += bytes_read; if ((ret >
0) || (frag->remaining_size != 0)) { if (frag->remaining_size
!= 0 && ret == 0) { do { struct gf_sock_incoming_frag
*frag; frag = &priv->incoming.frag; memset (&frag
->vector, 0, sizeof (frag->vector)); frag->pending_vector
= &frag->vector; frag->pending_vector->iov_base
= frag->fragcurrent; priv->incoming.pending_vector = frag
->pending_vector; } while (0); } do { do { if (0) printf (
"partial read on non-blocking socket"); } while (0); _gf_log (
this->name, "socket.c", __FUNCTION__, 1273, GF_LOG_TRACE, "partial read on non-blocking socket"
); } while (0); break; } }; }
;
1274
1275 request->vector_state = SP_STATE_READ_PROGHDR;
1276
1277 /* fall through */
1278
1279 case SP_STATE_READ_PROGHDR:
1280sp_state_read_proghdr:
1281 request->vector_sizer_state =
1282 vector_sizer (request->vector_sizer_state,
1283 &readsize, in->proghdr_base_addr,
1284 frag->fragcurrent);
1285 if (readsize == 0) {
1286 request->vector_state = SP_STATE_READ_PROGHDR_XDATA;
1287 goto sp_state_read_proghdr_xdata;
1288 }
1289
1290 __socket_proto_init_pending (priv, readsize)do { uint32_t remaining = 0; struct gf_sock_incoming_frag *frag
; frag = &priv->incoming.frag; remaining = (((uint32_t
)(priv->incoming.fraghdr & 0x7fffffffU)) - frag->bytes_read
); do { struct gf_sock_incoming_frag *frag; frag = &priv->
incoming.frag; memset (&frag->vector, 0, sizeof (frag->
vector)); frag->pending_vector = &frag->vector; frag
->pending_vector->iov_base = frag->fragcurrent; priv
->incoming.pending_vector = frag->pending_vector; } while
(0); frag->pending_vector->iov_len = (remaining > readsize
) ? readsize : remaining; frag->remaining_size = (readsize
- frag->pending_vector->iov_len); } while(0)
;
1291
1292 request->vector_state = SP_STATE_READING_PROGHDR_XDATA;
1293
1294 /* fall through */
1295
1296 case SP_STATE_READING_PROGHDR_XDATA:
1297 __socket_proto_read (priv, ret){ size_t bytes_read = 0; struct gf_sock_incoming *in; in = &
priv->incoming; do { uint32_t remaining; struct gf_sock_incoming_frag
*frag; frag = &priv->incoming.frag; if (frag->pending_vector
->iov_len == 0) { remaining = (((uint32_t)(priv->incoming
.fraghdr & 0x7fffffffU)) - frag->bytes_read); frag->
pending_vector->iov_len = (remaining > frag->remaining_size
) ? frag->remaining_size : remaining; frag->remaining_size
-= frag->pending_vector->iov_len; } } while (0); ret =
__socket_readv (this, in->pending_vector, 1, &in->
pending_vector, &in->pending_count, &bytes_read); if
(ret == -1) { if (priv->read_fail_log) do { do { if (0) printf
("reading from socket failed." "Error (%s), peer (%s)", strerror
((*__errno_location ())), this->peerinfo.identifier); } while
(0); _gf_log (this->name, "socket.c", __FUNCTION__, 1297,
GF_LOG_WARNING, "reading from socket failed." "Error (%s), peer (%s)"
, strerror ((*__errno_location ())), this->peerinfo.identifier
); } while (0); break; } { struct gf_sock_incoming_frag *frag
; frag = &priv->incoming.frag; frag->fragcurrent +=
bytes_read; frag->bytes_read += bytes_read; if ((ret >
0) || (frag->remaining_size != 0)) { if (frag->remaining_size
!= 0 && ret == 0) { do { struct gf_sock_incoming_frag
*frag; frag = &priv->incoming.frag; memset (&frag
->vector, 0, sizeof (frag->vector)); frag->pending_vector
= &frag->vector; frag->pending_vector->iov_base
= frag->fragcurrent; priv->incoming.pending_vector = frag
->pending_vector; } while (0); } do { do { if (0) printf (
"partial read on non-blocking socket"); } while (0); _gf_log (
this->name, "socket.c", __FUNCTION__, 1297, GF_LOG_TRACE, "partial read on non-blocking socket"
); } while (0); break; } }; }
;
1298
1299 request->vector_state = SP_STATE_READ_PROGHDR;
1300 /* check if the vector_sizer() has more to say */
1301 goto sp_state_read_proghdr;
1302
1303 case SP_STATE_READ_PROGHDR_XDATA:
1304sp_state_read_proghdr_xdata:
1305 if (in->payload_vector.iov_base == NULL((void*)0)) {
1306
1307 size = RPC_FRAGSIZE (in->fraghdr)((uint32_t)(in->fraghdr & 0x7fffffffU)) - frag->bytes_read;
1308 iobuf = iobuf_get2 (this->ctx->iobuf_pool, size);
1309 if (!iobuf) {
1310 ret = -1;
1311 break;
1312 }
1313
1314 if (in->iobref == NULL((void*)0)) {
1315 in->iobref = iobref_new ();
1316 if (in->iobref == NULL((void*)0)) {
1317 ret = -1;
1318 iobuf_unref (iobuf);
1319 break;
1320 }
1321 }
1322
1323 iobref_add (in->iobref, iobuf);
1324 iobuf_unref (iobuf);
1325
1326 in->payload_vector.iov_base = iobuf_ptr (iobuf)((iobuf)->ptr);
1327
1328 frag->fragcurrent = iobuf_ptr (iobuf)((iobuf)->ptr);
1329 }
1330
1331 request->vector_state = SP_STATE_READING_PROG;
1332
1333 /* fall through */
1334
1335 case SP_STATE_READING_PROG:
1336 /* now read the remaining rpc msg into buffer pointed by
1337 * fragcurrent
1338 */
1339
1340 ret = __socket_read_simple_msg (this);
1341
1342 remaining_size = RPC_FRAGSIZE (in->fraghdr)((uint32_t)(in->fraghdr & 0x7fffffffU)) - frag->bytes_read;
1343
1344 if ((ret == -1) ||
1345 ((ret == 0) && (remaining_size == 0)
1346 && RPC_LASTFRAG (in->fraghdr)((uint32_t)(in->fraghdr & 0x80000000U)))) {
1347 request->vector_state = SP_STATE_VECTORED_REQUEST_INIT;
1348 in->payload_vector.iov_len
1349 = ((unsigned long)frag->fragcurrent
1350 - (unsigned long)in->payload_vector.iov_base);
1351 }
1352 break;
1353 }
1354
1355out:
1356 return ret;
1357}
1358
1359static inline int
1360__socket_read_request (rpc_transport_t *this)
1361{
1362 socket_private_t *priv = NULL((void*)0);
1363 uint32_t prognum = 0, procnum = 0, progver = 0;
1364 uint32_t remaining_size = 0;
1365 int ret = -1;
1366 char *buf = NULL((void*)0);
1367 rpcsvc_vector_sizer vector_sizer = NULL((void*)0);
1368 struct gf_sock_incoming *in = NULL((void*)0);
1369 struct gf_sock_incoming_frag *frag = NULL((void*)0);
1370 sp_rpcfrag_request_state_t *request = NULL((void*)0);
1371
1372 GF_VALIDATE_OR_GOTO ("socket", this, out)do { if (!this) { (*__errno_location ()) = 22; do { do { if (
0) printf ("invalid argument: " "this"); } while (0); _gf_log_callingfn
("socket", "socket.c", __FUNCTION__, 1372, GF_LOG_ERROR, "invalid argument: "
"this"); } while (0); goto out; } } while (0)
;
1373 GF_VALIDATE_OR_GOTO ("socket", this->private, out)do { if (!this->private) { (*__errno_location ()) = 22; do
{ do { if (0) printf ("invalid argument: " "this->private"
); } while (0); _gf_log_callingfn ("socket", "socket.c", __FUNCTION__
, 1373, GF_LOG_ERROR, "invalid argument: " "this->private"
); } while (0); goto out; } } while (0)
;
1374
1375 priv = this->private;
1376
1377 /* used to reduce the indirection */
1378 in = &priv->incoming;
1379 frag = &in->frag;
1380 request = &frag->call_body.request;
1381
1382 switch (request->header_state) {
1383
1384 case SP_STATE_REQUEST_HEADER_INIT:
1385
1386 __socket_proto_init_pending (priv, RPC_CALL_BODY_SIZE)do { uint32_t remaining = 0; struct gf_sock_incoming_frag *frag
; frag = &priv->incoming.frag; remaining = (((uint32_t
)(priv->incoming.fraghdr & 0x7fffffffU)) - frag->bytes_read
); do { struct gf_sock_incoming_frag *frag; frag = &priv->
incoming.frag; memset (&frag->vector, 0, sizeof (frag->
vector)); frag->pending_vector = &frag->vector; frag
->pending_vector->iov_base = frag->fragcurrent; priv
->incoming.pending_vector = frag->pending_vector; } while
(0); frag->pending_vector->iov_len = (remaining > 24
) ? 24 : remaining; frag->remaining_size = (24 - frag->
pending_vector->iov_len); } while(0)
;
1387
1388 request->header_state = SP_STATE_READING_RPCHDR1;
1389
1390 /* fall through */
1391
1392 case SP_STATE_READING_RPCHDR1:
1393 __socket_proto_read (priv, ret){ size_t bytes_read = 0; struct gf_sock_incoming *in; in = &
priv->incoming; do { uint32_t remaining; struct gf_sock_incoming_frag
*frag; frag = &priv->incoming.frag; if (frag->pending_vector
->iov_len == 0) { remaining = (((uint32_t)(priv->incoming
.fraghdr & 0x7fffffffU)) - frag->bytes_read); frag->
pending_vector->iov_len = (remaining > frag->remaining_size
) ? frag->remaining_size : remaining; frag->remaining_size
-= frag->pending_vector->iov_len; } } while (0); ret =
__socket_readv (this, in->pending_vector, 1, &in->
pending_vector, &in->pending_count, &bytes_read); if
(ret == -1) { if (priv->read_fail_log) do { do { if (0) printf
("reading from socket failed." "Error (%s), peer (%s)", strerror
((*__errno_location ())), this->peerinfo.identifier); } while
(0); _gf_log (this->name, "socket.c", __FUNCTION__, 1393,
GF_LOG_WARNING, "reading from socket failed." "Error (%s), peer (%s)"
, strerror ((*__errno_location ())), this->peerinfo.identifier
); } while (0); break; } { struct gf_sock_incoming_frag *frag
; frag = &priv->incoming.frag; frag->fragcurrent +=
bytes_read; frag->bytes_read += bytes_read; if ((ret >
0) || (frag->remaining_size != 0)) { if (frag->remaining_size
!= 0 && ret == 0) { do { struct gf_sock_incoming_frag
*frag; frag = &priv->incoming.frag; memset (&frag
->vector, 0, sizeof (frag->vector)); frag->pending_vector
= &frag->vector; frag->pending_vector->iov_base
= frag->fragcurrent; priv->incoming.pending_vector = frag
->pending_vector; } while (0); } do { do { if (0) printf (
"partial read on non-blocking socket"); } while (0); _gf_log (
this->name, "socket.c", __FUNCTION__, 1393, GF_LOG_TRACE, "partial read on non-blocking socket"
); } while (0); break; } }; }
;
1394
1395 request->header_state = SP_STATE_READ_RPCHDR1;
1396
1397 /* fall through */
1398
1399 case SP_STATE_READ_RPCHDR1:
1400 buf = rpc_prognum_addr (iobuf_ptr (in->iobuf))(((in->iobuf)->ptr) + 8 + 4);
1401 prognum = ntoh32hton32 (*((uint32_t *)buf));
1402
1403 buf = rpc_progver_addr (iobuf_ptr (in->iobuf))(((in->iobuf)->ptr) + 8 + 8);
1404 progver = ntoh32hton32 (*((uint32_t *)buf));
1405
1406 buf = rpc_procnum_addr (iobuf_ptr (in->iobuf))(((in->iobuf)->ptr) + 8 + 12);
1407 procnum = ntoh32hton32 (*((uint32_t *)buf));
1408
1409 if (this->listener) {
1410 /* this check is needed as rpcsvc and rpc-clnt
1411 * actor structures are not same */
1412 vector_sizer =
1413 rpcsvc_get_program_vector_sizer ((rpcsvc_t *)this->mydata,
1414 prognum, progver, procnum);
1415 }
1416
1417 if (vector_sizer) {
1418 ret = __socket_read_vectored_request (this, vector_sizer);
1419 } else {
1420 ret = __socket_read_simple_request (this);
1421 }
1422
1423 remaining_size = RPC_FRAGSIZE (in->fraghdr)((uint32_t)(in->fraghdr & 0x7fffffffU)) - frag->bytes_read;
1424
1425 if ((ret == -1)
1426 || ((ret == 0)
1427 && (remaining_size == 0)
1428 && (RPC_LASTFRAG (in->fraghdr)((uint32_t)(in->fraghdr & 0x80000000U))))) {
1429 request->header_state = SP_STATE_REQUEST_HEADER_INIT;
1430 }
1431
1432 break;
1433 }
1434
1435out:
1436 return ret;
1437}
1438
1439
1440static inline int
1441__socket_read_accepted_successful_reply (rpc_transport_t *this)
1442{
1443 socket_private_t *priv = NULL((void*)0);
1444 int ret = 0;
1445 struct iobuf *iobuf = NULL((void*)0);
1446 gfs3_read_rsp read_rsp = {0, };
1447 ssize_t size = 0;
1448 ssize_t default_read_size = 0;
1449 char *proghdr_buf = NULL((void*)0);
1450 XDR xdr;
1451 struct gf_sock_incoming *in = NULL((void*)0);
1452 struct gf_sock_incoming_frag *frag = NULL((void*)0);
1453
1454 GF_VALIDATE_OR_GOTO ("socket", this, out)do { if (!this) { (*__errno_location ()) = 22; do { do { if (
0) printf ("invalid argument: " "this"); } while (0); _gf_log_callingfn
("socket", "socket.c", __FUNCTION__, 1454, GF_LOG_ERROR, "invalid argument: "
"this"); } while (0); goto out; } } while (0)
;
1455 GF_VALIDATE_OR_GOTO ("socket", this->private, out)do { if (!this->private) { (*__errno_location ()) = 22; do
{ do { if (0) printf ("invalid argument: " "this->private"
); } while (0); _gf_log_callingfn ("socket", "socket.c", __FUNCTION__
, 1455, GF_LOG_ERROR, "invalid argument: " "this->private"
); } while (0); goto out; } } while (0)
;
1456
1457 priv = this->private;
1458
1459 /* used to reduce the indirection */
1460 in = &priv->incoming;
1461 frag = &in->frag;
1462
1463 switch (frag->call_body.reply.accepted_success_state) {
1464
1465 case SP_STATE_ACCEPTED_SUCCESS_REPLY_INIT:
1466 default_read_size = xdr_sizeof ((xdrproc_t) xdr_gfs3_read_rsp,
1467 &read_rsp);
1468
1469 proghdr_buf = frag->fragcurrent;
1470
1471 __socket_proto_init_pending (priv, default_read_size)do { uint32_t remaining = 0; struct gf_sock_incoming_frag *frag
; frag = &priv->incoming.frag; remaining = (((uint32_t
)(priv->incoming.fraghdr & 0x7fffffffU)) - frag->bytes_read
); do { struct gf_sock_incoming_frag *frag; frag = &priv->
incoming.frag; memset (&frag->vector, 0, sizeof (frag->
vector)); frag->pending_vector = &frag->vector; frag
->pending_vector->iov_base = frag->fragcurrent; priv
->incoming.pending_vector = frag->pending_vector; } while
(0); frag->pending_vector->iov_len = (remaining > default_read_size
) ? default_read_size : remaining; frag->remaining_size = (
default_read_size - frag->pending_vector->iov_len); } while
(0)
;
1472
1473 frag->call_body.reply.accepted_success_state
1474 = SP_STATE_READING_PROC_HEADER;
1475
1476 /* fall through */
1477
1478 case SP_STATE_READING_PROC_HEADER:
1479 __socket_proto_read (priv, ret){ size_t bytes_read = 0; struct gf_sock_incoming *in; in = &
priv->incoming; do { uint32_t remaining; struct gf_sock_incoming_frag
*frag; frag = &priv->incoming.frag; if (frag->pending_vector
->iov_len == 0) { remaining = (((uint32_t)(priv->incoming
.fraghdr & 0x7fffffffU)) - frag->bytes_read); frag->
pending_vector->iov_len = (remaining > frag->remaining_size
) ? frag->remaining_size : remaining; frag->remaining_size
-= frag->pending_vector->iov_len; } } while (0); ret =
__socket_readv (this, in->pending_vector, 1, &in->
pending_vector, &in->pending_count, &bytes_read); if
(ret == -1) { if (priv->read_fail_log) do { do { if (0) printf
("reading from socket failed." "Error (%s), peer (%s)", strerror
((*__errno_location ())), this->peerinfo.identifier); } while
(0); _gf_log (this->name, "socket.c", __FUNCTION__, 1479,
GF_LOG_WARNING, "reading from socket failed." "Error (%s), peer (%s)"
, strerror ((*__errno_location ())), this->peerinfo.identifier
); } while (0); break; } { struct gf_sock_incoming_frag *frag
; frag = &priv->incoming.frag; frag->fragcurrent +=
bytes_read; frag->bytes_read += bytes_read; if ((ret >
0) || (frag->remaining_size != 0)) { if (frag->remaining_size
!= 0 && ret == 0) { do { struct gf_sock_incoming_frag
*frag; frag = &priv->incoming.frag; memset (&frag
->vector, 0, sizeof (frag->vector)); frag->pending_vector
= &frag->vector; frag->pending_vector->iov_base
= frag->fragcurrent; priv->incoming.pending_vector = frag
->pending_vector; } while (0); } do { do { if (0) printf (
"partial read on non-blocking socket"); } while (0); _gf_log (
this->name, "socket.c", __FUNCTION__, 1479, GF_LOG_TRACE, "partial read on non-blocking socket"
); } while (0); break; } }; }
;
1480
1481 /* there can be 'xdata' in read response, figure it out */
1482 xdrmem_create (&xdr, proghdr_buf, default_read_size,
1483 XDR_DECODE);
1484
1485 /* This will fail if there is xdata sent from server, if not,
1486 well and good, we don't need to worry about */
1487 xdr_gfs3_read_rsp (&xdr, &read_rsp);
1488
1489 free (read_rsp.xdata.xdata_val);
1490
1491 /* need to round off to proper roof (%4), as XDR packing pads
1492 the end of opaque object with '0' */
1493 size = roof (read_rsp.xdata.xdata_len, 4)((((read_rsp.xdata.xdata_len)+(4)-1)/((4)?(4):1))*(4));
1494
1495 if (!size) {
1496 frag->call_body.reply.accepted_success_state
1497 = SP_STATE_READ_PROC_OPAQUE;
1498 goto read_proc_opaque;
1499 }
1500
1501 __socket_proto_init_pending (priv, size)do { uint32_t remaining = 0; struct gf_sock_incoming_frag *frag
; frag = &priv->incoming.frag; remaining = (((uint32_t
)(priv->incoming.fraghdr & 0x7fffffffU)) - frag->bytes_read
); do { struct gf_sock_incoming_frag *frag; frag = &priv->
incoming.frag; memset (&frag->vector, 0, sizeof (frag->
vector)); frag->pending_vector = &frag->vector; frag
->pending_vector->iov_base = frag->fragcurrent; priv
->incoming.pending_vector = frag->pending_vector; } while
(0); frag->pending_vector->iov_len = (remaining > size
) ? size : remaining; frag->remaining_size = (size - frag->
pending_vector->iov_len); } while(0)
;
1502
1503 frag->call_body.reply.accepted_success_state
1504 = SP_STATE_READING_PROC_OPAQUE;
1505
1506 case SP_STATE_READING_PROC_OPAQUE:
1507 __socket_proto_read (priv, ret){ size_t bytes_read = 0; struct gf_sock_incoming *in; in = &
priv->incoming; do { uint32_t remaining; struct gf_sock_incoming_frag
*frag; frag = &priv->incoming.frag; if (frag->pending_vector
->iov_len == 0) { remaining = (((uint32_t)(priv->incoming
.fraghdr & 0x7fffffffU)) - frag->bytes_read); frag->
pending_vector->iov_len = (remaining > frag->remaining_size
) ? frag->remaining_size : remaining; frag->remaining_size
-= frag->pending_vector->iov_len; } } while (0); ret =
__socket_readv (this, in->pending_vector, 1, &in->
pending_vector, &in->pending_count, &bytes_read); if
(ret == -1) { if (priv->read_fail_log) do { do { if (0) printf
("reading from socket failed." "Error (%s), peer (%s)", strerror
((*__errno_location ())), this->peerinfo.identifier); } while
(0); _gf_log (this->name, "socket.c", __FUNCTION__, 1507,
GF_LOG_WARNING, "reading from socket failed." "Error (%s), peer (%s)"
, strerror ((*__errno_location ())), this->peerinfo.identifier
); } while (0); break; } { struct gf_sock_incoming_frag *frag
; frag = &priv->incoming.frag; frag->fragcurrent +=
bytes_read; frag->bytes_read += bytes_read; if ((ret >
0) || (frag->remaining_size != 0)) { if (frag->remaining_size
!= 0 && ret == 0) { do { struct gf_sock_incoming_frag
*frag; frag = &priv->incoming.frag; memset (&frag
->vector, 0, sizeof (frag->vector)); frag->pending_vector
= &frag->vector; frag->pending_vector->iov_base
= frag->fragcurrent; priv->incoming.pending_vector = frag
->pending_vector; } while (0); } do { do { if (0) printf (
"partial read on non-blocking socket"); } while (0); _gf_log (
this->name, "socket.c", __FUNCTION__, 1507, GF_LOG_TRACE, "partial read on non-blocking socket"
); } while (0); break; } }; }
;
1508
1509 frag->call_body.reply.accepted_success_state
1510 = SP_STATE_READ_PROC_OPAQUE;
1511
1512 case SP_STATE_READ_PROC_OPAQUE:
1513 read_proc_opaque:
1514 if (in->payload_vector.iov_base == NULL((void*)0)) {
1515
1516 size = (RPC_FRAGSIZE (in->fraghdr)((uint32_t)(in->fraghdr & 0x7fffffffU)) - frag->bytes_read);
1517
1518 iobuf = iobuf_get2 (this->ctx->iobuf_pool, size);
1519 if (iobuf == NULL((void*)0)) {
1520 ret = -1;
1521 goto out;
1522 }
1523
1524 if (in->iobref == NULL((void*)0)) {
1525 in->iobref = iobref_new ();
1526 if (in->iobref == NULL((void*)0)) {
1527 ret = -1;
1528 iobuf_unref (iobuf);
1529 goto out;
1530 }
1531 }
1532
1533 iobref_add (in->iobref, iobuf);
1534 iobuf_unref (iobuf);
1535
1536 in->payload_vector.iov_base = iobuf_ptr (iobuf)((iobuf)->ptr);
1537
1538 in->payload_vector.iov_len = size;
1539 }
1540
1541 frag->fragcurrent = in->payload_vector.iov_base;
1542
1543 frag->call_body.reply.accepted_success_state
1544 = SP_STATE_READ_PROC_HEADER;
1545
1546 /* fall through */
1547
1548 case SP_STATE_READ_PROC_HEADER:
1549 /* now read the entire remaining msg into new iobuf */
1550 ret = __socket_read_simple_msg (this);
1551 if ((ret == -1)
1552 || ((ret == 0) && RPC_LASTFRAG (in->fraghdr)((uint32_t)(in->fraghdr & 0x80000000U)))) {
1553 frag->call_body.reply.accepted_success_state
1554 = SP_STATE_ACCEPTED_SUCCESS_REPLY_INIT;
1555 }
1556
1557 break;
1558 }
1559
1560out:
1561 return ret;
1562}
1563
1564#define rpc_reply_verflen_addr(fragcurrent)((char *)fragcurrent - 4) ((char *)fragcurrent - 4)
1565#define rpc_reply_accept_status_addr(fragcurrent)((char *)fragcurrent - 4) ((char *)fragcurrent - 4)
1566
1567static inline int
1568__socket_read_accepted_reply (rpc_transport_t *this)
1569{
1570 socket_private_t *priv = NULL((void*)0);
1571 int ret = -1;
1572 char *buf = NULL((void*)0);
1573 uint32_t verflen = 0, len = 0;
1574 uint32_t remaining_size = 0;
1575 struct gf_sock_incoming *in = NULL((void*)0);
1576 struct gf_sock_incoming_frag *frag = NULL((void*)0);
1577
1578 GF_VALIDATE_OR_GOTO ("socket", this, out)do { if (!this) { (*__errno_location ()) = 22; do { do { if (
0) printf ("invalid argument: " "this"); } while (0); _gf_log_callingfn
("socket", "socket.c", __FUNCTION__, 1578, GF_LOG_ERROR, "invalid argument: "
"this"); } while (0); goto out; } } while (0)
;
1579 GF_VALIDATE_OR_GOTO ("socket", this->private, out)do { if (!this->private) { (*__errno_location ()) = 22; do
{ do { if (0) printf ("invalid argument: " "this->private"
); } while (0); _gf_log_callingfn ("socket", "socket.c", __FUNCTION__
, 1579, GF_LOG_ERROR, "invalid argument: " "this->private"
); } while (0); goto out; } } while (0)
;
1580
1581 priv = this->private;
1582 /* used to reduce the indirection */
1583 in = &priv->incoming;
1584 frag = &in->frag;
1585
1586 switch (frag->call_body.reply.accepted_state) {
1587
1588 case SP_STATE_ACCEPTED_REPLY_INIT:
1589 __socket_proto_init_pending (priv,do { uint32_t remaining = 0; struct gf_sock_incoming_frag *frag
; frag = &priv->incoming.frag; remaining = (((uint32_t
)(priv->incoming.fraghdr & 0x7fffffffU)) - frag->bytes_read
); do { struct gf_sock_incoming_frag *frag; frag = &priv->
incoming.frag; memset (&frag->vector, 0, sizeof (frag->
vector)); frag->pending_vector = &frag->vector; frag
->pending_vector->iov_base = frag->fragcurrent; priv
->incoming.pending_vector = frag->pending_vector; } while
(0); frag->pending_vector->iov_len = (remaining > 8
) ? 8 : remaining; frag->remaining_size = (8 - frag->pending_vector
->iov_len); } while(0)
1590 RPC_AUTH_FLAVOUR_N_LENGTH_SIZE)do { uint32_t remaining = 0; struct gf_sock_incoming_frag *frag
; frag = &priv->incoming.frag; remaining = (((uint32_t
)(priv->incoming.fraghdr & 0x7fffffffU)) - frag->bytes_read
); do { struct gf_sock_incoming_frag *frag; frag = &priv->
incoming.frag; memset (&frag->vector, 0, sizeof (frag->
vector)); frag->pending_vector = &frag->vector; frag
->pending_vector->iov_base = frag->fragcurrent; priv
->incoming.pending_vector = frag->pending_vector; } while
(0); frag->pending_vector->iov_len = (remaining > 8
) ? 8 : remaining; frag->remaining_size = (8 - frag->pending_vector
->iov_len); } while(0)
;
1591
1592 frag->call_body.reply.accepted_state
1593 = SP_STATE_READING_REPLY_VERFLEN;
1594
1595 /* fall through */
1596
1597 case SP_STATE_READING_REPLY_VERFLEN:
1598 __socket_proto_read (priv, ret){ size_t bytes_read = 0; struct gf_sock_incoming *in; in = &
priv->incoming; do { uint32_t remaining; struct gf_sock_incoming_frag
*frag; frag = &priv->incoming.frag; if (frag->pending_vector
->iov_len == 0) { remaining = (((uint32_t)(priv->incoming
.fraghdr & 0x7fffffffU)) - frag->bytes_read); frag->
pending_vector->iov_len = (remaining > frag->remaining_size
) ? frag->remaining_size : remaining; frag->remaining_size
-= frag->pending_vector->iov_len; } } while (0); ret =
__socket_readv (this, in->pending_vector, 1, &in->
pending_vector, &in->pending_count, &bytes_read); if
(ret == -1) { if (priv->read_fail_log) do { do { if (0) printf
("reading from socket failed." "Error (%s), peer (%s)", strerror
((*__errno_location ())), this->peerinfo.identifier); } while
(0); _gf_log (this->name, "socket.c", __FUNCTION__, 1598,
GF_LOG_WARNING, "reading from socket failed." "Error (%s), peer (%s)"
, strerror ((*__errno_location ())), this->peerinfo.identifier
); } while (0); break; } { struct gf_sock_incoming_frag *frag
; frag = &priv->incoming.frag; frag->fragcurrent +=
bytes_read; frag->bytes_read += bytes_read; if ((ret >
0) || (frag->remaining_size != 0)) { if (frag->remaining_size
!= 0 && ret == 0) { do { struct gf_sock_incoming_frag
*frag; frag = &priv->incoming.frag; memset (&frag
->vector, 0, sizeof (frag->vector)); frag->pending_vector
= &frag->vector; frag->pending_vector->iov_base
= frag->fragcurrent; priv->incoming.pending_vector = frag
->pending_vector; } while (0); } do { do { if (0) printf (
"partial read on non-blocking socket"); } while (0); _gf_log (
this->name, "socket.c", __FUNCTION__, 1598, GF_LOG_TRACE, "partial read on non-blocking socket"
); } while (0); break; } }; }
;
1599
1600 frag->call_body.reply.accepted_state
1601 = SP_STATE_READ_REPLY_VERFLEN;
1602
1603 /* fall through */
1604
1605 case SP_STATE_READ_REPLY_VERFLEN:
1606 buf = rpc_reply_verflen_addr (frag->fragcurrent)((char *)frag->fragcurrent - 4);
1607
1608 verflen = ntoh32hton32 (*((uint32_t *) buf));
1609
1610 /* also read accept status along with verf data */
1611 len = verflen + RPC_ACCEPT_STATUS_LEN4;
1612
1613 __socket_proto_init_pending (priv, len)do { uint32_t remaining = 0; struct gf_sock_incoming_frag *frag
; frag = &priv->incoming.frag; remaining = (((uint32_t
)(priv->incoming.fraghdr & 0x7fffffffU)) - frag->bytes_read
); do { struct gf_sock_incoming_frag *frag; frag = &priv->
incoming.frag; memset (&frag->vector, 0, sizeof (frag->
vector)); frag->pending_vector = &frag->vector; frag
->pending_vector->iov_base = frag->fragcurrent; priv
->incoming.pending_vector = frag->pending_vector; } while
(0); frag->pending_vector->iov_len = (remaining > len
) ? len : remaining; frag->remaining_size = (len - frag->
pending_vector->iov_len); } while(0)
;
1614
1615 frag->call_body.reply.accepted_state
1616 = SP_STATE_READING_REPLY_VERFBYTES;
1617
1618 /* fall through */
1619
1620 case SP_STATE_READING_REPLY_VERFBYTES:
1621 __socket_proto_read (priv, ret){ size_t bytes_read = 0; struct gf_sock_incoming *in; in = &
priv->incoming; do { uint32_t remaining; struct gf_sock_incoming_frag
*frag; frag = &priv->incoming.frag; if (frag->pending_vector
->iov_len == 0) { remaining = (((uint32_t)(priv->incoming
.fraghdr & 0x7fffffffU)) - frag->bytes_read); frag->
pending_vector->iov_len = (remaining > frag->remaining_size
) ? frag->remaining_size : remaining; frag->remaining_size
-= frag->pending_vector->iov_len; } } while (0); ret =
__socket_readv (this, in->pending_vector, 1, &in->
pending_vector, &in->pending_count, &bytes_read); if
(ret == -1) { if (priv->read_fail_log) do { do { if (0) printf
("reading from socket failed." "Error (%s), peer (%s)", strerror
((*__errno_location ())), this->peerinfo.identifier); } while
(0); _gf_log (this->name, "socket.c", __FUNCTION__, 1621,
GF_LOG_WARNING, "reading from socket failed." "Error (%s), peer (%s)"
, strerror ((*__errno_location ())), this->peerinfo.identifier
); } while (0); break; } { struct gf_sock_incoming_frag *frag
; frag = &priv->incoming.frag; frag->fragcurrent +=
bytes_read; frag->bytes_read += bytes_read; if ((ret >
0) || (frag->remaining_size != 0)) { if (frag->remaining_size
!= 0 && ret == 0) { do { struct gf_sock_incoming_frag
*frag; frag = &priv->incoming.frag; memset (&frag
->vector, 0, sizeof (frag->vector)); frag->pending_vector
= &frag->vector; frag->pending_vector->iov_base
= frag->fragcurrent; priv->incoming.pending_vector = frag
->pending_vector; } while (0); } do { do { if (0) printf (
"partial read on non-blocking socket"); } while (0); _gf_log (
this->name, "socket.c", __FUNCTION__, 1621, GF_LOG_TRACE, "partial read on non-blocking socket"
); } while (0); break; } }; }
;
1622
1623 frag->call_body.reply.accepted_state
1624 = SP_STATE_READ_REPLY_VERFBYTES;
1625
1626 buf = rpc_reply_accept_status_addr (frag->fragcurrent)((char *)frag->fragcurrent - 4);
1627
1628 frag->call_body.reply.accept_status
1629 = ntoh32hton32 (*(uint32_t *) buf);
1630
1631 /* fall through */
1632
1633 case SP_STATE_READ_REPLY_VERFBYTES:
1634
1635 if (frag->call_body.reply.accept_status
1636 == SUCCESS) {
1637 ret = __socket_read_accepted_successful_reply (this);
1638 } else {
1639 /* read entire remaining msg into buffer pointed to by
1640 * fragcurrent
1641 */
1642 ret = __socket_read_simple_msg (this);
1643 }
1644
1645 remaining_size = RPC_FRAGSIZE (in->fraghdr)((uint32_t)(in->fraghdr & 0x7fffffffU))
1646 - frag->bytes_read;
1647
1648 if ((ret == -1)
1649 || ((ret == 0) && (remaining_size == 0)
1650 && (RPC_LASTFRAG (in->fraghdr)((uint32_t)(in->fraghdr & 0x80000000U))))) {
1651 frag->call_body.reply.accepted_state
1652 = SP_STATE_ACCEPTED_REPLY_INIT;
1653 }
1654
1655 break;
1656 }
1657
1658out:
1659 return ret;
1660}
1661
1662
1663static inline int
1664__socket_read_denied_reply (rpc_transport_t *this)
1665{
1666 return __socket_read_simple_msg (this);
1667}
1668
1669
1670#define rpc_reply_status_addr(fragcurrent)((char *)fragcurrent - 4) ((char *)fragcurrent - 4)
1671
1672
1673static inline int
1674__socket_read_vectored_reply (rpc_transport_t *this)
1675{
1676 socket_private_t *priv = NULL((void*)0);
1677 int ret = 0;
1678 char *buf = NULL((void*)0);
1679 uint32_t remaining_size = 0;
1680 struct gf_sock_incoming *in = NULL((void*)0);
1681 struct gf_sock_incoming_frag *frag = NULL((void*)0);
1682
1683 GF_VALIDATE_OR_GOTO ("socket", this, out)do { if (!this) { (*__errno_location ()) = 22; do { do { if (
0) printf ("invalid argument: " "this"); } while (0); _gf_log_callingfn
("socket", "socket.c", __FUNCTION__, 1683, GF_LOG_ERROR, "invalid argument: "
"this"); } while (0); goto out; } } while (0)
;
1684 GF_VALIDATE_OR_GOTO ("socket", this->private, out)do { if (!this->private) { (*__errno_location ()) = 22; do
{ do { if (0) printf ("invalid argument: " "this->private"
); } while (0); _gf_log_callingfn ("socket", "socket.c", __FUNCTION__
, 1684, GF_LOG_ERROR, "invalid argument: " "this->private"
); } while (0); goto out; } } while (0)
;
1685
1686 priv = this->private;
1687 in = &priv->incoming;
1688 frag = &in->frag;
1689
1690 switch (frag->call_body.reply.status_state) {
1691
1692 case SP_STATE_ACCEPTED_REPLY_INIT:
1693 __socket_proto_init_pending (priv, RPC_REPLY_STATUS_SIZE)do { uint32_t remaining = 0; struct gf_sock_incoming_frag *frag
; frag = &priv->incoming.frag; remaining = (((uint32_t
)(priv->incoming.fraghdr & 0x7fffffffU)) - frag->bytes_read
); do { struct gf_sock_incoming_frag *frag; frag = &priv->
incoming.frag; memset (&frag->vector, 0, sizeof (frag->
vector)); frag->pending_vector = &frag->vector; frag
->pending_vector->iov_base = frag->fragcurrent; priv
->incoming.pending_vector = frag->pending_vector; } while
(0); frag->pending_vector->iov_len = (remaining > 4
) ? 4 : remaining; frag->remaining_size = (4 - frag->pending_vector
->iov_len); } while(0)
;
1694
1695 frag->call_body.reply.status_state
1696 = SP_STATE_READING_REPLY_STATUS;
1697
1698 /* fall through */
1699
1700 case SP_STATE_READING_REPLY_STATUS:
1701 __socket_proto_read (priv, ret){ size_t bytes_read = 0; struct gf_sock_incoming *in; in = &
priv->incoming; do { uint32_t remaining; struct gf_sock_incoming_frag
*frag; frag = &priv->incoming.frag; if (frag->pending_vector
->iov_len == 0) { remaining = (((uint32_t)(priv->incoming
.fraghdr & 0x7fffffffU)) - frag->bytes_read); frag->
pending_vector->iov_len = (remaining > frag->remaining_size
) ? frag->remaining_size : remaining; frag->remaining_size
-= frag->pending_vector->iov_len; } } while (0); ret =
__socket_readv (this, in->pending_vector, 1, &in->
pending_vector, &in->pending_count, &bytes_read); if
(ret == -1) { if (priv->read_fail_log) do { do { if (0) printf
("reading from socket failed." "Error (%s), peer (%s)", strerror
((*__errno_location ())), this->peerinfo.identifier); } while
(0); _gf_log (this->name, "socket.c", __FUNCTION__, 1701,
GF_LOG_WARNING, "reading from socket failed." "Error (%s), peer (%s)"
, strerror ((*__errno_location ())), this->peerinfo.identifier
); } while (0); break; } { struct gf_sock_incoming_frag *frag
; frag = &priv->incoming.frag; frag->fragcurrent +=
bytes_read; frag->bytes_read += bytes_read; if ((ret >
0) || (frag->remaining_size != 0)) { if (frag->remaining_size
!= 0 && ret == 0) { do { struct gf_sock_incoming_frag
*frag; frag = &priv->incoming.frag; memset (&frag
->vector, 0, sizeof (frag->vector)); frag->pending_vector
= &frag->vector; frag->pending_vector->iov_base
= frag->fragcurrent; priv->incoming.pending_vector = frag
->pending_vector; } while (0); } do { do { if (0) printf (
"partial read on non-blocking socket"); } while (0); _gf_log (
this->name, "socket.c", __FUNCTION__, 1701, GF_LOG_TRACE, "partial read on non-blocking socket"
); } while (0); break; } }; }
;
1702
1703 buf = rpc_reply_status_addr (frag->fragcurrent)((char *)frag->fragcurrent - 4);
1704
1705 frag->call_body.reply.accept_status
1706 = ntoh32hton32 (*((uint32_t *) buf));
1707
1708 frag->call_body.reply.status_state
1709 = SP_STATE_READ_REPLY_STATUS;
1710
1711 /* fall through */
1712
1713 case SP_STATE_READ_REPLY_STATUS:
1714 if (frag->call_body.reply.accept_status == MSG_ACCEPTED) {
1715 ret = __socket_read_accepted_reply (this);
1716 } else {
1717 ret = __socket_read_denied_reply (this);
1718 }
1719
1720 remaining_size = RPC_FRAGSIZE (in->fraghdr)((uint32_t)(in->fraghdr & 0x7fffffffU)) - frag->bytes_read;
1721
1722 if ((ret == -1)
1723 || ((ret == 0) && (remaining_size == 0)
1724 && (RPC_LASTFRAG (in->fraghdr)((uint32_t)(in->fraghdr & 0x80000000U))))) {
1725 frag->call_body.reply.status_state
1726 = SP_STATE_ACCEPTED_REPLY_INIT;
1727 in->payload_vector.iov_len
1728 = (unsigned long)frag->fragcurrent
1729 - (unsigned long)in->payload_vector.iov_base;
1730 }
1731 break;
1732 }
1733
1734out:
1735 return ret;
1736}
1737
1738
1739static inline int
1740__socket_read_simple_reply (rpc_transport_t *this)
1741{
1742 return __socket_read_simple_msg (this);
1743}
1744
1745#define rpc_xid_addr(buf)(buf) (buf)
1746
1747static inline int
1748__socket_read_reply (rpc_transport_t *this)
1749{
1750 socket_private_t *priv = NULL((void*)0);
1751 char *buf = NULL((void*)0);
1752 int32_t ret = -1;
1753 rpc_request_info_t *request_info = NULL((void*)0);
1754 char map_xid = 0;
1755 struct gf_sock_incoming *in = NULL((void*)0);
1756 struct gf_sock_incoming_frag *frag = NULL((void*)0);
1757
1758 GF_VALIDATE_OR_GOTO ("socket", this, out)do { if (!this) { (*__errno_location ()) = 22; do { do { if (
0) printf ("invalid argument: " "this"); } while (0); _gf_log_callingfn
("socket", "socket.c", __FUNCTION__, 1758, GF_LOG_ERROR, "invalid argument: "
"this"); } while (0); goto out; } } while (0)
;
1759 GF_VALIDATE_OR_GOTO ("socket", this->private, out)do { if (!this->private) { (*__errno_location ()) = 22; do
{ do { if (0) printf ("invalid argument: " "this->private"
); } while (0); _gf_log_callingfn ("socket", "socket.c", __FUNCTION__
, 1759, GF_LOG_ERROR, "invalid argument: " "this->private"
); } while (0); goto out; } } while (0)
;
1760
1761 priv = this->private;
1762 in = &priv->incoming;
1763 frag = &in->frag;
1764
1765 buf = rpc_xid_addr (iobuf_ptr (in->iobuf))(((in->iobuf)->ptr));
1766
1767 if (in->request_info == NULL((void*)0)) {
1768 in->request_info = GF_CALLOC (1, sizeof (*request_info),__gf_calloc (1, sizeof (*request_info), gf_common_mt_rpc_trans_reqinfo_t
)
1769 gf_common_mt_rpc_trans_reqinfo_t)__gf_calloc (1, sizeof (*request_info), gf_common_mt_rpc_trans_reqinfo_t
)
;
1770 if (in->request_info == NULL((void*)0)) {
1771 goto out;
1772 }
1773
1774 map_xid = 1;
1775 }
1776
1777 request_info = in->request_info;
1778
1779 if (map_xid) {
1780 request_info->xid = ntoh32hton32 (*((uint32_t *) buf));
1781
1782 /* release priv->lock, so as to avoid deadlock b/w conn->lock
1783 * and priv->lock, since we are doing an upcall here.
1784 */
1785 pthread_mutex_unlock (&priv->lock);
1786 {
1787 ret = rpc_transport_notify (this,
1788 RPC_TRANSPORT_MAP_XID_REQUEST,
1789 in->request_info);
1790 }
1791 pthread_mutex_lock (&priv->lock);
1792
1793 if (ret == -1) {
1794 gf_log (this->name, GF_LOG_WARNING,do { do { if (0) printf ("notify for event MAP_XID failed"); }
while (0); _gf_log (this->name, "socket.c", __FUNCTION__,
1795, GF_LOG_WARNING, "notify for event MAP_XID failed"); } while
(0)
1795 "notify for event MAP_XID failed")do { do { if (0) printf ("notify for event MAP_XID failed"); }
while (0); _gf_log (this->name, "socket.c", __FUNCTION__,
1795, GF_LOG_WARNING, "notify for event MAP_XID failed"); } while
(0)
;
1796 goto out;
1797 }
1798 }
1799
1800 if ((request_info->prognum == GLUSTER_FOP_PROGRAM1298437)
1801 && (request_info->procnum == GF_FOP_READ)) {
1802 if (map_xid && request_info->rsp.rsp_payload_count != 0) {
1803 in->iobref = iobref_ref (request_info->rsp.rsp_iobref);
1804 in->payload_vector = *request_info->rsp.rsp_payload;
1805 }
1806
1807 ret = __socket_read_vectored_reply (this);
1808 } else {
1809 ret = __socket_read_simple_reply (this);
1810 }
1811out:
1812 return ret;
1813}
1814
1815
1816/* returns the number of bytes yet to be read in a fragment */
1817static inline int
1818__socket_read_frag (rpc_transport_t *this)
1819{
1820 socket_private_t *priv = NULL((void*)0);
1821 int32_t ret = 0;
1822 char *buf = NULL((void*)0);
1823 uint32_t remaining_size = 0;
1824 struct gf_sock_incoming *in = NULL((void*)0);
1825 struct gf_sock_incoming_frag *frag = NULL((void*)0);
1826
1827 GF_VALIDATE_OR_GOTO ("socket", this, out)do { if (!this) { (*__errno_location ()) = 22; do { do { if (
0) printf ("invalid argument: " "this"); } while (0); _gf_log_callingfn
("socket", "socket.c", __FUNCTION__, 1827, GF_LOG_ERROR, "invalid argument: "
"this"); } while (0); goto out; } } while (0)
;
1828 GF_VALIDATE_OR_GOTO ("socket", this->private, out)do { if (!this->private) { (*__errno_location ()) = 22; do
{ do { if (0) printf ("invalid argument: " "this->private"
); } while (0); _gf_log_callingfn ("socket", "socket.c", __FUNCTION__
, 1828, GF_LOG_ERROR, "invalid argument: " "this->private"
); } while (0); goto out; } } while (0)
;
1829
1830 priv = this->private;
1831 /* used to reduce the indirection */
1832 in = &priv->incoming;
1833 frag = &in->frag;
1834
1835 switch (frag->state) {
1836 case SP_STATE_NADA:
1837 __socket_proto_init_pending (priv, RPC_MSGTYPE_SIZE)do { uint32_t remaining = 0; struct gf_sock_incoming_frag *frag
; frag = &priv->incoming.frag; remaining = (((uint32_t
)(priv->incoming.fraghdr & 0x7fffffffU)) - frag->bytes_read
); do { struct gf_sock_incoming_frag *frag; frag = &priv->
incoming.frag; memset (&frag->vector, 0, sizeof (frag->
vector)); frag->pending_vector = &frag->vector; frag
->pending_vector->iov_base = frag->fragcurrent; priv
->incoming.pending_vector = frag->pending_vector; } while
(0); frag->pending_vector->iov_len = (remaining > 8
) ? 8 : remaining; frag->remaining_size = (8 - frag->pending_vector
->iov_len); } while(0)
;
1838
1839 frag->state = SP_STATE_READING_MSGTYPE;
1840
1841 /* fall through */
1842
1843 case SP_STATE_READING_MSGTYPE:
1844 __socket_proto_read (priv, ret){ size_t bytes_read = 0; struct gf_sock_incoming *in; in = &
priv->incoming; do { uint32_t remaining; struct gf_sock_incoming_frag
*frag; frag = &priv->incoming.frag; if (frag->pending_vector
->iov_len == 0) { remaining = (((uint32_t)(priv->incoming
.fraghdr & 0x7fffffffU)) - frag->bytes_read); frag->
pending_vector->iov_len = (remaining > frag->remaining_size
) ? frag->remaining_size : remaining; frag->remaining_size
-= frag->pending_vector->iov_len; } } while (0); ret =
__socket_readv (this, in->pending_vector, 1, &in->
pending_vector, &in->pending_count, &bytes_read); if
(ret == -1) { if (priv->read_fail_log) do { do { if (0) printf
("reading from socket failed." "Error (%s), peer (%s)", strerror
((*__errno_location ())), this->peerinfo.identifier); } while
(0); _gf_log (this->name, "socket.c", __FUNCTION__, 1844,
GF_LOG_WARNING, "reading from socket failed." "Error (%s), peer (%s)"
, strerror ((*__errno_location ())), this->peerinfo.identifier
); } while (0); break; } { struct gf_sock_incoming_frag *frag
; frag = &priv->incoming.frag; frag->fragcurrent +=
bytes_read; frag->bytes_read += bytes_read; if ((ret >
0) || (frag->remaining_size != 0)) { if (frag->remaining_size
!= 0 && ret == 0) { do { struct gf_sock_incoming_frag
*frag; frag = &priv->incoming.frag; memset (&frag
->vector, 0, sizeof (frag->vector)); frag->pending_vector
= &frag->vector; frag->pending_vector->iov_base
= frag->fragcurrent; priv->incoming.pending_vector = frag
->pending_vector; } while (0); } do { do { if (0) printf (
"partial read on non-blocking socket"); } while (0); _gf_log (
this->name, "socket.c", __FUNCTION__, 1844, GF_LOG_TRACE, "partial read on non-blocking socket"
); } while (0); break; } }; }
;
1845
1846 frag->state = SP_STATE_READ_MSGTYPE;
1847 /* fall through */
1848
1849 case SP_STATE_READ_MSGTYPE:
1850 buf = rpc_msgtype_addr (iobuf_ptr (in->iobuf))(((in->iobuf)->ptr) + 4);
1851 in->msg_type = ntoh32hton32 (*((uint32_t *)buf));
1852
1853 if (in->msg_type == CALL) {
1854 ret = __socket_read_request (this);
1855 } else if (in->msg_type == REPLY) {
1856 ret = __socket_read_reply (this);
1857 } else if (in->msg_type == GF_UNIVERSAL_ANSWER42) {
1858 gf_log ("rpc", GF_LOG_ERROR,do { do { if (0) printf ("older version of protocol/process trying to "
"connect from %s. use newer version on that node", this->
peerinfo.identifier); } while (0); _gf_log ("rpc", "socket.c"
, __FUNCTION__, 1861, GF_LOG_ERROR, "older version of protocol/process trying to "
"connect from %s. use newer version on that node", this->
peerinfo.identifier); } while (0)
1859 "older version of protocol/process trying to "do { do { if (0) printf ("older version of protocol/process trying to "
"connect from %s. use newer version on that node", this->
peerinfo.identifier); } while (0); _gf_log ("rpc", "socket.c"
, __FUNCTION__, 1861, GF_LOG_ERROR, "older version of protocol/process trying to "
"connect from %s. use newer version on that node", this->
peerinfo.identifier); } while (0)
1860 "connect from %s. use newer version on that node",do { do { if (0) printf ("older version of protocol/process trying to "
"connect from %s. use newer version on that node", this->
peerinfo.identifier); } while (0); _gf_log ("rpc", "socket.c"
, __FUNCTION__, 1861, GF_LOG_ERROR, "older version of protocol/process trying to "
"connect from %s. use newer version on that node", this->
peerinfo.identifier); } while (0)
1861 this->peerinfo.identifier)do { do { if (0) printf ("older version of protocol/process trying to "
"connect from %s. use newer version on that node", this->
peerinfo.identifier); } while (0); _gf_log ("rpc", "socket.c"
, __FUNCTION__, 1861, GF_LOG_ERROR, "older version of protocol/process trying to "
"connect from %s. use newer version on that node", this->
peerinfo.identifier); } while (0)
;
1862 } else {
1863 gf_log ("rpc", GF_LOG_ERROR,do { do { if (0) printf ("wrong MSG-TYPE (%d) received from %s"
, in->msg_type, this->peerinfo.identifier); } while (0)
; _gf_log ("rpc", "socket.c", __FUNCTION__, 1866, GF_LOG_ERROR
, "wrong MSG-TYPE (%d) received from %s", in->msg_type, this
->peerinfo.identifier); } while (0)
1864 "wrong MSG-TYPE (%d) received from %s",do { do { if (0) printf ("wrong MSG-TYPE (%d) received from %s"
, in->msg_type, this->peerinfo.identifier); } while (0)
; _gf_log ("rpc", "socket.c", __FUNCTION__, 1866, GF_LOG_ERROR
, "wrong MSG-TYPE (%d) received from %s", in->msg_type, this
->peerinfo.identifier); } while (0)
1865 in->msg_type,do { do { if (0) printf ("wrong MSG-TYPE (%d) received from %s"
, in->msg_type, this->peerinfo.identifier); } while (0)
; _gf_log ("rpc", "socket.c", __FUNCTION__, 1866, GF_LOG_ERROR
, "wrong MSG-TYPE (%d) received from %s", in->msg_type, this
->peerinfo.identifier); } while (0)
1866 this->peerinfo.identifier)do { do { if (0) printf ("wrong MSG-TYPE (%d) received from %s"
, in->msg_type, this->peerinfo.identifier); } while (0)
; _gf_log ("rpc", "socket.c", __FUNCTION__, 1866, GF_LOG_ERROR
, "wrong MSG-TYPE (%d) received from %s", in->msg_type, this
->peerinfo.identifier); } while (0)
;
1867 ret = -1;
1868 }
1869
1870 remaining_size = RPC_FRAGSIZE (in->fraghdr)((uint32_t)(in->fraghdr & 0x7fffffffU)) - frag->bytes_read;
1871
1872 if ((ret == -1)
1873 || ((ret == 0) && (remaining_size == 0)
1874 && (RPC_LASTFRAG (in->fraghdr)((uint32_t)(in->fraghdr & 0x80000000U))))) {
1875 frag->state = SP_STATE_NADA;
1876 }
1877
1878 break;
1879 }
1880
1881out:
1882 return ret;
1883}
1884
1885
1886static inline
1887void __socket_reset_priv (socket_private_t *priv)
1888{
1889 struct gf_sock_incoming *in = NULL((void*)0);
1890
1891 /* used to reduce the indirection */
1892 in = &priv->incoming;
1893
1894 if (in->iobref) {
1895 iobref_unref (in->iobref);
1896 in->iobref = NULL((void*)0);
1897 }
1898
1899 if (in->iobuf) {
1900 iobuf_unref (in->iobuf);
1901 }
1902
1903 if (in->request_info != NULL((void*)0)) {
1904 GF_FREE (in->request_info)__gf_free (in->request_info);
1905 in->request_info = NULL((void*)0);
1906 }
1907
1908 memset (&in->payload_vector, 0,
1909 sizeof (in->payload_vector));
1910
1911 in->iobuf = NULL((void*)0);
1912}
1913
1914
1915int
1916__socket_proto_state_machine (rpc_transport_t *this,
1917 rpc_transport_pollin_t **pollin)
1918{
1919 int ret = -1;
1920 socket_private_t *priv = NULL((void*)0);
1921 struct iobuf *iobuf = NULL((void*)0);
1922 struct iobref *iobref = NULL((void*)0);
1923 struct iovec vector[2];
1924 struct gf_sock_incoming *in = NULL((void*)0);
1925 struct gf_sock_incoming_frag *frag = NULL((void*)0);
1926
1927 GF_VALIDATE_OR_GOTO ("socket", this, out)do { if (!this) { (*__errno_location ()) = 22; do { do { if (
0) printf ("invalid argument: " "this"); } while (0); _gf_log_callingfn
("socket", "socket.c", __FUNCTION__, 1927, GF_LOG_ERROR, "invalid argument: "
"this"); } while (0); goto out; } } while (0)
;
1928 GF_VALIDATE_OR_GOTO ("socket", this->private, out)do { if (!this->private) { (*__errno_location ()) = 22; do
{ do { if (0) printf ("invalid argument: " "this->private"
); } while (0); _gf_log_callingfn ("socket", "socket.c", __FUNCTION__
, 1928, GF_LOG_ERROR, "invalid argument: " "this->private"
); } while (0); goto out; } } while (0)
;
1929
1930 priv = this->private;
1931 /* used to reduce the indirection */
1932 in = &priv->incoming;
1933 frag = &in->frag;
1934
1935 while (in->record_state != SP_STATE_COMPLETE) {
1936 switch (in->record_state) {
1937
1938 case SP_STATE_NADA:
1939 in->total_bytes_read = 0;
1940 in->payload_vector.iov_len = 0;
1941
1942 in->pending_vector = in->vector;
1943 in->pending_vector->iov_base = &in->fraghdr;
1944
1945 in->pending_vector->iov_len = sizeof (in->fraghdr);
1946
1947 in->record_state = SP_STATE_READING_FRAGHDR;
1948
1949 /* fall through */
1950
1951 case SP_STATE_READING_FRAGHDR:
1952 ret = __socket_readv (this, in->pending_vector, 1,
1953 &in->pending_vector,
1954 &in->pending_count,
1955 NULL((void*)0));
1956 if (ret == -1) {
1957 if (priv->read_fail_log == 1) {
1958 gf_log (this->name,do { do { if (0) printf ("reading from socket failed. Error (%s)"
", peer (%s)", strerror ((*__errno_location ())), this->peerinfo
.identifier); } while (0); _gf_log (this->name, "socket.c"
, __FUNCTION__, 1963, ((priv->connected == 1) ? GF_LOG_WARNING
: GF_LOG_DEBUG), "reading from socket failed. Error (%s)" ", peer (%s)"
, strerror ((*__errno_location ())), this->peerinfo.identifier
); } while (0)
1959 ((priv->connected == 1) ?do { do { if (0) printf ("reading from socket failed. Error (%s)"
", peer (%s)", strerror ((*__errno_location ())), this->peerinfo
.identifier); } while (0); _gf_log (this->name, "socket.c"
, __FUNCTION__, 1963, ((priv->connected == 1) ? GF_LOG_WARNING
: GF_LOG_DEBUG), "reading from socket failed. Error (%s)" ", peer (%s)"
, strerror ((*__errno_location ())), this->peerinfo.identifier
); } while (0)
1960 GF_LOG_WARNING : GF_LOG_DEBUG),do { do { if (0) printf ("reading from socket failed. Error (%s)"
", peer (%s)", strerror ((*__errno_location ())), this->peerinfo
.identifier); } while (0); _gf_log (this->name, "socket.c"
, __FUNCTION__, 1963, ((priv->connected == 1) ? GF_LOG_WARNING
: GF_LOG_DEBUG), "reading from socket failed. Error (%s)" ", peer (%s)"
, strerror ((*__errno_location ())), this->peerinfo.identifier
); } while (0)
1961 "reading from socket failed. Error (%s)"do { do { if (0) printf ("reading from socket failed. Error (%s)"
", peer (%s)", strerror ((*__errno_location ())), this->peerinfo
.identifier); } while (0); _gf_log (this->name, "socket.c"
, __FUNCTION__, 1963, ((priv->connected == 1) ? GF_LOG_WARNING
: GF_LOG_DEBUG), "reading from socket failed. Error (%s)" ", peer (%s)"
, strerror ((*__errno_location ())), this->peerinfo.identifier
); } while (0)
1962 ", peer (%s)", strerror (errno),do { do { if (0) printf ("reading from socket failed. Error (%s)"
", peer (%s)", strerror ((*__errno_location ())), this->peerinfo
.identifier); } while (0); _gf_log (this->name, "socket.c"
, __FUNCTION__, 1963, ((priv->connected == 1) ? GF_LOG_WARNING
: GF_LOG_DEBUG), "reading from socket failed. Error (%s)" ", peer (%s)"
, strerror ((*__errno_location ())), this->peerinfo.identifier
); } while (0)
1963 this->peerinfo.identifier)do { do { if (0) printf ("reading from socket failed. Error (%s)"
", peer (%s)", strerror ((*__errno_location ())), this->peerinfo
.identifier); } while (0); _gf_log (this->name, "socket.c"
, __FUNCTION__, 1963, ((priv->connected == 1) ? GF_LOG_WARNING
: GF_LOG_DEBUG), "reading from socket failed. Error (%s)" ", peer (%s)"
, strerror ((*__errno_location ())), this->peerinfo.identifier
); } while (0)
;
1964 }
1965 goto out;
1966 }
1967
1968 if (ret > 0) {
1969 gf_log (this->name, GF_LOG_TRACE, "partial "do { do { if (0) printf ("partial " "fragment header read"); }
while (0); _gf_log (this->name, "socket.c", __FUNCTION__,
1970, GF_LOG_TRACE, "partial " "fragment header read"); } while
(0)
1970 "fragment header read")do { do { if (0) printf ("partial " "fragment header read"); }
while (0); _gf_log (this->name, "socket.c", __FUNCTION__,
1970, GF_LOG_TRACE, "partial " "fragment header read"); } while
(0)
;
1971 goto out;
1972 }
1973
1974 if (ret == 0) {
1975 in->record_state = SP_STATE_READ_FRAGHDR;
1976 }
1977 /* fall through */
1978
1979 case SP_STATE_READ_FRAGHDR:
1980
1981 in->fraghdr = ntoh32hton32 (in->fraghdr);
1982 in->total_bytes_read += RPC_FRAGSIZE(in->fraghdr)((uint32_t)(in->fraghdr & 0x7fffffffU));
1983 iobuf = iobuf_get2 (this->ctx->iobuf_pool,
1984 (in->total_bytes_read +
1985 sizeof (in->fraghdr)));
1986 if (!iobuf) {
1987 ret = -ENOMEM12;
1988 goto out;
1989 }
1990
1991 in->iobuf = iobuf;
1992 in->iobuf_size = 0;
1993 frag->fragcurrent = iobuf_ptr (iobuf)((iobuf)->ptr);
1994 in->record_state = SP_STATE_READING_FRAG;
1995 /* fall through */
1996
1997 case SP_STATE_READING_FRAG:
1998 ret = __socket_read_frag (this);
1999
2000 if ((ret == -1) ||
2001 (frag->bytes_read != RPC_FRAGSIZE (in->fraghdr)((uint32_t)(in->fraghdr & 0x7fffffffU)))) {
2002 goto out;
2003 }
2004
2005 frag->bytes_read = 0;
2006
2007 if (!RPC_LASTFRAG (in->fraghdr)((uint32_t)(in->fraghdr & 0x80000000U))) {
2008 in->record_state = SP_STATE_READING_FRAGHDR;
2009 break;
2010 }
2011
2012 /* we've read the entire rpc record, notify the
2013 * upper layers.
2014 */
2015 if (pollin != NULL((void*)0)) {
2016 int count = 0;
2017 in->iobuf_size = (in->total_bytes_read -
2018 in->payload_vector.iov_len);
2019
2020 memset (vector, 0, sizeof (vector));
2021
2022 if (in->iobref == NULL((void*)0)) {
2023 in->iobref = iobref_new ();
2024 if (in->iobref == NULL((void*)0)) {
2025 ret = -1;
2026 goto out;
2027 }
2028 }
2029
2030 vector[count].iov_base = iobuf_ptr (in->iobuf)((in->iobuf)->ptr);
2031 vector[count].iov_len = in->iobuf_size;
2032
2033 iobref = in->iobref;
2034
2035 count++;
2036
2037 if (in->payload_vector.iov_base != NULL((void*)0)) {
2038 vector[count] = in->payload_vector;
2039 count++;
2040 }
2041
2042 *pollin = rpc_transport_pollin_alloc (this,
2043 vector,
2044 count,
2045 in->iobuf,
2046 iobref,
2047 in->request_info);
2048 iobuf_unref (in->iobuf);
2049 in->iobuf = NULL((void*)0);
2050
2051 if (*pollin == NULL((void*)0)) {
2052 gf_log (this->name, GF_LOG_WARNING,do { do { if (0) printf ("transport pollin allocation failed"
); } while (0); _gf_log (this->name, "socket.c", __FUNCTION__
, 2053, GF_LOG_WARNING, "transport pollin allocation failed")
; } while (0)
2053 "transport pollin allocation failed")do { do { if (0) printf ("transport pollin allocation failed"
); } while (0); _gf_log (this->name, "socket.c", __FUNCTION__
, 2053, GF_LOG_WARNING, "transport pollin allocation failed")
; } while (0)
;
2054 ret = -1;
2055 goto out;
2056 }
2057 if (in->msg_type == REPLY)
2058 (*pollin)->is_reply = 1;
2059
2060 in->request_info = NULL((void*)0);
2061 }
2062 in->record_state = SP_STATE_COMPLETE;
2063 break;
2064
2065 case SP_STATE_COMPLETE:
2066 /* control should not reach here */
2067 gf_log (this->name, GF_LOG_WARNING, "control reached to "do { do { if (0) printf ("control reached to " "SP_STATE_COMPLETE, which should not have "
"happened"); } while (0); _gf_log (this->name, "socket.c"
, __FUNCTION__, 2069, GF_LOG_WARNING, "control reached to " "SP_STATE_COMPLETE, which should not have "
"happened"); } while (0)
2068 "SP_STATE_COMPLETE, which should not have "do { do { if (0) printf ("control reached to " "SP_STATE_COMPLETE, which should not have "
"happened"); } while (0); _gf_log (this->name, "socket.c"
, __FUNCTION__, 2069, GF_LOG_WARNING, "control reached to " "SP_STATE_COMPLETE, which should not have "
"happened"); } while (0)
2069 "happened")do { do { if (0) printf ("control reached to " "SP_STATE_COMPLETE, which should not have "
"happened"); } while (0); _gf_log (this->name, "socket.c"
, __FUNCTION__, 2069, GF_LOG_WARNING, "control reached to " "SP_STATE_COMPLETE, which should not have "
"happened"); } while (0)
;
2070 break;
2071 }
2072 }
2073
2074 if (in->record_state == SP_STATE_COMPLETE) {
2075 in->record_state = SP_STATE_NADA;
2076 __socket_reset_priv (priv);
2077 }
2078
2079out:
2080 if ((ret == -1) && (errno(*__errno_location ()) == EAGAIN11)) {
2081 ret = 0;
2082 }
2083 return ret;
2084}
2085
2086
2087int
2088socket_proto_state_machine (rpc_transport_t *this,
2089 rpc_transport_pollin_t **pollin)
2090{
2091 socket_private_t *priv = NULL((void*)0);
2092 int ret = 0;
2093
2094 GF_VALIDATE_OR_GOTO ("socket", this, out)do { if (!this) { (*__errno_location ()) = 22; do { do { if (
0) printf ("invalid argument: " "this"); } while (0); _gf_log_callingfn
("socket", "socket.c", __FUNCTION__, 2094, GF_LOG_ERROR, "invalid argument: "
"this"); } while (0); goto out; } } while (0)
;
2095 GF_VALIDATE_OR_GOTO ("socket", this->private, out)do { if (!this->private) { (*__errno_location ()) = 22; do
{ do { if (0) printf ("invalid argument: " "this->private"
); } while (0); _gf_log_callingfn ("socket", "socket.c", __FUNCTION__
, 2095, GF_LOG_ERROR, "invalid argument: " "this->private"
); } while (0); goto out; } } while (0)
;
2096
2097 priv = this->private;
2098
2099 pthread_mutex_lock (&priv->lock);
2100 {
2101 ret = __socket_proto_state_machine (this, pollin);
2102 }
2103 pthread_mutex_unlock (&priv->lock);
2104
2105out:
2106 return ret;
2107}
2108
2109
2110int
2111socket_event_poll_in (rpc_transport_t *this)
2112{
2113 int ret = -1;
2114 rpc_transport_pollin_t *pollin = NULL((void*)0);
2115
2116 ret = socket_proto_state_machine (this, &pollin);
2117
2118 if (pollin != NULL((void*)0)) {
2119 ret = rpc_transport_notify (this, RPC_TRANSPORT_MSG_RECEIVED,
2120 pollin);
2121 rpc_transport_pollin_destroy (pollin);
2122 }
2123
2124 return ret;
2125}
2126
2127
2128int
2129socket_connect_finish (rpc_transport_t *this)
2130{
2131 int ret = -1;
2132 socket_private_t *priv = NULL((void*)0);
2133 rpc_transport_event_t event = 0;
2134 char notify_rpc = 0;
2135
2136 GF_VALIDATE_OR_GOTO ("socket", this, out)do { if (!this) { (*__errno_location ()) = 22; do { do { if (
0) printf ("invalid argument: " "this"); } while (0); _gf_log_callingfn
("socket", "socket.c", __FUNCTION__, 2136, GF_LOG_ERROR, "invalid argument: "
"this"); } while (0); goto out; } } while (0)
;
2137 GF_VALIDATE_OR_GOTO ("socket", this->private, out)do { if (!this->private) { (*__errno_location ()) = 22; do
{ do { if (0) printf ("invalid argument: " "this->private"
); } while (0); _gf_log_callingfn ("socket", "socket.c", __FUNCTION__
, 2137, GF_LOG_ERROR, "invalid argument: " "this->private"
); } while (0); goto out; } } while (0)
;
2138
2139 priv = this->private;
2140
2141 pthread_mutex_lock (&priv->lock);
2142 {
2143 if (priv->connected != 0)
2144 goto unlock;
2145
2146 get_transport_identifiers (this);
2147
2148 ret = __socket_connect_finish (priv->sock);
2149
2150 if (ret == -1 && errno(*__errno_location ()) == EINPROGRESS115)
2151 ret = 1;
2152
2153 if (ret == -1 && errno(*__errno_location ()) != EINPROGRESS115) {
2154 if (!priv->connect_finish_log) {
2155 gf_log (this->name, GF_LOG_ERROR,do { do { if (0) printf ("connection to %s failed (%s)", this
->peerinfo.identifier, strerror ((*__errno_location ())));
} while (0); _gf_log (this->name, "socket.c", __FUNCTION__
, 2158, GF_LOG_ERROR, "connection to %s failed (%s)", this->
peerinfo.identifier, strerror ((*__errno_location ()))); } while
(0)
2156 "connection to %s failed (%s)",do { do { if (0) printf ("connection to %s failed (%s)", this
->peerinfo.identifier, strerror ((*__errno_location ())));
} while (0); _gf_log (this->name, "socket.c", __FUNCTION__
, 2158, GF_LOG_ERROR, "connection to %s failed (%s)", this->
peerinfo.identifier, strerror ((*__errno_location ()))); } while
(0)
2157 this->peerinfo.identifier,do { do { if (0) printf ("connection to %s failed (%s)", this
->peerinfo.identifier, strerror ((*__errno_location ())));
} while (0); _gf_log (this->name, "socket.c", __FUNCTION__
, 2158, GF_LOG_ERROR, "connection to %s failed (%s)", this->
peerinfo.identifier, strerror ((*__errno_location ()))); } while
(0)
2158 strerror (errno))do { do { if (0) printf ("connection to %s failed (%s)", this
->peerinfo.identifier, strerror ((*__errno_location ())));
} while (0); _gf_log (this->name, "socket.c", __FUNCTION__
, 2158, GF_LOG_ERROR, "connection to %s failed (%s)", this->
peerinfo.identifier, strerror ((*__errno_location ()))); } while
(0)
;
2159 priv->connect_finish_log = 1;
2160 }
2161 __socket_disconnect (this);
2162 notify_rpc = 1;
2163 event = RPC_TRANSPORT_DISCONNECT;
2164 goto unlock;
2165 }
2166
2167 if (ret == 0) {
2168 notify_rpc = 1;
2169
2170 this->myinfo.sockaddr_len =
2171 sizeof (this->myinfo.sockaddr);
2172
2173 ret = getsockname (priv->sock,
2174 SA (&this->myinfo.sockaddr)((struct sockaddr *)&this->myinfo.sockaddr),
2175 &this->myinfo.sockaddr_len);
2176 if (ret == -1) {
2177 gf_log (this->name, GF_LOG_WARNING,do { do { if (0) printf ("getsockname on (%d) failed (%s)", priv
->sock, strerror ((*__errno_location ()))); } while (0); _gf_log
(this->name, "socket.c", __FUNCTION__, 2179, GF_LOG_WARNING
, "getsockname on (%d) failed (%s)", priv->sock, strerror (
(*__errno_location ()))); } while (0)
2178 "getsockname on (%d) failed (%s)",do { do { if (0) printf ("getsockname on (%d) failed (%s)", priv
->sock, strerror ((*__errno_location ()))); } while (0); _gf_log
(this->name, "socket.c", __FUNCTION__, 2179, GF_LOG_WARNING
, "getsockname on (%d) failed (%s)", priv->sock, strerror (
(*__errno_location ()))); } while (0)
2179 priv->sock, strerror (errno))do { do { if (0) printf ("getsockname on (%d) failed (%s)", priv
->sock, strerror ((*__errno_location ()))); } while (0); _gf_log
(this->name, "socket.c", __FUNCTION__, 2179, GF_LOG_WARNING
, "getsockname on (%d) failed (%s)", priv->sock, strerror (
(*__errno_location ()))); } while (0)
;
2180 __socket_disconnect (this);
2181 event = GF_EVENT_POLLERR;
2182 goto unlock;
2183 }
2184
2185 priv->connected = 1;
2186 priv->connect_finish_log = 0;
2187 event = RPC_TRANSPORT_CONNECT;
2188 }
2189 }
2190unlock:
2191 pthread_mutex_unlock (&priv->lock);
2192
2193 if (notify_rpc) {
2194 rpc_transport_notify (this, event, this);
2195 }
2196out:
2197 return 0;
2198}
2199
2200
2201/* reads rpc_requests during pollin */
2202int
2203socket_event_handler (int fd, int idx, void *data,
2204 int poll_in, int poll_out, int poll_err)
2205{
2206 rpc_transport_t *this = NULL((void*)0);
2207 socket_private_t *priv = NULL((void*)0);
2208 int ret = -1;
2209
2210 this = data;
2211 GF_VALIDATE_OR_GOTO ("socket", this, out)do { if (!this) { (*__errno_location ()) = 22; do { do { if (
0) printf ("invalid argument: " "this"); } while (0); _gf_log_callingfn
("socket", "socket.c", __FUNCTION__, 2211, GF_LOG_ERROR, "invalid argument: "
"this"); } while (0); goto out; } } while (0)
;
2212 GF_VALIDATE_OR_GOTO ("socket", this->private, out)do { if (!this->private) { (*__errno_location ()) = 22; do
{ do { if (0) printf ("invalid argument: " "this->private"
); } while (0); _gf_log_callingfn ("socket", "socket.c", __FUNCTION__
, 2212, GF_LOG_ERROR, "invalid argument: " "this->private"
); } while (0); goto out; } } while (0)
;
2213 GF_VALIDATE_OR_GOTO ("socket", this->xl, out)do { if (!this->xl) { (*__errno_location ()) = 22; do { do
{ if (0) printf ("invalid argument: " "this->xl"); } while
(0); _gf_log_callingfn ("socket", "socket.c", __FUNCTION__, 2213
, GF_LOG_ERROR, "invalid argument: " "this->xl"); } while (
0); goto out; } } while (0)
;
2214
2215 THIS(*__glusterfs_this_location()) = this->xl;
2216 priv = this->private;
2217
2218 pthread_mutex_lock (&priv->lock);
2219 {
2220 priv->idx = idx;
2221 }
2222 pthread_mutex_unlock (&priv->lock);
2223
2224 ret = (priv->connected == 1) ? 0 : socket_connect_finish(this);
2225
2226 if (!ret && poll_out) {
2227 ret = socket_event_poll_out (this);
2228 }
2229
2230 if (!ret && poll_in) {
2231 ret = socket_event_poll_in (this);
2232 }
2233
2234 if ((ret < 0) || poll_err) {
2235 /* Logging has happened already in earlier cases */
2236 gf_log ("transport", ((ret >= 0) ? GF_LOG_INFO : GF_LOG_DEBUG),do { do { if (0) printf ("disconnecting now"); } while (0); _gf_log
("transport", "socket.c", __FUNCTION__, 2237, ((ret >= 0)
? GF_LOG_INFO : GF_LOG_DEBUG), "disconnecting now"); } while
(0)
2237 "disconnecting now")do { do { if (0) printf ("disconnecting now"); } while (0); _gf_log
("transport", "socket.c", __FUNCTION__, 2237, ((ret >= 0)
? GF_LOG_INFO : GF_LOG_DEBUG), "disconnecting now"); } while
(0)
;
2238 socket_event_poll_err (this);
2239 rpc_transport_unref (this);
2240 }
2241
2242out:
2243 return ret;
2244}
2245
2246
2247void *
2248socket_poller (void *ctx)
2249{
2250 rpc_transport_t *this = ctx;
2251 socket_private_t *priv = this->private;
2252 struct pollfd pfd[2] = {{0,},};
2253 gf_boolean_t to_write = _gf_false;
2254 int ret = 0;
2255
2256 if (priv->use_ssl) {
2257 if (ssl_setup_connection(this,priv->connected) < 0) {
2258 gf_log (this->name,GF_LOG_ERROR, "%s setup failed",do { do { if (0) printf ("%s setup failed", priv->connected
? "server" : "client"); } while (0); _gf_log (this->name,
"socket.c", __FUNCTION__, 2259, GF_LOG_ERROR, "%s setup failed"
, priv->connected ? "server" : "client"); } while (0)
2259 priv->connected ? "server" : "client")do { do { if (0) printf ("%s setup failed", priv->connected
? "server" : "client"); } while (0); _gf_log (this->name,
"socket.c", __FUNCTION__, 2259, GF_LOG_ERROR, "%s setup failed"
, priv->connected ? "server" : "client"); } while (0)
;
2260 goto err;
2261 }
2262 }
2263
2264 if (!priv->bio) {
2265 ret = __socket_nonblock (priv->sock);
2266 if (ret == -1) {
2267 gf_log (this->name, GF_LOG_WARNING,do { do { if (0) printf ("NBIO on %d failed (%s)", priv->sock
, strerror ((*__errno_location ()))); } while (0); _gf_log (this
->name, "socket.c", __FUNCTION__, 2269, GF_LOG_WARNING, "NBIO on %d failed (%s)"
, priv->sock, strerror ((*__errno_location ()))); } while (
0)
2268 "NBIO on %d failed (%s)",do { do { if (0) printf ("NBIO on %d failed (%s)", priv->sock
, strerror ((*__errno_location ()))); } while (0); _gf_log (this
->name, "socket.c", __FUNCTION__, 2269, GF_LOG_WARNING, "NBIO on %d failed (%s)"
, priv->sock, strerror ((*__errno_location ()))); } while (
0)
2269 priv->sock, strerror (errno))do { do { if (0) printf ("NBIO on %d failed (%s)", priv->sock
, strerror ((*__errno_location ()))); } while (0); _gf_log (this
->name, "socket.c", __FUNCTION__, 2269, GF_LOG_WARNING, "NBIO on %d failed (%s)"
, priv->sock, strerror ((*__errno_location ()))); } while (
0)
;
2270 goto err;
2271 }
2272 }
2273
2274 if (priv->connected == 0) {
2275 THIS(*__glusterfs_this_location()) = this->xl;
2276 ret = socket_connect_finish (this);
2277 if (ret != 0) {
2278 gf_log (this->name, GF_LOG_WARNING,do { do { if (0) printf ("asynchronous socket_connect_finish failed"
); } while (0); _gf_log (this->name, "socket.c", __FUNCTION__
, 2279, GF_LOG_WARNING, "asynchronous socket_connect_finish failed"
); } while (0)
2279 "asynchronous socket_connect_finish failed")do { do { if (0) printf ("asynchronous socket_connect_finish failed"
); } while (0); _gf_log (this->name, "socket.c", __FUNCTION__
, 2279, GF_LOG_WARNING, "asynchronous socket_connect_finish failed"
); } while (0)
;
2280 }
2281 }
2282
2283 ret = rpc_transport_notify (this->listener,
2284 RPC_TRANSPORT_ACCEPT, this);
2285 if (ret != 0) {
2286 gf_log (this->name, GF_LOG_WARNING,do { do { if (0) printf ("asynchronous rpc_transport_notify failed"
); } while (0); _gf_log (this->name, "socket.c", __FUNCTION__
, 2287, GF_LOG_WARNING, "asynchronous rpc_transport_notify failed"
); } while (0)
2287 "asynchronous rpc_transport_notify failed")do { do { if (0) printf ("asynchronous rpc_transport_notify failed"
); } while (0); _gf_log (this->name, "socket.c", __FUNCTION__
, 2287, GF_LOG_WARNING, "asynchronous rpc_transport_notify failed"
); } while (0)
;
2288 }
2289
2290 for (;;) {
2291 pthread_mutex_lock(&priv->lock);
2292 to_write = !list_empty(&priv->ioq);
2293 pthread_mutex_unlock(&priv->lock);
2294 pfd[0].fd = priv->pipe[0];
2295 pfd[0].events = POLL_MASK_ERROR(0x008 | 0x010 | 0x020);
2296 pfd[0].revents = 0;
2297 pfd[1].fd = priv->sock;
2298 pfd[1].events = POLL_MASK_INPUT(0x001 | 0x002) | POLL_MASK_ERROR(0x008 | 0x010 | 0x020);
2299 pfd[1].revents = 0;
2300 if (to_write) {
2301 pfd[1].events |= POLL_MASK_OUTPUT(0x004);
2302 }
2303 else {
2304 pfd[0].events |= POLL_MASK_INPUT(0x001 | 0x002);
2305 }
2306 if (poll(pfd,2,-1) < 0) {
2307 gf_log(this->name,GF_LOG_ERROR,"poll failed")do { do { if (0) printf ("poll failed"); } while (0); _gf_log
(this->name, "socket.c", __FUNCTION__, 2307, GF_LOG_ERROR
,"poll failed"); } while (0)
;
2308 break;
2309 }
2310 if (pfd[0].revents & POLL_MASK_ERROR(0x008 | 0x010 | 0x020)) {
2311 gf_log(this->name,GF_LOG_ERROR,do { do { if (0) printf ("poll error on pipe"); } while (0); _gf_log
(this->name, "socket.c", __FUNCTION__, 2312, GF_LOG_ERROR
, "poll error on pipe"); } while (0)
2312 "poll error on pipe")do { do { if (0) printf ("poll error on pipe"); } while (0); _gf_log
(this->name, "socket.c", __FUNCTION__, 2312, GF_LOG_ERROR
, "poll error on pipe"); } while (0)
;
2313 break;
2314 }
2315 /* Only glusterd actually seems to need this. */
2316 THIS(*__glusterfs_this_location()) = this->xl;
2317 if (pfd[1].revents & POLL_MASK_INPUT(0x001 | 0x002)) {
2318 ret = socket_event_poll_in(this);
2319 if (ret >= 0) {
2320 /* Suppress errors while making progress. */
2321 pfd[1].revents &= ~POLL_MASK_ERROR(0x008 | 0x010 | 0x020);
2322 }
2323 else if (errno(*__errno_location ()) == ENOTCONN107) {
2324 ret = 0;
2325 }
2326 }
2327 else if (pfd[1].revents & POLL_MASK_OUTPUT(0x004)) {
2328 ret = socket_event_poll_out(this);
2329 if (ret >= 0) {
2330 /* Suppress errors while making progress. */
2331 pfd[1].revents &= ~POLL_MASK_ERROR(0x008 | 0x010 | 0x020);
2332 }
2333 else if (errno(*__errno_location ()) == ENOTCONN107) {
2334 ret = 0;
2335 }
2336 }
2337 else {
2338 /*
2339 * This usually means that we left poll() because
2340 * somebody pushed a byte onto our pipe. That wakeup
2341 * is why the pipe is there, but once awake we can do
2342 * all the checking we need on the next iteration.
2343 */
2344 ret = 0;
2345 }
2346 if (pfd[1].revents & POLL_MASK_ERROR(0x008 | 0x010 | 0x020)) {
2347 gf_log(this->name,GF_LOG_ERROR,do { do { if (0) printf ("poll error on socket"); } while (0)
; _gf_log (this->name, "socket.c", __FUNCTION__, 2348, GF_LOG_ERROR
, "poll error on socket"); } while (0)
2348 "poll error on socket")do { do { if (0) printf ("poll error on socket"); } while (0)
; _gf_log (this->name, "socket.c", __FUNCTION__, 2348, GF_LOG_ERROR
, "poll error on socket"); } while (0)
;
2349 break;
2350 }
2351 if (ret < 0) {
2352 gf_log(this->name,GF_LOG_ERROR,do { do { if (0) printf ("error in polling loop"); } while (0
); _gf_log (this->name, "socket.c", __FUNCTION__, 2353, GF_LOG_ERROR
, "error in polling loop"); } while (0)
2353 "error in polling loop")do { do { if (0) printf ("error in polling loop"); } while (0
); _gf_log (this->name, "socket.c", __FUNCTION__, 2353, GF_LOG_ERROR
, "error in polling loop"); } while (0)
;
2354 break;
2355 }
2356 }
2357
2358err:
2359 /* All (and only) I/O errors should come here. */
2360 pthread_mutex_lock(&priv->lock);
2361 if (priv->ot_state == OT_ALIVE) {
2362 /*
2363 * We have to do this if we're here because of an error we
2364 * detected ourselves, but need to avoid a recursive call
2365 * if our death is the result of an external disconnect.
2366 */
2367 __socket_shutdown(this);
2368 close(priv->sock);
2369 priv->sock = -1;
2370 }
2371 if (priv->ssl_ssl) {
2372 /*
2373 * We're always responsible for this part, but only actually
2374 * have to do it if we got far enough for ssl_ssl to be valid
2375 * (i.e. errors in ssl_setup_connection don't count).
2376 */
2377 ssl_teardown_connection(priv);
2378 }
2379 priv->ot_state = OT_IDLE;
2380 /*
2381 * We expect there to be only one waiter, but if there do happen to
2382 * be multiple it's probably better to unblock them than to let them
2383 * hang. If there are none, this is a harmless no-op.
2384 */
2385 pthread_cond_broadcast(&priv->ot_event);
2386 pthread_mutex_unlock(&priv->lock);
2387 rpc_transport_notify (this->listener, RPC_TRANSPORT_DISCONNECT, this);
2388 rpc_transport_unref (this);
2389 return NULL((void*)0);
2390}
2391
2392
2393void
2394socket_spawn (rpc_transport_t *this)
2395{
2396 socket_private_t *priv = this->private;
2397
2398 if (priv->ot_state == OT_ALIVE) {
2399 gf_log (this->name, GF_LOG_WARNING,do { do { if (0) printf ("refusing to start redundant poller"
); } while (0); _gf_log (this->name, "socket.c", __FUNCTION__
, 2400, GF_LOG_WARNING, "refusing to start redundant poller")
; } while (0)
2400 "refusing to start redundant poller")do { do { if (0) printf ("refusing to start redundant poller"
); } while (0); _gf_log (this->name, "socket.c", __FUNCTION__
, 2400, GF_LOG_WARNING, "refusing to start redundant poller")
; } while (0)
;
2401 return;
2402 }
2403
2404 priv->ot_state = OT_ALIVE;
2405
2406 if (pthread_create(&priv->thread,NULL((void*)0),socket_poller,this) != 0) {
2407 gf_log (this->name, GF_LOG_ERROR,do { do { if (0) printf ("could not create poll thread"); } while
(0); _gf_log (this->name, "socket.c", __FUNCTION__, 2408,
GF_LOG_ERROR, "could not create poll thread"); } while (0)
2408 "could not create poll thread")do { do { if (0) printf ("could not create poll thread"); } while
(0); _gf_log (this->name, "socket.c", __FUNCTION__, 2408,
GF_LOG_ERROR, "could not create poll thread"); } while (0)
;
2409 }
2410}
2411
2412int
2413socket_server_event_handler (int fd, int idx, void *data,
2414 int poll_in, int poll_out, int poll_err)
2415{
2416 rpc_transport_t *this = NULL((void*)0);
2417 socket_private_t *priv = NULL((void*)0);
2418 int ret = 0;
2419 int new_sock = -1;
2420 rpc_transport_t *new_trans = NULL((void*)0);
2421 struct sockaddr_storage new_sockaddr = {0, };
2422 socklen_t addrlen = sizeof (new_sockaddr);
2423 socket_private_t *new_priv = NULL((void*)0);
2424 glusterfs_ctx_t *ctx = NULL((void*)0);
2425
2426 this = data;
2427 GF_VALIDATE_OR_GOTO ("socket", this, out)do { if (!this) { (*__errno_location ()) = 22; do { do { if (
0) printf ("invalid argument: " "this"); } while (0); _gf_log_callingfn
("socket", "socket.c", __FUNCTION__, 2427, GF_LOG_ERROR, "invalid argument: "
"this"); } while (0); goto out; } } while (0)
;
2428 GF_VALIDATE_OR_GOTO ("socket", this->private, out)do { if (!this->private) { (*__errno_location ()) = 22; do
{ do { if (0) printf ("invalid argument: " "this->private"
); } while (0); _gf_log_callingfn ("socket", "socket.c", __FUNCTION__
, 2428, GF_LOG_ERROR, "invalid argument: " "this->private"
); } while (0); goto out; } } while (0)
;
2429 GF_VALIDATE_OR_GOTO ("socket", this->xl, out)do { if (!this->xl) { (*__errno_location ()) = 22; do { do
{ if (0) printf ("invalid argument: " "this->xl"); } while
(0); _gf_log_callingfn ("socket", "socket.c", __FUNCTION__, 2429
, GF_LOG_ERROR, "invalid argument: " "this->xl"); } while (
0); goto out; } } while (0)
;
2430
2431 THIS(*__glusterfs_this_location()) = this->xl;
2432 priv = this->private;
2433 ctx = this->ctx;
2434
2435 pthread_mutex_lock (&priv->lock);
2436 {
2437 priv->idx = idx;
2438
2439 if (poll_in) {
2440 new_sock = accept (priv->sock, SA (&new_sockaddr)((struct sockaddr *)&new_sockaddr),
2441 &addrlen);
2442
2443 if (new_sock == -1) {
2444 gf_log (this->name, GF_LOG_WARNING,do { do { if (0) printf ("accept on %d failed (%s)", priv->
sock, strerror ((*__errno_location ()))); } while (0); _gf_log
(this->name, "socket.c", __FUNCTION__, 2446, GF_LOG_WARNING
, "accept on %d failed (%s)", priv->sock, strerror ((*__errno_location
()))); } while (0)
2445 "accept on %d failed (%s)",do { do { if (0) printf ("accept on %d failed (%s)", priv->
sock, strerror ((*__errno_location ()))); } while (0); _gf_log
(this->name, "socket.c", __FUNCTION__, 2446, GF_LOG_WARNING
, "accept on %d failed (%s)", priv->sock, strerror ((*__errno_location
()))); } while (0)
2446 priv->sock, strerror (errno))do { do { if (0) printf ("accept on %d failed (%s)", priv->
sock, strerror ((*__errno_location ()))); } while (0); _gf_log
(this->name, "socket.c", __FUNCTION__, 2446, GF_LOG_WARNING
, "accept on %d failed (%s)", priv->sock, strerror ((*__errno_location
()))); } while (0)
;
2447 goto unlock;
2448 }
2449
2450 if (priv->nodelay) {
2451 ret = __socket_nodelay (new_sock);
2452 if (ret == -1) {
2453 gf_log (this->name, GF_LOG_WARNING,do { do { if (0) printf ("setsockopt() failed for " "NODELAY (%s)"
, strerror ((*__errno_location ()))); } while (0); _gf_log (this
->name, "socket.c", __FUNCTION__, 2456, GF_LOG_WARNING, "setsockopt() failed for "
"NODELAY (%s)", strerror ((*__errno_location ()))); } while (
0)
2454 "setsockopt() failed for "do { do { if (0) printf ("setsockopt() failed for " "NODELAY (%s)"
, strerror ((*__errno_location ()))); } while (0); _gf_log (this
->name, "socket.c", __FUNCTION__, 2456, GF_LOG_WARNING, "setsockopt() failed for "
"NODELAY (%s)", strerror ((*__errno_location ()))); } while (
0)
2455 "NODELAY (%s)",do { do { if (0) printf ("setsockopt() failed for " "NODELAY (%s)"
, strerror ((*__errno_location ()))); } while (0); _gf_log (this
->name, "socket.c", __FUNCTION__, 2456, GF_LOG_WARNING, "setsockopt() failed for "
"NODELAY (%s)", strerror ((*__errno_location ()))); } while (
0)
2456 strerror (errno))do { do { if (0) printf ("setsockopt() failed for " "NODELAY (%s)"
, strerror ((*__errno_location ()))); } while (0); _gf_log (this
->name, "socket.c", __FUNCTION__, 2456, GF_LOG_WARNING, "setsockopt() failed for "
"NODELAY (%s)", strerror ((*__errno_location ()))); } while (
0)
;
2457 }
2458 }
2459
2460 if (priv->keepalive &&
2461 new_sockaddr.ss_family != AF_UNIX1) {
2462 ret = __socket_keepalive (new_sock,
2463 new_sockaddr.ss_family,
2464 priv->keepaliveintvl,
2465 priv->keepaliveidle);
2466 if (ret == -1)
2467 gf_log (this->name, GF_LOG_WARNING,do { do { if (0) printf ("Failed to set keep-alive: %s", strerror
((*__errno_location ()))); } while (0); _gf_log (this->name
, "socket.c", __FUNCTION__, 2469, GF_LOG_WARNING, "Failed to set keep-alive: %s"
, strerror ((*__errno_location ()))); } while (0)
2468 "Failed to set keep-alive: %s",do { do { if (0) printf ("Failed to set keep-alive: %s", strerror
((*__errno_location ()))); } while (0); _gf_log (this->name
, "socket.c", __FUNCTION__, 2469, GF_LOG_WARNING, "Failed to set keep-alive: %s"
, strerror ((*__errno_location ()))); } while (0)
2469 strerror (errno))do { do { if (0) printf ("Failed to set keep-alive: %s", strerror
((*__errno_location ()))); } while (0); _gf_log (this->name
, "socket.c", __FUNCTION__, 2469, GF_LOG_WARNING, "Failed to set keep-alive: %s"
, strerror ((*__errno_location ()))); } while (0)
;
2470 }
2471
2472 new_trans = GF_CALLOC (1, sizeof (*new_trans),__gf_calloc (1, sizeof (*new_trans), gf_common_mt_rpc_trans_t
)
2473 gf_common_mt_rpc_trans_t)__gf_calloc (1, sizeof (*new_trans), gf_common_mt_rpc_trans_t
)
;
2474 if (!new_trans)
2475 goto unlock;
2476
2477 ret = pthread_mutex_init(&new_trans->lock, NULL((void*)0));
2478 if (ret == -1) {
2479 gf_log (this->name, GF_LOG_WARNING,do { do { if (0) printf ("pthread_mutex_init() failed: %s", strerror
((*__errno_location ()))); } while (0); _gf_log (this->name
, "socket.c", __FUNCTION__, 2481, GF_LOG_WARNING, "pthread_mutex_init() failed: %s"
, strerror ((*__errno_location ()))); } while (0)
2480 "pthread_mutex_init() failed: %s",do { do { if (0) printf ("pthread_mutex_init() failed: %s", strerror
((*__errno_location ()))); } while (0); _gf_log (this->name
, "socket.c", __FUNCTION__, 2481, GF_LOG_WARNING, "pthread_mutex_init() failed: %s"
, strerror ((*__errno_location ()))); } while (0)
2481 strerror (errno))do { do { if (0) printf ("pthread_mutex_init() failed: %s", strerror
((*__errno_location ()))); } while (0); _gf_log (this->name
, "socket.c", __FUNCTION__, 2481, GF_LOG_WARNING, "pthread_mutex_init() failed: %s"
, strerror ((*__errno_location ()))); } while (0)
;
2482 close (new_sock);
2483 goto unlock;
2484 }
2485
2486 new_trans->name = gf_strdup (this->name);
2487
2488 memcpy (&new_trans->peerinfo.sockaddr, &new_sockaddr,
2489 addrlen);
2490 new_trans->peerinfo.sockaddr_len = addrlen;
2491
2492 new_trans->myinfo.sockaddr_len =
2493 sizeof (new_trans->myinfo.sockaddr);
2494
2495 ret = getsockname (new_sock,
2496 SA (&new_trans->myinfo.sockaddr)((struct sockaddr *)&new_trans->myinfo.sockaddr),
2497 &new_trans->myinfo.sockaddr_len);
2498 if (ret == -1) {
2499 gf_log (this->name, GF_LOG_WARNING,do { do { if (0) printf ("getsockname on %d failed (%s)", new_sock
, strerror ((*__errno_location ()))); } while (0); _gf_log (this
->name, "socket.c", __FUNCTION__, 2501, GF_LOG_WARNING, "getsockname on %d failed (%s)"
, new_sock, strerror ((*__errno_location ()))); } while (0)
2500 "getsockname on %d failed (%s)",do { do { if (0) printf ("getsockname on %d failed (%s)", new_sock
, strerror ((*__errno_location ()))); } while (0); _gf_log (this
->name, "socket.c", __FUNCTION__, 2501, GF_LOG_WARNING, "getsockname on %d failed (%s)"
, new_sock, strerror ((*__errno_location ()))); } while (0)
2501 new_sock, strerror (errno))do { do { if (0) printf ("getsockname on %d failed (%s)", new_sock
, strerror ((*__errno_location ()))); } while (0); _gf_log (this
->name, "socket.c", __FUNCTION__, 2501, GF_LOG_WARNING, "getsockname on %d failed (%s)"
, new_sock, strerror ((*__errno_location ()))); } while (0)
;
2502 close (new_sock);
2503 goto unlock;
2504 }
2505
2506 get_transport_identifiers (new_trans);
2507 ret = socket_init(new_trans);
2508 if (ret != 0) {
2509 close(new_sock);
2510 goto unlock;
2511 }
2512 new_trans->ops = this->ops;
2513 new_trans->init = this->init;
2514 new_trans->fini = this->fini;
2515 new_trans->ctx = ctx;
2516 new_trans->xl = this->xl;
2517 new_trans->mydata = this->mydata;
2518 new_trans->notify = this->notify;
2519 new_trans->listener = this;
2520 new_priv = new_trans->private;
2521
2522 new_priv->use_ssl = priv->use_ssl;
2523 new_priv->sock = new_sock;
2524 new_priv->own_thread = priv->own_thread;
2525
2526 new_priv->ssl_ctx = priv->ssl_ctx;
2527 if (priv->use_ssl && !priv->own_thread) {
2528 if (ssl_setup_connection(new_trans,1) < 0) {
2529 gf_log(this->name,GF_LOG_ERROR,do { do { if (0) printf ("server setup failed"); } while (0);
_gf_log (this->name, "socket.c", __FUNCTION__, 2530, GF_LOG_ERROR
, "server setup failed"); } while (0)
2530 "server setup failed")do { do { if (0) printf ("server setup failed"); } while (0);
_gf_log (this->name, "socket.c", __FUNCTION__, 2530, GF_LOG_ERROR
, "server setup failed"); } while (0)
;
2531 close(new_sock);
2532 goto unlock;
2533 }
2534 }
2535
2536 if (!priv->bio && !priv->own_thread) {
2537 ret = __socket_nonblock (new_sock);
2538
2539 if (ret == -1) {
2540 gf_log (this->name, GF_LOG_WARNING,do { do { if (0) printf ("NBIO on %d failed (%s)", new_sock, strerror
((*__errno_location ()))); } while (0); _gf_log (this->name
, "socket.c", __FUNCTION__, 2542, GF_LOG_WARNING, "NBIO on %d failed (%s)"
, new_sock, strerror ((*__errno_location ()))); } while (0)
2541 "NBIO on %d failed (%s)",do { do { if (0) printf ("NBIO on %d failed (%s)", new_sock, strerror
((*__errno_location ()))); } while (0); _gf_log (this->name
, "socket.c", __FUNCTION__, 2542, GF_LOG_WARNING, "NBIO on %d failed (%s)"
, new_sock, strerror ((*__errno_location ()))); } while (0)
2542 new_sock, strerror (errno))do { do { if (0) printf ("NBIO on %d failed (%s)", new_sock, strerror
((*__errno_location ()))); } while (0); _gf_log (this->name
, "socket.c", __FUNCTION__, 2542, GF_LOG_WARNING, "NBIO on %d failed (%s)"
, new_sock, strerror ((*__errno_location ()))); } while (0)
;
2543
2544 close (new_sock);
2545 goto unlock;
2546 }
2547 }
2548
2549 pthread_mutex_lock (&new_priv->lock);
2550 {
2551 /*
2552 * In the own_thread case, this is used to
2553 * indicate that we're initializing a server
2554 * connection.
2555 */
2556 new_priv->connected = 1;
2557 rpc_transport_ref (new_trans);
2558
2559 if (new_priv->own_thread) {
2560 if (pipe(new_priv->pipe) < 0) {
2561 gf_log(this->name,GF_LOG_ERROR,do { do { if (0) printf ("could not create pipe"); } while (0
); _gf_log (this->name, "socket.c", __FUNCTION__, 2562, GF_LOG_ERROR
, "could not create pipe"); } while (0)
2562 "could not create pipe")do { do { if (0) printf ("could not create pipe"); } while (0
); _gf_log (this->name, "socket.c", __FUNCTION__, 2562, GF_LOG_ERROR
, "could not create pipe"); } while (0)
;
2563 }
2564 socket_spawn(new_trans);
2565 }
2566 else {
2567 new_priv->idx =
2568 event_register (ctx->event_pool,
2569 new_sock,
2570 socket_event_handler,
2571 new_trans,
2572 1, 0);
2573 if (new_priv->idx == -1)
2574 ret = -1;
2575 }
2576
2577 }
2578 pthread_mutex_unlock (&new_priv->lock);
2579 if (ret == -1) {
2580 gf_log (this->name, GF_LOG_WARNING,do { do { if (0) printf ("failed to register the socket with event"
); } while (0); _gf_log (this->name, "socket.c", __FUNCTION__
, 2581, GF_LOG_WARNING, "failed to register the socket with event"
); } while (0)
2581 "failed to register the socket with event")do { do { if (0) printf ("failed to register the socket with event"
); } while (0); _gf_log (this->name, "socket.c", __FUNCTION__
, 2581, GF_LOG_WARNING, "failed to register the socket with event"
); } while (0)
;
2582 goto unlock;
2583 }
2584
2585 if (!priv->own_thread) {
2586 ret = rpc_transport_notify (this,
2587 RPC_TRANSPORT_ACCEPT, new_trans);
2588 }
2589 }
2590 }
2591unlock:
2592 pthread_mutex_unlock (&priv->lock);
2593
2594out:
2595 return ret;
2596}
2597
2598
2599int
2600socket_disconnect (rpc_transport_t *this)
2601{
2602 socket_private_t *priv = NULL((void*)0);
2603 int ret = -1;
2604
2605 GF_VALIDATE_OR_GOTO ("socket", this, out)do { if (!this) { (*__errno_location ()) = 22; do { do { if (
0) printf ("invalid argument: " "this"); } while (0); _gf_log_callingfn
("socket", "socket.c", __FUNCTION__, 2605, GF_LOG_ERROR, "invalid argument: "
"this"); } while (0); goto out; } } while (0)
;
2606 GF_VALIDATE_OR_GOTO ("socket", this->private, out)do { if (!this->private) { (*__errno_location ()) = 22; do
{ do { if (0) printf ("invalid argument: " "this->private"
); } while (0); _gf_log_callingfn ("socket", "socket.c", __FUNCTION__
, 2606, GF_LOG_ERROR, "invalid argument: " "this->private"
); } while (0); goto out; } } while (0)
;
2607
2608 priv = this->private;
2609
2610 pthread_mutex_lock (&priv->lock);
2611 {
2612 ret = __socket_disconnect (this);
2613 }
2614 pthread_mutex_unlock (&priv->lock);
2615
2616out:
2617 return ret;
2618}
2619
2620
2621int
2622socket_connect (rpc_transport_t *this, int port)
2623{
2624 int ret = -1;
2625 int sock = -1;
2626 socket_private_t *priv = NULL((void*)0);
2627 socklen_t sockaddr_len = 0;
2628 glusterfs_ctx_t *ctx = NULL((void*)0);
2629 sa_family_t sa_family = {0, };
2630 char *local_addr = NULL((void*)0);
2631 union gf_sock_union sock_union;
2632 struct sockaddr_in *addr = NULL((void*)0);
2633
2634 GF_VALIDATE_OR_GOTO ("socket", this, err)do { if (!this) { (*__errno_location ()) = 22; do { do { if (
0) printf ("invalid argument: " "this"); } while (0); _gf_log_callingfn
("socket", "socket.c", __FUNCTION__, 2634, GF_LOG_ERROR, "invalid argument: "
"this"); } while (0); goto err; } } while (0)
;
2635 GF_VALIDATE_OR_GOTO ("socket", this->private, err)do { if (!this->private) { (*__errno_location ()) = 22; do
{ do { if (0) printf ("invalid argument: " "this->private"
); } while (0); _gf_log_callingfn ("socket", "socket.c", __FUNCTION__
, 2635, GF_LOG_ERROR, "invalid argument: " "this->private"
); } while (0); goto err; } } while (0)
;
2636
2637 priv = this->private;
2638 ctx = this->ctx;
2639
2640 if (!priv) {
2641 gf_log_callingfn (this->name, GF_LOG_WARNING,do { do { if (0) printf ("connect() called on uninitialized transport"
); } while (0); _gf_log_callingfn (this->name, "socket.c",
__FUNCTION__, 2642, GF_LOG_WARNING, "connect() called on uninitialized transport"
); } while (0)
2642 "connect() called on uninitialized transport")do { do { if (0) printf ("connect() called on uninitialized transport"
); } while (0); _gf_log_callingfn (this->name, "socket.c",
__FUNCTION__, 2642, GF_LOG_WARNING, "connect() called on uninitialized transport"
); } while (0)
;
2643 goto err;
2644 }
2645
2646 pthread_mutex_lock (&priv->lock);
2647 {
2648 sock = priv->sock;
2649 }
2650 pthread_mutex_unlock (&priv->lock);
2651
2652 if (sock != -1) {
2653 gf_log_callingfn (this->name, GF_LOG_TRACE,do { do { if (0) printf ("connect () called on transport already connected"
); } while (0); _gf_log_callingfn (this->name, "socket.c",
__FUNCTION__, 2654, GF_LOG_TRACE, "connect () called on transport already connected"
); } while (0)
2654 "connect () called on transport already connected")do { do { if (0) printf ("connect () called on transport already connected"
); } while (0); _gf_log_callingfn (this->name, "socket.c",
__FUNCTION__, 2654, GF_LOG_TRACE, "connect () called on transport already connected"
); } while (0)
;
2655 errno(*__errno_location ()) = EINPROGRESS115;
2656 ret = -1;
2657 goto err;
2658 }
2659
2660 ret = socket_client_get_remote_sockaddr (this, &sock_union.sa,
2661 &sockaddr_len, &sa_family);
2662 if (ret == -1) {
2663 /* logged inside client_get_remote_sockaddr */
2664 goto err;
2665 }
2666
2667 if (port > 0) {
2668 sock_union.sin.sin_port = htons (port);
2669 }
2670 if (ntohs(sock_union.sin.sin_port) == GF_DEFAULT_SOCKET_LISTEN_PORT24007) {
2671 if (priv->use_ssl) {
2672 gf_log(this->name,GF_LOG_DEBUG,do { do { if (0) printf ("disabling SSL for portmapper connection"
); } while (0); _gf_log (this->name, "socket.c", __FUNCTION__
, 2673, GF_LOG_DEBUG, "disabling SSL for portmapper connection"
); } while (0)
2673 "disabling SSL for portmapper connection")do { do { if (0) printf ("disabling SSL for portmapper connection"
); } while (0); _gf_log (this->name, "socket.c", __FUNCTION__
, 2673, GF_LOG_DEBUG, "disabling SSL for portmapper connection"
); } while (0)
;
2674 priv->use_ssl = _gf_false;
2675 }
2676 }
2677 else {
2678 if (priv->ssl_enabled && !priv->use_ssl) {
2679 gf_log(this->name,GF_LOG_DEBUG,do { do { if (0) printf ("re-enabling SSL for I/O connection"
); } while (0); _gf_log (this->name, "socket.c", __FUNCTION__
, 2680, GF_LOG_DEBUG, "re-enabling SSL for I/O connection"); }
while (0)
2680 "re-enabling SSL for I/O connection")do { do { if (0) printf ("re-enabling SSL for I/O connection"
); } while (0); _gf_log (this->name, "socket.c", __FUNCTION__
, 2680, GF_LOG_DEBUG, "re-enabling SSL for I/O connection"); }
while (0)
;
2681 priv->use_ssl = _gf_true;
2682 }
2683 }
2684 pthread_mutex_lock (&priv->lock);
2685 {
2686 if (priv->sock != -1) {
2687 gf_log (this->name, GF_LOG_TRACE,do { do { if (0) printf ("connect() -- already connected"); }
while (0); _gf_log (this->name, "socket.c", __FUNCTION__,
2688, GF_LOG_TRACE, "connect() -- already connected"); } while
(0)
2688 "connect() -- already connected")do { do { if (0) printf ("connect() -- already connected"); }
while (0); _gf_log (this->name, "socket.c", __FUNCTION__,
2688, GF_LOG_TRACE, "connect() -- already connected"); } while
(0)
;
2689 goto unlock;
2690 }
2691
2692 memcpy (&this->peerinfo.sockaddr, &sock_union.storage,
2693 sockaddr_len);
2694 this->peerinfo.sockaddr_len = sockaddr_len;
2695
2696 priv->sock = socket (sa_family, SOCK_STREAMSOCK_STREAM, 0);
2697 if (priv->sock == -1) {
2698 gf_log (this->name, GF_LOG_ERROR,do { do { if (0) printf ("socket creation failed (%s)", strerror
((*__errno_location ()))); } while (0); _gf_log (this->name
, "socket.c", __FUNCTION__, 2700, GF_LOG_ERROR, "socket creation failed (%s)"
, strerror ((*__errno_location ()))); } while (0)
2699 "socket creation failed (%s)",do { do { if (0) printf ("socket creation failed (%s)", strerror
((*__errno_location ()))); } while (0); _gf_log (this->name
, "socket.c", __FUNCTION__, 2700, GF_LOG_ERROR, "socket creation failed (%s)"
, strerror ((*__errno_location ()))); } while (0)
2700 strerror (errno))do { do { if (0) printf ("socket creation failed (%s)", strerror
((*__errno_location ()))); } while (0); _gf_log (this->name
, "socket.c", __FUNCTION__, 2700, GF_LOG_ERROR, "socket creation failed (%s)"
, strerror ((*__errno_location ()))); } while (0)
;
2701 goto unlock;
2702 }
2703
2704 /* Cant help if setting socket options fails. We can continue
2705 * working nonetheless.
2706 */
2707 if (priv->windowsize != 0) {
2708 if (setsockopt (priv->sock, SOL_SOCKET1, SO_RCVBUF8,
2709 &priv->windowsize,
2710 sizeof (priv->windowsize)) < 0) {
2711 gf_log (this->name, GF_LOG_ERROR,do { do { if (0) printf ("setting receive window " "size failed: %d: %d: %s"
, priv->sock, priv->windowsize, strerror ((*__errno_location
()))); } while (0); _gf_log (this->name, "socket.c", __FUNCTION__
, 2715, GF_LOG_ERROR, "setting receive window " "size failed: %d: %d: %s"
, priv->sock, priv->windowsize, strerror ((*__errno_location
()))); } while (0)
2712 "setting receive window "do { do { if (0) printf ("setting receive window " "size failed: %d: %d: %s"
, priv->sock, priv->windowsize, strerror ((*__errno_location
()))); } while (0); _gf_log (this->name, "socket.c", __FUNCTION__
, 2715, GF_LOG_ERROR, "setting receive window " "size failed: %d: %d: %s"
, priv->sock, priv->windowsize, strerror ((*__errno_location
()))); } while (0)
2713 "size failed: %d: %d: %s",do { do { if (0) printf ("setting receive window " "size failed: %d: %d: %s"
, priv->sock, priv->windowsize, strerror ((*__errno_location
()))); } while (0); _gf_log (this->name, "socket.c", __FUNCTION__
, 2715, GF_LOG_ERROR, "setting receive window " "size failed: %d: %d: %s"
, priv->sock, priv->windowsize, strerror ((*__errno_location
()))); } while (0)
2714 priv->sock, priv->windowsize,do { do { if (0) printf ("setting receive window " "size failed: %d: %d: %s"
, priv->sock, priv->windowsize, strerror ((*__errno_location
()))); } while (0); _gf_log (this->name, "socket.c", __FUNCTION__
, 2715, GF_LOG_ERROR, "setting receive window " "size failed: %d: %d: %s"
, priv->sock, priv->windowsize, strerror ((*__errno_location
()))); } while (0)
2715 strerror (errno))do { do { if (0) printf ("setting receive window " "size failed: %d: %d: %s"
, priv->sock, priv->windowsize, strerror ((*__errno_location
()))); } while (0); _gf_log (this->name, "socket.c", __FUNCTION__
, 2715, GF_LOG_ERROR, "setting receive window " "size failed: %d: %d: %s"
, priv->sock, priv->windowsize, strerror ((*__errno_location
()))); } while (0)
;
2716 }
2717
2718 if (setsockopt (priv->sock, SOL_SOCKET1, SO_SNDBUF7,
2719 &priv->windowsize,
2720 sizeof (priv->windowsize)) < 0) {
2721 gf_log (this->name, GF_LOG_ERROR,do { do { if (0) printf ("setting send window size " "failed: %d: %d: %s"
, priv->sock, priv->windowsize, strerror ((*__errno_location
()))); } while (0); _gf_log (this->name, "socket.c", __FUNCTION__
, 2725, GF_LOG_ERROR, "setting send window size " "failed: %d: %d: %s"
, priv->sock, priv->windowsize, strerror ((*__errno_location
()))); } while (0)
2722 "setting send window size "do { do { if (0) printf ("setting send window size " "failed: %d: %d: %s"
, priv->sock, priv->windowsize, strerror ((*__errno_location
()))); } while (0); _gf_log (this->name, "socket.c", __FUNCTION__
, 2725, GF_LOG_ERROR, "setting send window size " "failed: %d: %d: %s"
, priv->sock, priv->windowsize, strerror ((*__errno_location
()))); } while (0)
2723 "failed: %d: %d: %s",do { do { if (0) printf ("setting send window size " "failed: %d: %d: %s"
, priv->sock, priv->windowsize, strerror ((*__errno_location
()))); } while (0); _gf_log (this->name, "socket.c", __FUNCTION__
, 2725, GF_LOG_ERROR, "setting send window size " "failed: %d: %d: %s"
, priv->sock, priv->windowsize, strerror ((*__errno_location
()))); } while (0)
2724 priv->sock, priv->windowsize,do { do { if (0) printf ("setting send window size " "failed: %d: %d: %s"
, priv->sock, priv->windowsize, strerror ((*__errno_location
()))); } while (0); _gf_log (this->name, "socket.c", __FUNCTION__
, 2725, GF_LOG_ERROR, "setting send window size " "failed: %d: %d: %s"
, priv->sock, priv->windowsize, strerror ((*__errno_location
()))); } while (0)
2725 strerror (errno))do { do { if (0) printf ("setting send window size " "failed: %d: %d: %s"
, priv->sock, priv->windowsize, strerror ((*__errno_location
()))); } while (0); _gf_log (this->name, "socket.c", __FUNCTION__
, 2725, GF_LOG_ERROR, "setting send window size " "failed: %d: %d: %s"
, priv->sock, priv->windowsize, strerror ((*__errno_location
()))); } while (0)
;
2726 }
2727 }
2728
2729 if (priv->nodelay) {
2730 ret = __socket_nodelay (priv->sock);
2731
2732 if (ret == -1) {
2733 gf_log (this->name, GF_LOG_ERROR,do { do { if (0) printf ("NODELAY on %d failed (%s)", priv->
sock, strerror ((*__errno_location ()))); } while (0); _gf_log
(this->name, "socket.c", __FUNCTION__, 2735, GF_LOG_ERROR
, "NODELAY on %d failed (%s)", priv->sock, strerror ((*__errno_location
()))); } while (0)
2734 "NODELAY on %d failed (%s)",do { do { if (0) printf ("NODELAY on %d failed (%s)", priv->
sock, strerror ((*__errno_location ()))); } while (0); _gf_log
(this->name, "socket.c", __FUNCTION__, 2735, GF_LOG_ERROR
, "NODELAY on %d failed (%s)", priv->sock, strerror ((*__errno_location
()))); } while (0)
2735 priv->sock, strerror (errno))do { do { if (0) printf ("NODELAY on %d failed (%s)", priv->
sock, strerror ((*__errno_location ()))); } while (0); _gf_log
(this->name, "socket.c", __FUNCTION__, 2735, GF_LOG_ERROR
, "NODELAY on %d failed (%s)", priv->sock, strerror ((*__errno_location
()))); } while (0)
;
2736 }
2737 }
2738
2739 if (priv->keepalive && sa_family != AF_UNIX1) {
2740 ret = __socket_keepalive (priv->sock,
2741 sa_family,
2742 priv->keepaliveintvl,
2743 priv->keepaliveidle);
2744 if (ret == -1)
2745 gf_log (this->name, GF_LOG_ERROR,do { do { if (0) printf ("Failed to set keep-alive: %s", strerror
((*__errno_location ()))); } while (0); _gf_log (this->name
, "socket.c", __FUNCTION__, 2747, GF_LOG_ERROR, "Failed to set keep-alive: %s"
, strerror ((*__errno_location ()))); } while (0)
2746 "Failed to set keep-alive: %s",do { do { if (0) printf ("Failed to set keep-alive: %s", strerror
((*__errno_location ()))); } while (0); _gf_log (this->name
, "socket.c", __FUNCTION__, 2747, GF_LOG_ERROR, "Failed to set keep-alive: %s"
, strerror ((*__errno_location ()))); } while (0)
2747 strerror (errno))do { do { if (0) printf ("Failed to set keep-alive: %s", strerror
((*__errno_location ()))); } while (0); _gf_log (this->name
, "socket.c", __FUNCTION__, 2747, GF_LOG_ERROR, "Failed to set keep-alive: %s"
, strerror ((*__errno_location ()))); } while (0)
;
2748 }
2749
2750 SA (&this->myinfo.sockaddr)((struct sockaddr *)&this->myinfo.sockaddr)->sa_family =
2751 SA (&this->peerinfo.sockaddr)((struct sockaddr *)&this->peerinfo.sockaddr)->sa_family;
2752
2753 /* If a source addr is explicitly specified, use it */
2754 ret = dict_get_str (this->options,
2755 "transport.socket.source-addr",
2756 &local_addr);
2757 if (!ret && SA (&this->myinfo.sockaddr)((struct sockaddr *)&this->myinfo.sockaddr)->sa_family == AF_INET2) {
2758 addr = (struct sockaddr_in *)(&this->myinfo.sockaddr);
2759 ret = inet_pton (AF_INET2, local_addr, &(addr->sin_addr.s_addr));
2760 }
2761
2762 ret = client_bind (this, SA (&this->myinfo.sockaddr)((struct sockaddr *)&this->myinfo.sockaddr),
2763 &this->myinfo.sockaddr_len, priv->sock);
2764 if (ret == -1) {
2765 gf_log (this->name, GF_LOG_WARNING,do { do { if (0) printf ("client bind failed: %s", strerror (
(*__errno_location ()))); } while (0); _gf_log (this->name
, "socket.c", __FUNCTION__, 2766, GF_LOG_WARNING, "client bind failed: %s"
, strerror ((*__errno_location ()))); } while (0)
2766 "client bind failed: %s", strerror (errno))do { do { if (0) printf ("client bind failed: %s", strerror (
(*__errno_location ()))); } while (0); _gf_log (this->name
, "socket.c", __FUNCTION__, 2766, GF_LOG_WARNING, "client bind failed: %s"
, strerror ((*__errno_location ()))); } while (0)
;
2767 close (priv->sock);
2768 priv->sock = -1;
2769 goto unlock;
2770 }
2771
2772 if (!priv->use_ssl && !priv->bio && !priv->own_thread) {
2773 ret = __socket_nonblock (priv->sock);
2774 if (ret == -1) {
2775 gf_log (this->name, GF_LOG_ERROR,do { do { if (0) printf ("NBIO on %d failed (%s)", priv->sock
, strerror ((*__errno_location ()))); } while (0); _gf_log (this
->name, "socket.c", __FUNCTION__, 2777, GF_LOG_ERROR, "NBIO on %d failed (%s)"
, priv->sock, strerror ((*__errno_location ()))); } while (
0)
2776 "NBIO on %d failed (%s)",do { do { if (0) printf ("NBIO on %d failed (%s)", priv->sock
, strerror ((*__errno_location ()))); } while (0); _gf_log (this
->name, "socket.c", __FUNCTION__, 2777, GF_LOG_ERROR, "NBIO on %d failed (%s)"
, priv->sock, strerror ((*__errno_location ()))); } while (
0)
2777 priv->sock, strerror (errno))do { do { if (0) printf ("NBIO on %d failed (%s)", priv->sock
, strerror ((*__errno_location ()))); } while (0); _gf_log (this
->name, "socket.c", __FUNCTION__, 2777, GF_LOG_ERROR, "NBIO on %d failed (%s)"
, priv->sock, strerror ((*__errno_location ()))); } while (
0)
;
2778 close (priv->sock);
2779 priv->sock = -1;
2780 goto unlock;
2781 }
2782 }
2783
2784 ret = connect (priv->sock, SA (&this->peerinfo.sockaddr)((struct sockaddr *)&this->peerinfo.sockaddr),
2785 this->peerinfo.sockaddr_len);
2786
2787 if (ret == -1 && ((errno(*__errno_location ()) != EINPROGRESS115) && (errno(*__errno_location ()) != ENOENT2))) {
2788 gf_log (this->name, GF_LOG_ERROR,do { do { if (0) printf ("connection attempt on %s failed, (%s)"
, this->peerinfo.identifier, strerror ((*__errno_location (
)))); } while (0); _gf_log (this->name, "socket.c", __FUNCTION__
, 2790, GF_LOG_ERROR, "connection attempt on %s failed, (%s)"
, this->peerinfo.identifier, strerror ((*__errno_location (
)))); } while (0)
2789 "connection attempt on %s failed, (%s)",do { do { if (0) printf ("connection attempt on %s failed, (%s)"
, this->peerinfo.identifier, strerror ((*__errno_location (
)))); } while (0); _gf_log (this->name, "socket.c", __FUNCTION__
, 2790, GF_LOG_ERROR, "connection attempt on %s failed, (%s)"
, this->peerinfo.identifier, strerror ((*__errno_location (
)))); } while (0)
2790 this->peerinfo.identifier, strerror (errno))do { do { if (0) printf ("connection attempt on %s failed, (%s)"
, this->peerinfo.identifier, strerror ((*__errno_location (
)))); } while (0); _gf_log (this->name, "socket.c", __FUNCTION__
, 2790, GF_LOG_ERROR, "connection attempt on %s failed, (%s)"
, this->peerinfo.identifier, strerror ((*__errno_location (
)))); } while (0)
;
2791 close (priv->sock);
2792 priv->sock = -1;
2793 goto unlock;
2794 }
2795
2796 if (priv->use_ssl && !priv->own_thread) {
2797 ret = ssl_setup_connection(this,0);
2798 if (ret < 0) {
2799 gf_log(this->name,GF_LOG_ERROR,do { do { if (0) printf ("client setup failed"); } while (0);
_gf_log (this->name, "socket.c", __FUNCTION__, 2800, GF_LOG_ERROR
, "client setup failed"); } while (0)
2800 "client setup failed")do { do { if (0) printf ("client setup failed"); } while (0);
_gf_log (this->name, "socket.c", __FUNCTION__, 2800, GF_LOG_ERROR
, "client setup failed"); } while (0)
;
2801 close(priv->sock);
2802 priv->sock = -1;
2803 goto unlock;
2804 }
2805 }
2806
2807 if (!priv->bio && !priv->own_thread) {
2808 ret = __socket_nonblock (priv->sock);
2809
2810 if (ret == -1) {
2811 gf_log (this->name, GF_LOG_ERROR,do { do { if (0) printf ("NBIO on %d failed (%s)", priv->sock
, strerror ((*__errno_location ()))); } while (0); _gf_log (this
->name, "socket.c", __FUNCTION__, 2813, GF_LOG_ERROR, "NBIO on %d failed (%s)"
, priv->sock, strerror ((*__errno_location ()))); } while (
0)
2812 "NBIO on %d failed (%s)",do { do { if (0) printf ("NBIO on %d failed (%s)", priv->sock
, strerror ((*__errno_location ()))); } while (0); _gf_log (this
->name, "socket.c", __FUNCTION__, 2813, GF_LOG_ERROR, "NBIO on %d failed (%s)"
, priv->sock, strerror ((*__errno_location ()))); } while (
0)
2813 priv->sock, strerror (errno))do { do { if (0) printf ("NBIO on %d failed (%s)", priv->sock
, strerror ((*__errno_location ()))); } while (0); _gf_log (this
->name, "socket.c", __FUNCTION__, 2813, GF_LOG_ERROR, "NBIO on %d failed (%s)"
, priv->sock, strerror ((*__errno_location ()))); } while (
0)
;
2814 close (priv->sock);
2815 priv->sock = -1;
2816 goto unlock;
2817 }
2818 }
2819
2820 /*
2821 * In the own_thread case, this is used to indicate that we're
2822 * initializing a client connection.
2823 */
2824 priv->connected = 0;
2825 rpc_transport_ref (this);
2826
2827 if (priv->own_thread) {
2828 if (pipe(priv->pipe) < 0) {
2829 gf_log(this->name,GF_LOG_ERROR,do { do { if (0) printf ("could not create pipe"); } while (0
); _gf_log (this->name, "socket.c", __FUNCTION__, 2830, GF_LOG_ERROR
, "could not create pipe"); } while (0)
2830 "could not create pipe")do { do { if (0) printf ("could not create pipe"); } while (0
); _gf_log (this->name, "socket.c", __FUNCTION__, 2830, GF_LOG_ERROR
, "could not create pipe"); } while (0)
;
2831 }
2832
2833 this->listener = this;
2834 socket_spawn(this);
2835 }
2836 else {
2837 priv->idx = event_register (ctx->event_pool, priv->sock,
2838 socket_event_handler,
2839 this, 1, 1);
2840 if (priv->idx == -1) {
2841 gf_log ("", GF_LOG_WARNING,do { do { if (0) printf ("failed to register the event"); } while
(0); _gf_log ("", "socket.c", __FUNCTION__, 2842, GF_LOG_WARNING
, "failed to register the event"); } while (0)
2842 "failed to register the event")do { do { if (0) printf ("failed to register the event"); } while
(0); _gf_log ("", "socket.c", __FUNCTION__, 2842, GF_LOG_WARNING
, "failed to register the event"); } while (0)
;
2843 ret = -1;
2844 }
2845 }
2846 }
2847unlock:
2848 pthread_mutex_unlock (&priv->lock);
2849
2850err:
2851 return ret;
2852}
2853
2854
2855int
2856socket_listen (rpc_transport_t *this)
2857{
2858 socket_private_t * priv = NULL((void*)0);
2859 int ret = -1;
2860 int sock = -1;
2861 struct sockaddr_storage sockaddr;
2862 socklen_t sockaddr_len = 0;
2863 peer_info_t *myinfo = NULL((void*)0);
2864 glusterfs_ctx_t *ctx = NULL((void*)0);
2865 sa_family_t sa_family = {0, };
2866
2867 GF_VALIDATE_OR_GOTO ("socket", this, out)do { if (!this) { (*__errno_location ()) = 22; do { do { if (
0) printf ("invalid argument: " "this"); } while (0); _gf_log_callingfn
("socket", "socket.c", __FUNCTION__, 2867, GF_LOG_ERROR, "invalid argument: "
"this"); } while (0); goto out; } } while (0)
;
2868 GF_VALIDATE_OR_GOTO ("socket", this->private, out)do { if (!this->private) { (*__errno_location ()) = 22; do
{ do { if (0) printf ("invalid argument: " "this->private"
); } while (0); _gf_log_callingfn ("socket", "socket.c", __FUNCTION__
, 2868, GF_LOG_ERROR, "invalid argument: " "this->private"
); } while (0); goto out; } } while (0)
;
2869
2870 priv = this->private;
2871 myinfo = &this->myinfo;
2872 ctx = this->ctx;
2873
2874 pthread_mutex_lock (&priv->lock);
2875 {
2876 sock = priv->sock;
2877 }
2878 pthread_mutex_unlock (&priv->lock);
2879
2880 if (sock != -1) {
2881 gf_log_callingfn (this->name, GF_LOG_DEBUG,do { do { if (0) printf ("already listening"); } while (0); _gf_log_callingfn
(this->name, "socket.c", __FUNCTION__, 2882, GF_LOG_DEBUG
, "already listening"); } while (0)
2882 "already listening")do { do { if (0) printf ("already listening"); } while (0); _gf_log_callingfn
(this->name, "socket.c", __FUNCTION__, 2882, GF_LOG_DEBUG
, "already listening"); } while (0)
;
2883 return ret;
2884 }
2885
2886 ret = socket_server_get_local_sockaddr (this, SA (&sockaddr)((struct sockaddr *)&sockaddr),
2887 &sockaddr_len, &sa_family);
2888 if (ret == -1) {
2889 return ret;
2890 }
2891
2892 pthread_mutex_lock (&priv->lock);
2893 {
2894 if (priv->sock != -1) {
2895 gf_log (this->name, GF_LOG_DEBUG,do { do { if (0) printf ("already listening"); } while (0); _gf_log
(this->name, "socket.c", __FUNCTION__, 2896, GF_LOG_DEBUG
, "already listening"); } while (0)
2896 "already listening")do { do { if (0) printf ("already listening"); } while (0); _gf_log
(this->name, "socket.c", __FUNCTION__, 2896, GF_LOG_DEBUG
, "already listening"); } while (0)
;
2897 goto unlock;
2898 }
2899
2900 memcpy (&myinfo->sockaddr, &sockaddr, sockaddr_len);
2901 myinfo->sockaddr_len = sockaddr_len;
2902
2903 priv->sock = socket (sa_family, SOCK_STREAMSOCK_STREAM, 0);
2904
2905 if (priv->sock == -1) {
2906 gf_log (this->name, GF_LOG_ERROR,do { do { if (0) printf ("socket creation failed (%s)", strerror
((*__errno_location ()))); } while (0); _gf_log (this->name
, "socket.c", __FUNCTION__, 2908, GF_LOG_ERROR, "socket creation failed (%s)"
, strerror ((*__errno_location ()))); } while (0)
2907 "socket creation failed (%s)",do { do { if (0) printf ("socket creation failed (%s)", strerror
((*__errno_location ()))); } while (0); _gf_log (this->name
, "socket.c", __FUNCTION__, 2908, GF_LOG_ERROR, "socket creation failed (%s)"
, strerror ((*__errno_location ()))); } while (0)
2908 strerror (errno))do { do { if (0) printf ("socket creation failed (%s)", strerror
((*__errno_location ()))); } while (0); _gf_log (this->name
, "socket.c", __FUNCTION__, 2908, GF_LOG_ERROR, "socket creation failed (%s)"
, strerror ((*__errno_location ()))); } while (0)
;
2909 goto unlock;
2910 }
2911
2912 /* Cant help if setting socket options fails. We can continue
2913 * working nonetheless.
2914 */
2915 if (priv->windowsize != 0) {
2916 if (setsockopt (priv->sock, SOL_SOCKET1, SO_RCVBUF8,
2917 &priv->windowsize,
2918 sizeof (priv->windowsize)) < 0) {
2919 gf_log (this->name, GF_LOG_ERROR,do { do { if (0) printf ("setting receive window size " "failed: %d: %d: %s"
, priv->sock, priv->windowsize, strerror ((*__errno_location
()))); } while (0); _gf_log (this->name, "socket.c", __FUNCTION__
, 2923, GF_LOG_ERROR, "setting receive window size " "failed: %d: %d: %s"
, priv->sock, priv->windowsize, strerror ((*__errno_location
()))); } while (0)
2920 "setting receive window size "do { do { if (0) printf ("setting receive window size " "failed: %d: %d: %s"
, priv->sock, priv->windowsize, strerror ((*__errno_location
()))); } while (0); _gf_log (this->name, "socket.c", __FUNCTION__
, 2923, GF_LOG_ERROR, "setting receive window size " "failed: %d: %d: %s"
, priv->sock, priv->windowsize, strerror ((*__errno_location
()))); } while (0)
2921 "failed: %d: %d: %s", priv->sock,do { do { if (0) printf ("setting receive window size " "failed: %d: %d: %s"
, priv->sock, priv->windowsize, strerror ((*__errno_location
()))); } while (0); _gf_log (this->name, "socket.c", __FUNCTION__
, 2923, GF_LOG_ERROR, "setting receive window size " "failed: %d: %d: %s"
, priv->sock, priv->windowsize, strerror ((*__errno_location
()))); } while (0)
2922 priv->windowsize,do { do { if (0) printf ("setting receive window size " "failed: %d: %d: %s"
, priv->sock, priv->windowsize, strerror ((*__errno_location
()))); } while (0); _gf_log (this->name, "socket.c", __FUNCTION__
, 2923, GF_LOG_ERROR, "setting receive window size " "failed: %d: %d: %s"
, priv->sock, priv->windowsize, strerror ((*__errno_location
()))); } while (0)
2923 strerror (errno))do { do { if (0) printf ("setting receive window size " "failed: %d: %d: %s"
, priv->sock, priv->windowsize, strerror ((*__errno_location
()))); } while (0); _gf_log (this->name, "socket.c", __FUNCTION__
, 2923, GF_LOG_ERROR, "setting receive window size " "failed: %d: %d: %s"
, priv->sock, priv->windowsize, strerror ((*__errno_location
()))); } while (0)
;
2924 }
2925
2926 if (setsockopt (priv->sock, SOL_SOCKET1, SO_SNDBUF7,
2927 &priv->windowsize,
2928 sizeof (priv->windowsize)) < 0) {
2929 gf_log (this->name, GF_LOG_ERROR,do { do { if (0) printf ("setting send window size failed:" " %d: %d: %s"
, priv->sock, priv->windowsize, strerror ((*__errno_location
()))); } while (0); _gf_log (this->name, "socket.c", __FUNCTION__
, 2933, GF_LOG_ERROR, "setting send window size failed:" " %d: %d: %s"
, priv->sock, priv->windowsize, strerror ((*__errno_location
()))); } while (0)
2930 "setting send window size failed:"do { do { if (0) printf ("setting send window size failed:" " %d: %d: %s"
, priv->sock, priv->windowsize, strerror ((*__errno_location
()))); } while (0); _gf_log (this->name, "socket.c", __FUNCTION__
, 2933, GF_LOG_ERROR, "setting send window size failed:" " %d: %d: %s"
, priv->sock, priv->windowsize, strerror ((*__errno_location
()))); } while (0)
2931 " %d: %d: %s", priv->sock,do { do { if (0) printf ("setting send window size failed:" " %d: %d: %s"
, priv->sock, priv->windowsize, strerror ((*__errno_location
()))); } while (0); _gf_log (this->name, "socket.c", __FUNCTION__
, 2933, GF_LOG_ERROR, "setting send window size failed:" " %d: %d: %s"
, priv->sock, priv->windowsize, strerror ((*__errno_location
()))); } while (0)
2932 priv->windowsize,do { do { if (0) printf ("setting send window size failed:" " %d: %d: %s"
, priv->sock, priv->windowsize, strerror ((*__errno_location
()))); } while (0); _gf_log (this->name, "socket.c", __FUNCTION__
, 2933, GF_LOG_ERROR, "setting send window size failed:" " %d: %d: %s"
, priv->sock, priv->windowsize, strerror ((*__errno_location
()))); } while (0)
2933 strerror (errno))do { do { if (0) printf ("setting send window size failed:" " %d: %d: %s"
, priv->sock, priv->windowsize, strerror ((*__errno_location
()))); } while (0); _gf_log (this->name, "socket.c", __FUNCTION__
, 2933, GF_LOG_ERROR, "setting send window size failed:" " %d: %d: %s"
, priv->sock, priv->windowsize, strerror ((*__errno_location
()))); } while (0)
;
2934 }
2935 }
2936
2937 if (priv->nodelay) {
2938 ret = __socket_nodelay (priv->sock);
2939 if (ret == -1) {
2940 gf_log (this->name, GF_LOG_ERROR,do { do { if (0) printf ("setsockopt() failed for NODELAY (%s)"
, strerror ((*__errno_location ()))); } while (0); _gf_log (this
->name, "socket.c", __FUNCTION__, 2942, GF_LOG_ERROR, "setsockopt() failed for NODELAY (%s)"
, strerror ((*__errno_location ()))); } while (0)
2941 "setsockopt() failed for NODELAY (%s)",do { do { if (0) printf ("setsockopt() failed for NODELAY (%s)"
, strerror ((*__errno_location ()))); } while (0); _gf_log (this
->name, "socket.c", __FUNCTION__, 2942, GF_LOG_ERROR, "setsockopt() failed for NODELAY (%s)"
, strerror ((*__errno_location ()))); } while (0)
2942 strerror (errno))do { do { if (0) printf ("setsockopt() failed for NODELAY (%s)"
, strerror ((*__errno_location ()))); } while (0); _gf_log (this
->name, "socket.c", __FUNCTION__, 2942, GF_LOG_ERROR, "setsockopt() failed for NODELAY (%s)"
, strerror ((*__errno_location ()))); } while (0)
;
2943 }
2944 }
2945
2946 if (!priv->bio) {
2947 ret = __socket_nonblock (priv->sock);
2948
2949 if (ret == -1) {
2950 gf_log (this->name, GF_LOG_ERROR,do { do { if (0) printf ("NBIO on %d failed (%s)", priv->sock
, strerror ((*__errno_location ()))); } while (0); _gf_log (this
->name, "socket.c", __FUNCTION__, 2952, GF_LOG_ERROR, "NBIO on %d failed (%s)"
, priv->sock, strerror ((*__errno_location ()))); } while (
0)
2951 "NBIO on %d failed (%s)",do { do { if (0) printf ("NBIO on %d failed (%s)", priv->sock
, strerror ((*__errno_location ()))); } while (0); _gf_log (this
->name, "socket.c", __FUNCTION__, 2952, GF_LOG_ERROR, "NBIO on %d failed (%s)"
, priv->sock, strerror ((*__errno_location ()))); } while (
0)
2952 priv->sock, strerror (errno))do { do { if (0) printf ("NBIO on %d failed (%s)", priv->sock
, strerror ((*__errno_location ()))); } while (0); _gf_log (this
->name, "socket.c", __FUNCTION__, 2952, GF_LOG_ERROR, "NBIO on %d failed (%s)"
, priv->sock, strerror ((*__errno_location ()))); } while (
0)
;
2953 close (priv->sock);
2954 priv->sock = -1;
2955 goto unlock;
2956 }
2957 }
2958
2959 ret = __socket_server_bind (this);
2960
2961 if (ret == -1) {
2962 /* logged inside __socket_server_bind() */
2963 close (priv->sock);
2964 priv->sock = -1;
2965 goto unlock;
2966 }
2967
2968 if (priv->backlog)
2969 ret = listen (priv->sock, priv->backlog);
2970 else
2971 ret = listen (priv->sock, 10);
2972
2973 if (ret == -1) {
2974 gf_log (this->name, GF_LOG_ERROR,do { do { if (0) printf ("could not set socket %d to listen mode (%s)"
, priv->sock, strerror ((*__errno_location ()))); } while (
0); _gf_log (this->name, "socket.c", __FUNCTION__, 2976, GF_LOG_ERROR
, "could not set socket %d to listen mode (%s)", priv->sock
, strerror ((*__errno_location ()))); } while (0)
2975 "could not set socket %d to listen mode (%s)",do { do { if (0) printf ("could not set socket %d to listen mode (%s)"
, priv->sock, strerror ((*__errno_location ()))); } while (
0); _gf_log (this->name, "socket.c", __FUNCTION__, 2976, GF_LOG_ERROR
, "could not set socket %d to listen mode (%s)", priv->sock
, strerror ((*__errno_location ()))); } while (0)
2976 priv->sock, strerror (errno))do { do { if (0) printf ("could not set socket %d to listen mode (%s)"
, priv->sock, strerror ((*__errno_location ()))); } while (
0); _gf_log (this->name, "socket.c", __FUNCTION__, 2976, GF_LOG_ERROR
, "could not set socket %d to listen mode (%s)", priv->sock
, strerror ((*__errno_location ()))); } while (0)
;
2977 close (priv->sock);
2978 priv->sock = -1;
2979 goto unlock;
2980 }
2981
2982 rpc_transport_ref (this);
2983
2984 priv->idx = event_register (ctx->event_pool, priv->sock,
2985 socket_server_event_handler,
2986 this, 1, 0);
2987
2988 if (priv->idx == -1) {
2989 gf_log (this->name, GF_LOG_WARNING,do { do { if (0) printf ("could not register socket %d with events"
, priv->sock); } while (0); _gf_log (this->name, "socket.c"
, __FUNCTION__, 2991, GF_LOG_WARNING, "could not register socket %d with events"
, priv->sock); } while (0)
2990 "could not register socket %d with events",do { do { if (0) printf ("could not register socket %d with events"
, priv->sock); } while (0); _gf_log (this->name, "socket.c"
, __FUNCTION__, 2991, GF_LOG_WARNING, "could not register socket %d with events"
, priv->sock); } while (0)
2991 priv->sock)do { do { if (0) printf ("could not register socket %d with events"
, priv->sock); } while (0); _gf_log (this->name, "socket.c"
, __FUNCTION__, 2991, GF_LOG_WARNING, "could not register socket %d with events"
, priv->sock); } while (0)
;
2992 ret = -1;
2993 close (priv->sock);
2994 priv->sock = -1;
2995 goto unlock;
2996 }
2997 }
2998unlock:
2999 pthread_mutex_unlock (&priv->lock);
3000
3001out:
3002 return ret;
3003}
3004
3005
3006int32_t
3007socket_submit_request (rpc_transport_t *this, rpc_transport_req_t *req)
3008{
3009 socket_private_t *priv = NULL((void*)0);
3010 int ret = -1;
3011 char need_poll_out = 0;
3012 char need_append = 1;
3013 struct ioq *entry = NULL((void*)0);
3014 glusterfs_ctx_t *ctx = NULL((void*)0);
3015 char a_byte = 'j';
3016
3017 GF_VALIDATE_OR_GOTO ("socket", this, out)do { if (!this) { (*__errno_location ()) = 22; do { do { if (
0) printf ("invalid argument: " "this"); } while (0); _gf_log_callingfn
("socket", "socket.c", __FUNCTION__, 3017, GF_LOG_ERROR, "invalid argument: "
"this"); } while (0); goto out; } } while (0)
;
3018 GF_VALIDATE_OR_GOTO ("socket", this->private, out)do { if (!this->private) { (*__errno_location ()) = 22; do
{ do { if (0) printf ("invalid argument: " "this->private"
); } while (0); _gf_log_callingfn ("socket", "socket.c", __FUNCTION__
, 3018, GF_LOG_ERROR, "invalid argument: " "this->private"
); } while (0); goto out; } } while (0)
;
3019
3020 priv = this->private;
3021 ctx = this->ctx;
3022
3023 pthread_mutex_lock (&priv->lock);
3024 {
3025 if (priv->connected != 1) {
3026 if (!priv->submit_log && !priv->connect_finish_log) {
3027 gf_log (this->name, GF_LOG_INFO,do { do { if (0) printf ("not connected (priv->connected = %d)"
, priv->connected); } while (0); _gf_log (this->name, "socket.c"
, __FUNCTION__, 3029, GF_LOG_INFO, "not connected (priv->connected = %d)"
, priv->connected); } while (0)
3028 "not connected (priv->connected = %d)",do { do { if (0) printf ("not connected (priv->connected = %d)"
, priv->connected); } while (0); _gf_log (this->name, "socket.c"
, __FUNCTION__, 3029, GF_LOG_INFO, "not connected (priv->connected = %d)"
, priv->connected); } while (0)
3029 priv->connected)do { do { if (0) printf ("not connected (priv->connected = %d)"
, priv->connected); } while (0); _gf_log (this->name, "socket.c"
, __FUNCTION__, 3029, GF_LOG_INFO, "not connected (priv->connected = %d)"
, priv->connected); } while (0)
;
3030 priv->submit_log = 1;
3031 }
3032 goto unlock;
3033 }
3034
3035 priv->submit_log = 0;
3036 entry = __socket_ioq_new (this, &req->msg);
3037 if (!entry)
3038 goto unlock;
3039
3040 if (list_empty (&priv->ioq)) {
3041 ret = __socket_ioq_churn_entry (this, entry, 1);
3042
3043 if (ret == 0) {
3044 need_append = 0;
3045 }
3046 if (ret > 0) {
3047 need_poll_out = 1;
3048 }
3049 }
3050
3051 if (need_append) {
3052 list_add_tail (&entry->list, &priv->ioq);
3053 if (priv->own_thread) {
3054 /*
3055 * Make sure the polling thread wakes up, by
3056 * writing a byte to represent this entry.
3057 */
3058 if (write(priv->pipe[1],&a_byte,1) < 1) {
3059 gf_log(this->name,GF_LOG_WARNING,do { do { if (0) printf ("write error on pipe"); } while (0);
_gf_log (this->name, "socket.c", __FUNCTION__, 3060, GF_LOG_WARNING
, "write error on pipe"); } while (0)
3060 "write error on pipe")do { do { if (0) printf ("write error on pipe"); } while (0);
_gf_log (this->name, "socket.c", __FUNCTION__, 3060, GF_LOG_WARNING
, "write error on pipe"); } while (0)
;
3061 }
3062 }
3063 ret = 0;
3064 }
3065 if (!priv->own_thread && need_poll_out) {
3066 /* first entry to wait. continue writing on POLLOUT */
3067 priv->idx = event_select_on (ctx->event_pool,
3068 priv->sock,
3069 priv->idx, -1, 1);
3070 }
3071 }
3072unlock:
3073 pthread_mutex_unlock (&priv->lock);
3074
3075out:
3076 return ret;
3077}
3078
3079
3080int32_t
3081socket_submit_reply (rpc_transport_t *this, rpc_transport_reply_t *reply)
3082{
3083 socket_private_t *priv = NULL((void*)0);
3084 int ret = -1;
3085 char need_poll_out = 0;
3086 char need_append = 1;
3087 struct ioq *entry = NULL((void*)0);
3088 glusterfs_ctx_t *ctx = NULL((void*)0);
3089 char a_byte = 'd';
3090
3091 GF_VALIDATE_OR_GOTO ("socket", this, out)do { if (!this) { (*__errno_location ()) = 22; do { do { if (
0) printf ("invalid argument: " "this"); } while (0); _gf_log_callingfn
("socket", "socket.c", __FUNCTION__, 3091, GF_LOG_ERROR, "invalid argument: "
"this"); } while (0); goto out; } } while (0)
;
3092 GF_VALIDATE_OR_GOTO ("socket", this->private, out)do { if (!this->private) { (*__errno_location ()) = 22; do
{ do { if (0) printf ("invalid argument: " "this->private"
); } while (0); _gf_log_callingfn ("socket", "socket.c", __FUNCTION__
, 3092, GF_LOG_ERROR, "invalid argument: " "this->private"
); } while (0); goto out; } } while (0)
;
3093
3094 priv = this->private;
3095 ctx = this->ctx;
3096
3097 pthread_mutex_lock (&priv->lock);
3098 {
3099 if (priv->connected != 1) {
3100 if (!priv->submit_log && !priv->connect_finish_log) {
3101 gf_log (this->name, GF_LOG_INFO,do { do { if (0) printf ("not connected (priv->connected = %d)"
, priv->connected); } while (0); _gf_log (this->name, "socket.c"
, __FUNCTION__, 3103, GF_LOG_INFO, "not connected (priv->connected = %d)"
, priv->connected); } while (0)
3102 "not connected (priv->connected = %d)",do { do { if (0) printf ("not connected (priv->connected = %d)"
, priv->connected); } while (0); _gf_log (this->name, "socket.c"
, __FUNCTION__, 3103, GF_LOG_INFO, "not connected (priv->connected = %d)"
, priv->connected); } while (0)
3103 priv->connected)do { do { if (0) printf ("not connected (priv->connected = %d)"
, priv->connected); } while (0); _gf_log (this->name, "socket.c"
, __FUNCTION__, 3103, GF_LOG_INFO, "not connected (priv->connected = %d)"
, priv->connected); } while (0)
;
3104 priv->submit_log = 1;
3105 }
3106 goto unlock;
3107 }
3108
3109 priv->submit_log = 0;
3110 entry = __socket_ioq_new (this, &reply->msg);
3111 if (!entry)
3112 goto unlock;
3113
3114 if (list_empty (&priv->ioq)) {
3115 ret = __socket_ioq_churn_entry (this, entry, 1);
3116
3117 if (ret == 0) {
3118 need_append = 0;
3119 }
3120 if (ret > 0) {
3121 need_poll_out = 1;
3122 }
3123 }
3124
3125 if (need_append) {
3126 list_add_tail (&entry->list, &priv->ioq);
3127 if (priv->own_thread) {
3128 /*
3129 * Make sure the polling thread wakes up, by
3130 * writing a byte to represent this entry.
3131 */
3132 if (write(priv->pipe[1],&a_byte,1) < 1) {
3133 gf_log(this->name,GF_LOG_WARNING,do { do { if (0) printf ("write error on pipe"); } while (0);
_gf_log (this->name, "socket.c", __FUNCTION__, 3134, GF_LOG_WARNING
, "write error on pipe"); } while (0)
3134 "write error on pipe")do { do { if (0) printf ("write error on pipe"); } while (0);
_gf_log (this->name, "socket.c", __FUNCTION__, 3134, GF_LOG_WARNING
, "write error on pipe"); } while (0)
;
3135 }
3136 }
3137 ret = 0;
3138 }
3139 if (!priv->own_thread && need_poll_out) {
3140 /* first entry to wait. continue writing on POLLOUT */
3141 priv->idx = event_select_on (ctx->event_pool,
3142 priv->sock,
3143 priv->idx, -1, 1);
3144 }
3145 }
3146unlock:
3147 pthread_mutex_unlock (&priv->lock);
3148
3149out:
3150 return ret;
3151}
3152
3153
3154int32_t
3155socket_getpeername (rpc_transport_t *this, char *hostname, int hostlen)
3156{
3157 int32_t ret = -1;
3158
3159 GF_VALIDATE_OR_GOTO ("socket", this, out)do { if (!this) { (*__errno_location ()) = 22; do { do { if (
0) printf ("invalid argument: " "this"); } while (0); _gf_log_callingfn
("socket", "socket.c", __FUNCTION__, 3159, GF_LOG_ERROR, "invalid argument: "
"this"); } while (0); goto out; } } while (0)
;
3160 GF_VALIDATE_OR_GOTO ("socket", hostname, out)do { if (!hostname) { (*__errno_location ()) = 22; do { do { if
(0) printf ("invalid argument: " "hostname"); } while (0); _gf_log_callingfn
("socket", "socket.c", __FUNCTION__, 3160, GF_LOG_ERROR, "invalid argument: "
"hostname"); } while (0); goto out; } } while (0)
;
3161
3162 if (hostlen < (strlen (this->peerinfo.identifier) + 1)) {
3163 goto out;
3164 }
3165
3166 strcpy (hostname, this->peerinfo.identifier);
3167 ret = 0;
3168out:
3169 return ret;
3170}
3171
3172
3173int32_t
3174socket_getpeeraddr (rpc_transport_t *this, char *peeraddr, int addrlen,
3175 struct sockaddr_storage *sa, socklen_t salen)
3176{
3177 int32_t ret = -1;
3178
3179 GF_VALIDATE_OR_GOTO ("socket", this, out)do { if (!this) { (*__errno_location ()) = 22; do { do { if (
0) printf ("invalid argument: " "this"); } while (0); _gf_log_callingfn
("socket", "socket.c", __FUNCTION__, 3179, GF_LOG_ERROR, "invalid argument: "
"this"); } while (0); goto out; } } while (0)
;
3180 GF_VALIDATE_OR_GOTO ("socket", sa, out)do { if (!sa) { (*__errno_location ()) = 22; do { do { if (0)
printf ("invalid argument: " "sa"); } while (0); _gf_log_callingfn
("socket", "socket.c", __FUNCTION__, 3180, GF_LOG_ERROR, "invalid argument: "
"sa"); } while (0); goto out; } } while (0)
;
3181
3182 *sa = this->peerinfo.sockaddr;
3183
3184 if (peeraddr != NULL((void*)0)) {
3185 ret = socket_getpeername (this, peeraddr, addrlen);
Value stored to 'ret' is never read
3186 }
3187 ret = 0;
3188
3189out:
3190 return ret;
3191}
3192
3193
3194int32_t
3195socket_getmyname (rpc_transport_t *this, char *hostname, int hostlen)
3196{
3197 int32_t ret = -1;
3198
3199 GF_VALIDATE_OR_GOTO ("socket", this, out)do { if (!this) { (*__errno_location ()) = 22; do { do { if (
0) printf ("invalid argument: " "this"); } while (0); _gf_log_callingfn
("socket", "socket.c", __FUNCTION__, 3199, GF_LOG_ERROR, "invalid argument: "
"this"); } while (0); goto out; } } while (0)
;
3200 GF_VALIDATE_OR_GOTO ("socket", hostname, out)do { if (!hostname) { (*__errno_location ()) = 22; do { do { if
(0) printf ("invalid argument: " "hostname"); } while (0); _gf_log_callingfn
("socket", "socket.c", __FUNCTION__, 3200, GF_LOG_ERROR, "invalid argument: "
"hostname"); } while (0); goto out; } } while (0)
;
3201
3202 if (hostlen < (strlen (this->myinfo.identifier) + 1)) {
3203 goto out;
3204 }
3205
3206 strcpy (hostname, this->myinfo.identifier);
3207 ret = 0;
3208out:
3209 return ret;
3210}
3211
3212
3213int32_t
3214socket_getmyaddr (rpc_transport_t *this, char *myaddr, int addrlen,
3215 struct sockaddr_storage *sa, socklen_t salen)
3216{
3217 int32_t ret = 0;
3218
3219 GF_VALIDATE_OR_GOTO ("socket", this, out)do { if (!this) { (*__errno_location ()) = 22; do { do { if (
0) printf ("invalid argument: " "this"); } while (0); _gf_log_callingfn
("socket", "socket.c", __FUNCTION__, 3219, GF_LOG_ERROR, "invalid argument: "
"this"); } while (0); goto out; } } while (0)
;
3220 GF_VALIDATE_OR_GOTO ("socket", sa, out)do { if (!sa) { (*__errno_location ()) = 22; do { do { if (0)
printf ("invalid argument: " "sa"); } while (0); _gf_log_callingfn
("socket", "socket.c", __FUNCTION__, 3220, GF_LOG_ERROR, "invalid argument: "
"sa"); } while (0); goto out; } } while (0)
;
3221
3222 *sa = this->myinfo.sockaddr;
3223
3224 if (myaddr != NULL((void*)0)) {
3225 ret = socket_getmyname (this, myaddr, addrlen);
3226 }
3227
3228out:
3229 return ret;
3230}
3231
3232
3233struct rpc_transport_ops tops = {
3234 .listen = socket_listen,
3235 .connect = socket_connect,
3236 .disconnect = socket_disconnect,
3237 .submit_request = socket_submit_request,
3238 .submit_reply = socket_submit_reply,
3239 .get_peername = socket_getpeername,
3240 .get_peeraddr = socket_getpeeraddr,
3241 .get_myname = socket_getmyname,
3242 .get_myaddr = socket_getmyaddr,
3243};
3244
3245int
3246reconfigure (rpc_transport_t *this, dict_t *options)
3247{
3248 socket_private_t *priv = NULL((void*)0);
3249 gf_boolean_t tmp_bool = _gf_false;
3250 char *optstr = NULL((void*)0);
3251 int ret = 0;
3252 uint64_t windowsize = 0;
3253
3254 GF_VALIDATE_OR_GOTO ("socket", this, out)do { if (!this) { (*__errno_location ()) = 22; do { do { if (
0) printf ("invalid argument: " "this"); } while (0); _gf_log_callingfn
("socket", "socket.c", __FUNCTION__, 3254, GF_LOG_ERROR, "invalid argument: "
"this"); } while (0); goto out; } } while (0)
;
3255 GF_VALIDATE_OR_GOTO ("socket", this->private, out)do { if (!this->private) { (*__errno_location ()) = 22; do
{ do { if (0) printf ("invalid argument: " "this->private"
); } while (0); _gf_log_callingfn ("socket", "socket.c", __FUNCTION__
, 3255, GF_LOG_ERROR, "invalid argument: " "this->private"
); } while (0); goto out; } } while (0)
;
3256
3257 if (!this || !this->private) {
3258 ret =-1;
3259 goto out;
3260 }
3261
3262 priv = this->private;
3263
3264 if (dict_get_str (this->options, "transport.socket.keepalive",
3265 &optstr) == 0) {
3266 if (gf_string2boolean (optstr, &tmp_bool) == -1) {
3267 gf_log (this->name, GF_LOG_ERROR,do { do { if (0) printf ("'transport.socket.keepalive' takes only "
"boolean options, not taking any action"); } while (0); _gf_log
(this->name, "socket.c", __FUNCTION__, 3269, GF_LOG_ERROR
, "'transport.socket.keepalive' takes only " "boolean options, not taking any action"
); } while (0)
3268 "'transport.socket.keepalive' takes only "do { do { if (0) printf ("'transport.socket.keepalive' takes only "
"boolean options, not taking any action"); } while (0); _gf_log
(this->name, "socket.c", __FUNCTION__, 3269, GF_LOG_ERROR
, "'transport.socket.keepalive' takes only " "boolean options, not taking any action"
); } while (0)
3269 "boolean options, not taking any action")do { do { if (0) printf ("'transport.socket.keepalive' takes only "
"boolean options, not taking any action"); } while (0); _gf_log
(this->name, "socket.c", __FUNCTION__, 3269, GF_LOG_ERROR
, "'transport.socket.keepalive' takes only " "boolean options, not taking any action"
); } while (0)
;
3270 priv->keepalive = 1;
3271 ret = -1;
3272 goto out;
3273 }
3274 gf_log (this->name, GF_LOG_DEBUG, "Reconfigured transport.socket.keepalive")do { do { if (0) printf ("Reconfigured transport.socket.keepalive"
); } while (0); _gf_log (this->name, "socket.c", __FUNCTION__
, 3274, GF_LOG_DEBUG, "Reconfigured transport.socket.keepalive"
); } while (0)
;
3275
3276 priv->keepalive = tmp_bool;
3277 }
3278 else
3279 priv->keepalive = 1;
3280
3281 optstr = NULL((void*)0);
3282 if (dict_get_str (this->options, "tcp-window-size",
3283 &optstr) == 0) {
3284 if (gf_string2bytesize (optstr, &windowsize) != 0) {
3285 gf_log (this->name, GF_LOG_ERROR,do { do { if (0) printf ("invalid number format: %s", optstr)
; } while (0); _gf_log (this->name, "socket.c", __FUNCTION__
, 3286, GF_LOG_ERROR, "invalid number format: %s", optstr); }
while (0)
3286 "invalid number format: %s", optstr)do { do { if (0) printf ("invalid number format: %s", optstr)
; } while (0); _gf_log (this->name, "socket.c", __FUNCTION__
, 3286, GF_LOG_ERROR, "invalid number format: %s", optstr); }
while (0)
;
3287 goto out;
3288 }
3289 }
3290
3291 priv->windowsize = (int)windowsize;
3292
3293 ret = 0;
3294out:
3295 return ret;
3296
3297}
3298
3299int
3300socket_init (rpc_transport_t *this)
3301{
3302 socket_private_t *priv = NULL((void*)0);
3303 gf_boolean_t tmp_bool = 0;
3304 uint64_t windowsize = GF_DEFAULT_SOCKET_WINDOW_SIZE(0);
3305 char *optstr = NULL((void*)0);
3306 uint32_t keepalive = 0;
3307 uint32_t backlog = 0;
3308 int session_id = 0;
3309
3310 if (this->private) {
3311 gf_log_callingfn (this->name, GF_LOG_ERROR,do { do { if (0) printf ("double init attempted"); } while (0
); _gf_log_callingfn (this->name, "socket.c", __FUNCTION__
, 3312, GF_LOG_ERROR, "double init attempted"); } while (0)
3312 "double init attempted")do { do { if (0) printf ("double init attempted"); } while (0
); _gf_log_callingfn (this->name, "socket.c", __FUNCTION__
, 3312, GF_LOG_ERROR, "double init attempted"); } while (0)
;
3313 return -1;
3314 }
3315
3316 priv = GF_CALLOC (1, sizeof (*priv), gf_common_mt_socket_private_t)__gf_calloc (1, sizeof (*priv), gf_common_mt_socket_private_t
)
;
3317 if (!priv) {
3318 return -1;
3319 }
3320 memset(priv,0,sizeof(*priv));
3321
3322 pthread_mutex_init (&priv->lock, NULL((void*)0));
3323
3324 priv->sock = -1;
3325 priv->idx = -1;
3326 priv->connected = -1;
3327 priv->nodelay = 1;
3328 priv->bio = 0;
3329 priv->windowsize = GF_DEFAULT_SOCKET_WINDOW_SIZE(0);
3330 INIT_LIST_HEAD (&priv->ioq)do { (&priv->ioq)->next = (&priv->ioq)->prev
= &priv->ioq; } while (0)
;
3331
3332 /* All the below section needs 'this->options' to be present */
3333 if (!this->options)
3334 goto out;
3335
3336 if (dict_get (this->options, "non-blocking-io")) {
3337 optstr = data_to_str (dict_get (this->options,
3338 "non-blocking-io"));
3339
3340 if (gf_string2boolean (optstr, &tmp_bool) == -1) {
3341 gf_log (this->name, GF_LOG_ERROR,do { do { if (0) printf ("'non-blocking-io' takes only boolean options,"
" not taking any action"); } while (0); _gf_log (this->name
, "socket.c", __FUNCTION__, 3343, GF_LOG_ERROR, "'non-blocking-io' takes only boolean options,"
" not taking any action"); } while (0)
3342 "'non-blocking-io' takes only boolean options,"do { do { if (0) printf ("'non-blocking-io' takes only boolean options,"
" not taking any action"); } while (0); _gf_log (this->name
, "socket.c", __FUNCTION__, 3343, GF_LOG_ERROR, "'non-blocking-io' takes only boolean options,"
" not taking any action"); } while (0)
3343 " not taking any action")do { do { if (0) printf ("'non-blocking-io' takes only boolean options,"
" not taking any action"); } while (0); _gf_log (this->name
, "socket.c", __FUNCTION__, 3343, GF_LOG_ERROR, "'non-blocking-io' takes only boolean options,"
" not taking any action"); } while (0)
;
3344 tmp_bool = 1;
3345 }
3346
3347 if (!tmp_bool) {
3348 priv->bio = 1;
3349 gf_log (this->name, GF_LOG_WARNING,do { do { if (0) printf ("disabling non-blocking IO"); } while
(0); _gf_log (this->name, "socket.c", __FUNCTION__, 3350,
GF_LOG_WARNING, "disabling non-blocking IO"); } while (0)
3350 "disabling non-blocking IO")do { do { if (0) printf ("disabling non-blocking IO"); } while
(0); _gf_log (this->name, "socket.c", __FUNCTION__, 3350,
GF_LOG_WARNING, "disabling non-blocking IO"); } while (0)
;
3351 }
3352 }
3353
3354 optstr = NULL((void*)0);
3355
3356 // By default, we enable NODELAY
3357 if (dict_get (this->options, "transport.socket.nodelay")) {
3358 optstr = data_to_str (dict_get (this->options,
3359 "transport.socket.nodelay"));
3360
3361 if (gf_string2boolean (optstr, &tmp_bool) == -1) {
3362 gf_log (this->name, GF_LOG_ERROR,do { do { if (0) printf ("'transport.socket.nodelay' takes only "
"boolean options, not taking any action"); } while (0); _gf_log
(this->name, "socket.c", __FUNCTION__, 3364, GF_LOG_ERROR
, "'transport.socket.nodelay' takes only " "boolean options, not taking any action"
); } while (0)
3363 "'transport.socket.nodelay' takes only "do { do { if (0) printf ("'transport.socket.nodelay' takes only "
"boolean options, not taking any action"); } while (0); _gf_log
(this->name, "socket.c", __FUNCTION__, 3364, GF_LOG_ERROR
, "'transport.socket.nodelay' takes only " "boolean options, not taking any action"
); } while (0)
3364 "boolean options, not taking any action")do { do { if (0) printf ("'transport.socket.nodelay' takes only "
"boolean options, not taking any action"); } while (0); _gf_log
(this->name, "socket.c", __FUNCTION__, 3364, GF_LOG_ERROR
, "'transport.socket.nodelay' takes only " "boolean options, not taking any action"
); } while (0)
;
3365 tmp_bool = 1;
3366 }
3367 if (!tmp_bool) {
3368 priv->nodelay = 0;
3369 gf_log (this->name, GF_LOG_DEBUG,do { do { if (0) printf ("disabling nodelay"); } while (0); _gf_log
(this->name, "socket.c", __FUNCTION__, 3370, GF_LOG_DEBUG
, "disabling nodelay"); } while (0)
3370 "disabling nodelay")do { do { if (0) printf ("disabling nodelay"); } while (0); _gf_log
(this->name, "socket.c", __FUNCTION__, 3370, GF_LOG_DEBUG
, "disabling nodelay"); } while (0)
;
3371 }
3372 }
3373
3374 optstr = NULL((void*)0);
3375 if (dict_get_str (this->options, "tcp-window-size",
3376 &optstr) == 0) {
3377 if (gf_string2bytesize (optstr, &windowsize) != 0) {
3378 gf_log (this->name, GF_LOG_ERROR,do { do { if (0) printf ("invalid number format: %s", optstr)
; } while (0); _gf_log (this->name, "socket.c", __FUNCTION__
, 3379, GF_LOG_ERROR, "invalid number format: %s", optstr); }
while (0)
3379 "invalid number format: %s", optstr)do { do { if (0) printf ("invalid number format: %s", optstr)
; } while (0); _gf_log (this->name, "socket.c", __FUNCTION__
, 3379, GF_LOG_ERROR, "invalid number format: %s", optstr); }
while (0)
;
3380 return -1;
3381 }
3382 }
3383
3384 priv->windowsize = (int)windowsize;
3385
3386 optstr = NULL((void*)0);
3387 /* Enable Keep-alive by default. */
3388 priv->keepalive = 1;
3389 priv->keepaliveintvl = 2;
3390 priv->keepaliveidle = 20;
3391 if (dict_get_str (this->options, "transport.socket.keepalive",
3392 &optstr) == 0) {
3393 if (gf_string2boolean (optstr, &tmp_bool) == -1) {
3394 gf_log (this->name, GF_LOG_ERROR,do { do { if (0) printf ("'transport.socket.keepalive' takes only "
"boolean options, not taking any action"); } while (0); _gf_log
(this->name, "socket.c", __FUNCTION__, 3396, GF_LOG_ERROR
, "'transport.socket.keepalive' takes only " "boolean options, not taking any action"
); } while (0)
3395 "'transport.socket.keepalive' takes only "do { do { if (0) printf ("'transport.socket.keepalive' takes only "
"boolean options, not taking any action"); } while (0); _gf_log
(this->name, "socket.c", __FUNCTION__, 3396, GF_LOG_ERROR
, "'transport.socket.keepalive' takes only " "boolean options, not taking any action"
); } while (0)
3396 "boolean options, not taking any action")do { do { if (0) printf ("'transport.socket.keepalive' takes only "
"boolean options, not taking any action"); } while (0); _gf_log
(this->name, "socket.c", __FUNCTION__, 3396, GF_LOG_ERROR
, "'transport.socket.keepalive' takes only " "boolean options, not taking any action"
); } while (0)
;
3397 tmp_bool = 1;
3398 }
3399
3400 if (!tmp_bool)
3401 priv->keepalive = 0;
3402 }
3403
3404 if (dict_get_uint32 (this->options,
3405 "transport.socket.keepalive-interval",
3406 &keepalive) == 0) {
3407 priv->keepaliveintvl = keepalive;
3408 }
3409
3410 if (dict_get_uint32 (this->options,
3411 "transport.socket.keepalive-time",
3412 &keepalive) == 0) {
3413 priv->keepaliveidle = keepalive;
3414 }
3415
3416 if (dict_get_uint32 (this->options,
3417 "transport.socket.listen-backlog",
3418 &backlog) == 0) {
3419 priv->backlog = backlog;
3420 }
3421
3422 optstr = NULL((void*)0);
3423
3424 /* Check if socket read failures are to be logged */
3425 priv->read_fail_log = 1;
3426 if (dict_get (this->options, "transport.socket.read-fail-log")) {
3427 optstr = data_to_str (dict_get (this->options, "transport.socket.read-fail-log"));
3428 if (gf_string2boolean (optstr, &tmp_bool) == -1) {
3429 gf_log (this->name, GF_LOG_WARNING,do { do { if (0) printf ("'transport.socket.read-fail-log' takes only "
"boolean options; logging socket read fails"); } while (0); _gf_log
(this->name, "socket.c", __FUNCTION__, 3431, GF_LOG_WARNING
, "'transport.socket.read-fail-log' takes only " "boolean options; logging socket read fails"
); } while (0)
3430 "'transport.socket.read-fail-log' takes only "do { do { if (0) printf ("'transport.socket.read-fail-log' takes only "
"boolean options; logging socket read fails"); } while (0); _gf_log
(this->name, "socket.c", __FUNCTION__, 3431, GF_LOG_WARNING
, "'transport.socket.read-fail-log' takes only " "boolean options; logging socket read fails"
); } while (0)
3431 "boolean options; logging socket read fails")do { do { if (0) printf ("'transport.socket.read-fail-log' takes only "
"boolean options; logging socket read fails"); } while (0); _gf_log
(this->name, "socket.c", __FUNCTION__, 3431, GF_LOG_WARNING
, "'transport.socket.read-fail-log' takes only " "boolean options; logging socket read fails"
); } while (0)
;
3432 }
3433 else if (tmp_bool == _gf_false) {
3434 priv->read_fail_log = 0;
3435 }
3436 }
3437
3438 priv->windowsize = (int)windowsize;
3439
3440 priv->ssl_enabled = _gf_false;
3441 if (dict_get_str(this->options,SSL_ENABLED_OPT"transport.socket.ssl-enabled",&optstr) == 0) {
3442 if (gf_string2boolean (optstr, &priv->ssl_enabled) != 0) {
3443 gf_log (this->name, GF_LOG_ERROR,do { do { if (0) printf ("invalid value given for ssl-enabled boolean"
); } while (0); _gf_log (this->name, "socket.c", __FUNCTION__
, 3444, GF_LOG_ERROR, "invalid value given for ssl-enabled boolean"
); } while (0)
3444 "invalid value given for ssl-enabled boolean")do { do { if (0) printf ("invalid value given for ssl-enabled boolean"
); } while (0); _gf_log (this->name, "socket.c", __FUNCTION__
, 3444, GF_LOG_ERROR, "invalid value given for ssl-enabled boolean"
); } while (0)
;
3445 }
3446 }
3447
3448 priv->ssl_own_cert = DEFAULT_CERT_PATH"/etc/ssl/glusterfs.pem";
3449 if (dict_get_str(this->options,SSL_OWN_CERT_OPT"transport.socket.ssl-own-cert",&optstr) == 0) {
3450 if (!priv->ssl_enabled) {
3451 gf_log(this->name,GF_LOG_WARNING,do { do { if (0) printf ("%s specified without %s (ignored)",
"transport.socket.ssl-own-cert", "transport.socket.ssl-enabled"
); } while (0); _gf_log (this->name, "socket.c", __FUNCTION__
, 3453, GF_LOG_WARNING, "%s specified without %s (ignored)", "transport.socket.ssl-own-cert"
, "transport.socket.ssl-enabled"); } while (0)
3452 "%s specified without %s (ignored)",do { do { if (0) printf ("%s specified without %s (ignored)",
"transport.socket.ssl-own-cert", "transport.socket.ssl-enabled"
); } while (0); _gf_log (this->name, "socket.c", __FUNCTION__
, 3453, GF_LOG_WARNING, "%s specified without %s (ignored)", "transport.socket.ssl-own-cert"
, "transport.socket.ssl-enabled"); } while (0)
3453 SSL_OWN_CERT_OPT, SSL_ENABLED_OPT)do { do { if (0) printf ("%s specified without %s (ignored)",
"transport.socket.ssl-own-cert", "transport.socket.ssl-enabled"
); } while (0); _gf_log (this->name, "socket.c", __FUNCTION__
, 3453, GF_LOG_WARNING, "%s specified without %s (ignored)", "transport.socket.ssl-own-cert"
, "transport.socket.ssl-enabled"); } while (0)
;
3454 }
3455 priv->ssl_own_cert = optstr;
3456 }
3457 priv->ssl_own_cert = gf_strdup(priv->ssl_own_cert);
3458
3459 priv->ssl_private_key = DEFAULT_KEY_PATH"/etc/ssl/glusterfs.key";
3460 if (dict_get_str(this->options,SSL_PRIVATE_KEY_OPT"transport.socket.ssl-private-key",&optstr) == 0) {
3461 if (!priv->ssl_enabled) {
3462 gf_log(this->name,GF_LOG_WARNING,do { do { if (0) printf ("%s specified without %s (ignored)",
"transport.socket.ssl-private-key", "transport.socket.ssl-enabled"
); } while (0); _gf_log (this->name, "socket.c", __FUNCTION__
, 3464, GF_LOG_WARNING, "%s specified without %s (ignored)", "transport.socket.ssl-private-key"
, "transport.socket.ssl-enabled"); } while (0)
3463 "%s specified without %s (ignored)",do { do { if (0) printf ("%s specified without %s (ignored)",
"transport.socket.ssl-private-key", "transport.socket.ssl-enabled"
); } while (0); _gf_log (this->name, "socket.c", __FUNCTION__
, 3464, GF_LOG_WARNING, "%s specified without %s (ignored)", "transport.socket.ssl-private-key"
, "transport.socket.ssl-enabled"); } while (0)
3464 SSL_PRIVATE_KEY_OPT, SSL_ENABLED_OPT)do { do { if (0) printf ("%s specified without %s (ignored)",
"transport.socket.ssl-private-key", "transport.socket.ssl-enabled"
); } while (0); _gf_log (this->name, "socket.c", __FUNCTION__
, 3464, GF_LOG_WARNING, "%s specified without %s (ignored)", "transport.socket.ssl-private-key"
, "transport.socket.ssl-enabled"); } while (0)
;
3465 }
3466 priv->ssl_private_key = optstr;
3467 }
3468 priv->ssl_private_key = gf_strdup(priv->ssl_private_key);
3469
3470 priv->ssl_ca_list = DEFAULT_CA_PATH"/etc/ssl/glusterfs.ca";
3471 if (dict_get_str(this->options,SSL_CA_LIST_OPT"transport.socket.ssl-ca-list",&optstr) == 0) {
3472 if (!priv->ssl_enabled) {
3473 gf_log(this->name,GF_LOG_WARNING,do { do { if (0) printf ("%s specified without %s (ignored)",
"transport.socket.ssl-ca-list", "transport.socket.ssl-enabled"
); } while (0); _gf_log (this->name, "socket.c", __FUNCTION__
, 3475, GF_LOG_WARNING, "%s specified without %s (ignored)", "transport.socket.ssl-ca-list"
, "transport.socket.ssl-enabled"); } while (0)
3474 "%s specified without %s (ignored)",do { do { if (0) printf ("%s specified without %s (ignored)",
"transport.socket.ssl-ca-list", "transport.socket.ssl-enabled"
); } while (0); _gf_log (this->name, "socket.c", __FUNCTION__
, 3475, GF_LOG_WARNING, "%s specified without %s (ignored)", "transport.socket.ssl-ca-list"
, "transport.socket.ssl-enabled"); } while (0)
3475 SSL_CA_LIST_OPT, SSL_ENABLED_OPT)do { do { if (0) printf ("%s specified without %s (ignored)",
"transport.socket.ssl-ca-list", "transport.socket.ssl-enabled"
); } while (0); _gf_log (this->name, "socket.c", __FUNCTION__
, 3475, GF_LOG_WARNING, "%s specified without %s (ignored)", "transport.socket.ssl-ca-list"
, "transport.socket.ssl-enabled"); } while (0)
;
3476 }
3477 priv->ssl_ca_list = optstr;
3478 }
3479 priv->ssl_ca_list = gf_strdup(priv->ssl_ca_list);
3480
3481 gf_log(this->name,GF_LOG_INFO,"SSL support is %s",do { do { if (0) printf ("SSL support is %s", priv->ssl_enabled
? "ENABLED" : "NOT enabled"); } while (0); _gf_log (this->
name, "socket.c", __FUNCTION__, 3482, GF_LOG_INFO,"SSL support is %s"
, priv->ssl_enabled ? "ENABLED" : "NOT enabled"); } while (
0)
3482 priv->ssl_enabled ? "ENABLED" : "NOT enabled")do { do { if (0) printf ("SSL support is %s", priv->ssl_enabled
? "ENABLED" : "NOT enabled"); } while (0); _gf_log (this->
name, "socket.c", __FUNCTION__, 3482, GF_LOG_INFO,"SSL support is %s"
, priv->ssl_enabled ? "ENABLED" : "NOT enabled"); } while (
0)
;
3483 /*
3484 * This might get overridden temporarily in socket_connect (q.v.)
3485 * if we're using the glusterd portmapper.
3486 */
3487 priv->use_ssl = priv->ssl_enabled;
3488
3489 priv->own_thread = priv->use_ssl;
3490 if (dict_get_str(this->options,OWN_THREAD_OPT"transport.socket.own-thread",&optstr) == 0) {
3491 if (gf_string2boolean (optstr, &priv->own_thread) != 0) {
3492 gf_log (this->name, GF_LOG_ERROR,do { do { if (0) printf ("invalid value given for own-thread boolean"
); } while (0); _gf_log (this->name, "socket.c", __FUNCTION__
, 3493, GF_LOG_ERROR, "invalid value given for own-thread boolean"
); } while (0)
3493 "invalid value given for own-thread boolean")do { do { if (0) printf ("invalid value given for own-thread boolean"
); } while (0); _gf_log (this->name, "socket.c", __FUNCTION__
, 3493, GF_LOG_ERROR, "invalid value given for own-thread boolean"
); } while (0)
;
3494 }
3495 }
3496 gf_log(this->name,GF_LOG_INFO,"using %s polling thread",do { do { if (0) printf ("using %s polling thread", priv->
own_thread ? "private" : "system"); } while (0); _gf_log (this
->name, "socket.c", __FUNCTION__, 3497, GF_LOG_INFO,"using %s polling thread"
, priv->own_thread ? "private" : "system"); } while (0)
3497 priv->own_thread ? "private" : "system")do { do { if (0) printf ("using %s polling thread", priv->
own_thread ? "private" : "system"); } while (0); _gf_log (this
->name, "socket.c", __FUNCTION__, 3497, GF_LOG_INFO,"using %s polling thread"
, priv->own_thread ? "private" : "system"); } while (0)
;
3498
3499 if (priv->use_ssl) {
3500 SSL_library_init();
3501 SSL_load_error_strings();
3502 priv->ssl_meth = (SSL_METHOD *)TLSv1_method();
3503 priv->ssl_ctx = SSL_CTX_new(priv->ssl_meth);
3504
3505 if (SSL_CTX_set_cipher_list(priv->ssl_ctx,
3506 "HIGH:-SSLv2") == 0) {
3507 gf_log(this->name,GF_LOG_ERROR,do { do { if (0) printf ("failed to find any valid ciphers");
} while (0); _gf_log (this->name, "socket.c", __FUNCTION__
, 3508, GF_LOG_ERROR, "failed to find any valid ciphers"); } while
(0)
3508 "failed to find any valid ciphers")do { do { if (0) printf ("failed to find any valid ciphers");
} while (0); _gf_log (this->name, "socket.c", __FUNCTION__
, 3508, GF_LOG_ERROR, "failed to find any valid ciphers"); } while
(0)
;
3509 goto err;
3510 }
3511
3512 if (!SSL_CTX_use_certificate_chain_file(priv->ssl_ctx,
3513 priv->ssl_own_cert)) {
3514 gf_log(this->name,GF_LOG_ERROR,do { do { if (0) printf ("could not load our cert"); } while (
0); _gf_log (this->name, "socket.c", __FUNCTION__, 3515, GF_LOG_ERROR
, "could not load our cert"); } while (0)
3515 "could not load our cert")do { do { if (0) printf ("could not load our cert"); } while (
0); _gf_log (this->name, "socket.c", __FUNCTION__, 3515, GF_LOG_ERROR
, "could not load our cert"); } while (0)
;
3516 goto err;
3517 }
3518
3519 if (!SSL_CTX_use_PrivateKey_file(priv->ssl_ctx,
3520 priv->ssl_private_key,
3521 SSL_FILETYPE_PEM1)) {
3522 gf_log(this->name,GF_LOG_ERROR,do { do { if (0) printf ("could not load private key"); } while
(0); _gf_log (this->name, "socket.c", __FUNCTION__, 3523,
GF_LOG_ERROR, "could not load private key"); } while (0)
3523 "could not load private key")do { do { if (0) printf ("could not load private key"); } while
(0); _gf_log (this->name, "socket.c", __FUNCTION__, 3523,
GF_LOG_ERROR, "could not load private key"); } while (0)
;
3524 goto err;
3525 }
3526
3527 if (!SSL_CTX_load_verify_locations(priv->ssl_ctx,
3528 priv->ssl_ca_list,0)) {
3529 gf_log(this->name,GF_LOG_ERROR,do { do { if (0) printf ("could not load CA list"); } while (
0); _gf_log (this->name, "socket.c", __FUNCTION__, 3530, GF_LOG_ERROR
, "could not load CA list"); } while (0)
3530 "could not load CA list")do { do { if (0) printf ("could not load CA list"); } while (
0); _gf_log (this->name, "socket.c", __FUNCTION__, 3530, GF_LOG_ERROR
, "could not load CA list"); } while (0)
;
3531 goto err;
3532 }
3533
3534#if (OPENSSL_VERSION_NUMBER0x10000003 < 0x00905100L)
3535 SSL_CTX_set_verify_depth(ctx,1);
3536#endif
3537
3538 priv->ssl_session_id = ++session_id;
3539 SSL_CTX_set_session_id_context(priv->ssl_ctx,
3540 (void *)&priv->ssl_session_id,
3541 sizeof(priv->ssl_session_id));
3542
3543 SSL_CTX_set_verify(priv->ssl_ctx,SSL_VERIFY_PEER0x01,0);
3544 }
3545
3546 if (priv->own_thread) {
3547 priv->ot_state = OT_IDLE;
3548 pthread_cond_init (&priv->ot_event, NULL((void*)0));
3549 }
3550
3551out:
3552 this->private = priv;
3553 return 0;
3554
3555err:
3556 if (priv->ssl_own_cert) {
3557 GF_FREE(priv->ssl_own_cert)__gf_free (priv->ssl_own_cert);
3558 }
3559 if (priv->ssl_private_key) {
3560 GF_FREE(priv->ssl_private_key)__gf_free (priv->ssl_private_key);
3561 }
3562 if (priv->ssl_ca_list) {
3563 GF_FREE(priv->ssl_ca_list)__gf_free (priv->ssl_ca_list);
3564 }
3565 GF_FREE(priv)__gf_free (priv);
3566 return -1;
3567}
3568
3569
3570void
3571fini (rpc_transport_t *this)
3572{
3573 socket_private_t *priv = NULL((void*)0);
3574
3575 if (!this)
3576 return;
3577
3578 priv = this->private;
3579 if (priv) {
3580 if (priv->sock != -1) {
3581 pthread_mutex_lock (&priv->lock);
3582 {
3583 __socket_ioq_flush (this);
3584 __socket_reset (this);
3585 }
3586 pthread_mutex_unlock (&priv->lock);
3587 }
3588 gf_log (this->name, GF_LOG_TRACE,do { do { if (0) printf ("transport %p destroyed", this); } while
(0); _gf_log (this->name, "socket.c", __FUNCTION__, 3589,
GF_LOG_TRACE, "transport %p destroyed", this); } while (0)
3589 "transport %p destroyed", this)do { do { if (0) printf ("transport %p destroyed", this); } while
(0); _gf_log (this->name, "socket.c", __FUNCTION__, 3589,
GF_LOG_TRACE, "transport %p destroyed", this); } while (0)
;
3590
3591 pthread_mutex_destroy (&priv->lock);
3592 if (priv->ssl_private_key) {
3593 GF_FREE(priv->ssl_private_key)__gf_free (priv->ssl_private_key);
3594 }
3595 if (priv->ssl_own_cert) {
3596 GF_FREE(priv->ssl_own_cert)__gf_free (priv->ssl_own_cert);
3597 }
3598 if (priv->ssl_ca_list) {
3599 GF_FREE(priv->ssl_ca_list)__gf_free (priv->ssl_ca_list);
3600 }
3601 GF_FREE (priv)__gf_free (priv);
3602 }
3603
3604 this->private = NULL((void*)0);
3605}
3606
3607
3608int32_t
3609init (rpc_transport_t *this)
3610{
3611 int ret = -1;
3612
3613 ret = socket_init (this);
3614
3615 if (ret == -1) {
3616 gf_log (this->name, GF_LOG_DEBUG, "socket_init() failed")do { do { if (0) printf ("socket_init() failed"); } while (0)
; _gf_log (this->name, "socket.c", __FUNCTION__, 3616, GF_LOG_DEBUG
, "socket_init() failed"); } while (0)
;
3617 }
3618
3619 return ret;
3620}
3621
3622struct volume_options options[] = {
3623 { .key = {"remote-port",
3624 "transport.remote-port",
3625 "transport.socket.remote-port"},
3626 .type = GF_OPTION_TYPE_INT
3627 },
3628 { .key = {"transport.socket.listen-port", "listen-port"},
3629 .type = GF_OPTION_TYPE_INT
3630 },
3631 { .key = {"transport.socket.bind-address", "bind-address" },
3632 .type = GF_OPTION_TYPE_INTERNET_ADDRESS
3633 },
3634 { .key = {"transport.socket.connect-path", "connect-path"},
3635 .type = GF_OPTION_TYPE_ANY
3636 },
3637 { .key = {"transport.socket.bind-path", "bind-path"},
3638 .type = GF_OPTION_TYPE_ANY
3639 },
3640 { .key = {"transport.socket.listen-path", "listen-path"},
3641 .type = GF_OPTION_TYPE_ANY
3642 },
3643 { .key = { "transport.address-family",
3644 "address-family" },
3645 .value = {"inet", "inet6", "unix", "inet-sdp" },
3646 .type = GF_OPTION_TYPE_STR
3647 },
3648
3649 { .key = {"non-blocking-io"},
3650 .type = GF_OPTION_TYPE_BOOL
3651 },
3652 { .key = {"tcp-window-size"},
3653 .type = GF_OPTION_TYPE_SIZET,
3654 .min = GF_MIN_SOCKET_WINDOW_SIZE(0),
3655 .max = GF_MAX_SOCKET_WINDOW_SIZE(1 * 1048576ULL)
3656 },
3657 { .key = {"transport.socket.nodelay"},
3658 .type = GF_OPTION_TYPE_BOOL
3659 },
3660 { .key = {"transport.socket.lowlat"},
3661 .type = GF_OPTION_TYPE_BOOL
3662 },
3663 { .key = {"transport.socket.keepalive"},
3664 .type = GF_OPTION_TYPE_BOOL
3665 },
3666 { .key = {"transport.socket.keepalive-interval"},
3667 .type = GF_OPTION_TYPE_INT
3668 },
3669 { .key = {"transport.socket.keepalive-time"},
3670 .type = GF_OPTION_TYPE_INT
3671 },
3672 { .key = {"transport.socket.listen-backlog"},
3673 .type = GF_OPTION_TYPE_INT
3674 },
3675 { .key = {"transport.socket.read-fail-log"},
3676 .type = GF_OPTION_TYPE_BOOL
3677 },
3678 { .key = {SSL_ENABLED_OPT"transport.socket.ssl-enabled"},
3679 .type = GF_OPTION_TYPE_BOOL
3680 },
3681 { .key = {SSL_OWN_CERT_OPT"transport.socket.ssl-own-cert"},
3682 .type = GF_OPTION_TYPE_STR
3683 },
3684 { .key = {SSL_PRIVATE_KEY_OPT"transport.socket.ssl-private-key"},
3685 .type = GF_OPTION_TYPE_STR
3686 },
3687 { .key = {SSL_CA_LIST_OPT"transport.socket.ssl-ca-list"},
3688 .type = GF_OPTION_TYPE_STR
3689 },
3690 { .key = {OWN_THREAD_OPT"transport.socket.own-thread"},
3691 .type = GF_OPTION_TYPE_BOOL
3692 },
3693 { .key = {NULL((void*)0)} }
3694};