Etcdserver: Timed Out Waiting for Read Index Response
"Fossies" - the Fresh Open Source Software Archive
Member "etcd-3.5.2/server/etcdserver/v3_server.go" (1 Feb 2022, 30901 Bytes) of package /linux/misc/etcd-3.5.2.tar.gz:
// Copyright 2015 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package etcdserver

import (
	"bytes"
	"context"
	"encoding/base64"
	"encoding/binary"
	"strconv"
	"time"

	pb "go.etcd.io/etcd/api/v3/etcdserverpb"
	"go.etcd.io/etcd/api/v3/membershippb"
	"go.etcd.io/etcd/pkg/v3/traceutil"
	"go.etcd.io/etcd/raft/v3"
	"go.etcd.io/etcd/server/v3/auth"
	"go.etcd.io/etcd/server/v3/etcdserver/api/membership"
	"go.etcd.io/etcd/server/v3/lease"
	"go.etcd.io/etcd/server/v3/lease/leasehttp"
	"go.etcd.io/etcd/server/v3/mvcc"

	"github.com/gogo/protobuf/proto"
	"go.uber.org/zap"
	"golang.org/x/crypto/bcrypt"
)

const (
	// In the health case, there might be a small gap (10s of entries) between
	// the applied index and committed index.
	// However, if the committed entries are very heavy to apply, the gap might grow.
	// We should stop accepting new proposals if the gap grows to a certain point.
	maxGapBetweenApplyAndCommitIndex = 5000
	traceThreshold                   = 100 * time.Millisecond
	readIndexRetryTime               = 500 * time.Millisecond
)

type RaftKV interface {
	Range(ctx context.Context, r *pb.RangeRequest) (*pb.RangeResponse, error)
	Put(ctx context.Context, r *pb.PutRequest) (*pb.PutResponse, error)
	DeleteRange(ctx context.Context, r *pb.DeleteRangeRequest) (*pb.DeleteRangeResponse, error)
	Txn(ctx context.Context, r *pb.TxnRequest) (*pb.TxnResponse, error)
	Compact(ctx context.Context, r *pb.CompactionRequest) (*pb.CompactionResponse, error)
}

type Lessor interface {
	// LeaseGrant sends LeaseGrant request to raft and apply it after committed.
	LeaseGrant(ctx context.Context, r *pb.LeaseGrantRequest) (*pb.LeaseGrantResponse, error)
	// LeaseRevoke sends LeaseRevoke request to raft and apply it after committed.
	LeaseRevoke(ctx context.Context, r *pb.LeaseRevokeRequest) (*pb.LeaseRevokeResponse, error)

	// LeaseRenew renews the lease with given ID. The renewed TTL is returned. Or an error
	// is returned.
	LeaseRenew(ctx context.Context, id lease.LeaseID) (int64, error)

	// LeaseTimeToLive retrieves lease information.
	LeaseTimeToLive(ctx context.Context, r *pb.LeaseTimeToLiveRequest) (*pb.LeaseTimeToLiveResponse, error)

	// LeaseLeases lists all leases.
	LeaseLeases(ctx context.Context, r *pb.LeaseLeasesRequest) (*pb.LeaseLeasesResponse, error)
}
type Authenticator interface {
	AuthEnable(ctx context.Context, r *pb.AuthEnableRequest) (*pb.AuthEnableResponse, error)
	AuthDisable(ctx context.Context, r *pb.AuthDisableRequest) (*pb.AuthDisableResponse, error)
	AuthStatus(ctx context.Context, r *pb.AuthStatusRequest) (*pb.AuthStatusResponse, error)
	Authenticate(ctx context.Context, r *pb.AuthenticateRequest) (*pb.AuthenticateResponse, error)
	UserAdd(ctx context.Context, r *pb.AuthUserAddRequest) (*pb.AuthUserAddResponse, error)
	UserDelete(ctx context.Context, r *pb.AuthUserDeleteRequest) (*pb.AuthUserDeleteResponse, error)
	UserChangePassword(ctx context.Context, r *pb.AuthUserChangePasswordRequest) (*pb.AuthUserChangePasswordResponse, error)
	UserGrantRole(ctx context.Context, r *pb.AuthUserGrantRoleRequest) (*pb.AuthUserGrantRoleResponse, error)
	UserGet(ctx context.Context, r *pb.AuthUserGetRequest) (*pb.AuthUserGetResponse, error)
	UserRevokeRole(ctx context.Context, r *pb.AuthUserRevokeRoleRequest) (*pb.AuthUserRevokeRoleResponse, error)
	RoleAdd(ctx context.Context, r *pb.AuthRoleAddRequest) (*pb.AuthRoleAddResponse, error)
	RoleGrantPermission(ctx context.Context, r *pb.AuthRoleGrantPermissionRequest) (*pb.AuthRoleGrantPermissionResponse, error)
	RoleGet(ctx context.Context, r *pb.AuthRoleGetRequest) (*pb.AuthRoleGetResponse, error)
	RoleRevokePermission(ctx context.Context, r *pb.AuthRoleRevokePermissionRequest) (*pb.AuthRoleRevokePermissionResponse, error)
	RoleDelete(ctx context.Context, r *pb.AuthRoleDeleteRequest) (*pb.AuthRoleDeleteResponse, error)
	UserList(ctx context.Context, r *pb.AuthUserListRequest) (*pb.AuthUserListResponse, error)
	RoleList(ctx context.Context, r *pb.AuthRoleListRequest) (*pb.AuthRoleListResponse, error)
}

func (s *EtcdServer) Range(ctx context.Context, r *pb.RangeRequest) (*pb.RangeResponse, error) {
	trace := traceutil.New("range",
		s.Logger(),
		traceutil.Field{Key: "range_begin", Value: string(r.Key)},
		traceutil.Field{Key: "range_end", Value: string(r.RangeEnd)},
	)
	ctx = context.WithValue(ctx, traceutil.TraceKey, trace)

	var resp *pb.RangeResponse
	var err error
	defer func(start time.Time) {
		warnOfExpensiveReadOnlyRangeRequest(s.Logger(), s.Cfg.WarningApplyDuration, start, r, resp, err)
		if resp != nil {
			trace.AddField(
				traceutil.Field{Key: "response_count", Value: len(resp.Kvs)},
				traceutil.Field{Key: "response_revision", Value: resp.Header.Revision},
			)
		}
		trace.LogIfLong(traceThreshold)
	}(time.Now())

	if !r.Serializable {
		err = s.linearizableReadNotify(ctx)
		trace.Step("agreement among raft nodes before linearized reading")
		if err != nil {
			return nil, err
		}
	}
	chk := func(ai *auth.AuthInfo) error {
		return s.authStore.IsRangePermitted(ai, r.Key, r.RangeEnd)
	}

	get := func() { resp, err = s.applyV3Base.Range(ctx, nil, r) }
	if serr := s.doSerialize(ctx, chk, get); serr != nil {
		err = serr
		return nil, err
	}
	return resp, err
}
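For orientation (this sketch is not part of v3_server.go): the linearizable branch above is what a plain clientv3 Get exercises, and it is this path that can surface the "timed out waiting for read index response" error in this page's title; only a serializable Get skips the read-index wait. A minimal client-side sketch, assuming an illustrative endpoint 127.0.0.1:2379 and key "foo":

package main

import (
	"context"
	"fmt"
	"time"

	clientv3 "go.etcd.io/etcd/client/v3"
)

func main() {
	cli, err := clientv3.New(clientv3.Config{
		Endpoints:   []string{"127.0.0.1:2379"},
		DialTimeout: 5 * time.Second,
	})
	if err != nil {
		panic(err)
	}
	defer cli.Close()

	ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
	defer cancel()

	// Default Get: linearizable, so the server goes through linearizableReadNotify
	// and waits for a ReadIndex response before answering.
	if _, err := cli.Get(ctx, "foo"); err != nil {
		fmt.Println("linearizable read failed:", err) // e.g. a timeout while waiting for the read index
	}

	// Serializable Get: answered from the local member's store, no read-index wait.
	if resp, err := cli.Get(ctx, "foo", clientv3.WithSerializable()); err == nil {
		fmt.Println("serializable read kvs:", len(resp.Kvs))
	}
}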
func (s *EtcdServer) Put(ctx context.Context, r *pb.PutRequest) (*pb.PutResponse, error) {
	ctx = context.WithValue(ctx, traceutil.StartTimeKey, time.Now())
	resp, err := s.raftRequest(ctx, pb.InternalRaftRequest{Put: r})
	if err != nil {
		return nil, err
	}
	return resp.(*pb.PutResponse), nil
}

func (s *EtcdServer) DeleteRange(ctx context.Context, r *pb.DeleteRangeRequest) (*pb.DeleteRangeResponse, error) {
	resp, err := s.raftRequest(ctx, pb.InternalRaftRequest{DeleteRange: r})
	if err != nil {
		return nil, err
	}
	return resp.(*pb.DeleteRangeResponse), nil
}

func (s *EtcdServer) Txn(ctx context.Context, r *pb.TxnRequest) (*pb.TxnResponse, error) {
	if isTxnReadonly(r) {
		trace := traceutil.New("transaction",
			s.Logger(),
			traceutil.Field{Key: "read_only", Value: true},
		)
		ctx = context.WithValue(ctx, traceutil.TraceKey, trace)
		if !isTxnSerializable(r) {
			err := s.linearizableReadNotify(ctx)
			trace.Step("agreement among raft nodes before linearized reading")
			if err != nil {
				return nil, err
			}
		}
		var resp *pb.TxnResponse
		var err error
		chk := func(ai *auth.AuthInfo) error {
			return checkTxnAuth(s.authStore, ai, r)
		}

		defer func(start time.Time) {
			warnOfExpensiveReadOnlyTxnRequest(s.Logger(), s.Cfg.WarningApplyDuration, start, r, resp, err)
			trace.LogIfLong(traceThreshold)
		}(time.Now())

		get := func() { resp, _, err = s.applyV3Base.Txn(ctx, r) }
		if serr := s.doSerialize(ctx, chk, get); serr != nil {
			return nil, serr
		}
		return resp, err
	}

	ctx = context.WithValue(ctx, traceutil.StartTimeKey, time.Now())
	resp, err := s.raftRequest(ctx, pb.InternalRaftRequest{Txn: r})
	if err != nil {
		return nil, err
	}
	return resp.(*pb.TxnResponse), nil
}

func isTxnSerializable(r *pb.TxnRequest) bool {
	for _, u := range r.Success {
		if r := u.GetRequestRange(); r == nil || !r.Serializable {
			return false
		}
	}
	for _, u := range r.Failure {
		if r := u.GetRequestRange(); r == nil || !r.Serializable {
			return false
		}
	}
	return true
}

func isTxnReadonly(r *pb.TxnRequest) bool {
	for _, u := range r.Success {
		if r := u.GetRequestRange(); r == nil {
			return false
		}
	}
	for _, u := range r.Failure {
		if r := u.GetRequestRange(); r == nil {
			return false
		}
	}
	return true
}
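For comparison (again not from this file), a transaction takes the read-only branch above only when every Then/Else op is a range read, and it skips the read-index wait only when every one of those reads is serializable. A clientv3 sketch, reusing the cli client and imports from the earlier sketch; the keys and the compare are illustrative:

// readOnlyTxn shows a transaction the server treats as read-only and serializable:
// every Then/Else op is a serializable Get, so isTxnReadonly and isTxnSerializable
// both return true and no linearizable read-index wait is needed.
func readOnlyTxn(ctx context.Context, cli *clientv3.Client) (int64, error) {
	resp, err := cli.Txn(ctx).
		If(clientv3.Compare(clientv3.Value("config/flag"), "=", "on")).
		Then(clientv3.OpGet("config/", clientv3.WithPrefix(), clientv3.WithSerializable())).
		Else(clientv3.OpGet("config/flag", clientv3.WithSerializable())).
		Commit()
	if err != nil {
		return 0, err
	}
	return resp.Header.Revision, nil
}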
func (s *EtcdServer) Compact(ctx context.Context, r *pb.CompactionRequest) (*pb.CompactionResponse, error) {
	startTime := time.Now()
	result, err := s.processInternalRaftRequestOnce(ctx, pb.InternalRaftRequest{Compaction: r})
	trace := traceutil.TODO()
	if result != nil && result.trace != nil {
		trace = result.trace
		defer func() {
			trace.LogIfLong(traceThreshold)
		}()
		applyStart := result.trace.GetStartTime()
		result.trace.SetStartTime(startTime)
		trace.InsertStep(0, applyStart, "process raft request")
	}
	if r.Physical && result != nil && result.physc != nil {
		<-result.physc
		// The compaction is done deleting keys; the hash is now settled
		// but the data is not necessarily committed. If there's a crash,
		// the hash may revert to a hash prior to compaction completing
		// if the compaction resumes. Force the finished compaction to
		// commit so it won't resume following a crash.
		s.be.ForceCommit()
		trace.Step("physically apply compaction")
	}
	if err != nil {
		return nil, err
	}
	if result.err != nil {
		return nil, result.err
	}
	resp := result.resp.(*pb.CompactionResponse)
	if resp == nil {
		resp = &pb.CompactionResponse{}
	}
	if resp.Header == nil {
		resp.Header = &pb.ResponseHeader{}
	}
	resp.Header.Revision = s.kv.Rev()
	trace.AddField(traceutil.Field{Key: "response_revision", Value: resp.Header.Revision})
	return resp, nil
}

func (s *EtcdServer) LeaseGrant(ctx context.Context, r *pb.LeaseGrantRequest) (*pb.LeaseGrantResponse, error) {
	// no id given? choose one
	for r.ID == int64(lease.NoLease) {
		// only use positive int64 id's
		r.ID = int64(s.reqIDGen.Next() & ((1 << 63) - 1))
	}
	resp, err := s.raftRequestOnce(ctx, pb.InternalRaftRequest{LeaseGrant: r})
	if err != nil {
		return nil, err
	}
	return resp.(*pb.LeaseGrantResponse), nil
}

func (s *EtcdServer) LeaseRevoke(ctx context.Context, r *pb.LeaseRevokeRequest) (*pb.LeaseRevokeResponse, error) {
	resp, err := s.raftRequestOnce(ctx, pb.InternalRaftRequest{LeaseRevoke: r})
	if err != nil {
		return nil, err
	}
	return resp.(*pb.LeaseRevokeResponse), nil
}

func (s *EtcdServer) LeaseRenew(ctx context.Context, id lease.LeaseID) (int64, error) {
	ttl, err := s.lessor.Renew(id)
	if err == nil { // already requested to primary lessor(leader)
		return ttl, nil
	}
	if err != lease.ErrNotPrimary {
		return -1, err
	}

	cctx, cancel := context.WithTimeout(ctx, s.Cfg.ReqTimeout())
	defer cancel()

	// renewals don't go through raft; forward to leader manually
	for cctx.Err() == nil && err != nil {
		leader, lerr := s.waitLeader(cctx)
		if lerr != nil {
			return -1, lerr
		}
		for _, url := range leader.PeerURLs {
			lurl := url + leasehttp.LeasePrefix
			ttl, err = leasehttp.RenewHTTP(cctx, id, lurl, s.peerRt)
			if err == nil || err == lease.ErrLeaseNotFound {
				return ttl, err
			}
		}
		// Throttle in case of e.g. connection problems.
		time.Sleep(50 * time.Millisecond)
	}

	if cctx.Err() == context.DeadlineExceeded {
		return -1, ErrTimeout
	}
	return -1, ErrCanceled
}
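Client-side, these lease paths correspond to the clientv3 Lease API; when a renewal or TTL query lands on a follower, LeaseRenew and LeaseTimeToLive (below) forward it to the leader over the peer HTTP handler. A sketch (not from this file) that grants, attaches, renews once, and inspects a lease, reusing the cli client and imports from the first sketch; the key name and the 10-second TTL are illustrative:

// leaseRoundTrip grants a lease, attaches a key to it, renews it once, and
// reads back its remaining TTL and attached keys.
func leaseRoundTrip(ctx context.Context, cli *clientv3.Client) error {
	grant, err := cli.Grant(ctx, 10) // 10s TTL
	if err != nil {
		return err
	}
	if _, err := cli.Put(ctx, "service/instance-1", "alive", clientv3.WithLease(grant.ID)); err != nil {
		return err
	}
	if _, err := cli.KeepAliveOnce(ctx, grant.ID); err != nil { // single renewal, ends up in LeaseRenew
		return err
	}
	ttl, err := cli.TimeToLive(ctx, grant.ID, clientv3.WithAttachedKeys()) // ends up in LeaseTimeToLive
	if err != nil {
		return err
	}
	fmt.Printf("lease %x: ttl=%ds keys=%d\n", int64(ttl.ID), ttl.TTL, len(ttl.Keys))
	return nil
}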
func (s *EtcdServer) LeaseTimeToLive(ctx context.Context, r *pb.LeaseTimeToLiveRequest) (*pb.LeaseTimeToLiveResponse, error) {
	if s.Leader() == s.ID() {
		// primary; timetolive directly from leader
		le := s.lessor.Lookup(lease.LeaseID(r.ID))
		if le == nil {
			return nil, lease.ErrLeaseNotFound
		}
		// TODO: fill out ResponseHeader
		resp := &pb.LeaseTimeToLiveResponse{Header: &pb.ResponseHeader{}, ID: r.ID, TTL: int64(le.Remaining().Seconds()), GrantedTTL: le.TTL()}
		if r.Keys {
			ks := le.Keys()
			kbs := make([][]byte, len(ks))
			for i := range ks {
				kbs[i] = []byte(ks[i])
			}
			resp.Keys = kbs
		}
		return resp, nil
	}

	cctx, cancel := context.WithTimeout(ctx, s.Cfg.ReqTimeout())
	defer cancel()

	// forward to leader
	for cctx.Err() == nil {
		leader, err := s.waitLeader(cctx)
		if err != nil {
			return nil, err
		}
		for _, url := range leader.PeerURLs {
			lurl := url + leasehttp.LeaseInternalPrefix
			resp, err := leasehttp.TimeToLiveHTTP(cctx, lease.LeaseID(r.ID), r.Keys, lurl, s.peerRt)
			if err == nil {
				return resp.LeaseTimeToLiveResponse, nil
			}
			if err == lease.ErrLeaseNotFound {
				return nil, err
			}
		}
	}

	if cctx.Err() == context.DeadlineExceeded {
		return nil, ErrTimeout
	}
	return nil, ErrCanceled
}

func (s *EtcdServer) LeaseLeases(ctx context.Context, r *pb.LeaseLeasesRequest) (*pb.LeaseLeasesResponse, error) {
	ls := s.lessor.Leases()
	lss := make([]*pb.LeaseStatus, len(ls))
	for i := range ls {
		lss[i] = &pb.LeaseStatus{ID: int64(ls[i].ID)}
	}
	return &pb.LeaseLeasesResponse{Header: newHeader(s), Leases: lss}, nil
}

func (s *EtcdServer) waitLeader(ctx context.Context) (*membership.Member, error) {
	leader := s.cluster.Member(s.Leader())
	for leader == nil {
		// wait an election
		dur := time.Duration(s.Cfg.ElectionTicks) * time.Duration(s.Cfg.TickMs) * time.Millisecond
		select {
		case <-time.After(dur):
			leader = s.cluster.Member(s.Leader())
		case <-s.stopping:
			return nil, ErrStopped
		case <-ctx.Done():
			return nil, ErrNoLeader
		}
	}
	if leader == nil || len(leader.PeerURLs) == 0 {
		return nil, ErrNoLeader
	}
	return leader, nil
}

func (s *EtcdServer) Alarm(ctx context.Context, r *pb.AlarmRequest) (*pb.AlarmResponse, error) {
	resp, err := s.raftRequestOnce(ctx, pb.InternalRaftRequest{Alarm: r})
	if err != nil {
		return nil, err
	}
	return resp.(*pb.AlarmResponse), nil
}

func (s *EtcdServer) AuthEnable(ctx context.Context, r *pb.AuthEnableRequest) (*pb.AuthEnableResponse, error) {
	resp, err := s.raftRequestOnce(ctx, pb.InternalRaftRequest{AuthEnable: r})
	if err != nil {
		return nil, err
	}
	return resp.(*pb.AuthEnableResponse), nil
}

func (s *EtcdServer) AuthDisable(ctx context.Context, r *pb.AuthDisableRequest) (*pb.AuthDisableResponse, error) {
	resp, err := s.raftRequest(ctx, pb.InternalRaftRequest{AuthDisable: r})
	if err != nil {
		return nil, err
	}
	return resp.(*pb.AuthDisableResponse), nil
}

func (s *EtcdServer) AuthStatus(ctx context.Context, r *pb.AuthStatusRequest) (*pb.AuthStatusResponse, error) {
	resp, err := s.raftRequest(ctx, pb.InternalRaftRequest{AuthStatus: r})
	if err != nil {
		return nil, err
	}
	return resp.(*pb.AuthStatusResponse), nil
}
func (s *EtcdServer) Authenticate(ctx context.Context, r *pb.AuthenticateRequest) (*pb.AuthenticateResponse, error) {
	if err := s.linearizableReadNotify(ctx); err != nil {
		return nil, err
	}

	lg := s.Logger()

	var resp proto.Message
	for {
		checkedRevision, err := s.AuthStore().CheckPassword(r.Name, r.Password)
		if err != nil {
			if err != auth.ErrAuthNotEnabled {
				lg.Warn(
					"invalid authentication was requested",
					zap.String("user", r.Name),
					zap.Error(err),
				)
			}
			return nil, err
		}

		st, err := s.AuthStore().GenTokenPrefix()
		if err != nil {
			return nil, err
		}

		// internalReq doesn't need to have Password because the above s.AuthStore().CheckPassword() already did it.
		// In addition, it will let a WAL entry not record password as a plain text.
		internalReq := &pb.InternalAuthenticateRequest{
			Name:        r.Name,
			SimpleToken: st,
		}

		resp, err = s.raftRequestOnce(ctx, pb.InternalRaftRequest{Authenticate: internalReq})
		if err != nil {
			return nil, err
		}
		if checkedRevision == s.AuthStore().Revision() {
			break
		}

		lg.Info("revision when password checked became stale; retrying")
	}

	return resp.(*pb.AuthenticateResponse), nil
}

func (s *EtcdServer) UserAdd(ctx context.Context, r *pb.AuthUserAddRequest) (*pb.AuthUserAddResponse, error) {
	if r.Options == nil || !r.Options.NoPassword {
		hashedPassword, err := bcrypt.GenerateFromPassword([]byte(r.Password), s.authStore.BcryptCost())
		if err != nil {
			return nil, err
		}
		r.HashedPassword = base64.StdEncoding.EncodeToString(hashedPassword)
		r.Password = ""
	}

	resp, err := s.raftRequest(ctx, pb.InternalRaftRequest{AuthUserAdd: r})
	if err != nil {
		return nil, err
	}
	return resp.(*pb.AuthUserAddResponse), nil
}

func (s *EtcdServer) UserDelete(ctx context.Context, r *pb.AuthUserDeleteRequest) (*pb.AuthUserDeleteResponse, error) {
	resp, err := s.raftRequest(ctx, pb.InternalRaftRequest{AuthUserDelete: r})
	if err != nil {
		return nil, err
	}
	return resp.(*pb.AuthUserDeleteResponse), nil
}

func (s *EtcdServer) UserChangePassword(ctx context.Context, r *pb.AuthUserChangePasswordRequest) (*pb.AuthUserChangePasswordResponse, error) {
	if r.Password != "" {
		hashedPassword, err := bcrypt.GenerateFromPassword([]byte(r.Password), s.authStore.BcryptCost())
		if err != nil {
			return nil, err
		}
		r.HashedPassword = base64.StdEncoding.EncodeToString(hashedPassword)
		r.Password = ""
	}

	resp, err := s.raftRequest(ctx, pb.InternalRaftRequest{AuthUserChangePassword: r})
	if err != nil {
		return nil, err
	}
	return resp.(*pb.AuthUserChangePasswordResponse), nil
}

func (s *EtcdServer) UserGrantRole(ctx context.Context, r *pb.AuthUserGrantRoleRequest) (*pb.AuthUserGrantRoleResponse, error) {
	resp, err := s.raftRequest(ctx, pb.InternalRaftRequest{AuthUserGrantRole: r})
	if err != nil {
		return nil, err
	}
	return resp.(*pb.AuthUserGrantRoleResponse), nil
}

func (s *EtcdServer) UserGet(ctx context.Context, r *pb.AuthUserGetRequest) (*pb.AuthUserGetResponse, error) {
	resp, err := s.raftRequest(ctx, pb.InternalRaftRequest{AuthUserGet: r})
	if err != nil {
		return nil, err
	}
	return resp.(*pb.AuthUserGetResponse), nil
}

func (s *EtcdServer) UserList(ctx context.Context, r *pb.AuthUserListRequest) (*pb.AuthUserListResponse, error) {
	resp, err := s.raftRequest(ctx, pb.InternalRaftRequest{AuthUserList: r})
	if err != nil {
		return nil, err
	}
	return resp.(*pb.AuthUserListResponse), nil
}
func (s *EtcdServer) UserRevokeRole(ctx context.Context, r *pb.AuthUserRevokeRoleRequest) (*pb.AuthUserRevokeRoleResponse, error) {
	resp, err := s.raftRequest(ctx, pb.InternalRaftRequest{AuthUserRevokeRole: r})
	if err != nil {
		return nil, err
	}
	return resp.(*pb.AuthUserRevokeRoleResponse), nil
}

func (s *EtcdServer) RoleAdd(ctx context.Context, r *pb.AuthRoleAddRequest) (*pb.AuthRoleAddResponse, error) {
	resp, err := s.raftRequest(ctx, pb.InternalRaftRequest{AuthRoleAdd: r})
	if err != nil {
		return nil, err
	}
	return resp.(*pb.AuthRoleAddResponse), nil
}

func (s *EtcdServer) RoleGrantPermission(ctx context.Context, r *pb.AuthRoleGrantPermissionRequest) (*pb.AuthRoleGrantPermissionResponse, error) {
	resp, err := s.raftRequest(ctx, pb.InternalRaftRequest{AuthRoleGrantPermission: r})
	if err != nil {
		return nil, err
	}
	return resp.(*pb.AuthRoleGrantPermissionResponse), nil
}

func (s *EtcdServer) RoleGet(ctx context.Context, r *pb.AuthRoleGetRequest) (*pb.AuthRoleGetResponse, error) {
	resp, err := s.raftRequest(ctx, pb.InternalRaftRequest{AuthRoleGet: r})
	if err != nil {
		return nil, err
	}
	return resp.(*pb.AuthRoleGetResponse), nil
}

func (s *EtcdServer) RoleList(ctx context.Context, r *pb.AuthRoleListRequest) (*pb.AuthRoleListResponse, error) {
	resp, err := s.raftRequest(ctx, pb.InternalRaftRequest{AuthRoleList: r})
	if err != nil {
		return nil, err
	}
	return resp.(*pb.AuthRoleListResponse), nil
}

func (s *EtcdServer) RoleRevokePermission(ctx context.Context, r *pb.AuthRoleRevokePermissionRequest) (*pb.AuthRoleRevokePermissionResponse, error) {
	resp, err := s.raftRequest(ctx, pb.InternalRaftRequest{AuthRoleRevokePermission: r})
	if err != nil {
		return nil, err
	}
	return resp.(*pb.AuthRoleRevokePermissionResponse), nil
}

func (s *EtcdServer) RoleDelete(ctx context.Context, r *pb.AuthRoleDeleteRequest) (*pb.AuthRoleDeleteResponse, error) {
	resp, err := s.raftRequest(ctx, pb.InternalRaftRequest{AuthRoleDelete: r})
	if err != nil {
		return nil, err
	}
	return resp.(*pb.AuthRoleDeleteResponse), nil
}

func (s *EtcdServer) raftRequestOnce(ctx context.Context, r pb.InternalRaftRequest) (proto.Message, error) {
	result, err := s.processInternalRaftRequestOnce(ctx, r)
	if err != nil {
		return nil, err
	}
	if result.err != nil {
		return nil, result.err
	}
	if startTime, ok := ctx.Value(traceutil.StartTimeKey).(time.Time); ok && result.trace != nil {
		applyStart := result.trace.GetStartTime()
		// The trace object is created in apply. Here reset the start time to trace
		// the raft request time by the difference between the request start time
		// and apply start time
		result.trace.SetStartTime(startTime)
		result.trace.InsertStep(0, applyStart, "process raft request")
		result.trace.LogIfLong(traceThreshold)
	}
	return result.resp, nil
}

func (s *EtcdServer) raftRequest(ctx context.Context, r pb.InternalRaftRequest) (proto.Message, error) {
	return s.raftRequestOnce(ctx, r)
}
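The auth and role handlers above are thin wrappers that commit each change through raft. A clientv3 sketch (not from this file) of bootstrapping authentication, reusing the cli client and imports from the first sketch; the user, role, and password are illustrative:

// enableAuth creates a root user and role, grants the role, then turns
// authentication on. Each call lands in the corresponding EtcdServer method
// above (UserAdd, RoleAdd, UserGrantRole, AuthEnable) and is committed via raft.
func enableAuth(ctx context.Context, cli *clientv3.Client) error {
	if _, err := cli.UserAdd(ctx, "root", "example-password"); err != nil {
		return err
	}
	if _, err := cli.RoleAdd(ctx, "root"); err != nil {
		return err
	}
	if _, err := cli.UserGrantRole(ctx, "root", "root"); err != nil {
		return err
	}
	_, err := cli.AuthEnable(ctx)
	return err
}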
// doSerialize handles the auth logic, with permissions checked by "chk", for a serialized request "get". Returns a non-nil error on authentication failure.
func (s *EtcdServer) doSerialize(ctx context.Context, chk func(*auth.AuthInfo) error, get func()) error {
	trace := traceutil.Get(ctx)
	ai, err := s.AuthInfoFromCtx(ctx)
	if err != nil {
		return err
	}
	if ai == nil {
		// chk expects non-nil AuthInfo; use empty credentials
		ai = &auth.AuthInfo{}
	}
	if err = chk(ai); err != nil {
		return err
	}
	trace.Step("get authentication metadata")
	// fetch response for serialized request
	get()
	// check for stale token revision in case the auth store was updated while
	// the request has been handled.
	if ai.Revision != 0 && ai.Revision != s.authStore.Revision() {
		return auth.ErrAuthOldRevision
	}
	return nil
}

func (s *EtcdServer) processInternalRaftRequestOnce(ctx context.Context, r pb.InternalRaftRequest) (*applyResult, error) {
	ai := s.getAppliedIndex()
	ci := s.getCommittedIndex()
	if ci > ai+maxGapBetweenApplyAndCommitIndex {
		return nil, ErrTooManyRequests
	}

	r.Header = &pb.RequestHeader{
		ID: s.reqIDGen.Next(),
	}

	// check authinfo if it is not InternalAuthenticateRequest
	if r.Authenticate == nil {
		authInfo, err := s.AuthInfoFromCtx(ctx)
		if err != nil {
			return nil, err
		}
		if authInfo != nil {
			r.Header.Username = authInfo.Username
			r.Header.AuthRevision = authInfo.Revision
		}
	}

	data, err := r.Marshal()
	if err != nil {
		return nil, err
	}

	if len(data) > int(s.Cfg.MaxRequestBytes) {
		return nil, ErrRequestTooLarge
	}

	id := r.ID
	if id == 0 {
		id = r.Header.ID
	}
	ch := s.w.Register(id)

	cctx, cancel := context.WithTimeout(ctx, s.Cfg.ReqTimeout())
	defer cancel()

	start := time.Now()
	err = s.r.Propose(cctx, data)
	if err != nil {
		proposalsFailed.Inc()
		s.w.Trigger(id, nil) // GC wait
		return nil, err
	}
	proposalsPending.Inc()
	defer proposalsPending.Dec()

	select {
	case x := <-ch:
		return x.(*applyResult), nil
	case <-cctx.Done():
		proposalsFailed.Inc()
		s.w.Trigger(id, nil) // GC wait
		return nil, s.parseProposeCtxErr(cctx.Err(), start)
	case <-s.done:
		return nil, ErrStopped
	}
}

// Watchable returns a watchable interface attached to the etcdserver.
func (s *EtcdServer) Watchable() mvcc.WatchableKV { return s.KV() }
func (s *EtcdServer) linearizableReadLoop() {
	for {
		requestId := s.reqIDGen.Next()
		leaderChangedNotifier := s.LeaderChangedNotify()
		select {
		case <-leaderChangedNotifier:
			continue
		case <-s.readwaitc:
		case <-s.stopping:
			return
		}

		// as a single loop can unlock multiple reads, it is not very useful
		// to propagate the trace from Txn or Range.
		trace := traceutil.New("linearizableReadLoop", s.Logger())

		nextnr := newNotifier()
		s.readMu.Lock()
		nr := s.readNotifier
		s.readNotifier = nextnr
		s.readMu.Unlock()

		confirmedIndex, err := s.requestCurrentIndex(leaderChangedNotifier, requestId)
		if isStopped(err) {
			return
		}
		if err != nil {
			nr.notify(err)
			continue
		}

		trace.Step("read index received")

		trace.AddField(traceutil.Field{Key: "readStateIndex", Value: confirmedIndex})

		appliedIndex := s.getAppliedIndex()
		trace.AddField(traceutil.Field{Key: "appliedIndex", Value: strconv.FormatUint(appliedIndex, 10)})

		if appliedIndex < confirmedIndex {
			select {
			case <-s.applyWait.Wait(confirmedIndex):
			case <-s.stopping:
				return
			}
		}
		// unblock all l-reads requested at indices before confirmedIndex
		nr.notify(nil)
		trace.Step("applied index is now lower than readState.Index")

		trace.LogAllStepsIfLong(traceThreshold)
	}
}

func isStopped(err error) bool {
	return err == raft.ErrStopped || err == ErrStopped
}

func (s *EtcdServer) requestCurrentIndex(leaderChangedNotifier <-chan struct{}, requestId uint64) (uint64, error) {
	err := s.sendReadIndex(requestId)
	if err != nil {
		return 0, err
	}

	lg := s.Logger()
	errorTimer := time.NewTimer(s.Cfg.ReqTimeout())
	defer errorTimer.Stop()
	retryTimer := time.NewTimer(readIndexRetryTime)
	defer retryTimer.Stop()

	firstCommitInTermNotifier := s.FirstCommitInTermNotify()

	for {
		select {
		case rs := <-s.r.readStateC:
			requestIdBytes := uint64ToBigEndianBytes(requestId)
			gotOwnResponse := bytes.Equal(rs.RequestCtx, requestIdBytes)
			if !gotOwnResponse {
				// a previous request might time out. now we should ignore the response of it and
				// continue waiting for the response of the current requests.
				responseId := uint64(0)
				if len(rs.RequestCtx) == 8 {
					responseId = binary.BigEndian.Uint64(rs.RequestCtx)
				}
				lg.Warn(
					"ignored out-of-date read index response; local node read indexes queueing up and waiting to be in sync with leader",
					zap.Uint64("sent-request-id", requestId),
					zap.Uint64("received-request-id", responseId),
				)
				slowReadIndex.Inc()
				continue
			}
			return rs.Index, nil
		case <-leaderChangedNotifier:
			readIndexFailed.Inc()
			// return a retryable error.
			return 0, ErrLeaderChanged
		case <-firstCommitInTermNotifier:
			firstCommitInTermNotifier = s.FirstCommitInTermNotify()
			lg.Info("first commit in current term: resending ReadIndex request")
			err := s.sendReadIndex(requestId)
			if err != nil {
				return 0, err
			}
			retryTimer.Reset(readIndexRetryTime)
			continue
		case <-retryTimer.C:
			lg.Warn(
				"waiting for ReadIndex response took too long, retrying",
				zap.Uint64("sent-request-id", requestId),
				zap.Duration("retry-timeout", readIndexRetryTime),
			)
			err := s.sendReadIndex(requestId)
			if err != nil {
				return 0, err
			}
			retryTimer.Reset(readIndexRetryTime)
			continue
		case <-errorTimer.C:
			lg.Warn(
				"timed out waiting for read index response (local node might have slow network)",
				zap.Duration("timeout", s.Cfg.ReqTimeout()),
			)
			slowReadIndex.Inc()
			return 0, ErrTimeout
		case <-s.stopping:
			return 0, ErrStopped
		}
	}
}

func uint64ToBigEndianBytes(number uint64) []byte {
	byteResult := make([]byte, 8)
	binary.BigEndian.PutUint64(byteResult, number)
	return byteResult
}
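A small illustration (not from this file) of the id matching that requestCurrentIndex performs on each ReadState, using the same "bytes" and "encoding/binary" packages imported above. When no matching response arrives before errorTimer fires, after s.Cfg.ReqTimeout(), the loop gives up with the "timed out waiting for read index response" warning this page is named after:

// matchReadIndexResponse mirrors the check in requestCurrentIndex: the 8-byte
// big-endian request id sent with ReadIndex must come back verbatim in
// rs.RequestCtx; anything else belongs to an earlier (e.g. timed-out) request
// and is logged and ignored while the loop keeps waiting.
func matchReadIndexResponse(sentId uint64, requestCtx []byte) bool {
	sent := make([]byte, 8)
	binary.BigEndian.PutUint64(sent, sentId) // same encoding as uint64ToBigEndianBytes
	if len(requestCtx) != 8 || !bytes.Equal(requestCtx, sent) {
		return false // out-of-date read index response
	}
	return true
}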
func (s *EtcdServer) sendReadIndex(requestIndex uint64) error {
	ctxToSend := uint64ToBigEndianBytes(requestIndex)

	cctx, cancel := context.WithTimeout(context.Background(), s.Cfg.ReqTimeout())
	err := s.r.ReadIndex(cctx, ctxToSend)
	cancel()
	if err == raft.ErrStopped {
		return err
	}
	if err != nil {
		lg := s.Logger()
		lg.Warn("failed to get read index from Raft", zap.Error(err))
		readIndexFailed.Inc()
		return err
	}
	return nil
}

func (s *EtcdServer) LinearizableReadNotify(ctx context.Context) error {
	return s.linearizableReadNotify(ctx)
}

func (s *EtcdServer) linearizableReadNotify(ctx context.Context) error {
	s.readMu.RLock()
	nc := s.readNotifier
	s.readMu.RUnlock()

	// signal linearizable loop for current notify if it hasn't been already
	select {
	case s.readwaitc <- struct{}{}:
	default:
	}

	// wait for read state notification
	select {
	case <-nc.c:
		return nc.err
	case <-ctx.Done():
		return ctx.Err()
	case <-s.done:
		return ErrStopped
	}
}

func (s *EtcdServer) AuthInfoFromCtx(ctx context.Context) (*auth.AuthInfo, error) {
	authInfo, err := s.AuthStore().AuthInfoFromCtx(ctx)
	if authInfo != nil || err != nil {
		return authInfo, err
	}
	if !s.Cfg.ClientCertAuthEnabled {
		return nil, nil
	}
	authInfo = s.AuthStore().AuthInfoFromTLS(ctx)
	return authInfo, nil
}

func (s *EtcdServer) Downgrade(ctx context.Context, r *pb.DowngradeRequest) (*pb.DowngradeResponse, error) {
	switch r.Action {
	case pb.DowngradeRequest_VALIDATE:
		return s.downgradeValidate(ctx, r.Version)
	case pb.DowngradeRequest_ENABLE:
		return s.downgradeEnable(ctx, r)
	case pb.DowngradeRequest_CANCEL:
		return s.downgradeCancel(ctx)
	default:
		return nil, ErrUnknownMethod
	}
}

func (s *EtcdServer) downgradeValidate(ctx context.Context, v string) (*pb.DowngradeResponse, error) {
	resp := &pb.DowngradeResponse{}

	targetVersion, err := convertToClusterVersion(v)
	if err != nil {
		return nil, err
	}

	// gets leaders commit index and wait for local store to finish applying that index
	// to avoid using stale downgrade information
	err = s.linearizableReadNotify(ctx)
	if err != nil {
		return nil, err
	}

	cv := s.ClusterVersion()
	if cv == nil {
		return nil, ErrClusterVersionUnavailable
	}
	resp.Version = cv.String()

	allowedTargetVersion := membership.AllowedDowngradeVersion(cv)
	if !targetVersion.Equal(*allowedTargetVersion) {
		return nil, ErrInvalidDowngradeTargetVersion
	}

	downgradeInfo := s.cluster.DowngradeInfo()
	if downgradeInfo.Enabled {
		// Todo: return the downgrade status along with the error msg
		return nil, ErrDowngradeInProcess
	}
	return resp, nil
}
func (s *EtcdServer) downgradeEnable(ctx context.Context, r *pb.DowngradeRequest) (*pb.DowngradeResponse, error) {
	// validate downgrade capability before starting downgrade
	v := r.Version
	lg := s.Logger()
	if resp, err := s.downgradeValidate(ctx, v); err != nil {
		lg.Warn("reject downgrade request", zap.Error(err))
		return resp, err
	}
	targetVersion, err := convertToClusterVersion(v)
	if err != nil {
		lg.Warn("reject downgrade request", zap.Error(err))
		return nil, err
	}

	raftRequest := membershippb.DowngradeInfoSetRequest{Enabled: true, Ver: targetVersion.String()}
	_, err = s.raftRequest(ctx, pb.InternalRaftRequest{DowngradeInfoSet: &raftRequest})
	if err != nil {
		lg.Warn("reject downgrade request", zap.Error(err))
		return nil, err
	}
	resp := pb.DowngradeResponse{Version: s.ClusterVersion().String()}
	return &resp, nil
}

func (s *EtcdServer) downgradeCancel(ctx context.Context) (*pb.DowngradeResponse, error) {
	// gets leaders commit index and wait for local store to finish applying that index
	// to avoid using stale downgrade information
	if err := s.linearizableReadNotify(ctx); err != nil {
		return nil, err
	}

	downgradeInfo := s.cluster.DowngradeInfo()
	if !downgradeInfo.Enabled {
		return nil, ErrNoInflightDowngrade
	}

	raftRequest := membershippb.DowngradeInfoSetRequest{Enabled: false}
	_, err := s.raftRequest(ctx, pb.InternalRaftRequest{DowngradeInfoSet: &raftRequest})
	if err != nil {
		return nil, err
	}
	resp := pb.DowngradeResponse{Version: s.ClusterVersion().String()}
	return &resp, nil
}
Source: https://fossies.org/linux/etcd/server/etcdserver/v3_server.go