cluster_mgmt_674338/testagent.xml
This class represents a parent object
cluster_mgmt_674338/Makefile
# Makefile for testagent
all: testagent

TESTAGENT_GEN_SRC= \
	testagent_gen/qmf/org/apache/qpid/agent/example/Parent.cpp \
	testagent_gen/qmf/org/apache/qpid/agent/example/Child.cpp \
	testagent_gen/qmf/org/apache/qpid/agent/example/EventChildCreated.cpp \
	testagent_gen/qmf/org/apache/qpid/agent/example/EventChildDestroyed.cpp \
	testagent_gen/qmf/org/apache/qpid/agent/example/Package.cpp

$(TESTAGENT_GEN_SRC): testagent_gen.timestamp

testagent_gen.timestamp: testagent.xml
	qmf-gen -o testagent_gen/qmf testagent.xml
	touch $@
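# Note (informal): qmf-gen is the QMF code generator shipped with qpid; it
# produces the C++ sources listed in TESTAGENT_GEN_SRC from the schema in
# testagent.xml. The timestamp file stands in for the whole generated set,
# so the schema is only re-processed when testagent.xml changes.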
clean:
	rm -rf testagent_gen testagent_gen.timestamp testagent *.pyc *~ brokertest.tmp

testagent: testagent.cpp $(TESTAGENT_GEN_SRC)
	g++ -o $@ $^ -lqmf -I testagent_gen
cluster_mgmt_674338/cluster_tests.py
#!/usr/bin/env python
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#
import os, signal, sys, time, imp, re, subprocess, glob, cluster_test_logs
from qpid import datatypes, messaging
from brokertest import *
from qpid.harness import Skipped
from qpid.messaging import Message, Empty
from threading import Thread, Lock
from logging import getLogger
from itertools import chain
from tempfile import NamedTemporaryFile
log = getLogger("qpid.cluster_tests")
# Note: brokers that shut themselves down due to a critical error during
# normal operation will still have an exit code of 0. Brokers that
# shut down because of an error found during initialization will exit
# with a non-0 code. Hence the apparently inconsistent use of
# EXPECT_EXIT_OK and EXPECT_EXIT_FAIL in some of the tests below.
# TODO aconway 2010-03-11: resolve this - ideally any exit due to an error
# should give non-0 exit status.
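# Illustrative sketch (not part of the suite): how the EXPECT_* constants
# from brokertest (imported below) are chosen by the tests:
#   self.broker(expect=EXPECT_RUNNING)    # must still be up at end of test
#   self.broker(expect=EXPECT_EXIT_FAIL)  # will be killed or exit with error
#   self.broker(expect=EXPECT_EXIT_OK)    # clean shutdown expected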
# Import scripts as modules
qpid_cluster=import_script(checkenv("QPID_CLUSTER_EXEC"))
def readfile(filename):
"""Returns te content of file named filename as a string"""
f = file(filename)
try: return f.read()
finally: f.close()
class ShortTests(BrokerTest):
"""Short cluster functionality tests."""
def test_message_replication(self):
"""Test basic cluster message replication."""
# Start a cluster, send some messages to member 0.
cluster = self.cluster(2)
s0 = cluster[0].connect().session()
s0.sender("q; {create:always}").send(Message("x"))
s0.sender("q; {create:always}").send(Message("y"))
s0.connection.close()
# Verify messages available on member 1.
s1 = cluster[1].connect().session()
m = s1.receiver("q", capacity=1).fetch(timeout=1)
s1.acknowledge()
self.assertEqual("x", m.content)
s1.connection.close()
# Start member 2 and verify messages available.
s2 = cluster.start().connect().session()
m = s2.receiver("q", capacity=1).fetch(timeout=1)
s2.acknowledge()
self.assertEqual("y", m.content)
s2.connection.close()
def test_store_direct_update_match(self):
"""Verify that brokers stores an identical message whether they receive it
direct from clients or during an update, no header or other differences"""
cluster = self.cluster(0, args=["--load-module", self.test_store_lib])
cluster.start(args=["--test-store-dump", "direct.dump"])
# Try messages with various headers
cluster[0].send_message("q", Message(durable=True, content="foobar",
subject="subject",
reply_to="reply_to",
properties={"n":10}))
# Try messages of different sizes
for size in range(0,10000,100):
cluster[0].send_message("q", Message(content="x"*size, durable=True))
# Try sending via named exchange
c = cluster[0].connect_old()
s = c.session(str(qpid.datatypes.uuid4()))
s.exchange_bind(exchange="amq.direct", binding_key="foo", queue="q")
props = s.delivery_properties(routing_key="foo", delivery_mode=2)
s.message_transfer(
destination="amq.direct",
message=qpid.datatypes.Message(props, "content"))
# Now update a new member and compare their dumps.
cluster.start(args=["--test-store-dump", "updatee.dump"])
assert readfile("direct.dump") == readfile("updatee.dump")
os.remove("direct.dump")
os.remove("updatee.dump")
def test_sasl(self):
"""Test SASL authentication and encryption in a cluster"""
sasl_config=os.path.join(self.rootdir, "sasl_config")
acl=os.path.join(os.getcwd(), "policy.acl")
aclf=file(acl,"w")
aclf.write("""
acl deny zag@QPID create queue
acl allow all all
""")
aclf.close()
cluster = self.cluster(2, args=["--auth", "yes",
"--sasl-config", sasl_config,
"--load-module", os.getenv("ACL_LIB"),
"--acl-file", acl])
# Valid user/password, ensure queue is created.
c = cluster[0].connect(username="zig", password="zig")
c.session().sender("ziggy;{create:always}")
c.close()
c = cluster[1].connect(username="zig", password="zig")
c.session().receiver("ziggy;{assert:always}")
c.close()
for b in cluster: b.ready() # Make sure all brokers still running.
# Valid user, bad password
try:
cluster[0].connect(username="zig", password="foo").close()
self.fail("Expected exception")
except messaging.exceptions.ConnectionError: pass
for b in cluster: b.ready() # Make sure all brokers still running.
# Bad user ID
try:
cluster[0].connect(username="foo", password="bar").close()
self.fail("Expected exception")
except messaging.exceptions.ConnectionError: pass
for b in cluster: b.ready() # Make sure all brokers still running.
# Action disallowed by ACL
c = cluster[0].connect(username="zag", password="zag")
try:
s = c.session()
s.sender("zaggy;{create:always}")
s.close()
self.fail("Expected exception")
except messaging.exceptions.UnauthorizedAccess: pass
# make sure the queue was not created at the other node.
c = cluster[0].connect(username="zag", password="zag")
try:
s = c.session()
s.sender("zaggy;{assert:always}")
s.close()
self.fail("Expected exception")
except messaging.exceptions.NotFound: pass
def test_user_id_update(self):
"""Ensure that user-id of an open session is updated to new cluster members"""
sasl_config=os.path.join(self.rootdir, "sasl_config")
cluster = self.cluster(1, args=["--auth", "yes", "--sasl-config", sasl_config,])
c = cluster[0].connect(username="zig", password="zig")
s = c.session().sender("q;{create:always}")
s.send(Message("x", user_id="zig")) # Message sent before start new broker
cluster.start()
s.send(Message("y", user_id="zig")) # Messsage sent after start of new broker
# Verify brokers are healthy and messages are on the queue.
self.assertEqual("x", cluster[0].get_message("q").content)
self.assertEqual("y", cluster[1].get_message("q").content)
def test_link_events(self):
"""Regression test for https://bugzilla.redhat.com/show_bug.cgi?id=611543"""
args = ["--mgmt-pub-interval", 1] # Publish management information every second.
broker1 = self.cluster(1, args)[0]
broker2 = self.cluster(1, args)[0]
qp = self.popen(["qpid-printevents", broker1.host_port()], EXPECT_RUNNING)
qr = self.popen(["qpid-route", "route", "add",
broker1.host_port(), broker2.host_port(),
"amq.fanout", "key"
], EXPECT_EXIT_OK)
# Look for link event in printevents output.
retry(lambda: find_in_file("brokerLinkUp", qp.outfile("out")))
broker1.ready()
broker2.ready()
def test_queue_cleaner(self):
""" Regression test to ensure that cleanup of expired messages works correctly """
cluster = self.cluster(2, args=["--queue-purge-interval", 3])
s0 = cluster[0].connect().session()
sender = s0.sender("my-lvq; {create: always, node:{x-declare:{arguments:{'qpid.last_value_queue':1}}}}")
#send a batch of messages that will all expire and be cleaned up
for i in range(1, 10):
msg = Message("message-%s" % i)
msg.properties["qpid.LVQ_key"] = "a"
msg.ttl = 0.1
sender.send(msg)
#wait for queue cleaner to run
time.sleep(3)
#test all is ok by sending and receiving a message
msg = Message("non-expiring")
msg.properties["qpid.LVQ_key"] = "b"
sender.send(msg)
s0.connection.close()
s1 = cluster[1].connect().session()
m = s1.receiver("my-lvq", capacity=1).fetch(timeout=1)
s1.acknowledge()
self.assertEqual("non-expiring", m.content)
s1.connection.close()
for b in cluster: b.ready() # Make sure all brokers still running.
def test_amqfailover_visible(self):
"""Verify that the amq.failover exchange can be seen by
QMF-based tools - regression test for BZ615300."""
broker1 = self.cluster(1)[0]
broker2 = self.cluster(1)[0]
qs = subprocess.Popen(["qpid-stat", "-e", broker1.host_port()], stdout=subprocess.PIPE)
out = qs.communicate()[0]
assert out.find("amq.failover") > 0
def evaluate_address(self, session, address):
"""Create a receiver just to evaluate an address for its side effects"""
r = session.receiver(address)
r.close()
def test_expire_fanout(self):
"""Regression test for QPID-2874: Clustered broker crashes in assertion in
cluster/ExpiryPolicy.cpp.
Caused by a fan-out message being updated as separate messages"""
cluster = self.cluster(1)
session0 = cluster[0].connect().session()
# Create 2 queues bound to fanout exchange.
self.evaluate_address(session0, "q1;{create:always,node:{x-bindings:[{exchange:'amq.fanout',queue:q1}]}}")
self.evaluate_address(session0, "q2;{create:always,node:{x-bindings:[{exchange:'amq.fanout',queue:q2}]}}")
queues = ["q1", "q2"]
# Send a fanout message with a long timeout
s = session0.sender("amq.fanout")
s.send(Message("foo", ttl=100), sync=False)
# Start a new member, check the messages
cluster.start()
session1 = cluster[1].connect().session()
for q in queues: self.assert_browse(session1, q, ["foo"])
def test_dr_no_message(self):
"""Regression test for https://bugzilla.redhat.com/show_bug.cgi?id=655141
Joining broker crashes with 'error deliveryRecord no update message'
"""
cluster = self.cluster(1)
session0 = cluster[0].connect().session()
s = session0.sender("q1;{create:always}")
s.send(Message("a", ttl=0.05), sync=False)
s.send(Message("b", ttl=0.05), sync=False)
r1 = session0.receiver("q1")
self.assertEqual("a", r1.fetch(timeout=0).content)
r2 = session0.receiver("q1;{mode:browse}")
self.assertEqual("b", r2.fetch(timeout=0).content)
# Leave messages un-acknowledged, let them expire, then start a new broker.
time.sleep(.1)
cluster.start()
self.assertRaises(Empty, cluster[1].connect().session().receiver("q1").fetch,0)
def test_route_update(self):
"""Regression test for https://issues.apache.org/jira/browse/QPID-2982
Links and bridges associated with routes were not replicated on update.
This meant extra management objects and caused an exit if a management
client was attached.
"""
args=["--mgmt-pub-interval=1","--log-enable=trace+:management"]
cluster0 = self.cluster(1, args=args)
cluster1 = self.cluster(1, args=args)
assert 0 == subprocess.call(
["qpid-route", "route", "add", cluster0[0].host_port(),
cluster1[0].host_port(), "dummy-exchange", "dummy-key", "-d"])
cluster0.start()
# Wait for qpid-tool:list on cluster0[0] to generate expected output.
pattern = re.compile("org.apache.qpid.broker.*link")
qpid_tool = subprocess.Popen(["qpid-tool", cluster0[0].host_port()],
stdin=subprocess.PIPE, stdout=subprocess.PIPE)
class Scanner(Thread):
def __init__(self): self.found = False; Thread.__init__(self)
def run(self):
for l in qpid_tool.stdout:
if pattern.search(l): self.found = True; return
scanner = Scanner()
scanner.start()
start = time.time()
try:
# Wait up to a 5-second timeout for the scanner to find the expected output
while not scanner.found and time.time() < start + 5:
qpid_tool.stdin.write("list\n") # Ask qpid-tool to list
for b in cluster0: b.ready() # Raise if any brokers are down
finally:
qpid_tool.stdin.write("quit\n")
qpid_tool.wait()
scanner.join()
assert scanner.found
# Verify logs are consistent
cluster_test_logs.verify_logs()
class LongTests(BrokerTest):
"""Tests that can run for a long time if -DDURATION= is set"""
def duration(self):
d = self.config.defines.get("DURATION")
if d: return float(d)*60
else: return 3 # Default is to be quick
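# Example (informal): running "qpid-python-test -DDURATION=10 ..." makes the
# long tests below run for 10*60 = 600 seconds; without it they take ~3s.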
def test_failover(self):
"""Test fail-over during continuous send-receive with errors"""
# The original cluster members will all be killed, so expect exit with failure.
cluster = self.cluster(3, expect=EXPECT_EXIT_FAIL)
for b in cluster: ErrorGenerator(b)
# Start sender and receiver threads
cluster[0].declare_queue("test-queue")
sender = NumberedSender(cluster[1], 1000) # Max queue depth
receiver = NumberedReceiver(cluster[2], sender)
receiver.start()
sender.start()
# Kill original brokers, start new ones for the duration.
endtime = time.time() + self.duration()
i = 0
while time.time() < endtime:
cluster[i].kill()
i += 1
b = cluster.start(expect=EXPECT_EXIT_FAIL)
ErrorGenerator(b)
time.sleep(5)
sender.stop()
receiver.stop()
for i in range(i, len(cluster)): cluster[i].kill()
def test_management(self, args=[]):
"""
Stress test: Run management clients and other clients concurrently
while killing and restarting brokers.
"""
class ClientLoop(StoppableThread):
"""Run a client executable in a loop."""
def __init__(self, broker, cmd):
StoppableThread.__init__(self)
self.broker=broker
self.cmd = cmd # Client command.
self.lock = Lock()
self.process = None # Client process.
self.start()
def run(self):
try:
while True:
self.lock.acquire()
try:
if self.stopped: break
self.process = self.broker.test.popen(
self.cmd, expect=EXPECT_UNKNOWN)
finally: self.lock.release()
try: exit = self.process.wait()
except OSError, e:
# Seems to be a race in wait(), it throws
# "no such process" during test shutdown.
# Doesn't indicate a test error, ignore.
return
except Exception, e:
self.process.unexpected(
"client of %s: %s"%(self.broker.name, e))
self.lock.acquire()
try:
# Quit and ignore errors if stopped or expecting failure.
if self.stopped: break
if exit != 0:
self.process.unexpected(
"client of %s exit code %s"%(self.broker.name, exit))
finally: self.lock.release()
except Exception, e:
self.error = RethrownException("Error in ClientLoop.run")
def stop(self):
"""Stop the running client and wait for it to exit"""
self.lock.acquire()
try:
if self.stopped: return
self.stopped = True
if self.process:
try: self.process.kill() # Kill the client.
except OSError: pass # The client might not be running.
finally: self.lock.release()
StoppableThread.stop(self)
# body of test_management()
args += ["--mgmt-pub-interval", 1]
args += ["--log-enable=trace+:management"]
# Use store if present.
if BrokerTest.store_lib: args +=["--load-module", BrokerTest.store_lib]
cluster = self.cluster(3, args)
clients = [] # Per-broker list of clients that only connect to one broker.
mclients = [] # Management clients that connect to every broker in the cluster.
def start_clients(broker):
"""Start ordinary clients for a broker."""
cmds=[
["qpid-tool", "localhost:%s"%(broker.port())],
["qpid-perftest", "--count", 50000,
"--base-name", str(qpid.datatypes.uuid4()), "--port", broker.port()],
["qpid-queue-stats", "-a", "localhost:%s" %(broker.port())],
["testagent", "localhost", str(broker.port())] ]
clients.append([ClientLoop(broker, cmd) for cmd in cmds])
def start_mclients(broker):
"""Start management clients that make multiple connections."""
cmd = ["qpid-stat", "-b", "localhost:%s" %(broker.port())]
mclients.append(ClientLoop(broker, cmd))
endtime = time.time() + self.duration()
alive = 0 # First live cluster member
for i in range(len(cluster)): start_clients(cluster[i])
start_mclients(cluster[alive])
while time.time() < endtime:
time.sleep(5)
for b in cluster[alive:]: b.ready() # Check if a broker crashed.
# Kill the first broker, expect the clients to fail.
b = cluster[alive]
b.expect = EXPECT_EXIT_FAIL
b.kill()
# Stop the broker's clients and all the mclients.
for c in clients[alive] + mclients:
try: c.stop()
except: pass # Ignore expected errors due to broker shutdown.
clients[alive] = []
mclients = []
# Start another broker and clients
alive += 1
cluster.start()
start_clients(cluster[-1])
start_mclients(cluster[alive])
for c in chain(mclients, *clients):
c.stop()
# Verify that logs are consistent
cluster_test_logs.verify_logs()
def test_management_qmf2(self):
self.test_management(args=["--mgmt-qmf2=yes"])
def test_connect_consistent(self): # FIXME aconway 2011-01-18:
args=["--mgmt-pub-interval=1","--log-enable=trace+:management"]
cluster = self.cluster(2, args=args)
end = time.time() + self.duration()
while (time.time() < end): # Get a management interval
for i in xrange(1000): cluster[0].connect().close()
cluster_test_logs.verify_logs()
class StoreTests(BrokerTest):
"""
Cluster tests that can only be run if there is a store available.
"""
def args(self):
assert BrokerTest.store_lib
return ["--load-module", BrokerTest.store_lib]
def test_store_loaded(self):
"""Ensure we are indeed loading a working store"""
broker = self.broker(self.args(), name="recoverme", expect=EXPECT_EXIT_FAIL)
m = Message("x", durable=True)
broker.send_message("q", m)
broker.kill()
broker = self.broker(self.args(), name="recoverme")
self.assertEqual("x", broker.get_message("q").content)
def test_kill_restart(self):
"""Verify we can kill/resetart a broker with store in a cluster"""
cluster = self.cluster(1, self.args())
cluster.start("restartme", expect=EXPECT_EXIT_FAIL).kill()
# Send a message, retrieve from the restarted broker
cluster[0].send_message("q", "x")
m = cluster.start("restartme").get_message("q")
self.assertEqual("x", m.content)
def stop_cluster(self,broker):
"""Clean shut-down of a cluster"""
self.assertEqual(0, qpid_cluster.main(
["-kf", broker.host_port()]))
def test_persistent_restart(self):
"""Verify persistent cluster shutdown/restart scenarios"""
cluster = self.cluster(0, args=self.args() + ["--cluster-size=3"])
a = cluster.start("a", expect=EXPECT_EXIT_OK, wait=False)
b = cluster.start("b", expect=EXPECT_EXIT_OK, wait=False)
c = cluster.start("c", expect=EXPECT_EXIT_FAIL, wait=True)
a.send_message("q", Message("1", durable=True))
# Kill & restart one member.
c.kill()
self.assertEqual(a.get_message("q").content, "1")
a.send_message("q", Message("2", durable=True))
c = cluster.start("c", expect=EXPECT_EXIT_OK)
self.assertEqual(c.get_message("q").content, "2")
# Shut down the entire cluster cleanly and bring it back up
a.send_message("q", Message("3", durable=True))
self.stop_cluster(a)
a = cluster.start("a", wait=False)
b = cluster.start("b", wait=False)
c = cluster.start("c", wait=True)
self.assertEqual(a.get_message("q").content, "3")
def test_persistent_partial_failure(self):
# Kill 2 members, shut down the last cleanly then restart
# Ensure we use the clean database
cluster = self.cluster(0, args=self.args() + ["--cluster-size=3"])
a = cluster.start("a", expect=EXPECT_EXIT_FAIL, wait=False)
b = cluster.start("b", expect=EXPECT_EXIT_FAIL, wait=False)
c = cluster.start("c", expect=EXPECT_EXIT_OK, wait=True)
a.send_message("q", Message("4", durable=True))
a.kill()
b.kill()
self.assertEqual(c.get_message("q").content, "4")
c.send_message("q", Message("clean", durable=True))
self.stop_cluster(c)
a = cluster.start("a", wait=False)
b = cluster.start("b", wait=False)
c = cluster.start("c", wait=True)
self.assertEqual(a.get_message("q").content, "clean")
def test_wrong_cluster_id(self):
# Start a cluster1 broker, then try to restart in cluster2
cluster1 = self.cluster(0, args=self.args())
a = cluster1.start("a", expect=EXPECT_EXIT_OK)
a.terminate()
cluster2 = self.cluster(1, args=self.args())
try:
a = cluster2.start("a", expect=EXPECT_EXIT_FAIL)
a.ready()
self.fail("Expected exception")
except: pass
def test_wrong_shutdown_id(self):
# Start 2 members and shut down.
cluster = self.cluster(0, args=self.args()+["--cluster-size=2"])
a = cluster.start("a", expect=EXPECT_EXIT_OK, wait=False)
b = cluster.start("b", expect=EXPECT_EXIT_OK, wait=False)
self.stop_cluster(a)
self.assertEqual(a.wait(), 0)
self.assertEqual(b.wait(), 0)
# Restart with a different member and shut down.
a = cluster.start("a", expect=EXPECT_EXIT_OK, wait=False)
c = cluster.start("c", expect=EXPECT_EXIT_OK, wait=False)
self.stop_cluster(a)
self.assertEqual(a.wait(), 0)
self.assertEqual(c.wait(), 0)
# Mix members from both shutdown events, they should fail
# TODO aconway 2010-03-11: can't predict the exit status of these
# as it depends on the order of delivery of initial-status messages.
# See comment at top of this file.
a = cluster.start("a", expect=EXPECT_UNKNOWN, wait=False)
b = cluster.start("b", expect=EXPECT_UNKNOWN, wait=False)
self.assertRaises(Exception, lambda: a.ready())
self.assertRaises(Exception, lambda: b.ready())
def test_solo_store_clean(self):
# A single node cluster should always leave a clean store.
cluster = self.cluster(0, self.args())
a = cluster.start("a", expect=EXPECT_EXIT_FAIL)
a.send_message("q", Message("x", durable=True))
a.kill()
a = cluster.start("a")
self.assertEqual(a.get_message("q").content, "x")
def test_last_store_clean(self):
# Verify that only the last node in a cluster to shut down has
# a clean store. Start with cluster of 3, reduce to 1 then
# increase again to ensure that a node that was once alone but
# finally did not finish as the last node does not get a clean
# store.
cluster = self.cluster(0, self.args())
a = cluster.start("a", expect=EXPECT_EXIT_FAIL)
self.assertEqual(a.store_state(), "clean")
b = cluster.start("b", expect=EXPECT_EXIT_FAIL)
c = cluster.start("c", expect=EXPECT_EXIT_FAIL)
self.assertEqual(b.store_state(), "dirty")
self.assertEqual(c.store_state(), "dirty")
retry(lambda: a.store_state() == "dirty")
a.send_message("q", Message("x", durable=True))
a.kill()
b.kill() # c is last man, will mark store clean
retry(lambda: c.store_state() == "clean")
a = cluster.start("a", expect=EXPECT_EXIT_FAIL) # c no longer last man
retry(lambda: c.store_state() == "dirty")
c.kill() # a is now last man
retry(lambda: a.store_state() == "clean")
a.kill()
self.assertEqual(a.store_state(), "clean")
self.assertEqual(b.store_state(), "dirty")
self.assertEqual(c.store_state(), "dirty")
def test_restart_clean(self):
"""Verify that we can re-start brokers one by one in a
persistent cluster after a clean oshutdown"""
cluster = self.cluster(0, self.args())
a = cluster.start("a", expect=EXPECT_EXIT_OK)
b = cluster.start("b", expect=EXPECT_EXIT_OK)
c = cluster.start("c", expect=EXPECT_EXIT_OK)
a.send_message("q", Message("x", durable=True))
self.stop_cluster(a)
a = cluster.start("a")
b = cluster.start("b")
c = cluster.start("c")
self.assertEqual(c.get_message("q").content, "x")
def test_join_sub_size(self):
"""Verify that after starting a cluster with cluster-size=N,
we can join new members even if size < N-1"""
cluster = self.cluster(0, self.args()+["--cluster-size=3"])
a = cluster.start("a", wait=False, expect=EXPECT_EXIT_FAIL)
b = cluster.start("b", wait=False, expect=EXPECT_EXIT_FAIL)
c = cluster.start("c")
a.send_message("q", Message("x", durable=True))
a.send_message("q", Message("y", durable=True))
a.kill()
b.kill()
a = cluster.start("a")
self.assertEqual(c.get_message("q").content, "x")
b = cluster.start("b")
self.assertEqual(c.get_message("q").content, "y")
cluster_mgmt_674338/brokertest.py
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#
# Support library for tests that start multiple brokers, e.g. cluster
# or federation
import os, signal, string, tempfile, subprocess, socket, threading, time, imp, re
import qpid, traceback, signal
from qpid import connection, messaging, util
from qpid.compat import format_exc
from qpid.harness import Skipped
from unittest import TestCase
from copy import copy
from threading import Thread, Lock, Condition
from logging import getLogger
log = getLogger("qpid.brokertest")
# Values for expected outcome of process at end of test
EXPECT_EXIT_OK=1 # Expect to exit with 0 status before end of test.
EXPECT_EXIT_FAIL=2 # Expect to exit with non-0 status before end of test.
EXPECT_RUNNING=3 # Expect to still be running at end of test
EXPECT_UNKNOWN=4 # No expectation, don't check exit status.
def find_exe(program):
"""Find an executable in the system PATH"""
def is_exe(fpath):
return os.path.isfile(fpath) and os.access(fpath, os.X_OK)
mydir, name = os.path.split(program)
if mydir:
if is_exe(program): return program
else:
for path in os.environ["PATH"].split(os.pathsep):
exe_file = os.path.join(path, program)
if is_exe(exe_file): return exe_file
return None
def is_running(pid):
try:
os.kill(pid, 0)
return True
except:
return False
class BadProcessStatus(Exception):
pass
class ExceptionWrapper:
"""Proxy object that adds a message to exceptions raised"""
def __init__(self, obj, msg):
self.obj = obj
self.msg = msg
def __getattr__(self, name):
func = getattr(self.obj, name)
if not callable(func):
return func
return lambda *args, **kwargs: self._wrap(func, args, kwargs)
def _wrap(self, func, args, kwargs):
try:
return func(*args, **kwargs)
except Exception, e:
raise Exception("%s: %s" %(self.msg, str(e)))
def error_line(filename, n=1):
"""Get the last n line(s) of filename for error messages"""
result = []
try:
f = open(filename)
try:
for l in f:
if len(result) == n: result.pop(0)
result.append(" "+l)
finally: f.close()
except: return ""
return ":\n" + "".join(result)
def retry(function, timeout=10, delay=.01):
"""Call function until it returns True or timeout expires.
Double the delay for each retry. Return True if function
returns true, False if timeout expires."""
while not function():
if delay > timeout: delay = timeout
time.sleep(delay)
timeout -= delay
if timeout <= 0: return False
delay *= 2
return True
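# Example use (a sketch): wait up to 30s for a broker's log to report ready,
# polling with an exponentially growing delay, as Broker.ready() does below:
#   retry(lambda: find_in_file("notice Broker running", broker.log), timeout=30)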
class Popen(subprocess.Popen):
"""
Can set and verify expectation of process status at end of test.
Dumps command line, stdout, stderr to data dir for debugging.
"""
class DrainThread(Thread):
"""Thread to drain a file object and write the data to a file."""
def __init__(self, infile, outname):
Thread.__init__(self)
self.infile, self.outname = infile, outname
self.outfile = None
def run(self):
try:
for line in self.infile:
if self.outfile is None:
self.outfile = open(self.outname, "w")
self.outfile.write(line)
finally:
self.infile.close()
if self.outfile is not None: self.outfile.close()
class OutStream(ExceptionWrapper):
"""Wrapper for output streams, handles exceptions & draining output"""
def __init__(self, infile, outfile, msg):
ExceptionWrapper.__init__(self, infile, msg)
self.infile, self.outfile = infile, outfile
self.thread = None
def drain(self):
if self.thread is None:
self.thread = Popen.DrainThread(self.infile, self.outfile)
self.thread.start()
def outfile(self, ext): return "%s.%s" % (self.pname, ext)
def __init__(self, cmd, expect=EXPECT_EXIT_OK, drain=True):
"""Run cmd (should be a list of arguments)
expect - if set verify expectation at end of test.
drain - if true (default) drain stdout/stderr to files.
"""
self._clean = False
self._clean_lock = Lock()
if type(cmd) is type(""): cmd = [cmd] # Make it a list.
assert find_exe(cmd[0]), "executable not found: "+cmd[0]
self.cmd = [ str(x) for x in cmd ]
self.returncode = None
self.expect = expect
try:
subprocess.Popen.__init__(self, self.cmd, 0, None, subprocess.PIPE, subprocess.PIPE, subprocess.PIPE, close_fds=True)
except ValueError: # Windows can't do close_fds
subprocess.Popen.__init__(self, self.cmd, 0, None, subprocess.PIPE, subprocess.PIPE, subprocess.PIPE)
self.pname = "%s-%d" % (os.path.split(self.cmd[0])[1], self.pid)
msg = "Process %s" % self.pname
self.stdin = ExceptionWrapper(self.stdin, msg)
self.stdout = Popen.OutStream(self.stdout, self.outfile("out"), msg)
self.stderr = Popen.OutStream(self.stderr, self.outfile("err"), msg)
f = open(self.outfile("cmd"), "w")
try: f.write(self.cmd_str())
finally: f.close()
log.debug("Started process %s: %s" % (self.pname, " ".join(self.cmd)))
if drain: self.drain()
def __str__(self): return "Popen<%s>"%(self.pname)
def drain(self):
"""Start threads to drain stdout/err"""
self.stdout.drain()
self.stderr.drain()
def _cleanup(self):
"""Close pipes to sub-process"""
self._clean_lock.acquire()
try:
if self._clean: return
self._clean = True
self.stdin.close()
self.drain() # Drain output pipes.
self.stdout.thread.join() # Drain thread closes pipe.
self.stderr.thread.join()
finally: self._clean_lock.release()
def unexpected(self,msg):
err = error_line(self.outfile("err")) or error_line(self.outfile("out"))
raise BadProcessStatus("%s %s%s" % (self.pname, msg, err))
def stop(self): # Clean up at end of test.
try:
if self.expect == EXPECT_UNKNOWN:
try: self.kill() # Just make sure it's dead
except: pass
elif self.expect == EXPECT_RUNNING:
try:
self.kill()
except:
self.unexpected("expected running, exit code %d" % self.wait())
else:
retry(lambda: self.poll() is not None)
if self.returncode is None: # Still haven't stopped
self.kill()
self.unexpected("still running")
elif self.expect == EXPECT_EXIT_OK and self.returncode != 0:
self.unexpected("exit code %d" % self.returncode)
elif self.expect == EXPECT_EXIT_FAIL and self.returncode == 0:
self.unexpected("expected error")
finally:
self.wait() # Clean up the process.
def communicate(self, input=None):
if input:
self.stdin.write(input)
self.stdin.close()
outerr = (self.stdout.read(), self.stderr.read())
self.wait()
return outerr
def is_running(self):
return self.poll() is None
def assert_running(self):
if not self.is_running(): self.unexpected("Exit code %d" % self.returncode)
def poll(self, _deadstate=None): # _deadstate required by base class in python 2.4
if self.returncode is None:
# Pass _deadstate only if it has been set, there is no _deadstate
# parameter in Python 2.6
if _deadstate is None: ret = subprocess.Popen.poll(self)
else: ret = subprocess.Popen.poll(self, _deadstate)
if (ret is not None): # Process has exited
self.returncode = ret
self._cleanup()
return self.returncode
def wait(self):
if self.returncode is None:
self.drain()
try: self.returncode = subprocess.Popen.wait(self)
except OSError,e: raise OSError("Wait failed %s: %s"%(self.pname, e))
self._cleanup()
return self.returncode
def terminate(self):
try: subprocess.Popen.terminate(self)
except AttributeError: # No terminate method
try:
os.kill( self.pid , signal.SIGTERM)
except AttributeError: # no os.kill, using taskkill.. (Windows only)
os.popen('TASKKILL /PID ' +str(self.pid) + ' /F')
def kill(self):
try: subprocess.Popen.kill(self)
except AttributeError: # No terminate method
try:
os.kill( self.pid , signal.SIGKILL)
except AttributeError: # no os.kill, using taskkill.. (Windows only)
os.popen('TASKKILL /PID ' +str(self.pid) + ' /F')
def cmd_str(self): return " ".join([str(s) for s in self.cmd])
def checkenv(name):
value = os.getenv(name)
if not value: raise Exception("Environment variable %s is not set" % name)
return value
def find_in_file(str, filename):
if not os.path.exists(filename): return False
f = open(filename)
try: return str in f.read()
finally: f.close()
class Broker(Popen):
"A broker process. Takes care of start, stop and logging."
_broker_count = 0
def __str__(self): return "Broker<%s %s>"%(self.name, self.pname)
def find_log(self):
self.log = "%s.log" % self.name
i = 1
while (os.path.exists(self.log)):
self.log = "%s-%d.log" % (self.name, i)
i += 1
def get_log(self):
return os.path.abspath(self.log)
def __init__(self, test, args=[], name=None, expect=EXPECT_RUNNING, port=0, log_level=None, wait=None):
"""Start a broker daemon. name determines the data-dir and log
file names."""
self.test = test
self._port=port
if BrokerTest.store_lib:
args = args + ['--load-module', BrokerTest.store_lib]
if BrokerTest.sql_store_lib:
args = args + ['--load-module', BrokerTest.sql_store_lib]
args = args + ['--catalog', BrokerTest.sql_catalog]
if BrokerTest.sql_clfs_store_lib:
args = args + ['--load-module', BrokerTest.sql_clfs_store_lib]
args = args + ['--catalog', BrokerTest.sql_catalog]
cmd = [BrokerTest.qpidd_exec, "--port", port, "--no-module-dir"] + args
if not "--auth" in args: cmd.append("--auth=no")
if wait != None:
cmd += ["--wait", str(wait)]
if name: self.name = name
else:
self.name = "broker%d" % Broker._broker_count
Broker._broker_count += 1
self.find_log()
cmd += ["--log-to-file", self.log]
cmd += ["--log-to-stderr=no"]
if log_level != None:
cmd += ["--log-enable=%s" % log_level]
self.datadir = self.name
cmd += ["--data-dir", self.datadir]
Popen.__init__(self, cmd, expect, drain=False)
test.cleanup_stop(self)
self._host = "127.0.0.1"
log.debug("Started broker %s (%s, %s)" % (self.name, self.pname, self.log))
self._log_ready = False
def host(self): return self._host
def port(self):
# Read port from broker process stdout if not already read.
if (self._port == 0):
try: self._port = int(self.stdout.readline())
except ValueError:
raise Exception("Can't get port for broker %s (%s)%s" %
(self.name, self.pname, error_line(self.log,5)))
return self._port
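# (When started with --port 0, qpidd picks a free port and prints it on
# stdout; the readline above relies on that behaviour.)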
def unexpected(self,msg):
raise BadProcessStatus("%s: %s (%s)" % (msg, self.name, self.pname))
def connect(self, **kwargs):
"""New API connection to the broker."""
return messaging.Connection.establish(self.host_port(), **kwargs)
def connect_old(self):
"""Old API connection to the broker."""
socket = qpid.util.connect(self.host(),self.port())
connection = qpid.connection.Connection (sock=socket)
connection.start()
return connection;
def declare_queue(self, queue):
c = self.connect_old()
s = c.session(str(qpid.datatypes.uuid4()))
s.queue_declare(queue=queue)
c.close()
def _prep_sender(self, queue, durable, xprops):
s = queue + "; {create:always, node:{durable:" + str(durable)
if xprops != None: s += ", x-declare:{" + xprops + "}"
return s + "}}"
def send_message(self, queue, message, durable=True, xprops=None, session=None):
if session == None:
s = self.connect().session()
else:
s = session
s.sender(self._prep_sender(queue, durable, xprops)).send(message)
if session == None:
s.connection.close()
def send_messages(self, queue, messages, durable=True, xprops=None, session=None):
if session == None:
s = self.connect().session()
else:
s = session
sender = s.sender(self._prep_sender(queue, durable, xprops))
for m in messages: sender.send(m)
if session == None:
s.connection.close()
def get_message(self, queue):
s = self.connect().session()
m = s.receiver(queue+"; {create:always}", capacity=1).fetch(timeout=1)
s.acknowledge()
s.connection.close()
return m
def get_messages(self, queue, n):
s = self.connect().session()
receiver = s.receiver(queue+"; {create:always}", capacity=n)
m = [receiver.fetch(timeout=1) for i in range(n)]
s.acknowledge()
s.connection.close()
return m
def host_port(self): return "%s:%s" % (self.host(), self.port())
def log_ready(self):
"""Return true if the log file exists and contains a broker ready message"""
if self._log_ready: return True
self._log_ready = find_in_file("notice Broker running", self.log)
return self._log_ready
def ready(self, **kwargs):
"""Wait till broker is ready to serve clients"""
# First make sure the broker is listening by checking the log.
if not retry(self.log_ready, timeout=30):
raise Exception(
"Timed out waiting for broker %s%s"%(self.name, error_line(self.log,5)))
# Create a connection and a session. For a cluster broker this will
# return after cluster init has finished.
try:
c = self.connect(**kwargs)
try: c.session()
finally: c.close()
except: raise RethrownException(
"Broker %s failed ready test%s"%(self.name,error_line(self.log, 5)))
def store_state(self):
uuids = open(os.path.join(self.datadir, "cluster", "store.status")).readlines()
null_uuid="00000000-0000-0000-0000-000000000000\n"
if len(uuids) < 2: return "unknown" # we looked while the file was being updated.
if uuids[0] == null_uuid: return "empty"
if uuids[1] == null_uuid: return "dirty"
return "clean"
class Cluster:
"""A cluster of brokers in a test."""
_cluster_count = 0
def __init__(self, test, count=0, args=[], expect=EXPECT_RUNNING, wait=True):
self.test = test
self._brokers=[]
self.name = "cluster%d" % Cluster._cluster_count
Cluster._cluster_count += 1
# Use unique cluster name
self.args = copy(args)
self.args += [ "--cluster-name", "%s-%s:%d" % (self.name, socket.gethostname(), os.getpid()) ]
self.args += [ "--log-enable=info+", "--log-enable=debug+:cluster"]
assert BrokerTest.cluster_lib, "Cannot locate cluster plug-in"
self.args += [ "--load-module", BrokerTest.cluster_lib ]
self.start_n(count, expect=expect, wait=wait)
def start(self, name=None, expect=EXPECT_RUNNING, wait=True, args=[], port=0):
"""Add a broker to the cluster. Returns the index of the new broker."""
if not name: name="%s-%d" % (self.name, len(self._brokers))
self._brokers.append(self.test.broker(self.args+args, name, expect, wait, port=port))
return self._brokers[-1]
def start_n(self, count, expect=EXPECT_RUNNING, wait=True, args=[]):
for i in range(count): self.start(expect=expect, wait=wait, args=args)
# Behave like a list of brokers.
def __len__(self): return len(self._brokers)
def __getitem__(self,index): return self._brokers[index]
def __iter__(self): return self._brokers.__iter__()
class BrokerTest(TestCase):
"""
Tracks processes started by test and kills at end of test.
Provides a well-known working directory for each test.
"""
# Environment settings.
qpidd_exec = os.path.abspath(checkenv("QPIDD_EXEC"))
cluster_lib = os.getenv("CLUSTER_LIB")
xml_lib = os.getenv("XML_LIB")
qpid_config_exec = os.getenv("QPID_CONFIG_EXEC")
qpid_route_exec = os.getenv("QPID_ROUTE_EXEC")
receiver_exec = os.getenv("RECEIVER_EXEC")
sender_exec = os.getenv("SENDER_EXEC")
sql_store_lib = os.getenv("STORE_SQL_LIB")
sql_clfs_store_lib = os.getenv("STORE_SQL_CLFS_LIB")
sql_catalog = os.getenv("STORE_CATALOG")
store_lib = os.getenv("STORE_LIB")
test_store_lib = os.getenv("TEST_STORE_LIB")
rootdir = os.getcwd()
def configure(self, config): self.config=config
def setUp(self):
outdir = self.config.defines.get("OUTDIR") or "brokertest.tmp"
self.dir = os.path.join(self.rootdir, outdir, self.id())
os.makedirs(self.dir)
os.chdir(self.dir)
self.stopem = [] # things to stop at end of test
def tearDown(self):
err = []
for p in self.stopem:
try: p.stop()
except Exception, e: err.append(str(e))
self.stopem = [] # reset in case more processes start
os.chdir(self.rootdir)
if err: raise Exception("Unexpected process status:\n "+"\n ".join(err))
def cleanup_stop(self, stopable):
"""Call thing.stop at end of test"""
self.stopem.append(stopable)
def popen(self, cmd, expect=EXPECT_EXIT_OK, drain=True):
"""Start a process that will be killed at end of test, in the test dir."""
os.chdir(self.dir)
p = Popen(cmd, expect, drain)
self.cleanup_stop(p)
return p
def broker(self, args=[], name=None, expect=EXPECT_RUNNING, wait=True, port=0, log_level=None):
"""Create and return a broker ready for use"""
b = Broker(self, args=args, name=name, expect=expect, port=port, log_level=log_level)
if (wait):
try: b.ready()
except Exception, e:
raise RethrownException("Failed to start broker %s(%s): %s" % (b.name, b.log, e))
return b
def cluster(self, count=0, args=[], expect=EXPECT_RUNNING, wait=True):
"""Create and return a cluster ready for use"""
cluster = Cluster(self, count, args, expect=expect, wait=wait)
return cluster
def assert_browse(self, session, queue, expect_contents, timeout=0):
"""Assert that the contents of messages on queue (as retrieved
using session and timeout) exactly match the strings in
expect_contents"""
r = session.receiver("%s;{mode:browse}"%(queue))
actual_contents = []
try:
for c in expect_contents: actual_contents.append(r.fetch(timeout=timeout).content)
while True: actual_contents.append(r.fetch(timeout=0).content) # Check for extra messages.
except messaging.Empty: pass
r.close()
self.assertEqual(expect_contents, actual_contents)
class RethrownException(Exception):
"""Captures the stack trace of the current exception to be thrown later"""
def __init__(self, msg=""):
Exception.__init__(self, msg+"\n"+format_exc())
class StoppableThread(Thread):
"""
Base class for threads that do something in a loop and periodically check
to see if they have been stopped.
"""
def __init__(self):
self.stopped = False
self.error = None
Thread.__init__(self)
def stop(self):
self.stopped = True
self.join()
if self.error: raise self.error
class NumberedSender(Thread):
"""
Thread to run a sender client and send numbered messages until stopped.
"""
def __init__(self, broker, max_depth=None, queue="test-queue"):
"""
max_depth: enable flow control, ensure sent - received <= max_depth.
Requires self.notify_received(n) to be called each time messages are received.
"""
Thread.__init__(self)
self.sender = broker.test.popen(
["qpid-send",
"--broker", "localhost:%s"%broker.port(),
"--address", "%s;{create:always}"%queue,
"--failover-updates",
"--content-stdin"
],
expect=EXPECT_RUNNING)
self.condition = Condition()
self.max = max_depth
self.received = 0
self.stopped = False
self.error = None
def write_message(self, n):
self.sender.stdin.write(str(n)+"\n")
self.sender.stdin.flush()
def run(self):
try:
self.sent = 0
while not self.stopped:
if self.max:
self.condition.acquire()
while not self.stopped and self.sent - self.received > self.max:
self.condition.wait()
self.condition.release()
self.write_message(self.sent)
self.sent += 1
except Exception: self.error = RethrownException(self.sender.pname)
def notify_received(self, count):
"""Called by receiver to enable flow control. count = messages received so far."""
self.condition.acquire()
self.received = count
self.condition.notify()
self.condition.release()
def stop(self):
self.condition.acquire()
try:
self.stopped = True
self.condition.notify()
finally: self.condition.release()
self.join()
self.write_message(-1) # end-of-messages marker.
if self.error: raise self.error
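# Typical pairing (a sketch, mirroring LongTests.test_failover above):
#   sender = NumberedSender(broker_a, 1000)       # max queue depth 1000
#   receiver = NumberedReceiver(broker_b, sender) # feeds notify_received()
#   receiver.start(); sender.start()
#   ... kill/restart brokers ...
#   sender.stop(); receiver.stop()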
class NumberedReceiver(Thread):
"""
Thread to run a receiver client and verify it receives
sequentially numbered messages.
"""
def __init__(self, broker, sender = None, queue="test-queue"):
"""
sender: enable flow control. Call sender.received(n) for each message received.
"""
Thread.__init__(self)
self.test = broker.test
self.receiver = self.test.popen(
["qpid-receive",
"--broker", "localhost:%s"%broker.port(),
"--address", "%s;{create:always}"%queue,
"--failover-updates",
"--forever"
],
expect=EXPECT_RUNNING,
drain=False)
self.lock = Lock()
self.error = None
self.sender = sender
def read_message(self):
return int(self.receiver.stdout.readline())
def run(self):
try:
self.received = 0
m = self.read_message()
while m != -1:
assert(m <= self.received) # Check for missing messages
if (m == self.received): # Ignore duplicates
self.received += 1
if self.sender:
self.sender.notify_received(self.received)
m = self.read_message()
except Exception:
self.error = RethrownException(self.receiver.pname)
def stop(self):
"""Returns when termination message is received"""
self.join()
if self.error: raise self.error
class ErrorGenerator(StoppableThread):
"""
Thread that continuously generates errors by trying to consume from
a non-existent queue. For cluster regression tests, error handling
caused issues in the past.
"""
def __init__(self, broker):
StoppableThread.__init__(self)
self.broker=broker
broker.test.cleanup_stop(self)
self.start()
def run(self):
c = self.broker.connect_old()
try:
while not self.stopped:
try:
c.session(str(qpid.datatypes.uuid4())).message_subscribe(
queue="non-existent-queue")
assert(False)
except qpid.session.SessionException: pass
time.sleep(0.01)
except: pass # Normal if broker is killed.
def import_script(path):
"""
Import executable script at path as a module.
Requires some trickery as scripts are not in standard module format
"""
f = open(path)
try:
name=os.path.split(path)[1].replace("-","_")
return imp.load_module(name, f, path, ("", "r", imp.PY_SOURCE))
finally: f.close()
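# Example (a sketch): import_script("/path/to/qpid-cluster") returns a module
# named "qpid_cluster"; cluster_tests.py uses this to call qpid_cluster.main()
# in-process for clean cluster shutdown.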
cluster_mgmt_674338/testagent.cpp
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
#include <qpid/management/Manageable.h>
#include <qpid/management/ManagementObject.h>
#include <qpid/agent/ManagementAgent.h>
#include <qpid/sys/Mutex.h>
#include <qpid/sys/Time.h>
#include "qmf/org/apache/qpid/agent/example/Parent.h"
#include "qmf/org/apache/qpid/agent/example/Child.h"
#include "qmf/org/apache/qpid/agent/example/ArgsParentCreate_child.h"
#include "qmf/org/apache/qpid/agent/example/EventChildCreated.h"
#include "qmf/org/apache/qpid/agent/example/Package.h"
#include <signal.h>
#include <cstdlib>
#include <iostream>
#include <vector>
static bool running = true;
using namespace std;
using qpid::management::ManagementAgent;
using qpid::management::ManagementObject;
using qpid::management::Manageable;
using qpid::management::Args;
using qpid::sys::Mutex;
namespace _qmf = qmf::org::apache::qpid::agent::example;
class ChildClass;
//==============================================================
// CoreClass is the operational class that corresponds to the
// "Parent" class in the management schema.
//==============================================================
class CoreClass : public Manageable
{
string name;
ManagementAgent* agent;
_qmf::Parent* mgmtObject;
std::vector<ChildClass*> children;
Mutex vectorLock;
public:
CoreClass(ManagementAgent* agent, string _name);
~CoreClass() { mgmtObject->resourceDestroy(); }
ManagementObject* GetManagementObject(void) const
{ return mgmtObject; }
void doLoop();
status_t ManagementMethod (uint32_t methodId, Args& args, string& text);
};
class ChildClass : public Manageable
{
string name;
_qmf::Child* mgmtObject;
public:
ChildClass(ManagementAgent* agent, CoreClass* parent, string name);
~ChildClass() { mgmtObject->resourceDestroy(); }
ManagementObject* GetManagementObject(void) const
{ return mgmtObject; }
void doWork()
{
mgmtObject->inc_count(2);
}
};
CoreClass::CoreClass(ManagementAgent* _agent, string _name) : name(_name), agent(_agent)
{
static uint64_t persistId = 0x111222333444555LL;
mgmtObject = new _qmf::Parent(agent, this, name);
agent->addObject(mgmtObject, persistId++);
mgmtObject->set_state("IDLE");
}
void CoreClass::doLoop()
{
// Periodically bump a counter to provide a changing statistical value
while (running) {
qpid::sys::sleep(1);
mgmtObject->inc_count();
mgmtObject->set_state("IN_LOOP");
{
Mutex::ScopedLock _lock(vectorLock);
for (std::vector<ChildClass*>::iterator iter = children.begin();
iter != children.end();
iter++) {
(*iter)->doWork();
}
}
}
}
Manageable::status_t CoreClass::ManagementMethod(uint32_t methodId, Args& args, string& /*text*/)
{
Mutex::ScopedLock _lock(vectorLock);
switch (methodId) {
case _qmf::Parent::METHOD_CREATE_CHILD:
_qmf::ArgsParentCreate_child& ioArgs = (_qmf::ArgsParentCreate_child&) args;
ChildClass *child = new ChildClass(agent, this, ioArgs.i_name);
ioArgs.o_childRef = child->GetManagementObject()->getObjectId();
children.push_back(child);
agent->raiseEvent(_qmf::EventChildCreated(ioArgs.i_name));
return STATUS_OK;
}
return STATUS_NOT_IMPLEMENTED;
}
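// Dispatch note (informal): the qmf-gen generated Parent class maps the
// schema method "create_child" to METHOD_CREATE_CHILD and carries its
// arguments in ArgsParentCreate_child; by generator convention the i_*
// members are inputs and the o_* members are outputs returned to the caller.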
ChildClass::ChildClass(ManagementAgent* agent, CoreClass* parent, string name)
{
mgmtObject = new _qmf::Child(agent, this, parent, name);
agent->addObject(mgmtObject);
}
//==============================================================
// Main program
//==============================================================
ManagementAgent::Singleton* singleton;
void shutdown(int)
{
running = false;
}
int main_int(int argc, char** argv)
{
singleton = new ManagementAgent::Singleton();
const char* host = argc>1 ? argv[1] : "127.0.0.1";
int port = argc>2 ? atoi(argv[2]) : 5672;
signal(SIGINT, shutdown);
// Create the qmf management agent
ManagementAgent* agent = singleton->getInstance();
// Register the Qmf_example schema with the agent
_qmf::Package packageInit(agent);
// Start the agent. It will attempt to make a connection to the
// management broker
agent->init(host, port, 5, false, ".magentdata");
// Allocate some core objects
CoreClass core1(agent, "Example Core Object #1");
CoreClass core2(agent, "Example Core Object #2");
CoreClass core3(agent, "Example Core Object #3");
core1.doLoop();
// done, cleanup and exit
delete singleton;
return 0;
}
int main(int argc, char** argv)
{
try {
return main_int(argc, argv);
} catch(std::exception& e) {
cerr << "Top Level Exception: " << e.what() << endl;
return 1;
}
}
cluster_mgmt_674338/qpid-python-test
#!/usr/bin/env python
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#
# TODO: summarize, test harness preconditions (e.g. broker is alive)
import logging, optparse, os, struct, sys, time, traceback, types
from fnmatch import fnmatchcase as match
from getopt import GetoptError
from logging import getLogger, StreamHandler, Formatter, Filter, \
WARN, DEBUG, ERROR
from qpid.harness import Skipped
from qpid.util import URL
levels = {
"DEBUG": DEBUG,
"WARN": WARN,
"ERROR": ERROR
}
sorted_levels = [(v, k) for k, v in levels.items()]
sorted_levels.sort()
sorted_levels = [v for k, v in sorted_levels]
parser = optparse.OptionParser(usage="usage: %prog [options] PATTERN ...",
description="Run tests matching the specified PATTERNs.")
parser.add_option("-l", "--list", action="store_true", default=False,
help="list tests instead of executing them")
parser.add_option("-b", "--broker", default="localhost",
help="run tests against BROKER (default %default)")
parser.add_option("-f", "--log-file", metavar="FILE", help="log output to FILE")
parser.add_option("-v", "--log-level", metavar="LEVEL", default="WARN",
help="only display log messages of LEVEL or higher severity: "
"%s (default %%default)" % ", ".join(sorted_levels))
parser.add_option("-c", "--log-category", metavar="CATEGORY", action="append",
dest="log_categories", default=[],
help="log only categories matching CATEGORY pattern")
parser.add_option("-m", "--module", action="append", default=[],
dest="modules", help="add module to test search path")
parser.add_option("-i", "--ignore", action="append", default=[],
help="ignore tests matching IGNORE pattern")
parser.add_option("-I", "--ignore-file", metavar="IFILE", action="append",
default=[],
help="ignore tests matching patterns in IFILE")
parser.add_option("-H", "--halt-on-error", action="store_true", default=False,
dest="hoe", help="halt if an error is encountered")
parser.add_option("-t", "--time", action="store_true", default=False,
help="report timing information on test run")
parser.add_option("-D", "--define", metavar="DEFINE", dest="defines",
action="append", default=[], help="define test parameters")
class Config:
def __init__(self):
self.broker = URL("localhost")
self.defines = {}
self.log_file = None
self.log_level = WARN
self.log_categories = []
opts, args = parser.parse_args()
includes = []
excludes = ["*__*__"]
config = Config()
list_only = opts.list
config.broker = URL(opts.broker)
for d in opts.defines:
try:
idx = d.index("=")
name = d[:idx]
value = d[idx+1:]
config.defines[name] = value
except ValueError:
config.defines[d] = None
config.log_file = opts.log_file
config.log_level = levels[opts.log_level.upper()]
config.log_categories = opts.log_categories
excludes.extend([v.strip() for v in opts.ignore])
for v in opts.ignore_file:
f = open(v)
for line in f:
line = line.strip()
if line.startswith("#"):
continue
excludes.append(line)
f.close()
for a in args:
includes.append(a.strip())
if not includes:
if opts.modules:
includes.append("*")
else:
includes.extend(["qpid.tests.*"])
def is_ignored(path):
for p in excludes:
if match(path, p):
return True
return False
def is_included(path):
if is_ignored(path):
return False
for p in includes:
if match(path, p):
return True
return False
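# Example (informal): with the defaults includes=["qpid.tests.*"] and
# excludes=["*__*__"]:
#   is_included("qpid.tests.messaging.MessageTests.testSync") -> True
#   is_included("qpid.tests.messaging.__main__")              -> False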
def is_smart():
return sys.stdout.isatty() and os.environ.get("TERM", "dumb") != "dumb"
try:
import fcntl, termios
def width():
if is_smart():
s = struct.pack("HHHH", 0, 0, 0, 0)
fd_stdout = sys.stdout.fileno()
x = fcntl.ioctl(fd_stdout, termios.TIOCGWINSZ, s)
rows, cols, xpx, ypx = struct.unpack("HHHH", x)
return cols
else:
try:
return int(os.environ.get("COLUMNS", "80"))
except ValueError:
return 80
WIDTH = width()
def resize(sig, frm):
global WIDTH
WIDTH = width()
import signal
signal.signal(signal.SIGWINCH, resize)
except ImportError:
WIDTH = 80
def vt100_attrs(*attrs):
return "\x1B[%sm" % ";".join(map(str, attrs))
vt100_reset = vt100_attrs(0)
KEYWORDS = {"pass": (32,),
"skip": (33,),
"fail": (31,),
"start": (34,),
"total": (34,),
"ignored": (33,),
"selected": (34,),
"elapsed": (34,),
"average": (34,)}
COLORIZE = is_smart()
def colorize_word(word, text=None):
if text is None:
text = word
return colorize(text, *KEYWORDS.get(word, ()))
def colorize(text, *attrs):
if attrs and COLORIZE:
return "%s%s%s" % (vt100_attrs(*attrs), text, vt100_reset)
else:
return text
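# Example (a sketch): colorize("pass", 32) yields "\x1B[32mpass\x1B[0m",
# which renders green on a vt100-compatible terminal; with COLORIZE off the
# text is returned unchanged.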
def indent(text):
lines = text.split("\n")
return " %s" % "\n ".join(lines)
class Interceptor:
def __init__(self):
self.newline = False
self.indent = False
self.passthrough = True
self.dirty = False
self.last = None
def begin(self):
self.newline = True
self.indent = True
self.passthrough = False
self.dirty = False
self.last = None
def reset(self):
self.newline = False
self.indent = False
self.passthrough = True
class StreamWrapper:
def __init__(self, interceptor, stream, prefix=" "):
self.interceptor = interceptor
self.stream = stream
self.prefix = prefix
def fileno(self):
return self.stream.fileno()
def isatty(self):
return self.stream.isatty()
def write(self, s):
if self.interceptor.passthrough:
self.stream.write(s)
return
if s:
self.interceptor.dirty = True
if self.interceptor.newline:
self.interceptor.newline = False
self.stream.write(" %s\n" % colorize_word("start"))
self.interceptor.indent = True
if self.interceptor.indent:
self.stream.write(self.prefix)
if s.endswith("\n"):
s = s.replace("\n", "\n%s" % self.prefix)[:-2]
self.interceptor.indent = True
else:
s = s.replace("\n", "\n%s" % self.prefix)
self.interceptor.indent = False
self.stream.write(s)
if s:
self.interceptor.last = s[-1]
def flush(self):
self.stream.flush()
interceptor = Interceptor()
out_wrp = StreamWrapper(interceptor, sys.stdout)
err_wrp = StreamWrapper(interceptor, sys.stderr)
out = sys.stdout
err = sys.stderr
sys.stdout = out_wrp
sys.stderr = err_wrp
class PatternFilter(Filter):
def __init__(self, *patterns):
Filter.__init__(self, patterns)
self.patterns = patterns
def filter(self, record):
if not self.patterns:
return True
for p in self.patterns:
if match(record.name, p):
return True
return False
root = getLogger()
handler = StreamHandler(sys.stdout)
filter = PatternFilter(*config.log_categories)
handler.addFilter(filter)
handler.setFormatter(Formatter("%(asctime)s %(levelname)s %(message)s"))
root.addHandler(handler)
root.setLevel(WARN)
log = getLogger("qpid.test")
PASS = "pass"
SKIP = "skip"
FAIL = "fail"
class Runner:
def __init__(self):
self.exceptions = []
self.skip = False
def passed(self):
return not self.exceptions
def skipped(self):
return self.skip
def failed(self):
return self.exceptions and not self.skip
def halt(self):
return self.exceptions or self.skip
def run(self, name, phase):
try:
phase()
except KeyboardInterrupt:
raise
except:
exi = sys.exc_info()
if issubclass(exi[0], Skipped):
self.skip = True
self.exceptions.append((name, exi))
def status(self):
if self.passed():
return PASS
elif self.skipped():
return SKIP
elif self.failed():
return FAIL
else:
return None
def print_exceptions(self):
for name, info in self.exceptions:
if issubclass(info[0], Skipped):
print indent("".join(traceback.format_exception_only(*info[:2]))).rstrip()
else:
print "Error during %s:" % name
print indent("".join(traceback.format_exception(*info))).rstrip()
ST_WIDTH = 8
def run_test(name, test, config):
patterns = filter.patterns
level = root.level
filter.patterns = config.log_categories
root.setLevel(config.log_level)
parts = name.split(".")
line = None
output = ""
for part in parts:
if line:
if len(line) + len(part) >= (WIDTH - ST_WIDTH - 1):
output += "%s. \\\n" % line
line = " %s" % part
else:
line = "%s.%s" % (line, part)
else:
line = part
if line:
output += "%s %s" % (line, (((WIDTH - ST_WIDTH) - len(line))*"."))
sys.stdout.write(output)
sys.stdout.flush()
interceptor.begin()
try:
runner = test()
finally:
interceptor.reset()
if interceptor.dirty:
if interceptor.last != "\n":
sys.stdout.write("\n")
sys.stdout.write(output)
print " %s" % colorize_word(runner.status())
if runner.failed() or runner.skipped():
runner.print_exceptions()
root.setLevel(level)
filter.patterns = patterns
return runner.status()
class FunctionTest:
def __init__(self, test):
self.test = test
def name(self):
return "%s.%s" % (self.test.__module__, self.test.__name__)
def run(self):
return run_test(self.name(), self._run, config)
def _run(self):
runner = Runner()
runner.run("test", lambda: self.test(config))
return runner
def __repr__(self):
return "FunctionTest(%r)" % self.test
class MethodTest:
def __init__(self, cls, method):
self.cls = cls
self.method = method
def name(self):
return "%s.%s.%s" % (self.cls.__module__, self.cls.__name__, self.method)
def run(self):
return run_test(self.name(), self._run, config)
def _run(self):
runner = Runner()
inst = self.cls(self.method)
test = getattr(inst, self.method)
if hasattr(inst, "configure"):
runner.run("configure", lambda: inst.configure(config))
if runner.halt(): return runner
if hasattr(inst, "setUp"):
runner.run("setup", inst.setUp)
if runner.halt(): return runner
elif hasattr(inst, "setup"):
runner.run("setup", inst.setup)
if runner.halt(): return runner
runner.run("test", test)
if hasattr(inst, "tearDown"):
runner.run("teardown", inst.tearDown)
elif hasattr(inst, "teardown"):
runner.run("teardown", inst.teardown)
return runner
def __repr__(self):
return "MethodTest(%r, %r)" % (self.cls, self.method)
class PatternMatcher:
def __init__(self, *patterns):
self.patterns = patterns
def matches(self, name):
for p in self.patterns:
if match(name, p):
return True
return False
class FunctionScanner(PatternMatcher):
def inspect(self, obj):
    return type(obj) == types.FunctionType and self.matches(obj.__name__)
def descend(self, func):
    # early return makes this an empty generator; 'yield None' rather than a
    # bare 'yield', which is a syntax error before Python 2.5
return; yield None
def extract(self, func):
yield FunctionTest(func)
class ClassScanner(PatternMatcher):
def inspect(self, obj):
return type(obj) in (types.ClassType, types.TypeType) and self.matches(obj.__name__)
def descend(self, cls):
# the None is required for older versions of python
return; yield None
def extract(self, cls):
names = dir(cls)
names.sort()
for name in names:
obj = getattr(cls, name)
t = type(obj)
if t == types.MethodType and name.startswith("test"):
yield MethodTest(cls, name)
class ModuleScanner:
def inspect(self, obj):
return type(obj) == types.ModuleType
def descend(self, obj):
names = dir(obj)
names.sort()
for name in names:
yield getattr(obj, name)
def extract(self, obj):
# the None is required for older versions of python
return; yield None
class Harness:
def __init__(self):
self.scanners = [
ModuleScanner(),
ClassScanner("*Test", "*Tests", "*TestCase"),
FunctionScanner("test_*")
]
self.tests = []
self.scanned = []
def scan(self, *roots):
objects = list(roots)
while objects:
obj = objects.pop(0)
for s in self.scanners:
if s.inspect(obj):
self.tests.extend(s.extract(obj))
for child in s.descend(obj):
if not (child in self.scanned or child in objects):
objects.append(child)
self.scanned.append(obj)
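# scan() is a breadth-first walk over the given roots: ModuleScanner descends
# into module attributes, ClassScanner and FunctionScanner extract tests, and
# the scanned list keeps already-visited objects from being queued twice.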
modules = opts.modules
if not modules:
modules.extend(["qpid.tests"])
h = Harness()
for name in modules:
m = __import__(name, None, None, ["dummy"])
h.scan(m)
filtered = [t for t in h.tests if is_included(t.name())]
ignored = [t for t in h.tests if is_ignored(t.name())]
total = len(filtered) + len(ignored)
passed = 0
failed = 0
skipped = 0
start = time.time()
for t in filtered:
if list_only:
print t.name()
else:
st = t.run()
if st == PASS:
passed += 1
elif st == SKIP:
skipped += 1
elif st == FAIL:
failed += 1
if opts.hoe:
break
end = time.time()
run = passed + failed
if not list_only:
if passed:
_pass = "pass"
else:
_pass = "fail"
if failed:
outcome = "fail"
else:
outcome = "pass"
if ignored:
ign = "ignored"
else:
ign = "pass"
if skipped:
skip = "skip"
else:
skip = "pass"
print colorize("Totals:", 1),
totals = [colorize_word("total", "%s tests" % total),
colorize_word(_pass, "%s passed" % passed),
colorize_word(skip, "%s skipped" % skipped),
colorize_word(ign, "%s ignored" % len(ignored)),
colorize_word(outcome, "%s failed" % failed)]
print ", ".join(totals),
if opts.hoe and failed > 0:
print " -- (halted after %s)" % run
else:
print
if opts.time and run > 0:
print colorize("Timing:", 1),
timing = [colorize_word("elapsed", "%.2fs elapsed" % (end - start)),
colorize_word("average", "%.2fs average" % ((end - start)/run))]
print ", ".join(timing)
if failed or skipped:
sys.exit(1)
else:
sys.exit(0)
cluster_mgmt_674338/runme.sh
#!/bin/sh
# Run cluster management tests on installed qpid.
#
# Required RPMS:
#
# qpid-cpp-server-cluster
# qpid-cpp-server
# python-qpid
# qpid-tools
# qpid-cpp-client
# qpid-cpp-client-devel
# qmf
# qmf-devel
#
make # Build testagent
export QPIDD_EXEC=/usr/sbin/qpidd
export QPID_CLUSTER_EXEC=/usr/bin/qpid-cluster
export CLUSTER_LIB=/usr/lib64/qpid/daemon/cluster.so
export PATH=$PATH:$PWD
OUT=brokertest.tmp
rm -rf $OUT
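# Soak loop: re-run the management tests until a run fails, leaving $OUT in
# place from the failing run for post-mortem inspection.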
while ./qpid-python-test -DOUTDIR=$OUT -m cluster_tests '*Long*test_management*' -DDURATION=4; do
rm -rf $OUT
done
cluster_mgmt_674338/cluster_test_logs.py
#!/usr/bin/env python
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#
# Functions for comparing broker log files, used by cluster_tests.py.
import os, os.path, re, glob
from itertools import izip
def split_log(log):
    """Split a broker log into fragments at the checkpoints where a member
    joins, writing each fragment to <log>.<checkpoint>.
    Return the list of checkpoints discovered."""
checkpoint_re = re.compile("Member joined, frameSeq=([0-9]+), queue snapshot:")
outfile = None
checkpoints = []
for l in open(log):
match = checkpoint_re.search(l)
if match:
checkpoint = match.groups()[0]
checkpoints.append(checkpoint)
if outfile: outfile.close()
outfile = open("%s.%s"%(log, checkpoint), 'w')
if outfile: outfile.write(l)
if outfile: outfile.close()
return checkpoints
def filter_log(log):
"""Filter the contents of a log file to remove data that is expected
to differ between brokers in a cluster. Filtered log contents between
the same checkpoints should match across the cluster."""
out = open("%s.filter"%(log), 'w')
# Lines to skip entirely, expected differences
skip = "|".join([
'local connection', # Only on local broker
'UPDATER|UPDATEE', # Ignore update process
'stall for update|unstall, ignore update|cancelled offer .* unstall',
'caught up',
'active for links|Passivating links|Activating links',
'info Connection.* connected to', # UpdateClient connection
'warning Connection [\d+ [0-9.:]+] closed', # UpdateClient connection
'warning Broker closed connection: 200, OK',
'task late',
'task overran',
'warning CLOSING .* unsent data',
'Inter-broker link ',
'Running in a cluster, marking store'
])
skip_re = re.compile(skip)
    # Regex to match a UUID (8-4-4-4-12 word characters)
    uuid = r'\w{8}-\w{4}-\w{4}-\w{4}-\w{12}'
# Substitutions to remove expected differences
subs = [
(r'\d\d\d\d-\d\d-\d\d \d\d:\d\d:\d\d ', ''), # Remove timestamp
(r'cluster\([0-9.: ]*', 'cluster('), # Remove cluster node id
(r' local\)| shadow\)', ')'), # Remove local/shadow indication
(r'CATCHUP', 'READY'), # Treat catchup as equivalent to ready.
(r'OFFER', 'READY'), # Treat offer as equivalent to ready.
# System UUID expected to be different
(r'(org.apache.qpid.broker:system[:(])%s(\)?)'%(uuid), r'\1UUID\2'),
# TODO aconway 2010-12-20: review if these should be expected:
(r' len=\d+', ' len=NN'), # buffer lengths
(r' map={.*_object_name:([^,}]*)[,}].*', r' \1'), # V2 map - just keep name
(r'\d+-\d+-\d+--\d+', 'X-X-X--X'), # V1 Object IDs
]
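    # e.g. a hypothetical line "2010-12-20 12:00:00 notice cluster(1.2.3.4:5672 READY) ..."
    # becomes "notice cluster(READY) ..." after the timestamp and node-id
    # substitutions above.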
for l in open(log):
if skip_re.search(l): continue
for pattern,subst in subs: l = re.sub(pattern,subst,l)
out.write(l)
out.close()
def verify_logs():
    """Filter and split the log files from a set of cluster brokers, then
    verify that the fragments between matching checkpoints are identical
    across brokers."""
for l in glob.glob("*.log"): filter_log(l)
checkpoints = set()
for l in glob.glob("*.filter"): checkpoints = checkpoints.union(set(split_log(l)))
errors=[]
for c in checkpoints:
fragments = glob.glob("*.filter.%s"%(c))
fragments.sort(reverse=True, key=os.path.getsize)
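        # Largest fragment first: izip below stops at the shorter file, so
        # each fragment is compared line-for-line with the next-largest one
        # over their common length.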
while len(fragments) >= 2:
a = fragments.pop(0)
b = fragments[0]
for ab in izip(open(a), open(b)):
if ab[0] != ab[1]:
errors.append("\n %s %s"%(a, b))
break
if errors:
raise Exception("Files differ in %s"%(os.getcwd())+"".join(errors))
# Can be run as a script.
if __name__ == "__main__":
verify_logs()