File: | rpc/rpc-transport/socket/src/socket.c |
Location: | line 2759, column 25 |
Description: | Value stored to 'ret' is never read |
1 | /* |
2 | Copyright (c) 2008-2012 Red Hat, Inc. <http://www.redhat.com> |
3 | This file is part of GlusterFS. |
4 | |
5 | This file is licensed to you under your choice of the GNU Lesser |
6 | General Public License, version 3 or any later version (LGPLv3 or |
7 | later), or the GNU General Public License, version 2 (GPLv2), in all |
8 | cases as published by the Free Software Foundation. |
9 | */ |
10 | |
11 | |
12 | #ifndef _CONFIG_H |
13 | #define _CONFIG_H |
14 | #include "config.h" |
15 | #endif |
16 | |
17 | #include "socket.h" |
18 | #include "name.h" |
19 | #include "dict.h" |
20 | #include "rpc-transport.h" |
21 | #include "logging.h" |
22 | #include "xlator.h" |
23 | #include "byte-order.h" |
24 | #include "common-utils.h" |
25 | #include "compat-errno.h" |
26 | |
27 | |
28 | /* ugly #includes below */ |
29 | #include "protocol-common.h" |
30 | #include "glusterfs3-xdr.h" |
31 | #include "xdr-nfs3.h" |
32 | #include "rpcsvc.h" |
33 | |
34 | #include <fcntl.h> |
35 | #include <errno(*__errno_location ()).h> |
36 | #include <netinet/tcp.h> |
37 | #include <rpc/xdr.h> |
38 | #include <sys/ioctl.h> |
39 | #define GF_LOG_ERRNO(errno)(((*__errno_location ()) == 107) ? GF_LOG_DEBUG : GF_LOG_ERROR ) ((errno(*__errno_location ()) == ENOTCONN107) ? GF_LOG_DEBUG : GF_LOG_ERROR) |
40 | #define SA(ptr)((struct sockaddr *)ptr) ((struct sockaddr *)ptr) |
41 | |
42 | #define SSL_ENABLED_OPT"transport.socket.ssl-enabled" "transport.socket.ssl-enabled" |
43 | #define SSL_OWN_CERT_OPT"transport.socket.ssl-own-cert" "transport.socket.ssl-own-cert" |
44 | #define SSL_PRIVATE_KEY_OPT"transport.socket.ssl-private-key" "transport.socket.ssl-private-key" |
45 | #define SSL_CA_LIST_OPT"transport.socket.ssl-ca-list" "transport.socket.ssl-ca-list" |
46 | #define OWN_THREAD_OPT"transport.socket.own-thread" "transport.socket.own-thread" |
47 | |
48 | /* TBD: do automake substitutions etc. (ick) to set these. */ |
49 | #if !defined(DEFAULT_CERT_PATH"/etc/ssl/glusterfs.pem") |
50 | #define DEFAULT_CERT_PATH"/etc/ssl/glusterfs.pem" "/etc/ssl/glusterfs.pem" |
51 | #endif |
52 | #if !defined(DEFAULT_KEY_PATH"/etc/ssl/glusterfs.key") |
53 | #define DEFAULT_KEY_PATH"/etc/ssl/glusterfs.key" "/etc/ssl/glusterfs.key" |
54 | #endif |
55 | #if !defined(DEFAULT_CA_PATH"/etc/ssl/glusterfs.ca") |
56 | #define DEFAULT_CA_PATH"/etc/ssl/glusterfs.ca" "/etc/ssl/glusterfs.ca" |
57 | #endif |
58 | |
59 | #define POLL_MASK_INPUT(0x001 | 0x002) (POLLIN0x001 | POLLPRI0x002) |
60 | #define POLL_MASK_OUTPUT(0x004) (POLLOUT0x004) |
61 | #define POLL_MASK_ERROR(0x008 | 0x010 | 0x020) (POLLERR0x008 | POLLHUP0x010 | POLLNVAL0x020) |
62 | |
63 | typedef int SSL_unary_func (SSL *); |
64 | typedef int SSL_trinary_func (SSL *, void *, int); |
65 | |
66 | #define __socket_proto_reset_pending(priv)do { struct gf_sock_incoming_frag *frag; frag = &priv-> incoming.frag; memset (&frag->vector, 0, sizeof (frag-> vector)); frag->pending_vector = &frag->vector; frag ->pending_vector->iov_base = frag->fragcurrent; priv ->incoming.pending_vector = frag->pending_vector; } while (0) do { \ |
67 | struct gf_sock_incoming_frag *frag; \ |
68 | frag = &priv->incoming.frag; \ |
69 | \ |
70 | memset (&frag->vector, 0, sizeof (frag->vector)); \ |
71 | frag->pending_vector = &frag->vector; \ |
72 | frag->pending_vector->iov_base = frag->fragcurrent; \ |
73 | priv->incoming.pending_vector = frag->pending_vector; \ |
74 | } while (0) |
75 | |
76 | |
77 | #define __socket_proto_update_pending(priv)do { uint32_t remaining; struct gf_sock_incoming_frag *frag; frag = &priv->incoming.frag; if (frag->pending_vector-> iov_len == 0) { remaining = (((uint32_t)(priv->incoming.fraghdr & 0x7fffffffU)) - frag->bytes_read); frag->pending_vector ->iov_len = (remaining > frag->remaining_size) ? frag ->remaining_size : remaining; frag->remaining_size -= frag ->pending_vector->iov_len; } } while (0) \ |
78 | do { \ |
79 | uint32_t remaining; \ |
80 | struct gf_sock_incoming_frag *frag; \ |
81 | frag = &priv->incoming.frag; \ |
82 | if (frag->pending_vector->iov_len == 0) { \ |
83 | remaining = (RPC_FRAGSIZE (priv->incoming.fraghdr)((uint32_t)(priv->incoming.fraghdr & 0x7fffffffU)) \ |
84 | - frag->bytes_read); \ |
85 | \ |
86 | frag->pending_vector->iov_len = \ |
87 | (remaining > frag->remaining_size) \ |
88 | ? frag->remaining_size : remaining; \ |
89 | \ |
90 | frag->remaining_size -= \ |
91 | frag->pending_vector->iov_len; \ |
92 | } \ |
93 | } while (0) |
94 | |
95 | #define __socket_proto_update_priv_after_read(priv, ret, bytes_read){ struct gf_sock_incoming_frag *frag; frag = &priv->incoming .frag; frag->fragcurrent += bytes_read; frag->bytes_read += bytes_read; if ((ret > 0) || (frag->remaining_size != 0)) { if (frag->remaining_size != 0 && ret == 0) { do { struct gf_sock_incoming_frag *frag; frag = &priv-> incoming.frag; memset (&frag->vector, 0, sizeof (frag-> vector)); frag->pending_vector = &frag->vector; frag ->pending_vector->iov_base = frag->fragcurrent; priv ->incoming.pending_vector = frag->pending_vector; } while (0); } do { do { if (0) printf ("partial read on non-blocking socket" ); } while (0); _gf_log (this->name, "socket.c", __FUNCTION__ , 95, GF_LOG_TRACE, "partial read on non-blocking socket"); } while (0); break; } } \ |
96 | { \ |
97 | struct gf_sock_incoming_frag *frag; \ |
98 | frag = &priv->incoming.frag; \ |
99 | \ |
100 | frag->fragcurrent += bytes_read; \ |
101 | frag->bytes_read += bytes_read; \ |
102 | \ |
103 | if ((ret > 0) || (frag->remaining_size != 0)) { \ |
104 | if (frag->remaining_size != 0 && ret == 0) { \ |
105 | __socket_proto_reset_pending (priv)do { struct gf_sock_incoming_frag *frag; frag = &priv-> incoming.frag; memset (&frag->vector, 0, sizeof (frag-> vector)); frag->pending_vector = &frag->vector; frag ->pending_vector->iov_base = frag->fragcurrent; priv ->incoming.pending_vector = frag->pending_vector; } while (0); \ |
106 | } \ |
107 | \ |
108 | gf_log (this->name, GF_LOG_TRACE, \do { do { if (0) printf ("partial read on non-blocking socket" ); } while (0); _gf_log (this->name, "socket.c", __FUNCTION__ , 109, GF_LOG_TRACE, "partial read on non-blocking socket"); } while (0) |
109 | "partial read on non-blocking socket")do { do { if (0) printf ("partial read on non-blocking socket" ); } while (0); _gf_log (this->name, "socket.c", __FUNCTION__ , 109, GF_LOG_TRACE, "partial read on non-blocking socket"); } while (0); \ |
110 | \ |
111 | break; \ |
112 | } \ |
113 | } |
114 | |
115 | #define __socket_proto_init_pending(priv,size)do { uint32_t remaining = 0; struct gf_sock_incoming_frag *frag ; frag = &priv->incoming.frag; remaining = (((uint32_t )(priv->incoming.fraghdr & 0x7fffffffU)) - frag->bytes_read ); do { struct gf_sock_incoming_frag *frag; frag = &priv-> incoming.frag; memset (&frag->vector, 0, sizeof (frag-> vector)); frag->pending_vector = &frag->vector; frag ->pending_vector->iov_base = frag->fragcurrent; priv ->incoming.pending_vector = frag->pending_vector; } while (0); frag->pending_vector->iov_len = (remaining > size ) ? size : remaining; frag->remaining_size = (size - frag-> pending_vector->iov_len); } while(0) \ |
116 | do { \ |
117 | uint32_t remaining = 0; \ |
118 | struct gf_sock_incoming_frag *frag; \ |
119 | frag = &priv->incoming.frag; \ |
120 | \ |
121 | remaining = (RPC_FRAGSIZE (priv->incoming.fraghdr)((uint32_t)(priv->incoming.fraghdr & 0x7fffffffU)) \ |
122 | - frag->bytes_read); \ |
123 | \ |
124 | __socket_proto_reset_pending (priv)do { struct gf_sock_incoming_frag *frag; frag = &priv-> incoming.frag; memset (&frag->vector, 0, sizeof (frag-> vector)); frag->pending_vector = &frag->vector; frag ->pending_vector->iov_base = frag->fragcurrent; priv ->incoming.pending_vector = frag->pending_vector; } while (0); \ |
125 | \ |
126 | frag->pending_vector->iov_len = \ |
127 | (remaining > size) ? size : remaining; \ |
128 | \ |
129 | frag->remaining_size = (size - frag->pending_vector->iov_len); \ |
130 | \ |
131 | } while(0) |
132 | |
133 | |
134 | /* This will be used in a switch case and breaks from the switch case if all |
135 | * the pending data is not read. |
136 | */ |
137 | #define __socket_proto_read(priv, ret){ size_t bytes_read = 0; struct gf_sock_incoming *in; in = & priv->incoming; do { uint32_t remaining; struct gf_sock_incoming_frag *frag; frag = &priv->incoming.frag; if (frag->pending_vector ->iov_len == 0) { remaining = (((uint32_t)(priv->incoming .fraghdr & 0x7fffffffU)) - frag->bytes_read); frag-> pending_vector->iov_len = (remaining > frag->remaining_size ) ? frag->remaining_size : remaining; frag->remaining_size -= frag->pending_vector->iov_len; } } while (0); ret = __socket_readv (this, in->pending_vector, 1, &in-> pending_vector, &in->pending_count, &bytes_read); if (ret == -1) { if (priv->read_fail_log) do { do { if (0) printf ("reading from socket failed." "Error (%s), peer (%s)", strerror ((*__errno_location ())), this->peerinfo.identifier); } while (0); _gf_log (this->name, "socket.c", __FUNCTION__, 137, GF_LOG_WARNING , "reading from socket failed." "Error (%s), peer (%s)", strerror ((*__errno_location ())), this->peerinfo.identifier); } while (0); break; } { struct gf_sock_incoming_frag *frag; frag = & priv->incoming.frag; frag->fragcurrent += bytes_read; frag ->bytes_read += bytes_read; if ((ret > 0) || (frag-> remaining_size != 0)) { if (frag->remaining_size != 0 && ret == 0) { do { struct gf_sock_incoming_frag *frag; frag = & priv->incoming.frag; memset (&frag->vector, 0, sizeof (frag->vector)); frag->pending_vector = &frag-> vector; frag->pending_vector->iov_base = frag->fragcurrent ; priv->incoming.pending_vector = frag->pending_vector; } while (0); } do { do { if (0) printf ("partial read on non-blocking socket" ); } while (0); _gf_log (this->name, "socket.c", __FUNCTION__ , 137, GF_LOG_TRACE, "partial read on non-blocking socket"); } while (0); break; } }; } \ |
138 | { \ |
139 | size_t bytes_read = 0; \ |
140 | struct gf_sock_incoming *in; \ |
141 | in = &priv->incoming; \ |
142 | \ |
143 | __socket_proto_update_pending (priv)do { uint32_t remaining; struct gf_sock_incoming_frag *frag; frag = &priv->incoming.frag; if (frag->pending_vector-> iov_len == 0) { remaining = (((uint32_t)(priv->incoming.fraghdr & 0x7fffffffU)) - frag->bytes_read); frag->pending_vector ->iov_len = (remaining > frag->remaining_size) ? frag ->remaining_size : remaining; frag->remaining_size -= frag ->pending_vector->iov_len; } } while (0); \ |
144 | \ |
145 | ret = __socket_readv (this, \ |
146 | in->pending_vector, 1, \ |
147 | &in->pending_vector, \ |
148 | &in->pending_count, \ |
149 | &bytes_read); \ |
150 | if (ret == -1) { \ |
151 | if (priv->read_fail_log) \ |
152 | gf_log (this->name, GF_LOG_WARNING, \do { do { if (0) printf ("reading from socket failed." "Error (%s), peer (%s)" , strerror ((*__errno_location ())), this->peerinfo.identifier ); } while (0); _gf_log (this->name, "socket.c", __FUNCTION__ , 156, GF_LOG_WARNING, "reading from socket failed." "Error (%s), peer (%s)" , strerror ((*__errno_location ())), this->peerinfo.identifier ); } while (0) |
153 | "reading from socket failed." \do { do { if (0) printf ("reading from socket failed." "Error (%s), peer (%s)" , strerror ((*__errno_location ())), this->peerinfo.identifier ); } while (0); _gf_log (this->name, "socket.c", __FUNCTION__ , 156, GF_LOG_WARNING, "reading from socket failed." "Error (%s), peer (%s)" , strerror ((*__errno_location ())), this->peerinfo.identifier ); } while (0) |
154 | "Error (%s), peer (%s)", \do { do { if (0) printf ("reading from socket failed." "Error (%s), peer (%s)" , strerror ((*__errno_location ())), this->peerinfo.identifier ); } while (0); _gf_log (this->name, "socket.c", __FUNCTION__ , 156, GF_LOG_WARNING, "reading from socket failed." "Error (%s), peer (%s)" , strerror ((*__errno_location ())), this->peerinfo.identifier ); } while (0) |
155 | strerror (errno), \do { do { if (0) printf ("reading from socket failed." "Error (%s), peer (%s)" , strerror ((*__errno_location ())), this->peerinfo.identifier ); } while (0); _gf_log (this->name, "socket.c", __FUNCTION__ , 156, GF_LOG_WARNING, "reading from socket failed." "Error (%s), peer (%s)" , strerror ((*__errno_location ())), this->peerinfo.identifier ); } while (0) |
156 | this->peerinfo.identifier)do { do { if (0) printf ("reading from socket failed." "Error (%s), peer (%s)" , strerror ((*__errno_location ())), this->peerinfo.identifier ); } while (0); _gf_log (this->name, "socket.c", __FUNCTION__ , 156, GF_LOG_WARNING, "reading from socket failed." "Error (%s), peer (%s)" , strerror ((*__errno_location ())), this->peerinfo.identifier ); } while (0); \ |
157 | break; \ |
158 | } \ |
159 | __socket_proto_update_priv_after_read (priv, ret, bytes_read){ struct gf_sock_incoming_frag *frag; frag = &priv->incoming .frag; frag->fragcurrent += bytes_read; frag->bytes_read += bytes_read; if ((ret > 0) || (frag->remaining_size != 0)) { if (frag->remaining_size != 0 && ret == 0) { do { struct gf_sock_incoming_frag *frag; frag = &priv-> incoming.frag; memset (&frag->vector, 0, sizeof (frag-> vector)); frag->pending_vector = &frag->vector; frag ->pending_vector->iov_base = frag->fragcurrent; priv ->incoming.pending_vector = frag->pending_vector; } while (0); } do { do { if (0) printf ("partial read on non-blocking socket" ); } while (0); _gf_log (this->name, "socket.c", __FUNCTION__ , 159, GF_LOG_TRACE, "partial read on non-blocking socket"); } while (0); break; } }; \ |
160 | } |
161 | |
162 | int socket_init (rpc_transport_t *this); |
163 | |
164 | void |
165 | ssl_dump_error_stack (const char *caller) |
166 | { |
167 | unsigned long errnum = 0; |
168 | char errbuf[120] = {0,}; |
169 | |
170 | /* OpenSSL docs explicitly give 120 as the error-string length. */ |
171 | |
172 | while ((errnum = ERR_get_error())) { |
173 | ERR_error_string(errnum,errbuf); |
174 | gf_log(caller,GF_LOG_ERROR," %s",errbuf)do { do { if (0) printf (" %s",errbuf); } while (0); _gf_log (caller, "socket.c", __FUNCTION__, 174, GF_LOG_ERROR," %s", errbuf); } while (0); |
175 | } |
176 | } |
177 | |
178 | int |
179 | ssl_do (rpc_transport_t *this, void *buf, size_t len, SSL_trinary_func *func) |
180 | { |
181 | int r = (-1); |
182 | struct pollfd pfd = {-1,}; |
183 | socket_private_t *priv = NULL((void*)0); |
184 | |
185 | GF_VALIDATE_OR_GOTO(this->name,this->private,out)do { if (!this->private) { (*__errno_location ()) = 22; do { do { if (0) printf ("invalid argument: " "this->private" ); } while (0); _gf_log_callingfn (this->name, "socket.c", __FUNCTION__, 185, GF_LOG_ERROR, "invalid argument: " "this->private" ); } while (0); goto out; } } while (0); |
186 | priv = this->private; |
187 | |
188 | for (;;) { |
189 | if (buf) { |
190 | if (priv->connected == -1) { |
191 | /* |
192 | * Fields in the SSL structure (especially |
193 | * the BIO pointers) are not valid at this |
194 | * point, so we'll segfault if we pass them |
195 | * to SSL_read/SSL_write. |
196 | */ |
197 | gf_log(this->name,GF_LOG_INFO,do { do { if (0) printf ("lost connection in %s", __func__); } while (0); _gf_log (this->name, "socket.c", __FUNCTION__, 198, GF_LOG_INFO, "lost connection in %s", __func__); } while (0) |
198 | "lost connection in %s", __func__)do { do { if (0) printf ("lost connection in %s", __func__); } while (0); _gf_log (this->name, "socket.c", __FUNCTION__, 198, GF_LOG_INFO, "lost connection in %s", __func__); } while (0); |
199 | break; |
200 | } |
201 | r = func(priv->ssl_ssl,buf,len); |
202 | } |
203 | else { |
204 | /* |
205 | * We actually need these functions to get to |
206 | * priv->connected == 1. |
207 | */ |
208 | r = ((SSL_unary_func *)func)(priv->ssl_ssl); |
209 | } |
210 | switch (SSL_get_error(priv->ssl_ssl,r)) { |
211 | case SSL_ERROR_NONE0: |
212 | return r; |
213 | case SSL_ERROR_WANT_READ2: |
214 | pfd.fd = priv->sock; |
215 | pfd.events = POLLIN0x001; |
216 | if (poll(&pfd,1,-1) < 0) { |
217 | gf_log(this->name,GF_LOG_ERROR,"poll error %d",do { do { if (0) printf ("poll error %d", (*__errno_location ( ))); } while (0); _gf_log (this->name, "socket.c", __FUNCTION__ , 218, GF_LOG_ERROR,"poll error %d", (*__errno_location ())); } while (0) |
218 | errno)do { do { if (0) printf ("poll error %d", (*__errno_location ( ))); } while (0); _gf_log (this->name, "socket.c", __FUNCTION__ , 218, GF_LOG_ERROR,"poll error %d", (*__errno_location ())); } while (0); |
219 | } |
220 | break; |
221 | case SSL_ERROR_WANT_WRITE3: |
222 | pfd.fd = priv->sock; |
223 | pfd.events = POLLOUT0x004; |
224 | if (poll(&pfd,1,-1) < 0) { |
225 | gf_log(this->name,GF_LOG_ERROR,"poll error %d",do { do { if (0) printf ("poll error %d", (*__errno_location ( ))); } while (0); _gf_log (this->name, "socket.c", __FUNCTION__ , 226, GF_LOG_ERROR,"poll error %d", (*__errno_location ())); } while (0) |
226 | errno)do { do { if (0) printf ("poll error %d", (*__errno_location ( ))); } while (0); _gf_log (this->name, "socket.c", __FUNCTION__ , 226, GF_LOG_ERROR,"poll error %d", (*__errno_location ())); } while (0); |
227 | } |
228 | break; |
229 | case SSL_ERROR_SYSCALL5: |
230 | /* This is what we get when remote disconnects. */ |
231 | gf_log(this->name,GF_LOG_DEBUG,do { do { if (0) printf ("syscall error (probably remote disconnect)" ); } while (0); _gf_log (this->name, "socket.c", __FUNCTION__ , 232, GF_LOG_DEBUG, "syscall error (probably remote disconnect)" ); } while (0) |
232 | "syscall error (probably remote disconnect)")do { do { if (0) printf ("syscall error (probably remote disconnect)" ); } while (0); _gf_log (this->name, "socket.c", __FUNCTION__ , 232, GF_LOG_DEBUG, "syscall error (probably remote disconnect)" ); } while (0); |
233 | errno(*__errno_location ()) = ENODATA61; |
234 | goto out; |
235 | default: |
236 | errno(*__errno_location ()) = EIO5; |
237 | goto out; /* "break" would just loop again */ |
238 | } |
239 | } |
240 | out: |
241 | return -1; |
242 | } |
243 | |
244 | #define ssl_connect_one(t)ssl_do((t),((void*)0),0,(SSL_trinary_func *)SSL_connect) ssl_do((t),NULL((void*)0),0,(SSL_trinary_func *)SSL_connect) |
245 | #define ssl_accept_one(t)ssl_do((t),((void*)0),0,(SSL_trinary_func *)SSL_accept) ssl_do((t),NULL((void*)0),0,(SSL_trinary_func *)SSL_accept) |
246 | #define ssl_read_one(t,b,l)ssl_do((t),(b),(l),(SSL_trinary_func *)SSL_read) ssl_do((t),(b),(l),(SSL_trinary_func *)SSL_read) |
247 | #define ssl_write_one(t,b,l)ssl_do((t),(b),(l),(SSL_trinary_func *)SSL_write) ssl_do((t),(b),(l),(SSL_trinary_func *)SSL_write) |
248 | |
249 | int |
250 | ssl_setup_connection (rpc_transport_t *this, int server) |
251 | { |
252 | X509 *peer = NULL((void*)0); |
253 | char peer_CN[256] = ""; |
254 | int ret = -1; |
255 | socket_private_t *priv = NULL((void*)0); |
256 | |
257 | GF_VALIDATE_OR_GOTO(this->name,this->private,done)do { if (!this->private) { (*__errno_location ()) = 22; do { do { if (0) printf ("invalid argument: " "this->private" ); } while (0); _gf_log_callingfn (this->name, "socket.c", __FUNCTION__, 257, GF_LOG_ERROR, "invalid argument: " "this->private" ); } while (0); goto done; } } while (0); |
258 | priv = this->private; |
259 | |
260 | priv->ssl_ssl = SSL_new(priv->ssl_ctx); |
261 | if (!priv->ssl_ssl) { |
262 | gf_log(this->name,GF_LOG_ERROR,"SSL_new failed")do { do { if (0) printf ("SSL_new failed"); } while (0); _gf_log (this->name, "socket.c", __FUNCTION__, 262, GF_LOG_ERROR, "SSL_new failed"); } while (0); |
263 | ssl_dump_error_stack(this->name); |
264 | goto done; |
265 | } |
266 | priv->ssl_sbio = BIO_new_socket(priv->sock,BIO_NOCLOSE0x00); |
267 | if (!priv->ssl_sbio) { |
268 | gf_log(this->name,GF_LOG_ERROR,"BIO_new_socket failed")do { do { if (0) printf ("BIO_new_socket failed"); } while (0 ); _gf_log (this->name, "socket.c", __FUNCTION__, 268, GF_LOG_ERROR ,"BIO_new_socket failed"); } while (0); |
269 | ssl_dump_error_stack(this->name); |
270 | goto free_ssl; |
271 | } |
272 | SSL_set_bio(priv->ssl_ssl,priv->ssl_sbio,priv->ssl_sbio); |
273 | |
274 | if (server) { |
275 | ret = ssl_accept_one(this)ssl_do((this),((void*)0),0,(SSL_trinary_func *)SSL_accept); |
276 | } |
277 | else { |
278 | ret = ssl_connect_one(this)ssl_do((this),((void*)0),0,(SSL_trinary_func *)SSL_connect); |
279 | } |
280 | |
281 | /* Make sure _the call_ succeeded. */ |
282 | if (ret < 0) { |
283 | goto ssl_error; |
284 | } |
285 | |
286 | /* Make sure _SSL verification_ succeeded, yielding an identity. */ |
287 | if (SSL_get_verify_result(priv->ssl_ssl) != X509_V_OK0) { |
288 | goto ssl_error; |
289 | } |
290 | peer = SSL_get_peer_certificate(priv->ssl_ssl); |
291 | if (!peer) { |
292 | goto ssl_error; |
293 | } |
294 | |
295 | /* Finally, everything seems OK. */ |
296 | X509_NAME_get_text_by_NID(X509_get_subject_name(peer), |
297 | NID_commonName13, peer_CN, sizeof(peer_CN)-1); |
298 | peer_CN[sizeof(peer_CN)-1] = '\0'; |
299 | gf_log(this->name,GF_LOG_INFO,"peer CN = %s", peer_CN)do { do { if (0) printf ("peer CN = %s", peer_CN); } while (0 ); _gf_log (this->name, "socket.c", __FUNCTION__, 299, GF_LOG_INFO ,"peer CN = %s", peer_CN); } while (0); |
300 | return 0; |
301 | |
302 | /* Error paths. */ |
303 | ssl_error: |
304 | gf_log(this->name,GF_LOG_ERROR,"SSL connect error")do { do { if (0) printf ("SSL connect error"); } while (0); _gf_log (this->name, "socket.c", __FUNCTION__, 304, GF_LOG_ERROR, "SSL connect error"); } while (0); |
305 | ssl_dump_error_stack(this->name); |
306 | free_ssl: |
307 | SSL_free(priv->ssl_ssl); |
308 | priv->ssl_ssl = NULL((void*)0); |
309 | done: |
310 | return ret; |
311 | } |
312 | |
313 | |
314 | void |
315 | ssl_teardown_connection (socket_private_t *priv) |
316 | { |
317 | SSL_shutdown(priv->ssl_ssl); |
318 | SSL_clear(priv->ssl_ssl); |
319 | SSL_free(priv->ssl_ssl); |
320 | priv->ssl_ssl = NULL((void*)0); |
321 | } |
322 | |
323 | |
324 | ssize_t |
325 | __socket_ssl_readv (rpc_transport_t *this, struct iovec *opvector, int opcount) |
326 | { |
327 | socket_private_t *priv = NULL((void*)0); |
328 | int sock = -1; |
329 | int ret = -1; |
330 | |
331 | priv = this->private; |
332 | sock = priv->sock; |
333 | |
334 | if (priv->use_ssl) { |
335 | ret = ssl_read_one (this, opvector->iov_base, opvector->iov_len)ssl_do((this),(opvector->iov_base),(opvector->iov_len), (SSL_trinary_func *)SSL_read); |
336 | } else { |
337 | ret = readv (sock, opvector, opcount); |
338 | } |
339 | |
340 | return ret; |
341 | } |
342 | |
343 | |
344 | ssize_t |
345 | __socket_ssl_read (rpc_transport_t *this, void *buf, size_t count) |
346 | { |
347 | struct iovec iov = {0, }; |
348 | int ret = -1; |
349 | |
350 | iov.iov_base = buf; |
351 | iov.iov_len = count; |
352 | |
353 | ret = __socket_ssl_readv (this, &iov, 1); |
354 | |
355 | return ret; |
356 | } |
357 | |
358 | |
359 | int |
360 | __socket_cached_read (rpc_transport_t *this, struct iovec *opvector, int opcount) |
361 | { |
362 | socket_private_t *priv = NULL((void*)0); |
363 | int sock = -1; |
364 | struct gf_sock_incoming *in = NULL((void*)0); |
365 | int req_len = -1; |
366 | int ret = -1; |
367 | |
368 | priv = this->private; |
369 | sock = priv->sock; |
370 | in = &priv->incoming; |
371 | req_len = iov_length (opvector, opcount); |
372 | |
373 | if (in->record_state == SP_STATE_READING_FRAGHDR) { |
374 | in->ra_read = 0; |
375 | in->ra_served = 0; |
376 | in->ra_max = 0; |
377 | in->ra_buf = NULL((void*)0); |
378 | goto uncached; |
379 | } |
380 | |
381 | if (!in->ra_max) { |
382 | /* first call after passing SP_STATE_READING_FRAGHDR */ |
383 | in->ra_max = min (RPC_FRAGSIZE (in->fraghdr), GF_SOCKET_RA_MAX)((((uint32_t)(in->fraghdr & 0x7fffffffU)))<(1024)?( ((uint32_t)(in->fraghdr & 0x7fffffffU))):(1024)); |
384 | /* Note that the in->iobuf is the primary iobuf into which |
385 | headers are read into. By using this itself as our |
386 | read-ahead cache, we can avoid memory copies in iov_load |
387 | */ |
388 | in->ra_buf = iobuf_ptr (in->iobuf)((in->iobuf)->ptr); |
389 | } |
390 | |
391 | /* fill read-ahead */ |
392 | if (in->ra_read < in->ra_max) { |
393 | ret = __socket_ssl_read (this, &in->ra_buf[in->ra_read], |
394 | (in->ra_max - in->ra_read)); |
395 | if (ret > 0) |
396 | in->ra_read += ret; |
397 | |
398 | /* we proceed to test if there is still cached data to |
399 | be served even if readahead could not progress */ |
400 | } |
401 | |
402 | /* serve cached */ |
403 | if (in->ra_served < in->ra_read) { |
404 | ret = iov_load (opvector, opcount, &in->ra_buf[in->ra_served], |
405 | min (req_len, (in->ra_read - in->ra_served))((req_len)<((in->ra_read - in->ra_served))?(req_len) :((in->ra_read - in->ra_served)))); |
406 | |
407 | in->ra_served += ret; |
408 | /* Do not read uncached and cached in the same call */ |
409 | goto out; |
410 | } |
411 | |
412 | if (in->ra_read < in->ra_max) |
413 | /* If there was no cached data to be served, (and we are |
414 | guaranteed to have already performed an attempt to progress |
415 | readahead above), and we have not yet read out the full |
416 | readahead capacity, then bail out for now without doing |
417 | the uncached read below (as that will overtake future cached |
418 | read) |
419 | */ |
420 | goto out; |
421 | uncached: |
422 | ret = __socket_ssl_readv (this, opvector, opcount); |
423 | out: |
424 | return ret; |
425 | } |
426 | |
427 | |
428 | /* |
429 | * return value: |
430 | * 0 = success (completed) |
431 | * -1 = error |
432 | * > 0 = incomplete |
433 | */ |
434 | |
435 | int |
436 | __socket_rwv (rpc_transport_t *this, struct iovec *vector, int count, |
437 | struct iovec **pending_vector, int *pending_count, size_t *bytes, |
438 | int write) |
439 | { |
440 | socket_private_t *priv = NULL((void*)0); |
441 | int sock = -1; |
442 | int ret = -1; |
443 | struct iovec *opvector = NULL((void*)0); |
444 | int opcount = 0; |
445 | int moved = 0; |
446 | |
447 | GF_VALIDATE_OR_GOTO ("socket", this, out)do { if (!this) { (*__errno_location ()) = 22; do { do { if ( 0) printf ("invalid argument: " "this"); } while (0); _gf_log_callingfn ("socket", "socket.c", __FUNCTION__, 447, GF_LOG_ERROR, "invalid argument: " "this"); } while (0); goto out; } } while (0); |
448 | GF_VALIDATE_OR_GOTO ("socket", this->private, out)do { if (!this->private) { (*__errno_location ()) = 22; do { do { if (0) printf ("invalid argument: " "this->private" ); } while (0); _gf_log_callingfn ("socket", "socket.c", __FUNCTION__ , 448, GF_LOG_ERROR, "invalid argument: " "this->private") ; } while (0); goto out; } } while (0); |
449 | |
450 | priv = this->private; |
451 | sock = priv->sock; |
452 | |
453 | opvector = vector; |
454 | opcount = count; |
455 | |
456 | if (bytes != NULL((void*)0)) { |
457 | *bytes = 0; |
458 | } |
459 | |
460 | while (opcount > 0) { |
461 | if (opvector->iov_len == 0) { |
462 | gf_log(this->name,GF_LOG_DEBUG,do { do { if (0) printf ("would have passed zero length to read/write" ); } while (0); _gf_log (this->name, "socket.c", __FUNCTION__ , 463, GF_LOG_DEBUG, "would have passed zero length to read/write" ); } while (0) |
463 | "would have passed zero length to read/write")do { do { if (0) printf ("would have passed zero length to read/write" ); } while (0); _gf_log (this->name, "socket.c", __FUNCTION__ , 463, GF_LOG_DEBUG, "would have passed zero length to read/write" ); } while (0); |
464 | ++opvector; |
465 | --opcount; |
466 | continue; |
467 | } |
468 | if (write) { |
469 | if (priv->use_ssl) { |
470 | ret = ssl_write_one(this,ssl_do((this),(opvector->iov_base),(opvector->iov_len), (SSL_trinary_func *)SSL_write) |
471 | opvector->iov_base, opvector->iov_len)ssl_do((this),(opvector->iov_base),(opvector->iov_len), (SSL_trinary_func *)SSL_write); |
472 | } |
473 | else { |
474 | ret = writev (sock, opvector, opcount); |
475 | } |
476 | |
477 | if (ret == 0 || (ret == -1 && errno(*__errno_location ()) == EAGAIN11)) { |
478 | /* done for now */ |
479 | break; |
480 | } |
481 | this->total_bytes_write += ret; |
482 | } else { |
483 | ret = __socket_cached_read (this, opvector, opcount); |
484 | |
485 | if (ret == 0) { |
486 | gf_log(this->name,GF_LOG_DEBUG,"EOF on socket")do { do { if (0) printf ("EOF on socket"); } while (0); _gf_log (this->name, "socket.c", __FUNCTION__, 486, GF_LOG_DEBUG, "EOF on socket"); } while (0); |
487 | errno(*__errno_location ()) = ENODATA61; |
488 | ret = -1; |
489 | } |
490 | if (ret == -1 && errno(*__errno_location ()) == EAGAIN11) { |
491 | /* done for now */ |
492 | break; |
493 | } |
494 | this->total_bytes_read += ret; |
495 | } |
496 | |
497 | if (ret == 0) { |
498 | /* Mostly due to 'umount' in client */ |
499 | |
500 | gf_log (this->name, GF_LOG_DEBUG,do { do { if (0) printf ("EOF from peer %s", this->peerinfo .identifier); } while (0); _gf_log (this->name, "socket.c" , __FUNCTION__, 501, GF_LOG_DEBUG, "EOF from peer %s", this-> peerinfo.identifier); } while (0) |
501 | "EOF from peer %s", this->peerinfo.identifier)do { do { if (0) printf ("EOF from peer %s", this->peerinfo .identifier); } while (0); _gf_log (this->name, "socket.c" , __FUNCTION__, 501, GF_LOG_DEBUG, "EOF from peer %s", this-> peerinfo.identifier); } while (0); |
502 | opcount = -1; |
503 | errno(*__errno_location ()) = ENOTCONN107; |
504 | break; |
505 | } |
506 | if (ret == -1) { |
507 | if (errno(*__errno_location ()) == EINTR4) |
508 | continue; |
509 | |
510 | if (write || (!write && priv->read_fail_log)) |
511 | gf_log (this->name, GF_LOG_WARNING,do { do { if (0) printf ("%s on %s failed (%s)", write ? "writev" :"readv", this->peerinfo.identifier, strerror ((*__errno_location ()))); } while (0); _gf_log (this->name, "socket.c", __FUNCTION__ , 515, GF_LOG_WARNING, "%s on %s failed (%s)", write ? "writev" :"readv", this->peerinfo.identifier, strerror ((*__errno_location ()))); } while (0) |
512 | "%s on %s failed (%s)",do { do { if (0) printf ("%s on %s failed (%s)", write ? "writev" :"readv", this->peerinfo.identifier, strerror ((*__errno_location ()))); } while (0); _gf_log (this->name, "socket.c", __FUNCTION__ , 515, GF_LOG_WARNING, "%s on %s failed (%s)", write ? "writev" :"readv", this->peerinfo.identifier, strerror ((*__errno_location ()))); } while (0) |
513 | write ? "writev":"readv",do { do { if (0) printf ("%s on %s failed (%s)", write ? "writev" :"readv", this->peerinfo.identifier, strerror ((*__errno_location ()))); } while (0); _gf_log (this->name, "socket.c", __FUNCTION__ , 515, GF_LOG_WARNING, "%s on %s failed (%s)", write ? "writev" :"readv", this->peerinfo.identifier, strerror ((*__errno_location ()))); } while (0) |
514 | this->peerinfo.identifier,do { do { if (0) printf ("%s on %s failed (%s)", write ? "writev" :"readv", this->peerinfo.identifier, strerror ((*__errno_location ()))); } while (0); _gf_log (this->name, "socket.c", __FUNCTION__ , 515, GF_LOG_WARNING, "%s on %s failed (%s)", write ? "writev" :"readv", this->peerinfo.identifier, strerror ((*__errno_location ()))); } while (0) |
515 | strerror (errno))do { do { if (0) printf ("%s on %s failed (%s)", write ? "writev" :"readv", this->peerinfo.identifier, strerror ((*__errno_location ()))); } while (0); _gf_log (this->name, "socket.c", __FUNCTION__ , 515, GF_LOG_WARNING, "%s on %s failed (%s)", write ? "writev" :"readv", this->peerinfo.identifier, strerror ((*__errno_location ()))); } while (0); |
516 | if (priv->use_ssl) { |
517 | ssl_dump_error_stack(this->name); |
518 | } |
519 | opcount = -1; |
520 | break; |
521 | } |
522 | |
523 | if (bytes != NULL((void*)0)) { |
524 | *bytes += ret; |
525 | } |
526 | |
527 | moved = 0; |
528 | |
529 | while (moved < ret) { |
530 | if (!opcount) { |
531 | gf_log(this->name,GF_LOG_DEBUG,do { do { if (0) printf ("ran out of iov, moved %d/%d", moved , ret); } while (0); _gf_log (this->name, "socket.c", __FUNCTION__ , 533, GF_LOG_DEBUG, "ran out of iov, moved %d/%d", moved, ret ); } while (0) |
532 | "ran out of iov, moved %d/%d",do { do { if (0) printf ("ran out of iov, moved %d/%d", moved , ret); } while (0); _gf_log (this->name, "socket.c", __FUNCTION__ , 533, GF_LOG_DEBUG, "ran out of iov, moved %d/%d", moved, ret ); } while (0) |
533 | moved, ret)do { do { if (0) printf ("ran out of iov, moved %d/%d", moved , ret); } while (0); _gf_log (this->name, "socket.c", __FUNCTION__ , 533, GF_LOG_DEBUG, "ran out of iov, moved %d/%d", moved, ret ); } while (0); |
534 | goto ran_out; |
535 | } |
536 | if (!opvector[0].iov_len) { |
537 | opvector++; |
538 | opcount--; |
539 | continue; |
540 | } |
541 | if ((ret - moved) >= opvector[0].iov_len) { |
542 | moved += opvector[0].iov_len; |
543 | opvector++; |
544 | opcount--; |
545 | } else { |
546 | opvector[0].iov_len -= (ret - moved); |
547 | opvector[0].iov_base += (ret - moved); |
548 | moved += (ret - moved); |
549 | } |
550 | } |
551 | } |
552 | |
553 | ran_out: |
554 | |
555 | if (pending_vector) |
556 | *pending_vector = opvector; |
557 | |
558 | if (pending_count) |
559 | *pending_count = opcount; |
560 | |
561 | out: |
562 | return opcount; |
563 | } |
564 | |
565 | |
566 | int |
567 | __socket_readv (rpc_transport_t *this, struct iovec *vector, int count, |
568 | struct iovec **pending_vector, int *pending_count, |
569 | size_t *bytes) |
570 | { |
571 | int ret = -1; |
572 | |
573 | ret = __socket_rwv (this, vector, count, |
574 | pending_vector, pending_count, bytes, 0); |
575 | |
576 | return ret; |
577 | } |
578 | |
579 | |
580 | int |
581 | __socket_writev (rpc_transport_t *this, struct iovec *vector, int count, |
582 | struct iovec **pending_vector, int *pending_count) |
583 | { |
584 | int ret = -1; |
585 | |
586 | ret = __socket_rwv (this, vector, count, |
587 | pending_vector, pending_count, NULL((void*)0), 1); |
588 | |
589 | return ret; |
590 | } |
591 | |
592 | |
593 | int |
594 | __socket_shutdown (rpc_transport_t *this) |
595 | { |
596 | int ret = -1; |
597 | socket_private_t *priv = this->private; |
598 | |
599 | priv->connected = -1; |
600 | ret = shutdown (priv->sock, SHUT_RDWRSHUT_RDWR); |
601 | if (ret) { |
602 | /* its already disconnected.. no need to understand |
603 | why it failed to shutdown in normal cases */ |
604 | gf_log (this->name, GF_LOG_DEBUG,do { do { if (0) printf ("shutdown() returned %d. %s", ret, strerror ((*__errno_location ()))); } while (0); _gf_log (this->name , "socket.c", __FUNCTION__, 606, GF_LOG_DEBUG, "shutdown() returned %d. %s" , ret, strerror ((*__errno_location ()))); } while (0) |
605 | "shutdown() returned %d. %s",do { do { if (0) printf ("shutdown() returned %d. %s", ret, strerror ((*__errno_location ()))); } while (0); _gf_log (this->name , "socket.c", __FUNCTION__, 606, GF_LOG_DEBUG, "shutdown() returned %d. %s" , ret, strerror ((*__errno_location ()))); } while (0) |
606 | ret, strerror (errno))do { do { if (0) printf ("shutdown() returned %d. %s", ret, strerror ((*__errno_location ()))); } while (0); _gf_log (this->name , "socket.c", __FUNCTION__, 606, GF_LOG_DEBUG, "shutdown() returned %d. %s" , ret, strerror ((*__errno_location ()))); } while (0); |
607 | } |
608 | |
609 | return ret; |
610 | } |
611 | |
612 | int |
613 | __socket_disconnect (rpc_transport_t *this) |
614 | { |
615 | int ret = -1; |
616 | socket_private_t *priv = NULL((void*)0); |
617 | |
618 | GF_VALIDATE_OR_GOTO ("socket", this, out)do { if (!this) { (*__errno_location ()) = 22; do { do { if ( 0) printf ("invalid argument: " "this"); } while (0); _gf_log_callingfn ("socket", "socket.c", __FUNCTION__, 618, GF_LOG_ERROR, "invalid argument: " "this"); } while (0); goto out; } } while (0); |
619 | GF_VALIDATE_OR_GOTO ("socket", this->private, out)do { if (!this->private) { (*__errno_location ()) = 22; do { do { if (0) printf ("invalid argument: " "this->private" ); } while (0); _gf_log_callingfn ("socket", "socket.c", __FUNCTION__ , 619, GF_LOG_ERROR, "invalid argument: " "this->private") ; } while (0); goto out; } } while (0); |
620 | |
621 | priv = this->private; |
622 | |
623 | if (priv->sock != -1) { |
624 | ret = __socket_shutdown(this); |
625 | if (priv->own_thread) { |
626 | /* |
627 | * Without this, reconnect (= disconnect + connect) |
628 | * won't work except by accident. |
629 | */ |
630 | close(priv->sock); |
631 | priv->sock = -1; |
632 | /* |
633 | * Closing the socket forces an error that will wake |
634 | * up the polling thread. Wait for it to notice and |
635 | * respond. |
636 | */ |
637 | if (priv->ot_state == OT_ALIVE) { |
638 | priv->ot_state = OT_DYING; |
639 | pthread_cond_wait(&priv->ot_event,&priv->lock); |
640 | } |
641 | } |
642 | else if (priv->use_ssl) { |
643 | ssl_teardown_connection(priv); |
644 | } |
645 | } |
646 | |
647 | out: |
648 | return ret; |
649 | } |
650 | |
651 | |
652 | int |
653 | __socket_server_bind (rpc_transport_t *this) |
654 | { |
655 | socket_private_t *priv = NULL((void*)0); |
656 | int ret = -1; |
657 | int opt = 1; |
658 | int reuse_check_sock = -1; |
659 | struct sockaddr_storage unix_addr = {0}; |
660 | |
661 | GF_VALIDATE_OR_GOTO ("socket", this, out)do { if (!this) { (*__errno_location ()) = 22; do { do { if ( 0) printf ("invalid argument: " "this"); } while (0); _gf_log_callingfn ("socket", "socket.c", __FUNCTION__, 661, GF_LOG_ERROR, "invalid argument: " "this"); } while (0); goto out; } } while (0); |
662 | GF_VALIDATE_OR_GOTO ("socket", this->private, out)do { if (!this->private) { (*__errno_location ()) = 22; do { do { if (0) printf ("invalid argument: " "this->private" ); } while (0); _gf_log_callingfn ("socket", "socket.c", __FUNCTION__ , 662, GF_LOG_ERROR, "invalid argument: " "this->private") ; } while (0); goto out; } } while (0); |
663 | |
664 | priv = this->private; |
665 | |
666 | ret = setsockopt (priv->sock, SOL_SOCKET1, SO_REUSEADDR2, |
667 | &opt, sizeof (opt)); |
668 | |
669 | if (ret == -1) { |
670 | gf_log (this->name, GF_LOG_ERROR,do { do { if (0) printf ("setsockopt() for SO_REUSEADDR failed (%s)" , strerror ((*__errno_location ()))); } while (0); _gf_log (this ->name, "socket.c", __FUNCTION__, 672, GF_LOG_ERROR, "setsockopt() for SO_REUSEADDR failed (%s)" , strerror ((*__errno_location ()))); } while (0) |
671 | "setsockopt() for SO_REUSEADDR failed (%s)",do { do { if (0) printf ("setsockopt() for SO_REUSEADDR failed (%s)" , strerror ((*__errno_location ()))); } while (0); _gf_log (this ->name, "socket.c", __FUNCTION__, 672, GF_LOG_ERROR, "setsockopt() for SO_REUSEADDR failed (%s)" , strerror ((*__errno_location ()))); } while (0) |
672 | strerror (errno))do { do { if (0) printf ("setsockopt() for SO_REUSEADDR failed (%s)" , strerror ((*__errno_location ()))); } while (0); _gf_log (this ->name, "socket.c", __FUNCTION__, 672, GF_LOG_ERROR, "setsockopt() for SO_REUSEADDR failed (%s)" , strerror ((*__errno_location ()))); } while (0); |
673 | } |
674 | |
675 | /* reuse-address doesn't work for unix type sockets */ |
676 | if (AF_UNIX1 == SA (&this->myinfo.sockaddr)((struct sockaddr *)&this->myinfo.sockaddr)->sa_family) { |
677 | memcpy (&unix_addr, SA (&this->myinfo.sockaddr)((struct sockaddr *)&this->myinfo.sockaddr), |
678 | this->myinfo.sockaddr_len); |
679 | reuse_check_sock = socket (AF_UNIX1, SOCK_STREAMSOCK_STREAM, 0); |
680 | if (reuse_check_sock >= 0) { |
681 | ret = connect (reuse_check_sock, SA (&unix_addr)((struct sockaddr *)&unix_addr), |
682 | this->myinfo.sockaddr_len); |
683 | if ((ret == -1) && (ECONNREFUSED111 == errno(*__errno_location ()))) { |
684 | unlink (((struct sockaddr_un*)&unix_addr)->sun_path); |
685 | } |
686 | close (reuse_check_sock); |
687 | } |
688 | } |
689 | |
690 | ret = bind (priv->sock, (struct sockaddr *)&this->myinfo.sockaddr, |
691 | this->myinfo.sockaddr_len); |
692 | |
693 | if (ret == -1) { |
694 | gf_log (this->name, GF_LOG_ERROR,do { do { if (0) printf ("binding to %s failed: %s", this-> myinfo.identifier, strerror ((*__errno_location ()))); } while (0); _gf_log (this->name, "socket.c", __FUNCTION__, 696, GF_LOG_ERROR , "binding to %s failed: %s", this->myinfo.identifier, strerror ((*__errno_location ()))); } while (0) |
695 | "binding to %s failed: %s",do { do { if (0) printf ("binding to %s failed: %s", this-> myinfo.identifier, strerror ((*__errno_location ()))); } while (0); _gf_log (this->name, "socket.c", __FUNCTION__, 696, GF_LOG_ERROR , "binding to %s failed: %s", this->myinfo.identifier, strerror ((*__errno_location ()))); } while (0) |
696 | this->myinfo.identifier, strerror (errno))do { do { if (0) printf ("binding to %s failed: %s", this-> myinfo.identifier, strerror ((*__errno_location ()))); } while (0); _gf_log (this->name, "socket.c", __FUNCTION__, 696, GF_LOG_ERROR , "binding to %s failed: %s", this->myinfo.identifier, strerror ((*__errno_location ()))); } while (0); |
697 | if (errno(*__errno_location ()) == EADDRINUSE98) { |
698 | gf_log (this->name, GF_LOG_ERROR,do { do { if (0) printf ("Port is already in use"); } while ( 0); _gf_log (this->name, "socket.c", __FUNCTION__, 699, GF_LOG_ERROR , "Port is already in use"); } while (0) |
699 | "Port is already in use")do { do { if (0) printf ("Port is already in use"); } while ( 0); _gf_log (this->name, "socket.c", __FUNCTION__, 699, GF_LOG_ERROR , "Port is already in use"); } while (0); |
700 | } |
701 | } |
702 | |
703 | out: |
704 | return ret; |
705 | } |
706 | |
707 | |
708 | int |
709 | __socket_nonblock (int fd) |
710 | { |
711 | int flags = 0; |
712 | int ret = -1; |
713 | |
714 | flags = fcntl (fd, F_GETFL3); |
715 | |
716 | if (flags != -1) |
717 | ret = fcntl (fd, F_SETFL4, flags | O_NONBLOCK04000); |
718 | |
719 | return ret; |
720 | } |
721 | |
722 | int |
723 | __socket_nodelay (int fd) |
724 | { |
725 | int on = 1; |
726 | int ret = -1; |
727 | |
728 | ret = setsockopt (fd, IPPROTO_TCPIPPROTO_TCP, TCP_NODELAY1, |
729 | &on, sizeof (on)); |
730 | if (!ret) |
731 | gf_log (THIS->name, GF_LOG_TRACE,do { do { if (0) printf ("NODELAY enabled for socket %d", fd) ; } while (0); _gf_log ((*__glusterfs_this_location())->name , "socket.c", __FUNCTION__, 732, GF_LOG_TRACE, "NODELAY enabled for socket %d" , fd); } while (0) |
732 | "NODELAY enabled for socket %d", fd)do { do { if (0) printf ("NODELAY enabled for socket %d", fd) ; } while (0); _gf_log ((*__glusterfs_this_location())->name , "socket.c", __FUNCTION__, 732, GF_LOG_TRACE, "NODELAY enabled for socket %d" , fd); } while (0); |
733 | |
734 | return ret; |
735 | } |
736 | |
737 | |
738 | static int |
739 | __socket_keepalive (int fd, int family, int keepalive_intvl, int keepalive_idle) |
740 | { |
741 | int on = 1; |
742 | int ret = -1; |
743 | |
744 | ret = setsockopt (fd, SOL_SOCKET1, SO_KEEPALIVE9, &on, sizeof (on)); |
745 | if (ret == -1) { |
746 | gf_log ("socket", GF_LOG_WARNING,do { do { if (0) printf ("failed to set keep alive option on socket %d" , fd); } while (0); _gf_log ("socket", "socket.c", __FUNCTION__ , 747, GF_LOG_WARNING, "failed to set keep alive option on socket %d" , fd); } while (0) |
747 | "failed to set keep alive option on socket %d", fd)do { do { if (0) printf ("failed to set keep alive option on socket %d" , fd); } while (0); _gf_log ("socket", "socket.c", __FUNCTION__ , 747, GF_LOG_WARNING, "failed to set keep alive option on socket %d" , fd); } while (0); |
748 | goto err; |
749 | } |
750 | |
751 | if (keepalive_intvl == GF_USE_DEFAULT_KEEPALIVE(-1)) |
752 | goto done; |
753 | |
754 | #if !defined(GF_LINUX_HOST_OS1) && !defined(__NetBSD__) |
755 | #ifdef GF_SOLARIS_HOST_OS |
756 | ret = setsockopt (fd, SOL_SOCKET1, SO_KEEPALIVE9, &keepalive_intvl, |
757 | sizeof (keepalive_intvl)); |
758 | #else |
759 | ret = setsockopt (fd, IPPROTO_TCPIPPROTO_TCP, TCP_KEEPALIVE, &keepalive_intvl, |
760 | sizeof (keepalive_intvl)); |
761 | #endif |
762 | if (ret == -1) { |
763 | gf_log ("socket", GF_LOG_WARNING,do { do { if (0) printf ("failed to set keep alive interval on socket %d" , fd); } while (0); _gf_log ("socket", "socket.c", __FUNCTION__ , 764, GF_LOG_WARNING, "failed to set keep alive interval on socket %d" , fd); } while (0) |
764 | "failed to set keep alive interval on socket %d", fd)do { do { if (0) printf ("failed to set keep alive interval on socket %d" , fd); } while (0); _gf_log ("socket", "socket.c", __FUNCTION__ , 764, GF_LOG_WARNING, "failed to set keep alive interval on socket %d" , fd); } while (0); |
765 | goto err; |
766 | } |
767 | #else |
768 | if (family != AF_INET2) |
769 | goto done; |
770 | |
771 | ret = setsockopt (fd, IPPROTO_TCPIPPROTO_TCP, TCP_KEEPIDLE4, &keepalive_idle, |
772 | sizeof (keepalive_intvl)); |
773 | if (ret == -1) { |
774 | gf_log ("socket", GF_LOG_WARNING,do { do { if (0) printf ("failed to set keep idle %d on socket %d, %s" , keepalive_idle, fd, strerror((*__errno_location ()))); } while (0); _gf_log ("socket", "socket.c", __FUNCTION__, 776, GF_LOG_WARNING , "failed to set keep idle %d on socket %d, %s", keepalive_idle , fd, strerror((*__errno_location ()))); } while (0) |
775 | "failed to set keep idle %d on socket %d, %s",do { do { if (0) printf ("failed to set keep idle %d on socket %d, %s" , keepalive_idle, fd, strerror((*__errno_location ()))); } while (0); _gf_log ("socket", "socket.c", __FUNCTION__, 776, GF_LOG_WARNING , "failed to set keep idle %d on socket %d, %s", keepalive_idle , fd, strerror((*__errno_location ()))); } while (0) |
776 | keepalive_idle, fd, strerror(errno))do { do { if (0) printf ("failed to set keep idle %d on socket %d, %s" , keepalive_idle, fd, strerror((*__errno_location ()))); } while (0); _gf_log ("socket", "socket.c", __FUNCTION__, 776, GF_LOG_WARNING , "failed to set keep idle %d on socket %d, %s", keepalive_idle , fd, strerror((*__errno_location ()))); } while (0); |
777 | goto err; |
778 | } |
779 | ret = setsockopt (fd, IPPROTO_TCPIPPROTO_TCP , TCP_KEEPINTVL5, &keepalive_intvl, |
780 | sizeof (keepalive_intvl)); |
781 | if (ret == -1) { |
782 | gf_log ("socket", GF_LOG_WARNING,do { do { if (0) printf ("failed to set keep interval %d on socket %d, %s" , keepalive_intvl, fd, strerror((*__errno_location ()))); } while (0); _gf_log ("socket", "socket.c", __FUNCTION__, 784, GF_LOG_WARNING , "failed to set keep interval %d on socket %d, %s", keepalive_intvl , fd, strerror((*__errno_location ()))); } while (0) |
783 | "failed to set keep interval %d on socket %d, %s",do { do { if (0) printf ("failed to set keep interval %d on socket %d, %s" , keepalive_intvl, fd, strerror((*__errno_location ()))); } while (0); _gf_log ("socket", "socket.c", __FUNCTION__, 784, GF_LOG_WARNING , "failed to set keep interval %d on socket %d, %s", keepalive_intvl , fd, strerror((*__errno_location ()))); } while (0) |
784 | keepalive_intvl, fd, strerror(errno))do { do { if (0) printf ("failed to set keep interval %d on socket %d, %s" , keepalive_intvl, fd, strerror((*__errno_location ()))); } while (0); _gf_log ("socket", "socket.c", __FUNCTION__, 784, GF_LOG_WARNING , "failed to set keep interval %d on socket %d, %s", keepalive_intvl , fd, strerror((*__errno_location ()))); } while (0); |
785 | goto err; |
786 | } |
787 | #endif |
788 | |
789 | done: |
790 | gf_log (THIS->name, GF_LOG_TRACE, "Keep-alive enabled for socket %d, interval "do { do { if (0) printf ("Keep-alive enabled for socket %d, interval " "%d, idle: %d", fd, keepalive_intvl, keepalive_idle); } while (0); _gf_log ((*__glusterfs_this_location())->name, "socket.c" , __FUNCTION__, 791, GF_LOG_TRACE, "Keep-alive enabled for socket %d, interval " "%d, idle: %d", fd, keepalive_intvl, keepalive_idle); } while (0) |
791 | "%d, idle: %d", fd, keepalive_intvl, keepalive_idle)do { do { if (0) printf ("Keep-alive enabled for socket %d, interval " "%d, idle: %d", fd, keepalive_intvl, keepalive_idle); } while (0); _gf_log ((*__glusterfs_this_location())->name, "socket.c" , __FUNCTION__, 791, GF_LOG_TRACE, "Keep-alive enabled for socket %d, interval " "%d, idle: %d", fd, keepalive_intvl, keepalive_idle); } while (0); |
792 | |
793 | err: |
794 | return ret; |
795 | } |
796 | |
797 | |
798 | int |
799 | __socket_connect_finish (int fd) |
800 | { |
801 | int ret = -1; |
802 | int optval = 0; |
803 | socklen_t optlen = sizeof (int); |
804 | |
805 | ret = getsockopt (fd, SOL_SOCKET1, SO_ERROR4, (void *)&optval, &optlen); |
806 | |
807 | if (ret == 0 && optval) { |
808 | errno(*__errno_location ()) = optval; |
809 | ret = -1; |
810 | } |
811 | |
812 | return ret; |
813 | } |
814 | |
815 | |
816 | void |
817 | __socket_reset (rpc_transport_t *this) |
818 | { |
819 | socket_private_t *priv = NULL((void*)0); |
820 | |
821 | GF_VALIDATE_OR_GOTO ("socket", this, out)do { if (!this) { (*__errno_location ()) = 22; do { do { if ( 0) printf ("invalid argument: " "this"); } while (0); _gf_log_callingfn ("socket", "socket.c", __FUNCTION__, 821, GF_LOG_ERROR, "invalid argument: " "this"); } while (0); goto out; } } while (0); |
822 | GF_VALIDATE_OR_GOTO ("socket", this->private, out)do { if (!this->private) { (*__errno_location ()) = 22; do { do { if (0) printf ("invalid argument: " "this->private" ); } while (0); _gf_log_callingfn ("socket", "socket.c", __FUNCTION__ , 822, GF_LOG_ERROR, "invalid argument: " "this->private") ; } while (0); goto out; } } while (0); |
823 | |
824 | priv = this->private; |
825 | |
826 | /* TODO: use mem-pool on incoming data */ |
827 | |
828 | if (priv->incoming.iobref) { |
829 | iobref_unref (priv->incoming.iobref); |
830 | priv->incoming.iobref = NULL((void*)0); |
831 | } |
832 | |
833 | if (priv->incoming.iobuf) { |
834 | iobuf_unref (priv->incoming.iobuf); |
835 | } |
836 | |
837 | GF_FREE (priv->incoming.request_info)__gf_free (priv->incoming.request_info); |
838 | |
839 | memset (&priv->incoming, 0, sizeof (priv->incoming)); |
840 | |
841 | event_unregister (this->ctx->event_pool, priv->sock, priv->idx); |
842 | |
843 | close (priv->sock); |
844 | priv->sock = -1; |
845 | priv->idx = -1; |
846 | priv->connected = -1; |
847 | |
848 | out: |
849 | return; |
850 | } |
851 | |
852 | |
853 | void |
854 | socket_set_lastfrag (uint32_t *fragsize) { |
855 | (*fragsize) |= 0x80000000U; |
856 | } |
857 | |
858 | |
859 | void |
860 | socket_set_frag_header_size (uint32_t size, char *haddr) |
861 | { |
862 | size = htonl (size); |
863 | memcpy (haddr, &size, sizeof (size)); |
864 | } |
865 | |
866 | |
867 | void |
868 | socket_set_last_frag_header_size (uint32_t size, char *haddr) |
869 | { |
870 | socket_set_lastfrag (&size); |
871 | socket_set_frag_header_size (size, haddr); |
872 | } |
873 | |
874 | struct ioq * |
875 | __socket_ioq_new (rpc_transport_t *this, rpc_transport_msg_t *msg) |
876 | { |
877 | struct ioq *entry = NULL((void*)0); |
878 | int count = 0; |
879 | uint32_t size = 0; |
880 | |
881 | GF_VALIDATE_OR_GOTO ("socket", this, out)do { if (!this) { (*__errno_location ()) = 22; do { do { if ( 0) printf ("invalid argument: " "this"); } while (0); _gf_log_callingfn ("socket", "socket.c", __FUNCTION__, 881, GF_LOG_ERROR, "invalid argument: " "this"); } while (0); goto out; } } while (0); |
882 | |
883 | /* TODO: use mem-pool */ |
884 | entry = GF_CALLOC (1, sizeof (*entry), gf_common_mt_ioq)__gf_calloc (1, sizeof (*entry), gf_common_mt_ioq); |
885 | if (!entry) |
886 | return NULL((void*)0); |
887 | |
888 | count = msg->rpchdrcount + msg->proghdrcount + msg->progpayloadcount; |
889 | |
890 | GF_ASSERT (count <= (MAX_IOVEC - 1))do { if (!(count <= (16 - 1))) { do { do { if (0) printf ( "Assertion failed: " "count <= (MAX_IOVEC - 1)"); } while ( 0); _gf_log_callingfn ("", "socket.c", __FUNCTION__, 890, GF_LOG_ERROR , "Assertion failed: " "count <= (MAX_IOVEC - 1)"); } while (0); } } while (0); |
891 | |
892 | size = iov_length (msg->rpchdr, msg->rpchdrcount) |
893 | + iov_length (msg->proghdr, msg->proghdrcount) |
894 | + iov_length (msg->progpayload, msg->progpayloadcount); |
895 | |
896 | if (size > RPC_MAX_FRAGMENT_SIZE0x7fffffff) { |
897 | gf_log (this->name, GF_LOG_ERROR,do { do { if (0) printf ("msg size (%u) bigger than the maximum allowed size on " "sockets (%u)", size, 0x7fffffff); } while (0); _gf_log (this ->name, "socket.c", __FUNCTION__, 899, GF_LOG_ERROR, "msg size (%u) bigger than the maximum allowed size on " "sockets (%u)", size, 0x7fffffff); } while (0) |
898 | "msg size (%u) bigger than the maximum allowed size on "do { do { if (0) printf ("msg size (%u) bigger than the maximum allowed size on " "sockets (%u)", size, 0x7fffffff); } while (0); _gf_log (this ->name, "socket.c", __FUNCTION__, 899, GF_LOG_ERROR, "msg size (%u) bigger than the maximum allowed size on " "sockets (%u)", size, 0x7fffffff); } while (0) |
899 | "sockets (%u)", size, RPC_MAX_FRAGMENT_SIZE)do { do { if (0) printf ("msg size (%u) bigger than the maximum allowed size on " "sockets (%u)", size, 0x7fffffff); } while (0); _gf_log (this ->name, "socket.c", __FUNCTION__, 899, GF_LOG_ERROR, "msg size (%u) bigger than the maximum allowed size on " "sockets (%u)", size, 0x7fffffff); } while (0); |
900 | GF_FREE (entry)__gf_free (entry); |
901 | return NULL((void*)0); |
902 | } |
903 | |
904 | socket_set_last_frag_header_size (size, (char *)&entry->fraghdr); |
905 | |
906 | entry->vector[0].iov_base = (char *)&entry->fraghdr; |
907 | entry->vector[0].iov_len = sizeof (entry->fraghdr); |
908 | entry->count = 1; |
909 | |
910 | if (msg->rpchdr != NULL((void*)0)) { |
911 | memcpy (&entry->vector[1], msg->rpchdr, |
912 | sizeof (struct iovec) * msg->rpchdrcount); |
913 | entry->count += msg->rpchdrcount; |
914 | } |
915 | |
916 | if (msg->proghdr != NULL((void*)0)) { |
917 | memcpy (&entry->vector[entry->count], msg->proghdr, |
918 | sizeof (struct iovec) * msg->proghdrcount); |
919 | entry->count += msg->proghdrcount; |
920 | } |
921 | |
922 | if (msg->progpayload != NULL((void*)0)) { |
923 | memcpy (&entry->vector[entry->count], msg->progpayload, |
924 | sizeof (struct iovec) * msg->progpayloadcount); |
925 | entry->count += msg->progpayloadcount; |
926 | } |
927 | |
928 | entry->pending_vector = entry->vector; |
929 | entry->pending_count = entry->count; |
930 | |
931 | if (msg->iobref != NULL((void*)0)) |
932 | entry->iobref = iobref_ref (msg->iobref); |
933 | |
934 | INIT_LIST_HEAD (&entry->list)do { (&entry->list)->next = (&entry->list)-> prev = &entry->list; } while (0); |
935 | |
936 | out: |
937 | return entry; |
938 | } |
939 | |
940 | |
941 | void |
942 | __socket_ioq_entry_free (struct ioq *entry) |
943 | { |
944 | GF_VALIDATE_OR_GOTO ("socket", entry, out)do { if (!entry) { (*__errno_location ()) = 22; do { do { if ( 0) printf ("invalid argument: " "entry"); } while (0); _gf_log_callingfn ("socket", "socket.c", __FUNCTION__, 944, GF_LOG_ERROR, "invalid argument: " "entry"); } while (0); goto out; } } while (0); |
945 | |
946 | list_del_init (&entry->list); |
947 | if (entry->iobref) |
948 | iobref_unref (entry->iobref); |
949 | |
950 | /* TODO: use mem-pool */ |
951 | GF_FREE (entry)__gf_free (entry); |
952 | |
953 | out: |
954 | return; |
955 | } |
956 | |
957 | |
958 | void |
959 | __socket_ioq_flush (rpc_transport_t *this) |
960 | { |
961 | socket_private_t *priv = NULL((void*)0); |
962 | struct ioq *entry = NULL((void*)0); |
963 | |
964 | GF_VALIDATE_OR_GOTO ("socket", this, out)do { if (!this) { (*__errno_location ()) = 22; do { do { if ( 0) printf ("invalid argument: " "this"); } while (0); _gf_log_callingfn ("socket", "socket.c", __FUNCTION__, 964, GF_LOG_ERROR, "invalid argument: " "this"); } while (0); goto out; } } while (0); |
965 | GF_VALIDATE_OR_GOTO ("socket", this->private, out)do { if (!this->private) { (*__errno_location ()) = 22; do { do { if (0) printf ("invalid argument: " "this->private" ); } while (0); _gf_log_callingfn ("socket", "socket.c", __FUNCTION__ , 965, GF_LOG_ERROR, "invalid argument: " "this->private") ; } while (0); goto out; } } while (0); |
966 | |
967 | priv = this->private; |
968 | |
969 | while (!list_empty (&priv->ioq)) { |
970 | entry = priv->ioq_next; |
971 | __socket_ioq_entry_free (entry); |
972 | } |
973 | |
974 | out: |
975 | return; |
976 | } |
977 | |
978 | |
979 | int |
980 | __socket_ioq_churn_entry (rpc_transport_t *this, struct ioq *entry, int direct) |
981 | { |
982 | int ret = -1; |
983 | socket_private_t *priv = NULL((void*)0); |
984 | char a_byte = 0; |
985 | |
986 | ret = __socket_writev (this, entry->pending_vector, |
987 | entry->pending_count, |
988 | &entry->pending_vector, |
989 | &entry->pending_count); |
990 | |
991 | if (ret == 0) { |
992 | /* current entry was completely written */ |
993 | GF_ASSERT (entry->pending_count == 0)do { if (!(entry->pending_count == 0)) { do { do { if (0) printf ("Assertion failed: " "entry->pending_count == 0"); } while (0); _gf_log_callingfn ("", "socket.c", __FUNCTION__, 993, GF_LOG_ERROR , "Assertion failed: " "entry->pending_count == 0"); } while (0); } } while (0); |
994 | __socket_ioq_entry_free (entry); |
995 | priv = this->private; |
996 | if (priv->own_thread) { |
997 | /* |
998 | * The pipe should only remain readable if there are |
999 | * more entries after this, so drain the byte |
1000 | * representing this entry. |
1001 | */ |
1002 | if (!direct && read(priv->pipe[0],&a_byte,1) < 1) { |
1003 | gf_log(this->name,GF_LOG_WARNING,do { do { if (0) printf ("read error on pipe"); } while (0); _gf_log (this->name, "socket.c", __FUNCTION__, 1004, GF_LOG_WARNING , "read error on pipe"); } while (0) |
1004 | "read error on pipe")do { do { if (0) printf ("read error on pipe"); } while (0); _gf_log (this->name, "socket.c", __FUNCTION__, 1004, GF_LOG_WARNING , "read error on pipe"); } while (0); |
1005 | } |
1006 | } |
1007 | } |
1008 | |
1009 | return ret; |
1010 | } |
1011 | |
1012 | |
1013 | int |
1014 | __socket_ioq_churn (rpc_transport_t *this) |
1015 | { |
1016 | socket_private_t *priv = NULL((void*)0); |
1017 | int ret = 0; |
1018 | struct ioq *entry = NULL((void*)0); |
1019 | |
1020 | GF_VALIDATE_OR_GOTO ("socket", this, out)do { if (!this) { (*__errno_location ()) = 22; do { do { if ( 0) printf ("invalid argument: " "this"); } while (0); _gf_log_callingfn ("socket", "socket.c", __FUNCTION__, 1020, GF_LOG_ERROR, "invalid argument: " "this"); } while (0); goto out; } } while (0); |
1021 | GF_VALIDATE_OR_GOTO ("socket", this->private, out)do { if (!this->private) { (*__errno_location ()) = 22; do { do { if (0) printf ("invalid argument: " "this->private" ); } while (0); _gf_log_callingfn ("socket", "socket.c", __FUNCTION__ , 1021, GF_LOG_ERROR, "invalid argument: " "this->private" ); } while (0); goto out; } } while (0); |
1022 | |
1023 | priv = this->private; |
1024 | |
1025 | while (!list_empty (&priv->ioq)) { |
1026 | /* pick next entry */ |
1027 | entry = priv->ioq_next; |
1028 | |
1029 | ret = __socket_ioq_churn_entry (this, entry, 0); |
1030 | |
1031 | if (ret != 0) |
1032 | break; |
1033 | } |
1034 | |
1035 | if (!priv->own_thread && list_empty (&priv->ioq)) { |
1036 | /* all pending writes done, not interested in POLLOUT */ |
1037 | priv->idx = event_select_on (this->ctx->event_pool, |
1038 | priv->sock, priv->idx, -1, 0); |
1039 | } |
1040 | |
1041 | out: |
1042 | return ret; |
1043 | } |
1044 | |
1045 | |
1046 | int |
1047 | socket_event_poll_err (rpc_transport_t *this) |
1048 | { |
1049 | socket_private_t *priv = NULL((void*)0); |
1050 | int ret = -1; |
1051 | |
1052 | GF_VALIDATE_OR_GOTO ("socket", this, out)do { if (!this) { (*__errno_location ()) = 22; do { do { if ( 0) printf ("invalid argument: " "this"); } while (0); _gf_log_callingfn ("socket", "socket.c", __FUNCTION__, 1052, GF_LOG_ERROR, "invalid argument: " "this"); } while (0); goto out; } } while (0); |
1053 | GF_VALIDATE_OR_GOTO ("socket", this->private, out)do { if (!this->private) { (*__errno_location ()) = 22; do { do { if (0) printf ("invalid argument: " "this->private" ); } while (0); _gf_log_callingfn ("socket", "socket.c", __FUNCTION__ , 1053, GF_LOG_ERROR, "invalid argument: " "this->private" ); } while (0); goto out; } } while (0); |
1054 | |
1055 | priv = this->private; |
1056 | |
1057 | pthread_mutex_lock (&priv->lock); |
1058 | { |
1059 | __socket_ioq_flush (this); |
1060 | __socket_reset (this); |
1061 | } |
1062 | pthread_mutex_unlock (&priv->lock); |
1063 | |
1064 | rpc_transport_notify (this, RPC_TRANSPORT_DISCONNECT, this); |
1065 | |
1066 | out: |
1067 | return ret; |
1068 | } |
1069 | |
1070 | |
1071 | int |
1072 | socket_event_poll_out (rpc_transport_t *this) |
1073 | { |
1074 | socket_private_t *priv = NULL((void*)0); |
1075 | int ret = -1; |
1076 | |
1077 | GF_VALIDATE_OR_GOTO ("socket", this, out)do { if (!this) { (*__errno_location ()) = 22; do { do { if ( 0) printf ("invalid argument: " "this"); } while (0); _gf_log_callingfn ("socket", "socket.c", __FUNCTION__, 1077, GF_LOG_ERROR, "invalid argument: " "this"); } while (0); goto out; } } while (0); |
1078 | GF_VALIDATE_OR_GOTO ("socket", this->private, out)do { if (!this->private) { (*__errno_location ()) = 22; do { do { if (0) printf ("invalid argument: " "this->private" ); } while (0); _gf_log_callingfn ("socket", "socket.c", __FUNCTION__ , 1078, GF_LOG_ERROR, "invalid argument: " "this->private" ); } while (0); goto out; } } while (0); |
1079 | |
1080 | priv = this->private; |
1081 | |
1082 | pthread_mutex_lock (&priv->lock); |
1083 | { |
1084 | if (priv->connected == 1) { |
1085 | ret = __socket_ioq_churn (this); |
1086 | |
1087 | if (ret == -1) { |
1088 | __socket_disconnect (this); |
1089 | } |
1090 | } |
1091 | } |
1092 | pthread_mutex_unlock (&priv->lock); |
1093 | |
1094 | ret = rpc_transport_notify (this, RPC_TRANSPORT_MSG_SENT, NULL((void*)0)); |
1095 | |
1096 | out: |
1097 | return ret; |
1098 | } |
1099 | |
1100 | |
1101 | static inline int |
1102 | __socket_read_simple_msg (rpc_transport_t *this) |
1103 | { |
1104 | int ret = 0; |
1105 | uint32_t remaining_size = 0; |
1106 | size_t bytes_read = 0; |
1107 | socket_private_t *priv = NULL((void*)0); |
1108 | struct gf_sock_incoming *in = NULL((void*)0); |
1109 | struct gf_sock_incoming_frag *frag = NULL((void*)0); |
1110 | |
1111 | GF_VALIDATE_OR_GOTO ("socket", this, out)do { if (!this) { (*__errno_location ()) = 22; do { do { if ( 0) printf ("invalid argument: " "this"); } while (0); _gf_log_callingfn ("socket", "socket.c", __FUNCTION__, 1111, GF_LOG_ERROR, "invalid argument: " "this"); } while (0); goto out; } } while (0); |
1112 | GF_VALIDATE_OR_GOTO ("socket", this->private, out)do { if (!this->private) { (*__errno_location ()) = 22; do { do { if (0) printf ("invalid argument: " "this->private" ); } while (0); _gf_log_callingfn ("socket", "socket.c", __FUNCTION__ , 1112, GF_LOG_ERROR, "invalid argument: " "this->private" ); } while (0); goto out; } } while (0); |
1113 | |
1114 | priv = this->private; |
1115 | |
1116 | in = &priv->incoming; |
1117 | frag = &in->frag; |
1118 | |
1119 | switch (frag->simple_state) { |
1120 | |
1121 | case SP_STATE_SIMPLE_MSG_INIT: |
1122 | remaining_size = RPC_FRAGSIZE (in->fraghdr)((uint32_t)(in->fraghdr & 0x7fffffffU)) - frag->bytes_read; |
1123 | |
1124 | __socket_proto_init_pending (priv, remaining_size)do { uint32_t remaining = 0; struct gf_sock_incoming_frag *frag ; frag = &priv->incoming.frag; remaining = (((uint32_t )(priv->incoming.fraghdr & 0x7fffffffU)) - frag->bytes_read ); do { struct gf_sock_incoming_frag *frag; frag = &priv-> incoming.frag; memset (&frag->vector, 0, sizeof (frag-> vector)); frag->pending_vector = &frag->vector; frag ->pending_vector->iov_base = frag->fragcurrent; priv ->incoming.pending_vector = frag->pending_vector; } while (0); frag->pending_vector->iov_len = (remaining > remaining_size ) ? remaining_size : remaining; frag->remaining_size = (remaining_size - frag->pending_vector->iov_len); } while(0); |
1125 | |
1126 | frag->simple_state = SP_STATE_READING_SIMPLE_MSG; |
1127 | |
1128 | /* fall through */ |
1129 | |
1130 | case SP_STATE_READING_SIMPLE_MSG: |
1131 | ret = 0; |
1132 | |
1133 | remaining_size = RPC_FRAGSIZE (in->fraghdr)((uint32_t)(in->fraghdr & 0x7fffffffU)) - frag->bytes_read; |
1134 | |
1135 | if (remaining_size > 0) { |
1136 | ret = __socket_readv (this, |
1137 | in->pending_vector, 1, |
1138 | &in->pending_vector, |
1139 | &in->pending_count, |
1140 | &bytes_read); |
1141 | } |
1142 | |
1143 | if (ret == -1) { |
1144 | gf_log (this->name, GF_LOG_WARNING,do { do { if (0) printf ("reading from socket failed. Error (%s), " "peer (%s)", strerror ((*__errno_location ())), this->peerinfo .identifier); } while (0); _gf_log (this->name, "socket.c" , __FUNCTION__, 1147, GF_LOG_WARNING, "reading from socket failed. Error (%s), " "peer (%s)", strerror ((*__errno_location ())), this->peerinfo .identifier); } while (0) |
1145 | "reading from socket failed. Error (%s), "do { do { if (0) printf ("reading from socket failed. Error (%s), " "peer (%s)", strerror ((*__errno_location ())), this->peerinfo .identifier); } while (0); _gf_log (this->name, "socket.c" , __FUNCTION__, 1147, GF_LOG_WARNING, "reading from socket failed. Error (%s), " "peer (%s)", strerror ((*__errno_location ())), this->peerinfo .identifier); } while (0) |
1146 | "peer (%s)", strerror (errno),do { do { if (0) printf ("reading from socket failed. Error (%s), " "peer (%s)", strerror ((*__errno_location ())), this->peerinfo .identifier); } while (0); _gf_log (this->name, "socket.c" , __FUNCTION__, 1147, GF_LOG_WARNING, "reading from socket failed. Error (%s), " "peer (%s)", strerror ((*__errno_location ())), this->peerinfo .identifier); } while (0) |
1147 | this->peerinfo.identifier)do { do { if (0) printf ("reading from socket failed. Error (%s), " "peer (%s)", strerror ((*__errno_location ())), this->peerinfo .identifier); } while (0); _gf_log (this->name, "socket.c" , __FUNCTION__, 1147, GF_LOG_WARNING, "reading from socket failed. Error (%s), " "peer (%s)", strerror ((*__errno_location ())), this->peerinfo .identifier); } while (0); |
1148 | break; |
1149 | } |
1150 | |
1151 | frag->bytes_read += bytes_read; |
1152 | frag->fragcurrent += bytes_read; |
1153 | |
1154 | if (ret > 0) { |
1155 | gf_log (this->name, GF_LOG_TRACE,do { do { if (0) printf ("partial read on non-blocking socket." ); } while (0); _gf_log (this->name, "socket.c", __FUNCTION__ , 1156, GF_LOG_TRACE, "partial read on non-blocking socket.") ; } while (0) |
1156 | "partial read on non-blocking socket.")do { do { if (0) printf ("partial read on non-blocking socket." ); } while (0); _gf_log (this->name, "socket.c", __FUNCTION__ , 1156, GF_LOG_TRACE, "partial read on non-blocking socket.") ; } while (0); |
1157 | break; |
1158 | } |
1159 | |
1160 | if (ret == 0) { |
1161 | frag->simple_state = SP_STATE_SIMPLE_MSG_INIT; |
1162 | } |
1163 | } |
1164 | |
1165 | out: |
1166 | return ret; |
1167 | } |
1168 | |
1169 | |
1170 | static inline int |
1171 | __socket_read_simple_request (rpc_transport_t *this) |
1172 | { |
1173 | return __socket_read_simple_msg (this); |
1174 | } |
1175 | |
1176 | |
1177 | #define rpc_cred_addr(buf)(buf + 8 + 24 - 4) (buf + RPC_MSGTYPE_SIZE8 + RPC_CALL_BODY_SIZE24 - 4) |
1178 | |
1179 | #define rpc_verf_addr(fragcurrent)(fragcurrent - 4) (fragcurrent - 4) |
1180 | |
1181 | #define rpc_msgtype_addr(buf)(buf + 4) (buf + 4) |
1182 | |
1183 | #define rpc_prognum_addr(buf)(buf + 8 + 4) (buf + RPC_MSGTYPE_SIZE8 + 4) |
1184 | #define rpc_progver_addr(buf)(buf + 8 + 8) (buf + RPC_MSGTYPE_SIZE8 + 8) |
1185 | #define rpc_procnum_addr(buf)(buf + 8 + 12) (buf + RPC_MSGTYPE_SIZE8 + 12) |
1186 | |
1187 | static inline int |
1188 | __socket_read_vectored_request (rpc_transport_t *this, rpcsvc_vector_sizer vector_sizer) |
1189 | { |
1190 | socket_private_t *priv = NULL((void*)0); |
1191 | int ret = 0; |
1192 | uint32_t credlen = 0, verflen = 0; |
1193 | char *addr = NULL((void*)0); |
1194 | struct iobuf *iobuf = NULL((void*)0); |
1195 | uint32_t remaining_size = 0; |
1196 | ssize_t readsize = 0; |
1197 | size_t size = 0; |
1198 | struct gf_sock_incoming *in = NULL((void*)0); |
1199 | struct gf_sock_incoming_frag *frag = NULL((void*)0); |
1200 | sp_rpcfrag_request_state_t *request = NULL((void*)0); |
1201 | |
1202 | GF_VALIDATE_OR_GOTO ("socket", this, out)do { if (!this) { (*__errno_location ()) = 22; do { do { if ( 0) printf ("invalid argument: " "this"); } while (0); _gf_log_callingfn ("socket", "socket.c", __FUNCTION__, 1202, GF_LOG_ERROR, "invalid argument: " "this"); } while (0); goto out; } } while (0); |
1203 | GF_VALIDATE_OR_GOTO ("socket", this->private, out)do { if (!this->private) { (*__errno_location ()) = 22; do { do { if (0) printf ("invalid argument: " "this->private" ); } while (0); _gf_log_callingfn ("socket", "socket.c", __FUNCTION__ , 1203, GF_LOG_ERROR, "invalid argument: " "this->private" ); } while (0); goto out; } } while (0); |
1204 | |
1205 | priv = this->private; |
1206 | |
1207 | /* used to reduce the indirection */ |
1208 | in = &priv->incoming; |
1209 | frag = &in->frag; |
1210 | request = &frag->call_body.request; |
1211 | |
1212 | switch (request->vector_state) { |
1213 | case SP_STATE_VECTORED_REQUEST_INIT: |
1214 | request->vector_sizer_state = 0; |
1215 | |
1216 | addr = rpc_cred_addr (iobuf_ptr (in->iobuf))(((in->iobuf)->ptr) + 8 + 24 - 4); |
1217 | |
1218 | /* also read verf flavour and verflen */ |
1219 | credlen = ntoh32hton32 (*((uint32_t *)addr)) |
1220 | + RPC_AUTH_FLAVOUR_N_LENGTH_SIZE8; |
1221 | |
1222 | __socket_proto_init_pending (priv, credlen)do { uint32_t remaining = 0; struct gf_sock_incoming_frag *frag ; frag = &priv->incoming.frag; remaining = (((uint32_t )(priv->incoming.fraghdr & 0x7fffffffU)) - frag->bytes_read ); do { struct gf_sock_incoming_frag *frag; frag = &priv-> incoming.frag; memset (&frag->vector, 0, sizeof (frag-> vector)); frag->pending_vector = &frag->vector; frag ->pending_vector->iov_base = frag->fragcurrent; priv ->incoming.pending_vector = frag->pending_vector; } while (0); frag->pending_vector->iov_len = (remaining > credlen ) ? credlen : remaining; frag->remaining_size = (credlen - frag->pending_vector->iov_len); } while(0); |
1223 | |
1224 | request->vector_state = SP_STATE_READING_CREDBYTES; |
1225 | |
1226 | /* fall through */ |
1227 | |
1228 | case SP_STATE_READING_CREDBYTES: |
1229 | __socket_proto_read (priv, ret){ size_t bytes_read = 0; struct gf_sock_incoming *in; in = & priv->incoming; do { uint32_t remaining; struct gf_sock_incoming_frag *frag; frag = &priv->incoming.frag; if (frag->pending_vector ->iov_len == 0) { remaining = (((uint32_t)(priv->incoming .fraghdr & 0x7fffffffU)) - frag->bytes_read); frag-> pending_vector->iov_len = (remaining > frag->remaining_size ) ? frag->remaining_size : remaining; frag->remaining_size -= frag->pending_vector->iov_len; } } while (0); ret = __socket_readv (this, in->pending_vector, 1, &in-> pending_vector, &in->pending_count, &bytes_read); if (ret == -1) { if (priv->read_fail_log) do { do { if (0) printf ("reading from socket failed." "Error (%s), peer (%s)", strerror ((*__errno_location ())), this->peerinfo.identifier); } while (0); _gf_log (this->name, "socket.c", __FUNCTION__, 1229, GF_LOG_WARNING, "reading from socket failed." "Error (%s), peer (%s)" , strerror ((*__errno_location ())), this->peerinfo.identifier ); } while (0); break; } { struct gf_sock_incoming_frag *frag ; frag = &priv->incoming.frag; frag->fragcurrent += bytes_read; frag->bytes_read += bytes_read; if ((ret > 0) || (frag->remaining_size != 0)) { if (frag->remaining_size != 0 && ret == 0) { do { struct gf_sock_incoming_frag *frag; frag = &priv->incoming.frag; memset (&frag ->vector, 0, sizeof (frag->vector)); frag->pending_vector = &frag->vector; frag->pending_vector->iov_base = frag->fragcurrent; priv->incoming.pending_vector = frag ->pending_vector; } while (0); } do { do { if (0) printf ( "partial read on non-blocking socket"); } while (0); _gf_log ( this->name, "socket.c", __FUNCTION__, 1229, GF_LOG_TRACE, "partial read on non-blocking socket" ); } while (0); break; } }; }; |
1230 | |
1231 | request->vector_state = SP_STATE_READ_CREDBYTES; |
1232 | |
1233 | /* fall through */ |
1234 | |
1235 | case SP_STATE_READ_CREDBYTES: |
1236 | addr = rpc_verf_addr (frag->fragcurrent)(frag->fragcurrent - 4); |
1237 | verflen = ntoh32hton32 (*((uint32_t *)addr)); |
1238 | |
1239 | if (verflen == 0) { |
1240 | request->vector_state = SP_STATE_READ_VERFBYTES; |
1241 | goto sp_state_read_verfbytes; |
1242 | } |
1243 | __socket_proto_init_pending (priv, verflen)do { uint32_t remaining = 0; struct gf_sock_incoming_frag *frag ; frag = &priv->incoming.frag; remaining = (((uint32_t )(priv->incoming.fraghdr & 0x7fffffffU)) - frag->bytes_read ); do { struct gf_sock_incoming_frag *frag; frag = &priv-> incoming.frag; memset (&frag->vector, 0, sizeof (frag-> vector)); frag->pending_vector = &frag->vector; frag ->pending_vector->iov_base = frag->fragcurrent; priv ->incoming.pending_vector = frag->pending_vector; } while (0); frag->pending_vector->iov_len = (remaining > verflen ) ? verflen : remaining; frag->remaining_size = (verflen - frag->pending_vector->iov_len); } while(0); |
1244 | |
1245 | request->vector_state = SP_STATE_READING_VERFBYTES; |
1246 | |
1247 | /* fall through */ |
1248 | |
1249 | case SP_STATE_READING_VERFBYTES: |
1250 | __socket_proto_read (priv, ret){ size_t bytes_read = 0; struct gf_sock_incoming *in; in = & priv->incoming; do { uint32_t remaining; struct gf_sock_incoming_frag *frag; frag = &priv->incoming.frag; if (frag->pending_vector ->iov_len == 0) { remaining = (((uint32_t)(priv->incoming .fraghdr & 0x7fffffffU)) - frag->bytes_read); frag-> pending_vector->iov_len = (remaining > frag->remaining_size ) ? frag->remaining_size : remaining; frag->remaining_size -= frag->pending_vector->iov_len; } } while (0); ret = __socket_readv (this, in->pending_vector, 1, &in-> pending_vector, &in->pending_count, &bytes_read); if (ret == -1) { if (priv->read_fail_log) do { do { if (0) printf ("reading from socket failed." "Error (%s), peer (%s)", strerror ((*__errno_location ())), this->peerinfo.identifier); } while (0); _gf_log (this->name, "socket.c", __FUNCTION__, 1250, GF_LOG_WARNING, "reading from socket failed." "Error (%s), peer (%s)" , strerror ((*__errno_location ())), this->peerinfo.identifier ); } while (0); break; } { struct gf_sock_incoming_frag *frag ; frag = &priv->incoming.frag; frag->fragcurrent += bytes_read; frag->bytes_read += bytes_read; if ((ret > 0) || (frag->remaining_size != 0)) { if (frag->remaining_size != 0 && ret == 0) { do { struct gf_sock_incoming_frag *frag; frag = &priv->incoming.frag; memset (&frag ->vector, 0, sizeof (frag->vector)); frag->pending_vector = &frag->vector; frag->pending_vector->iov_base = frag->fragcurrent; priv->incoming.pending_vector = frag ->pending_vector; } while (0); } do { do { if (0) printf ( "partial read on non-blocking socket"); } while (0); _gf_log ( this->name, "socket.c", __FUNCTION__, 1250, GF_LOG_TRACE, "partial read on non-blocking socket" ); } while (0); break; } }; }; |
1251 | |
1252 | request->vector_state = SP_STATE_READ_VERFBYTES; |
1253 | |
1254 | /* fall through */ |
1255 | |
1256 | case SP_STATE_READ_VERFBYTES: |
1257 | sp_state_read_verfbytes: |
1258 | /* set the base_addr 'persistently' across multiple calls |
1259 | into the state machine */ |
1260 | in->proghdr_base_addr = frag->fragcurrent; |
1261 | |
1262 | request->vector_sizer_state = |
1263 | vector_sizer (request->vector_sizer_state, |
1264 | &readsize, in->proghdr_base_addr, |
1265 | frag->fragcurrent); |
1266 | __socket_proto_init_pending (priv, readsize)do { uint32_t remaining = 0; struct gf_sock_incoming_frag *frag ; frag = &priv->incoming.frag; remaining = (((uint32_t )(priv->incoming.fraghdr & 0x7fffffffU)) - frag->bytes_read ); do { struct gf_sock_incoming_frag *frag; frag = &priv-> incoming.frag; memset (&frag->vector, 0, sizeof (frag-> vector)); frag->pending_vector = &frag->vector; frag ->pending_vector->iov_base = frag->fragcurrent; priv ->incoming.pending_vector = frag->pending_vector; } while (0); frag->pending_vector->iov_len = (remaining > readsize ) ? readsize : remaining; frag->remaining_size = (readsize - frag->pending_vector->iov_len); } while(0); |
1267 | |
1268 | request->vector_state = SP_STATE_READING_PROGHDR; |
1269 | |
1270 | /* fall through */ |
1271 | |
1272 | case SP_STATE_READING_PROGHDR: |
1273 | __socket_proto_read (priv, ret){ size_t bytes_read = 0; struct gf_sock_incoming *in; in = & priv->incoming; do { uint32_t remaining; struct gf_sock_incoming_frag *frag; frag = &priv->incoming.frag; if (frag->pending_vector ->iov_len == 0) { remaining = (((uint32_t)(priv->incoming .fraghdr & 0x7fffffffU)) - frag->bytes_read); frag-> pending_vector->iov_len = (remaining > frag->remaining_size ) ? frag->remaining_size : remaining; frag->remaining_size -= frag->pending_vector->iov_len; } } while (0); ret = __socket_readv (this, in->pending_vector, 1, &in-> pending_vector, &in->pending_count, &bytes_read); if (ret == -1) { if (priv->read_fail_log) do { do { if (0) printf ("reading from socket failed." "Error (%s), peer (%s)", strerror ((*__errno_location ())), this->peerinfo.identifier); } while (0); _gf_log (this->name, "socket.c", __FUNCTION__, 1273, GF_LOG_WARNING, "reading from socket failed." "Error (%s), peer (%s)" , strerror ((*__errno_location ())), this->peerinfo.identifier ); } while (0); break; } { struct gf_sock_incoming_frag *frag ; frag = &priv->incoming.frag; frag->fragcurrent += bytes_read; frag->bytes_read += bytes_read; if ((ret > 0) || (frag->remaining_size != 0)) { if (frag->remaining_size != 0 && ret == 0) { do { struct gf_sock_incoming_frag *frag; frag = &priv->incoming.frag; memset (&frag ->vector, 0, sizeof (frag->vector)); frag->pending_vector = &frag->vector; frag->pending_vector->iov_base = frag->fragcurrent; priv->incoming.pending_vector = frag ->pending_vector; } while (0); } do { do { if (0) printf ( "partial read on non-blocking socket"); } while (0); _gf_log ( this->name, "socket.c", __FUNCTION__, 1273, GF_LOG_TRACE, "partial read on non-blocking socket" ); } while (0); break; } }; }; |
1274 | |
1275 | request->vector_state = SP_STATE_READ_PROGHDR; |
1276 | |
1277 | /* fall through */ |
1278 | |
1279 | case SP_STATE_READ_PROGHDR: |
1280 | sp_state_read_proghdr: |
1281 | request->vector_sizer_state = |
1282 | vector_sizer (request->vector_sizer_state, |
1283 | &readsize, in->proghdr_base_addr, |
1284 | frag->fragcurrent); |
1285 | if (readsize == 0) { |
1286 | request->vector_state = SP_STATE_READ_PROGHDR_XDATA; |
1287 | goto sp_state_read_proghdr_xdata; |
1288 | } |
1289 | |
1290 | __socket_proto_init_pending (priv, readsize)do { uint32_t remaining = 0; struct gf_sock_incoming_frag *frag ; frag = &priv->incoming.frag; remaining = (((uint32_t )(priv->incoming.fraghdr & 0x7fffffffU)) - frag->bytes_read ); do { struct gf_sock_incoming_frag *frag; frag = &priv-> incoming.frag; memset (&frag->vector, 0, sizeof (frag-> vector)); frag->pending_vector = &frag->vector; frag ->pending_vector->iov_base = frag->fragcurrent; priv ->incoming.pending_vector = frag->pending_vector; } while (0); frag->pending_vector->iov_len = (remaining > readsize ) ? readsize : remaining; frag->remaining_size = (readsize - frag->pending_vector->iov_len); } while(0); |
1291 | |
1292 | request->vector_state = SP_STATE_READING_PROGHDR_XDATA; |
1293 | |
1294 | /* fall through */ |
1295 | |
1296 | case SP_STATE_READING_PROGHDR_XDATA: |
1297 | __socket_proto_read (priv, ret){ size_t bytes_read = 0; struct gf_sock_incoming *in; in = & priv->incoming; do { uint32_t remaining; struct gf_sock_incoming_frag *frag; frag = &priv->incoming.frag; if (frag->pending_vector ->iov_len == 0) { remaining = (((uint32_t)(priv->incoming .fraghdr & 0x7fffffffU)) - frag->bytes_read); frag-> pending_vector->iov_len = (remaining > frag->remaining_size ) ? frag->remaining_size : remaining; frag->remaining_size -= frag->pending_vector->iov_len; } } while (0); ret = __socket_readv (this, in->pending_vector, 1, &in-> pending_vector, &in->pending_count, &bytes_read); if (ret == -1) { if (priv->read_fail_log) do { do { if (0) printf ("reading from socket failed." "Error (%s), peer (%s)", strerror ((*__errno_location ())), this->peerinfo.identifier); } while (0); _gf_log (this->name, "socket.c", __FUNCTION__, 1297, GF_LOG_WARNING, "reading from socket failed." "Error (%s), peer (%s)" , strerror ((*__errno_location ())), this->peerinfo.identifier ); } while (0); break; } { struct gf_sock_incoming_frag *frag ; frag = &priv->incoming.frag; frag->fragcurrent += bytes_read; frag->bytes_read += bytes_read; if ((ret > 0) || (frag->remaining_size != 0)) { if (frag->remaining_size != 0 && ret == 0) { do { struct gf_sock_incoming_frag *frag; frag = &priv->incoming.frag; memset (&frag ->vector, 0, sizeof (frag->vector)); frag->pending_vector = &frag->vector; frag->pending_vector->iov_base = frag->fragcurrent; priv->incoming.pending_vector = frag ->pending_vector; } while (0); } do { do { if (0) printf ( "partial read on non-blocking socket"); } while (0); _gf_log ( this->name, "socket.c", __FUNCTION__, 1297, GF_LOG_TRACE, "partial read on non-blocking socket" ); } while (0); break; } }; }; |
1298 | |
1299 | request->vector_state = SP_STATE_READ_PROGHDR; |
1300 | /* check if the vector_sizer() has more to say */ |
1301 | goto sp_state_read_proghdr; |
1302 | |
1303 | case SP_STATE_READ_PROGHDR_XDATA: |
1304 | sp_state_read_proghdr_xdata: |
1305 | if (in->payload_vector.iov_base == NULL((void*)0)) { |
1306 | |
1307 | size = RPC_FRAGSIZE (in->fraghdr)((uint32_t)(in->fraghdr & 0x7fffffffU)) - frag->bytes_read; |
1308 | iobuf = iobuf_get2 (this->ctx->iobuf_pool, size); |
1309 | if (!iobuf) { |
1310 | ret = -1; |
1311 | break; |
1312 | } |
1313 | |
1314 | if (in->iobref == NULL((void*)0)) { |
1315 | in->iobref = iobref_new (); |
1316 | if (in->iobref == NULL((void*)0)) { |
1317 | ret = -1; |
1318 | iobuf_unref (iobuf); |
1319 | break; |
1320 | } |
1321 | } |
1322 | |
1323 | iobref_add (in->iobref, iobuf); |
1324 | iobuf_unref (iobuf); |
1325 | |
1326 | in->payload_vector.iov_base = iobuf_ptr (iobuf)((iobuf)->ptr); |
1327 | |
1328 | frag->fragcurrent = iobuf_ptr (iobuf)((iobuf)->ptr); |
1329 | } |
1330 | |
1331 | request->vector_state = SP_STATE_READING_PROG; |
1332 | |
1333 | /* fall through */ |
1334 | |
1335 | case SP_STATE_READING_PROG: |
1336 | /* now read the remaining rpc msg into buffer pointed by |
1337 | * fragcurrent |
1338 | */ |
1339 | |
1340 | ret = __socket_read_simple_msg (this); |
1341 | |
1342 | remaining_size = RPC_FRAGSIZE (in->fraghdr)((uint32_t)(in->fraghdr & 0x7fffffffU)) - frag->bytes_read; |
1343 | |
1344 | if ((ret == -1) || |
1345 | ((ret == 0) && (remaining_size == 0) |
1346 | && RPC_LASTFRAG (in->fraghdr)((uint32_t)(in->fraghdr & 0x80000000U)))) { |
1347 | request->vector_state = SP_STATE_VECTORED_REQUEST_INIT; |
1348 | in->payload_vector.iov_len |
1349 | = ((unsigned long)frag->fragcurrent |
1350 | - (unsigned long)in->payload_vector.iov_base); |
1351 | } |
1352 | break; |
1353 | } |
1354 | |
1355 | out: |
1356 | return ret; |
1357 | } |
1358 | |
1359 | static inline int |
1360 | __socket_read_request (rpc_transport_t *this) |
1361 | { |
1362 | socket_private_t *priv = NULL((void*)0); |
1363 | uint32_t prognum = 0, procnum = 0, progver = 0; |
1364 | uint32_t remaining_size = 0; |
1365 | int ret = -1; |
1366 | char *buf = NULL((void*)0); |
1367 | rpcsvc_vector_sizer vector_sizer = NULL((void*)0); |
1368 | struct gf_sock_incoming *in = NULL((void*)0); |
1369 | struct gf_sock_incoming_frag *frag = NULL((void*)0); |
1370 | sp_rpcfrag_request_state_t *request = NULL((void*)0); |
1371 | |
1372 | GF_VALIDATE_OR_GOTO ("socket", this, out)do { if (!this) { (*__errno_location ()) = 22; do { do { if ( 0) printf ("invalid argument: " "this"); } while (0); _gf_log_callingfn ("socket", "socket.c", __FUNCTION__, 1372, GF_LOG_ERROR, "invalid argument: " "this"); } while (0); goto out; } } while (0); |
1373 | GF_VALIDATE_OR_GOTO ("socket", this->private, out)do { if (!this->private) { (*__errno_location ()) = 22; do { do { if (0) printf ("invalid argument: " "this->private" ); } while (0); _gf_log_callingfn ("socket", "socket.c", __FUNCTION__ , 1373, GF_LOG_ERROR, "invalid argument: " "this->private" ); } while (0); goto out; } } while (0); |
1374 | |
1375 | priv = this->private; |
1376 | |
1377 | /* used to reduce the indirection */ |
1378 | in = &priv->incoming; |
1379 | frag = &in->frag; |
1380 | request = &frag->call_body.request; |
1381 | |
1382 | switch (request->header_state) { |
1383 | |
1384 | case SP_STATE_REQUEST_HEADER_INIT: |
1385 | |
1386 | __socket_proto_init_pending (priv, RPC_CALL_BODY_SIZE)do { uint32_t remaining = 0; struct gf_sock_incoming_frag *frag ; frag = &priv->incoming.frag; remaining = (((uint32_t )(priv->incoming.fraghdr & 0x7fffffffU)) - frag->bytes_read ); do { struct gf_sock_incoming_frag *frag; frag = &priv-> incoming.frag; memset (&frag->vector, 0, sizeof (frag-> vector)); frag->pending_vector = &frag->vector; frag ->pending_vector->iov_base = frag->fragcurrent; priv ->incoming.pending_vector = frag->pending_vector; } while (0); frag->pending_vector->iov_len = (remaining > 24 ) ? 24 : remaining; frag->remaining_size = (24 - frag-> pending_vector->iov_len); } while(0); |
1387 | |
1388 | request->header_state = SP_STATE_READING_RPCHDR1; |
1389 | |
1390 | /* fall through */ |
1391 | |
1392 | case SP_STATE_READING_RPCHDR1: |
1393 | __socket_proto_read (priv, ret){ size_t bytes_read = 0; struct gf_sock_incoming *in; in = & priv->incoming; do { uint32_t remaining; struct gf_sock_incoming_frag *frag; frag = &priv->incoming.frag; if (frag->pending_vector ->iov_len == 0) { remaining = (((uint32_t)(priv->incoming .fraghdr & 0x7fffffffU)) - frag->bytes_read); frag-> pending_vector->iov_len = (remaining > frag->remaining_size ) ? frag->remaining_size : remaining; frag->remaining_size -= frag->pending_vector->iov_len; } } while (0); ret = __socket_readv (this, in->pending_vector, 1, &in-> pending_vector, &in->pending_count, &bytes_read); if (ret == -1) { if (priv->read_fail_log) do { do { if (0) printf ("reading from socket failed." "Error (%s), peer (%s)", strerror ((*__errno_location ())), this->peerinfo.identifier); } while (0); _gf_log (this->name, "socket.c", __FUNCTION__, 1393, GF_LOG_WARNING, "reading from socket failed." "Error (%s), peer (%s)" , strerror ((*__errno_location ())), this->peerinfo.identifier ); } while (0); break; } { struct gf_sock_incoming_frag *frag ; frag = &priv->incoming.frag; frag->fragcurrent += bytes_read; frag->bytes_read += bytes_read; if ((ret > 0) || (frag->remaining_size != 0)) { if (frag->remaining_size != 0 && ret == 0) { do { struct gf_sock_incoming_frag *frag; frag = &priv->incoming.frag; memset (&frag ->vector, 0, sizeof (frag->vector)); frag->pending_vector = &frag->vector; frag->pending_vector->iov_base = frag->fragcurrent; priv->incoming.pending_vector = frag ->pending_vector; } while (0); } do { do { if (0) printf ( "partial read on non-blocking socket"); } while (0); _gf_log ( this->name, "socket.c", __FUNCTION__, 1393, GF_LOG_TRACE, "partial read on non-blocking socket" ); } while (0); break; } }; }; |
1394 | |
1395 | request->header_state = SP_STATE_READ_RPCHDR1; |
1396 | |
1397 | /* fall through */ |
1398 | |
1399 | case SP_STATE_READ_RPCHDR1: |
1400 | buf = rpc_prognum_addr (iobuf_ptr (in->iobuf))(((in->iobuf)->ptr) + 8 + 4); |
1401 | prognum = ntoh32hton32 (*((uint32_t *)buf)); |
1402 | |
1403 | buf = rpc_progver_addr (iobuf_ptr (in->iobuf))(((in->iobuf)->ptr) + 8 + 8); |
1404 | progver = ntoh32hton32 (*((uint32_t *)buf)); |
1405 | |
1406 | buf = rpc_procnum_addr (iobuf_ptr (in->iobuf))(((in->iobuf)->ptr) + 8 + 12); |
1407 | procnum = ntoh32hton32 (*((uint32_t *)buf)); |
1408 | |
1409 | if (this->listener) { |
1410 | /* this check is needed as rpcsvc and rpc-clnt |
1411 | * actor structures are not same */ |
1412 | vector_sizer = |
1413 | rpcsvc_get_program_vector_sizer ((rpcsvc_t *)this->mydata, |
1414 | prognum, progver, procnum); |
1415 | } |
1416 | |
1417 | if (vector_sizer) { |
1418 | ret = __socket_read_vectored_request (this, vector_sizer); |
1419 | } else { |
1420 | ret = __socket_read_simple_request (this); |
1421 | } |
1422 | |
1423 | remaining_size = RPC_FRAGSIZE (in->fraghdr)((uint32_t)(in->fraghdr & 0x7fffffffU)) - frag->bytes_read; |
1424 | |
1425 | if ((ret == -1) |
1426 | || ((ret == 0) |
1427 | && (remaining_size == 0) |
1428 | && (RPC_LASTFRAG (in->fraghdr)((uint32_t)(in->fraghdr & 0x80000000U))))) { |
1429 | request->header_state = SP_STATE_REQUEST_HEADER_INIT; |
1430 | } |
1431 | |
1432 | break; |
1433 | } |
1434 | |
1435 | out: |
1436 | return ret; |
1437 | } |
1438 | |
1439 | |
1440 | static inline int |
1441 | __socket_read_accepted_successful_reply (rpc_transport_t *this) |
1442 | { |
1443 | socket_private_t *priv = NULL((void*)0); |
1444 | int ret = 0; |
1445 | struct iobuf *iobuf = NULL((void*)0); |
1446 | gfs3_read_rsp read_rsp = {0, }; |
1447 | ssize_t size = 0; |
1448 | ssize_t default_read_size = 0; |
1449 | char *proghdr_buf = NULL((void*)0); |
1450 | XDR xdr; |
1451 | struct gf_sock_incoming *in = NULL((void*)0); |
1452 | struct gf_sock_incoming_frag *frag = NULL((void*)0); |
1453 | |
1454 | GF_VALIDATE_OR_GOTO ("socket", this, out)do { if (!this) { (*__errno_location ()) = 22; do { do { if ( 0) printf ("invalid argument: " "this"); } while (0); _gf_log_callingfn ("socket", "socket.c", __FUNCTION__, 1454, GF_LOG_ERROR, "invalid argument: " "this"); } while (0); goto out; } } while (0); |
1455 | GF_VALIDATE_OR_GOTO ("socket", this->private, out)do { if (!this->private) { (*__errno_location ()) = 22; do { do { if (0) printf ("invalid argument: " "this->private" ); } while (0); _gf_log_callingfn ("socket", "socket.c", __FUNCTION__ , 1455, GF_LOG_ERROR, "invalid argument: " "this->private" ); } while (0); goto out; } } while (0); |
1456 | |
1457 | priv = this->private; |
1458 | |
1459 | /* used to reduce the indirection */ |
1460 | in = &priv->incoming; |
1461 | frag = &in->frag; |
1462 | |
1463 | switch (frag->call_body.reply.accepted_success_state) { |
1464 | |
1465 | case SP_STATE_ACCEPTED_SUCCESS_REPLY_INIT: |
1466 | default_read_size = xdr_sizeof ((xdrproc_t) xdr_gfs3_read_rsp, |
1467 | &read_rsp); |
1468 | |
1469 | proghdr_buf = frag->fragcurrent; |
1470 | |
1471 | __socket_proto_init_pending (priv, default_read_size)do { uint32_t remaining = 0; struct gf_sock_incoming_frag *frag ; frag = &priv->incoming.frag; remaining = (((uint32_t )(priv->incoming.fraghdr & 0x7fffffffU)) - frag->bytes_read ); do { struct gf_sock_incoming_frag *frag; frag = &priv-> incoming.frag; memset (&frag->vector, 0, sizeof (frag-> vector)); frag->pending_vector = &frag->vector; frag ->pending_vector->iov_base = frag->fragcurrent; priv ->incoming.pending_vector = frag->pending_vector; } while (0); frag->pending_vector->iov_len = (remaining > default_read_size ) ? default_read_size : remaining; frag->remaining_size = ( default_read_size - frag->pending_vector->iov_len); } while (0); |
1472 | |
1473 | frag->call_body.reply.accepted_success_state |
1474 | = SP_STATE_READING_PROC_HEADER; |
1475 | |
1476 | /* fall through */ |
1477 | |
1478 | case SP_STATE_READING_PROC_HEADER: |
1479 | __socket_proto_read (priv, ret){ size_t bytes_read = 0; struct gf_sock_incoming *in; in = & priv->incoming; do { uint32_t remaining; struct gf_sock_incoming_frag *frag; frag = &priv->incoming.frag; if (frag->pending_vector ->iov_len == 0) { remaining = (((uint32_t)(priv->incoming .fraghdr & 0x7fffffffU)) - frag->bytes_read); frag-> pending_vector->iov_len = (remaining > frag->remaining_size ) ? frag->remaining_size : remaining; frag->remaining_size -= frag->pending_vector->iov_len; } } while (0); ret = __socket_readv (this, in->pending_vector, 1, &in-> pending_vector, &in->pending_count, &bytes_read); if (ret == -1) { if (priv->read_fail_log) do { do { if (0) printf ("reading from socket failed." "Error (%s), peer (%s)", strerror ((*__errno_location ())), this->peerinfo.identifier); } while (0); _gf_log (this->name, "socket.c", __FUNCTION__, 1479, GF_LOG_WARNING, "reading from socket failed." "Error (%s), peer (%s)" , strerror ((*__errno_location ())), this->peerinfo.identifier ); } while (0); break; } { struct gf_sock_incoming_frag *frag ; frag = &priv->incoming.frag; frag->fragcurrent += bytes_read; frag->bytes_read += bytes_read; if ((ret > 0) || (frag->remaining_size != 0)) { if (frag->remaining_size != 0 && ret == 0) { do { struct gf_sock_incoming_frag *frag; frag = &priv->incoming.frag; memset (&frag ->vector, 0, sizeof (frag->vector)); frag->pending_vector = &frag->vector; frag->pending_vector->iov_base = frag->fragcurrent; priv->incoming.pending_vector = frag ->pending_vector; } while (0); } do { do { if (0) printf ( "partial read on non-blocking socket"); } while (0); _gf_log ( this->name, "socket.c", __FUNCTION__, 1479, GF_LOG_TRACE, "partial read on non-blocking socket" ); } while (0); break; } }; }; |
1480 | |
1481 | /* there can be 'xdata' in read response, figure it out */ |
1482 | xdrmem_create (&xdr, proghdr_buf, default_read_size, |
1483 | XDR_DECODE); |
1484 | |
1485 | /* This will fail if there is xdata sent from server, if not, |
1486 | well and good, we don't need to worry about */ |
1487 | xdr_gfs3_read_rsp (&xdr, &read_rsp); |
1488 | |
1489 | free (read_rsp.xdata.xdata_val); |
1490 | |
1491 | /* need to round off to proper roof (%4), as XDR packing pads |
1492 | the end of opaque object with '0' */ |
1493 | size = roof (read_rsp.xdata.xdata_len, 4)((((read_rsp.xdata.xdata_len)+(4)-1)/((4)?(4):1))*(4)); |
1494 | |
1495 | if (!size) { |
1496 | frag->call_body.reply.accepted_success_state |
1497 | = SP_STATE_READ_PROC_OPAQUE; |
1498 | goto read_proc_opaque; |
1499 | } |
1500 | |
1501 | __socket_proto_init_pending (priv, size)do { uint32_t remaining = 0; struct gf_sock_incoming_frag *frag ; frag = &priv->incoming.frag; remaining = (((uint32_t )(priv->incoming.fraghdr & 0x7fffffffU)) - frag->bytes_read ); do { struct gf_sock_incoming_frag *frag; frag = &priv-> incoming.frag; memset (&frag->vector, 0, sizeof (frag-> vector)); frag->pending_vector = &frag->vector; frag ->pending_vector->iov_base = frag->fragcurrent; priv ->incoming.pending_vector = frag->pending_vector; } while (0); frag->pending_vector->iov_len = (remaining > size ) ? size : remaining; frag->remaining_size = (size - frag-> pending_vector->iov_len); } while(0); |
1502 | |
1503 | frag->call_body.reply.accepted_success_state |
1504 | = SP_STATE_READING_PROC_OPAQUE; |
1505 | |
1506 | case SP_STATE_READING_PROC_OPAQUE: |
1507 | __socket_proto_read (priv, ret){ size_t bytes_read = 0; struct gf_sock_incoming *in; in = & priv->incoming; do { uint32_t remaining; struct gf_sock_incoming_frag *frag; frag = &priv->incoming.frag; if (frag->pending_vector ->iov_len == 0) { remaining = (((uint32_t)(priv->incoming .fraghdr & 0x7fffffffU)) - frag->bytes_read); frag-> pending_vector->iov_len = (remaining > frag->remaining_size ) ? frag->remaining_size : remaining; frag->remaining_size -= frag->pending_vector->iov_len; } } while (0); ret = __socket_readv (this, in->pending_vector, 1, &in-> pending_vector, &in->pending_count, &bytes_read); if (ret == -1) { if (priv->read_fail_log) do { do { if (0) printf ("reading from socket failed." "Error (%s), peer (%s)", strerror ((*__errno_location ())), this->peerinfo.identifier); } while (0); _gf_log (this->name, "socket.c", __FUNCTION__, 1507, GF_LOG_WARNING, "reading from socket failed." "Error (%s), peer (%s)" , strerror ((*__errno_location ())), this->peerinfo.identifier ); } while (0); break; } { struct gf_sock_incoming_frag *frag ; frag = &priv->incoming.frag; frag->fragcurrent += bytes_read; frag->bytes_read += bytes_read; if ((ret > 0) || (frag->remaining_size != 0)) { if (frag->remaining_size != 0 && ret == 0) { do { struct gf_sock_incoming_frag *frag; frag = &priv->incoming.frag; memset (&frag ->vector, 0, sizeof (frag->vector)); frag->pending_vector = &frag->vector; frag->pending_vector->iov_base = frag->fragcurrent; priv->incoming.pending_vector = frag ->pending_vector; } while (0); } do { do { if (0) printf ( "partial read on non-blocking socket"); } while (0); _gf_log ( this->name, "socket.c", __FUNCTION__, 1507, GF_LOG_TRACE, "partial read on non-blocking socket" ); } while (0); break; } }; }; |
1508 | |
1509 | frag->call_body.reply.accepted_success_state |
1510 | = SP_STATE_READ_PROC_OPAQUE; |
1511 | |
1512 | case SP_STATE_READ_PROC_OPAQUE: |
1513 | read_proc_opaque: |
1514 | if (in->payload_vector.iov_base == NULL((void*)0)) { |
1515 | |
1516 | size = (RPC_FRAGSIZE (in->fraghdr)((uint32_t)(in->fraghdr & 0x7fffffffU)) - frag->bytes_read); |
1517 | |
1518 | iobuf = iobuf_get2 (this->ctx->iobuf_pool, size); |
1519 | if (iobuf == NULL((void*)0)) { |
1520 | ret = -1; |
1521 | goto out; |
1522 | } |
1523 | |
1524 | if (in->iobref == NULL((void*)0)) { |
1525 | in->iobref = iobref_new (); |
1526 | if (in->iobref == NULL((void*)0)) { |
1527 | ret = -1; |
1528 | iobuf_unref (iobuf); |
1529 | goto out; |
1530 | } |
1531 | } |
1532 | |
1533 | iobref_add (in->iobref, iobuf); |
1534 | iobuf_unref (iobuf); |
1535 | |
1536 | in->payload_vector.iov_base = iobuf_ptr (iobuf)((iobuf)->ptr); |
1537 | |
1538 | in->payload_vector.iov_len = size; |
1539 | } |
1540 | |
1541 | frag->fragcurrent = in->payload_vector.iov_base; |
1542 | |
1543 | frag->call_body.reply.accepted_success_state |
1544 | = SP_STATE_READ_PROC_HEADER; |
1545 | |
1546 | /* fall through */ |
1547 | |
1548 | case SP_STATE_READ_PROC_HEADER: |
1549 | /* now read the entire remaining msg into new iobuf */ |
1550 | ret = __socket_read_simple_msg (this); |
1551 | if ((ret == -1) |
1552 | || ((ret == 0) && RPC_LASTFRAG (in->fraghdr)((uint32_t)(in->fraghdr & 0x80000000U)))) { |
1553 | frag->call_body.reply.accepted_success_state |
1554 | = SP_STATE_ACCEPTED_SUCCESS_REPLY_INIT; |
1555 | } |
1556 | |
1557 | break; |
1558 | } |
1559 | |
1560 | out: |
1561 | return ret; |
1562 | } |
1563 | |
1564 | #define rpc_reply_verflen_addr(fragcurrent)((char *)fragcurrent - 4) ((char *)fragcurrent - 4) |
1565 | #define rpc_reply_accept_status_addr(fragcurrent)((char *)fragcurrent - 4) ((char *)fragcurrent - 4) |
1566 | |
1567 | static inline int |
1568 | __socket_read_accepted_reply (rpc_transport_t *this) |
1569 | { |
1570 | socket_private_t *priv = NULL((void*)0); |
1571 | int ret = -1; |
1572 | char *buf = NULL((void*)0); |
1573 | uint32_t verflen = 0, len = 0; |
1574 | uint32_t remaining_size = 0; |
1575 | struct gf_sock_incoming *in = NULL((void*)0); |
1576 | struct gf_sock_incoming_frag *frag = NULL((void*)0); |
1577 | |
1578 | GF_VALIDATE_OR_GOTO ("socket", this, out)do { if (!this) { (*__errno_location ()) = 22; do { do { if ( 0) printf ("invalid argument: " "this"); } while (0); _gf_log_callingfn ("socket", "socket.c", __FUNCTION__, 1578, GF_LOG_ERROR, "invalid argument: " "this"); } while (0); goto out; } } while (0); |
1579 | GF_VALIDATE_OR_GOTO ("socket", this->private, out)do { if (!this->private) { (*__errno_location ()) = 22; do { do { if (0) printf ("invalid argument: " "this->private" ); } while (0); _gf_log_callingfn ("socket", "socket.c", __FUNCTION__ , 1579, GF_LOG_ERROR, "invalid argument: " "this->private" ); } while (0); goto out; } } while (0); |
1580 | |
1581 | priv = this->private; |
1582 | /* used to reduce the indirection */ |
1583 | in = &priv->incoming; |
1584 | frag = &in->frag; |
1585 | |
1586 | switch (frag->call_body.reply.accepted_state) { |
1587 | |
1588 | case SP_STATE_ACCEPTED_REPLY_INIT: |
1589 | __socket_proto_init_pending (priv,do { uint32_t remaining = 0; struct gf_sock_incoming_frag *frag ; frag = &priv->incoming.frag; remaining = (((uint32_t )(priv->incoming.fraghdr & 0x7fffffffU)) - frag->bytes_read ); do { struct gf_sock_incoming_frag *frag; frag = &priv-> incoming.frag; memset (&frag->vector, 0, sizeof (frag-> vector)); frag->pending_vector = &frag->vector; frag ->pending_vector->iov_base = frag->fragcurrent; priv ->incoming.pending_vector = frag->pending_vector; } while (0); frag->pending_vector->iov_len = (remaining > 8 ) ? 8 : remaining; frag->remaining_size = (8 - frag->pending_vector ->iov_len); } while(0) |
1590 | RPC_AUTH_FLAVOUR_N_LENGTH_SIZE)do { uint32_t remaining = 0; struct gf_sock_incoming_frag *frag ; frag = &priv->incoming.frag; remaining = (((uint32_t )(priv->incoming.fraghdr & 0x7fffffffU)) - frag->bytes_read ); do { struct gf_sock_incoming_frag *frag; frag = &priv-> incoming.frag; memset (&frag->vector, 0, sizeof (frag-> vector)); frag->pending_vector = &frag->vector; frag ->pending_vector->iov_base = frag->fragcurrent; priv ->incoming.pending_vector = frag->pending_vector; } while (0); frag->pending_vector->iov_len = (remaining > 8 ) ? 8 : remaining; frag->remaining_size = (8 - frag->pending_vector ->iov_len); } while(0); |
1591 | |
1592 | frag->call_body.reply.accepted_state |
1593 | = SP_STATE_READING_REPLY_VERFLEN; |
1594 | |
1595 | /* fall through */ |
1596 | |
1597 | case SP_STATE_READING_REPLY_VERFLEN: |
1598 | __socket_proto_read (priv, ret){ size_t bytes_read = 0; struct gf_sock_incoming *in; in = & priv->incoming; do { uint32_t remaining; struct gf_sock_incoming_frag *frag; frag = &priv->incoming.frag; if (frag->pending_vector ->iov_len == 0) { remaining = (((uint32_t)(priv->incoming .fraghdr & 0x7fffffffU)) - frag->bytes_read); frag-> pending_vector->iov_len = (remaining > frag->remaining_size ) ? frag->remaining_size : remaining; frag->remaining_size -= frag->pending_vector->iov_len; } } while (0); ret = __socket_readv (this, in->pending_vector, 1, &in-> pending_vector, &in->pending_count, &bytes_read); if (ret == -1) { if (priv->read_fail_log) do { do { if (0) printf ("reading from socket failed." "Error (%s), peer (%s)", strerror ((*__errno_location ())), this->peerinfo.identifier); } while (0); _gf_log (this->name, "socket.c", __FUNCTION__, 1598, GF_LOG_WARNING, "reading from socket failed." "Error (%s), peer (%s)" , strerror ((*__errno_location ())), this->peerinfo.identifier ); } while (0); break; } { struct gf_sock_incoming_frag *frag ; frag = &priv->incoming.frag; frag->fragcurrent += bytes_read; frag->bytes_read += bytes_read; if ((ret > 0) || (frag->remaining_size != 0)) { if (frag->remaining_size != 0 && ret == 0) { do { struct gf_sock_incoming_frag *frag; frag = &priv->incoming.frag; memset (&frag ->vector, 0, sizeof (frag->vector)); frag->pending_vector = &frag->vector; frag->pending_vector->iov_base = frag->fragcurrent; priv->incoming.pending_vector = frag ->pending_vector; } while (0); } do { do { if (0) printf ( "partial read on non-blocking socket"); } while (0); _gf_log ( this->name, "socket.c", __FUNCTION__, 1598, GF_LOG_TRACE, "partial read on non-blocking socket" ); } while (0); break; } }; }; |
1599 | |
1600 | frag->call_body.reply.accepted_state |
1601 | = SP_STATE_READ_REPLY_VERFLEN; |
1602 | |
1603 | /* fall through */ |
1604 | |
1605 | case SP_STATE_READ_REPLY_VERFLEN: |
1606 | buf = rpc_reply_verflen_addr (frag->fragcurrent)((char *)frag->fragcurrent - 4); |
1607 | |
1608 | verflen = ntoh32hton32 (*((uint32_t *) buf)); |
1609 | |
1610 | /* also read accept status along with verf data */ |
1611 | len = verflen + RPC_ACCEPT_STATUS_LEN4; |
1612 | |
1613 | __socket_proto_init_pending (priv, len)do { uint32_t remaining = 0; struct gf_sock_incoming_frag *frag ; frag = &priv->incoming.frag; remaining = (((uint32_t )(priv->incoming.fraghdr & 0x7fffffffU)) - frag->bytes_read ); do { struct gf_sock_incoming_frag *frag; frag = &priv-> incoming.frag; memset (&frag->vector, 0, sizeof (frag-> vector)); frag->pending_vector = &frag->vector; frag ->pending_vector->iov_base = frag->fragcurrent; priv ->incoming.pending_vector = frag->pending_vector; } while (0); frag->pending_vector->iov_len = (remaining > len ) ? len : remaining; frag->remaining_size = (len - frag-> pending_vector->iov_len); } while(0); |
1614 | |
1615 | frag->call_body.reply.accepted_state |
1616 | = SP_STATE_READING_REPLY_VERFBYTES; |
1617 | |
1618 | /* fall through */ |
1619 | |
1620 | case SP_STATE_READING_REPLY_VERFBYTES: |
1621 | __socket_proto_read (priv, ret){ size_t bytes_read = 0; struct gf_sock_incoming *in; in = & priv->incoming; do { uint32_t remaining; struct gf_sock_incoming_frag *frag; frag = &priv->incoming.frag; if (frag->pending_vector ->iov_len == 0) { remaining = (((uint32_t)(priv->incoming .fraghdr & 0x7fffffffU)) - frag->bytes_read); frag-> pending_vector->iov_len = (remaining > frag->remaining_size ) ? frag->remaining_size : remaining; frag->remaining_size -= frag->pending_vector->iov_len; } } while (0); ret = __socket_readv (this, in->pending_vector, 1, &in-> pending_vector, &in->pending_count, &bytes_read); if (ret == -1) { if (priv->read_fail_log) do { do { if (0) printf ("reading from socket failed." "Error (%s), peer (%s)", strerror ((*__errno_location ())), this->peerinfo.identifier); } while (0); _gf_log (this->name, "socket.c", __FUNCTION__, 1621, GF_LOG_WARNING, "reading from socket failed." "Error (%s), peer (%s)" , strerror ((*__errno_location ())), this->peerinfo.identifier ); } while (0); break; } { struct gf_sock_incoming_frag *frag ; frag = &priv->incoming.frag; frag->fragcurrent += bytes_read; frag->bytes_read += bytes_read; if ((ret > 0) || (frag->remaining_size != 0)) { if (frag->remaining_size != 0 && ret == 0) { do { struct gf_sock_incoming_frag *frag; frag = &priv->incoming.frag; memset (&frag ->vector, 0, sizeof (frag->vector)); frag->pending_vector = &frag->vector; frag->pending_vector->iov_base = frag->fragcurrent; priv->incoming.pending_vector = frag ->pending_vector; } while (0); } do { do { if (0) printf ( "partial read on non-blocking socket"); } while (0); _gf_log ( this->name, "socket.c", __FUNCTION__, 1621, GF_LOG_TRACE, "partial read on non-blocking socket" ); } while (0); break; } }; }; |
1622 | |
1623 | frag->call_body.reply.accepted_state |
1624 | = SP_STATE_READ_REPLY_VERFBYTES; |
1625 | |
1626 | buf = rpc_reply_accept_status_addr (frag->fragcurrent)((char *)frag->fragcurrent - 4); |
1627 | |
1628 | frag->call_body.reply.accept_status |
1629 | = ntoh32hton32 (*(uint32_t *) buf); |
1630 | |
1631 | /* fall through */ |
1632 | |
1633 | case SP_STATE_READ_REPLY_VERFBYTES: |
1634 | |
1635 | if (frag->call_body.reply.accept_status |
1636 | == SUCCESS) { |
1637 | ret = __socket_read_accepted_successful_reply (this); |
1638 | } else { |
1639 | /* read entire remaining msg into buffer pointed to by |
1640 | * fragcurrent |
1641 | */ |
1642 | ret = __socket_read_simple_msg (this); |
1643 | } |
1644 | |
1645 | remaining_size = RPC_FRAGSIZE (in->fraghdr)((uint32_t)(in->fraghdr & 0x7fffffffU)) |
1646 | - frag->bytes_read; |
1647 | |
1648 | if ((ret == -1) |
1649 | || ((ret == 0) && (remaining_size == 0) |
1650 | && (RPC_LASTFRAG (in->fraghdr)((uint32_t)(in->fraghdr & 0x80000000U))))) { |
1651 | frag->call_body.reply.accepted_state |
1652 | = SP_STATE_ACCEPTED_REPLY_INIT; |
1653 | } |
1654 | |
1655 | break; |
1656 | } |
1657 | |
1658 | out: |
1659 | return ret; |
1660 | } |
1661 | |
1662 | |
1663 | static inline int |
1664 | __socket_read_denied_reply (rpc_transport_t *this) |
1665 | { |
1666 | return __socket_read_simple_msg (this); |
1667 | } |
1668 | |
1669 | |
1670 | #define rpc_reply_status_addr(fragcurrent)((char *)fragcurrent - 4) ((char *)fragcurrent - 4) |
1671 | |
1672 | |
1673 | static inline int |
1674 | __socket_read_vectored_reply (rpc_transport_t *this) |
1675 | { |
1676 | socket_private_t *priv = NULL((void*)0); |
1677 | int ret = 0; |
1678 | char *buf = NULL((void*)0); |
1679 | uint32_t remaining_size = 0; |
1680 | struct gf_sock_incoming *in = NULL((void*)0); |
1681 | struct gf_sock_incoming_frag *frag = NULL((void*)0); |
1682 | |
1683 | GF_VALIDATE_OR_GOTO ("socket", this, out)do { if (!this) { (*__errno_location ()) = 22; do { do { if ( 0) printf ("invalid argument: " "this"); } while (0); _gf_log_callingfn ("socket", "socket.c", __FUNCTION__, 1683, GF_LOG_ERROR, "invalid argument: " "this"); } while (0); goto out; } } while (0); |
1684 | GF_VALIDATE_OR_GOTO ("socket", this->private, out)do { if (!this->private) { (*__errno_location ()) = 22; do { do { if (0) printf ("invalid argument: " "this->private" ); } while (0); _gf_log_callingfn ("socket", "socket.c", __FUNCTION__ , 1684, GF_LOG_ERROR, "invalid argument: " "this->private" ); } while (0); goto out; } } while (0); |
1685 | |
1686 | priv = this->private; |
1687 | in = &priv->incoming; |
1688 | frag = &in->frag; |
1689 | |
1690 | switch (frag->call_body.reply.status_state) { |
1691 | |
1692 | case SP_STATE_ACCEPTED_REPLY_INIT: |
1693 | __socket_proto_init_pending (priv, RPC_REPLY_STATUS_SIZE)do { uint32_t remaining = 0; struct gf_sock_incoming_frag *frag ; frag = &priv->incoming.frag; remaining = (((uint32_t )(priv->incoming.fraghdr & 0x7fffffffU)) - frag->bytes_read ); do { struct gf_sock_incoming_frag *frag; frag = &priv-> incoming.frag; memset (&frag->vector, 0, sizeof (frag-> vector)); frag->pending_vector = &frag->vector; frag ->pending_vector->iov_base = frag->fragcurrent; priv ->incoming.pending_vector = frag->pending_vector; } while (0); frag->pending_vector->iov_len = (remaining > 4 ) ? 4 : remaining; frag->remaining_size = (4 - frag->pending_vector ->iov_len); } while(0); |
1694 | |
1695 | frag->call_body.reply.status_state |
1696 | = SP_STATE_READING_REPLY_STATUS; |
1697 | |
1698 | /* fall through */ |
1699 | |
1700 | case SP_STATE_READING_REPLY_STATUS: |
1701 | __socket_proto_read (priv, ret){ size_t bytes_read = 0; struct gf_sock_incoming *in; in = & priv->incoming; do { uint32_t remaining; struct gf_sock_incoming_frag *frag; frag = &priv->incoming.frag; if (frag->pending_vector ->iov_len == 0) { remaining = (((uint32_t)(priv->incoming .fraghdr & 0x7fffffffU)) - frag->bytes_read); frag-> pending_vector->iov_len = (remaining > frag->remaining_size ) ? frag->remaining_size : remaining; frag->remaining_size -= frag->pending_vector->iov_len; } } while (0); ret = __socket_readv (this, in->pending_vector, 1, &in-> pending_vector, &in->pending_count, &bytes_read); if (ret == -1) { if (priv->read_fail_log) do { do { if (0) printf ("reading from socket failed." "Error (%s), peer (%s)", strerror ((*__errno_location ())), this->peerinfo.identifier); } while (0); _gf_log (this->name, "socket.c", __FUNCTION__, 1701, GF_LOG_WARNING, "reading from socket failed." "Error (%s), peer (%s)" , strerror ((*__errno_location ())), this->peerinfo.identifier ); } while (0); break; } { struct gf_sock_incoming_frag *frag ; frag = &priv->incoming.frag; frag->fragcurrent += bytes_read; frag->bytes_read += bytes_read; if ((ret > 0) || (frag->remaining_size != 0)) { if (frag->remaining_size != 0 && ret == 0) { do { struct gf_sock_incoming_frag *frag; frag = &priv->incoming.frag; memset (&frag ->vector, 0, sizeof (frag->vector)); frag->pending_vector = &frag->vector; frag->pending_vector->iov_base = frag->fragcurrent; priv->incoming.pending_vector = frag ->pending_vector; } while (0); } do { do { if (0) printf ( "partial read on non-blocking socket"); } while (0); _gf_log ( this->name, "socket.c", __FUNCTION__, 1701, GF_LOG_TRACE, "partial read on non-blocking socket" ); } while (0); break; } }; }; |
1702 | |
1703 | buf = rpc_reply_status_addr (frag->fragcurrent)((char *)frag->fragcurrent - 4); |
1704 | |
1705 | frag->call_body.reply.accept_status |
1706 | = ntoh32hton32 (*((uint32_t *) buf)); |
1707 | |
1708 | frag->call_body.reply.status_state |
1709 | = SP_STATE_READ_REPLY_STATUS; |
1710 | |
1711 | /* fall through */ |
1712 | |
1713 | case SP_STATE_READ_REPLY_STATUS: |
1714 | if (frag->call_body.reply.accept_status == MSG_ACCEPTED) { |
1715 | ret = __socket_read_accepted_reply (this); |
1716 | } else { |
1717 | ret = __socket_read_denied_reply (this); |
1718 | } |
1719 | |
1720 | remaining_size = RPC_FRAGSIZE (in->fraghdr)((uint32_t)(in->fraghdr & 0x7fffffffU)) - frag->bytes_read; |
1721 | |
1722 | if ((ret == -1) |
1723 | || ((ret == 0) && (remaining_size == 0) |
1724 | && (RPC_LASTFRAG (in->fraghdr)((uint32_t)(in->fraghdr & 0x80000000U))))) { |
1725 | frag->call_body.reply.status_state |
1726 | = SP_STATE_ACCEPTED_REPLY_INIT; |
1727 | in->payload_vector.iov_len |
1728 | = (unsigned long)frag->fragcurrent |
1729 | - (unsigned long)in->payload_vector.iov_base; |
1730 | } |
1731 | break; |
1732 | } |
1733 | |
1734 | out: |
1735 | return ret; |
1736 | } |
1737 | |
1738 | |
1739 | static inline int |
1740 | __socket_read_simple_reply (rpc_transport_t *this) |
1741 | { |
1742 | return __socket_read_simple_msg (this); |
1743 | } |
1744 | |
1745 | #define rpc_xid_addr(buf)(buf) (buf) |
1746 | |
1747 | static inline int |
1748 | __socket_read_reply (rpc_transport_t *this) |
1749 | { |
1750 | socket_private_t *priv = NULL((void*)0); |
1751 | char *buf = NULL((void*)0); |
1752 | int32_t ret = -1; |
1753 | rpc_request_info_t *request_info = NULL((void*)0); |
1754 | char map_xid = 0; |
1755 | struct gf_sock_incoming *in = NULL((void*)0); |
1756 | struct gf_sock_incoming_frag *frag = NULL((void*)0); |
1757 | |
1758 | GF_VALIDATE_OR_GOTO ("socket", this, out)do { if (!this) { (*__errno_location ()) = 22; do { do { if ( 0) printf ("invalid argument: " "this"); } while (0); _gf_log_callingfn ("socket", "socket.c", __FUNCTION__, 1758, GF_LOG_ERROR, "invalid argument: " "this"); } while (0); goto out; } } while (0); |
1759 | GF_VALIDATE_OR_GOTO ("socket", this->private, out)do { if (!this->private) { (*__errno_location ()) = 22; do { do { if (0) printf ("invalid argument: " "this->private" ); } while (0); _gf_log_callingfn ("socket", "socket.c", __FUNCTION__ , 1759, GF_LOG_ERROR, "invalid argument: " "this->private" ); } while (0); goto out; } } while (0); |
1760 | |
1761 | priv = this->private; |
1762 | in = &priv->incoming; |
1763 | frag = &in->frag; |
1764 | |
1765 | buf = rpc_xid_addr (iobuf_ptr (in->iobuf))(((in->iobuf)->ptr)); |
1766 | |
1767 | if (in->request_info == NULL((void*)0)) { |
1768 | in->request_info = GF_CALLOC (1, sizeof (*request_info),__gf_calloc (1, sizeof (*request_info), gf_common_mt_rpc_trans_reqinfo_t ) |
1769 | gf_common_mt_rpc_trans_reqinfo_t)__gf_calloc (1, sizeof (*request_info), gf_common_mt_rpc_trans_reqinfo_t ); |
1770 | if (in->request_info == NULL((void*)0)) { |
1771 | goto out; |
1772 | } |
1773 | |
1774 | map_xid = 1; |
1775 | } |
1776 | |
1777 | request_info = in->request_info; |
1778 | |
1779 | if (map_xid) { |
1780 | request_info->xid = ntoh32hton32 (*((uint32_t *) buf)); |
1781 | |
1782 | /* release priv->lock, so as to avoid deadlock b/w conn->lock |
1783 | * and priv->lock, since we are doing an upcall here. |
1784 | */ |
1785 | pthread_mutex_unlock (&priv->lock); |
1786 | { |
1787 | ret = rpc_transport_notify (this, |
1788 | RPC_TRANSPORT_MAP_XID_REQUEST, |
1789 | in->request_info); |
1790 | } |
1791 | pthread_mutex_lock (&priv->lock); |
1792 | |
1793 | if (ret == -1) { |
1794 | gf_log (this->name, GF_LOG_WARNING,do { do { if (0) printf ("notify for event MAP_XID failed"); } while (0); _gf_log (this->name, "socket.c", __FUNCTION__, 1795, GF_LOG_WARNING, "notify for event MAP_XID failed"); } while (0) |
1795 | "notify for event MAP_XID failed")do { do { if (0) printf ("notify for event MAP_XID failed"); } while (0); _gf_log (this->name, "socket.c", __FUNCTION__, 1795, GF_LOG_WARNING, "notify for event MAP_XID failed"); } while (0); |
1796 | goto out; |
1797 | } |
1798 | } |
1799 | |
1800 | if ((request_info->prognum == GLUSTER_FOP_PROGRAM1298437) |
1801 | && (request_info->procnum == GF_FOP_READ)) { |
1802 | if (map_xid && request_info->rsp.rsp_payload_count != 0) { |
1803 | in->iobref = iobref_ref (request_info->rsp.rsp_iobref); |
1804 | in->payload_vector = *request_info->rsp.rsp_payload; |
1805 | } |
1806 | |
1807 | ret = __socket_read_vectored_reply (this); |
1808 | } else { |
1809 | ret = __socket_read_simple_reply (this); |
1810 | } |
1811 | out: |
1812 | return ret; |
1813 | } |
1814 | |
1815 | |
1816 | /* returns the number of bytes yet to be read in a fragment */ |
1817 | static inline int |
1818 | __socket_read_frag (rpc_transport_t *this) |
1819 | { |
1820 | socket_private_t *priv = NULL((void*)0); |
1821 | int32_t ret = 0; |
1822 | char *buf = NULL((void*)0); |
1823 | uint32_t remaining_size = 0; |
1824 | struct gf_sock_incoming *in = NULL((void*)0); |
1825 | struct gf_sock_incoming_frag *frag = NULL((void*)0); |
1826 | |
1827 | GF_VALIDATE_OR_GOTO ("socket", this, out)do { if (!this) { (*__errno_location ()) = 22; do { do { if ( 0) printf ("invalid argument: " "this"); } while (0); _gf_log_callingfn ("socket", "socket.c", __FUNCTION__, 1827, GF_LOG_ERROR, "invalid argument: " "this"); } while (0); goto out; } } while (0); |
1828 | GF_VALIDATE_OR_GOTO ("socket", this->private, out)do { if (!this->private) { (*__errno_location ()) = 22; do { do { if (0) printf ("invalid argument: " "this->private" ); } while (0); _gf_log_callingfn ("socket", "socket.c", __FUNCTION__ , 1828, GF_LOG_ERROR, "invalid argument: " "this->private" ); } while (0); goto out; } } while (0); |
1829 | |
1830 | priv = this->private; |
1831 | /* used to reduce the indirection */ |
1832 | in = &priv->incoming; |
1833 | frag = &in->frag; |
1834 | |
1835 | switch (frag->state) { |
1836 | case SP_STATE_NADA: |
1837 | __socket_proto_init_pending (priv, RPC_MSGTYPE_SIZE)do { uint32_t remaining = 0; struct gf_sock_incoming_frag *frag ; frag = &priv->incoming.frag; remaining = (((uint32_t )(priv->incoming.fraghdr & 0x7fffffffU)) - frag->bytes_read ); do { struct gf_sock_incoming_frag *frag; frag = &priv-> incoming.frag; memset (&frag->vector, 0, sizeof (frag-> vector)); frag->pending_vector = &frag->vector; frag ->pending_vector->iov_base = frag->fragcurrent; priv ->incoming.pending_vector = frag->pending_vector; } while (0); frag->pending_vector->iov_len = (remaining > 8 ) ? 8 : remaining; frag->remaining_size = (8 - frag->pending_vector ->iov_len); } while(0); |
1838 | |
1839 | frag->state = SP_STATE_READING_MSGTYPE; |
1840 | |
1841 | /* fall through */ |
1842 | |
1843 | case SP_STATE_READING_MSGTYPE: |
1844 | __socket_proto_read (priv, ret){ size_t bytes_read = 0; struct gf_sock_incoming *in; in = & priv->incoming; do { uint32_t remaining; struct gf_sock_incoming_frag *frag; frag = &priv->incoming.frag; if (frag->pending_vector ->iov_len == 0) { remaining = (((uint32_t)(priv->incoming .fraghdr & 0x7fffffffU)) - frag->bytes_read); frag-> pending_vector->iov_len = (remaining > frag->remaining_size ) ? frag->remaining_size : remaining; frag->remaining_size -= frag->pending_vector->iov_len; } } while (0); ret = __socket_readv (this, in->pending_vector, 1, &in-> pending_vector, &in->pending_count, &bytes_read); if (ret == -1) { if (priv->read_fail_log) do { do { if (0) printf ("reading from socket failed." "Error (%s), peer (%s)", strerror ((*__errno_location ())), this->peerinfo.identifier); } while (0); _gf_log (this->name, "socket.c", __FUNCTION__, 1844, GF_LOG_WARNING, "reading from socket failed." "Error (%s), peer (%s)" , strerror ((*__errno_location ())), this->peerinfo.identifier ); } while (0); break; } { struct gf_sock_incoming_frag *frag ; frag = &priv->incoming.frag; frag->fragcurrent += bytes_read; frag->bytes_read += bytes_read; if ((ret > 0) || (frag->remaining_size != 0)) { if (frag->remaining_size != 0 && ret == 0) { do { struct gf_sock_incoming_frag *frag; frag = &priv->incoming.frag; memset (&frag ->vector, 0, sizeof (frag->vector)); frag->pending_vector = &frag->vector; frag->pending_vector->iov_base = frag->fragcurrent; priv->incoming.pending_vector = frag ->pending_vector; } while (0); } do { do { if (0) printf ( "partial read on non-blocking socket"); } while (0); _gf_log ( this->name, "socket.c", __FUNCTION__, 1844, GF_LOG_TRACE, "partial read on non-blocking socket" ); } while (0); break; } }; }; |
1845 | |
1846 | frag->state = SP_STATE_READ_MSGTYPE; |
1847 | /* fall through */ |
1848 | |
1849 | case SP_STATE_READ_MSGTYPE: |
1850 | buf = rpc_msgtype_addr (iobuf_ptr (in->iobuf))(((in->iobuf)->ptr) + 4); |
1851 | in->msg_type = ntoh32hton32 (*((uint32_t *)buf)); |
1852 | |
1853 | if (in->msg_type == CALL) { |
1854 | ret = __socket_read_request (this); |
1855 | } else if (in->msg_type == REPLY) { |
1856 | ret = __socket_read_reply (this); |
1857 | } else if (in->msg_type == GF_UNIVERSAL_ANSWER42) { |
1858 | gf_log ("rpc", GF_LOG_ERROR,do { do { if (0) printf ("older version of protocol/process trying to " "connect from %s. use newer version on that node", this-> peerinfo.identifier); } while (0); _gf_log ("rpc", "socket.c" , __FUNCTION__, 1861, GF_LOG_ERROR, "older version of protocol/process trying to " "connect from %s. use newer version on that node", this-> peerinfo.identifier); } while (0) |
1859 | "older version of protocol/process trying to "do { do { if (0) printf ("older version of protocol/process trying to " "connect from %s. use newer version on that node", this-> peerinfo.identifier); } while (0); _gf_log ("rpc", "socket.c" , __FUNCTION__, 1861, GF_LOG_ERROR, "older version of protocol/process trying to " "connect from %s. use newer version on that node", this-> peerinfo.identifier); } while (0) |
1860 | "connect from %s. use newer version on that node",do { do { if (0) printf ("older version of protocol/process trying to " "connect from %s. use newer version on that node", this-> peerinfo.identifier); } while (0); _gf_log ("rpc", "socket.c" , __FUNCTION__, 1861, GF_LOG_ERROR, "older version of protocol/process trying to " "connect from %s. use newer version on that node", this-> peerinfo.identifier); } while (0) |
1861 | this->peerinfo.identifier)do { do { if (0) printf ("older version of protocol/process trying to " "connect from %s. use newer version on that node", this-> peerinfo.identifier); } while (0); _gf_log ("rpc", "socket.c" , __FUNCTION__, 1861, GF_LOG_ERROR, "older version of protocol/process trying to " "connect from %s. use newer version on that node", this-> peerinfo.identifier); } while (0); |
1862 | } else { |
1863 | gf_log ("rpc", GF_LOG_ERROR,do { do { if (0) printf ("wrong MSG-TYPE (%d) received from %s" , in->msg_type, this->peerinfo.identifier); } while (0) ; _gf_log ("rpc", "socket.c", __FUNCTION__, 1866, GF_LOG_ERROR , "wrong MSG-TYPE (%d) received from %s", in->msg_type, this ->peerinfo.identifier); } while (0) |
1864 | "wrong MSG-TYPE (%d) received from %s",do { do { if (0) printf ("wrong MSG-TYPE (%d) received from %s" , in->msg_type, this->peerinfo.identifier); } while (0) ; _gf_log ("rpc", "socket.c", __FUNCTION__, 1866, GF_LOG_ERROR , "wrong MSG-TYPE (%d) received from %s", in->msg_type, this ->peerinfo.identifier); } while (0) |
1865 | in->msg_type,do { do { if (0) printf ("wrong MSG-TYPE (%d) received from %s" , in->msg_type, this->peerinfo.identifier); } while (0) ; _gf_log ("rpc", "socket.c", __FUNCTION__, 1866, GF_LOG_ERROR , "wrong MSG-TYPE (%d) received from %s", in->msg_type, this ->peerinfo.identifier); } while (0) |
1866 | this->peerinfo.identifier)do { do { if (0) printf ("wrong MSG-TYPE (%d) received from %s" , in->msg_type, this->peerinfo.identifier); } while (0) ; _gf_log ("rpc", "socket.c", __FUNCTION__, 1866, GF_LOG_ERROR , "wrong MSG-TYPE (%d) received from %s", in->msg_type, this ->peerinfo.identifier); } while (0); |
1867 | ret = -1; |
1868 | } |
1869 | |
1870 | remaining_size = RPC_FRAGSIZE (in->fraghdr)((uint32_t)(in->fraghdr & 0x7fffffffU)) - frag->bytes_read; |
1871 | |
1872 | if ((ret == -1) |
1873 | || ((ret == 0) && (remaining_size == 0) |
1874 | && (RPC_LASTFRAG (in->fraghdr)((uint32_t)(in->fraghdr & 0x80000000U))))) { |
1875 | frag->state = SP_STATE_NADA; |
1876 | } |
1877 | |
1878 | break; |
1879 | } |
1880 | |
1881 | out: |
1882 | return ret; |
1883 | } |
1884 | |
1885 | |
1886 | static inline |
1887 | void __socket_reset_priv (socket_private_t *priv) |
1888 | { |
1889 | struct gf_sock_incoming *in = NULL((void*)0); |
1890 | |
1891 | /* used to reduce the indirection */ |
1892 | in = &priv->incoming; |
1893 | |
1894 | if (in->iobref) { |
1895 | iobref_unref (in->iobref); |
1896 | in->iobref = NULL((void*)0); |
1897 | } |
1898 | |
1899 | if (in->iobuf) { |
1900 | iobuf_unref (in->iobuf); |
1901 | } |
1902 | |
1903 | if (in->request_info != NULL((void*)0)) { |
1904 | GF_FREE (in->request_info)__gf_free (in->request_info); |
1905 | in->request_info = NULL((void*)0); |
1906 | } |
1907 | |
1908 | memset (&in->payload_vector, 0, |
1909 | sizeof (in->payload_vector)); |
1910 | |
1911 | in->iobuf = NULL((void*)0); |
1912 | } |
1913 | |
1914 | |
1915 | int |
1916 | __socket_proto_state_machine (rpc_transport_t *this, |
1917 | rpc_transport_pollin_t **pollin) |
1918 | { |
1919 | int ret = -1; |
1920 | socket_private_t *priv = NULL((void*)0); |
1921 | struct iobuf *iobuf = NULL((void*)0); |
1922 | struct iobref *iobref = NULL((void*)0); |
1923 | struct iovec vector[2]; |
1924 | struct gf_sock_incoming *in = NULL((void*)0); |
1925 | struct gf_sock_incoming_frag *frag = NULL((void*)0); |
1926 | |
1927 | GF_VALIDATE_OR_GOTO ("socket", this, out)do { if (!this) { (*__errno_location ()) = 22; do { do { if ( 0) printf ("invalid argument: " "this"); } while (0); _gf_log_callingfn ("socket", "socket.c", __FUNCTION__, 1927, GF_LOG_ERROR, "invalid argument: " "this"); } while (0); goto out; } } while (0); |
1928 | GF_VALIDATE_OR_GOTO ("socket", this->private, out)do { if (!this->private) { (*__errno_location ()) = 22; do { do { if (0) printf ("invalid argument: " "this->private" ); } while (0); _gf_log_callingfn ("socket", "socket.c", __FUNCTION__ , 1928, GF_LOG_ERROR, "invalid argument: " "this->private" ); } while (0); goto out; } } while (0); |
1929 | |
1930 | priv = this->private; |
1931 | /* used to reduce the indirection */ |
1932 | in = &priv->incoming; |
1933 | frag = &in->frag; |
1934 | |
1935 | while (in->record_state != SP_STATE_COMPLETE) { |
1936 | switch (in->record_state) { |
1937 | |
1938 | case SP_STATE_NADA: |
1939 | in->total_bytes_read = 0; |
1940 | in->payload_vector.iov_len = 0; |
1941 | |
1942 | in->pending_vector = in->vector; |
1943 | in->pending_vector->iov_base = &in->fraghdr; |
1944 | |
1945 | in->pending_vector->iov_len = sizeof (in->fraghdr); |
1946 | |
1947 | in->record_state = SP_STATE_READING_FRAGHDR; |
1948 | |
1949 | /* fall through */ |
1950 | |
1951 | case SP_STATE_READING_FRAGHDR: |
1952 | ret = __socket_readv (this, in->pending_vector, 1, |
1953 | &in->pending_vector, |
1954 | &in->pending_count, |
1955 | NULL((void*)0)); |
1956 | if (ret == -1) { |
1957 | if (priv->read_fail_log == 1) { |
1958 | gf_log (this->name,do { do { if (0) printf ("reading from socket failed. Error (%s)" ", peer (%s)", strerror ((*__errno_location ())), this->peerinfo .identifier); } while (0); _gf_log (this->name, "socket.c" , __FUNCTION__, 1963, ((priv->connected == 1) ? GF_LOG_WARNING : GF_LOG_DEBUG), "reading from socket failed. Error (%s)" ", peer (%s)" , strerror ((*__errno_location ())), this->peerinfo.identifier ); } while (0) |
1959 | ((priv->connected == 1) ?do { do { if (0) printf ("reading from socket failed. Error (%s)" ", peer (%s)", strerror ((*__errno_location ())), this->peerinfo .identifier); } while (0); _gf_log (this->name, "socket.c" , __FUNCTION__, 1963, ((priv->connected == 1) ? GF_LOG_WARNING : GF_LOG_DEBUG), "reading from socket failed. Error (%s)" ", peer (%s)" , strerror ((*__errno_location ())), this->peerinfo.identifier ); } while (0) |
1960 | GF_LOG_WARNING : GF_LOG_DEBUG),do { do { if (0) printf ("reading from socket failed. Error (%s)" ", peer (%s)", strerror ((*__errno_location ())), this->peerinfo .identifier); } while (0); _gf_log (this->name, "socket.c" , __FUNCTION__, 1963, ((priv->connected == 1) ? GF_LOG_WARNING : GF_LOG_DEBUG), "reading from socket failed. Error (%s)" ", peer (%s)" , strerror ((*__errno_location ())), this->peerinfo.identifier ); } while (0) |
1961 | "reading from socket failed. Error (%s)"do { do { if (0) printf ("reading from socket failed. Error (%s)" ", peer (%s)", strerror ((*__errno_location ())), this->peerinfo .identifier); } while (0); _gf_log (this->name, "socket.c" , __FUNCTION__, 1963, ((priv->connected == 1) ? GF_LOG_WARNING : GF_LOG_DEBUG), "reading from socket failed. Error (%s)" ", peer (%s)" , strerror ((*__errno_location ())), this->peerinfo.identifier ); } while (0) |
1962 | ", peer (%s)", strerror (errno),do { do { if (0) printf ("reading from socket failed. Error (%s)" ", peer (%s)", strerror ((*__errno_location ())), this->peerinfo .identifier); } while (0); _gf_log (this->name, "socket.c" , __FUNCTION__, 1963, ((priv->connected == 1) ? GF_LOG_WARNING : GF_LOG_DEBUG), "reading from socket failed. Error (%s)" ", peer (%s)" , strerror ((*__errno_location ())), this->peerinfo.identifier ); } while (0) |
1963 | this->peerinfo.identifier)do { do { if (0) printf ("reading from socket failed. Error (%s)" ", peer (%s)", strerror ((*__errno_location ())), this->peerinfo .identifier); } while (0); _gf_log (this->name, "socket.c" , __FUNCTION__, 1963, ((priv->connected == 1) ? GF_LOG_WARNING : GF_LOG_DEBUG), "reading from socket failed. Error (%s)" ", peer (%s)" , strerror ((*__errno_location ())), this->peerinfo.identifier ); } while (0); |
1964 | } |
1965 | goto out; |
1966 | } |
1967 | |
1968 | if (ret > 0) { |
1969 | gf_log (this->name, GF_LOG_TRACE, "partial "do { do { if (0) printf ("partial " "fragment header read"); } while (0); _gf_log (this->name, "socket.c", __FUNCTION__, 1970, GF_LOG_TRACE, "partial " "fragment header read"); } while (0) |
1970 | "fragment header read")do { do { if (0) printf ("partial " "fragment header read"); } while (0); _gf_log (this->name, "socket.c", __FUNCTION__, 1970, GF_LOG_TRACE, "partial " "fragment header read"); } while (0); |
1971 | goto out; |
1972 | } |
1973 | |
1974 | if (ret == 0) { |
1975 | in->record_state = SP_STATE_READ_FRAGHDR; |
1976 | } |
1977 | /* fall through */ |
1978 | |
1979 | case SP_STATE_READ_FRAGHDR: |
1980 | |
1981 | in->fraghdr = ntoh32hton32 (in->fraghdr); |
1982 | in->total_bytes_read += RPC_FRAGSIZE(in->fraghdr)((uint32_t)(in->fraghdr & 0x7fffffffU)); |
1983 | iobuf = iobuf_get2 (this->ctx->iobuf_pool, |
1984 | (in->total_bytes_read + |
1985 | sizeof (in->fraghdr))); |
1986 | if (!iobuf) { |
1987 | ret = -ENOMEM12; |
1988 | goto out; |
1989 | } |
1990 | |
1991 | in->iobuf = iobuf; |
1992 | in->iobuf_size = 0; |
1993 | frag->fragcurrent = iobuf_ptr (iobuf)((iobuf)->ptr); |
1994 | in->record_state = SP_STATE_READING_FRAG; |
1995 | /* fall through */ |
1996 | |
1997 | case SP_STATE_READING_FRAG: |
1998 | ret = __socket_read_frag (this); |
1999 | |
2000 | if ((ret == -1) || |
2001 | (frag->bytes_read != RPC_FRAGSIZE (in->fraghdr)((uint32_t)(in->fraghdr & 0x7fffffffU)))) { |
2002 | goto out; |
2003 | } |
2004 | |
2005 | frag->bytes_read = 0; |
2006 | |
2007 | if (!RPC_LASTFRAG (in->fraghdr)((uint32_t)(in->fraghdr & 0x80000000U))) { |
2008 | in->record_state = SP_STATE_READING_FRAGHDR; |
2009 | break; |
2010 | } |
2011 | |
2012 | /* we've read the entire rpc record, notify the |
2013 | * upper layers. |
2014 | */ |
2015 | if (pollin != NULL((void*)0)) { |
2016 | int count = 0; |
2017 | in->iobuf_size = (in->total_bytes_read - |
2018 | in->payload_vector.iov_len); |
2019 | |
2020 | memset (vector, 0, sizeof (vector)); |
2021 | |
2022 | if (in->iobref == NULL((void*)0)) { |
2023 | in->iobref = iobref_new (); |
2024 | if (in->iobref == NULL((void*)0)) { |
2025 | ret = -1; |
2026 | goto out; |
2027 | } |
2028 | } |
2029 | |
2030 | vector[count].iov_base = iobuf_ptr (in->iobuf)((in->iobuf)->ptr); |
2031 | vector[count].iov_len = in->iobuf_size; |
2032 | |
2033 | iobref = in->iobref; |
2034 | |
2035 | count++; |
2036 | |
2037 | if (in->payload_vector.iov_base != NULL((void*)0)) { |
2038 | vector[count] = in->payload_vector; |
2039 | count++; |
2040 | } |
2041 | |
2042 | *pollin = rpc_transport_pollin_alloc (this, |
2043 | vector, |
2044 | count, |
2045 | in->iobuf, |
2046 | iobref, |
2047 | in->request_info); |
2048 | iobuf_unref (in->iobuf); |
2049 | in->iobuf = NULL((void*)0); |
2050 | |
2051 | if (*pollin == NULL((void*)0)) { |
2052 | gf_log (this->name, GF_LOG_WARNING,do { do { if (0) printf ("transport pollin allocation failed" ); } while (0); _gf_log (this->name, "socket.c", __FUNCTION__ , 2053, GF_LOG_WARNING, "transport pollin allocation failed") ; } while (0) |
2053 | "transport pollin allocation failed")do { do { if (0) printf ("transport pollin allocation failed" ); } while (0); _gf_log (this->name, "socket.c", __FUNCTION__ , 2053, GF_LOG_WARNING, "transport pollin allocation failed") ; } while (0); |
2054 | ret = -1; |
2055 | goto out; |
2056 | } |
2057 | if (in->msg_type == REPLY) |
2058 | (*pollin)->is_reply = 1; |
2059 | |
2060 | in->request_info = NULL((void*)0); |
2061 | } |
2062 | in->record_state = SP_STATE_COMPLETE; |
2063 | break; |
2064 | |
2065 | case SP_STATE_COMPLETE: |
2066 | /* control should not reach here */ |
2067 | gf_log (this->name, GF_LOG_WARNING, "control reached to "do { do { if (0) printf ("control reached to " "SP_STATE_COMPLETE, which should not have " "happened"); } while (0); _gf_log (this->name, "socket.c" , __FUNCTION__, 2069, GF_LOG_WARNING, "control reached to " "SP_STATE_COMPLETE, which should not have " "happened"); } while (0) |
2068 | "SP_STATE_COMPLETE, which should not have "do { do { if (0) printf ("control reached to " "SP_STATE_COMPLETE, which should not have " "happened"); } while (0); _gf_log (this->name, "socket.c" , __FUNCTION__, 2069, GF_LOG_WARNING, "control reached to " "SP_STATE_COMPLETE, which should not have " "happened"); } while (0) |
2069 | "happened")do { do { if (0) printf ("control reached to " "SP_STATE_COMPLETE, which should not have " "happened"); } while (0); _gf_log (this->name, "socket.c" , __FUNCTION__, 2069, GF_LOG_WARNING, "control reached to " "SP_STATE_COMPLETE, which should not have " "happened"); } while (0); |
2070 | break; |
2071 | } |
2072 | } |
2073 | |
2074 | if (in->record_state == SP_STATE_COMPLETE) { |
2075 | in->record_state = SP_STATE_NADA; |
2076 | __socket_reset_priv (priv); |
2077 | } |
2078 | |
2079 | out: |
2080 | if ((ret == -1) && (errno(*__errno_location ()) == EAGAIN11)) { |
2081 | ret = 0; |
2082 | } |
2083 | return ret; |
2084 | } |
2085 | |
2086 | |
2087 | int |
2088 | socket_proto_state_machine (rpc_transport_t *this, |
2089 | rpc_transport_pollin_t **pollin) |
2090 | { |
2091 | socket_private_t *priv = NULL((void*)0); |
2092 | int ret = 0; |
2093 | |
2094 | GF_VALIDATE_OR_GOTO ("socket", this, out)do { if (!this) { (*__errno_location ()) = 22; do { do { if ( 0) printf ("invalid argument: " "this"); } while (0); _gf_log_callingfn ("socket", "socket.c", __FUNCTION__, 2094, GF_LOG_ERROR, "invalid argument: " "this"); } while (0); goto out; } } while (0); |
2095 | GF_VALIDATE_OR_GOTO ("socket", this->private, out)do { if (!this->private) { (*__errno_location ()) = 22; do { do { if (0) printf ("invalid argument: " "this->private" ); } while (0); _gf_log_callingfn ("socket", "socket.c", __FUNCTION__ , 2095, GF_LOG_ERROR, "invalid argument: " "this->private" ); } while (0); goto out; } } while (0); |
2096 | |
2097 | priv = this->private; |
2098 | |
2099 | pthread_mutex_lock (&priv->lock); |
2100 | { |
2101 | ret = __socket_proto_state_machine (this, pollin); |
2102 | } |
2103 | pthread_mutex_unlock (&priv->lock); |
2104 | |
2105 | out: |
2106 | return ret; |
2107 | } |
2108 | |
2109 | |
2110 | int |
2111 | socket_event_poll_in (rpc_transport_t *this) |
2112 | { |
2113 | int ret = -1; |
2114 | rpc_transport_pollin_t *pollin = NULL((void*)0); |
2115 | |
2116 | ret = socket_proto_state_machine (this, &pollin); |
2117 | |
2118 | if (pollin != NULL((void*)0)) { |
2119 | ret = rpc_transport_notify (this, RPC_TRANSPORT_MSG_RECEIVED, |
2120 | pollin); |
2121 | rpc_transport_pollin_destroy (pollin); |
2122 | } |
2123 | |
2124 | return ret; |
2125 | } |
2126 | |
2127 | |
2128 | int |
2129 | socket_connect_finish (rpc_transport_t *this) |
2130 | { |
2131 | int ret = -1; |
2132 | socket_private_t *priv = NULL((void*)0); |
2133 | rpc_transport_event_t event = 0; |
2134 | char notify_rpc = 0; |
2135 | |
2136 | GF_VALIDATE_OR_GOTO ("socket", this, out)do { if (!this) { (*__errno_location ()) = 22; do { do { if ( 0) printf ("invalid argument: " "this"); } while (0); _gf_log_callingfn ("socket", "socket.c", __FUNCTION__, 2136, GF_LOG_ERROR, "invalid argument: " "this"); } while (0); goto out; } } while (0); |
2137 | GF_VALIDATE_OR_GOTO ("socket", this->private, out)do { if (!this->private) { (*__errno_location ()) = 22; do { do { if (0) printf ("invalid argument: " "this->private" ); } while (0); _gf_log_callingfn ("socket", "socket.c", __FUNCTION__ , 2137, GF_LOG_ERROR, "invalid argument: " "this->private" ); } while (0); goto out; } } while (0); |
2138 | |
2139 | priv = this->private; |
2140 | |
2141 | pthread_mutex_lock (&priv->lock); |
2142 | { |
2143 | if (priv->connected != 0) |
2144 | goto unlock; |
2145 | |
2146 | get_transport_identifiers (this); |
2147 | |
2148 | ret = __socket_connect_finish (priv->sock); |
2149 | |
2150 | if (ret == -1 && errno(*__errno_location ()) == EINPROGRESS115) |
2151 | ret = 1; |
2152 | |
2153 | if (ret == -1 && errno(*__errno_location ()) != EINPROGRESS115) { |
2154 | if (!priv->connect_finish_log) { |
2155 | gf_log (this->name, GF_LOG_ERROR,do { do { if (0) printf ("connection to %s failed (%s)", this ->peerinfo.identifier, strerror ((*__errno_location ()))); } while (0); _gf_log (this->name, "socket.c", __FUNCTION__ , 2158, GF_LOG_ERROR, "connection to %s failed (%s)", this-> peerinfo.identifier, strerror ((*__errno_location ()))); } while (0) |
2156 | "connection to %s failed (%s)",do { do { if (0) printf ("connection to %s failed (%s)", this ->peerinfo.identifier, strerror ((*__errno_location ()))); } while (0); _gf_log (this->name, "socket.c", __FUNCTION__ , 2158, GF_LOG_ERROR, "connection to %s failed (%s)", this-> peerinfo.identifier, strerror ((*__errno_location ()))); } while (0) |
2157 | this->peerinfo.identifier,do { do { if (0) printf ("connection to %s failed (%s)", this ->peerinfo.identifier, strerror ((*__errno_location ()))); } while (0); _gf_log (this->name, "socket.c", __FUNCTION__ , 2158, GF_LOG_ERROR, "connection to %s failed (%s)", this-> peerinfo.identifier, strerror ((*__errno_location ()))); } while (0) |
2158 | strerror (errno))do { do { if (0) printf ("connection to %s failed (%s)", this ->peerinfo.identifier, strerror ((*__errno_location ()))); } while (0); _gf_log (this->name, "socket.c", __FUNCTION__ , 2158, GF_LOG_ERROR, "connection to %s failed (%s)", this-> peerinfo.identifier, strerror ((*__errno_location ()))); } while (0); |
2159 | priv->connect_finish_log = 1; |
2160 | } |
2161 | __socket_disconnect (this); |
2162 | notify_rpc = 1; |
2163 | event = RPC_TRANSPORT_DISCONNECT; |
2164 | goto unlock; |
2165 | } |
2166 | |
2167 | if (ret == 0) { |
2168 | notify_rpc = 1; |
2169 | |
2170 | this->myinfo.sockaddr_len = |
2171 | sizeof (this->myinfo.sockaddr); |
2172 | |
2173 | ret = getsockname (priv->sock, |
2174 | SA (&this->myinfo.sockaddr)((struct sockaddr *)&this->myinfo.sockaddr), |
2175 | &this->myinfo.sockaddr_len); |
2176 | if (ret == -1) { |
2177 | gf_log (this->name, GF_LOG_WARNING,do { do { if (0) printf ("getsockname on (%d) failed (%s)", priv ->sock, strerror ((*__errno_location ()))); } while (0); _gf_log (this->name, "socket.c", __FUNCTION__, 2179, GF_LOG_WARNING , "getsockname on (%d) failed (%s)", priv->sock, strerror ( (*__errno_location ()))); } while (0) |
2178 | "getsockname on (%d) failed (%s)",do { do { if (0) printf ("getsockname on (%d) failed (%s)", priv ->sock, strerror ((*__errno_location ()))); } while (0); _gf_log (this->name, "socket.c", __FUNCTION__, 2179, GF_LOG_WARNING , "getsockname on (%d) failed (%s)", priv->sock, strerror ( (*__errno_location ()))); } while (0) |
2179 | priv->sock, strerror (errno))do { do { if (0) printf ("getsockname on (%d) failed (%s)", priv ->sock, strerror ((*__errno_location ()))); } while (0); _gf_log (this->name, "socket.c", __FUNCTION__, 2179, GF_LOG_WARNING , "getsockname on (%d) failed (%s)", priv->sock, strerror ( (*__errno_location ()))); } while (0); |
2180 | __socket_disconnect (this); |
2181 | event = GF_EVENT_POLLERR; |
2182 | goto unlock; |
2183 | } |
2184 | |
2185 | priv->connected = 1; |
2186 | priv->connect_finish_log = 0; |
2187 | event = RPC_TRANSPORT_CONNECT; |
2188 | } |
2189 | } |
2190 | unlock: |
2191 | pthread_mutex_unlock (&priv->lock); |
2192 | |
2193 | if (notify_rpc) { |
2194 | rpc_transport_notify (this, event, this); |
2195 | } |
2196 | out: |
2197 | return 0; |
2198 | } |
2199 | |
2200 | |
2201 | /* reads rpc_requests during pollin */ |
2202 | int |
2203 | socket_event_handler (int fd, int idx, void *data, |
2204 | int poll_in, int poll_out, int poll_err) |
2205 | { |
2206 | rpc_transport_t *this = NULL((void*)0); |
2207 | socket_private_t *priv = NULL((void*)0); |
2208 | int ret = -1; |
2209 | |
2210 | this = data; |
2211 | GF_VALIDATE_OR_GOTO ("socket", this, out)do { if (!this) { (*__errno_location ()) = 22; do { do { if ( 0) printf ("invalid argument: " "this"); } while (0); _gf_log_callingfn ("socket", "socket.c", __FUNCTION__, 2211, GF_LOG_ERROR, "invalid argument: " "this"); } while (0); goto out; } } while (0); |
2212 | GF_VALIDATE_OR_GOTO ("socket", this->private, out)do { if (!this->private) { (*__errno_location ()) = 22; do { do { if (0) printf ("invalid argument: " "this->private" ); } while (0); _gf_log_callingfn ("socket", "socket.c", __FUNCTION__ , 2212, GF_LOG_ERROR, "invalid argument: " "this->private" ); } while (0); goto out; } } while (0); |
2213 | GF_VALIDATE_OR_GOTO ("socket", this->xl, out)do { if (!this->xl) { (*__errno_location ()) = 22; do { do { if (0) printf ("invalid argument: " "this->xl"); } while (0); _gf_log_callingfn ("socket", "socket.c", __FUNCTION__, 2213 , GF_LOG_ERROR, "invalid argument: " "this->xl"); } while ( 0); goto out; } } while (0); |
2214 | |
2215 | THIS(*__glusterfs_this_location()) = this->xl; |
2216 | priv = this->private; |
2217 | |
2218 | pthread_mutex_lock (&priv->lock); |
2219 | { |
2220 | priv->idx = idx; |
2221 | } |
2222 | pthread_mutex_unlock (&priv->lock); |
2223 | |
2224 | ret = (priv->connected == 1) ? 0 : socket_connect_finish(this); |
2225 | |
2226 | if (!ret && poll_out) { |
2227 | ret = socket_event_poll_out (this); |
2228 | } |
2229 | |
2230 | if (!ret && poll_in) { |
2231 | ret = socket_event_poll_in (this); |
2232 | } |
2233 | |
2234 | if ((ret < 0) || poll_err) { |
2235 | /* Logging has happened already in earlier cases */ |
2236 | gf_log ("transport", ((ret >= 0) ? GF_LOG_INFO : GF_LOG_DEBUG),do { do { if (0) printf ("disconnecting now"); } while (0); _gf_log ("transport", "socket.c", __FUNCTION__, 2237, ((ret >= 0) ? GF_LOG_INFO : GF_LOG_DEBUG), "disconnecting now"); } while (0) |
2237 | "disconnecting now")do { do { if (0) printf ("disconnecting now"); } while (0); _gf_log ("transport", "socket.c", __FUNCTION__, 2237, ((ret >= 0) ? GF_LOG_INFO : GF_LOG_DEBUG), "disconnecting now"); } while (0); |
2238 | socket_event_poll_err (this); |
2239 | rpc_transport_unref (this); |
2240 | } |
2241 | |
2242 | out: |
2243 | return ret; |
2244 | } |
2245 | |
2246 | |
2247 | void * |
2248 | socket_poller (void *ctx) |
2249 | { |
2250 | rpc_transport_t *this = ctx; |
2251 | socket_private_t *priv = this->private; |
2252 | struct pollfd pfd[2] = {{0,},}; |
2253 | gf_boolean_t to_write = _gf_false; |
2254 | int ret = 0; |
2255 | |
2256 | if (priv->use_ssl) { |
2257 | if (ssl_setup_connection(this,priv->connected) < 0) { |
2258 | gf_log (this->name,GF_LOG_ERROR, "%s setup failed",do { do { if (0) printf ("%s setup failed", priv->connected ? "server" : "client"); } while (0); _gf_log (this->name, "socket.c", __FUNCTION__, 2259, GF_LOG_ERROR, "%s setup failed" , priv->connected ? "server" : "client"); } while (0) |
2259 | priv->connected ? "server" : "client")do { do { if (0) printf ("%s setup failed", priv->connected ? "server" : "client"); } while (0); _gf_log (this->name, "socket.c", __FUNCTION__, 2259, GF_LOG_ERROR, "%s setup failed" , priv->connected ? "server" : "client"); } while (0); |
2260 | goto err; |
2261 | } |
2262 | } |
2263 | |
2264 | if (!priv->bio) { |
2265 | ret = __socket_nonblock (priv->sock); |
2266 | if (ret == -1) { |
2267 | gf_log (this->name, GF_LOG_WARNING,do { do { if (0) printf ("NBIO on %d failed (%s)", priv->sock , strerror ((*__errno_location ()))); } while (0); _gf_log (this ->name, "socket.c", __FUNCTION__, 2269, GF_LOG_WARNING, "NBIO on %d failed (%s)" , priv->sock, strerror ((*__errno_location ()))); } while ( 0) |
2268 | "NBIO on %d failed (%s)",do { do { if (0) printf ("NBIO on %d failed (%s)", priv->sock , strerror ((*__errno_location ()))); } while (0); _gf_log (this ->name, "socket.c", __FUNCTION__, 2269, GF_LOG_WARNING, "NBIO on %d failed (%s)" , priv->sock, strerror ((*__errno_location ()))); } while ( 0) |
2269 | priv->sock, strerror (errno))do { do { if (0) printf ("NBIO on %d failed (%s)", priv->sock , strerror ((*__errno_location ()))); } while (0); _gf_log (this ->name, "socket.c", __FUNCTION__, 2269, GF_LOG_WARNING, "NBIO on %d failed (%s)" , priv->sock, strerror ((*__errno_location ()))); } while ( 0); |
2270 | goto err; |
2271 | } |
2272 | } |
2273 | |
2274 | if (priv->connected == 0) { |
2275 | THIS(*__glusterfs_this_location()) = this->xl; |
2276 | ret = socket_connect_finish (this); |
2277 | if (ret != 0) { |
2278 | gf_log (this->name, GF_LOG_WARNING,do { do { if (0) printf ("asynchronous socket_connect_finish failed" ); } while (0); _gf_log (this->name, "socket.c", __FUNCTION__ , 2279, GF_LOG_WARNING, "asynchronous socket_connect_finish failed" ); } while (0) |
2279 | "asynchronous socket_connect_finish failed")do { do { if (0) printf ("asynchronous socket_connect_finish failed" ); } while (0); _gf_log (this->name, "socket.c", __FUNCTION__ , 2279, GF_LOG_WARNING, "asynchronous socket_connect_finish failed" ); } while (0); |
2280 | } |
2281 | } |
2282 | |
2283 | ret = rpc_transport_notify (this->listener, |
2284 | RPC_TRANSPORT_ACCEPT, this); |
2285 | if (ret != 0) { |
2286 | gf_log (this->name, GF_LOG_WARNING,do { do { if (0) printf ("asynchronous rpc_transport_notify failed" ); } while (0); _gf_log (this->name, "socket.c", __FUNCTION__ , 2287, GF_LOG_WARNING, "asynchronous rpc_transport_notify failed" ); } while (0) |
2287 | "asynchronous rpc_transport_notify failed")do { do { if (0) printf ("asynchronous rpc_transport_notify failed" ); } while (0); _gf_log (this->name, "socket.c", __FUNCTION__ , 2287, GF_LOG_WARNING, "asynchronous rpc_transport_notify failed" ); } while (0); |
2288 | } |
2289 | |
2290 | for (;;) { |
2291 | pthread_mutex_lock(&priv->lock); |
2292 | to_write = !list_empty(&priv->ioq); |
2293 | pthread_mutex_unlock(&priv->lock); |
2294 | pfd[0].fd = priv->pipe[0]; |
2295 | pfd[0].events = POLL_MASK_ERROR(0x008 | 0x010 | 0x020); |
2296 | pfd[0].revents = 0; |
2297 | pfd[1].fd = priv->sock; |
2298 | pfd[1].events = POLL_MASK_INPUT(0x001 | 0x002) | POLL_MASK_ERROR(0x008 | 0x010 | 0x020); |
2299 | pfd[1].revents = 0; |
2300 | if (to_write) { |
2301 | pfd[1].events |= POLL_MASK_OUTPUT(0x004); |
2302 | } |
2303 | else { |
2304 | pfd[0].events |= POLL_MASK_INPUT(0x001 | 0x002); |
2305 | } |
2306 | if (poll(pfd,2,-1) < 0) { |
2307 | gf_log(this->name,GF_LOG_ERROR,"poll failed")do { do { if (0) printf ("poll failed"); } while (0); _gf_log (this->name, "socket.c", __FUNCTION__, 2307, GF_LOG_ERROR ,"poll failed"); } while (0); |
2308 | break; |
2309 | } |
2310 | if (pfd[0].revents & POLL_MASK_ERROR(0x008 | 0x010 | 0x020)) { |
2311 | gf_log(this->name,GF_LOG_ERROR,do { do { if (0) printf ("poll error on pipe"); } while (0); _gf_log (this->name, "socket.c", __FUNCTION__, 2312, GF_LOG_ERROR , "poll error on pipe"); } while (0) |
2312 | "poll error on pipe")do { do { if (0) printf ("poll error on pipe"); } while (0); _gf_log (this->name, "socket.c", __FUNCTION__, 2312, GF_LOG_ERROR , "poll error on pipe"); } while (0); |
2313 | break; |
2314 | } |
2315 | /* Only glusterd actually seems to need this. */ |
2316 | THIS(*__glusterfs_this_location()) = this->xl; |
2317 | if (pfd[1].revents & POLL_MASK_INPUT(0x001 | 0x002)) { |
2318 | ret = socket_event_poll_in(this); |
2319 | if (ret >= 0) { |
2320 | /* Suppress errors while making progress. */ |
2321 | pfd[1].revents &= ~POLL_MASK_ERROR(0x008 | 0x010 | 0x020); |
2322 | } |
2323 | else if (errno(*__errno_location ()) == ENOTCONN107) { |
2324 | ret = 0; |
2325 | } |
2326 | } |
2327 | else if (pfd[1].revents & POLL_MASK_OUTPUT(0x004)) { |
2328 | ret = socket_event_poll_out(this); |
2329 | if (ret >= 0) { |
2330 | /* Suppress errors while making progress. */ |
2331 | pfd[1].revents &= ~POLL_MASK_ERROR(0x008 | 0x010 | 0x020); |
2332 | } |
2333 | else if (errno(*__errno_location ()) == ENOTCONN107) { |
2334 | ret = 0; |
2335 | } |
2336 | } |
2337 | else { |
2338 | /* |
2339 | * This usually means that we left poll() because |
2340 | * somebody pushed a byte onto our pipe. That wakeup |
2341 | * is why the pipe is there, but once awake we can do |
2342 | * all the checking we need on the next iteration. |
2343 | */ |
2344 | ret = 0; |
2345 | } |
2346 | if (pfd[1].revents & POLL_MASK_ERROR(0x008 | 0x010 | 0x020)) { |
2347 | gf_log(this->name,GF_LOG_ERROR,do { do { if (0) printf ("poll error on socket"); } while (0) ; _gf_log (this->name, "socket.c", __FUNCTION__, 2348, GF_LOG_ERROR , "poll error on socket"); } while (0) |
2348 | "poll error on socket")do { do { if (0) printf ("poll error on socket"); } while (0) ; _gf_log (this->name, "socket.c", __FUNCTION__, 2348, GF_LOG_ERROR , "poll error on socket"); } while (0); |
2349 | break; |
2350 | } |
2351 | if (ret < 0) { |
2352 | gf_log(this->name,GF_LOG_ERROR,do { do { if (0) printf ("error in polling loop"); } while (0 ); _gf_log (this->name, "socket.c", __FUNCTION__, 2353, GF_LOG_ERROR , "error in polling loop"); } while (0) |
2353 | "error in polling loop")do { do { if (0) printf ("error in polling loop"); } while (0 ); _gf_log (this->name, "socket.c", __FUNCTION__, 2353, GF_LOG_ERROR , "error in polling loop"); } while (0); |
2354 | break; |
2355 | } |
2356 | } |
2357 | |
2358 | err: |
2359 | /* All (and only) I/O errors should come here. */ |
2360 | pthread_mutex_lock(&priv->lock); |
2361 | if (priv->ot_state == OT_ALIVE) { |
2362 | /* |
2363 | * We have to do this if we're here because of an error we |
2364 | * detected ourselves, but need to avoid a recursive call |
2365 | * if our death is the result of an external disconnect. |
2366 | */ |
2367 | __socket_shutdown(this); |
2368 | close(priv->sock); |
2369 | priv->sock = -1; |
2370 | } |
2371 | if (priv->ssl_ssl) { |
2372 | /* |
2373 | * We're always responsible for this part, but only actually |
2374 | * have to do it if we got far enough for ssl_ssl to be valid |
2375 | * (i.e. errors in ssl_setup_connection don't count). |
2376 | */ |
2377 | ssl_teardown_connection(priv); |
2378 | } |
2379 | priv->ot_state = OT_IDLE; |
2380 | /* |
2381 | * We expect there to be only one waiter, but if there do happen to |
2382 | * be multiple it's probably better to unblock them than to let them |
2383 | * hang. If there are none, this is a harmless no-op. |
2384 | */ |
2385 | pthread_cond_broadcast(&priv->ot_event); |
2386 | pthread_mutex_unlock(&priv->lock); |
2387 | rpc_transport_notify (this->listener, RPC_TRANSPORT_DISCONNECT, this); |
2388 | rpc_transport_unref (this); |
2389 | return NULL((void*)0); |
2390 | } |
2391 | |
2392 | |
2393 | void |
2394 | socket_spawn (rpc_transport_t *this) |
2395 | { |
2396 | socket_private_t *priv = this->private; |
2397 | |
2398 | if (priv->ot_state == OT_ALIVE) { |
2399 | gf_log (this->name, GF_LOG_WARNING,do { do { if (0) printf ("refusing to start redundant poller" ); } while (0); _gf_log (this->name, "socket.c", __FUNCTION__ , 2400, GF_LOG_WARNING, "refusing to start redundant poller") ; } while (0) |
2400 | "refusing to start redundant poller")do { do { if (0) printf ("refusing to start redundant poller" ); } while (0); _gf_log (this->name, "socket.c", __FUNCTION__ , 2400, GF_LOG_WARNING, "refusing to start redundant poller") ; } while (0); |
2401 | return; |
2402 | } |
2403 | |
2404 | priv->ot_state = OT_ALIVE; |
2405 | |
2406 | if (pthread_create(&priv->thread,NULL((void*)0),socket_poller,this) != 0) { |
2407 | gf_log (this->name, GF_LOG_ERROR,do { do { if (0) printf ("could not create poll thread"); } while (0); _gf_log (this->name, "socket.c", __FUNCTION__, 2408, GF_LOG_ERROR, "could not create poll thread"); } while (0) |
2408 | "could not create poll thread")do { do { if (0) printf ("could not create poll thread"); } while (0); _gf_log (this->name, "socket.c", __FUNCTION__, 2408, GF_LOG_ERROR, "could not create poll thread"); } while (0); |
2409 | } |
2410 | } |
2411 | |
2412 | int |
2413 | socket_server_event_handler (int fd, int idx, void *data, |
2414 | int poll_in, int poll_out, int poll_err) |
2415 | { |
2416 | rpc_transport_t *this = NULL((void*)0); |
2417 | socket_private_t *priv = NULL((void*)0); |
2418 | int ret = 0; |
2419 | int new_sock = -1; |
2420 | rpc_transport_t *new_trans = NULL((void*)0); |
2421 | struct sockaddr_storage new_sockaddr = {0, }; |
2422 | socklen_t addrlen = sizeof (new_sockaddr); |
2423 | socket_private_t *new_priv = NULL((void*)0); |
2424 | glusterfs_ctx_t *ctx = NULL((void*)0); |
2425 | |
2426 | this = data; |
2427 | GF_VALIDATE_OR_GOTO ("socket", this, out)do { if (!this) { (*__errno_location ()) = 22; do { do { if ( 0) printf ("invalid argument: " "this"); } while (0); _gf_log_callingfn ("socket", "socket.c", __FUNCTION__, 2427, GF_LOG_ERROR, "invalid argument: " "this"); } while (0); goto out; } } while (0); |
2428 | GF_VALIDATE_OR_GOTO ("socket", this->private, out)do { if (!this->private) { (*__errno_location ()) = 22; do { do { if (0) printf ("invalid argument: " "this->private" ); } while (0); _gf_log_callingfn ("socket", "socket.c", __FUNCTION__ , 2428, GF_LOG_ERROR, "invalid argument: " "this->private" ); } while (0); goto out; } } while (0); |
2429 | GF_VALIDATE_OR_GOTO ("socket", this->xl, out)do { if (!this->xl) { (*__errno_location ()) = 22; do { do { if (0) printf ("invalid argument: " "this->xl"); } while (0); _gf_log_callingfn ("socket", "socket.c", __FUNCTION__, 2429 , GF_LOG_ERROR, "invalid argument: " "this->xl"); } while ( 0); goto out; } } while (0); |
2430 | |
2431 | THIS(*__glusterfs_this_location()) = this->xl; |
2432 | priv = this->private; |
2433 | ctx = this->ctx; |
2434 | |
2435 | pthread_mutex_lock (&priv->lock); |
2436 | { |
2437 | priv->idx = idx; |
2438 | |
2439 | if (poll_in) { |
2440 | new_sock = accept (priv->sock, SA (&new_sockaddr)((struct sockaddr *)&new_sockaddr), |
2441 | &addrlen); |
2442 | |
2443 | if (new_sock == -1) { |
2444 | gf_log (this->name, GF_LOG_WARNING,do { do { if (0) printf ("accept on %d failed (%s)", priv-> sock, strerror ((*__errno_location ()))); } while (0); _gf_log (this->name, "socket.c", __FUNCTION__, 2446, GF_LOG_WARNING , "accept on %d failed (%s)", priv->sock, strerror ((*__errno_location ()))); } while (0) |
2445 | "accept on %d failed (%s)",do { do { if (0) printf ("accept on %d failed (%s)", priv-> sock, strerror ((*__errno_location ()))); } while (0); _gf_log (this->name, "socket.c", __FUNCTION__, 2446, GF_LOG_WARNING , "accept on %d failed (%s)", priv->sock, strerror ((*__errno_location ()))); } while (0) |
2446 | priv->sock, strerror (errno))do { do { if (0) printf ("accept on %d failed (%s)", priv-> sock, strerror ((*__errno_location ()))); } while (0); _gf_log (this->name, "socket.c", __FUNCTION__, 2446, GF_LOG_WARNING , "accept on %d failed (%s)", priv->sock, strerror ((*__errno_location ()))); } while (0); |
2447 | goto unlock; |
2448 | } |
2449 | |
2450 | if (priv->nodelay) { |
2451 | ret = __socket_nodelay (new_sock); |
2452 | if (ret == -1) { |
2453 | gf_log (this->name, GF_LOG_WARNING,do { do { if (0) printf ("setsockopt() failed for " "NODELAY (%s)" , strerror ((*__errno_location ()))); } while (0); _gf_log (this ->name, "socket.c", __FUNCTION__, 2456, GF_LOG_WARNING, "setsockopt() failed for " "NODELAY (%s)", strerror ((*__errno_location ()))); } while ( 0) |
2454 | "setsockopt() failed for "do { do { if (0) printf ("setsockopt() failed for " "NODELAY (%s)" , strerror ((*__errno_location ()))); } while (0); _gf_log (this ->name, "socket.c", __FUNCTION__, 2456, GF_LOG_WARNING, "setsockopt() failed for " "NODELAY (%s)", strerror ((*__errno_location ()))); } while ( 0) |
2455 | "NODELAY (%s)",do { do { if (0) printf ("setsockopt() failed for " "NODELAY (%s)" , strerror ((*__errno_location ()))); } while (0); _gf_log (this ->name, "socket.c", __FUNCTION__, 2456, GF_LOG_WARNING, "setsockopt() failed for " "NODELAY (%s)", strerror ((*__errno_location ()))); } while ( 0) |
2456 | strerror (errno))do { do { if (0) printf ("setsockopt() failed for " "NODELAY (%s)" , strerror ((*__errno_location ()))); } while (0); _gf_log (this ->name, "socket.c", __FUNCTION__, 2456, GF_LOG_WARNING, "setsockopt() failed for " "NODELAY (%s)", strerror ((*__errno_location ()))); } while ( 0); |
2457 | } |
2458 | } |
2459 | |
2460 | if (priv->keepalive && |
2461 | new_sockaddr.ss_family != AF_UNIX1) { |
2462 | ret = __socket_keepalive (new_sock, |
2463 | new_sockaddr.ss_family, |
2464 | priv->keepaliveintvl, |
2465 | priv->keepaliveidle); |
2466 | if (ret == -1) |
2467 | gf_log (this->name, GF_LOG_WARNING,do { do { if (0) printf ("Failed to set keep-alive: %s", strerror ((*__errno_location ()))); } while (0); _gf_log (this->name , "socket.c", __FUNCTION__, 2469, GF_LOG_WARNING, "Failed to set keep-alive: %s" , strerror ((*__errno_location ()))); } while (0) |
2468 | "Failed to set keep-alive: %s",do { do { if (0) printf ("Failed to set keep-alive: %s", strerror ((*__errno_location ()))); } while (0); _gf_log (this->name , "socket.c", __FUNCTION__, 2469, GF_LOG_WARNING, "Failed to set keep-alive: %s" , strerror ((*__errno_location ()))); } while (0) |
2469 | strerror (errno))do { do { if (0) printf ("Failed to set keep-alive: %s", strerror ((*__errno_location ()))); } while (0); _gf_log (this->name , "socket.c", __FUNCTION__, 2469, GF_LOG_WARNING, "Failed to set keep-alive: %s" , strerror ((*__errno_location ()))); } while (0); |
2470 | } |
2471 | |
2472 | new_trans = GF_CALLOC (1, sizeof (*new_trans),__gf_calloc (1, sizeof (*new_trans), gf_common_mt_rpc_trans_t ) |
2473 | gf_common_mt_rpc_trans_t)__gf_calloc (1, sizeof (*new_trans), gf_common_mt_rpc_trans_t ); |
2474 | if (!new_trans) |
2475 | goto unlock; |
2476 | |
2477 | ret = pthread_mutex_init(&new_trans->lock, NULL((void*)0)); |
2478 | if (ret == -1) { |
2479 | gf_log (this->name, GF_LOG_WARNING,do { do { if (0) printf ("pthread_mutex_init() failed: %s", strerror ((*__errno_location ()))); } while (0); _gf_log (this->name , "socket.c", __FUNCTION__, 2481, GF_LOG_WARNING, "pthread_mutex_init() failed: %s" , strerror ((*__errno_location ()))); } while (0) |
2480 | "pthread_mutex_init() failed: %s",do { do { if (0) printf ("pthread_mutex_init() failed: %s", strerror ((*__errno_location ()))); } while (0); _gf_log (this->name , "socket.c", __FUNCTION__, 2481, GF_LOG_WARNING, "pthread_mutex_init() failed: %s" , strerror ((*__errno_location ()))); } while (0) |
2481 | strerror (errno))do { do { if (0) printf ("pthread_mutex_init() failed: %s", strerror ((*__errno_location ()))); } while (0); _gf_log (this->name , "socket.c", __FUNCTION__, 2481, GF_LOG_WARNING, "pthread_mutex_init() failed: %s" , strerror ((*__errno_location ()))); } while (0); |
2482 | close (new_sock); |
2483 | goto unlock; |
2484 | } |
2485 | |
2486 | new_trans->name = gf_strdup (this->name); |
2487 | |
2488 | memcpy (&new_trans->peerinfo.sockaddr, &new_sockaddr, |
2489 | addrlen); |
2490 | new_trans->peerinfo.sockaddr_len = addrlen; |
2491 | |
2492 | new_trans->myinfo.sockaddr_len = |
2493 | sizeof (new_trans->myinfo.sockaddr); |
2494 | |
2495 | ret = getsockname (new_sock, |
2496 | SA (&new_trans->myinfo.sockaddr)((struct sockaddr *)&new_trans->myinfo.sockaddr), |
2497 | &new_trans->myinfo.sockaddr_len); |
2498 | if (ret == -1) { |
2499 | gf_log (this->name, GF_LOG_WARNING,do { do { if (0) printf ("getsockname on %d failed (%s)", new_sock , strerror ((*__errno_location ()))); } while (0); _gf_log (this ->name, "socket.c", __FUNCTION__, 2501, GF_LOG_WARNING, "getsockname on %d failed (%s)" , new_sock, strerror ((*__errno_location ()))); } while (0) |
2500 | "getsockname on %d failed (%s)",do { do { if (0) printf ("getsockname on %d failed (%s)", new_sock , strerror ((*__errno_location ()))); } while (0); _gf_log (this ->name, "socket.c", __FUNCTION__, 2501, GF_LOG_WARNING, "getsockname on %d failed (%s)" , new_sock, strerror ((*__errno_location ()))); } while (0) |
2501 | new_sock, strerror (errno))do { do { if (0) printf ("getsockname on %d failed (%s)", new_sock , strerror ((*__errno_location ()))); } while (0); _gf_log (this ->name, "socket.c", __FUNCTION__, 2501, GF_LOG_WARNING, "getsockname on %d failed (%s)" , new_sock, strerror ((*__errno_location ()))); } while (0); |
2502 | close (new_sock); |
2503 | goto unlock; |
2504 | } |
2505 | |
2506 | get_transport_identifiers (new_trans); |
2507 | ret = socket_init(new_trans); |
2508 | if (ret != 0) { |
2509 | close(new_sock); |
2510 | goto unlock; |
2511 | } |
2512 | new_trans->ops = this->ops; |
2513 | new_trans->init = this->init; |
2514 | new_trans->fini = this->fini; |
2515 | new_trans->ctx = ctx; |
2516 | new_trans->xl = this->xl; |
2517 | new_trans->mydata = this->mydata; |
2518 | new_trans->notify = this->notify; |
2519 | new_trans->listener = this; |
2520 | new_priv = new_trans->private; |
2521 | |
2522 | new_priv->use_ssl = priv->use_ssl; |
2523 | new_priv->sock = new_sock; |
2524 | new_priv->own_thread = priv->own_thread; |
2525 | |
2526 | new_priv->ssl_ctx = priv->ssl_ctx; |
2527 | if (priv->use_ssl && !priv->own_thread) { |
2528 | if (ssl_setup_connection(new_trans,1) < 0) { |
2529 | gf_log(this->name,GF_LOG_ERROR,do { do { if (0) printf ("server setup failed"); } while (0); _gf_log (this->name, "socket.c", __FUNCTION__, 2530, GF_LOG_ERROR , "server setup failed"); } while (0) |
2530 | "server setup failed")do { do { if (0) printf ("server setup failed"); } while (0); _gf_log (this->name, "socket.c", __FUNCTION__, 2530, GF_LOG_ERROR , "server setup failed"); } while (0); |
2531 | close(new_sock); |
2532 | goto unlock; |
2533 | } |
2534 | } |
2535 | |
2536 | if (!priv->bio && !priv->own_thread) { |
2537 | ret = __socket_nonblock (new_sock); |
2538 | |
2539 | if (ret == -1) { |
2540 | gf_log (this->name, GF_LOG_WARNING,do { do { if (0) printf ("NBIO on %d failed (%s)", new_sock, strerror ((*__errno_location ()))); } while (0); _gf_log (this->name , "socket.c", __FUNCTION__, 2542, GF_LOG_WARNING, "NBIO on %d failed (%s)" , new_sock, strerror ((*__errno_location ()))); } while (0) |
2541 | "NBIO on %d failed (%s)",do { do { if (0) printf ("NBIO on %d failed (%s)", new_sock, strerror ((*__errno_location ()))); } while (0); _gf_log (this->name , "socket.c", __FUNCTION__, 2542, GF_LOG_WARNING, "NBIO on %d failed (%s)" , new_sock, strerror ((*__errno_location ()))); } while (0) |
2542 | new_sock, strerror (errno))do { do { if (0) printf ("NBIO on %d failed (%s)", new_sock, strerror ((*__errno_location ()))); } while (0); _gf_log (this->name , "socket.c", __FUNCTION__, 2542, GF_LOG_WARNING, "NBIO on %d failed (%s)" , new_sock, strerror ((*__errno_location ()))); } while (0); |
2543 | |
2544 | close (new_sock); |
2545 | goto unlock; |
2546 | } |
2547 | } |
2548 | |
2549 | pthread_mutex_lock (&new_priv->lock); |
2550 | { |
2551 | /* |
2552 | * In the own_thread case, this is used to |
2553 | * indicate that we're initializing a server |
2554 | * connection. |
2555 | */ |
2556 | new_priv->connected = 1; |
2557 | rpc_transport_ref (new_trans); |
2558 | |
2559 | if (new_priv->own_thread) { |
2560 | if (pipe(new_priv->pipe) < 0) { |
2561 | gf_log(this->name,GF_LOG_ERROR,do { do { if (0) printf ("could not create pipe"); } while (0 ); _gf_log (this->name, "socket.c", __FUNCTION__, 2562, GF_LOG_ERROR , "could not create pipe"); } while (0) |
2562 | "could not create pipe")do { do { if (0) printf ("could not create pipe"); } while (0 ); _gf_log (this->name, "socket.c", __FUNCTION__, 2562, GF_LOG_ERROR , "could not create pipe"); } while (0); |
2563 | } |
2564 | socket_spawn(new_trans); |
2565 | } |
2566 | else { |
2567 | new_priv->idx = |
2568 | event_register (ctx->event_pool, |
2569 | new_sock, |
2570 | socket_event_handler, |
2571 | new_trans, |
2572 | 1, 0); |
2573 | if (new_priv->idx == -1) |
2574 | ret = -1; |
2575 | } |
2576 | |
2577 | } |
2578 | pthread_mutex_unlock (&new_priv->lock); |
2579 | if (ret == -1) { |
2580 | gf_log (this->name, GF_LOG_WARNING,do { do { if (0) printf ("failed to register the socket with event" ); } while (0); _gf_log (this->name, "socket.c", __FUNCTION__ , 2581, GF_LOG_WARNING, "failed to register the socket with event" ); } while (0) |
2581 | "failed to register the socket with event")do { do { if (0) printf ("failed to register the socket with event" ); } while (0); _gf_log (this->name, "socket.c", __FUNCTION__ , 2581, GF_LOG_WARNING, "failed to register the socket with event" ); } while (0); |
2582 | goto unlock; |
2583 | } |
2584 | |
2585 | if (!priv->own_thread) { |
2586 | ret = rpc_transport_notify (this, |
2587 | RPC_TRANSPORT_ACCEPT, new_trans); |
2588 | } |
2589 | } |
2590 | } |
2591 | unlock: |
2592 | pthread_mutex_unlock (&priv->lock); |
2593 | |
2594 | out: |
2595 | return ret; |
2596 | } |
2597 | |
2598 | |
2599 | int |
2600 | socket_disconnect (rpc_transport_t *this) |
2601 | { |
2602 | socket_private_t *priv = NULL((void*)0); |
2603 | int ret = -1; |
2604 | |
2605 | GF_VALIDATE_OR_GOTO ("socket", this, out)do { if (!this) { (*__errno_location ()) = 22; do { do { if ( 0) printf ("invalid argument: " "this"); } while (0); _gf_log_callingfn ("socket", "socket.c", __FUNCTION__, 2605, GF_LOG_ERROR, "invalid argument: " "this"); } while (0); goto out; } } while (0); |
2606 | GF_VALIDATE_OR_GOTO ("socket", this->private, out)do { if (!this->private) { (*__errno_location ()) = 22; do { do { if (0) printf ("invalid argument: " "this->private" ); } while (0); _gf_log_callingfn ("socket", "socket.c", __FUNCTION__ , 2606, GF_LOG_ERROR, "invalid argument: " "this->private" ); } while (0); goto out; } } while (0); |
2607 | |
2608 | priv = this->private; |
2609 | |
2610 | pthread_mutex_lock (&priv->lock); |
2611 | { |
2612 | ret = __socket_disconnect (this); |
2613 | } |
2614 | pthread_mutex_unlock (&priv->lock); |
2615 | |
2616 | out: |
2617 | return ret; |
2618 | } |
2619 | |
2620 | |
2621 | int |
2622 | socket_connect (rpc_transport_t *this, int port) |
2623 | { |
2624 | int ret = -1; |
2625 | int sock = -1; |
2626 | socket_private_t *priv = NULL((void*)0); |
2627 | socklen_t sockaddr_len = 0; |
2628 | glusterfs_ctx_t *ctx = NULL((void*)0); |
2629 | sa_family_t sa_family = {0, }; |
2630 | char *local_addr = NULL((void*)0); |
2631 | union gf_sock_union sock_union; |
2632 | struct sockaddr_in *addr = NULL((void*)0); |
2633 | |
2634 | GF_VALIDATE_OR_GOTO ("socket", this, err)do { if (!this) { (*__errno_location ()) = 22; do { do { if ( 0) printf ("invalid argument: " "this"); } while (0); _gf_log_callingfn ("socket", "socket.c", __FUNCTION__, 2634, GF_LOG_ERROR, "invalid argument: " "this"); } while (0); goto err; } } while (0); |
2635 | GF_VALIDATE_OR_GOTO ("socket", this->private, err)do { if (!this->private) { (*__errno_location ()) = 22; do { do { if (0) printf ("invalid argument: " "this->private" ); } while (0); _gf_log_callingfn ("socket", "socket.c", __FUNCTION__ , 2635, GF_LOG_ERROR, "invalid argument: " "this->private" ); } while (0); goto err; } } while (0); |
2636 | |
2637 | priv = this->private; |
2638 | ctx = this->ctx; |
2639 | |
2640 | if (!priv) { |
2641 | gf_log_callingfn (this->name, GF_LOG_WARNING,do { do { if (0) printf ("connect() called on uninitialized transport" ); } while (0); _gf_log_callingfn (this->name, "socket.c", __FUNCTION__, 2642, GF_LOG_WARNING, "connect() called on uninitialized transport" ); } while (0) |
2642 | "connect() called on uninitialized transport")do { do { if (0) printf ("connect() called on uninitialized transport" ); } while (0); _gf_log_callingfn (this->name, "socket.c", __FUNCTION__, 2642, GF_LOG_WARNING, "connect() called on uninitialized transport" ); } while (0); |
2643 | goto err; |
2644 | } |
2645 | |
2646 | pthread_mutex_lock (&priv->lock); |
2647 | { |
2648 | sock = priv->sock; |
2649 | } |
2650 | pthread_mutex_unlock (&priv->lock); |
2651 | |
2652 | if (sock != -1) { |
2653 | gf_log_callingfn (this->name, GF_LOG_TRACE,do { do { if (0) printf ("connect () called on transport already connected" ); } while (0); _gf_log_callingfn (this->name, "socket.c", __FUNCTION__, 2654, GF_LOG_TRACE, "connect () called on transport already connected" ); } while (0) |
2654 | "connect () called on transport already connected")do { do { if (0) printf ("connect () called on transport already connected" ); } while (0); _gf_log_callingfn (this->name, "socket.c", __FUNCTION__, 2654, GF_LOG_TRACE, "connect () called on transport already connected" ); } while (0); |
2655 | errno(*__errno_location ()) = EINPROGRESS115; |
2656 | ret = -1; |
2657 | goto err; |
2658 | } |
2659 | |
2660 | ret = socket_client_get_remote_sockaddr (this, &sock_union.sa, |
2661 | &sockaddr_len, &sa_family); |
2662 | if (ret == -1) { |
2663 | /* logged inside client_get_remote_sockaddr */ |
2664 | goto err; |
2665 | } |
2666 | |
2667 | if (port > 0) { |
2668 | sock_union.sin.sin_port = htons (port); |
2669 | } |
2670 | if (ntohs(sock_union.sin.sin_port) == GF_DEFAULT_SOCKET_LISTEN_PORT24007) { |
2671 | if (priv->use_ssl) { |
2672 | gf_log(this->name,GF_LOG_DEBUG,do { do { if (0) printf ("disabling SSL for portmapper connection" ); } while (0); _gf_log (this->name, "socket.c", __FUNCTION__ , 2673, GF_LOG_DEBUG, "disabling SSL for portmapper connection" ); } while (0) |
2673 | "disabling SSL for portmapper connection")do { do { if (0) printf ("disabling SSL for portmapper connection" ); } while (0); _gf_log (this->name, "socket.c", __FUNCTION__ , 2673, GF_LOG_DEBUG, "disabling SSL for portmapper connection" ); } while (0); |
2674 | priv->use_ssl = _gf_false; |
2675 | } |
2676 | } |
2677 | else { |
2678 | if (priv->ssl_enabled && !priv->use_ssl) { |
2679 | gf_log(this->name,GF_LOG_DEBUG,do { do { if (0) printf ("re-enabling SSL for I/O connection" ); } while (0); _gf_log (this->name, "socket.c", __FUNCTION__ , 2680, GF_LOG_DEBUG, "re-enabling SSL for I/O connection"); } while (0) |
2680 | "re-enabling SSL for I/O connection")do { do { if (0) printf ("re-enabling SSL for I/O connection" ); } while (0); _gf_log (this->name, "socket.c", __FUNCTION__ , 2680, GF_LOG_DEBUG, "re-enabling SSL for I/O connection"); } while (0); |
2681 | priv->use_ssl = _gf_true; |
2682 | } |
2683 | } |
2684 | pthread_mutex_lock (&priv->lock); |
2685 | { |
2686 | if (priv->sock != -1) { |
2687 | gf_log (this->name, GF_LOG_TRACE,do { do { if (0) printf ("connect() -- already connected"); } while (0); _gf_log (this->name, "socket.c", __FUNCTION__, 2688, GF_LOG_TRACE, "connect() -- already connected"); } while (0) |
2688 | "connect() -- already connected")do { do { if (0) printf ("connect() -- already connected"); } while (0); _gf_log (this->name, "socket.c", __FUNCTION__, 2688, GF_LOG_TRACE, "connect() -- already connected"); } while (0); |
2689 | goto unlock; |
2690 | } |
2691 | |
2692 | memcpy (&this->peerinfo.sockaddr, &sock_union.storage, |
2693 | sockaddr_len); |
2694 | this->peerinfo.sockaddr_len = sockaddr_len; |
2695 | |
2696 | priv->sock = socket (sa_family, SOCK_STREAMSOCK_STREAM, 0); |
2697 | if (priv->sock == -1) { |
2698 | gf_log (this->name, GF_LOG_ERROR,do { do { if (0) printf ("socket creation failed (%s)", strerror ((*__errno_location ()))); } while (0); _gf_log (this->name , "socket.c", __FUNCTION__, 2700, GF_LOG_ERROR, "socket creation failed (%s)" , strerror ((*__errno_location ()))); } while (0) |
2699 | "socket creation failed (%s)",do { do { if (0) printf ("socket creation failed (%s)", strerror ((*__errno_location ()))); } while (0); _gf_log (this->name , "socket.c", __FUNCTION__, 2700, GF_LOG_ERROR, "socket creation failed (%s)" , strerror ((*__errno_location ()))); } while (0) |
2700 | strerror (errno))do { do { if (0) printf ("socket creation failed (%s)", strerror ((*__errno_location ()))); } while (0); _gf_log (this->name , "socket.c", __FUNCTION__, 2700, GF_LOG_ERROR, "socket creation failed (%s)" , strerror ((*__errno_location ()))); } while (0); |
2701 | goto unlock; |
2702 | } |
2703 | |
2704 | /* Cant help if setting socket options fails. We can continue |
2705 | * working nonetheless. |
2706 | */ |
2707 | if (priv->windowsize != 0) { |
2708 | if (setsockopt (priv->sock, SOL_SOCKET1, SO_RCVBUF8, |
2709 | &priv->windowsize, |
2710 | sizeof (priv->windowsize)) < 0) { |
2711 | gf_log (this->name, GF_LOG_ERROR,do { do { if (0) printf ("setting receive window " "size failed: %d: %d: %s" , priv->sock, priv->windowsize, strerror ((*__errno_location ()))); } while (0); _gf_log (this->name, "socket.c", __FUNCTION__ , 2715, GF_LOG_ERROR, "setting receive window " "size failed: %d: %d: %s" , priv->sock, priv->windowsize, strerror ((*__errno_location ()))); } while (0) |
2712 | "setting receive window "do { do { if (0) printf ("setting receive window " "size failed: %d: %d: %s" , priv->sock, priv->windowsize, strerror ((*__errno_location ()))); } while (0); _gf_log (this->name, "socket.c", __FUNCTION__ , 2715, GF_LOG_ERROR, "setting receive window " "size failed: %d: %d: %s" , priv->sock, priv->windowsize, strerror ((*__errno_location ()))); } while (0) |
2713 | "size failed: %d: %d: %s",do { do { if (0) printf ("setting receive window " "size failed: %d: %d: %s" , priv->sock, priv->windowsize, strerror ((*__errno_location ()))); } while (0); _gf_log (this->name, "socket.c", __FUNCTION__ , 2715, GF_LOG_ERROR, "setting receive window " "size failed: %d: %d: %s" , priv->sock, priv->windowsize, strerror ((*__errno_location ()))); } while (0) |
2714 | priv->sock, priv->windowsize,do { do { if (0) printf ("setting receive window " "size failed: %d: %d: %s" , priv->sock, priv->windowsize, strerror ((*__errno_location ()))); } while (0); _gf_log (this->name, "socket.c", __FUNCTION__ , 2715, GF_LOG_ERROR, "setting receive window " "size failed: %d: %d: %s" , priv->sock, priv->windowsize, strerror ((*__errno_location ()))); } while (0) |
2715 | strerror (errno))do { do { if (0) printf ("setting receive window " "size failed: %d: %d: %s" , priv->sock, priv->windowsize, strerror ((*__errno_location ()))); } while (0); _gf_log (this->name, "socket.c", __FUNCTION__ , 2715, GF_LOG_ERROR, "setting receive window " "size failed: %d: %d: %s" , priv->sock, priv->windowsize, strerror ((*__errno_location ()))); } while (0); |
2716 | } |
2717 | |
2718 | if (setsockopt (priv->sock, SOL_SOCKET1, SO_SNDBUF7, |
2719 | &priv->windowsize, |
2720 | sizeof (priv->windowsize)) < 0) { |
2721 | gf_log (this->name, GF_LOG_ERROR,do { do { if (0) printf ("setting send window size " "failed: %d: %d: %s" , priv->sock, priv->windowsize, strerror ((*__errno_location ()))); } while (0); _gf_log (this->name, "socket.c", __FUNCTION__ , 2725, GF_LOG_ERROR, "setting send window size " "failed: %d: %d: %s" , priv->sock, priv->windowsize, strerror ((*__errno_location ()))); } while (0) |
2722 | "setting send window size "do { do { if (0) printf ("setting send window size " "failed: %d: %d: %s" , priv->sock, priv->windowsize, strerror ((*__errno_location ()))); } while (0); _gf_log (this->name, "socket.c", __FUNCTION__ , 2725, GF_LOG_ERROR, "setting send window size " "failed: %d: %d: %s" , priv->sock, priv->windowsize, strerror ((*__errno_location ()))); } while (0) |
2723 | "failed: %d: %d: %s",do { do { if (0) printf ("setting send window size " "failed: %d: %d: %s" , priv->sock, priv->windowsize, strerror ((*__errno_location ()))); } while (0); _gf_log (this->name, "socket.c", __FUNCTION__ , 2725, GF_LOG_ERROR, "setting send window size " "failed: %d: %d: %s" , priv->sock, priv->windowsize, strerror ((*__errno_location ()))); } while (0) |
2724 | priv->sock, priv->windowsize,do { do { if (0) printf ("setting send window size " "failed: %d: %d: %s" , priv->sock, priv->windowsize, strerror ((*__errno_location ()))); } while (0); _gf_log (this->name, "socket.c", __FUNCTION__ , 2725, GF_LOG_ERROR, "setting send window size " "failed: %d: %d: %s" , priv->sock, priv->windowsize, strerror ((*__errno_location ()))); } while (0) |
2725 | strerror (errno))do { do { if (0) printf ("setting send window size " "failed: %d: %d: %s" , priv->sock, priv->windowsize, strerror ((*__errno_location ()))); } while (0); _gf_log (this->name, "socket.c", __FUNCTION__ , 2725, GF_LOG_ERROR, "setting send window size " "failed: %d: %d: %s" , priv->sock, priv->windowsize, strerror ((*__errno_location ()))); } while (0); |
2726 | } |
2727 | } |
2728 | |
2729 | if (priv->nodelay) { |
2730 | ret = __socket_nodelay (priv->sock); |
2731 | |
2732 | if (ret == -1) { |
2733 | gf_log (this->name, GF_LOG_ERROR,do { do { if (0) printf ("NODELAY on %d failed (%s)", priv-> sock, strerror ((*__errno_location ()))); } while (0); _gf_log (this->name, "socket.c", __FUNCTION__, 2735, GF_LOG_ERROR , "NODELAY on %d failed (%s)", priv->sock, strerror ((*__errno_location ()))); } while (0) |
2734 | "NODELAY on %d failed (%s)",do { do { if (0) printf ("NODELAY on %d failed (%s)", priv-> sock, strerror ((*__errno_location ()))); } while (0); _gf_log (this->name, "socket.c", __FUNCTION__, 2735, GF_LOG_ERROR , "NODELAY on %d failed (%s)", priv->sock, strerror ((*__errno_location ()))); } while (0) |
2735 | priv->sock, strerror (errno))do { do { if (0) printf ("NODELAY on %d failed (%s)", priv-> sock, strerror ((*__errno_location ()))); } while (0); _gf_log (this->name, "socket.c", __FUNCTION__, 2735, GF_LOG_ERROR , "NODELAY on %d failed (%s)", priv->sock, strerror ((*__errno_location ()))); } while (0); |
2736 | } |
2737 | } |
2738 | |
2739 | if (priv->keepalive && sa_family != AF_UNIX1) { |
2740 | ret = __socket_keepalive (priv->sock, |
2741 | sa_family, |
2742 | priv->keepaliveintvl, |
2743 | priv->keepaliveidle); |
2744 | if (ret == -1) |
2745 | gf_log (this->name, GF_LOG_ERROR,do { do { if (0) printf ("Failed to set keep-alive: %s", strerror ((*__errno_location ()))); } while (0); _gf_log (this->name , "socket.c", __FUNCTION__, 2747, GF_LOG_ERROR, "Failed to set keep-alive: %s" , strerror ((*__errno_location ()))); } while (0) |
2746 | "Failed to set keep-alive: %s",do { do { if (0) printf ("Failed to set keep-alive: %s", strerror ((*__errno_location ()))); } while (0); _gf_log (this->name , "socket.c", __FUNCTION__, 2747, GF_LOG_ERROR, "Failed to set keep-alive: %s" , strerror ((*__errno_location ()))); } while (0) |
2747 | strerror (errno))do { do { if (0) printf ("Failed to set keep-alive: %s", strerror ((*__errno_location ()))); } while (0); _gf_log (this->name , "socket.c", __FUNCTION__, 2747, GF_LOG_ERROR, "Failed to set keep-alive: %s" , strerror ((*__errno_location ()))); } while (0); |
2748 | } |
2749 | |
2750 | SA (&this->myinfo.sockaddr)((struct sockaddr *)&this->myinfo.sockaddr)->sa_family = |
2751 | SA (&this->peerinfo.sockaddr)((struct sockaddr *)&this->peerinfo.sockaddr)->sa_family; |
2752 | |
2753 | /* If a source addr is explicitly specified, use it */ |
2754 | ret = dict_get_str (this->options, |
2755 | "transport.socket.source-addr", |
2756 | &local_addr); |
2757 | if (!ret && SA (&this->myinfo.sockaddr)((struct sockaddr *)&this->myinfo.sockaddr)->sa_family == AF_INET2) { |
2758 | addr = (struct sockaddr_in *)(&this->myinfo.sockaddr); |
2759 | ret = inet_pton (AF_INET2, local_addr, &(addr->sin_addr.s_addr)); |
Value stored to 'ret' is never read | |
2760 | } |
2761 | |
2762 | ret = client_bind (this, SA (&this->myinfo.sockaddr)((struct sockaddr *)&this->myinfo.sockaddr), |
2763 | &this->myinfo.sockaddr_len, priv->sock); |
2764 | if (ret == -1) { |
2765 | gf_log (this->name, GF_LOG_WARNING,do { do { if (0) printf ("client bind failed: %s", strerror ( (*__errno_location ()))); } while (0); _gf_log (this->name , "socket.c", __FUNCTION__, 2766, GF_LOG_WARNING, "client bind failed: %s" , strerror ((*__errno_location ()))); } while (0) |
2766 | "client bind failed: %s", strerror (errno))do { do { if (0) printf ("client bind failed: %s", strerror ( (*__errno_location ()))); } while (0); _gf_log (this->name , "socket.c", __FUNCTION__, 2766, GF_LOG_WARNING, "client bind failed: %s" , strerror ((*__errno_location ()))); } while (0); |
2767 | close (priv->sock); |
2768 | priv->sock = -1; |
2769 | goto unlock; |
2770 | } |
2771 | |
2772 | if (!priv->use_ssl && !priv->bio && !priv->own_thread) { |
2773 | ret = __socket_nonblock (priv->sock); |
2774 | if (ret == -1) { |
2775 | gf_log (this->name, GF_LOG_ERROR,do { do { if (0) printf ("NBIO on %d failed (%s)", priv->sock , strerror ((*__errno_location ()))); } while (0); _gf_log (this ->name, "socket.c", __FUNCTION__, 2777, GF_LOG_ERROR, "NBIO on %d failed (%s)" , priv->sock, strerror ((*__errno_location ()))); } while ( 0) |
2776 | "NBIO on %d failed (%s)",do { do { if (0) printf ("NBIO on %d failed (%s)", priv->sock , strerror ((*__errno_location ()))); } while (0); _gf_log (this ->name, "socket.c", __FUNCTION__, 2777, GF_LOG_ERROR, "NBIO on %d failed (%s)" , priv->sock, strerror ((*__errno_location ()))); } while ( 0) |
2777 | priv->sock, strerror (errno))do { do { if (0) printf ("NBIO on %d failed (%s)", priv->sock , strerror ((*__errno_location ()))); } while (0); _gf_log (this ->name, "socket.c", __FUNCTION__, 2777, GF_LOG_ERROR, "NBIO on %d failed (%s)" , priv->sock, strerror ((*__errno_location ()))); } while ( 0); |
2778 | close (priv->sock); |
2779 | priv->sock = -1; |
2780 | goto unlock; |
2781 | } |
2782 | } |
2783 | |
2784 | ret = connect (priv->sock, SA (&this->peerinfo.sockaddr)((struct sockaddr *)&this->peerinfo.sockaddr), |
2785 | this->peerinfo.sockaddr_len); |
2786 | |
2787 | if (ret == -1 && ((errno(*__errno_location ()) != EINPROGRESS115) && (errno(*__errno_location ()) != ENOENT2))) { |
2788 | gf_log (this->name, GF_LOG_ERROR,do { do { if (0) printf ("connection attempt on %s failed, (%s)" , this->peerinfo.identifier, strerror ((*__errno_location ( )))); } while (0); _gf_log (this->name, "socket.c", __FUNCTION__ , 2790, GF_LOG_ERROR, "connection attempt on %s failed, (%s)" , this->peerinfo.identifier, strerror ((*__errno_location ( )))); } while (0) |
2789 | "connection attempt on %s failed, (%s)",do { do { if (0) printf ("connection attempt on %s failed, (%s)" , this->peerinfo.identifier, strerror ((*__errno_location ( )))); } while (0); _gf_log (this->name, "socket.c", __FUNCTION__ , 2790, GF_LOG_ERROR, "connection attempt on %s failed, (%s)" , this->peerinfo.identifier, strerror ((*__errno_location ( )))); } while (0) |
2790 | this->peerinfo.identifier, strerror (errno))do { do { if (0) printf ("connection attempt on %s failed, (%s)" , this->peerinfo.identifier, strerror ((*__errno_location ( )))); } while (0); _gf_log (this->name, "socket.c", __FUNCTION__ , 2790, GF_LOG_ERROR, "connection attempt on %s failed, (%s)" , this->peerinfo.identifier, strerror ((*__errno_location ( )))); } while (0); |
2791 | close (priv->sock); |
2792 | priv->sock = -1; |
2793 | goto unlock; |
2794 | } |
2795 | |
2796 | if (priv->use_ssl && !priv->own_thread) { |
2797 | ret = ssl_setup_connection(this,0); |
2798 | if (ret < 0) { |
2799 | gf_log(this->name,GF_LOG_ERROR,do { do { if (0) printf ("client setup failed"); } while (0); _gf_log (this->name, "socket.c", __FUNCTION__, 2800, GF_LOG_ERROR , "client setup failed"); } while (0) |
2800 | "client setup failed")do { do { if (0) printf ("client setup failed"); } while (0); _gf_log (this->name, "socket.c", __FUNCTION__, 2800, GF_LOG_ERROR , "client setup failed"); } while (0); |
2801 | close(priv->sock); |
2802 | priv->sock = -1; |
2803 | goto unlock; |
2804 | } |
2805 | } |
2806 | |
2807 | if (!priv->bio && !priv->own_thread) { |
2808 | ret = __socket_nonblock (priv->sock); |
2809 | |
2810 | if (ret == -1) { |
2811 | gf_log (this->name, GF_LOG_ERROR,do { do { if (0) printf ("NBIO on %d failed (%s)", priv->sock , strerror ((*__errno_location ()))); } while (0); _gf_log (this ->name, "socket.c", __FUNCTION__, 2813, GF_LOG_ERROR, "NBIO on %d failed (%s)" , priv->sock, strerror ((*__errno_location ()))); } while ( 0) |
2812 | "NBIO on %d failed (%s)",do { do { if (0) printf ("NBIO on %d failed (%s)", priv->sock , strerror ((*__errno_location ()))); } while (0); _gf_log (this ->name, "socket.c", __FUNCTION__, 2813, GF_LOG_ERROR, "NBIO on %d failed (%s)" , priv->sock, strerror ((*__errno_location ()))); } while ( 0) |
2813 | priv->sock, strerror (errno))do { do { if (0) printf ("NBIO on %d failed (%s)", priv->sock , strerror ((*__errno_location ()))); } while (0); _gf_log (this ->name, "socket.c", __FUNCTION__, 2813, GF_LOG_ERROR, "NBIO on %d failed (%s)" , priv->sock, strerror ((*__errno_location ()))); } while ( 0); |
2814 | close (priv->sock); |
2815 | priv->sock = -1; |
2816 | goto unlock; |
2817 | } |
2818 | } |
2819 | |
2820 | /* |
2821 | * In the own_thread case, this is used to indicate that we're |
2822 | * initializing a client connection. |
2823 | */ |
2824 | priv->connected = 0; |
2825 | rpc_transport_ref (this); |
2826 | |
2827 | if (priv->own_thread) { |
2828 | if (pipe(priv->pipe) < 0) { |
2829 | gf_log(this->name,GF_LOG_ERROR,do { do { if (0) printf ("could not create pipe"); } while (0 ); _gf_log (this->name, "socket.c", __FUNCTION__, 2830, GF_LOG_ERROR , "could not create pipe"); } while (0) |
2830 | "could not create pipe")do { do { if (0) printf ("could not create pipe"); } while (0 ); _gf_log (this->name, "socket.c", __FUNCTION__, 2830, GF_LOG_ERROR , "could not create pipe"); } while (0); |
2831 | } |
2832 | |
2833 | this->listener = this; |
2834 | socket_spawn(this); |
2835 | } |
2836 | else { |
2837 | priv->idx = event_register (ctx->event_pool, priv->sock, |
2838 | socket_event_handler, |
2839 | this, 1, 1); |
2840 | if (priv->idx == -1) { |
2841 | gf_log ("", GF_LOG_WARNING,do { do { if (0) printf ("failed to register the event"); } while (0); _gf_log ("", "socket.c", __FUNCTION__, 2842, GF_LOG_WARNING , "failed to register the event"); } while (0) |
2842 | "failed to register the event")do { do { if (0) printf ("failed to register the event"); } while (0); _gf_log ("", "socket.c", __FUNCTION__, 2842, GF_LOG_WARNING , "failed to register the event"); } while (0); |
2843 | ret = -1; |
2844 | } |
2845 | } |
2846 | } |
2847 | unlock: |
2848 | pthread_mutex_unlock (&priv->lock); |
2849 | |
2850 | err: |
2851 | return ret; |
2852 | } |
2853 | |
2854 | |
2855 | int |
2856 | socket_listen (rpc_transport_t *this) |
2857 | { |
2858 | socket_private_t * priv = NULL((void*)0); |
2859 | int ret = -1; |
2860 | int sock = -1; |
2861 | struct sockaddr_storage sockaddr; |
2862 | socklen_t sockaddr_len = 0; |
2863 | peer_info_t *myinfo = NULL((void*)0); |
2864 | glusterfs_ctx_t *ctx = NULL((void*)0); |
2865 | sa_family_t sa_family = {0, }; |
2866 | |
2867 | GF_VALIDATE_OR_GOTO ("socket", this, out)do { if (!this) { (*__errno_location ()) = 22; do { do { if ( 0) printf ("invalid argument: " "this"); } while (0); _gf_log_callingfn ("socket", "socket.c", __FUNCTION__, 2867, GF_LOG_ERROR, "invalid argument: " "this"); } while (0); goto out; } } while (0); |
2868 | GF_VALIDATE_OR_GOTO ("socket", this->private, out)do { if (!this->private) { (*__errno_location ()) = 22; do { do { if (0) printf ("invalid argument: " "this->private" ); } while (0); _gf_log_callingfn ("socket", "socket.c", __FUNCTION__ , 2868, GF_LOG_ERROR, "invalid argument: " "this->private" ); } while (0); goto out; } } while (0); |
2869 | |
2870 | priv = this->private; |
2871 | myinfo = &this->myinfo; |
2872 | ctx = this->ctx; |
2873 | |
2874 | pthread_mutex_lock (&priv->lock); |
2875 | { |
2876 | sock = priv->sock; |
2877 | } |
2878 | pthread_mutex_unlock (&priv->lock); |
2879 | |
2880 | if (sock != -1) { |
2881 | gf_log_callingfn (this->name, GF_LOG_DEBUG,do { do { if (0) printf ("already listening"); } while (0); _gf_log_callingfn (this->name, "socket.c", __FUNCTION__, 2882, GF_LOG_DEBUG , "already listening"); } while (0) |
2882 | "already listening")do { do { if (0) printf ("already listening"); } while (0); _gf_log_callingfn (this->name, "socket.c", __FUNCTION__, 2882, GF_LOG_DEBUG , "already listening"); } while (0); |
2883 | return ret; |
2884 | } |
2885 | |
2886 | ret = socket_server_get_local_sockaddr (this, SA (&sockaddr)((struct sockaddr *)&sockaddr), |
2887 | &sockaddr_len, &sa_family); |
2888 | if (ret == -1) { |
2889 | return ret; |
2890 | } |
2891 | |
2892 | pthread_mutex_lock (&priv->lock); |
2893 | { |
2894 | if (priv->sock != -1) { |
2895 | gf_log (this->name, GF_LOG_DEBUG,do { do { if (0) printf ("already listening"); } while (0); _gf_log (this->name, "socket.c", __FUNCTION__, 2896, GF_LOG_DEBUG , "already listening"); } while (0) |
2896 | "already listening")do { do { if (0) printf ("already listening"); } while (0); _gf_log (this->name, "socket.c", __FUNCTION__, 2896, GF_LOG_DEBUG , "already listening"); } while (0); |
2897 | goto unlock; |
2898 | } |
2899 | |
2900 | memcpy (&myinfo->sockaddr, &sockaddr, sockaddr_len); |
2901 | myinfo->sockaddr_len = sockaddr_len; |
2902 | |
2903 | priv->sock = socket (sa_family, SOCK_STREAMSOCK_STREAM, 0); |
2904 | |
2905 | if (priv->sock == -1) { |
2906 | gf_log (this->name, GF_LOG_ERROR,do { do { if (0) printf ("socket creation failed (%s)", strerror ((*__errno_location ()))); } while (0); _gf_log (this->name , "socket.c", __FUNCTION__, 2908, GF_LOG_ERROR, "socket creation failed (%s)" , strerror ((*__errno_location ()))); } while (0) |
2907 | "socket creation failed (%s)",do { do { if (0) printf ("socket creation failed (%s)", strerror ((*__errno_location ()))); } while (0); _gf_log (this->name , "socket.c", __FUNCTION__, 2908, GF_LOG_ERROR, "socket creation failed (%s)" , strerror ((*__errno_location ()))); } while (0) |
2908 | strerror (errno))do { do { if (0) printf ("socket creation failed (%s)", strerror ((*__errno_location ()))); } while (0); _gf_log (this->name , "socket.c", __FUNCTION__, 2908, GF_LOG_ERROR, "socket creation failed (%s)" , strerror ((*__errno_location ()))); } while (0); |
2909 | goto unlock; |
2910 | } |
2911 | |
2912 | /* Cant help if setting socket options fails. We can continue |
2913 | * working nonetheless. |
2914 | */ |
2915 | if (priv->windowsize != 0) { |
2916 | if (setsockopt (priv->sock, SOL_SOCKET1, SO_RCVBUF8, |
2917 | &priv->windowsize, |
2918 | sizeof (priv->windowsize)) < 0) { |
2919 | gf_log (this->name, GF_LOG_ERROR,do { do { if (0) printf ("setting receive window size " "failed: %d: %d: %s" , priv->sock, priv->windowsize, strerror ((*__errno_location ()))); } while (0); _gf_log (this->name, "socket.c", __FUNCTION__ , 2923, GF_LOG_ERROR, "setting receive window size " "failed: %d: %d: %s" , priv->sock, priv->windowsize, strerror ((*__errno_location ()))); } while (0) |
2920 | "setting receive window size "do { do { if (0) printf ("setting receive window size " "failed: %d: %d: %s" , priv->sock, priv->windowsize, strerror ((*__errno_location ()))); } while (0); _gf_log (this->name, "socket.c", __FUNCTION__ , 2923, GF_LOG_ERROR, "setting receive window size " "failed: %d: %d: %s" , priv->sock, priv->windowsize, strerror ((*__errno_location ()))); } while (0) |
2921 | "failed: %d: %d: %s", priv->sock,do { do { if (0) printf ("setting receive window size " "failed: %d: %d: %s" , priv->sock, priv->windowsize, strerror ((*__errno_location ()))); } while (0); _gf_log (this->name, "socket.c", __FUNCTION__ , 2923, GF_LOG_ERROR, "setting receive window size " "failed: %d: %d: %s" , priv->sock, priv->windowsize, strerror ((*__errno_location ()))); } while (0) |
2922 | priv->windowsize,do { do { if (0) printf ("setting receive window size " "failed: %d: %d: %s" , priv->sock, priv->windowsize, strerror ((*__errno_location ()))); } while (0); _gf_log (this->name, "socket.c", __FUNCTION__ , 2923, GF_LOG_ERROR, "setting receive window size " "failed: %d: %d: %s" , priv->sock, priv->windowsize, strerror ((*__errno_location ()))); } while (0) |
2923 | strerror (errno))do { do { if (0) printf ("setting receive window size " "failed: %d: %d: %s" , priv->sock, priv->windowsize, strerror ((*__errno_location ()))); } while (0); _gf_log (this->name, "socket.c", __FUNCTION__ , 2923, GF_LOG_ERROR, "setting receive window size " "failed: %d: %d: %s" , priv->sock, priv->windowsize, strerror ((*__errno_location ()))); } while (0); |
2924 | } |
2925 | |
2926 | if (setsockopt (priv->sock, SOL_SOCKET1, SO_SNDBUF7, |
2927 | &priv->windowsize, |
2928 | sizeof (priv->windowsize)) < 0) { |
2929 | gf_log (this->name, GF_LOG_ERROR,do { do { if (0) printf ("setting send window size failed:" " %d: %d: %s" , priv->sock, priv->windowsize, strerror ((*__errno_location ()))); } while (0); _gf_log (this->name, "socket.c", __FUNCTION__ , 2933, GF_LOG_ERROR, "setting send window size failed:" " %d: %d: %s" , priv->sock, priv->windowsize, strerror ((*__errno_location ()))); } while (0) |
2930 | "setting send window size failed:"do { do { if (0) printf ("setting send window size failed:" " %d: %d: %s" , priv->sock, priv->windowsize, strerror ((*__errno_location ()))); } while (0); _gf_log (this->name, "socket.c", __FUNCTION__ , 2933, GF_LOG_ERROR, "setting send window size failed:" " %d: %d: %s" , priv->sock, priv->windowsize, strerror ((*__errno_location ()))); } while (0) |
2931 | " %d: %d: %s", priv->sock,do { do { if (0) printf ("setting send window size failed:" " %d: %d: %s" , priv->sock, priv->windowsize, strerror ((*__errno_location ()))); } while (0); _gf_log (this->name, "socket.c", __FUNCTION__ , 2933, GF_LOG_ERROR, "setting send window size failed:" " %d: %d: %s" , priv->sock, priv->windowsize, strerror ((*__errno_location ()))); } while (0) |
2932 | priv->windowsize,do { do { if (0) printf ("setting send window size failed:" " %d: %d: %s" , priv->sock, priv->windowsize, strerror ((*__errno_location ()))); } while (0); _gf_log (this->name, "socket.c", __FUNCTION__ , 2933, GF_LOG_ERROR, "setting send window size failed:" " %d: %d: %s" , priv->sock, priv->windowsize, strerror ((*__errno_location ()))); } while (0) |
2933 | strerror (errno))do { do { if (0) printf ("setting send window size failed:" " %d: %d: %s" , priv->sock, priv->windowsize, strerror ((*__errno_location ()))); } while (0); _gf_log (this->name, "socket.c", __FUNCTION__ , 2933, GF_LOG_ERROR, "setting send window size failed:" " %d: %d: %s" , priv->sock, priv->windowsize, strerror ((*__errno_location ()))); } while (0); |
2934 | } |
2935 | } |
2936 | |
2937 | if (priv->nodelay) { |
2938 | ret = __socket_nodelay (priv->sock); |
2939 | if (ret == -1) { |
2940 | gf_log (this->name, GF_LOG_ERROR,do { do { if (0) printf ("setsockopt() failed for NODELAY (%s)" , strerror ((*__errno_location ()))); } while (0); _gf_log (this ->name, "socket.c", __FUNCTION__, 2942, GF_LOG_ERROR, "setsockopt() failed for NODELAY (%s)" , strerror ((*__errno_location ()))); } while (0) |
2941 | "setsockopt() failed for NODELAY (%s)",do { do { if (0) printf ("setsockopt() failed for NODELAY (%s)" , strerror ((*__errno_location ()))); } while (0); _gf_log (this ->name, "socket.c", __FUNCTION__, 2942, GF_LOG_ERROR, "setsockopt() failed for NODELAY (%s)" , strerror ((*__errno_location ()))); } while (0) |
2942 | strerror (errno))do { do { if (0) printf ("setsockopt() failed for NODELAY (%s)" , strerror ((*__errno_location ()))); } while (0); _gf_log (this ->name, "socket.c", __FUNCTION__, 2942, GF_LOG_ERROR, "setsockopt() failed for NODELAY (%s)" , strerror ((*__errno_location ()))); } while (0); |
2943 | } |
2944 | } |
2945 | |
2946 | if (!priv->bio) { |
2947 | ret = __socket_nonblock (priv->sock); |
2948 | |
2949 | if (ret == -1) { |
2950 | gf_log (this->name, GF_LOG_ERROR,do { do { if (0) printf ("NBIO on %d failed (%s)", priv->sock , strerror ((*__errno_location ()))); } while (0); _gf_log (this ->name, "socket.c", __FUNCTION__, 2952, GF_LOG_ERROR, "NBIO on %d failed (%s)" , priv->sock, strerror ((*__errno_location ()))); } while ( 0) |
2951 | "NBIO on %d failed (%s)",do { do { if (0) printf ("NBIO on %d failed (%s)", priv->sock , strerror ((*__errno_location ()))); } while (0); _gf_log (this ->name, "socket.c", __FUNCTION__, 2952, GF_LOG_ERROR, "NBIO on %d failed (%s)" , priv->sock, strerror ((*__errno_location ()))); } while ( 0) |
2952 | priv->sock, strerror (errno))do { do { if (0) printf ("NBIO on %d failed (%s)", priv->sock , strerror ((*__errno_location ()))); } while (0); _gf_log (this ->name, "socket.c", __FUNCTION__, 2952, GF_LOG_ERROR, "NBIO on %d failed (%s)" , priv->sock, strerror ((*__errno_location ()))); } while ( 0); |
2953 | close (priv->sock); |
2954 | priv->sock = -1; |
2955 | goto unlock; |
2956 | } |
2957 | } |
2958 | |
2959 | ret = __socket_server_bind (this); |
2960 | |
2961 | if (ret == -1) { |
2962 | /* logged inside __socket_server_bind() */ |
2963 | close (priv->sock); |
2964 | priv->sock = -1; |
2965 | goto unlock; |
2966 | } |
2967 | |
2968 | if (priv->backlog) |
2969 | ret = listen (priv->sock, priv->backlog); |
2970 | else |
2971 | ret = listen (priv->sock, 10); |
2972 | |
2973 | if (ret == -1) { |
2974 | gf_log (this->name, GF_LOG_ERROR,do { do { if (0) printf ("could not set socket %d to listen mode (%s)" , priv->sock, strerror ((*__errno_location ()))); } while ( 0); _gf_log (this->name, "socket.c", __FUNCTION__, 2976, GF_LOG_ERROR , "could not set socket %d to listen mode (%s)", priv->sock , strerror ((*__errno_location ()))); } while (0) |
2975 | "could not set socket %d to listen mode (%s)",do { do { if (0) printf ("could not set socket %d to listen mode (%s)" , priv->sock, strerror ((*__errno_location ()))); } while ( 0); _gf_log (this->name, "socket.c", __FUNCTION__, 2976, GF_LOG_ERROR , "could not set socket %d to listen mode (%s)", priv->sock , strerror ((*__errno_location ()))); } while (0) |
2976 | priv->sock, strerror (errno))do { do { if (0) printf ("could not set socket %d to listen mode (%s)" , priv->sock, strerror ((*__errno_location ()))); } while ( 0); _gf_log (this->name, "socket.c", __FUNCTION__, 2976, GF_LOG_ERROR , "could not set socket %d to listen mode (%s)", priv->sock , strerror ((*__errno_location ()))); } while (0); |
2977 | close (priv->sock); |
2978 | priv->sock = -1; |
2979 | goto unlock; |
2980 | } |
2981 | |
2982 | rpc_transport_ref (this); |
2983 | |
2984 | priv->idx = event_register (ctx->event_pool, priv->sock, |
2985 | socket_server_event_handler, |
2986 | this, 1, 0); |
2987 | |
2988 | if (priv->idx == -1) { |
2989 | gf_log (this->name, GF_LOG_WARNING,do { do { if (0) printf ("could not register socket %d with events" , priv->sock); } while (0); _gf_log (this->name, "socket.c" , __FUNCTION__, 2991, GF_LOG_WARNING, "could not register socket %d with events" , priv->sock); } while (0) |
2990 | "could not register socket %d with events",do { do { if (0) printf ("could not register socket %d with events" , priv->sock); } while (0); _gf_log (this->name, "socket.c" , __FUNCTION__, 2991, GF_LOG_WARNING, "could not register socket %d with events" , priv->sock); } while (0) |
2991 | priv->sock)do { do { if (0) printf ("could not register socket %d with events" , priv->sock); } while (0); _gf_log (this->name, "socket.c" , __FUNCTION__, 2991, GF_LOG_WARNING, "could not register socket %d with events" , priv->sock); } while (0); |
2992 | ret = -1; |
2993 | close (priv->sock); |
2994 | priv->sock = -1; |
2995 | goto unlock; |
2996 | } |
2997 | } |
2998 | unlock: |
2999 | pthread_mutex_unlock (&priv->lock); |
3000 | |
3001 | out: |
3002 | return ret; |
3003 | } |
3004 | |
3005 | |
3006 | int32_t |
3007 | socket_submit_request (rpc_transport_t *this, rpc_transport_req_t *req) |
3008 | { |
3009 | socket_private_t *priv = NULL((void*)0); |
3010 | int ret = -1; |
3011 | char need_poll_out = 0; |
3012 | char need_append = 1; |
3013 | struct ioq *entry = NULL((void*)0); |
3014 | glusterfs_ctx_t *ctx = NULL((void*)0); |
3015 | char a_byte = 'j'; |
3016 | |
3017 | GF_VALIDATE_OR_GOTO ("socket", this, out)do { if (!this) { (*__errno_location ()) = 22; do { do { if ( 0) printf ("invalid argument: " "this"); } while (0); _gf_log_callingfn ("socket", "socket.c", __FUNCTION__, 3017, GF_LOG_ERROR, "invalid argument: " "this"); } while (0); goto out; } } while (0); |
3018 | GF_VALIDATE_OR_GOTO ("socket", this->private, out)do { if (!this->private) { (*__errno_location ()) = 22; do { do { if (0) printf ("invalid argument: " "this->private" ); } while (0); _gf_log_callingfn ("socket", "socket.c", __FUNCTION__ , 3018, GF_LOG_ERROR, "invalid argument: " "this->private" ); } while (0); goto out; } } while (0); |
3019 | |
3020 | priv = this->private; |
3021 | ctx = this->ctx; |
3022 | |
3023 | pthread_mutex_lock (&priv->lock); |
3024 | { |
3025 | if (priv->connected != 1) { |
3026 | if (!priv->submit_log && !priv->connect_finish_log) { |
3027 | gf_log (this->name, GF_LOG_INFO,do { do { if (0) printf ("not connected (priv->connected = %d)" , priv->connected); } while (0); _gf_log (this->name, "socket.c" , __FUNCTION__, 3029, GF_LOG_INFO, "not connected (priv->connected = %d)" , priv->connected); } while (0) |
3028 | "not connected (priv->connected = %d)",do { do { if (0) printf ("not connected (priv->connected = %d)" , priv->connected); } while (0); _gf_log (this->name, "socket.c" , __FUNCTION__, 3029, GF_LOG_INFO, "not connected (priv->connected = %d)" , priv->connected); } while (0) |
3029 | priv->connected)do { do { if (0) printf ("not connected (priv->connected = %d)" , priv->connected); } while (0); _gf_log (this->name, "socket.c" , __FUNCTION__, 3029, GF_LOG_INFO, "not connected (priv->connected = %d)" , priv->connected); } while (0); |
3030 | priv->submit_log = 1; |
3031 | } |
3032 | goto unlock; |
3033 | } |
3034 | |
3035 | priv->submit_log = 0; |
3036 | entry = __socket_ioq_new (this, &req->msg); |
3037 | if (!entry) |
3038 | goto unlock; |
3039 | |
3040 | if (list_empty (&priv->ioq)) { |
3041 | ret = __socket_ioq_churn_entry (this, entry, 1); |
3042 | |
3043 | if (ret == 0) { |
3044 | need_append = 0; |
3045 | } |
3046 | if (ret > 0) { |
3047 | need_poll_out = 1; |
3048 | } |
3049 | } |
3050 | |
3051 | if (need_append) { |
3052 | list_add_tail (&entry->list, &priv->ioq); |
3053 | if (priv->own_thread) { |
3054 | /* |
3055 | * Make sure the polling thread wakes up, by |
3056 | * writing a byte to represent this entry. |
3057 | */ |
3058 | if (write(priv->pipe[1],&a_byte,1) < 1) { |
3059 | gf_log(this->name,GF_LOG_WARNING,do { do { if (0) printf ("write error on pipe"); } while (0); _gf_log (this->name, "socket.c", __FUNCTION__, 3060, GF_LOG_WARNING , "write error on pipe"); } while (0) |
3060 | "write error on pipe")do { do { if (0) printf ("write error on pipe"); } while (0); _gf_log (this->name, "socket.c", __FUNCTION__, 3060, GF_LOG_WARNING , "write error on pipe"); } while (0); |
3061 | } |
3062 | } |
3063 | ret = 0; |
3064 | } |
3065 | if (!priv->own_thread && need_poll_out) { |
3066 | /* first entry to wait. continue writing on POLLOUT */ |
3067 | priv->idx = event_select_on (ctx->event_pool, |
3068 | priv->sock, |
3069 | priv->idx, -1, 1); |
3070 | } |
3071 | } |
3072 | unlock: |
3073 | pthread_mutex_unlock (&priv->lock); |
3074 | |
3075 | out: |
3076 | return ret; |
3077 | } |
3078 | |
3079 | |
3080 | int32_t |
3081 | socket_submit_reply (rpc_transport_t *this, rpc_transport_reply_t *reply) |
3082 | { |
3083 | socket_private_t *priv = NULL((void*)0); |
3084 | int ret = -1; |
3085 | char need_poll_out = 0; |
3086 | char need_append = 1; |
3087 | struct ioq *entry = NULL((void*)0); |
3088 | glusterfs_ctx_t *ctx = NULL((void*)0); |
3089 | char a_byte = 'd'; |
3090 | |
3091 | GF_VALIDATE_OR_GOTO ("socket", this, out)do { if (!this) { (*__errno_location ()) = 22; do { do { if ( 0) printf ("invalid argument: " "this"); } while (0); _gf_log_callingfn ("socket", "socket.c", __FUNCTION__, 3091, GF_LOG_ERROR, "invalid argument: " "this"); } while (0); goto out; } } while (0); |
3092 | GF_VALIDATE_OR_GOTO ("socket", this->private, out)do { if (!this->private) { (*__errno_location ()) = 22; do { do { if (0) printf ("invalid argument: " "this->private" ); } while (0); _gf_log_callingfn ("socket", "socket.c", __FUNCTION__ , 3092, GF_LOG_ERROR, "invalid argument: " "this->private" ); } while (0); goto out; } } while (0); |
3093 | |
3094 | priv = this->private; |
3095 | ctx = this->ctx; |
3096 | |
3097 | pthread_mutex_lock (&priv->lock); |
3098 | { |
3099 | if (priv->connected != 1) { |
3100 | if (!priv->submit_log && !priv->connect_finish_log) { |
3101 | gf_log (this->name, GF_LOG_INFO,do { do { if (0) printf ("not connected (priv->connected = %d)" , priv->connected); } while (0); _gf_log (this->name, "socket.c" , __FUNCTION__, 3103, GF_LOG_INFO, "not connected (priv->connected = %d)" , priv->connected); } while (0) |
3102 | "not connected (priv->connected = %d)",do { do { if (0) printf ("not connected (priv->connected = %d)" , priv->connected); } while (0); _gf_log (this->name, "socket.c" , __FUNCTION__, 3103, GF_LOG_INFO, "not connected (priv->connected = %d)" , priv->connected); } while (0) |
3103 | priv->connected)do { do { if (0) printf ("not connected (priv->connected = %d)" , priv->connected); } while (0); _gf_log (this->name, "socket.c" , __FUNCTION__, 3103, GF_LOG_INFO, "not connected (priv->connected = %d)" , priv->connected); } while (0); |
3104 | priv->submit_log = 1; |
3105 | } |
3106 | goto unlock; |
3107 | } |
3108 | |
3109 | priv->submit_log = 0; |
3110 | entry = __socket_ioq_new (this, &reply->msg); |
3111 | if (!entry) |
3112 | goto unlock; |
3113 | |
3114 | if (list_empty (&priv->ioq)) { |
3115 | ret = __socket_ioq_churn_entry (this, entry, 1); |
3116 | |
3117 | if (ret == 0) { |
3118 | need_append = 0; |
3119 | } |
3120 | if (ret > 0) { |
3121 | need_poll_out = 1; |
3122 | } |
3123 | } |
3124 | |
3125 | if (need_append) { |
3126 | list_add_tail (&entry->list, &priv->ioq); |
3127 | if (priv->own_thread) { |
3128 | /* |
3129 | * Make sure the polling thread wakes up, by |
3130 | * writing a byte to represent this entry. |
3131 | */ |
3132 | if (write(priv->pipe[1],&a_byte,1) < 1) { |
3133 | gf_log(this->name,GF_LOG_WARNING,do { do { if (0) printf ("write error on pipe"); } while (0); _gf_log (this->name, "socket.c", __FUNCTION__, 3134, GF_LOG_WARNING , "write error on pipe"); } while (0) |
3134 | "write error on pipe")do { do { if (0) printf ("write error on pipe"); } while (0); _gf_log (this->name, "socket.c", __FUNCTION__, 3134, GF_LOG_WARNING , "write error on pipe"); } while (0); |
3135 | } |
3136 | } |
3137 | ret = 0; |
3138 | } |
3139 | if (!priv->own_thread && need_poll_out) { |
3140 | /* first entry to wait. continue writing on POLLOUT */ |
3141 | priv->idx = event_select_on (ctx->event_pool, |
3142 | priv->sock, |
3143 | priv->idx, -1, 1); |
3144 | } |
3145 | } |
3146 | unlock: |
3147 | pthread_mutex_unlock (&priv->lock); |
3148 | |
3149 | out: |
3150 | return ret; |
3151 | } |
3152 | |
3153 | |
3154 | int32_t |
3155 | socket_getpeername (rpc_transport_t *this, char *hostname, int hostlen) |
3156 | { |
3157 | int32_t ret = -1; |
3158 | |
3159 | GF_VALIDATE_OR_GOTO ("socket", this, out)do { if (!this) { (*__errno_location ()) = 22; do { do { if ( 0) printf ("invalid argument: " "this"); } while (0); _gf_log_callingfn ("socket", "socket.c", __FUNCTION__, 3159, GF_LOG_ERROR, "invalid argument: " "this"); } while (0); goto out; } } while (0); |
3160 | GF_VALIDATE_OR_GOTO ("socket", hostname, out)do { if (!hostname) { (*__errno_location ()) = 22; do { do { if (0) printf ("invalid argument: " "hostname"); } while (0); _gf_log_callingfn ("socket", "socket.c", __FUNCTION__, 3160, GF_LOG_ERROR, "invalid argument: " "hostname"); } while (0); goto out; } } while (0); |
3161 | |
3162 | if (hostlen < (strlen (this->peerinfo.identifier) + 1)) { |
3163 | goto out; |
3164 | } |
3165 | |
3166 | strcpy (hostname, this->peerinfo.identifier); |
3167 | ret = 0; |
3168 | out: |
3169 | return ret; |
3170 | } |
3171 | |
3172 | |
3173 | int32_t |
3174 | socket_getpeeraddr (rpc_transport_t *this, char *peeraddr, int addrlen, |
3175 | struct sockaddr_storage *sa, socklen_t salen) |
3176 | { |
3177 | int32_t ret = -1; |
3178 | |
3179 | GF_VALIDATE_OR_GOTO ("socket", this, out)do { if (!this) { (*__errno_location ()) = 22; do { do { if ( 0) printf ("invalid argument: " "this"); } while (0); _gf_log_callingfn ("socket", "socket.c", __FUNCTION__, 3179, GF_LOG_ERROR, "invalid argument: " "this"); } while (0); goto out; } } while (0); |
3180 | GF_VALIDATE_OR_GOTO ("socket", sa, out)do { if (!sa) { (*__errno_location ()) = 22; do { do { if (0) printf ("invalid argument: " "sa"); } while (0); _gf_log_callingfn ("socket", "socket.c", __FUNCTION__, 3180, GF_LOG_ERROR, "invalid argument: " "sa"); } while (0); goto out; } } while (0); |
3181 | |
3182 | *sa = this->peerinfo.sockaddr; |
3183 | |
3184 | if (peeraddr != NULL((void*)0)) { |
3185 | ret = socket_getpeername (this, peeraddr, addrlen); |
3186 | } |
3187 | ret = 0; |
3188 | |
3189 | out: |
3190 | return ret; |
3191 | } |
3192 | |
3193 | |
3194 | int32_t |
3195 | socket_getmyname (rpc_transport_t *this, char *hostname, int hostlen) |
3196 | { |
3197 | int32_t ret = -1; |
3198 | |
3199 | GF_VALIDATE_OR_GOTO ("socket", this, out)do { if (!this) { (*__errno_location ()) = 22; do { do { if ( 0) printf ("invalid argument: " "this"); } while (0); _gf_log_callingfn ("socket", "socket.c", __FUNCTION__, 3199, GF_LOG_ERROR, "invalid argument: " "this"); } while (0); goto out; } } while (0); |
3200 | GF_VALIDATE_OR_GOTO ("socket", hostname, out)do { if (!hostname) { (*__errno_location ()) = 22; do { do { if (0) printf ("invalid argument: " "hostname"); } while (0); _gf_log_callingfn ("socket", "socket.c", __FUNCTION__, 3200, GF_LOG_ERROR, "invalid argument: " "hostname"); } while (0); goto out; } } while (0); |
3201 | |
3202 | if (hostlen < (strlen (this->myinfo.identifier) + 1)) { |
3203 | goto out; |
3204 | } |
3205 | |
3206 | strcpy (hostname, this->myinfo.identifier); |
3207 | ret = 0; |
3208 | out: |
3209 | return ret; |
3210 | } |
3211 | |
3212 | |
3213 | int32_t |
3214 | socket_getmyaddr (rpc_transport_t *this, char *myaddr, int addrlen, |
3215 | struct sockaddr_storage *sa, socklen_t salen) |
3216 | { |
3217 | int32_t ret = 0; |
3218 | |
3219 | GF_VALIDATE_OR_GOTO ("socket", this, out)do { if (!this) { (*__errno_location ()) = 22; do { do { if ( 0) printf ("invalid argument: " "this"); } while (0); _gf_log_callingfn ("socket", "socket.c", __FUNCTION__, 3219, GF_LOG_ERROR, "invalid argument: " "this"); } while (0); goto out; } } while (0); |
3220 | GF_VALIDATE_OR_GOTO ("socket", sa, out)do { if (!sa) { (*__errno_location ()) = 22; do { do { if (0) printf ("invalid argument: " "sa"); } while (0); _gf_log_callingfn ("socket", "socket.c", __FUNCTION__, 3220, GF_LOG_ERROR, "invalid argument: " "sa"); } while (0); goto out; } } while (0); |
3221 | |
3222 | *sa = this->myinfo.sockaddr; |
3223 | |
3224 | if (myaddr != NULL((void*)0)) { |
3225 | ret = socket_getmyname (this, myaddr, addrlen); |
3226 | } |
3227 | |
3228 | out: |
3229 | return ret; |
3230 | } |
3231 | |
3232 | |
3233 | struct rpc_transport_ops tops = { |
3234 | .listen = socket_listen, |
3235 | .connect = socket_connect, |
3236 | .disconnect = socket_disconnect, |
3237 | .submit_request = socket_submit_request, |
3238 | .submit_reply = socket_submit_reply, |
3239 | .get_peername = socket_getpeername, |
3240 | .get_peeraddr = socket_getpeeraddr, |
3241 | .get_myname = socket_getmyname, |
3242 | .get_myaddr = socket_getmyaddr, |
3243 | }; |
3244 | |
3245 | int |
3246 | reconfigure (rpc_transport_t *this, dict_t *options) |
3247 | { |
3248 | socket_private_t *priv = NULL((void*)0); |
3249 | gf_boolean_t tmp_bool = _gf_false; |
3250 | char *optstr = NULL((void*)0); |
3251 | int ret = 0; |
3252 | uint64_t windowsize = 0; |
3253 | |
3254 | GF_VALIDATE_OR_GOTO ("socket", this, out)do { if (!this) { (*__errno_location ()) = 22; do { do { if ( 0) printf ("invalid argument: " "this"); } while (0); _gf_log_callingfn ("socket", "socket.c", __FUNCTION__, 3254, GF_LOG_ERROR, "invalid argument: " "this"); } while (0); goto out; } } while (0); |
3255 | GF_VALIDATE_OR_GOTO ("socket", this->private, out)do { if (!this->private) { (*__errno_location ()) = 22; do { do { if (0) printf ("invalid argument: " "this->private" ); } while (0); _gf_log_callingfn ("socket", "socket.c", __FUNCTION__ , 3255, GF_LOG_ERROR, "invalid argument: " "this->private" ); } while (0); goto out; } } while (0); |
3256 | |
3257 | if (!this || !this->private) { |
3258 | ret =-1; |
3259 | goto out; |
3260 | } |
3261 | |
3262 | priv = this->private; |
3263 | |
3264 | if (dict_get_str (this->options, "transport.socket.keepalive", |
3265 | &optstr) == 0) { |
3266 | if (gf_string2boolean (optstr, &tmp_bool) == -1) { |
3267 | gf_log (this->name, GF_LOG_ERROR,do { do { if (0) printf ("'transport.socket.keepalive' takes only " "boolean options, not taking any action"); } while (0); _gf_log (this->name, "socket.c", __FUNCTION__, 3269, GF_LOG_ERROR , "'transport.socket.keepalive' takes only " "boolean options, not taking any action" ); } while (0) |
3268 | "'transport.socket.keepalive' takes only "do { do { if (0) printf ("'transport.socket.keepalive' takes only " "boolean options, not taking any action"); } while (0); _gf_log (this->name, "socket.c", __FUNCTION__, 3269, GF_LOG_ERROR , "'transport.socket.keepalive' takes only " "boolean options, not taking any action" ); } while (0) |
3269 | "boolean options, not taking any action")do { do { if (0) printf ("'transport.socket.keepalive' takes only " "boolean options, not taking any action"); } while (0); _gf_log (this->name, "socket.c", __FUNCTION__, 3269, GF_LOG_ERROR , "'transport.socket.keepalive' takes only " "boolean options, not taking any action" ); } while (0); |
3270 | priv->keepalive = 1; |
3271 | ret = -1; |
3272 | goto out; |
3273 | } |
3274 | gf_log (this->name, GF_LOG_DEBUG, "Reconfigured transport.socket.keepalive")do { do { if (0) printf ("Reconfigured transport.socket.keepalive" ); } while (0); _gf_log (this->name, "socket.c", __FUNCTION__ , 3274, GF_LOG_DEBUG, "Reconfigured transport.socket.keepalive" ); } while (0); |
3275 | |
3276 | priv->keepalive = tmp_bool; |
3277 | } |
3278 | else |
3279 | priv->keepalive = 1; |
3280 | |
3281 | optstr = NULL((void*)0); |
3282 | if (dict_get_str (this->options, "tcp-window-size", |
3283 | &optstr) == 0) { |
3284 | if (gf_string2bytesize (optstr, &windowsize) != 0) { |
3285 | gf_log (this->name, GF_LOG_ERROR,do { do { if (0) printf ("invalid number format: %s", optstr) ; } while (0); _gf_log (this->name, "socket.c", __FUNCTION__ , 3286, GF_LOG_ERROR, "invalid number format: %s", optstr); } while (0) |
3286 | "invalid number format: %s", optstr)do { do { if (0) printf ("invalid number format: %s", optstr) ; } while (0); _gf_log (this->name, "socket.c", __FUNCTION__ , 3286, GF_LOG_ERROR, "invalid number format: %s", optstr); } while (0); |
3287 | goto out; |
3288 | } |
3289 | } |
3290 | |
3291 | priv->windowsize = (int)windowsize; |
3292 | |
3293 | ret = 0; |
3294 | out: |
3295 | return ret; |
3296 | |
3297 | } |
3298 | |
3299 | int |
3300 | socket_init (rpc_transport_t *this) |
3301 | { |
3302 | socket_private_t *priv = NULL((void*)0); |
3303 | gf_boolean_t tmp_bool = 0; |
3304 | uint64_t windowsize = GF_DEFAULT_SOCKET_WINDOW_SIZE(0); |
3305 | char *optstr = NULL((void*)0); |
3306 | uint32_t keepalive = 0; |
3307 | uint32_t backlog = 0; |
3308 | int session_id = 0; |
3309 | |
3310 | if (this->private) { |
3311 | gf_log_callingfn (this->name, GF_LOG_ERROR,do { do { if (0) printf ("double init attempted"); } while (0 ); _gf_log_callingfn (this->name, "socket.c", __FUNCTION__ , 3312, GF_LOG_ERROR, "double init attempted"); } while (0) |
3312 | "double init attempted")do { do { if (0) printf ("double init attempted"); } while (0 ); _gf_log_callingfn (this->name, "socket.c", __FUNCTION__ , 3312, GF_LOG_ERROR, "double init attempted"); } while (0); |
3313 | return -1; |
3314 | } |
3315 | |
3316 | priv = GF_CALLOC (1, sizeof (*priv), gf_common_mt_socket_private_t)__gf_calloc (1, sizeof (*priv), gf_common_mt_socket_private_t ); |
3317 | if (!priv) { |
3318 | return -1; |
3319 | } |
3320 | memset(priv,0,sizeof(*priv)); |
3321 | |
3322 | pthread_mutex_init (&priv->lock, NULL((void*)0)); |
3323 | |
3324 | priv->sock = -1; |
3325 | priv->idx = -1; |
3326 | priv->connected = -1; |
3327 | priv->nodelay = 1; |
3328 | priv->bio = 0; |
3329 | priv->windowsize = GF_DEFAULT_SOCKET_WINDOW_SIZE(0); |
3330 | INIT_LIST_HEAD (&priv->ioq)do { (&priv->ioq)->next = (&priv->ioq)->prev = &priv->ioq; } while (0); |
3331 | |
3332 | /* All the below section needs 'this->options' to be present */ |
3333 | if (!this->options) |
3334 | goto out; |
3335 | |
3336 | if (dict_get (this->options, "non-blocking-io")) { |
3337 | optstr = data_to_str (dict_get (this->options, |
3338 | "non-blocking-io")); |
3339 | |
3340 | if (gf_string2boolean (optstr, &tmp_bool) == -1) { |
3341 | gf_log (this->name, GF_LOG_ERROR,do { do { if (0) printf ("'non-blocking-io' takes only boolean options," " not taking any action"); } while (0); _gf_log (this->name , "socket.c", __FUNCTION__, 3343, GF_LOG_ERROR, "'non-blocking-io' takes only boolean options," " not taking any action"); } while (0) |
3342 | "'non-blocking-io' takes only boolean options,"do { do { if (0) printf ("'non-blocking-io' takes only boolean options," " not taking any action"); } while (0); _gf_log (this->name , "socket.c", __FUNCTION__, 3343, GF_LOG_ERROR, "'non-blocking-io' takes only boolean options," " not taking any action"); } while (0) |
3343 | " not taking any action")do { do { if (0) printf ("'non-blocking-io' takes only boolean options," " not taking any action"); } while (0); _gf_log (this->name , "socket.c", __FUNCTION__, 3343, GF_LOG_ERROR, "'non-blocking-io' takes only boolean options," " not taking any action"); } while (0); |
3344 | tmp_bool = 1; |
3345 | } |
3346 | |
3347 | if (!tmp_bool) { |
3348 | priv->bio = 1; |
3349 | gf_log (this->name, GF_LOG_WARNING,do { do { if (0) printf ("disabling non-blocking IO"); } while (0); _gf_log (this->name, "socket.c", __FUNCTION__, 3350, GF_LOG_WARNING, "disabling non-blocking IO"); } while (0) |
3350 | "disabling non-blocking IO")do { do { if (0) printf ("disabling non-blocking IO"); } while (0); _gf_log (this->name, "socket.c", __FUNCTION__, 3350, GF_LOG_WARNING, "disabling non-blocking IO"); } while (0); |
3351 | } |
3352 | } |
3353 | |
3354 | optstr = NULL((void*)0); |
3355 | |
3356 | // By default, we enable NODELAY |
3357 | if (dict_get (this->options, "transport.socket.nodelay")) { |
3358 | optstr = data_to_str (dict_get (this->options, |
3359 | "transport.socket.nodelay")); |
3360 | |
3361 | if (gf_string2boolean (optstr, &tmp_bool) == -1) { |
3362 | gf_log (this->name, GF_LOG_ERROR,do { do { if (0) printf ("'transport.socket.nodelay' takes only " "boolean options, not taking any action"); } while (0); _gf_log (this->name, "socket.c", __FUNCTION__, 3364, GF_LOG_ERROR , "'transport.socket.nodelay' takes only " "boolean options, not taking any action" ); } while (0) |
3363 | "'transport.socket.nodelay' takes only "do { do { if (0) printf ("'transport.socket.nodelay' takes only " "boolean options, not taking any action"); } while (0); _gf_log (this->name, "socket.c", __FUNCTION__, 3364, GF_LOG_ERROR , "'transport.socket.nodelay' takes only " "boolean options, not taking any action" ); } while (0) |
3364 | "boolean options, not taking any action")do { do { if (0) printf ("'transport.socket.nodelay' takes only " "boolean options, not taking any action"); } while (0); _gf_log (this->name, "socket.c", __FUNCTION__, 3364, GF_LOG_ERROR , "'transport.socket.nodelay' takes only " "boolean options, not taking any action" ); } while (0); |
3365 | tmp_bool = 1; |
3366 | } |
3367 | if (!tmp_bool) { |
3368 | priv->nodelay = 0; |
3369 | gf_log (this->name, GF_LOG_DEBUG,do { do { if (0) printf ("disabling nodelay"); } while (0); _gf_log (this->name, "socket.c", __FUNCTION__, 3370, GF_LOG_DEBUG , "disabling nodelay"); } while (0) |
3370 | "disabling nodelay")do { do { if (0) printf ("disabling nodelay"); } while (0); _gf_log (this->name, "socket.c", __FUNCTION__, 3370, GF_LOG_DEBUG , "disabling nodelay"); } while (0); |
3371 | } |
3372 | } |
3373 | |
3374 | optstr = NULL((void*)0); |
3375 | if (dict_get_str (this->options, "tcp-window-size", |
3376 | &optstr) == 0) { |
3377 | if (gf_string2bytesize (optstr, &windowsize) != 0) { |
3378 | gf_log (this->name, GF_LOG_ERROR,do { do { if (0) printf ("invalid number format: %s", optstr) ; } while (0); _gf_log (this->name, "socket.c", __FUNCTION__ , 3379, GF_LOG_ERROR, "invalid number format: %s", optstr); } while (0) |
3379 | "invalid number format: %s", optstr)do { do { if (0) printf ("invalid number format: %s", optstr) ; } while (0); _gf_log (this->name, "socket.c", __FUNCTION__ , 3379, GF_LOG_ERROR, "invalid number format: %s", optstr); } while (0); |
3380 | return -1; |
3381 | } |
3382 | } |
3383 | |
3384 | priv->windowsize = (int)windowsize; |
3385 | |
3386 | optstr = NULL((void*)0); |
3387 | /* Enable Keep-alive by default. */ |
3388 | priv->keepalive = 1; |
3389 | priv->keepaliveintvl = 2; |
3390 | priv->keepaliveidle = 20; |
3391 | if (dict_get_str (this->options, "transport.socket.keepalive", |
3392 | &optstr) == 0) { |
3393 | if (gf_string2boolean (optstr, &tmp_bool) == -1) { |
3394 | gf_log (this->name, GF_LOG_ERROR,do { do { if (0) printf ("'transport.socket.keepalive' takes only " "boolean options, not taking any action"); } while (0); _gf_log (this->name, "socket.c", __FUNCTION__, 3396, GF_LOG_ERROR , "'transport.socket.keepalive' takes only " "boolean options, not taking any action" ); } while (0) |
3395 | "'transport.socket.keepalive' takes only "do { do { if (0) printf ("'transport.socket.keepalive' takes only " "boolean options, not taking any action"); } while (0); _gf_log (this->name, "socket.c", __FUNCTION__, 3396, GF_LOG_ERROR , "'transport.socket.keepalive' takes only " "boolean options, not taking any action" ); } while (0) |
3396 | "boolean options, not taking any action")do { do { if (0) printf ("'transport.socket.keepalive' takes only " "boolean options, not taking any action"); } while (0); _gf_log (this->name, "socket.c", __FUNCTION__, 3396, GF_LOG_ERROR , "'transport.socket.keepalive' takes only " "boolean options, not taking any action" ); } while (0); |
3397 | tmp_bool = 1; |
3398 | } |
3399 | |
3400 | if (!tmp_bool) |
3401 | priv->keepalive = 0; |
3402 | } |
3403 | |
3404 | if (dict_get_uint32 (this->options, |
3405 | "transport.socket.keepalive-interval", |
3406 | &keepalive) == 0) { |
3407 | priv->keepaliveintvl = keepalive; |
3408 | } |
3409 | |
3410 | if (dict_get_uint32 (this->options, |
3411 | "transport.socket.keepalive-time", |
3412 | &keepalive) == 0) { |
3413 | priv->keepaliveidle = keepalive; |
3414 | } |
3415 | |
3416 | if (dict_get_uint32 (this->options, |
3417 | "transport.socket.listen-backlog", |
3418 | &backlog) == 0) { |
3419 | priv->backlog = backlog; |
3420 | } |
3421 | |
3422 | optstr = NULL((void*)0); |
3423 | |
3424 | /* Check if socket read failures are to be logged */ |
3425 | priv->read_fail_log = 1; |
3426 | if (dict_get (this->options, "transport.socket.read-fail-log")) { |
3427 | optstr = data_to_str (dict_get (this->options, "transport.socket.read-fail-log")); |
3428 | if (gf_string2boolean (optstr, &tmp_bool) == -1) { |
3429 | gf_log (this->name, GF_LOG_WARNING,do { do { if (0) printf ("'transport.socket.read-fail-log' takes only " "boolean options; logging socket read fails"); } while (0); _gf_log (this->name, "socket.c", __FUNCTION__, 3431, GF_LOG_WARNING , "'transport.socket.read-fail-log' takes only " "boolean options; logging socket read fails" ); } while (0) |
3430 | "'transport.socket.read-fail-log' takes only "do { do { if (0) printf ("'transport.socket.read-fail-log' takes only " "boolean options; logging socket read fails"); } while (0); _gf_log (this->name, "socket.c", __FUNCTION__, 3431, GF_LOG_WARNING , "'transport.socket.read-fail-log' takes only " "boolean options; logging socket read fails" ); } while (0) |
3431 | "boolean options; logging socket read fails")do { do { if (0) printf ("'transport.socket.read-fail-log' takes only " "boolean options; logging socket read fails"); } while (0); _gf_log (this->name, "socket.c", __FUNCTION__, 3431, GF_LOG_WARNING , "'transport.socket.read-fail-log' takes only " "boolean options; logging socket read fails" ); } while (0); |
3432 | } |
3433 | else if (tmp_bool == _gf_false) { |
3434 | priv->read_fail_log = 0; |
3435 | } |
3436 | } |
3437 | |
3438 | priv->windowsize = (int)windowsize; |
3439 | |
3440 | priv->ssl_enabled = _gf_false; |
3441 | if (dict_get_str(this->options,SSL_ENABLED_OPT"transport.socket.ssl-enabled",&optstr) == 0) { |
3442 | if (gf_string2boolean (optstr, &priv->ssl_enabled) != 0) { |
3443 | gf_log (this->name, GF_LOG_ERROR,do { do { if (0) printf ("invalid value given for ssl-enabled boolean" ); } while (0); _gf_log (this->name, "socket.c", __FUNCTION__ , 3444, GF_LOG_ERROR, "invalid value given for ssl-enabled boolean" ); } while (0) |
3444 | "invalid value given for ssl-enabled boolean")do { do { if (0) printf ("invalid value given for ssl-enabled boolean" ); } while (0); _gf_log (this->name, "socket.c", __FUNCTION__ , 3444, GF_LOG_ERROR, "invalid value given for ssl-enabled boolean" ); } while (0); |
3445 | } |
3446 | } |
3447 | |
3448 | priv->ssl_own_cert = DEFAULT_CERT_PATH"/etc/ssl/glusterfs.pem"; |
3449 | if (dict_get_str(this->options,SSL_OWN_CERT_OPT"transport.socket.ssl-own-cert",&optstr) == 0) { |
3450 | if (!priv->ssl_enabled) { |
3451 | gf_log(this->name,GF_LOG_WARNING,do { do { if (0) printf ("%s specified without %s (ignored)", "transport.socket.ssl-own-cert", "transport.socket.ssl-enabled" ); } while (0); _gf_log (this->name, "socket.c", __FUNCTION__ , 3453, GF_LOG_WARNING, "%s specified without %s (ignored)", "transport.socket.ssl-own-cert" , "transport.socket.ssl-enabled"); } while (0) |
3452 | "%s specified without %s (ignored)",do { do { if (0) printf ("%s specified without %s (ignored)", "transport.socket.ssl-own-cert", "transport.socket.ssl-enabled" ); } while (0); _gf_log (this->name, "socket.c", __FUNCTION__ , 3453, GF_LOG_WARNING, "%s specified without %s (ignored)", "transport.socket.ssl-own-cert" , "transport.socket.ssl-enabled"); } while (0) |
3453 | SSL_OWN_CERT_OPT, SSL_ENABLED_OPT)do { do { if (0) printf ("%s specified without %s (ignored)", "transport.socket.ssl-own-cert", "transport.socket.ssl-enabled" ); } while (0); _gf_log (this->name, "socket.c", __FUNCTION__ , 3453, GF_LOG_WARNING, "%s specified without %s (ignored)", "transport.socket.ssl-own-cert" , "transport.socket.ssl-enabled"); } while (0); |
3454 | } |
3455 | priv->ssl_own_cert = optstr; |
3456 | } |
3457 | priv->ssl_own_cert = gf_strdup(priv->ssl_own_cert); |
3458 | |
3459 | priv->ssl_private_key = DEFAULT_KEY_PATH"/etc/ssl/glusterfs.key"; |
3460 | if (dict_get_str(this->options,SSL_PRIVATE_KEY_OPT"transport.socket.ssl-private-key",&optstr) == 0) { |
3461 | if (!priv->ssl_enabled) { |
3462 | gf_log(this->name,GF_LOG_WARNING,do { do { if (0) printf ("%s specified without %s (ignored)", "transport.socket.ssl-private-key", "transport.socket.ssl-enabled" ); } while (0); _gf_log (this->name, "socket.c", __FUNCTION__ , 3464, GF_LOG_WARNING, "%s specified without %s (ignored)", "transport.socket.ssl-private-key" , "transport.socket.ssl-enabled"); } while (0) |
3463 | "%s specified without %s (ignored)",do { do { if (0) printf ("%s specified without %s (ignored)", "transport.socket.ssl-private-key", "transport.socket.ssl-enabled" ); } while (0); _gf_log (this->name, "socket.c", __FUNCTION__ , 3464, GF_LOG_WARNING, "%s specified without %s (ignored)", "transport.socket.ssl-private-key" , "transport.socket.ssl-enabled"); } while (0) |
3464 | SSL_PRIVATE_KEY_OPT, SSL_ENABLED_OPT)do { do { if (0) printf ("%s specified without %s (ignored)", "transport.socket.ssl-private-key", "transport.socket.ssl-enabled" ); } while (0); _gf_log (this->name, "socket.c", __FUNCTION__ , 3464, GF_LOG_WARNING, "%s specified without %s (ignored)", "transport.socket.ssl-private-key" , "transport.socket.ssl-enabled"); } while (0); |
3465 | } |
3466 | priv->ssl_private_key = optstr; |
3467 | } |
3468 | priv->ssl_private_key = gf_strdup(priv->ssl_private_key); |
3469 | |
3470 | priv->ssl_ca_list = DEFAULT_CA_PATH"/etc/ssl/glusterfs.ca"; |
3471 | if (dict_get_str(this->options,SSL_CA_LIST_OPT"transport.socket.ssl-ca-list",&optstr) == 0) { |
3472 | if (!priv->ssl_enabled) { |
3473 | gf_log(this->name,GF_LOG_WARNING,do { do { if (0) printf ("%s specified without %s (ignored)", "transport.socket.ssl-ca-list", "transport.socket.ssl-enabled" ); } while (0); _gf_log (this->name, "socket.c", __FUNCTION__ , 3475, GF_LOG_WARNING, "%s specified without %s (ignored)", "transport.socket.ssl-ca-list" , "transport.socket.ssl-enabled"); } while (0) |
3474 | "%s specified without %s (ignored)",do { do { if (0) printf ("%s specified without %s (ignored)", "transport.socket.ssl-ca-list", "transport.socket.ssl-enabled" ); } while (0); _gf_log (this->name, "socket.c", __FUNCTION__ , 3475, GF_LOG_WARNING, "%s specified without %s (ignored)", "transport.socket.ssl-ca-list" , "transport.socket.ssl-enabled"); } while (0) |
3475 | SSL_CA_LIST_OPT, SSL_ENABLED_OPT)do { do { if (0) printf ("%s specified without %s (ignored)", "transport.socket.ssl-ca-list", "transport.socket.ssl-enabled" ); } while (0); _gf_log (this->name, "socket.c", __FUNCTION__ , 3475, GF_LOG_WARNING, "%s specified without %s (ignored)", "transport.socket.ssl-ca-list" , "transport.socket.ssl-enabled"); } while (0); |
3476 | } |
3477 | priv->ssl_ca_list = optstr; |
3478 | } |
3479 | priv->ssl_ca_list = gf_strdup(priv->ssl_ca_list); |
3480 | |
3481 | gf_log(this->name,GF_LOG_INFO,"SSL support is %s",do { do { if (0) printf ("SSL support is %s", priv->ssl_enabled ? "ENABLED" : "NOT enabled"); } while (0); _gf_log (this-> name, "socket.c", __FUNCTION__, 3482, GF_LOG_INFO,"SSL support is %s" , priv->ssl_enabled ? "ENABLED" : "NOT enabled"); } while ( 0) |
3482 | priv->ssl_enabled ? "ENABLED" : "NOT enabled")do { do { if (0) printf ("SSL support is %s", priv->ssl_enabled ? "ENABLED" : "NOT enabled"); } while (0); _gf_log (this-> name, "socket.c", __FUNCTION__, 3482, GF_LOG_INFO,"SSL support is %s" , priv->ssl_enabled ? "ENABLED" : "NOT enabled"); } while ( 0); |
3483 | /* |
3484 | * This might get overridden temporarily in socket_connect (q.v.) |
3485 | * if we're using the glusterd portmapper. |
3486 | */ |
3487 | priv->use_ssl = priv->ssl_enabled; |
3488 | |
3489 | priv->own_thread = priv->use_ssl; |
3490 | if (dict_get_str(this->options,OWN_THREAD_OPT"transport.socket.own-thread",&optstr) == 0) { |
3491 | if (gf_string2boolean (optstr, &priv->own_thread) != 0) { |
3492 | gf_log (this->name, GF_LOG_ERROR,do { do { if (0) printf ("invalid value given for own-thread boolean" ); } while (0); _gf_log (this->name, "socket.c", __FUNCTION__ , 3493, GF_LOG_ERROR, "invalid value given for own-thread boolean" ); } while (0) |
3493 | "invalid value given for own-thread boolean")do { do { if (0) printf ("invalid value given for own-thread boolean" ); } while (0); _gf_log (this->name, "socket.c", __FUNCTION__ , 3493, GF_LOG_ERROR, "invalid value given for own-thread boolean" ); } while (0); |
3494 | } |
3495 | } |
3496 | gf_log(this->name,GF_LOG_INFO,"using %s polling thread",do { do { if (0) printf ("using %s polling thread", priv-> own_thread ? "private" : "system"); } while (0); _gf_log (this ->name, "socket.c", __FUNCTION__, 3497, GF_LOG_INFO,"using %s polling thread" , priv->own_thread ? "private" : "system"); } while (0) |
3497 | priv->own_thread ? "private" : "system")do { do { if (0) printf ("using %s polling thread", priv-> own_thread ? "private" : "system"); } while (0); _gf_log (this ->name, "socket.c", __FUNCTION__, 3497, GF_LOG_INFO,"using %s polling thread" , priv->own_thread ? "private" : "system"); } while (0); |
3498 | |
3499 | if (priv->use_ssl) { |
3500 | SSL_library_init(); |
3501 | SSL_load_error_strings(); |
3502 | priv->ssl_meth = (SSL_METHOD *)TLSv1_method(); |
3503 | priv->ssl_ctx = SSL_CTX_new(priv->ssl_meth); |
3504 | |
3505 | if (SSL_CTX_set_cipher_list(priv->ssl_ctx, |
3506 | "HIGH:-SSLv2") == 0) { |
3507 | gf_log(this->name,GF_LOG_ERROR,do { do { if (0) printf ("failed to find any valid ciphers"); } while (0); _gf_log (this->name, "socket.c", __FUNCTION__ , 3508, GF_LOG_ERROR, "failed to find any valid ciphers"); } while (0) |
3508 | "failed to find any valid ciphers")do { do { if (0) printf ("failed to find any valid ciphers"); } while (0); _gf_log (this->name, "socket.c", __FUNCTION__ , 3508, GF_LOG_ERROR, "failed to find any valid ciphers"); } while (0); |
3509 | goto err; |
3510 | } |
3511 | |
3512 | if (!SSL_CTX_use_certificate_chain_file(priv->ssl_ctx, |
3513 | priv->ssl_own_cert)) { |
3514 | gf_log(this->name,GF_LOG_ERROR,do { do { if (0) printf ("could not load our cert"); } while ( 0); _gf_log (this->name, "socket.c", __FUNCTION__, 3515, GF_LOG_ERROR , "could not load our cert"); } while (0) |
3515 | "could not load our cert")do { do { if (0) printf ("could not load our cert"); } while ( 0); _gf_log (this->name, "socket.c", __FUNCTION__, 3515, GF_LOG_ERROR , "could not load our cert"); } while (0); |
3516 | goto err; |
3517 | } |
3518 | |
3519 | if (!SSL_CTX_use_PrivateKey_file(priv->ssl_ctx, |
3520 | priv->ssl_private_key, |
3521 | SSL_FILETYPE_PEM1)) { |
3522 | gf_log(this->name,GF_LOG_ERROR,do { do { if (0) printf ("could not load private key"); } while (0); _gf_log (this->name, "socket.c", __FUNCTION__, 3523, GF_LOG_ERROR, "could not load private key"); } while (0) |
3523 | "could not load private key")do { do { if (0) printf ("could not load private key"); } while (0); _gf_log (this->name, "socket.c", __FUNCTION__, 3523, GF_LOG_ERROR, "could not load private key"); } while (0); |
3524 | goto err; |
3525 | } |
3526 | |
3527 | if (!SSL_CTX_load_verify_locations(priv->ssl_ctx, |
3528 | priv->ssl_ca_list,0)) { |
3529 | gf_log(this->name,GF_LOG_ERROR,do { do { if (0) printf ("could not load CA list"); } while ( 0); _gf_log (this->name, "socket.c", __FUNCTION__, 3530, GF_LOG_ERROR , "could not load CA list"); } while (0) |
3530 | "could not load CA list")do { do { if (0) printf ("could not load CA list"); } while ( 0); _gf_log (this->name, "socket.c", __FUNCTION__, 3530, GF_LOG_ERROR , "could not load CA list"); } while (0); |
3531 | goto err; |
3532 | } |
3533 | |
3534 | #if (OPENSSL_VERSION_NUMBER0x10000003 < 0x00905100L) |
3535 | SSL_CTX_set_verify_depth(ctx,1); |
3536 | #endif |
3537 | |
3538 | priv->ssl_session_id = ++session_id; |
3539 | SSL_CTX_set_session_id_context(priv->ssl_ctx, |
3540 | (void *)&priv->ssl_session_id, |
3541 | sizeof(priv->ssl_session_id)); |
3542 | |
3543 | SSL_CTX_set_verify(priv->ssl_ctx,SSL_VERIFY_PEER0x01,0); |
3544 | } |
3545 | |
3546 | if (priv->own_thread) { |
3547 | priv->ot_state = OT_IDLE; |
3548 | pthread_cond_init (&priv->ot_event, NULL((void*)0)); |
3549 | } |
3550 | |
3551 | out: |
3552 | this->private = priv; |
3553 | return 0; |
3554 | |
3555 | err: |
3556 | if (priv->ssl_own_cert) { |
3557 | GF_FREE(priv->ssl_own_cert)__gf_free (priv->ssl_own_cert); |
3558 | } |
3559 | if (priv->ssl_private_key) { |
3560 | GF_FREE(priv->ssl_private_key)__gf_free (priv->ssl_private_key); |
3561 | } |
3562 | if (priv->ssl_ca_list) { |
3563 | GF_FREE(priv->ssl_ca_list)__gf_free (priv->ssl_ca_list); |
3564 | } |
3565 | GF_FREE(priv)__gf_free (priv); |
3566 | return -1; |
3567 | } |
3568 | |
3569 | |
3570 | void |
3571 | fini (rpc_transport_t *this) |
3572 | { |
3573 | socket_private_t *priv = NULL((void*)0); |
3574 | |
3575 | if (!this) |
3576 | return; |
3577 | |
3578 | priv = this->private; |
3579 | if (priv) { |
3580 | if (priv->sock != -1) { |
3581 | pthread_mutex_lock (&priv->lock); |
3582 | { |
3583 | __socket_ioq_flush (this); |
3584 | __socket_reset (this); |
3585 | } |
3586 | pthread_mutex_unlock (&priv->lock); |
3587 | } |
3588 | gf_log (this->name, GF_LOG_TRACE,do { do { if (0) printf ("transport %p destroyed", this); } while (0); _gf_log (this->name, "socket.c", __FUNCTION__, 3589, GF_LOG_TRACE, "transport %p destroyed", this); } while (0) |
3589 | "transport %p destroyed", this)do { do { if (0) printf ("transport %p destroyed", this); } while (0); _gf_log (this->name, "socket.c", __FUNCTION__, 3589, GF_LOG_TRACE, "transport %p destroyed", this); } while (0); |
3590 | |
3591 | pthread_mutex_destroy (&priv->lock); |
3592 | if (priv->ssl_private_key) { |
3593 | GF_FREE(priv->ssl_private_key)__gf_free (priv->ssl_private_key); |
3594 | } |
3595 | if (priv->ssl_own_cert) { |
3596 | GF_FREE(priv->ssl_own_cert)__gf_free (priv->ssl_own_cert); |
3597 | } |
3598 | if (priv->ssl_ca_list) { |
3599 | GF_FREE(priv->ssl_ca_list)__gf_free (priv->ssl_ca_list); |
3600 | } |
3601 | GF_FREE (priv)__gf_free (priv); |
3602 | } |
3603 | |
3604 | this->private = NULL((void*)0); |
3605 | } |
3606 | |
3607 | |
3608 | int32_t |
3609 | init (rpc_transport_t *this) |
3610 | { |
3611 | int ret = -1; |
3612 | |
3613 | ret = socket_init (this); |
3614 | |
3615 | if (ret == -1) { |
3616 | gf_log (this->name, GF_LOG_DEBUG, "socket_init() failed")do { do { if (0) printf ("socket_init() failed"); } while (0) ; _gf_log (this->name, "socket.c", __FUNCTION__, 3616, GF_LOG_DEBUG , "socket_init() failed"); } while (0); |
3617 | } |
3618 | |
3619 | return ret; |
3620 | } |
3621 | |
3622 | struct volume_options options[] = { |
3623 | { .key = {"remote-port", |
3624 | "transport.remote-port", |
3625 | "transport.socket.remote-port"}, |
3626 | .type = GF_OPTION_TYPE_INT |
3627 | }, |
3628 | { .key = {"transport.socket.listen-port", "listen-port"}, |
3629 | .type = GF_OPTION_TYPE_INT |
3630 | }, |
3631 | { .key = {"transport.socket.bind-address", "bind-address" }, |
3632 | .type = GF_OPTION_TYPE_INTERNET_ADDRESS |
3633 | }, |
3634 | { .key = {"transport.socket.connect-path", "connect-path"}, |
3635 | .type = GF_OPTION_TYPE_ANY |
3636 | }, |
3637 | { .key = {"transport.socket.bind-path", "bind-path"}, |
3638 | .type = GF_OPTION_TYPE_ANY |
3639 | }, |
3640 | { .key = {"transport.socket.listen-path", "listen-path"}, |
3641 | .type = GF_OPTION_TYPE_ANY |
3642 | }, |
3643 | { .key = { "transport.address-family", |
3644 | "address-family" }, |
3645 | .value = {"inet", "inet6", "unix", "inet-sdp" }, |
3646 | .type = GF_OPTION_TYPE_STR |
3647 | }, |
3648 | |
3649 | { .key = {"non-blocking-io"}, |
3650 | .type = GF_OPTION_TYPE_BOOL |
3651 | }, |
3652 | { .key = {"tcp-window-size"}, |
3653 | .type = GF_OPTION_TYPE_SIZET, |
3654 | .min = GF_MIN_SOCKET_WINDOW_SIZE(0), |
3655 | .max = GF_MAX_SOCKET_WINDOW_SIZE(1 * 1048576ULL) |
3656 | }, |
3657 | { .key = {"transport.socket.nodelay"}, |
3658 | .type = GF_OPTION_TYPE_BOOL |
3659 | }, |
3660 | { .key = {"transport.socket.lowlat"}, |
3661 | .type = GF_OPTION_TYPE_BOOL |
3662 | }, |
3663 | { .key = {"transport.socket.keepalive"}, |
3664 | .type = GF_OPTION_TYPE_BOOL |
3665 | }, |
3666 | { .key = {"transport.socket.keepalive-interval"}, |
3667 | .type = GF_OPTION_TYPE_INT |
3668 | }, |
3669 | { .key = {"transport.socket.keepalive-time"}, |
3670 | .type = GF_OPTION_TYPE_INT |
3671 | }, |
3672 | { .key = {"transport.socket.listen-backlog"}, |
3673 | .type = GF_OPTION_TYPE_INT |
3674 | }, |
3675 | { .key = {"transport.socket.read-fail-log"}, |
3676 | .type = GF_OPTION_TYPE_BOOL |
3677 | }, |
3678 | { .key = {SSL_ENABLED_OPT"transport.socket.ssl-enabled"}, |
3679 | .type = GF_OPTION_TYPE_BOOL |
3680 | }, |
3681 | { .key = {SSL_OWN_CERT_OPT"transport.socket.ssl-own-cert"}, |
3682 | .type = GF_OPTION_TYPE_STR |
3683 | }, |
3684 | { .key = {SSL_PRIVATE_KEY_OPT"transport.socket.ssl-private-key"}, |
3685 | .type = GF_OPTION_TYPE_STR |
3686 | }, |
3687 | { .key = {SSL_CA_LIST_OPT"transport.socket.ssl-ca-list"}, |
3688 | .type = GF_OPTION_TYPE_STR |
3689 | }, |
3690 | { .key = {OWN_THREAD_OPT"transport.socket.own-thread"}, |
3691 | .type = GF_OPTION_TYPE_BOOL |
3692 | }, |
3693 | { .key = {NULL((void*)0)} } |
3694 | }; |