Java源码示例:com.alibaba.otter.canal.store.model.Event

示例1
/**
 * Decides whether the given event may pass, layering transaction-state handling on top of
 * the superclass check.
 *
 * @param event the event asking for permission
 * @param state value forwarded unchanged to {@code super.isPermit}
 * @return {@code true} when the event is allowed through, {@code false} otherwise
 */
protected boolean isPermit(Event event, long state) {
    // Already inside a transaction: the transaction head was checked earlier, so pass directly.
    if (txState.intValue() == 1 && inTransaction.get()) {
        return true;
    }

    // Only state 0 can admit a new event, and only if the superclass agrees.
    if (txState.intValue() != 0) {
        return false;
    }
    if (!super.isPermit(event, state)) {
        return false;
    }

    // The first event delivered may not be a Begin (for example a DDL statement), so both
    // cases are handled here.
    if (isTransactionBegin(event)) {
        if (txState.compareAndSet(0, 1)) {
            inTransaction.set(true);
            return true; // transaction is allowed through
        }
        return false;
    }

    // Non-transaction protection (state 2). When starting from a zk-cursor the first event
    // received is a TransactionEnd, so DDL/DCL/TransactionEnd are allowed through here.
    return txState.compareAndSet(0, 2);
}
 
示例2
/**
 * Puts one event into a {@code MemoryEventStoreWithBuffer} configured with
 * {@code BatchMode.MEMSIZE} and checks that a one-item {@code tryGet} from the first
 * position returns exactly that event, with a position range starting and ending there.
 */
@Test
public void testOnePutOneGet() {
    MemoryEventStoreWithBuffer eventStore = new MemoryEventStoreWithBuffer();
    eventStore.setBatchMode(BatchMode.MEMSIZE);
    eventStore.start();
    try {
        boolean result = eventStore.tryPut(buildEvent("1", 1L, 1L));
        Assert.assertTrue(result);

        Position position = eventStore.getFirstPosition();
        Events<Event> entrys = eventStore.tryGet(position, 1);
        // assertEquals reports the actual size on failure, unlike assertTrue(size == 1).
        Assert.assertEquals(1, entrys.getEvents().size());
        Assert.assertEquals(position, entrys.getPositionRange().getStart());
        Assert.assertEquals(position, entrys.getPositionRange().getEnd());
    } finally {
        // Stop the store even when an assertion fails, so a failing test does not leave it started.
        eventStore.stop();
    }
}
 
示例3
/**
 * Puts one event into a {@code MemoryEventStoreWithBuffer} (no batch mode configured) and
 * checks that a one-item {@code tryGet} from the first position returns exactly that event,
 * with a position range starting and ending there.
 */
@Test
public void testOnePutOneGet() {
    MemoryEventStoreWithBuffer eventStore = new MemoryEventStoreWithBuffer();
    eventStore.start();
    try {
        boolean result = eventStore.tryPut(buildEvent("1", 1L, 1L));
        Assert.assertTrue(result);

        Position position = eventStore.getFirstPosition();
        Events<Event> entrys = eventStore.tryGet(position, 1);
        // assertEquals reports the actual size on failure, unlike assertTrue(size == 1).
        Assert.assertEquals(1, entrys.getEvents().size());
        Assert.assertEquals(position, entrys.getPositionRange().getStart());
        Assert.assertEquals(position, entrys.getPositionRange().getEnd());
    } finally {
        // Stop the store even when an assertion fails, so a failing test does not leave it started.
        eventStore.stop();
    }
}
 
示例4
/**
 * Prints every event of the batch to standard output between two marker lines; nothing is
 * stored by this implementation.
 *
 * <p>Transaction begin/end events are printed as their entry type only. Every other event is
 * printed through the {@code messgae} pattern with the current thread name, journal name,
 * position and formatted execute time.
 *
 * @param datas events to print
 * @return always {@code true}
 * @throws CanalStoreException declared by the signature; the visible body never throws it
 */
public boolean tryPut(List<Event> datas) throws CanalStoreException {
    System.out.println("\n>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>");
    // Build the formatter once per call instead of once per event; it is a local, so reusing
    // it across iterations is safe.
    SimpleDateFormat format = new SimpleDateFormat(DATE_FORMAT);
    for (Event data : datas) {
        if (data.getEntryType() == EntryType.TRANSACTIONBEGIN || data.getEntryType() == EntryType.TRANSACTIONEND) {
            System.out.println(data.getEntryType());
        } else {
            // The date is only needed for non-transaction-marker events.
            Date date = new Date(data.getExecuteTime());
            System.out.println(MessageFormat.format(messgae,
                Thread.currentThread().getName(),
                data.getJournalName(),
                String.valueOf(data.getPosition()),
                format.format(date)));
        }
    }
    System.out.println("<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<\n");
    return true;
}
 
示例5
/**
 * Fetches events from the store, choosing the retrieval call from the timeout argument.
 *
 * <ul>
 * <li>{@code timeout == null}: {@code tryGet(start, batchSize)}</li>
 * <li>{@code timeout <= 0}: {@code get(start, batchSize)}</li>
 * <li>otherwise: {@code get(start, batchSize, timeout, unit)}</li>
 * </ul>
 *
 * @param eventStore store to read from
 * @param start position to read from
 * @param batchSize batch size handed to the store
 * @param timeout timeout selector, may be {@code null}
 * @param unit unit of {@code timeout}; only used on the timed path
 * @return whatever the selected store call returns
 * @throws CanalServerException wrapping any exception raised by either {@code get} variant
 */
private Events<Event> getEvents(CanalEventStore eventStore, Position start, int batchSize, Long timeout,
                                TimeUnit unit) {
    // No timeout given: use the try-variant; its exceptions are deliberately not wrapped.
    if (timeout == null) {
        return eventStore.tryGet(start, batchSize);
    }

    try {
        if (timeout > 0) {
            return eventStore.get(start, batchSize, timeout, unit);
        }
        return eventStore.get(start, batchSize);
    } catch (Exception e) {
        throw new CanalServerException(e);
    }
}
 
示例6
/**
 * Prints every event of the batch to standard output; nothing is stored by this
 * implementation.
 *
 * <p>Transaction begin/end events are printed as their entry type only. Every other event is
 * printed through the {@code messgae} pattern with the current thread name, journal name,
 * position and formatted execute time.
 *
 * @param datas events to print
 * @throws InterruptedException declared by the signature; the visible body never throws it
 * @throws CanalStoreException declared by the signature; the visible body never throws it
 */
public void put(List<Event> datas) throws InterruptedException, CanalStoreException {
    // Build the formatter once per call instead of once per event; it is a local, so reusing
    // it across iterations is safe.
    SimpleDateFormat format = new SimpleDateFormat(DATE_FORMAT);
    for (Event data : datas) {
        if (data.getEntryType() == EntryType.TRANSACTIONBEGIN || data.getEntryType() == EntryType.TRANSACTIONEND) {
            System.out.println(data.getEntryType());
        } else {
            // The date is only needed for non-transaction-marker events.
            Date date = new Date(data.getExecuteTime());
            System.out.println(MessageFormat.format(messgae,
                Thread.currentThread().getName(),
                data.getJournalName(),
                String.valueOf(data.getPosition()),
                format.format(date)));
        }
    }
}
 
示例7
/**
 * Prints every event of the batch to standard output; nothing is stored and the timeout
 * arguments are not used by this implementation.
 *
 * <p>Transaction begin/end events are printed as their entry type only. Every other event is
 * printed through the {@code messgae} pattern with the current thread name, journal name,
 * position and formatted execute time.
 *
 * @param datas events to print
 * @param timeout unused here
 * @param unit unused here
 * @return always {@code true}
 * @throws InterruptedException declared by the signature; the visible body never throws it
 * @throws CanalStoreException declared by the signature; the visible body never throws it
 */
public boolean put(List<Event> datas, long timeout, TimeUnit unit) throws InterruptedException, CanalStoreException {
    // Build the formatter once per call instead of once per event; it is a local, so reusing
    // it across iterations is safe.
    SimpleDateFormat format = new SimpleDateFormat(DATE_FORMAT);
    for (Event data : datas) {
        if (data.getEntryType() == EntryType.TRANSACTIONBEGIN || data.getEntryType() == EntryType.TRANSACTIONEND) {
            System.out.println(data.getEntryType());
        } else {
            // The date is only needed for non-transaction-marker events.
            Date date = new Date(data.getExecuteTime());
            System.out.println(MessageFormat.format(messgae,
                Thread.currentThread().getName(),
                data.getJournalName(),
                String.valueOf(data.getPosition()),
                format.format(date)));
        }
    }
    return true;
}
 
示例8
/**
 * Prints every event of the batch to standard output between two marker lines; nothing is
 * stored by this implementation.
 *
 * <p>Transaction begin/end events are printed as their entry type only. Every other event is
 * printed through the {@code messgae} pattern with the current thread name, journal name,
 * position and formatted execute time.
 *
 * @param datas events to print
 * @return always {@code true}
 * @throws CanalStoreException declared by the signature; the visible body never throws it
 */
public boolean tryPut(List<Event> datas) throws CanalStoreException {
    System.out.println("\n>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>");
    // One formatter per call is enough: it is loop-invariant and never leaves this method.
    SimpleDateFormat format = new SimpleDateFormat(DATE_FORMAT);
    for (Event data : datas) {
        if (data.getEntryType() == EntryType.TRANSACTIONBEGIN || data.getEntryType() == EntryType.TRANSACTIONEND) {
            System.out.println(data.getEntryType());
        } else {
            // Only this branch prints a time, so the Date is created here.
            Date date = new Date(data.getExecuteTime());
            System.out.println(MessageFormat.format(messgae,
                Thread.currentThread().getName(),
                data.getJournalName(),
                String.valueOf(data.getPosition()),
                format.format(date)));
        }
    }
    System.out.println("<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<\n");
    return true;
}
 
示例9
/**
 * Builds the {@code LogPosition} that corresponds to the given event.
 *
 * <p>Journal name, position, execute time (as timestamp), server id and gtid are copied into a
 * new {@code EntryPosition}, which is then wrapped in a new {@code LogPosition} together with
 * the event's log identity.
 *
 * @param event event to read the position data from
 * @return the newly created log position
 */
public static LogPosition createPosition(Event event) {
    EntryPosition entryPosition = new EntryPosition();
    entryPosition.setJournalName(event.getJournalName());
    entryPosition.setPosition(event.getPosition());
    entryPosition.setTimestamp(event.getExecuteTime());
    entryPosition.setServerId(event.getServerId()); // serverId added on 2016-06-28
    entryPosition.setGtid(event.getGtid()); // gtid added later

    LogPosition result = new LogPosition();
    result.setPostion(entryPosition);
    result.setIdentity(event.getLogIdentity());
    return result;
}
 
示例10
/**
 * Stores the batch, waiting on {@code notFull} until {@code checkFreeSlotAt} reports room
 * for all of it.
 *
 * <p>A {@code null} or empty batch returns immediately. After the batch has been written,
 * a pending interrupt on the calling thread is cleared and reported as an
 * {@code InterruptedException}.
 *
 * @param data events to store
 * @throws InterruptedException if interrupted while acquiring the lock or waiting, or if the
 *         thread's interrupt flag is set after the write
 * @throws CanalStoreException declared by the signature
 */
public void put(List<Event> data) throws InterruptedException, CanalStoreException {
    boolean nothingToStore = data == null || data.isEmpty();
    if (nothingToStore) {
        return;
    }

    final ReentrantLock putLock = this.lock;
    putLock.lockInterruptibly();
    try {
        try {
            for (;;) {
                // Check whether there is a free slot for the whole batch.
                if (checkFreeSlotAt(putSequence.get() + data.size())) {
                    break;
                }
                notFull.await(); // wait until not full
            }
        } catch (InterruptedException interrupted) {
            notFull.signal(); // propagate to non-interrupted thread
            throw interrupted;
        }

        doPut(data);
        if (Thread.interrupted()) {
            throw new InterruptedException();
        }
    } finally {
        putLock.unlock();
    }
}
 
示例11
/**
 * Stores the batch only if {@code checkFreeSlotAt} reports room for all of it right now;
 * never waits for space.
 *
 * @param data events to store; a {@code null} or empty batch counts as success
 * @return {@code true} if the batch was stored (or was empty), {@code false} if there was no
 *         room
 * @throws CanalStoreException declared by the signature
 */
public boolean tryPut(List<Event> data) throws CanalStoreException {
    if (data == null || data.isEmpty()) {
        return true;
    }

    final ReentrantLock storeLock = this.lock;
    storeLock.lock();
    try {
        boolean hasRoom = checkFreeSlotAt(putSequence.get() + data.size());
        if (hasRoom) {
            doPut(data);
        }
        return hasRoom;
    } finally {
        storeLock.unlock();
    }
}
 
示例12
/**
 * Waits on {@code notEmpty} until {@code checkUnGetSlotAt} accepts the request, then returns
 * the result of {@code doGet}.
 *
 * @param start position to read from; cast to {@code LogPosition} for the availability check
 * @param batchSize batch size passed to the availability check and to {@code doGet}
 * @return the events returned by {@code doGet(start, batchSize)}
 * @throws InterruptedException if interrupted while acquiring the lock or while waiting
 * @throws CanalStoreException declared by the signature
 */
public Events<Event> get(Position start, int batchSize) throws InterruptedException, CanalStoreException {
    final ReentrantLock getLock = this.lock;
    getLock.lockInterruptibly();
    try {
        try {
            for (;;) {
                if (checkUnGetSlotAt((LogPosition) start, batchSize)) {
                    break;
                }
                notEmpty.await();
            }
        } catch (InterruptedException interrupted) {
            notEmpty.signal(); // propagate to non-interrupted thread
            throw interrupted;
        }

        return doGet(start, batchSize);
    } finally {
        getLock.unlock();
    }
}
 
示例13
/**
 * Creates the {@code LogPosition} describing where the given event sits.
 *
 * <p>The returned object wraps a fresh {@code EntryPosition} filled from the event's journal
 * name, position, execute time, server id and gtid, and carries the event's log identity.
 *
 * @param event source of the position data
 * @return a new log position for the event
 */
public static LogPosition createPosition(Event event) {
    EntryPosition entry = new EntryPosition();
    entry.setJournalName(event.getJournalName());
    entry.setPosition(event.getPosition());
    entry.setTimestamp(event.getExecuteTime());
    // serverId was added on 2016-06-28
    entry.setServerId(event.getServerId());
    // gtid
    entry.setGtid(event.getGtid());

    LogPosition created = new LogPosition();
    created.setPostion(entry);
    created.setIdentity(event.getLogIdentity());
    return created;
}
 
示例14
/**
 * Verifies a single put followed by a single get on a {@code MemoryEventStoreWithBuffer}
 * running in {@code BatchMode.MEMSIZE}: the one-item {@code tryGet} from the first position
 * must return one event whose position range starts and ends at that position.
 */
@Test
public void testOnePutOneGet() {
    MemoryEventStoreWithBuffer eventStore = new MemoryEventStoreWithBuffer();
    eventStore.setBatchMode(BatchMode.MEMSIZE);
    eventStore.start();
    try {
        boolean result = eventStore.tryPut(buildEvent("1", 1L, 1L));
        Assert.assertTrue(result);

        Position position = eventStore.getFirstPosition();
        Events<Event> entrys = eventStore.tryGet(position, 1);
        // assertEquals shows expected vs. actual size when the check fails.
        Assert.assertEquals(1, entrys.getEvents().size());
        Assert.assertEquals(position, entrys.getPositionRange().getStart());
        Assert.assertEquals(position, entrys.getPositionRange().getEnd());
    } finally {
        // Always stop the store, including when an assertion above throws.
        eventStore.stop();
    }
}
 
示例15
/**
 * Verifies a single put followed by a single get on a {@code MemoryEventStoreWithBuffer}
 * with no batch mode configured: the one-item {@code tryGet} from the first position must
 * return one event whose position range starts and ends at that position.
 */
@Test
public void testOnePutOneGet() {
    MemoryEventStoreWithBuffer eventStore = new MemoryEventStoreWithBuffer();
    eventStore.start();
    try {
        boolean result = eventStore.tryPut(buildEvent("1", 1L, 1L));
        Assert.assertTrue(result);

        Position position = eventStore.getFirstPosition();
        Events<Event> entrys = eventStore.tryGet(position, 1);
        // assertEquals shows expected vs. actual size when the check fails.
        Assert.assertEquals(1, entrys.getEvents().size());
        Assert.assertEquals(position, entrys.getPositionRange().getStart());
        Assert.assertEquals(position, entrys.getPositionRange().getEnd());
    } finally {
        // Always stop the store, including when an assertion above throws.
        eventStore.stop();
    }
}
 
示例16
/**
 * Decides whether the given event may pass, combining the transaction state kept in
 * {@code txState}/{@code inTransaction} with the superclass check.
 *
 * @param event the event asking for permission
 * @param state value forwarded unchanged to {@code super.isPermit}
 * @return {@code true} when the event is allowed through, {@code false} otherwise
 */
protected boolean isPermit(Event event, long state) {
    // Inside a transaction the head has already been checked, so pass directly.
    final boolean insideTransaction = txState.intValue() == 1 && inTransaction.get();
    if (insideTransaction) {
        return true;
    }

    final boolean idle = txState.intValue() == 0;
    if (!idle || !super.isPermit(event, state)) {
        return false;
    }

    // The first event delivered may not be a Begin; non-transactional events such as DDL
    // statements are let through under non-transaction protection (state 2).
    if (!isTransactionBegin(event)) {
        return txState.compareAndSet(0, 2); // DDL/DCL allowed through
    }

    boolean claimed = txState.compareAndSet(0, 1);
    if (claimed) {
        inTransaction.set(true); // transaction allowed through
    }
    return claimed;
}
 
示例17
/**
 * Passes each event through the barrier and then hands data to {@code super.doSink}.
 *
 * <p>With {@code filterTransactionEntry} set, every event is sunk on its own and the result of
 * those calls is discarded, so this method ends with {@code false}. Otherwise the whole list
 * is sunk once, after the last event has passed the barrier, and that result is returned.
 *
 * @param events events to sink
 * @return the result of {@code super.doSink(events)} on the batch path; {@code false} when
 *         interrupted, when {@code events} is empty, or on the per-event path
 */
protected boolean doSink(List<Event> events) {
    int lastIndex = events.size() - 1;
    for (int index = 0; index < events.size(); index++) {
        Event current = events.get(index);
        try {
            barrier.await(current); // timeline merge scheduling
            if (filterTransactionEntry) {
                super.doSink(Arrays.asList(current));
                continue;
            }
            if (index == lastIndex) {
                // For transactional data the sink only happens once the last event has passed,
                // which keeps the batch atomic. The batch must also be written before the last
                // event releases its state, otherwise there is a concurrency problem.
                return super.doSink(events);
            }
        } catch (InterruptedException e) {
            return false;
        } finally {
            barrier.clear(current);
        }
    }

    return false;
}
 
示例18
/**
 * Decides whether the given event may pass, layering transaction-state handling on top of
 * the superclass check.
 *
 * @param event the event asking for permission
 * @param state value forwarded unchanged to {@code super.isPermit}
 * @return {@code true} when the event is allowed through, {@code false} otherwise
 */
protected boolean isPermit(Event event, long state) {
    // Already inside a transaction: the transaction head was checked earlier, so pass directly.
    if (txState.intValue() == 1 && inTransaction.get()) {
        return true;
    }

    // Anything other than state 0, or a refusal from the superclass, blocks the event.
    if (txState.intValue() != 0) {
        return false;
    }
    boolean permittedBySuper = super.isPermit(event, state);
    if (!permittedBySuper) {
        return false;
    }

    // The first event delivered may not be a Begin (for example a DDL statement).
    if (isTransactionBegin(event)) {
        if (!txState.compareAndSet(0, 1)) {
            return false;
        }
        inTransaction.set(true);
        return true; // transaction is allowed through
    }

    // Non-transaction protection (state 2). When starting from a zk-cursor the first event
    // received is a TransactionEnd, so DDL/DCL/TransactionEnd are allowed through here.
    return txState.compareAndSet(0, 2);
}
 
示例19
/**
 * Runs every event through the barrier and forwards data to {@code super.doSink}.
 *
 * <p>When {@code filterTransactionEntry} is set, each event is forwarded individually and the
 * individual results are ignored; the method then falls through to {@code false}. When it is
 * not set, the complete list is forwarded once the final event has passed the barrier and
 * the result of that call is returned.
 *
 * @param events events to sink
 * @return the result of {@code super.doSink(events)} on the batch path; {@code false} when
 *         interrupted, when {@code events} is empty, or on the per-event path
 */
protected boolean doSink(List<Event> events) {
    final int total = events.size();
    for (int pos = 0; pos < events.size(); pos++) {
        final Event item = events.get(pos);
        final boolean isLast = pos == total - 1;
        try {
            // Timeline merge scheduling.
            barrier.await(item);
            if (filterTransactionEntry) {
                super.doSink(Arrays.asList(item));
            } else if (isLast) {
                // Transactional data is only sunk after the last event has passed, to keep it
                // atomic. The batch must be written before the last event releases its state,
                // otherwise there is a concurrency problem.
                return super.doSink(events);
            }
        } catch (InterruptedException e) {
            return false;
        } finally {
            barrier.clear(item);
        }
    }

    return false;
}
 
示例20
/**
 * Returns {@code doGet(start, batchSize)} once {@code checkUnGetSlotAt} accepts the request,
 * waiting on {@code notEmpty} in between checks.
 *
 * @param start position to read from; cast to {@code LogPosition} for the availability check
 * @param batchSize batch size passed to the availability check and to {@code doGet}
 * @return the events returned by {@code doGet(start, batchSize)}
 * @throws InterruptedException if interrupted while acquiring the lock or while waiting
 * @throws CanalStoreException declared by the signature
 */
public Events<Event> get(Position start, int batchSize) throws InterruptedException, CanalStoreException {
    final ReentrantLock readLock = this.lock;
    readLock.lockInterruptibly();
    try {
        try {
            boolean ready = checkUnGetSlotAt((LogPosition) start, batchSize);
            while (!ready) {
                notEmpty.await();
                ready = checkUnGetSlotAt((LogPosition) start, batchSize);
            }
        } catch (InterruptedException ie) {
            // propagate to non-interrupted thread
            notEmpty.signal();
            throw ie;
        }

        return doGet(start, batchSize);
    } finally {
        readLock.unlock();
    }
}
 
示例21
/**
 * Reads events from the store using the call that matches the supplied timeout.
 *
 * <p>A {@code null} timeout selects {@code tryGet}; a timeout of zero or less selects the
 * two-argument {@code get}; a positive timeout selects the timed {@code get}.
 *
 * @param eventStore store to read from
 * @param start position to read from
 * @param batchSize batch size handed to the store
 * @param timeout timeout selector, may be {@code null}
 * @param unit unit of {@code timeout}; only used on the timed path
 * @return whatever the selected store call returns
 * @throws CanalServerException wrapping any exception raised by either {@code get} variant
 */
private Events<Event> getEvents(CanalEventStore eventStore, Position start, int batchSize, Long timeout,
                                TimeUnit unit) {
    if (timeout != null) {
        try {
            boolean timed = timeout > 0;
            if (timed) {
                return eventStore.get(start, batchSize, timeout, unit);
            }
            return eventStore.get(start, batchSize);
        } catch (Exception e) {
            throw new CanalServerException(e);
        }
    }

    // No timeout given: exceptions from tryGet are not wrapped.
    return eventStore.tryGet(start, batchSize);
}
 
示例22
/**
 * Writes the batch once {@code checkFreeSlotAt} reports room for all of it, waiting on
 * {@code notFull} in between checks.
 *
 * <p>A {@code null} or empty batch is ignored. If the calling thread's interrupt flag is set
 * after the write, the flag is cleared and an {@code InterruptedException} is thrown.
 *
 * @param data events to store
 * @throws InterruptedException if interrupted while acquiring the lock or waiting, or if the
 *         thread's interrupt flag is set after the write
 * @throws CanalStoreException declared by the signature
 */
public void put(List<Event> data) throws InterruptedException, CanalStoreException {
    if (data == null || data.isEmpty()) {
        return;
    }

    final ReentrantLock writeLock = this.lock;
    writeLock.lockInterruptibly();
    try {
        try {
            // Check whether there is a free slot for the whole batch.
            boolean hasRoom = checkFreeSlotAt(putSequence.get() + data.size());
            while (!hasRoom) {
                notFull.await(); // wait until not full
                hasRoom = checkFreeSlotAt(putSequence.get() + data.size());
            }
        } catch (InterruptedException ie) {
            // propagate to non-interrupted thread
            notFull.signal();
            throw ie;
        }

        doPut(data);

        boolean interruptedAfterWrite = Thread.interrupted();
        if (interruptedAfterWrite) {
            throw new InterruptedException();
        }
    } finally {
        writeLock.unlock();
    }
}
 
示例23
/**
 * Prints every event of the batch to standard output; nothing is stored and the timeout
 * arguments are not used by this implementation.
 *
 * <p>Transaction begin/end events are printed as their entry type only. Every other event is
 * printed through the {@code messgae} pattern with the current thread name, journal name,
 * position and formatted execute time.
 *
 * @param datas events to print
 * @param timeout unused here
 * @param unit unused here
 * @return always {@code true}
 * @throws InterruptedException declared by the signature; the visible body never throws it
 * @throws CanalStoreException declared by the signature; the visible body never throws it
 */
public boolean put(List<Event> datas, long timeout, TimeUnit unit) throws InterruptedException, CanalStoreException {
    // One formatter per call is enough: it is loop-invariant and never leaves this method.
    SimpleDateFormat format = new SimpleDateFormat(DATE_FORMAT);
    for (Event data : datas) {
        if (data.getEntryType() == EntryType.TRANSACTIONBEGIN || data.getEntryType() == EntryType.TRANSACTIONEND) {
            System.out.println(data.getEntryType());
        } else {
            // Only this branch prints a time, so the Date is created here.
            Date date = new Date(data.getExecuteTime());
            System.out.println(MessageFormat.format(messgae,
                Thread.currentThread().getName(),
                data.getJournalName(),
                String.valueOf(data.getPosition()),
                format.format(date)));
        }
    }
    return true;
}
 
示例24
/**
 * Delegates to {@code super.await(event)} and calls {@code reset()} before rethrowing if the
 * wait is interrupted.
 *
 * @param event event to wait for
 * @throws InterruptedException rethrown after {@code reset()} when the wait is interrupted
 */
public void await(Event event) throws InterruptedException {
    try {
        super.await(event);
    } catch (InterruptedException interrupted) {
        // An interrupt may be caused by shutdown or by a master/standby switch. After a
        // switch the transaction end is not delivered normally, so the state is forced to
        // "transaction finished" to let the other queues pass.
        reset();
        throw interrupted;
    }
}
 
示例25
/**
 * Delegates to {@code super.await(event, timeout, unit)} and calls {@code reset()} before
 * rethrowing if the wait is interrupted. A {@code TimeoutException} from the superclass
 * propagates without a reset.
 *
 * @param event event to wait for
 * @param timeout timeout forwarded to the superclass
 * @param unit unit of {@code timeout}
 * @throws InterruptedException rethrown after {@code reset()} when the wait is interrupted
 * @throws TimeoutException propagated from the superclass
 */
public void await(Event event, long timeout, TimeUnit unit) throws InterruptedException, TimeoutException {
    try {
        super.await(event, timeout, unit);
    } catch (InterruptedException interrupted) {
        // An interrupt may be caused by shutdown or by a master/standby switch. After a
        // switch the transaction end is not delivered normally, so the state is forced to
        // "transaction finished" to let the other queues pass.
        reset();
        throw interrupted;
    }
}
 
示例26
/**
 * Waits until this event's timestamp is permitted to pass, giving up once the timeout has
 * elapsed.
 *
 * <p>The timeout bounds the total time spent waiting on the condition, not each individual
 * wake-up.
 *
 * @param event event whose timestamp is checked
 * @param timeout maximum time to wait
 * @param unit unit of {@code timeout}
 * @throws InterruptedException if interrupted while acquiring the lock or while waiting
 * @throws TimeoutException if the event is still not permitted when the timeout has elapsed
 */
public void await(Event event, long timeout, TimeUnit unit) throws InterruptedException, TimeoutException {
    long timestamp = getTimestamp(event);
    long nanos = unit.toNanos(timeout);
    // Acquire before entering the try block: if acquisition is interrupted the lock is not
    // held, and unlocking it in finally would replace the InterruptedException with an
    // unlock failure.
    lock.lockInterruptibly();
    try {
        single(timestamp);
        while (!isPermit(event, timestamp)) {
            if (nanos <= 0L) {
                // Previously the wait result was ignored, so the loop could spin forever and
                // the declared TimeoutException was never raised.
                throw new TimeoutException();
            }
            nanos = condition.awaitNanos(nanos);
        }
    } finally {
        lock.unlock();
    }
}
 
示例27
/**
 * Updates the row counter and exec-time gauge that belong to the given operation.
 *
 * <p>The row delta is the sum of {@code getRowsCount()} over all events. The exec time is the
 * first positive {@code getExecuteTime()} found in the list; when none is found, the gauge is
 * left untouched. The counter is always updated, even with a delta of zero.
 *
 * @param events events of the operation; may be {@code null} or empty
 * @param op operation whose metrics are updated; values other than PUT/GET/ACK are ignored
 */
private void profiling(List<Event> events, OP op) {
    long firstExecTime = 0L;
    int rowDelta = 0;
    boolean hasEvents = events != null && !events.isEmpty();
    if (hasEvents) {
        for (Event event : events) {
            // Remember only the first positive execute time.
            if (firstExecTime == 0 && event.getExecuteTime() > 0) {
                firstExecTime = event.getExecuteTime();
            }
            rowDelta += event.getRowsCount();
        }
    }

    final boolean hasExecTime = firstExecTime > 0;
    switch (op) {
        case PUT:
            putTableRows.addAndGet(rowDelta);
            if (hasExecTime) {
                putExecTime.lazySet(firstExecTime);
            }
            break;
        case GET:
            getTableRows.addAndGet(rowDelta);
            if (hasExecTime) {
                getExecTime.lazySet(firstExecTime);
            }
            break;
        case ACK:
            ackTableRows.addAndGet(rowDelta);
            if (hasExecTime) {
                ackExecTime.lazySet(firstExecTime);
            }
            break;
        default:
            break;
    }
}
 
示例28
/**
 * Starts the store: calls {@code super.start()}, validates {@code bufferSize}, then derives
 * {@code indexMask} and allocates the {@code entries} array.
 *
 * @throws CanalStoreException declared by the signature
 * @throws IllegalArgumentException if {@code bufferSize} is not a positive power of 2
 */
public void start() throws CanalStoreException {
    super.start();
    // bitCount == 1 on its own also accepts Integer.MIN_VALUE (only the sign bit is set),
    // which would then fail at the array allocation with NegativeArraySizeException.
    // Reject non-positive sizes explicitly.
    if (bufferSize <= 0 || Integer.bitCount(bufferSize) != 1) {
        throw new IllegalArgumentException("bufferSize must be a power of 2");
    }

    // For a power-of-two size, size - 1 has all lower bits set.
    indexMask = bufferSize - 1;
    entries = new Event[bufferSize];
}
 
示例29
/**
 * Stores the batch, waiting on {@code notFull} for at most the given time until
 * {@code checkFreeSlotAt} reports room for all of it.
 *
 * @param data events to store; a {@code null} or empty batch counts as success
 * @param timeout maximum time to wait for room
 * @param unit unit of {@code timeout}
 * @return {@code true} if the batch was stored (or was empty), {@code false} if the time ran
 *         out before there was room
 * @throws InterruptedException if interrupted while acquiring the lock or while waiting
 * @throws CanalStoreException declared by the signature
 */
public boolean put(List<Event> data, long timeout, TimeUnit unit) throws InterruptedException, CanalStoreException {
    if (data == null || data.isEmpty()) {
        return true;
    }

    long remainingNanos = unit.toNanos(timeout);
    final ReentrantLock putLock = this.lock;
    putLock.lockInterruptibly();
    try {
        while (!checkFreeSlotAt(putSequence.get() + data.size())) {
            if (remainingNanos <= 0) {
                return false;
            }

            try {
                remainingNanos = notFull.awaitNanos(remainingNanos);
            } catch (InterruptedException interrupted) {
                notFull.signal(); // propagate to non-interrupted thread
                throw interrupted;
            }
        }

        doPut(data);
        return true;
    } finally {
        putLock.unlock();
    }
}
 
示例30
/**
 * Returns the result of {@code doGet}, waiting on {@code notEmpty} for at most the given time
 * until {@code checkUnGetSlotAt} accepts the request.
 *
 * <p>When the time runs out first, {@code doGet} is still called, so whatever is available is
 * returned.
 *
 * @param start position to read from; cast to {@code LogPosition} for the availability check
 * @param batchSize batch size passed to the availability check and to {@code doGet}
 * @param timeout maximum time to wait
 * @param unit unit of {@code timeout}
 * @return the events returned by {@code doGet(start, batchSize)}
 * @throws InterruptedException if interrupted while acquiring the lock or while waiting
 * @throws CanalStoreException declared by the signature
 */
public Events<Event> get(Position start, int batchSize, long timeout, TimeUnit unit) throws InterruptedException,
                                                                                    CanalStoreException {
    long remainingNanos = unit.toNanos(timeout);
    final ReentrantLock getLock = this.lock;
    getLock.lockInterruptibly();
    try {
        // Keep waiting only while the request cannot be served and there is time left.
        while (!checkUnGetSlotAt((LogPosition) start, batchSize) && remainingNanos > 0) {
            try {
                remainingNanos = notEmpty.awaitNanos(remainingNanos);
            } catch (InterruptedException interrupted) {
                notEmpty.signal(); // propagate to non-interrupted thread
                throw interrupted;
            }
        }

        // Either the request can be served, or time is up and we take as many as there are.
        return doGet(start, batchSize);
    } finally {
        getLock.unlock();
    }
}